diff --git a/substreams/substream.py b/substreams/substream.py
index f86b96a..fe6e931 100644
--- a/substreams/substream.py
+++ b/substreams/substream.py
@@ -1,6 +1,7 @@
 #!/usr/bin/env python3
 import base64
 import os
 import subprocess
+import sys
 from collections import defaultdict
 from dataclasses import dataclass
@@ -12,18 +13,19 @@ import pandas as pd
 
 from google.protobuf.descriptor_pb2 import DescriptorProto
 from google.protobuf.json_format import MessageToDict
+from importlib import import_module
 
 DEFAULT_ENDPOINT = "api.streamingfast.io:443"
 
 
 def retrieve_class(module_name: str, class_name: str):
-    module = __import__(module_name)
+    module = import_module(module_name)
     return getattr(module, class_name)
 
 
-def generate_pb2_files(spkg_path: str, commands: str) -> None:
+def generate_pb2_files(spkg_path: str, commands: str, out_path: str = ".") -> None:
     command = f"""
-    alias protogen_py="python3 -m grpc_tools.protoc --descriptor_set_in={spkg_path} --python_out=. --grpc_python_out=.";
+    alias protogen_py="python3 -m grpc_tools.protoc --descriptor_set_in={spkg_path} --python_out={out_path} --grpc_python_out={out_path}";
     {commands}
     unalias protogen_py;
     """
@@ -41,14 +43,17 @@ class SubstreamOutput:
 
 class Substream:
     def __init__(
-        self, spkg_path: str, token: Optional[str] = None, regenerate: bool = False
+        self, spkg_path: str, token: Optional[str] = None, regenerate: bool = False, sf_out_dir: str = "."
     ):
         self.token: Optional[str] = os.getenv("SUBSTREAMS_API_TOKEN", None) or token
         if not self.token:
             raise Exception("Must set SUBSTREAMS_API_TOKEN")
         if not Path(spkg_path).exists() or not spkg_path.endswith(".spkg"):
             raise Exception("Must provide a valid .spkg file!")
-        if not Path("sf/substreams").exists() or regenerate:
+        sf_dir_path = os.path.join(sf_out_dir, "sf")
+        # Create the output dir only after validation; exist_ok avoids an exists()/mkdir race.
+        Path(sf_out_dir).mkdir(parents=True, exist_ok=True)
+        if not Path(sf_dir_path).exists() or regenerate:
             # generate sf/ directory
             commands = """
             protogen_py sf/substreams/v1/substreams.proto;
@@ -56,7 +61,11 @@ def __init__(
             protogen_py sf/substreams/v1/modules.proto;
             protogen_py sf/substreams/v1/clock.proto;
             """
-            generate_pb2_files(spkg_path, commands)
+            generate_pb2_files(spkg_path, commands, out_path=sf_out_dir)
+
+        # The sf.* imports below resolve via sf_out_dir; add it to sys.path only once.
+        if sf_out_dir not in sys.path:
+            sys.path.append(sf_out_dir)
 
         from sf.substreams.v1.package_pb2 import Package
         from sf.substreams.v1.substreams_pb2_grpc import StreamStub
@@ -72,7 +81,7 @@ def __init__(
                 if not file.startswith("sf/") and not file.startswith("google/")
             ]
         )
-        generate_pb2_files(spkg_path, custom_proto_files)
+        generate_pb2_files(spkg_path, custom_proto_files, out_path=sf_out_dir)
 
         credentials = grpc.composite_channel_credentials(
             grpc.ssl_channel_credentials(),
@@ -95,8 +104,9 @@ def _class_from_module(self, module_name: str):
         raw_module_path: str = self.proto_file_map.get(output_type)
         if raw_module_path is None:
             return None
-        module_path: str = raw_module_path.split("/")[-1].split(".proto")[0]
-        pb2_path: str = f"{module_path}_pb2"
+        # Strip only the file extension so a ".proto" earlier in the path is left intact.
+        module_stem, _ = os.path.splitext(raw_module_path)
+        pb2_path: str = module_stem.replace("/", ".") + "_pb2"
         return retrieve_class(pb2_path, output_type)
 
     def _parse_from_string(self, raw: str, key: str, output_class) -> dict: