diff --git a/substreams/substream.py b/substreams/substream.py index f86b96a..ee69645 100644 --- a/substreams/substream.py +++ b/substreams/substream.py @@ -103,7 +103,6 @@ def _parse_from_string(self, raw: str, key: str, output_class) -> dict: decoded: bytes = base64.b64decode(raw) obj = {} if output_class is None: - # PASS THE VALUE TYPE HERE TO SANITIZE BASE64 DECODE(bigint OR string) obj["value"] = str(decoded).split("b'")[1].split("'")[0] if ":" in key: split_key = key.split(":") @@ -174,15 +173,17 @@ def proto_file_map(self) -> dict[str, DescriptorProto]: name_map[mt.name] = pf.name return name_map - # TODO how do I type annotate this stuff? def poll( self, output_modules: list[str], start_block: int, end_block: int, + stream_callback=None, + return_first_result=False, initial_snapshot=False, + highest_processed_block: int = 0, + return_progress=False ): - # TODO make this general from sf.substreams.v1.substreams_pb2 import STEP_IRREVERSIBLE, Request for module in output_modules: @@ -203,29 +204,78 @@ def poll( ) ) raw_results = defaultdict(lambda: {"data": list(), "snapshots": list()}) - for response in stream: - snapshot = MessageToDict(response.snapshot_data) - data = MessageToDict(response.data) - if snapshot: - module_name: str = snapshot["moduleName"] - snapshot_deltas = self._parse_snapshot_deltas(snapshot) - raw_results[module_name]["snapshots"].extend(snapshot_deltas) - if data: - print("data block #", data["clock"]["number"]) - if self.output_modules[module]["is_map"]: - parsed = self._parse_data_outputs(data) - else: - parsed = self._parse_data_deltas(data) - module_name: str = data["outputs"][0]["name"] - raw_results[module_name]["data"].extend(parsed) - results = [] - for output_module in output_modules: - result = SubstreamOutput(module_name=output_module) - data_dict: dict = raw_results.get(output_module) - for k, v in data_dict.items(): - df = pd.DataFrame(v) - df["output_module"] = output_module - setattr(result, k, df) - results.append(result) - return results + if callable(stream_callback): + # This logic executes a function passed as a parameter on every block data has been streamed + try: + for response in stream: + data = MessageToDict(response.data) + if data: + module_name: str = data["outputs"][0]["name"] + if self.output_modules[module]["is_map"]: + parsed = self._parse_data_outputs(data) + else: + parsed = self._parse_data_deltas(data) + if len(parsed) > 0: + stream_callback(module_name, parsed) + except Exception as e: + return {"error": e} + return + elif return_first_result is True: + # This logic indexes all blocks until the first block that data is received + # Ideal for live streaming events and receiveing them on the front end + module_name = "" + parsed = None + data_block = 0 + try: + for response in stream: + data = MessageToDict(response.data) + progress = MessageToDict(response.progress) + if data: + data_block = data["clock"]["number"] + module_name: str = data["outputs"][0]["name"] + if self.output_modules[module]["is_map"]: + parsed = self._parse_data_outputs(data) + else: + parsed = self._parse_data_deltas(data) + module_name: str = data["outputs"][0]["name"] + if len(parsed) > 0: + break + elif progress and return_progress is True: + endBlock = int(progress["modules"][0]['processedRanges']['processedRanges'][0]['endBlock']) + data_block = endBlock + if endBlock > highest_processed_block + 100 and progress["modules"][0]['name'] == output_modules[0]: + return {"block": int(endBlock)} + return {"data": parsed, "module_name": module_name, "data_block": data_block} + except Exception as e: + return {"error": e} + else: + # This logic indexes all blocks from start_block until end_block + # Returns one single dataframe with all data in rows once all blocks have been indexed + try: + for response in stream: + snapshot = MessageToDict(response.snapshot_data) + data = MessageToDict(response.data) + if snapshot: + module_name: str = snapshot["moduleName"] + snapshot_deltas = self._parse_snapshot_deltas(snapshot) + raw_results[module_name]["snapshots"].extend(snapshot_deltas) + if data: + if self.output_modules[module]["is_map"]: + parsed = self._parse_data_outputs(data) + else: + parsed = self._parse_data_deltas(data) + module_name: str = data["outputs"][0]["name"] + raw_results[module_name]["data"].extend(parsed) + for output_module in output_modules: + result = SubstreamOutput(module_name=output_module) + data_dict: dict = raw_results.get(output_module) + for k, v in data_dict.items(): + df = pd.DataFrame(v) + df["output_module"] = output_module + setattr(result, k, df) + results.append(result) + except Exception as e: + results.append({"error": e}) + return results +