Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 6 additions & 12 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,33 +39,27 @@ In order to poll the substream, you will need to call the `poll()` function on t
print(sb.output_modules)

# Poll the module and return a list of SubstreamOutput objects in the order of the specified modules
result = sb.poll(["store_swap_events"], start_block=10000835, end_block=10000835+20000)
result = sb.poll("map_swap_events", start_block=10000835, end_block=10000835+20000)
```

With the default inputs, this function outputs Pandas Dataframes after streaming all blocks between the start_block and end_block. However depending on how this function is called, a dict object is returned. The `poll()` function has a number of inputs

- output_modules
- List of strings of output modules to stream
- String of the output module to stream
- start_block
- Integer block number to start the polling
- end_block
- Integer block number to end the polling. In theory, there is no max block number as any block number past chain head will stream the blocks in real time. It's recommended to use an end_block far off into the future if building a data app that will be streaming data in real time as blocks finalize, such as block 20,000,000
- stream_callback
- An optional callback function to be passed into the polling function to execute when valid streamed data is received
- return_first_result
- Boolean value that if True will return data on the first block after the start block to have an applicable TX/Event.
- Can be called recursively on the front end while incrementing the start_block to return data as it's streamed rather than all data at once after streaming is completed
- Defaults to False
- If True, the data is returned in the format {"data": [], "module_name": String, "data_block": int}
- initial_snapshot
- Boolean value, defaults to False
- highest_processed_block
- Integer block number that is used in measuring indexing and processing progress, in cases where return_progress is True
- Defaults to 0
- return_progress
  - Boolean value that if True returns backprocessing progress
  - Defaults to False

- return_type
- Specifies the type of value to return
- Passing "df" returns the data in a pandas DataFrame
- Passing "dict" returns in the format {"data": [], "module_name": String, "data_block": int, "error": str | None}

The result here is the default `SubstreamOutput` object, you can access both the `data` and `snapshots` dataframes by doing:

Expand Down
111 changes: 57 additions & 54 deletions substreams/substream.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

DEFAULT_ENDPOINT = "api.streamingfast.io:443"


def retrieve_class(module_name: str, class_name: str):
module = import_module(module_name)
return getattr(module, class_name)
Expand Down Expand Up @@ -126,11 +125,10 @@ def _parse_snapshot_deltas(self, snapshot: dict) -> list[dict]:
for x in snapshot["deltas"].get("deltas", list())
]

def _parse_data_outputs(self, data: dict, module_names: list[str]) -> list[dict]:
def _parse_data_outputs(self, data: dict, module_name: str) -> list[dict]:
outputs = list()
module_set = set(module_names)
for output in data["outputs"]:
if "mapOutput" not in output or output["name"] not in module_set:
if "mapOutput" not in output or output["name"] != module_name:
continue
map_output = output["mapOutput"]
for key, items in map_output.items():
Expand Down Expand Up @@ -164,85 +162,90 @@ def proto_file_map(self) -> dict[str, DescriptorProto]:

def poll(
self,
output_modules: list[str],
output_module: str,
start_block: int,
end_block: int,
stream_callback: Optional[callable] = None,
return_first_result: bool = False,
initial_snapshot: bool = False,
highest_processed_block: int = 0,
return_progress: bool = False,
return_type: str = "df"
):
from sf.substreams.v1.substreams_pb2 import STEP_IRREVERSIBLE, Request
for module in output_modules:
if module not in self.output_modules:
raise Exception(f"module '{module}' is not supported for {self.name}")
if self.output_modules[module].get('is_map') is False:
raise Exception(f"module '{module}' is not a map module")
self._class_from_module(module)

stream = self.service.Blocks(
Request(
start_block_num=start_block,
stop_block_num=end_block,
fork_steps=[STEP_IRREVERSIBLE],
modules=self.pkg.modules,
output_modules=output_modules,
initial_store_snapshot_for_modules=output_modules
if initial_snapshot
else None,
)
)
raw_results = defaultdict(lambda: {"data": list(), "snapshots": list()})

return_dict_interface = {"data": [], "module_name": output_module, "data_block": str(start_block), "error": None}
valid_return_types = ["dict", "df"]
results = []
data_block = 0
module_name = ""
raw_results = defaultdict(lambda: {"data": list(), "snapshots": list()})

try:
if isinstance(output_module, str) is False:
raise Exception("The 'output_module' parameter passed into the poll() function is not a string.")
return_type = return_type.lower()
if return_type not in valid_return_types:
return_type = "df"

from sf.substreams.v1.substreams_pb2 import STEP_IRREVERSIBLE, Request
if output_module not in self.output_modules:
raise Exception(f"module '{output_module}' is not supported for {self.name}")
if self.output_modules[output_module].get('is_map') is False:
raise Exception(f"module '{output_module}' is not a map module")
self._class_from_module(output_module)

stream = self.service.Blocks(
Request(
start_block_num=start_block,
stop_block_num=end_block,
fork_steps=[STEP_IRREVERSIBLE],
modules=self.pkg.modules,
output_modules=[output_module],
initial_store_snapshot_for_modules=[output_module]
if initial_snapshot
else None,
)
)

for response in stream:
snapshot = MessageToDict(response.snapshot_data)
data = MessageToDict(response.data)
progress = MessageToDict(response.progress)
session = MessageToDict(response.session)

if session:
continue

if snapshot:
module_name = snapshot["moduleName"]
snapshot_deltas = self._parse_snapshot_deltas(snapshot)
raw_results[module_name]["snapshots"].extend(snapshot_deltas)
raw_results[output_module]["snapshots"].extend(snapshot_deltas)

if data:
parsed = self._parse_data_outputs(data, output_modules)
module_name = data["outputs"][0]["name"]
raw_results[module_name]["data"].extend(parsed)
data_block = data["clock"]["number"]
parsed = self._parse_data_outputs(data, output_module)
raw_results[output_module]["data"].extend(parsed)
return_dict_interface["data_block"] = data["clock"]["number"]
if len(parsed) > 0:
parsed = [dict(item, **{'block':data_block}) for item in parsed]
parsed = [dict(item, **{'block':data["clock"]["number"]}) for item in parsed]
if return_first_result is True:
break
if callable(stream_callback):
stream_callback(module_name, parsed)
else:
continue
elif progress and return_progress is True:
if 'processedBytes' in progress["modules"][0] or 'processedRanges' not in progress["modules"][0]:
continue
endBlock = int(progress["modules"][0]['processedRanges']['processedRanges'][0]['endBlock'])
data_block = endBlock
if endBlock > highest_processed_block + 100 and progress["modules"][0]['name'] == output_modules[0]:
return {"block": int(endBlock)}
if return_first_result is True:
return {"data": parsed, "module_name": module_name, "data_block": data_block}
for output_module in output_modules:
elif int(return_dict_interface["data_block"]) + 1 == end_block:
results = return_dict_interface

if return_first_result is True and parsed:
return_dict_interface["data"] = parsed
if return_type == "dict":
results = return_dict_interface
if return_type == "df":
results = pd.DataFrame(parsed)
if return_first_result is False and raw_results:
result = SubstreamOutput(module_name=output_module)
data_dict: dict = raw_results.get(output_module)
for k, v in data_dict.items():
df = pd.DataFrame(v)
df["output_module"] = output_module
setattr(result, k, df)
results.append(result)
if return_type == "dict":
return_dict_interface["data"] = results.to_dict()
results = return_dict_interface
except Exception as err:
results = {"error": err}
error_to_pass = err
if isinstance(err, Exception):
error_to_pass = str(err)
return_dict_interface["error"] = error_to_pass
results = return_dict_interface
return results