diff --git a/nebula3/sclient/GraphStorageClient.py b/nebula3/sclient/GraphStorageClient.py index c1e73c4a..77fd7256 100644 --- a/nebula3/sclient/GraphStorageClient.py +++ b/nebula3/sclient/GraphStorageClient.py @@ -10,7 +10,9 @@ The client to scan vertex and edge from storage, the return data is from thr graph database """ + import sys +import concurrent.futures from nebula3.sclient.ScanResult import ScanResult from nebula3.sclient.net import GraphStorageConnection @@ -192,6 +194,69 @@ def scan_vertex_with_part( partial_success, ) + # TODO: 1.Native async or PyO3 + # 2.Error Handling + # 3.Statistical indicators + def scan_vertex_async( + self, + space_name, + tag_name, + prop_names=[], + start_time=DEFAULT_START_TIME, + end_time=DEFAULT_END_TIME, + where=None, + only_latest_version=False, + enable_read_from_follower=True, + partial_success=False, + batch_size=1000, + max_workers=8, + ): + """ + scan_vertex_async:Multi-partition concurrency and streaming batch yield + + :param space_name: the space name + :param tag_name: the tag name + :param prop_names: if given empty, return all property + :param start_time: the min version of vertex + :param end_time: the max version of vertex + :param where: now is unsupported + :param only_latest_version: when storage enable multi versions and only_latest_version is true, + only return latest version. + when storage disable multi versions, just use the default value. + :param enable_read_from_follower: if set to false, forbid follower read + :param partial_success: if set true, when partial success, it will continue until finish + :param batch_size: The number of points in each batch (passed to scan_vertex_with_part) + :param max_workers: Number of concurrent threads + :yield: part_id, VertexResult # Each batch of data + + """ + part_leaders = self._meta_cache.get_part_leaders(space_name) + with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor: + future_to_part = {} + for part, leader in part_leaders.items(): + future = executor.submit( + self.scan_vertex_with_part, + space_name, + part, + tag_name, + prop_names, + batch_size, # The limit passed to scan_vertex_with_part + start_time, + end_time, + where, + only_latest_version, + enable_read_from_follower, + partial_success, + ) + future_to_part[future] = part + + for future in concurrent.futures.as_completed(future_to_part): + part = future_to_part[future] + scan_result = future.result() # ScanResult + while scan_result is not None and scan_result.has_next(): + batch = scan_result.next() + yield part, batch + def _scan_vertex( self, space_name, @@ -337,6 +402,65 @@ def scan_edge_with_part( partial_success, ) + def scan_edge_async( + self, + space_name, + edge_name, + prop_names=[], + start_time=DEFAULT_START_TIME, + end_time=DEFAULT_END_TIME, + where=None, + only_latest_version=False, + enable_read_from_follower=True, + partial_success=False, + batch_size=1000, + max_workers=8, + ): + """ + scan_edge_async:Multi-partition concurrency and streaming batch yield + + :param space_name: the space name + :param prop_names: if given empty, return all property + :param edge_name: the edge name + :param start_time: the min version of edge + :param end_time: the max version of edge + :param where: now is unsupported + :param only_latest_version: when storage enable multi versions and only_latest_version is true, + only return latest version. + when storage disable multi versions, just use the default value. + :param enable_read_from_follower: if set to false, forbid follower read + :param partial_success: if set true, when partial success, it will continue until finish + :param batch_size: The number of edges per batch (passed to scan_edge_with_part) + :param max_workers: Number of concurrent threads + :yield: part_id, EdgeResult # Each batch of data + """ + part_leaders = self._meta_cache.get_part_leaders(space_name) + with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor: + future_to_part = {} + for part, leader in part_leaders.items(): + future = executor.submit( + self.scan_edge_with_part, + space_name, + part, + edge_name, + prop_names, + batch_size, + start_time, + end_time, + where, + only_latest_version, + enable_read_from_follower, + partial_success, + ) + future_to_part[future] = part + + for future in concurrent.futures.as_completed(future_to_part): + part = future_to_part[future] + scan_result = future.result() + while scan_result is not None and scan_result.has_next(): + batch = scan_result.next() + yield part, batch + def _scan_edge( self, space_name,