feat: add async scan #377
In the scan_vertex_async function, scan_result = future.result() may raise an exception, but the exception is not handled.
In the scan_edge_async function, the loop
while scan_result and scan_result.has_next():
    batch = scan_result.next()
has a related problem: if scan_result.next() raises an exception, scan_result may never be closed, so its resources are not cleaned up properly.
The docstring may also be incorrect:
"""
:type prop_names: if given empty, return all property # Should be :param
"""
Perhaps it should be:
"""
:param prop_names: if given empty, return all property
"""
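A minimal sketch of the cleanup concern raised above, assuming the scan result exposes some release step. FakeScanResult and its close() method are hypothetical stand-ins for illustration, not the library's API; whether the real ScanResult needs explicit closing is for the PR author to confirm. The point is only that a try/finally inside the generator runs the cleanup even when next() raises.

```python
class FakeScanResult:
    """Hypothetical stand-in for ScanResult; close() is an assumption."""

    def __init__(self, batches, fail_at=None):
        self._batches = list(batches)
        self._pos = 0
        self._fail_at = fail_at  # index at which next() simulates a failure
        self.closed = False

    def has_next(self):
        return self._pos < len(self._batches)

    def next(self):
        if self._pos == self._fail_at:
            raise RuntimeError("simulated scan failure")
        batch = self._batches[self._pos]
        self._pos += 1
        return batch

    def close(self):
        self.closed = True


def stream_batches(part, scan_result):
    """Yield (part, batch) pairs and always release scan_result."""
    try:
        while scan_result and scan_result.has_next():
            yield part, scan_result.next()
    finally:
        # Runs on normal exhaustion, on an exception from next(),
        # and when the generator is closed before it is exhausted.
        if scan_result is not None:
            scan_result.close()
```

With this shape, a failure in the middle of a partition still propagates to the caller, but the result object is released first.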
Pull Request Overview
This PR adds asynchronous scanning capabilities to the GraphStorageClient by introducing two new methods: scan_vertex_async() and scan_edge_async(). These methods enable concurrent, multi-partition scanning of vertices and edges using thread pools, addressing the limitation of only having blocking, single-threaded scan operations.
- Added concurrent.futures import for thread pool execution
- Implemented scan_vertex_async() method for parallel vertex scanning across partitions
- Implemented scan_edge_async() method for parallel edge scanning across partitions
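The pattern described in this overview can be sketched roughly as follows. This is an illustration under stated assumptions, not the PR's code: scan_partitions and the scan_one callable are hypothetical names, with scan_one standing in for the per-partition scan call and returning an iterable of batches. Error handling is left out for brevity.

```python
import concurrent.futures


def scan_partitions(parts, scan_one, max_workers=4):
    """Run scan_one(part) for every partition in a thread pool and
    yield (part, batch) pairs as each partition's scan completes.

    scan_one is a hypothetical callable returning an iterable of batches.
    """
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as pool:
        # One job per partition, all submitted up front so they run in parallel.
        future_to_part = {pool.submit(scan_one, part): part for part in parts}
        # as_completed yields futures in completion order, not submission order.
        for future in concurrent.futures.as_completed(future_to_part):
            part = future_to_part[future]
            for batch in future.result():
                yield part, batch
```

Because results arrive in completion order, callers that need a stable order would have to sort or group by partition themselves.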
for future in concurrent.futures.as_completed(future_to_part):
    part = future_to_part[future]
    scan_result = future.result()  # ScanResult
Copilot AI, Jul 29, 2025
The future.result() call can raise exceptions from the background thread, but there's no exception handling. Consider wrapping this in a try-except block to handle potential errors gracefully, especially since the TODO comment mentions error handling as a known issue.
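One possible shape for that try-except, as a sketch only. collect_results is a hypothetical helper name, and log-and-skip is just one policy; re-raising or returning the errors to the caller may suit the client better.

```python
import concurrent.futures
import logging

logger = logging.getLogger(__name__)


def collect_results(future_to_part):
    """Yield (part, result) for each finished future, skipping failed ones.

    future_to_part maps Future -> partition id, as in the loop under review.
    """
    for future in concurrent.futures.as_completed(future_to_part):
        part = future_to_part[future]
        try:
            scan_result = future.result()
        except Exception:
            # Log-and-skip is one policy; re-raising or collecting errors
            # for the caller are equally reasonable.
            logger.exception("scan failed for partition %s", part)
            continue
        yield part, scan_result
```

A single failing partition then no longer aborts the scans of the remaining partitions.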
for future in concurrent.futures.as_completed(future_to_part):
    part = future_to_part[future]
    scan_result = future.result()
Copilot AI, Jul 29, 2025
Same as in scan_vertex_async: the future.result() call can raise exceptions from the background thread, but there's no exception handling. Consider wrapping this in a try-except block to handle potential errors gracefully.
Thanks to @Malone-AI and @wey-gu. I will fix these errors in the next few days.
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
What type of PR is this?
What problem(s) does this PR solve?
Issue(s) number:
N/A
Description:
Adds scan_vertex_async() and scan_edge_async() so that scans can run in an asyncio-friendly way, filling the gap where only blocking, single-threaded scans existed.
How do you solve it?
- Uses concurrent.futures.ThreadPoolExecutor to submit one scan_*_with_part job per partition (via meta_cache.get_part_leaders), dispatching them in parallel.
- Consumes each ScanResult in a while scan_result.has_next(): yield part, batch loop to stream (partition_id, batch) pairs.
Special notes for your reviewer, ex. impact of this fix, design document, etc: