feat: use another thread to write to mda zarr #258
Conversation
if (meta := sequence.metadata.get(SEQUENCE_META_KEY)) is not None:
    # TODO: make sure io thread finishes before saving
    # thread workers don't seem to have .join method?
    # self._io_t.join()
@tlambert03 am I missing something here?
no, QRunnables are slightly more restricted than true QThreads. They don't have a join() ... but the worker objects do have an is_running() method that you could wait on. They also emit a finished signal that you could connect to a new method that does the saving:

create_worker(self._watch_mda, _start_thread=True, _connect={'finished': self._on_io_finished})
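A minimal sketch of that pattern, assuming create_worker comes from superqt.utils (napari re-exports the same helper); _on_io_finished is taken from the snippet above, while _start_io_worker is just a hypothetical wrapper method, not something in the PR:

from superqt.utils import create_worker

def _start_io_worker(self) -> None:
    # keep a reference to the worker; its `finished` signal fires once
    # _watch_mda returns, so the connected slot can safely do the saving
    self._io_t = create_worker(
        self._watch_mda,
        _start_thread=True,
        _connect={"finished": self._on_io_finished},
    )

def _on_io_finished(self) -> None:
    # placeholder slot: the IO worker has drained the deque, so it is
    # now safe to write the zarr data to its final destination
    ...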
Is there an advantage to QRunnables that comes with the restrictions? I'm not sure I fully grasp the breakdown of thread vs runnable
pass
import time
...
time.sleep(0.5)
todo: Make a real choice for this number. I chose this based on what was convenient for development. Ideally it would also be adjustable - open to suggestions.
except IndexError:
    pass
maybe sleep for an extra long time here?
yeah I think something like this would work. It doesn't need to be an extra long sleep; I think 100-200 ms is a reasonable starting point (5-10 checks a second is no biggie). Also, add a yield to turn your worker into a Generator Worker (these are easier to kill from the main thread if need be, since each time they yield is an opportunity to communicate with it):
def _watch_mda(self) -> Generator[None, None, None]:
    while self._mda_running:
        if self._deck:
            self._process_frame(*self._deck.pop())
        else:
            time.sleep(0.1)
        yield
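For context, a generator worker started this way can also be stopped from the main thread; a rough sketch, assuming the worker reference is kept as self._io_t as in the snippet earlier in this thread:

# request a stop: a GeneratorWorker checks for this at each `yield`,
# so the loop above exits cleanly the next time it yields
self._io_t.quit()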
awesome!! it's way better 🤩
happy to go with this and use it for a while,
@@ -72,6 +72,7 @@ def __init__(self, mmcore: CMMCorePlus, viewer: napari.viewer.Viewer) -> None:

    # mapping of id -> (zarr.Array, temporary directory) for each layer created
    self._tmp_arrays: dict[str, tuple[zarr.Array, tempfile.TemporaryDirectory]] = {}
    self._deck: deque = deque()
-    self._deck: deque = deque()
+    self._deck: Deque[tuple[np.ndarray, MDAEvent]] = Deque()

... and then change above to from typing import Deque
good call. Also for the future we should go to just collections.deque (at least once napari-micro is python 3.9+ only)
https://docs.python.org/3/library/typing.html#typing.Deque
Deprecated since version 3.9: collections.deque now supports subscripting ([]). See PEP 585 and Generic Alias Type.
Yep, all the generic collections can be updated then. 3.8 end of life is Oct 2024
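For reference, once the package is Python 3.9+ only, the same annotation can use the built-in generic directly; a minimal sketch (the _Handler class is just a stand-in here, not the PR's actual class):

from collections import deque

import numpy as np
from useq import MDAEvent


class _Handler:
    def __init__(self) -> None:
        # PEP 585: collections.deque supports subscripting on 3.9+,
        # so typing.Deque is no longer needed for the annotation
        self._deck: deque[tuple[np.ndarray, MDAEvent]] = deque()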
cs = list(self.viewer.dims.current_step)
for a, v in enumerate(im_idx):
    cs[a] = v
self._update_viewer_dims(cs, layer_name, event)
slightly wary of accessing self.viewer from a thread. might want some locks in here. Also, I wonder if it would be possible for the thing triggering _update_viewer_dims to not be _process_frame
one option here is to connect a method to the yielded event that self._io_t will emit (provided you make _watch_mda a generator). Then you could yield a (cs, layer_name, event) tuple back to the main thread.
I don't know though... it's possible that ensure_main_thread is equally effective and efficient here. But that would be a different option if we ever need to try it out.
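A rough sketch of that yielded-signal option, assuming the superqt/napari worker API; _start_io_worker and _on_frame_yielded are hypothetical names. The connected callback runs back in the main thread (the usual thread_worker pattern), so it can touch the viewer:

from superqt.utils import create_worker

def _start_io_worker(self) -> None:
    # `yielded` is emitted with whatever value the generator yields
    self._io_t = create_worker(
        self._watch_mda,
        _start_thread=True,
        _connect={"yielded": self._on_frame_yielded},
    )

def _on_frame_yielded(self, value) -> None:
    if value is None:  # a bare `yield` while idle sends nothing
        return
    cs, layer_name, event = value
    self._update_viewer_dims(cs, layer_name, event)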
How about just moving that access into the function that has ensure_main_thread?
sure, and then it would just be self._update_viewer_dims(layer_name, event)? if that works too, I like that
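A minimal sketch of that arrangement, assuming ensure_main_thread from superqt.utils; the body is illustrative, and _event_to_index is a hypothetical helper for computing the frame's index, not something in the PR:

from superqt.utils import ensure_main_thread

@ensure_main_thread
def _update_viewer_dims(self, layer_name: str, event: MDAEvent) -> None:
    # this runs on the main thread, so touching viewer state is safe here
    cs = list(self.viewer.dims.current_step)
    for a, v in enumerate(self._event_to_index(layer_name, event)):
        cs[a] = v
    self.viewer.dims.current_step = cs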
Something seems to have gone very wrong in the explorer tests - they now hang forever :(
working on rebasing. but running into what seem like some issues in pymmcore-widgets:
attn @fdrgsp
@tlambert03 I believe I addressed all review comments - this should be ok. I went ahead with this because it was pretty close and is more self-contained than #264, which has several unrelated changes making it trickier to parse through. If this one does get merged then I would very much support a follow-up PR with performance enhancements (as isolated changes, ideally).
trying to get tests to run
tests/test_layer_scale.py
I would make sure we are not saving the files in this test; I would switch from

sequence = mda_sequence_splits.replace(axis_order=axis_order)

to

sequence = mda_sequence_splits.replace(
    axis_order=axis_order,
    metadata={SEQUENCE_META_KEY: SequenceMeta(should_save=False)},
)
@ianhi yes, that's true, sorry about that :). Maybe you can ignore everything else and just have a look at the one thing I found useful and would add: specifying the chunk size when creating the zarr, since it improves the image assignment when frames are popped from the _deck (https://github.com/fdrgsp/napari-micromanager/blob/3256672406344d387f8151fcfbc4b1214e96fd19/src/napari_micromanager/_mda_handler.py#L128). I also added a comment about this. Anyway, I think this PR is ok; I have not tested it on a real microscope yet, but it's close to #264, which I did test.
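For illustration only, chunking the temporary zarr array so that each 2D frame is its own chunk might look roughly like this; the shape and dtype below are placeholders, not the values used in the PR:

import tempfile

import zarr

tmp = tempfile.TemporaryDirectory()
shape = (10, 4, 512, 512)  # placeholder (t, c, y, x) sizes
z = zarr.open(
    tmp.name,
    mode="w",
    shape=shape,
    chunks=(1, 1) + shape[-2:],  # one 2D frame per chunk
    dtype="uint16",
)
# assigning a single frame then touches exactly one chunk on disk
z[0, 0] = 0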
does #272 close this?
superseded by #272 |
Fixes: #256
Untested because I'm not yet sure this is the fully correct way to do this. Would be great to get some eyes on this from @tlambert03 and @fdrgsp
This tries to do as much of the work of processing an MDA frame in a third thread as possible. I also wanted to ensure that the most recently captured image is the most recently displayed, so this ends up using a deque for its LIFO behavior and because it is higher performance than queue (https://bugs.python.org/issue15329#msg199368). Then, when the worker thread has a chance, it processes all the previous indices that it didn't get a chance to deal with before a new image came in. This is the source of the complexity of checking _largest_idx.
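A toy sketch of that LIFO idea (not the PR's actual handler; the _largest_idx bookkeeping is omitted):

from collections import deque

deck: deque = deque()

# producer (acquisition callback): the newest frame is appended on the right
deck.append(("frame-0", (0,)))
deck.append(("frame-1", (1,)))

# consumer (IO thread): pop() also takes from the right, so the newest
# frame is handled first and any older, skipped frames are drained afterwards
while deck:
    frame, index = deck.pop()
    print(index, frame)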
To user test: