Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor DocumentRoom, throttle saves instead of debouncing saves #250

Draft
wants to merge 9 commits into
base: main
Choose a base branch
from
212 changes: 121 additions & 91 deletions jupyter_collaboration/rooms.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

from jupyter_events import EventLogger
from jupyter_ydoc import ydocs as YDOCS
from pycrdt import MapEvent
from pycrdt_websocket.websocket_server import YRoom
from pycrdt_websocket.ystore import BaseYStore, YDocNotFound

Expand Down Expand Up @@ -43,11 +44,27 @@ def __init__(
self._logger = logger
self._save_delay = save_delay

self._update_lock = asyncio.Lock()
self._initialization_lock = asyncio.Lock()
self._cleaner: asyncio.Task | None = None
self._saving_document: asyncio.Task | None = None
self._messages: dict[str, asyncio.Lock] = {}

# the current `self._maybe_save_document()` task.
self._maybe_save_task: asyncio.Task | None = None

# the task currently saving to disk via FileLoader, if any.
self._save_task: asyncio.Task | None = None

# flag that indicates whether a previous call to
# `self._maybe_save_document()` is waiting to save.
#
# if `self._maybe_save_document()` is called while this flag is `True`,
# then the call does nothing, as a previous task will save the Ydoc
# within `self._save_delay` seconds.
self._waiting_to_save = False

# flag that indicates whether a previous call to
# `self._maybe_save_document()` should call itself again after
# `self._save_task` finishes. this is set to `True` when a document
# update occurs while `self._save_task` is running.
self._should_resave = False

# Listen for document changes
self._document.observe(self._on_document_change)
Expand Down Expand Up @@ -78,7 +95,8 @@ async def initialize(self) -> None:
"""
Initializes the room.

This method is thread safe so only one client can initialize the room.
This method is not coroutine-safe, i.e. consumers must await this method
before calling any other methods.

To initialize the room, we check if the content was already in the store
as Y updates and if it is up to date with the content on disk. In this
Expand All @@ -89,64 +107,54 @@ async def initialize(self) -> None:
It is important to set the ready property in the parent class (`self.ready = True`),
this setter will subscribe for updates on the shared document.
"""
async with self._initialization_lock:
if self.ready: # type: ignore[has-type]
return
if self.ready: # type: ignore[has-type]
return

self.log.info("Initializing room %s", self._room_id)
self.log.info("Initializing room %s", self._room_id)
model = await self._file.load_content(self._file_format, self._file_type)

model = await self._file.load_content(self._file_format, self._file_type)
# try to apply Y updates from the YStore for this document
read_from_source = True
if self.ystore is not None:
try:
await self.ystore.apply_updates(self.ydoc)
self._emit(
LogLevel.INFO,
"load",
f"Content loaded from the store {self.ystore.__class__.__qualname__}",
)
self.log.info(
"Content in room %s loaded from the ystore %s",
self._room_id,
self.ystore.__class__.__name__,
)
read_from_source = False
except YDocNotFound:
# YDoc not found in the YStore, create the document from the source file (no change history)
pass

if not read_from_source and self._document.source != model["content"]:
# TODO: Delete document from the store.
self._emit(LogLevel.INFO, "initialize", "The file is out-of-sync with the ystore.")
self.log.info(
"Content in file %s is out-of-sync with the ystore %s",
self._file.path,
self.ystore.__class__.__name__,
)
read_from_source = True

# if YStore updates and source file are out-of-sync, resync updates with source
if read_from_source:
self._emit(LogLevel.INFO, "load", "Content loaded from disk.")
self.log.info("Content in room %s loaded from file %s", self._room_id, self._file.path)
self._document.source = model["content"]

async with self._update_lock:
# try to apply Y updates from the YStore for this document
read_from_source = True
if self.ystore is not None:
try:
await self.ystore.apply_updates(self.ydoc)
self._emit(
LogLevel.INFO,
"load",
"Content loaded from the store {}".format(
self.ystore.__class__.__qualname__
),
)
self.log.info(
"Content in room %s loaded from the ystore %s",
self._room_id,
self.ystore.__class__.__name__,
)
read_from_source = False
except YDocNotFound:
# YDoc not found in the YStore, create the document from the source file (no change history)
pass

if not read_from_source:
# if YStore updates and source file are out-of-sync, resync updates with source
if self._document.source != model["content"]:
# TODO: Delete document from the store.
self._emit(
LogLevel.INFO, "initialize", "The file is out-of-sync with the ystore."
)
self.log.info(
"Content in file %s is out-of-sync with the ystore %s",
self._file.path,
self.ystore.__class__.__name__,
)
read_from_source = True

if read_from_source:
self._emit(LogLevel.INFO, "load", "Content loaded from disk.")
self.log.info(
"Content in room %s loaded from file %s", self._room_id, self._file.path
)
self._document.source = model["content"]

if self.ystore:
await self.ystore.encode_state_as_update(self.ydoc)

self._document.dirty = False
self.ready = True
self._emit(LogLevel.INFO, "initialize", "Room initialized")
if self.ystore:
await self.ystore.encode_state_as_update(self.ydoc)

self._document.dirty = False
self.ready = True
self._emit(LogLevel.INFO, "initialize", "Room initialized")

def _emit(self, level: LogLevel, action: str | None = None, msg: str | None = None) -> None:
data = {"level": level.value, "room": self._room_id, "path": self._file.path}
Expand All @@ -165,8 +173,8 @@ def stop(self) -> None:
"""
super().stop()
# TODO: Should we cancel or wait?
if self._saving_document:
self._saving_document.cancel()
if self._save_task:
self._save_task.cancel()

self._document.unobserve()
self._file.unobserve(self.room_id)
Expand All @@ -193,9 +201,8 @@ async def _on_outofband_change(self) -> None:
self._emit(LogLevel.ERROR, None, msg)
return

async with self._update_lock:
self._document.source = model["content"]
self._document.dirty = False
self._document.source = model["content"]
self._document.dirty = False

def _on_document_change(self, target: str, event: Any) -> None:
"""
Expand All @@ -212,47 +219,72 @@ def _on_document_change(self, target: str, event: Any) -> None:
document. These tasks are debounced (60 seconds by default) so we
need to cancel previous tasks before creating a new one.
"""
if self._update_lock.locked():
if (
target == "state"
and isinstance(event, MapEvent)
and list(event.keys.keys()) == ["dirty"]
):
# do not write when we are just setting the `dirty` attribute to
# `False` for the JupyterLab UI. this prevents a save loop, as this
# is set when the Ydoc is saved.
return

self._saving_document = asyncio.create_task(
self._maybe_save_document(self._saving_document)
)
if self._maybe_save_task and not self._maybe_save_task.done():
# only one `self._maybe_save_task` needs to be running.
# if this method is called after the save delay, then we need to set
# `self._should_resave` to `True` to reschedule
# `self._maybe_save_document()` on the event loop after the current
# `self._maybe_save_task` completes.
if not self._waiting_to_save:
self._should_resave = True
return

self._maybe_save_task = asyncio.create_task(self._maybe_save_document())
self._maybe_save_task.add_done_callback(self._maybe_save_done_callback)

# Done-callback attached to `self._maybe_save_task`. If `self._should_resave`
# was set while that task was running (a document update arrived after its
# save delay had already elapsed), schedule one more
# `self._maybe_save_document()` run; otherwise do nothing.
# `_future` is the finished task handed over by `add_done_callback`; unused.
def _maybe_save_done_callback(self, _future):
if not self._should_resave:
return

# clear the flag before rescheduling so the follow-up run starts clean
self._should_resave = False
self._maybe_save_task = asyncio.create_task(self._maybe_save_document())
# re-attach this callback so a resave requested during the new task is honored too
self._maybe_save_task.add_done_callback(self._maybe_save_done_callback)

async def _maybe_save_document(self, saving_document: asyncio.Task | None) -> None:
async def _maybe_save_document(self) -> None:
"""
Saves the content of the document to disk.

### Note:
There is a save delay to debounce the save since we could receive a high
There is a save delay to throttle the save since we could receive a high
number of changes in a short period of time. This way we can cancel the
previous save.
"""
if self._save_delay is None:
# TODO: fix this; if _save_delay is unset, then this never writes to disk
return

if saving_document is not None and not saving_document.done():
# the document is being saved, cancel that
saving_document.cancel()
# save after `self._save_delay` seconds of inactivity
self._waiting_to_save = True
await asyncio.sleep(self._save_delay)
self._waiting_to_save = False

# all async code (i.e. await statements) must be part of this try/except block
# because this coroutine is run in a cancellable task and cancellation is handled here

try:
# save after X seconds of inactivity
await asyncio.sleep(self._save_delay)

# do not write to `self._document` in this `try` block, as that will
# trigger the observer and result in a save loop.
self.log.info("Saving the content from room %s", self._room_id)
await self._file.maybe_save_content(
{
"format": self._file_format,
"type": self._file_type,
"content": self._document.source,
}
self._save_task = asyncio.create_task(
self._file.maybe_save_content(
{
"format": self._file_format,
"type": self._file_type,
"content": self._document.source,
}
)
)
async with self._update_lock:
self._document.dirty = False

await self._save_task
self._document.dirty = False
self._emit(LogLevel.INFO, "save", "Content saved.")

except asyncio.CancelledError:
Expand All @@ -268,10 +300,8 @@ async def _maybe_save_document(self, saving_document: asyncio.Task | None) -> No
self._emit(LogLevel.ERROR, None, msg)
return None

async with self._update_lock:
self._document.source = model["content"]
self._document.dirty = False

self._document.source = model["content"]
self._document.dirty = False
self._emit(LogLevel.INFO, "overwrite", "Out-of-band changes while saving.")

except Exception as e:
Expand Down
Loading