Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 15 additions & 10 deletions substreams/substream.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#!/usr/bin/env python3
import base64
import os
import os, sys
import subprocess
from collections import defaultdict
from dataclasses import dataclass
Expand All @@ -12,18 +12,19 @@
import pandas as pd
from google.protobuf.descriptor_pb2 import DescriptorProto
from google.protobuf.json_format import MessageToDict
from importlib import import_module

DEFAULT_ENDPOINT = "api.streamingfast.io:443"


def retrieve_class(module_name: str, class_name: str):
module = __import__(module_name)
module = import_module(module_name)
return getattr(module, class_name)


def generate_pb2_files(spkg_path: str, commands: str) -> None:
def generate_pb2_files(spkg_path: str, commands: str, out_path: str) -> None:
command = f"""
alias protogen_py="python3 -m grpc_tools.protoc --descriptor_set_in={spkg_path} --python_out=. --grpc_python_out=.";
alias protogen_py="python3 -m grpc_tools.protoc --descriptor_set_in={spkg_path} --python_out={out_path} --grpc_python_out={out_path}";
{commands}
unalias protogen_py;
"""
Expand All @@ -41,22 +42,27 @@ class SubstreamOutput:

class Substream:
def __init__(
self, spkg_path: str, token: Optional[str] = None, regenerate: bool = False
self, spkg_path: str, token: Optional[str] = None, regenerate: bool = False, sf_out_dir: str = '.'
):
self.token: Optional[str] = os.getenv("SUBSTREAMS_API_TOKEN", None) or token
sf_dir_path = os.path.join(sf_out_dir, 'sf')
if not Path(sf_out_dir).exists():
os.makedirs(sf_out_dir)
if not self.token:
raise Exception("Must set SUBSTREAMS_API_TOKEN")
if not Path(spkg_path).exists() or not spkg_path.endswith(".spkg"):
raise Exception("Must provide a valid .spkg file!")
if not Path("sf/substreams").exists() or regenerate:
if not Path(sf_dir_path).exists() or regenerate:
# generate sf/ directory
commands = """
protogen_py sf/substreams/v1/substreams.proto;
protogen_py sf/substreams/v1/package.proto;
protogen_py sf/substreams/v1/modules.proto;
protogen_py sf/substreams/v1/clock.proto;
"""
generate_pb2_files(spkg_path, commands)
generate_pb2_files(spkg_path, commands, out_path=sf_out_dir)

sys.path.append(sf_out_dir)

from sf.substreams.v1.package_pb2 import Package
from sf.substreams.v1.substreams_pb2_grpc import StreamStub
Expand All @@ -72,7 +78,7 @@ def __init__(
if not file.startswith("sf/") and not file.startswith("google/")
]
)
generate_pb2_files(spkg_path, custom_proto_files)
generate_pb2_files(spkg_path, custom_proto_files, out_path=sf_out_dir)

credentials = grpc.composite_channel_credentials(
grpc.ssl_channel_credentials(),
Expand All @@ -95,8 +101,7 @@ def _class_from_module(self, module_name: str):
raw_module_path: str = self.proto_file_map.get(output_type)
if raw_module_path is None:
return None
module_path: str = raw_module_path.split("/")[-1].split(".proto")[0]
pb2_path: str = f"{module_path}_pb2"
pb2_path: str = raw_module_path.replace('.proto', '_pb2').replace('/', '.')
return retrieve_class(pb2_path, output_type)

def _parse_from_string(self, raw: str, key: str, output_class) -> dict:
Expand Down