From fd76d77e97764f8b2470657c3be742d9d64c2d1a Mon Sep 17 00:00:00 2001 From: Michael Date: Mon, 13 Feb 2023 12:19:13 -0300 Subject: [PATCH] Return filter func and csv type --- README.md | 6 ++++++ substreams/substream.py | 23 ++++++++++++++++++----- 2 files changed, 24 insertions(+), 5 deletions(-) diff --git a/README.md b/README.md index 0630b28..e8aa1d3 100644 --- a/README.md +++ b/README.md @@ -56,10 +56,16 @@ With the default inputs, this function outputs Pandas Dataframes after streaming - Defaults to False - initial_snapshot - Boolean value, defaults to False +- return_first_result_function + - Custom filtering function that accepts parsed data passed as an argument and returns as either True or False + - Gets called when return_first_result is True and a block has applicable events/txs + - If function resolves to True, the polling function returns the data from the block + - If function resolves to False, the polling function continues iteration - return_type - Specifies the type of value to return - Passing "df" returns the data in a pandas DataFrame - Passing "dict" returns in the format {"data": [], "module_name": String, "data_block": int, error: str | None} + - Passing "csv" returns in the format {"data": String(CSV), "module_name": String, "data_block": int, error: str | None} The result here is the default `SubstreamOutput` object, you can access both the `data` and `snapshots` dataframes by doing: diff --git a/substreams/substream.py b/substreams/substream.py index 555d83e..7a82875 100644 --- a/substreams/substream.py +++ b/substreams/substream.py @@ -166,12 +166,13 @@ def poll( start_block: int, end_block: int, return_first_result: bool = False, + return_first_result_function: Optional[callable] = None, initial_snapshot: bool = False, return_type: str = "df" ): return_dict_interface = {"data": [], "module_name": output_module, "data_block": str(start_block), "error": None} - valid_return_types = ["dict", "df"] + valid_return_types = ["dict", "df", "csv"] results = [] raw_results = defaultdict(lambda: {"data": list(), "snapshots": list()}) @@ -221,16 +222,25 @@ def poll( if len(parsed) > 0: parsed = [dict(item, **{'block':data["clock"]["number"]}) for item in parsed] if return_first_result is True: - break + if callable(return_first_result_function): + func_result = return_first_result_function(parsed) + if func_result is True: + break + else: + continue + else: + break elif int(return_dict_interface["data_block"]) + 1 == end_block: results = return_dict_interface if return_first_result is True and parsed: - return_dict_interface["data"] = parsed if return_type == "dict": - results = return_dict_interface + return_dict_interface["data"] = parsed if return_type == "df": - results = pd.DataFrame(parsed) + return_dict_interface["data"] = pd.DataFrame(parsed) + if return_type == "csv": + return_dict_interface["data"] = pd.DataFrame(parsed).to_csv(index=False) + results = return_dict_interface if return_first_result is False and raw_results: result = SubstreamOutput(module_name=output_module) data_dict: dict = raw_results.get(output_module) @@ -242,6 +252,9 @@ def poll( if return_type == "dict": return_dict_interface["data"] = results.to_dict() results = return_dict_interface + if return_type == "csv": + return_dict_interface["data"] = results.to_csv(index=False) + results = return_dict_interface except Exception as err: error_to_pass = err if isinstance(err, Exception):