Skip to content

feature: Support streaming device queries#95

Merged
gilbert-sci merged 14 commits intomainfrom
gilbert/query-stream-impl
Apr 23, 2025
Merged

feature: Support streaming device queries#95
gilbert-sci merged 14 commits intomainfrom
gilbert/query-stream-impl

Conversation

@gilbert-sci
Copy link
Copy Markdown
Contributor

@gilbert-sci gilbert-sci commented Mar 31, 2025

Summary

Self tests and impedance queries are long running, which can be confusing or concerning for the users. We add the option to stream query requests from a synapse device, which allows us to monitor the progress in real time. This PR updates the Python Client to reflect these changes.

  • Updated Synapse API
  • Added stream option to query cli
  • StreamingQueryClient a class that wraps the two supported query requests in progress indicators
  • Hooks up log output if --verbose is set.

Testing

  • Hooked device up to a peripheral, ran impedance and self test
python3 synapse/cli/__main__.py --uri <device_uri> query test_query.json --stream --verbose

Comment thread synapse/cli/rpc.py
Comment thread synapse/client/query.py
Comment thread synapse/client/query.py Outdated
@gilbert-sci
Copy link
Copy Markdown
Contributor Author

Ping on this if anyone has time to review and or test.

Copy link
Copy Markdown
Contributor

@polymerizedsage polymerizedsage left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I could successfully run a full streaming query after installing matplotlib. However if I ctrl + C during a query I get the following stacktrace, and no output file is saved:

Traceback (most recent call last):
  File "/home/sage/synapse-client-python/.venv/bin/synapsectl", line 8, in <module>
    sys.exit(main())
             ^^^^^^
  File "/home/sage/synapse-client-python/synapse/cli/__main__.py", line 74, in main
    args.func(args)
  File "/home/sage/synapse-client-python/synapse/cli/rpc.py", line 112, in query
    return client.stream_query(StreamQueryRequest(request=query_proto))
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/sage/synapse-client-python/synapse/cli/query.py", line 62, in stream_query
    return self.handle_impedance_stream(request)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/sage/synapse-client-python/synapse/cli/query.py", line 186, in handle_impedance_stream
    for response in self.device.stream_query(request):
  File "/home/sage/synapse-client-python/synapse/client/device.py", line 179, in stream_query
    for response in self.rpc.StreamQuery(stream_request):
  File "/home/sage/synapse-client-python/.venv/lib/python3.12/site-packages/grpc/_channel.py", line 543, in __next__
    return self._next()
           ^^^^^^^^^^^^
  File "/home/sage/synapse-client-python/.venv/lib/python3.12/site-packages/grpc/_channel.py", line 960, in _next
    _common.wait(self._state.condition.wait, _response_ready)
  File "/home/sage/synapse-client-python/.venv/lib/python3.12/site-packages/grpc/_common.py", line 156, in wait
    _wait_once(wait_fn, MAXIMUM_WAIT_TIMEOUT, spin_cb)
  File "/home/sage/synapse-client-python/.venv/lib/python3.12/site-packages/grpc/_common.py", line 116, in _wait_once
    wait_fn(timeout=timeout)
  File "/usr/lib/python3.12/threading.py", line 359, in wait
    gotit = waiter.acquire(True, timeout)
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
KeyboardInterrupt

I think we should be able to handle this case since one of the main advantages of streaming queries is getting partial data back if a query fails.

Comment thread synapse/cli/query.py Outdated
@polymerizedsage
Copy link
Copy Markdown
Contributor

I'm getting this error when a streaming query completes:

Traceback (most recent call last):
  File "/home/sage/synapse-client-python/.venv/bin/synapsectl", line 8, in <module>
    sys.exit(main())
             ^^^^^^
  File "/home/sage/synapse-client-python/synapse/cli/__main__.py", line 74, in main
    args.func(args)
  File "/home/sage/synapse-client-python/synapse/cli/rpc.py", line 111, in query
    return client.stream_query(StreamQueryRequest(request=query_proto))
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/sage/synapse-client-python/synapse/cli/query.py", line 58, in stream_query
    return self.handle_impedance_stream(request)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/sage/synapse-client-python/synapse/cli/query.py", line 237, in handle_impedance_stream
    self.save_impedance_results(all_measurements)
    ^^^^^^^^^^^^^^^^^^^^^^^^^^^
AttributeError: 'StreamingQueryClient' object has no attribute 'save_impedance_results'. Did you mean: 'display_impedance_results'?

@polymerizedsage
Copy link
Copy Markdown
Contributor

I also still get the same stacktrace when ctrl + C'ing a running query. The output CSV does save however.

Traceback (most recent call last):
  File "/home/sage/synapse-client-python/.venv/bin/synapsectl", line 8, in <module>
    sys.exit(main())
             ^^^^^^
  File "/home/sage/synapse-client-python/synapse/cli/__main__.py", line 74, in main
    args.func(args)
  File "/home/sage/synapse-client-python/synapse/cli/rpc.py", line 111, in query
    return client.stream_query(StreamQueryRequest(request=query_proto))
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/sage/synapse-client-python/synapse/cli/query.py", line 58, in stream_query
    return self.handle_impedance_stream(request)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/sage/synapse-client-python/synapse/cli/query.py", line 189, in handle_impedance_stream
    for response in self.device.stream_query(request):
  File "/home/sage/synapse-client-python/synapse/client/device.py", line 179, in stream_query
    for response in self.rpc.StreamQuery(stream_request):
  File "/home/sage/synapse-client-python/.venv/lib/python3.12/site-packages/grpc/_channel.py", line 543, in __next__
    return self._next()
           ^^^^^^^^^^^^
  File "/home/sage/synapse-client-python/.venv/lib/python3.12/site-packages/grpc/_channel.py", line 960, in _next
    _common.wait(self._state.condition.wait, _response_ready)
  File "/home/sage/synapse-client-python/.venv/lib/python3.12/site-packages/grpc/_common.py", line 156, in wait
    _wait_once(wait_fn, MAXIMUM_WAIT_TIMEOUT, spin_cb)
  File "/home/sage/synapse-client-python/.venv/lib/python3.12/site-packages/grpc/_common.py", line 116, in _wait_once
    wait_fn(timeout=timeout)
  File "/usr/lib/python3.12/threading.py", line 359, in wait
    gotit = waiter.acquire(True, timeout)
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
KeyboardInterrupt

Copy link
Copy Markdown
Contributor

@polymerizedsage polymerizedsage left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Alongside the other comments some small changes.

Comment thread synapse/cli/query.py
Comment thread synapse/cli/query.py
@gilbert-sci
Copy link
Copy Markdown
Contributor Author

I also still get the same stacktrace when ctrl + C'ing a running query. The output CSV does save however.

Traceback (most recent call last):
  File "/home/sage/synapse-client-python/.venv/bin/synapsectl", line 8, in <module>
    sys.exit(main())
             ^^^^^^
  File "/home/sage/synapse-client-python/synapse/cli/__main__.py", line 74, in main
    args.func(args)
  File "/home/sage/synapse-client-python/synapse/cli/rpc.py", line 111, in query
    return client.stream_query(StreamQueryRequest(request=query_proto))
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/sage/synapse-client-python/synapse/cli/query.py", line 58, in stream_query
    return self.handle_impedance_stream(request)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/sage/synapse-client-python/synapse/cli/query.py", line 189, in handle_impedance_stream
    for response in self.device.stream_query(request):
  File "/home/sage/synapse-client-python/synapse/client/device.py", line 179, in stream_query
    for response in self.rpc.StreamQuery(stream_request):
  File "/home/sage/synapse-client-python/.venv/lib/python3.12/site-packages/grpc/_channel.py", line 543, in __next__
    return self._next()
           ^^^^^^^^^^^^
  File "/home/sage/synapse-client-python/.venv/lib/python3.12/site-packages/grpc/_channel.py", line 960, in _next
    _common.wait(self._state.condition.wait, _response_ready)
  File "/home/sage/synapse-client-python/.venv/lib/python3.12/site-packages/grpc/_common.py", line 156, in wait
    _wait_once(wait_fn, MAXIMUM_WAIT_TIMEOUT, spin_cb)
  File "/home/sage/synapse-client-python/.venv/lib/python3.12/site-packages/grpc/_common.py", line 116, in _wait_once
    wait_fn(timeout=timeout)
  File "/usr/lib/python3.12/threading.py", line 359, in wait
    gotit = waiter.acquire(True, timeout)
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
KeyboardInterrupt

I thought you just wanted to have the csv output - not silent keyboard. I can fix.

@gilbert-sci
Copy link
Copy Markdown
Contributor Author

Should be fixed now, sorry:

[15:19:42]  Starting impedance_stream with 2 electordes                                       query.py:147
 Started saving measurements to impedance_measurements_20250423-151942.csv
⠇  Processing impedance measurements  ━━━━━━━━━━━━━━━━━━━━╺━━━━━━━━━━━━━━━━━━━  50% (1/2) 0:00:02
[15:19:44]  Operation cancelled by user                                                        query.py:69

Copy link
Copy Markdown
Contributor

@polymerizedsage polymerizedsage left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

all good!

@gilbert-sci gilbert-sci merged commit e45e5f5 into main Apr 23, 2025
2 checks passed
@gilbert-sci gilbert-sci deleted the gilbert/query-stream-impl branch April 23, 2025 22:37
antoniaelsen pushed a commit that referenced this pull request Apr 30, 2025
* Better displays and plots

* added logs while self test is running in verbose mode

* added logs while self test is running in verbose mode

* Added cli

* Added better options for device log

* updated the submodule

* Updated synapse api

* Updated api to main

* Feedback from review

* increase sleep

* Updated from feedback

* Feedback from review

* Fixes during streaming
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants