|
16 | 16 |
|
17 | 17 | DEFAULT_ENDPOINT = "api.streamingfast.io:443"
|
18 | 18 |
|
19 |
| - |
20 | 19 | def retrieve_class(module_name: str, class_name: str):
|
21 | 20 | module = import_module(module_name)
|
22 | 21 | return getattr(module, class_name)
|
@@ -126,11 +125,10 @@ def _parse_snapshot_deltas(self, snapshot: dict) -> list[dict]:
|
126 | 125 | for x in snapshot["deltas"].get("deltas", list())
|
127 | 126 | ]
|
128 | 127 |
|
129 |
| - def _parse_data_outputs(self, data: dict, module_names: list[str]) -> list[dict]: |
| 128 | + def _parse_data_outputs(self, data: dict, module_name: str) -> list[dict]: |
130 | 129 | outputs = list()
|
131 |
| - module_set = set(module_names) |
132 | 130 | for output in data["outputs"]:
|
133 |
| - if "mapOutput" not in output or output["name"] not in module_set: |
| 131 | + if "mapOutput" not in output or output["name"] != module_name: |
134 | 132 | continue
|
135 | 133 | map_output = output["mapOutput"]
|
136 | 134 | for key, items in map_output.items():
|
@@ -164,85 +162,90 @@ def proto_file_map(self) -> dict[str, DescriptorProto]:
|
164 | 162 |
|
165 | 163 | def poll(
|
166 | 164 | self,
|
167 |
| - output_modules: list[str], |
| 165 | + output_module: str, |
168 | 166 | start_block: int,
|
169 | 167 | end_block: int,
|
170 |
| - stream_callback: Optional[callable] = None, |
171 | 168 | return_first_result: bool = False,
|
172 | 169 | initial_snapshot: bool = False,
|
173 |
| - highest_processed_block: int = 0, |
174 |
| - return_progress: bool = False, |
| 170 | + return_type: str = "df" |
175 | 171 | ):
|
176 |
| - from sf.substreams.v1.substreams_pb2 import STEP_IRREVERSIBLE, Request |
177 |
| - for module in output_modules: |
178 |
| - if module not in self.output_modules: |
179 |
| - raise Exception(f"module '{module}' is not supported for {self.name}") |
180 |
| - if self.output_modules[module].get('is_map') is False: |
181 |
| - raise Exception(f"module '{module}' is not a map module") |
182 |
| - self._class_from_module(module) |
183 |
| - |
184 |
| - stream = self.service.Blocks( |
185 |
| - Request( |
186 |
| - start_block_num=start_block, |
187 |
| - stop_block_num=end_block, |
188 |
| - fork_steps=[STEP_IRREVERSIBLE], |
189 |
| - modules=self.pkg.modules, |
190 |
| - output_modules=output_modules, |
191 |
| - initial_store_snapshot_for_modules=output_modules |
192 |
| - if initial_snapshot |
193 |
| - else None, |
194 |
| - ) |
195 |
| - ) |
196 |
| - raw_results = defaultdict(lambda: {"data": list(), "snapshots": list()}) |
| 172 | + |
| 173 | + return_dict_interface = {"data": [], "module_name": output_module, "data_block": str(start_block), "error": None} |
| 174 | + valid_return_types = ["dict", "df"] |
197 | 175 | results = []
|
198 |
| - data_block = 0 |
199 |
| - module_name = "" |
| 176 | + raw_results = defaultdict(lambda: {"data": list(), "snapshots": list()}) |
200 | 177 |
|
201 | 178 | try:
|
| 179 | + if isinstance(output_module, str) is False: |
| 180 | + raise Exception("The 'output_module' parameter passed into the poll() function is not a string.") |
| 181 | + return_type = return_type.lower() |
| 182 | + if return_type not in valid_return_types: |
| 183 | + return_type = "df" |
| 184 | + |
| 185 | + from sf.substreams.v1.substreams_pb2 import STEP_IRREVERSIBLE, Request |
| 186 | + if output_module not in self.output_modules: |
| 187 | + raise Exception(f"module '{output_module}' is not supported for {self.name}") |
| 188 | + if self.output_modules[output_module].get('is_map') is False: |
| 189 | + raise Exception(f"module '{output_module}' is not a map module") |
| 190 | + self._class_from_module(output_module) |
| 191 | + |
| 192 | + stream = self.service.Blocks( |
| 193 | + Request( |
| 194 | + start_block_num=start_block, |
| 195 | + stop_block_num=end_block, |
| 196 | + fork_steps=[STEP_IRREVERSIBLE], |
| 197 | + modules=self.pkg.modules, |
| 198 | + output_modules=[output_module], |
| 199 | + initial_store_snapshot_for_modules=[output_module] |
| 200 | + if initial_snapshot |
| 201 | + else None, |
| 202 | + ) |
| 203 | + ) |
| 204 | + |
202 | 205 | for response in stream:
|
203 | 206 | snapshot = MessageToDict(response.snapshot_data)
|
204 | 207 | data = MessageToDict(response.data)
|
205 |
| - progress = MessageToDict(response.progress) |
206 | 208 | session = MessageToDict(response.session)
|
207 | 209 |
|
208 | 210 | if session:
|
209 | 211 | continue
|
210 | 212 |
|
211 | 213 | if snapshot:
|
212 |
| - module_name = snapshot["moduleName"] |
213 | 214 | snapshot_deltas = self._parse_snapshot_deltas(snapshot)
|
214 |
| - raw_results[module_name]["snapshots"].extend(snapshot_deltas) |
| 215 | + raw_results[output_module]["snapshots"].extend(snapshot_deltas) |
215 | 216 |
|
216 | 217 | if data:
|
217 |
| - parsed = self._parse_data_outputs(data, output_modules) |
218 |
| - module_name = data["outputs"][0]["name"] |
219 |
| - raw_results[module_name]["data"].extend(parsed) |
220 |
| - data_block = data["clock"]["number"] |
| 218 | + parsed = self._parse_data_outputs(data, output_module) |
| 219 | + raw_results[output_module]["data"].extend(parsed) |
| 220 | + return_dict_interface["data_block"] = data["clock"]["number"] |
221 | 221 | if len(parsed) > 0:
|
222 |
| - parsed = [dict(item, **{'block':data_block}) for item in parsed] |
| 222 | + parsed = [dict(item, **{'block':data["clock"]["number"]}) for item in parsed] |
223 | 223 | if return_first_result is True:
|
224 | 224 | break
|
225 |
| - if callable(stream_callback): |
226 |
| - stream_callback(module_name, parsed) |
227 |
| - else: |
228 |
| - continue |
229 |
| - elif progress and return_progress is True: |
230 |
| - if 'processedBytes' in progress["modules"][0] or 'processedRanges' not in progress["modules"][0]: |
231 |
| - continue |
232 |
| - endBlock = int(progress["modules"][0]['processedRanges']['processedRanges'][0]['endBlock']) |
233 |
| - data_block = endBlock |
234 |
| - if endBlock > highest_processed_block + 100 and progress["modules"][0]['name'] == output_modules[0]: |
235 |
| - return {"block": int(endBlock)} |
236 |
| - if return_first_result is True: |
237 |
| - return {"data": parsed, "module_name": module_name, "data_block": data_block} |
238 |
| - for output_module in output_modules: |
| 225 | + elif int(return_dict_interface["data_block"]) + 1 == end_block: |
| 226 | + results = return_dict_interface |
| 227 | + |
| 228 | + if return_first_result is True and parsed: |
| 229 | + return_dict_interface["data"] = parsed |
| 230 | + if return_type == "dict": |
| 231 | + results = return_dict_interface |
| 232 | + if return_type == "df": |
| 233 | + results = pd.DataFrame(parsed) |
| 234 | + if return_first_result is False and raw_results: |
239 | 235 | result = SubstreamOutput(module_name=output_module)
|
240 | 236 | data_dict: dict = raw_results.get(output_module)
|
241 | 237 | for k, v in data_dict.items():
|
242 | 238 | df = pd.DataFrame(v)
|
243 | 239 | df["output_module"] = output_module
|
244 | 240 | setattr(result, k, df)
|
245 | 241 | results.append(result)
|
| 242 | + if return_type == "dict": |
| 243 | + return_dict_interface["data"] = results.to_dict() |
| 244 | + results = return_dict_interface |
246 | 245 | except Exception as err:
|
247 |
| - results = {"error": err} |
| 246 | + error_to_pass = err |
| 247 | + if isinstance(err, Exception): |
| 248 | + error_to_pass = str(err) |
| 249 | + return_dict_interface["error"] = error_to_pass |
| 250 | + results = return_dict_interface |
248 | 251 | return results
|
0 commit comments