Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
19f14a9
create AsyncPeriodicExecutor
sleepyStick Oct 4, 2024
59c0733
fix tests
sleepyStick Oct 4, 2024
3f5dda7
undo lock changes
sleepyStick Oct 4, 2024
65301cf
found some more lock changes to undo and remove atexit for async
sleepyStick Oct 7, 2024
8852bc9
hacky fix to PeriodicExecutor being defined twice and fix unwanted co…
sleepyStick Oct 7, 2024
5c9b1d9
add missing awaits
sleepyStick Oct 7, 2024
ce322f5
attempt to fix test
sleepyStick Oct 7, 2024
549645b
left out an await
sleepyStick Oct 7, 2024
c92f6af
add missing await and fix typing errors
sleepyStick Oct 8, 2024
c58448f
realizing condition needs to be reverted too
sleepyStick Oct 8, 2024
43660d9
reset client context after primary steps down
sleepyStick Oct 9, 2024
dbb51e2
ignore type error on sync
sleepyStick Oct 9, 2024
3c640e8
temp changing pytests's setup and teardown are run to see if this wil…
sleepyStick Oct 9, 2024
0fc60ba
only reset client context after primary_stepdown
sleepyStick Oct 9, 2024
f78ee91
only recreate client context in async version
sleepyStick Oct 9, 2024
cebbd6f
move periodic_executor.py up a lvl
sleepyStick Oct 9, 2024
adf0504
mutate client context to reset it
sleepyStick Oct 10, 2024
56b37c2
remove type ignore
sleepyStick Oct 10, 2024
938c8a7
make PeriodicExecutor sync only
sleepyStick Oct 10, 2024
99c3815
fix reset_client_context
sleepyStick Oct 10, 2024
7ece812
change pytest fixture scope
sleepyStick Oct 10, 2024
3924d80
update AsyncPeriodicExecutors docs
sleepyStick Oct 10, 2024
9631ed6
fix scope typo
sleepyStick Oct 10, 2024
0aa9075
fix task/thread name and delete commented out code
sleepyStick Oct 10, 2024
6597803
fix string typo in synchro and comment string
sleepyStick Oct 10, 2024
541b72d
Merge remote-tracking branch 'upstream/async-improvements' into PYTHO…
sleepyStick Oct 11, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions pymongo/asynchronous/mongo_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,8 @@

from bson.codec_options import DEFAULT_CODEC_OPTIONS, CodecOptions, TypeRegistry
from bson.timestamp import Timestamp
from pymongo import _csot, common, helpers_shared, uri_parser
from pymongo.asynchronous import client_session, database, periodic_executor
from pymongo import _csot, common, helpers_shared, periodic_executor, uri_parser
from pymongo.asynchronous import client_session, database
from pymongo.asynchronous.change_stream import AsyncChangeStream, AsyncClusterChangeStream
from pymongo.asynchronous.client_bulk import _AsyncClientBulk
from pymongo.asynchronous.client_session import _EmptyServerSession
Expand Down Expand Up @@ -908,7 +908,7 @@ async def target() -> bool:
await AsyncMongoClient._process_periodic_tasks(client)
return True

executor = periodic_executor.PeriodicExecutor(
executor = periodic_executor.AsyncPeriodicExecutor(
interval=common.KILL_CURSOR_FREQUENCY,
min_interval=common.MIN_HEARTBEAT_INTERVAL,
target=target,
Expand Down
20 changes: 10 additions & 10 deletions pymongo/asynchronous/monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,13 @@
import weakref
from typing import TYPE_CHECKING, Any, Mapping, Optional, cast

from pymongo import common
from pymongo import common, periodic_executor
from pymongo._csot import MovingMinimum
from pymongo.asynchronous import periodic_executor
from pymongo.asynchronous.periodic_executor import _shutdown_executors
from pymongo.errors import NetworkTimeout, NotPrimaryError, OperationFailure, _OperationCancelled
from pymongo.hello import Hello
from pymongo.lock import _create_lock
from pymongo.logger import _SDAM_LOGGER, _debug_log, _SDAMStatusMessage
from pymongo.periodic_executor import _shutdown_executors
from pymongo.pool_options import _is_faas
from pymongo.read_preferences import MovingAverage
from pymongo.server_description import ServerDescription
Expand Down Expand Up @@ -76,7 +75,7 @@ async def target() -> bool:
await monitor._run() # type:ignore[attr-defined]
return True

executor = periodic_executor.PeriodicExecutor(
executor = periodic_executor.AsyncPeriodicExecutor(
interval=interval, min_interval=min_interval, target=target, name=name
)

Expand Down Expand Up @@ -112,9 +111,9 @@ async def close(self) -> None:
"""
self.gc_safe_close()

def join(self, timeout: Optional[int] = None) -> None:
async def join(self, timeout: Optional[int] = None) -> None:
"""Wait for the monitor to stop."""
self._executor.join(timeout)
await self._executor.join(timeout)

def request_check(self) -> None:
"""If the monitor is sleeping, wake it soon."""
Expand All @@ -139,7 +138,7 @@ def __init__(
"""
super().__init__(
topology,
"pymongo_server_monitor_thread",
"pymongo_server_monitor_task",
topology_settings.heartbeat_frequency,
common.MIN_HEARTBEAT_INTERVAL,
)
Expand Down Expand Up @@ -250,7 +249,7 @@ async def _check_server(self) -> ServerDescription:
except (OperationFailure, NotPrimaryError) as exc:
# Update max cluster time even when hello fails.
details = cast(Mapping[str, Any], exc.details)
self._topology.receive_cluster_time(details.get("$clusterTime"))
await self._topology.receive_cluster_time(details.get("$clusterTime"))
raise
except ReferenceError:
raise
Expand Down Expand Up @@ -434,7 +433,7 @@ def __init__(self, topology: Topology, topology_settings: TopologySettings, pool
"""
super().__init__(
topology,
"pymongo_server_rtt_thread",
"pymongo_server_rtt_task",
topology_settings.heartbeat_frequency,
common.MIN_HEARTBEAT_INTERVAL,
)
Expand Down Expand Up @@ -531,4 +530,5 @@ def _shutdown_resources() -> None:
shutdown()


atexit.register(_shutdown_resources)
if _IS_SYNC:
atexit.register(_shutdown_resources)
219 changes: 0 additions & 219 deletions pymongo/asynchronous/periodic_executor.py

This file was deleted.

7 changes: 3 additions & 4 deletions pymongo/asynchronous/topology.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,7 @@
from pathlib import Path
from typing import TYPE_CHECKING, Any, Callable, Mapping, Optional, cast

from pymongo import _csot, common, helpers_shared
from pymongo.asynchronous import periodic_executor
from pymongo import _csot, common, helpers_shared, periodic_executor
from pymongo.asynchronous.client_session import _ServerSession, _ServerSessionPool
from pymongo.asynchronous.monitor import SrvMonitor
from pymongo.asynchronous.pool import Pool
Expand Down Expand Up @@ -185,7 +184,7 @@ def __init__(self, topology_settings: TopologySettings):
async def target() -> bool:
return process_events_queue(weak)

executor = periodic_executor.PeriodicExecutor(
executor = periodic_executor.AsyncPeriodicExecutor(
interval=common.EVENTS_QUEUE_FREQUENCY,
min_interval=common.MIN_HEARTBEAT_INTERVAL,
target=target,
Expand Down Expand Up @@ -742,7 +741,7 @@ async def close(self) -> None:
if self._publish_server or self._publish_tp:
# Make sure the events executor thread is fully closed before publishing the remaining events
self.__events_executor.close()
self.__events_executor.join(1)
await self.__events_executor.join(1)
process_events_queue(weakref.ref(self._events)) # type: ignore[arg-type]

@property
Expand Down
Loading
Loading