2 changes: 1 addition & 1 deletion pyproject.toml
@@ -4,7 +4,7 @@ build-backend = "hatchling.build"
 
 [project]
 name = "substreams"
-version = "0.0.5"
+version = "0.0.6"
 authors = [
   { name="Ryan Sudhakaran", email="ryan.sudhakaran@messari.io" },
   { name="Michael Carroll", email="michaelcarroll1999@gmail.com" },
2 changes: 1 addition & 1 deletion setup.py
@@ -5,7 +5,7 @@
 
 setup(
     name="substreams",
-    version="0.0.5",
+    version="0.0.6",
     packages=[".substreams"],
     author="Ryan Sudhakaran",
     author_email="ryan.sudhakaran@messari.io",
112 changes: 40 additions & 72 deletions substreams/substream.py
@@ -210,77 +210,45 @@ def poll(
         )
         raw_results = defaultdict(lambda: {"data": list(), "snapshots": list()})
         results = []
-        if callable(stream_callback):
-            # This logic executes a function passed as a parameter on every block data has been streamed
-            try:
-                for response in stream:
-                    data = MessageToDict(response.data)
-                    if data:
-                        module_name: str = data["outputs"][0]["name"]
-                        if self.output_modules[module]["is_map"]:
-                            parsed = self._parse_data_outputs(data)
-                        else:
-                            parsed = self._parse_data_deltas(data)
-                        if len(parsed) > 0:
-                            stream_callback(module_name, parsed)
-            except Exception as e:
-                return {"error": e}
-            return
-        elif return_first_result is True:
-            # This logic indexes all blocks until the first block that data is received
-            # Ideal for live streaming events and receiveing them on the front end
-            module_name = ""
-            parsed = None
-            data_block = 0
-            try:
-                for response in stream:
-                    data = MessageToDict(response.data)
-                    progress = MessageToDict(response.progress)
-                    if data:
-                        data_block = data["clock"]["number"]
-                        module_name: str = data["outputs"][0]["name"]
-                        if self.output_modules[module]["is_map"]:
-                            parsed = self._parse_data_outputs(data)
-                        else:
-                            parsed = self._parse_data_deltas(data)
-                        module_name: str = data["outputs"][0]["name"]
-                        if len(parsed) > 0:
-                            break
-                    elif progress and return_progress is True:
-                        endBlock = int(progress["modules"][0]['processedRanges']['processedRanges'][0]['endBlock'])
-                        data_block = endBlock
-                        if endBlock > highest_processed_block + 100 and progress["modules"][0]['name'] == output_modules[0]:
-                            return {"block": int(endBlock)}
-                return {"data": parsed, "module_name": module_name, "data_block": data_block}
-            except Exception as e:
-                return {"error": e}
-        else:
-            # This logic indexes all blocks from start_block until end_block
-            # Returns one single dataframe with all data in rows once all blocks have been indexed
-            try:
-                for response in stream:
-                    snapshot = MessageToDict(response.snapshot_data)
-                    data = MessageToDict(response.data)
-                    if snapshot:
-                        module_name: str = snapshot["moduleName"]
-                        snapshot_deltas = self._parse_snapshot_deltas(snapshot)
-                        raw_results[module_name]["snapshots"].extend(snapshot_deltas)
-                    if data:
-                        if self.output_modules[module]["is_map"]:
-                            parsed = self._parse_data_outputs(data)
-                        else:
-                            parsed = self._parse_data_deltas(data)
-                        module_name: str = data["outputs"][0]["name"]
-                        raw_results[module_name]["data"].extend(parsed)
-                for output_module in output_modules:
-                    result = SubstreamOutput(module_name=output_module)
-                    data_dict: dict = raw_results.get(output_module)
-                    for k, v in data_dict.items():
-                        df = pd.DataFrame(v)
-                        df["output_module"] = output_module
-                        setattr(result, k, df)
-                    results.append(result)
-            except Exception as e:
-                results.append({"error": e})
+        data_block = 0
+        module_name: str = ""
+        try:
+            for response in stream:
+                snapshot = MessageToDict(response.snapshot_data)
+                data = MessageToDict(response.data)
+                progress = MessageToDict(response.progress)
+                if snapshot:
+                    module_name = snapshot["moduleName"]
+                    snapshot_deltas = self._parse_snapshot_deltas(snapshot)
+                    raw_results[module_name]["snapshots"].extend(snapshot_deltas)
+                if data:
+                    if self.output_modules[module]["is_map"]:
+                        parsed = self._parse_data_outputs(data)
+                    else:
+                        parsed = self._parse_data_deltas(data)
+                    module_name = data["outputs"][0]["name"]
+                    raw_results[module_name]["data"].extend(parsed)
+                    data_block = data["clock"]["number"]
+                    if len(parsed) > 0:
+                        if return_first_result is True:
+                            break
+                        if callable(stream_callback):
+                            stream_callback(module_name, parsed)
+                elif progress and return_progress is True:
+                    endBlock = int(progress["modules"][0]['processedRanges']['processedRanges'][0]['endBlock'])
+                    data_block = endBlock
+                    if endBlock > highest_processed_block + 100 and progress["modules"][0]['name'] == output_modules[0]:
+                        return {"block": int(endBlock)}
+            if return_first_result is True:
+                return {"data": parsed, "module_name": module_name, "data_block": data_block}
+            for output_module in output_modules:
+                result = SubstreamOutput(module_name=output_module)
+                data_dict: dict = raw_results.get(output_module)
+                for k, v in data_dict.items():
+                    df = pd.DataFrame(v)
+                    df["output_module"] = output_module
+                    setattr(result, k, df)
+                results.append(result)
+        except Exception as e:
+            results.append({"error": e})
 
         return results
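
Usage note: the refactor above collapses poll()'s three separate streaming branches (callback, first-result, and full indexing) into a single loop that also folds in snapshot and progress handling, so all three modes share the same parsing and raw_results accumulation. A minimal calling sketch follows; only output_modules, stream_callback, return_first_result, and return_progress come from the diff itself, while the Substream class name, the .spkg path, the module name, and the start_block/end_block keywords are illustrative assumptions rather than confirmed API.

    from substreams import Substream

    # Hypothetical package path and output module name - substitute your own.
    sb = Substream("eth-block-meta-v0.1.0.spkg")

    # Full indexing: returns SubstreamOutput objects whose "data"/"snapshots"
    # attributes are pandas DataFrames tagged with an "output_module" column.
    results = sb.poll(
        output_modules=["map_block_meta"],
        start_block=12_000_000,
        end_block=12_000_100,
    )

    # First-result mode: the loop breaks at the first block that yields parsed
    # rows and returns {"data": ..., "module_name": ..., "data_block": ...}.
    first = sb.poll(
        output_modules=["map_block_meta"],
        start_block=12_000_000,
        end_block=12_000_100,
        return_first_result=True,
    )

    # Callback mode: the unified loop invokes the callback inline for every
    # block that produces parsed rows, instead of taking a separate code path.
    def on_rows(module_name, rows):
        print(module_name, len(rows))

    sb.poll(
        output_modules=["map_block_meta"],
        start_block=12_000_000,
        end_block=12_000_100,
        stream_callback=on_rows,
    )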