Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,16 @@ With the default inputs, this function outputs Pandas Dataframes after streaming
- Defaults to False
- initial_snapshot
- Boolean value, defaults to False
- return_first_result_function
- Custom filtering function that accepts the parsed data as its argument and returns either True or False
- Gets called when return_first_result is True and a block has applicable events/txs
- If the function returns True, the polling function returns the data from the block
- If the function returns False, the polling function continues iterating
- return_type
- Specifies the type of value to return
- Passing "df" returns the data in a pandas DataFrame
- Passing "dict" returns the data in the format {"data": [], "module_name": String, "data_block": int, "error": str | None}
- Passing "csv" returns the data in the format {"data": String(CSV), "module_name": String, "data_block": int, "error": str | None}

The result here is the default `SubstreamOutput` object; you can access both the `data` and `snapshots` dataframes by doing:

Expand Down
23 changes: 18 additions & 5 deletions substreams/substream.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,12 +166,13 @@ def poll(
start_block: int,
end_block: int,
return_first_result: bool = False,
return_first_result_function: Optional[callable] = None,
initial_snapshot: bool = False,
return_type: str = "df"
):

return_dict_interface = {"data": [], "module_name": output_module, "data_block": str(start_block), "error": None}
valid_return_types = ["dict", "df"]
valid_return_types = ["dict", "df", "csv"]
results = []
raw_results = defaultdict(lambda: {"data": list(), "snapshots": list()})

Expand Down Expand Up @@ -221,16 +222,25 @@ def poll(
if len(parsed) > 0:
parsed = [dict(item, **{'block':data["clock"]["number"]}) for item in parsed]
if return_first_result is True:
break
if callable(return_first_result_function):
func_result = return_first_result_function(parsed)
if func_result is True:
break
else:
continue
else:
break
elif int(return_dict_interface["data_block"]) + 1 == end_block:
results = return_dict_interface

if return_first_result is True and parsed:
return_dict_interface["data"] = parsed
if return_type == "dict":
results = return_dict_interface
return_dict_interface["data"] = parsed
if return_type == "df":
results = pd.DataFrame(parsed)
return_dict_interface["data"] = pd.DataFrame(parsed)
if return_type == "csv":
return_dict_interface["data"] = pd.DataFrame(parsed).to_csv(index=False)
results = return_dict_interface
if return_first_result is False and raw_results:
result = SubstreamOutput(module_name=output_module)
data_dict: dict = raw_results.get(output_module)
Expand All @@ -242,6 +252,9 @@ def poll(
if return_type == "dict":
return_dict_interface["data"] = results.to_dict()
results = return_dict_interface
if return_type == "csv":
return_dict_interface["data"] = results.to_csv(index=False)
results = return_dict_interface
except Exception as err:
error_to_pass = err
if isinstance(err, Exception):
Expand Down