diff --git a/pyproject.toml b/pyproject.toml index 6b28380..6b2efa4 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "hatchling.build" [project] name = "substreams" -version = "0.0.5" +version = "0.0.6" authors = [ { name="Ryan Sudhakaran", email="ryan.sudhakaran@messari.io" }, { name="Michael Carroll", email="michaelcarroll1999@gmail.com" }, diff --git a/setup.py b/setup.py index 76e1b4f..abcc099 100644 --- a/setup.py +++ b/setup.py @@ -5,7 +5,7 @@ setup( name="substreams", - version="0.0.5", + version="0.0.6", packages=[".substreams"], author="Ryan Sudhakaran", author_email="ryan.sudhakaran@messari.io", diff --git a/substreams/substream.py b/substreams/substream.py index 314ac82..a8ad625 100644 --- a/substreams/substream.py +++ b/substreams/substream.py @@ -210,77 +210,45 @@ def poll( ) raw_results = defaultdict(lambda: {"data": list(), "snapshots": list()}) results = [] - if callable(stream_callback): - # This logic executes a function passed as a parameter on every block data has been streamed - try: - for response in stream: - data = MessageToDict(response.data) - if data: - module_name: str = data["outputs"][0]["name"] - if self.output_modules[module]["is_map"]: - parsed = self._parse_data_outputs(data) - else: - parsed = self._parse_data_deltas(data) - if len(parsed) > 0: - stream_callback(module_name, parsed) - except Exception as e: - return {"error": e} - return - elif return_first_result is True: - # This logic indexes all blocks until the first block that data is received - # Ideal for live streaming events and receiveing them on the front end - module_name = "" - parsed = None - data_block = 0 - try: - for response in stream: - data = MessageToDict(response.data) - progress = MessageToDict(response.progress) - if data: - data_block = data["clock"]["number"] - module_name: str = data["outputs"][0]["name"] - if self.output_modules[module]["is_map"]: - parsed = self._parse_data_outputs(data) - else: - parsed = self._parse_data_deltas(data) - module_name: str = data["outputs"][0]["name"] - if len(parsed) > 0: + data_block = 0 + module_name: str = "" + try: + for response in stream: + snapshot = MessageToDict(response.snapshot_data) + data = MessageToDict(response.data) + progress = MessageToDict(response.progress) + if snapshot: + module_name = snapshot["moduleName"] + snapshot_deltas = self._parse_snapshot_deltas(snapshot) + raw_results[module_name]["snapshots"].extend(snapshot_deltas) + if data: + if self.output_modules[module]["is_map"]: + parsed = self._parse_data_outputs(data) + else: + parsed = self._parse_data_deltas(data) + module_name = data["outputs"][0]["name"] + raw_results[module_name]["data"].extend(parsed) + data_block = data["clock"]["number"] + if len(parsed) > 0: + if return_first_result is True: break - elif progress and return_progress is True: - endBlock = int(progress["modules"][0]['processedRanges']['processedRanges'][0]['endBlock']) - data_block = endBlock - if endBlock > highest_processed_block + 100 and progress["modules"][0]['name'] == output_modules[0]: - return {"block": int(endBlock)} + if callable(stream_callback): + stream_callback(module_name, parsed) + elif progress and return_progress is True: + endBlock = int(progress["modules"][0]['processedRanges']['processedRanges'][0]['endBlock']) + data_block = endBlock + if endBlock > highest_processed_block + 100 and progress["modules"][0]['name'] == output_modules[0]: + return {"block": int(endBlock)} + if return_first_result is True: return {"data": parsed, "module_name": module_name, "data_block": data_block} - except Exception as e: - return {"error": e} - else: - # This logic indexes all blocks from start_block until end_block - # Returns one single dataframe with all data in rows once all blocks have been indexed - try: - for response in stream: - snapshot = MessageToDict(response.snapshot_data) - data = MessageToDict(response.data) - if snapshot: - module_name: str = snapshot["moduleName"] - snapshot_deltas = self._parse_snapshot_deltas(snapshot) - raw_results[module_name]["snapshots"].extend(snapshot_deltas) - if data: - if self.output_modules[module]["is_map"]: - parsed = self._parse_data_outputs(data) - else: - parsed = self._parse_data_deltas(data) - module_name: str = data["outputs"][0]["name"] - raw_results[module_name]["data"].extend(parsed) - for output_module in output_modules: - result = SubstreamOutput(module_name=output_module) - data_dict: dict = raw_results.get(output_module) - for k, v in data_dict.items(): - df = pd.DataFrame(v) - df["output_module"] = output_module - setattr(result, k, df) - results.append(result) - except Exception as e: - results.append({"error": e}) - return results - + for output_module in output_modules: + result = SubstreamOutput(module_name=output_module) + data_dict: dict = raw_results.get(output_module) + for k, v in data_dict.items(): + df = pd.DataFrame(v) + df["output_module"] = output_module + setattr(result, k, df) + results.append(result) + except Exception as e: + results.append({"error": e}) + return results