From c08df6e11077e959694406ae3ed4e15d1a3d1965 Mon Sep 17 00:00:00 2001 From: Michael Date: Wed, 25 Jan 2023 22:34:32 -0300 Subject: [PATCH 1/4] Initial dict/df return routing --- substreams/substream.py | 20 +++++++++----------- 1 file changed, 9 insertions(+), 11 deletions(-) diff --git a/substreams/substream.py b/substreams/substream.py index 0914f44..36085d7 100644 --- a/substreams/substream.py +++ b/substreams/substream.py @@ -170,8 +170,7 @@ def poll( stream_callback: Optional[callable] = None, return_first_result: bool = False, initial_snapshot: bool = False, - highest_processed_block: int = 0, - return_progress: bool = False, + return_type: str = "df" ): from sf.substreams.v1.substreams_pb2 import STEP_IRREVERSIBLE, Request for module in output_modules: @@ -202,7 +201,6 @@ def poll( for response in stream: snapshot = MessageToDict(response.snapshot_data) data = MessageToDict(response.data) - progress = MessageToDict(response.progress) session = MessageToDict(response.session) if session: @@ -226,15 +224,13 @@ def poll( stream_callback(module_name, parsed) else: continue - elif progress and return_progress is True: - if 'processedBytes' in progress["modules"][0] or 'processedRanges' not in progress["modules"][0]: - continue - endBlock = int(progress["modules"][0]['processedRanges']['processedRanges'][0]['endBlock']) - data_block = endBlock - if endBlock > highest_processed_block + 100 and progress["modules"][0]['name'] == output_modules[0]: - return {"block": int(endBlock)} + + # Want to refactir the below logic to create a singlepoint of return and provide more consistency if return_first_result is True: - return {"data": parsed, "module_name": module_name, "data_block": data_block} + if return_type == "dict": + return {"data": parsed, "module_name": module_name, "data_block": data_block} + elif return_type == "df": + return pd.DataFrame(parsed) for output_module in output_modules: result = SubstreamOutput(module_name=output_module) data_dict: dict = 
raw_results.get(output_module) @@ -243,6 +239,8 @@ def poll( df["output_module"] = output_module setattr(result, k, df) results.append(result) + if return_type == "dict": + results = results.to_dict() except Exception as err: results = {"error": err} return results From 8a3a77b28089b7e05c1bc2a07dedfd5f154425b2 Mon Sep 17 00:00:00 2001 From: Michael Date: Fri, 27 Jan 2023 00:37:59 -0300 Subject: [PATCH 2/4] Change return logic --- substreams/substream.py | 92 +++++++++++++++++++++++------------------ 1 file changed, 51 insertions(+), 41 deletions(-) diff --git a/substreams/substream.py b/substreams/substream.py index 36085d7..63eb5ba 100644 --- a/substreams/substream.py +++ b/substreams/substream.py @@ -16,7 +16,6 @@ DEFAULT_ENDPOINT = "api.streamingfast.io:443" - def retrieve_class(module_name: str, class_name: str): module = import_module(module_name) return getattr(module, class_name) @@ -172,32 +171,40 @@ def poll( initial_snapshot: bool = False, return_type: str = "df" ): - from sf.substreams.v1.substreams_pb2 import STEP_IRREVERSIBLE, Request - for module in output_modules: - if module not in self.output_modules: - raise Exception(f"module '{module}' is not supported for {self.name}") - if self.output_modules[module].get('is_map') is False: - raise Exception(f"module '{module}' is not a map module") - self._class_from_module(module) - - stream = self.service.Blocks( - Request( - start_block_num=start_block, - stop_block_num=end_block, - fork_steps=[STEP_IRREVERSIBLE], - modules=self.pkg.modules, - output_modules=output_modules, - initial_store_snapshot_for_modules=output_modules - if initial_snapshot - else None, - ) - ) - raw_results = defaultdict(lambda: {"data": list(), "snapshots": list()}) - results = [] - data_block = 0 + module_name = "" + has_error = False + return_dict_interface = {"data": [], "module_name": module_name, "data_block": start_block, "error": None} + valid_return_types = ["dict", "df"] + results = [] + raw_results = defaultdict(lambda: 
{"data": list(), "snapshots": list()}) try: + return_type = return_type.lower() + if return_type not in valid_return_types: + return_type = "df" + + from sf.substreams.v1.substreams_pb2 import STEP_IRREVERSIBLE, Request + for module in output_modules: + if module not in self.output_modules: + raise Exception(f"module '{module}' is not supported for {self.name}") + if self.output_modules[module].get('is_map') is False: + raise Exception(f"module '{module}' is not a map module") + self._class_from_module(module) + + stream = self.service.Blocks( + Request( + start_block_num=start_block, + stop_block_num=end_block, + fork_steps=[STEP_IRREVERSIBLE], + modules=self.pkg.modules, + output_modules=output_modules, + initial_store_snapshot_for_modules=output_modules + if initial_snapshot + else None, + ) + ) + for response in stream: snapshot = MessageToDict(response.snapshot_data) data = MessageToDict(response.data) @@ -215,32 +222,35 @@ def poll( parsed = self._parse_data_outputs(data, output_modules) module_name = data["outputs"][0]["name"] raw_results[module_name]["data"].extend(parsed) - data_block = data["clock"]["number"] + return_dict_interface["data_block"] = data["clock"]["number"] if len(parsed) > 0: - parsed = [dict(item, **{'block':data_block}) for item in parsed] + parsed = [dict(item, **{'block':data["clock"]["number"]}) for item in parsed] if return_first_result is True: break if callable(stream_callback): stream_callback(module_name, parsed) - else: - continue - - # Want to refactir the below logic to create a singlepoint of return and provide more consistency - if return_first_result is True: + + if return_first_result is True and parsed: + return_dict_interface["data"] = parsed if return_type == "dict": - return {"data": parsed, "module_name": module_name, "data_block": data_block} - elif return_type == "df": - return pd.DataFrame(parsed) - for output_module in output_modules: - result = SubstreamOutput(module_name=output_module) - data_dict: dict = 
raw_results.get(output_module) + results = return_dict_interface + if return_type == "df": + results = pd.DataFrame(parsed) + elif raw_results: + result = SubstreamOutput(module_name=output_modules[0]) + data_dict: dict = raw_results.get(output_modules[0]) for k, v in data_dict.items(): df = pd.DataFrame(v) - df["output_module"] = output_module + df["output_module"] = output_modules[0] setattr(result, k, df) results.append(result) - if return_type == "dict": - results = results.to_dict() + if return_type == "dict": + return_dict_interface["data"] = results.to_dict() + results = return_dict_interface + else: + raise Exception("No Valid Data Results Returned by Substream") except Exception as err: - results = {"error": err} + has_error = True + return_dict_interface["error"] = err + results = return_dict_interface return results From 490bc30af6eb87002da1b079de064743bfb4f719 Mon Sep 17 00:00:00 2001 From: Michael Date: Fri, 27 Jan 2023 14:02:03 -0300 Subject: [PATCH 3/4] Error Handling + replace module_name list with str --- README.md | 18 +++++-------- substreams/substream.py | 57 +++++++++++++++++++---------------------- 2 files changed, 32 insertions(+), 43 deletions(-) diff --git a/README.md b/README.md index 2740b25..43a2f52 100644 --- a/README.md +++ b/README.md @@ -39,33 +39,27 @@ In order to poll the substream, you will need to call the `poll()` function on t print(sb.output_modules) # Poll the module and return a list of SubstreamOutput objects in the order of the specified modules -result = sb.poll(["store_swap_events"], start_block=10000835, end_block=10000835+20000) +result = sb.poll(["map_swap_events"], start_block=10000835, end_block=10000835+20000) ``` With the default inputs, this function outputs Pandas Dataframes after streaming all blocks between the start_block and end_block. However depending on how this function is called, a dict object is returned. 
The `poll()` function has a number of inputs - output_modules - - List of strings of output modules to stream + - String of the output module to stream - start_block - Integer block number to start the polling - end_block - Integer block number to end the polling. In theory, there is no max block number as any block number past chain head will stream the blocks in real time. Its recommended to use an end_block far off into the future if building a data app that will be streaming datain real time as blocks finalize, such as block 20,000,000 -- stream_callback - - An optional callback function to be passed into the polling function to execute when valid streamed data is received - return_first_result - Boolean value that if True will return data on the first block after the start block to have an applicable TX/Event. - Can be called recursively on the front end while incrementing the start_block to return data as its streamed rather than all data at once after streaming is completed - Defaults to False - - If True, the data is returned in the format {"data": [], "module_name": String, "data_block": int} - initial_snapshot - Boolean value, defaults to False -- highest_processed_block - - Integer block number that is used in measuring indexing and processing progress, in cases where return_progress is True - - Defaults to 0 -- return_progress: bool = False, - - Boolean value that if True returns progress in back processing - - Defaults to False - +- return_type + - Specifies the type of value to return + - Passing "df" returns the data in a pandas DataFrame + - Passing "dict" returns in the format {"data": [], "module_name": String, "data_block": int, "error": str | None} The result here is the default `SubstreamOutput` object, you can access both the `data` and `snapshots` dataframes by doing: diff --git a/substreams/substream.py b/substreams/substream.py index 63eb5ba..555d83e 100644 --- a/substreams/substream.py +++ b/substreams/substream.py @@ -125,11 +125,10 @@ def 
_parse_snapshot_deltas(self, snapshot: dict) -> list[dict]: for x in snapshot["deltas"].get("deltas", list()) ] - def _parse_data_outputs(self, data: dict, module_names: list[str]) -> list[dict]: + def _parse_data_outputs(self, data: dict, module_name: str) -> list[dict]: outputs = list() - module_set = set(module_names) for output in data["outputs"]: - if "mapOutput" not in output or output["name"] not in module_set: + if "mapOutput" not in output or output["name"] != module_name: continue map_output = output["mapOutput"] for key, items in map_output.items(): @@ -163,34 +162,32 @@ def proto_file_map(self) -> dict[str, DescriptorProto]: def poll( self, - output_modules: list[str], + output_module: str, start_block: int, end_block: int, - stream_callback: Optional[callable] = None, return_first_result: bool = False, initial_snapshot: bool = False, return_type: str = "df" ): - module_name = "" - has_error = False - return_dict_interface = {"data": [], "module_name": module_name, "data_block": start_block, "error": None} + return_dict_interface = {"data": [], "module_name": output_module, "data_block": str(start_block), "error": None} valid_return_types = ["dict", "df"] results = [] raw_results = defaultdict(lambda: {"data": list(), "snapshots": list()}) try: + if isinstance(output_module, str) is False: + raise Exception("The 'output_module' parameter passed into the poll() function is not a string.") return_type = return_type.lower() if return_type not in valid_return_types: return_type = "df" from sf.substreams.v1.substreams_pb2 import STEP_IRREVERSIBLE, Request - for module in output_modules: - if module not in self.output_modules: - raise Exception(f"module '{module}' is not supported for {self.name}") - if self.output_modules[module].get('is_map') is False: - raise Exception(f"module '{module}' is not a map module") - self._class_from_module(module) + if output_module not in self.output_modules: + raise Exception(f"module '{output_module}' is not supported for 
{self.name}") + if self.output_modules[output_module].get('is_map') is False: + raise Exception(f"module '{output_module}' is not a map module") + self._class_from_module(output_module) stream = self.service.Blocks( Request( @@ -198,8 +195,8 @@ def poll( stop_block_num=end_block, fork_steps=[STEP_IRREVERSIBLE], modules=self.pkg.modules, - output_modules=output_modules, - initial_store_snapshot_for_modules=output_modules + output_modules=[output_module], + initial_store_snapshot_for_modules=[output_module] if initial_snapshot else None, ) @@ -214,21 +211,19 @@ def poll( continue if snapshot: - module_name = snapshot["moduleName"] snapshot_deltas = self._parse_snapshot_deltas(snapshot) - raw_results[module_name]["snapshots"].extend(snapshot_deltas) + raw_results[output_module]["snapshots"].extend(snapshot_deltas) if data: - parsed = self._parse_data_outputs(data, output_modules) - module_name = data["outputs"][0]["name"] - raw_results[module_name]["data"].extend(parsed) + parsed = self._parse_data_outputs(data, output_module) + raw_results[output_module]["data"].extend(parsed) return_dict_interface["data_block"] = data["clock"]["number"] if len(parsed) > 0: parsed = [dict(item, **{'block':data["clock"]["number"]}) for item in parsed] if return_first_result is True: break - if callable(stream_callback): - stream_callback(module_name, parsed) + elif int(return_dict_interface["data_block"]) + 1 == end_block: + results = return_dict_interface if return_first_result is True and parsed: return_dict_interface["data"] = parsed @@ -236,21 +231,21 @@ def poll( results = return_dict_interface if return_type == "df": results = pd.DataFrame(parsed) - elif raw_results: - result = SubstreamOutput(module_name=output_modules[0]) - data_dict: dict = raw_results.get(output_modules[0]) + if return_first_result is False and raw_results: + result = SubstreamOutput(module_name=output_module) + data_dict: dict = raw_results.get(output_module) for k, v in data_dict.items(): df = 
pd.DataFrame(v) - df["output_module"] = output_modules[0] + df["output_module"] = output_module setattr(result, k, df) results.append(result) if return_type == "dict": return_dict_interface["data"] = results.to_dict() results = return_dict_interface - else: - raise Exception("No Valid Data Results Returned by Substream") except Exception as err: - has_error = True - return_dict_interface["error"] = err + error_to_pass = err + if isinstance(err, Exception): + error_to_pass = str(err) + return_dict_interface["error"] = error_to_pass results = return_dict_interface return results From b8d7cc40e571b87ee3667cbbad5c3dd1070cfdf8 Mon Sep 17 00:00:00 2001 From: Michael Date: Tue, 31 Jan 2023 09:55:50 -0300 Subject: [PATCH 4/4] Doc correction --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 43a2f52..0630b28 100644 --- a/README.md +++ b/README.md @@ -39,7 +39,7 @@ In order to poll the substream, you will need to call the `poll()` function on t print(sb.output_modules) # Poll the module and return a list of SubstreamOutput objects in the order of the specified modules -result = sb.poll(["map_swap_events"], start_block=10000835, end_block=10000835+20000) +result = sb.poll("map_swap_events", start_block=10000835, end_block=10000835+20000) ``` With the default inputs, this function outputs Pandas Dataframes after streaming all blocks between the start_block and end_block. However depending on how this function is called, a dict object is returned. The `poll()` function has a number of inputs