feat: add async scan #377
Changes from all commits: ffdacac, a0e5fcd, a82776f, 670d1ee, ef217a2, 7fb8b5b
```diff
@@ -10,7 +10,9 @@
 The client to scan vertex and edge from storage,
 the return data is from the graph database
 """

 import sys
+import concurrent.futures
+
 from nebula3.sclient.ScanResult import ScanResult
 from nebula3.sclient.net import GraphStorageConnection
```
```diff
@@ -192,6 +194,69 @@ def scan_vertex_with_part(
             partial_success,
         )

+    # TODO: 1. Native async or PyO3
+    #       2. Error handling
+    #       3. Statistical indicators
+    def scan_vertex_async(
+        self,
+        space_name,
+        tag_name,
+        prop_names=[],
+        start_time=DEFAULT_START_TIME,
+        end_time=DEFAULT_END_TIME,
+        where=None,
+        only_latest_version=False,
+        enable_read_from_follower=True,
+        partial_success=False,
+        batch_size=1000,
+        max_workers=8,
+    ):
+        """
+        scan_vertex_async: scan vertices from all partitions concurrently,
+        yielding each batch as a stream as soon as it is available
+
+        :param space_name: the space name
+        :param tag_name: the tag name
+        :param prop_names: if given empty, return all properties
+        :param start_time: the min version of the vertex
+        :param end_time: the max version of the vertex
+        :param where: currently unsupported
+        :param only_latest_version: when storage enables multi versions and
+        only_latest_version is true, only the latest version is returned;
+        when storage disables multi versions, just use the default value
+        :param enable_read_from_follower: if set to false, reading from followers is forbidden
+        :param partial_success: if set to true, the scan continues to the end on partial success
+        :param batch_size: the number of vertices per batch (passed to scan_vertex_with_part as its limit)
+        :param max_workers: the number of concurrent threads
+        :yield: (part_id, VertexResult) for each batch of data
+        """
+        part_leaders = self._meta_cache.get_part_leaders(space_name)
+        with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
+            future_to_part = {}
+            for part, leader in part_leaders.items():
+                future = executor.submit(
+                    self.scan_vertex_with_part,
+                    space_name,
+                    part,
+                    tag_name,
+                    prop_names,
+                    batch_size,  # the limit passed to scan_vertex_with_part
+                    start_time,
+                    end_time,
+                    where,
+                    only_latest_version,
+                    enable_read_from_follower,
+                    partial_success,
+                )
+                future_to_part[future] = part
+
+            for future in concurrent.futures.as_completed(future_to_part):
+                part = future_to_part[future]
+                scan_result = future.result()  # ScanResult
+                while scan_result is not None and scan_result.has_next():
+                    batch = scan_result.next()
+                    yield part, batch
+
     def _scan_vertex(
         self,
         space_name,
```
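For context, a minimal way to consume this generator (a sketch, not part of the diff; the metad address, space, and tag names are placeholders, and it assumes a reachable NebulaGraph cluster):

```python
from nebula3.mclient import MetaCache
from nebula3.sclient.GraphStorageClient import GraphStorageClient

# placeholder metad address; 50000 is the cache timeout used in the client docs
meta_cache = MetaCache([('127.0.0.1', 9559)], 50000)
client = GraphStorageClient(meta_cache)
try:
    # batches are yielded as soon as any partition produces one,
    # so arrival order across partitions is not deterministic
    for part, batch in client.scan_vertex_async('ScanSpace', 'person', batch_size=500):
        for node in batch.as_nodes():
            print(part, node)
finally:
    client.close()
```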
```diff
@@ -337,6 +402,65 @@ def scan_edge_with_part(
             partial_success,
         )

+    def scan_edge_async(
+        self,
+        space_name,
+        edge_name,
+        prop_names=[],
+        start_time=DEFAULT_START_TIME,
+        end_time=DEFAULT_END_TIME,
+        where=None,
+        only_latest_version=False,
+        enable_read_from_follower=True,
+        partial_success=False,
+        batch_size=1000,
+        max_workers=8,
+    ):
+        """
+        scan_edge_async: scan edges from all partitions concurrently,
+        yielding each batch as a stream as soon as it is available
+
+        :param space_name: the space name
+        :param edge_name: the edge name
+        :param prop_names: if given empty, return all properties
+        :param start_time: the min version of the edge
+        :param end_time: the max version of the edge
+        :param where: currently unsupported
+        :param only_latest_version: when storage enables multi versions and
+        only_latest_version is true, only the latest version is returned;
+        when storage disables multi versions, just use the default value
+        :param enable_read_from_follower: if set to false, reading from followers is forbidden
+        :param partial_success: if set to true, the scan continues to the end on partial success
+        :param batch_size: the number of edges per batch (passed to scan_edge_with_part as its limit)
+        :param max_workers: the number of concurrent threads
+        :yield: (part_id, EdgeResult) for each batch of data
+        """
+        part_leaders = self._meta_cache.get_part_leaders(space_name)
+        with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
+            future_to_part = {}
+            for part, leader in part_leaders.items():
+                future = executor.submit(
+                    self.scan_edge_with_part,
+                    space_name,
+                    part,
+                    edge_name,
+                    prop_names,
+                    batch_size,  # the limit passed to scan_edge_with_part
+                    start_time,
+                    end_time,
+                    where,
+                    only_latest_version,
+                    enable_read_from_follower,
+                    partial_success,
+                )
+                future_to_part[future] = part
+
+            for future in concurrent.futures.as_completed(future_to_part):
+                part = future_to_part[future]
+                scan_result = future.result()  # ScanResult
+                while scan_result is not None and scan_result.has_next():
+                    batch = scan_result.next()
+                    yield part, batch
+
     def _scan_edge(
         self,
         space_name,
```
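Edge consumption mirrors the vertex sketch above (same assumptions; `friend` is a placeholder edge type):

```python
# collect every scanned edge; partition order is not guaranteed
edges = []
for part, batch in client.scan_edge_async('ScanSpace', 'friend'):
    edges.extend(batch.as_relationships())
```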
Review comment: The `future.result()` call can raise exceptions from the background thread, but there is no exception handling here. Consider wrapping it in a try/except block to handle failures gracefully, especially since the TODO comment already lists error handling as a known gap.
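One hedged sketch of what that could look like inside both generators (not part of the PR; it reuses the diff's `future_to_part` and `partial_success` names and assumes a module-level logger):

```python
import logging

logger = logging.getLogger(__name__)

for future in concurrent.futures.as_completed(future_to_part):
    part = future_to_part[future]
    try:
        # result() re-raises any exception captured in the worker thread
        scan_result = future.result()
    except Exception as exc:
        logger.error('scan of partition %s failed: %s', part, exc)
        if not partial_success:
            raise  # fail fast when partial results are not acceptable
        continue  # otherwise skip this partition and keep streaming
    while scan_result is not None and scan_result.has_next():
        yield part, scan_result.next()
```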