From 2c1d5d035d24337f4286eaebb935e7d70f1f10b6 Mon Sep 17 00:00:00 2001
From: Michael
Date: Fri, 20 Jan 2023 14:51:31 -0300
Subject: [PATCH] Map module support and deprecate store logic, docs

---
 README.md               | 29 +++++++++++++++++++--
 pyproject.toml          |  2 +-
 setup.py                |  2 +-
 substreams/substream.py | 58 ++++++++++++++++++-----------------------
 4 files changed, 55 insertions(+), 36 deletions(-)

diff --git a/README.md b/README.md
index b5338cc..2740b25 100644
--- a/README.md
+++ b/README.md
@@ -38,11 +38,36 @@ In order to poll the substream, you will need to call the `poll()` function on t
 # View available modules on .spkg
 print(sb.output_modules)
 
-# Poll the module and return a list of SubstreamOutput objects in the order of teh specified modules
+# Poll the module and return a list of SubstreamOutput objects in the order of the specified modules
 result = sb.poll(["store_swap_events"], start_block=10000835, end_block=10000835+20000)
 ```
 
-The result here is a `SubstreamOutput` object, you can access both the `data` and `snapshots` dataframes by doing:
+With the default inputs, this function returns pandas DataFrames after streaming all blocks between `start_block` and `end_block`. Depending on the inputs, however, it can return a dict instead (see `return_first_result` below). The `poll()` function takes the following inputs:
+
+- output_modules
+  - List of strings naming the output modules to stream
+- start_block
+  - Integer block number at which to start polling
+- end_block
+  - Integer block number at which to stop polling. In theory there is no maximum, since any block number past the chain head will stream blocks in real time. If you are building a data app that streams data in real time as blocks finalize, it's recommended to use an end_block far in the future, such as block 20,000,000
+- stream_callback
+  - An optional callback function, executed whenever valid streamed data is received (see the first example below)
+- return_first_result
+  - Boolean value that, if True, returns data from the first block at or after start_block that contains an applicable transaction/event
+  - Can be called repeatedly on the front end while incrementing start_block, to consume data as it's streamed rather than all at once after streaming completes (see the second example below)
+  - Defaults to False
+  - If True, the data is returned in the format {"data": [], "module_name": String, "data_block": int}
+- initial_snapshot
+  - Boolean value, defaults to False
+- highest_processed_block
+  - Integer block number used to measure indexing and processing progress when return_progress is True
+  - Defaults to 0
+- return_progress
+  - Boolean value that, if True, returns backprocessing progress
+  - Defaults to False
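+
+For example, here is a minimal sketch of real-time streaming with `stream_callback` (`map_swap_events` is a placeholder name; substitute any map module listed in `sb.output_modules`):
+
+```python
+def on_data(module_name, rows):
+    # Called whenever valid streamed data is received
+    print(f"{module_name}: {len(rows)} rows")
+
+# A far-future end_block keeps the stream open as new blocks finalize
+sb.poll(
+    ["map_swap_events"],  # placeholder module name
+    start_block=10000835,
+    end_block=20_000_000,
+    stream_callback=on_data,
+)
+```
+
+And a sketch of incremental consumption with `return_first_result`, re-polling from the block after each result (this assumes the dict format described above):
+
+```python
+start = 10000835
+end = start + 20000
+while start < end:
+    result = sb.poll(
+        ["map_swap_events"],  # placeholder module name
+        start_block=start,
+        end_block=end,
+        return_first_result=True,
+    )
+    # result: {"data": [...], "module_name": str, "data_block": int}
+    print(result["module_name"], result["data_block"], len(result["data"]))
+    # resume from the block after the one that produced data
+    start = int(result["data_block"]) + 1
+```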
+
+The default result is a `SubstreamOutput` object; you can access both the `data` and `snapshots` dataframes by doing:
 
 ```python
 # These will return pandas DataFrames
diff --git a/pyproject.toml b/pyproject.toml
index 6b2efa4..86ed370 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -4,7 +4,7 @@ build-backend = "hatchling.build"
 
 [project]
 name = "substreams"
-version = "0.0.6"
+version = "0.0.7"
 authors = [
   { name="Ryan Sudhakaran", email="ryan.sudhakaran@messari.io" },
   { name="Michael Carroll", email="michaelcarroll1999@gmail.com" },
diff --git a/setup.py b/setup.py
index abcc099..625d692 100644
--- a/setup.py
+++ b/setup.py
@@ -5,7 +5,7 @@
 setup(
     name="substreams",
-    version="0.0.6",
+    version="0.0.7",
     packages=[".substreams"],
     author="Ryan Sudhakaran",
     author_email="ryan.sudhakaran@messari.io",
diff --git a/substreams/substream.py b/substreams/substream.py
index a8ad625..0914f44 100644
--- a/substreams/substream.py
+++ b/substreams/substream.py
@@ -126,25 +126,12 @@ def _parse_snapshot_deltas(self, snapshot: dict) -> list[dict]:
             for x in snapshot["deltas"].get("deltas", list())
         ]
 
-    def _parse_data_deltas(self, data: dict) -> list[dict]:
-        module_name: str = data["outputs"][0]["name"]
-        obj_class = self._class_from_module(module_name)
-        deltas = list()
-        for output in data["outputs"]:
-            store_deltas = output["storeDeltas"]
-            if store_deltas:
-                raw_deltas = store_deltas["deltas"]
-                for delta in raw_deltas:
-                    raw = delta["newValue"]
-                    key = delta["key"]
-                    d = self._parse_from_string(raw, key, obj_class)
-                    d.update(data["clock"])
-                    deltas.append(d)
-        return deltas
-
-    def _parse_data_outputs(self, data: dict) -> list[dict]:
+    def _parse_data_outputs(self, data: dict, module_names: list[str]) -> list[dict]:
         outputs = list()
+        module_set = set(module_names)
         for output in data["outputs"]:
+            # Only parse outputs from the requested map modules
+            if "mapOutput" not in output or output["name"] not in module_set:
+                continue
             map_output = output["mapOutput"]
             for key, items in map_output.items():
                 if key == "items":
@@ -157,11 +144,8 @@ def output_modules(self) -> dict[str, Any]:
         module_map = {}
         for module in self.pkg.modules.ListFields()[0][1]:
             map_output_type = module.kind_map.output_type
-            store_output_type = module.kind_store.value_type
             if map_output_type != "":
                 output_type = map_output_type
-            else:
-                output_type = store_output_type
 
             module_map[module.name] = {
                 "is_map": map_output_type != "",
@@ -183,17 +167,18 @@ def poll(
         output_modules: list[str],
         start_block: int,
         end_block: int,
-        stream_callback=None,
-        return_first_result=False,
-        initial_snapshot=False,
+        stream_callback: Optional[callable] = None,
+        return_first_result: bool = False,
+        initial_snapshot: bool = False,
         highest_processed_block: int = 0,
-        return_progress=False
+        return_progress: bool = False,
     ):
         from sf.substreams.v1.substreams_pb2 import STEP_IRREVERSIBLE, Request
-
         for module in output_modules:
             if module not in self.output_modules:
                 raise Exception(f"module '{module}' is not supported for {self.name}")
+            # Store modules are deprecated; only map modules can be polled
+            if self.output_modules[module].get('is_map') is False:
+                raise Exception(f"module '{module}' is not a map module")
             self._class_from_module(module)
 
         stream = self.service.Blocks(
@@ -211,30 +196,39 @@ def poll(
         raw_results = defaultdict(lambda: {"data": list(), "snapshots": list()})
         results = []
         data_block = 0
-        module_name: str = ""
+        module_name = ""
+
         try:
             for response in stream:
                 snapshot = MessageToDict(response.snapshot_data)
                 data = MessageToDict(response.data)
                 progress = MessageToDict(response.progress)
+                session = MessageToDict(response.session)
+
+                # Session messages carry no module data; skip them
+                if session:
+                    continue
+
                 if snapshot:
                     module_name = snapshot["moduleName"]
                     snapshot_deltas = self._parse_snapshot_deltas(snapshot)
                     raw_results[module_name]["snapshots"].extend(snapshot_deltas)
+
                 if data:
-                    if self.output_modules[module]["is_map"]:
-                        parsed = self._parse_data_outputs(data)
-                    else:
-                        parsed = self._parse_data_deltas(data)
+                    parsed = self._parse_data_outputs(data, output_modules)
                     module_name = data["outputs"][0]["name"]
                     raw_results[module_name]["data"].extend(parsed)
                     data_block = data["clock"]["number"]
                     if len(parsed) > 0:
+                        # Tag each row with the block it came from
+                        parsed = [dict(item, block=data_block) for item in parsed]
                         if return_first_result is True:
                             break
                     if callable(stream_callback):
                         stream_callback(module_name, parsed)
+                    else:
+                        continue
                 elif progress and return_progress is True:
+                    # Skip progress updates that only report bytes or carry no processed ranges
+                    if 'processedBytes' in progress["modules"][0] or 'processedRanges' not in progress["modules"][0]:
+                        continue
                     endBlock = int(progress["modules"][0]['processedRanges']['processedRanges'][0]['endBlock'])
                     data_block = endBlock
                     if endBlock > highest_processed_block + 100 and progress["modules"][0]['name'] == output_modules[0]:
@@ -249,6 +243,6 @@ def poll(
                     df["output_module"] = output_module
                     setattr(result, k, df)
                 results.append(result)
-        except Exception as e:
-            results.append({"error": e})
+        except Exception as err:
+            results = {"error": err}
         return results