From f3bddb4241e0adf53d752d90030be9f4c86e183e Mon Sep 17 00:00:00 2001 From: Michael Date: Tue, 13 Dec 2022 14:49:55 -0300 Subject: [PATCH 1/2] Non protobuf module support --- substreams/substream.py | 28 ++++++++++++++++++++-------- 1 file changed, 20 insertions(+), 8 deletions(-) diff --git a/substreams/substream.py b/substreams/substream.py index 3c545a1..3a71418 100644 --- a/substreams/substream.py +++ b/substreams/substream.py @@ -93,21 +93,32 @@ def _class_from_module(self, module_name: str): output_type = raw_output_type raw_module_path: str = self.proto_file_map.get(output_type) + if raw_module_path is None: + return None module_path: str = raw_module_path.split("/")[-1].split(".proto")[0] pb2_path: str = f"{module_path}_pb2" return retrieve_class(pb2_path, output_type) - def _parse_from_string(self, raw: str, output_class) -> dict: + def _parse_from_string(self, raw: str, key: str, output_class) -> dict: decoded: bytes = base64.b64decode(raw) - obj = output_class() - obj.ParseFromString(decoded) - return MessageToDict(obj) + obj = {} + if output_class is None: + # PASS THE VALUE TYPE HERE TO SANITIZE BASE64 DECODE(bigint OR string) + obj["value"] = str(decoded).split("b'")[1].split("'")[0] + if ":" in key: + split_key = key.split(":") + obj[split_key[0]] = split_key[1] + else: + obj = output_class() + obj.ParseFromString(decoded) + obj = MessageToDict(obj) + return obj def _parse_snapshot_deltas(self, snapshot: dict) -> list[dict]: module_name: str = snapshot["moduleName"] obj_class = self._class_from_module(module_name) return [ - self._parse_from_string(x["newValue"], obj_class) + self._parse_from_string(x["newValue"], x["key"], obj_class) for x in snapshot["deltas"].get("deltas", list()) ] @@ -121,7 +132,8 @@ def _parse_data_deltas(self, data: dict) -> list[dict]: raw_deltas = store_deltas["deltas"] for delta in raw_deltas: raw = delta["newValue"] - d = self._parse_from_string(raw, obj_class) + key = delta["key"] + d = self._parse_from_string(raw, key, obj_class) d.update(data["clock"]) deltas.append(d) return deltas @@ -158,7 +170,6 @@ def poll(self, output_modules: list[str], start_block: int, end_block: int): for module in output_modules: if module not in self.output_modules: raise Exception(f"module '{module}' is not supported for {self.name}") - stream = self.service.Blocks( Request( start_block_num=start_block, @@ -178,10 +189,11 @@ def poll(self, output_modules: list[str], start_block: int, end_block: int): snapshot_deltas = self._parse_snapshot_deltas(snapshot) raw_results[module_name]["snapshots"].extend(snapshot_deltas) if data: + print('data block #', data["clock"]["number"]) module_name: str = data["outputs"][0]["name"] data_deltas = self._parse_data_deltas(data) raw_results[module_name]["data"].extend(data_deltas) - + print('FINISH STREAM') results = [] for output_module in output_modules: result = SubstreamOutput(module_name=output_module) From 4639f478e8934c606adb717228e75e262d62983a Mon Sep 17 00:00:00 2001 From: Michael Date: Tue, 13 Dec 2022 15:34:47 -0300 Subject: [PATCH 2/2] version and metadata --- pyproject.toml | 3 ++- setup.py | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 9594ead..4dd6144 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,9 +4,10 @@ build-backend = "hatchling.build" [project] name = "substreams" -version = "0.0.2" +version = "0.0.3" authors = [ { name="Ryan Sudhakaran", email="ryan.sudhakaran@messari.io" }, + { name="Michael Carroll", email="michaelcarroll1999@gmail.com} ] description = "WIP Substreams Python Adapter" readme = "README.md" diff --git a/setup.py b/setup.py index 8b7d9fe..cf6596f 100644 --- a/setup.py +++ b/setup.py @@ -5,7 +5,7 @@ setup( name="substreams", - version="0.0.2", + version="0.0.3", packages=[".substreams"], author="Ryan Sudhakaran", author_email="ryan.sudhakaran@messari.io",