Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 78 additions & 28 deletions substreams/substream.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,6 @@ def _parse_from_string(self, raw: str, key: str, output_class) -> dict:
decoded: bytes = base64.b64decode(raw)
obj = {}
if output_class is None:
# PASS THE VALUE TYPE HERE TO SANITIZE BASE64 DECODE(bigint OR string)
obj["value"] = str(decoded).split("b'")[1].split("'")[0]
if ":" in key:
split_key = key.split(":")
Expand Down Expand Up @@ -174,15 +173,17 @@ def proto_file_map(self) -> dict[str, DescriptorProto]:
name_map[mt.name] = pf.name
return name_map

# TODO how do I type annotate this stuff?
def poll(
self,
output_modules: list[str],
start_block: int,
end_block: int,
stream_callback=None,
return_first_result=False,
initial_snapshot=False,
highest_processed_block: int = 0,
return_progress=False
):
# TODO make this general
from sf.substreams.v1.substreams_pb2 import STEP_IRREVERSIBLE, Request

for module in output_modules:
Expand All @@ -203,29 +204,78 @@ def poll(
)
)
raw_results = defaultdict(lambda: {"data": list(), "snapshots": list()})
for response in stream:
snapshot = MessageToDict(response.snapshot_data)
data = MessageToDict(response.data)
if snapshot:
module_name: str = snapshot["moduleName"]
snapshot_deltas = self._parse_snapshot_deltas(snapshot)
raw_results[module_name]["snapshots"].extend(snapshot_deltas)
if data:
print("data block #", data["clock"]["number"])
if self.output_modules[module]["is_map"]:
parsed = self._parse_data_outputs(data)
else:
parsed = self._parse_data_deltas(data)
module_name: str = data["outputs"][0]["name"]
raw_results[module_name]["data"].extend(parsed)

results = []
for output_module in output_modules:
result = SubstreamOutput(module_name=output_module)
data_dict: dict = raw_results.get(output_module)
for k, v in data_dict.items():
df = pd.DataFrame(v)
df["output_module"] = output_module
setattr(result, k, df)
results.append(result)
return results
if callable(stream_callback):
# This logic executes the callback passed as a parameter on every block of data that has been streamed
try:
for response in stream:
data = MessageToDict(response.data)
if data:
module_name: str = data["outputs"][0]["name"]
if self.output_modules[module]["is_map"]:
parsed = self._parse_data_outputs(data)
else:
parsed = self._parse_data_deltas(data)
if len(parsed) > 0:
stream_callback(module_name, parsed)
except Exception as e:
return {"error": e}
return
elif return_first_result is True:
# This logic indexes blocks until the first block for which data is received
# Ideal for live-streaming events and receiving them on the front end
module_name = ""
parsed = None
data_block = 0
try:
for response in stream:
data = MessageToDict(response.data)
progress = MessageToDict(response.progress)
if data:
data_block = data["clock"]["number"]
module_name: str = data["outputs"][0]["name"]
if self.output_modules[module]["is_map"]:
parsed = self._parse_data_outputs(data)
else:
parsed = self._parse_data_deltas(data)
module_name: str = data["outputs"][0]["name"]
if len(parsed) > 0:
break
elif progress and return_progress is True:
endBlock = int(progress["modules"][0]['processedRanges']['processedRanges'][0]['endBlock'])
data_block = endBlock
if endBlock > highest_processed_block + 100 and progress["modules"][0]['name'] == output_modules[0]:
return {"block": int(endBlock)}
return {"data": parsed, "module_name": module_name, "data_block": data_block}
except Exception as e:
return {"error": e}
else:
# This logic indexes all blocks from start_block until end_block
# Returns one single dataframe with all data in rows once all blocks have been indexed
try:
for response in stream:
snapshot = MessageToDict(response.snapshot_data)
data = MessageToDict(response.data)
if snapshot:
module_name: str = snapshot["moduleName"]
snapshot_deltas = self._parse_snapshot_deltas(snapshot)
raw_results[module_name]["snapshots"].extend(snapshot_deltas)
if data:
if self.output_modules[module]["is_map"]:
parsed = self._parse_data_outputs(data)
else:
parsed = self._parse_data_deltas(data)
module_name: str = data["outputs"][0]["name"]
raw_results[module_name]["data"].extend(parsed)
for output_module in output_modules:
result = SubstreamOutput(module_name=output_module)
data_dict: dict = raw_results.get(output_module)
for k, v in data_dict.items():
df = pd.DataFrame(v)
df["output_module"] = output_module
setattr(result, k, df)
results.append(result)
except Exception as e:
results.append({"error": e})
return results