# NOTE(review): this file arrived as a newline-stripped `git format-patch`
# mailbox (3 patches to substreams/substream.py).  Reconstructed below is the
# final state of the combined `poll` method introduced by PATCH 3/3
# ("Combine poll funcs and remove logging"), with the defects found in review
# fixed — see the BUGFIX comments inline.  Requires the enclosing class's
# `output_modules`, `service`, `pkg`, `name`, `_class_from_module`,
# `_parse_data_outputs`, `_parse_data_deltas`, `_parse_snapshot_deltas`,
# plus module-level `defaultdict`, `pd`, `MessageToDict`, `SubstreamOutput`.
def poll(
    self,
    output_modules: list[str],
    start_block: int,
    end_block: int,
    stream_callback=None,
    return_first_result=False,
    initial_snapshot=False,
    highest_processed_block: int = 0,
    return_progress=False,
):
    """Stream blocks ``[start_block, end_block)`` for the given output modules.

    Operates in one of three mutually exclusive modes:

    * ``stream_callback`` callable: invoke ``stream_callback(module_name,
      parsed_rows)`` for every block that yields data; returns ``None`` on a
      clean finish, or ``{"error": exc}`` if the stream raises.
    * ``return_first_result=True``: stop at the first block that yields data
      and return ``{"data", "module_name", "data_block"}``.  When
      ``return_progress`` is also true, may instead return ``{"block": n}``
      once processing advances more than 100 blocks past
      ``highest_processed_block`` (useful for checkpointing live streams).
    * otherwise (default): index everything from ``start_block`` to
      ``end_block`` and return a list of ``SubstreamOutput`` objects whose
      ``data`` / ``snapshots`` attributes are pandas DataFrames (on stream
      failure, a ``{"error": exc}`` dict is appended to the list — the
      established contract of the original code).

    :param output_modules: module names to request; each must exist in
        ``self.output_modules`` or an ``Exception`` is raised up front.
    :param start_block: first block number to stream.
    :param end_block: stop block number (exclusive, per the Request field).
    :param stream_callback: optional per-block consumer (see above).
    :param return_first_result: enable first-result mode (see above).
    :param initial_snapshot: request initial store snapshots for the modules.
    :param highest_processed_block: progress checkpoint for first-result mode.
    :param return_progress: report progress checkpoints in first-result mode.
    """
    # Imported lazily: the generated pb2 module is only needed when polling.
    from sf.substreams.v1.substreams_pb2 import STEP_IRREVERSIBLE, Request

    for module in output_modules:
        if module not in self.output_modules:
            raise Exception(f"module '{module}' is not supported for {self.name}")
        self._class_from_module(module)

    stream = self.service.Blocks(
        Request(
            start_block_num=start_block,
            stop_block_num=end_block,
            fork_steps=[STEP_IRREVERSIBLE],
            modules=self.pkg.modules,
            output_modules=output_modules,
            initial_store_snapshot_for_modules=(
                output_modules if initial_snapshot else None
            ),
        )
    )

    def _parse(data: dict):
        # Parse one block's output and report which module produced it.
        # BUGFIX: the original consulted self.output_modules[module]["is_map"]
        # where `module` was the stale loop variable left over from the
        # validation loop above, so with more than one output module the wrong
        # module's "is_map" flag decided the parse path.  Key on the module
        # that actually produced this output instead.
        name: str = data["outputs"][0]["name"]
        if self.output_modules[name]["is_map"]:
            return name, self._parse_data_outputs(data)
        return name, self._parse_data_deltas(data)

    if callable(stream_callback):
        # Callback mode: push parsed rows to the caller as each block arrives.
        try:
            for response in stream:
                data = MessageToDict(response.data)
                if data:
                    module_name, parsed = _parse(data)
                    if len(parsed) > 0:
                        stream_callback(module_name, parsed)
        except Exception as e:
            # Preserve the original error contract rather than raising.
            return {"error": e}
        return

    if return_first_result is True:
        # First-result mode: index until the first block that yields data.
        # Ideal for live streaming events and receiving them on the front end.
        module_name = ""
        parsed = None
        data_block = 0
        try:
            for response in stream:
                data = MessageToDict(response.data)
                progress = MessageToDict(response.progress)
                if data:
                    # NOTE(review): MessageToDict renders uint64 fields as
                    # strings, so data_block is presumably a str here, matching
                    # the original behavior — confirm with callers before
                    # coercing to int.
                    data_block = data["clock"]["number"]
                    # BUGFIX: the original assigned module_name from the same
                    # expression twice in a row; once is enough.
                    module_name, parsed = _parse(data)
                    if len(parsed) > 0:
                        break
                elif progress and return_progress is True:
                    end = int(
                        progress["modules"][0]["processedRanges"]["processedRanges"][0]["endBlock"]
                    )
                    data_block = end
                    # Report a checkpoint once the primary module has advanced
                    # more than 100 blocks past the caller's known high-water mark.
                    if (
                        end > highest_processed_block + 100
                        and progress["modules"][0]["name"] == output_modules[0]
                    ):
                        return {"block": end}
            return {"data": parsed, "module_name": module_name, "data_block": data_block}
        except Exception as e:
            return {"error": e}

    # Collect mode: index all blocks from start_block until end_block and
    # return one SubstreamOutput (DataFrames in rows) per requested module.
    raw_results = defaultdict(lambda: {"data": list(), "snapshots": list()})
    results = []
    try:
        for response in stream:
            snapshot = MessageToDict(response.snapshot_data)
            data = MessageToDict(response.data)
            if snapshot:
                deltas = self._parse_snapshot_deltas(snapshot)
                raw_results[snapshot["moduleName"]]["snapshots"].extend(deltas)
            if data:
                module_name, parsed = _parse(data)
                raw_results[module_name]["data"].extend(parsed)
        for output_module in output_modules:
            result = SubstreamOutput(module_name=output_module)
            # BUGFIX: raw_results.get(output_module) returned None when a
            # module produced no rows, raising AttributeError inside the loop
            # (which the broad except then converted into {"error": e},
            # discarding the good results).  Indexing the defaultdict yields
            # empty frames for quiet modules instead.
            for k, v in raw_results[output_module].items():
                df = pd.DataFrame(v)
                df["output_module"] = output_module
                setattr(result, k, df)
            results.append(result)
    except Exception as e:
        results.append({"error": e})
    return results