/
stream_wrapper.py
415 lines (317 loc) · 12.7 KB
/
stream_wrapper.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
# SPDX-FileCopyrightText: Copyright (c) 2022-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import collections
import functools
import inspect
import logging
import typing
from abc import ABC
from abc import abstractmethod
import mrc
import morpheus.pipeline as _pipeline
from morpheus.config import Config
from morpheus.config import CppConfig
from morpheus.pipeline.stream_pair import StreamPair
from morpheus.utils.atomic_integer import AtomicInteger
from morpheus.utils.type_utils import _DecoratorType
# Module-level logger named after this module (`__name__`).
logger = logging.getLogger(__name__)
def _save_init_vals(func: _DecoratorType) -> _DecoratorType:
    """
    Wrap an ``__init__`` method so that the arguments it is called with are recorded on the instance.

    After the wrapped ``__init__`` returns, the bound arguments (with defaults applied) are formatted as
    ``"key=value, key=value"`` and stored in ``self._init_str``, which `StreamWrapper.__str__` includes in its output.
    The ``self`` parameter and any parameter annotated as `morpheus.config.Config` are left out.

    Parameters
    ----------
    func : callable
        The ``__init__`` function to wrap. Its signature is inspected once, at decoration time.

    Returns
    -------
    callable
        The wrapping function, carrying `func`'s metadata via `functools.wraps`.
    """
    # Save the signature only once
    sig = inspect.signature(func, follow_wrapped=True)

    # Annotations are plain strings when the defining module uses `from __future__ import annotations` (PEP 563),
    # and `inspect.signature` does not evaluate them, so the string spellings of `Config` must be matched as well.
    config_annotations = (Config, "Config", "morpheus.config.Config")

    # Which parameters to omit depends only on the signature, so compute it once rather than on every construction
    skipped_params = frozenset(name for (name, param) in sig.parameters.items()
                               if name == "self" or param.annotation in config_annotations)

    @functools.wraps(func)
    def inner(self: "StreamWrapper", *args, **kwargs):
        # Actually call init first. This way any super classes strings will be overridden
        func(self, *args, **kwargs)

        # Determine all set values, filling in defaults for any arguments the caller omitted
        bound = sig.bind(self, *args, **kwargs)
        bound.apply_defaults()

        init_pairs = [f"{key}={val}" for (key, val) in bound.arguments.items() if key not in skipped_params]

        # Save values on self
        self._init_str = ", ".join(init_pairs)

    return typing.cast(_DecoratorType, inner)
class StreamWrapper(ABC, collections.abc.Hashable):
    """
    This abstract class serves as the morpheus pipeline's base class. This class wraps a `mrc.SegmentObject`
    object and aids in hooking stages up together.

    The ``__init__`` of every subclass is wrapped by `_save_init_vals` (see `__init_subclass__`), so the arguments an
    instance was constructed with are recorded and included in `str()`/`repr()`.

    Parameters
    ----------
    c : `morpheus.config.Config`
        Pipeline configuration instance.
    """

    # Class-wide counter that hands out the per-instance `_id`
    __ID_COUNTER = AtomicInteger(0)

    def __init__(self, c: Config):
        # Save the config
        self._config = c

        # Per-instance id drawn from the class-wide counter. Used by `__hash__` and `unique_name`
        self._id = StreamWrapper.__ID_COUNTER.get_and_inc()

        # Owning pipeline. Must be set before `build` is called (asserted there)
        self._pipeline: _pipeline.Pipeline = None

        self._init_str: str = ""  # Stores the initialization parameters used for creation. Needed for __repr__

        # Indicates whether or not this wrapper has been built. Can only be built once
        self._is_built = False

        # Input/Output ports used for connecting stages. Filled in by `_create_ports`
        self._input_ports: typing.List[_pipeline.Receiver] = []
        self._output_ports: typing.List[_pipeline.Sender] = []

        # Mapping of {`column_name`: `TypeId`}
        self._needed_columns = collections.OrderedDict()

    def __init_subclass__(cls, **kwargs) -> None:
        """
        Wrap each subclass's ``__init__`` so that its argument values are saved for `__repr__`.

        Class keyword arguments are forwarded to ``super().__init_subclass__`` so this class cooperates with any other
        base class in the MRO that defines ``__init_subclass__``.
        """
        cls.__init__ = _save_init_vals(cls.__init__)

        super().__init_subclass__(**kwargs)

    def __hash__(self) -> int:
        # The per-instance id doubles as the hash value
        return self._id

    def __str__(self):
        text = f"<{self.unique_name}; {self.__class__.__name__}({self._init_str})>"

        return text

    __repr__ = __str__

    @property
    @abstractmethod
    def name(self) -> str:
        """
        The name of the stage. Used in logging. Each derived class should override this property with a unique
        name.

        Returns
        -------
        str
            Name of a stage.
        """
        pass

    @property
    def unique_name(self) -> str:
        """
        Unique name of stage. Generated by appending stage id to stage name.

        Returns
        -------
        str
            Unique name of stage.
        """
        return f"{self.name}-{self._id}"

    @property
    def is_built(self) -> bool:
        """
        Indicates if this stage has been built.

        Returns
        -------
        bool
            True if stage is built, False otherwise.
        """
        return self._is_built

    @property
    def input_ports(self) -> typing.List[_pipeline.Receiver]:
        """
        Input ports to this stage.

        Returns
        -------
        typing.List[`morpheus.pipeline.pipeline.Receiver`]
            Input ports to this stage.
        """
        return self._input_ports

    @property
    def output_ports(self) -> typing.List[_pipeline.Sender]:
        """
        Output ports from this stage.

        Returns
        -------
        typing.List[`morpheus.pipeline.pipeline.Sender`]
            Output ports from this stage.
        """
        return self._output_ports

    @property
    def has_multi_input_ports(self) -> bool:
        """
        Indicates if this stage has multiple input ports.

        Returns
        -------
        bool
            True if stage has multiple input ports, False otherwise.
        """
        return len(self._input_ports) > 1

    @property
    def has_multi_output_ports(self) -> bool:
        """
        Indicates if this stage has multiple output ports.

        Returns
        -------
        bool
            True if stage has multiple output ports, False otherwise.
        """
        return len(self._output_ports) > 1

    def get_all_inputs(self) -> typing.List[_pipeline.Sender]:
        """
        Get all input senders to this stage.

        Returns
        -------
        typing.List[`morpheus.pipeline.pipeline.Sender`]
            All input senders, ordered by input port and then by the order each port lists them.
        """
        return [sender for in_port in self._input_ports for sender in in_port._input_senders]

    def get_all_input_stages(self) -> typing.List["StreamWrapper"]:
        """
        Get all input stages to this stage.

        Returns
        -------
        typing.List[`morpheus.pipeline.pipeline.StreamWrapper`]
            The parent stage of every input sender. A stage appears once per sender, so duplicates are possible.
        """
        return [x.parent for x in self.get_all_inputs()]

    def get_all_outputs(self) -> typing.List[_pipeline.Receiver]:
        """
        Get all output receivers from this stage.

        Returns
        -------
        typing.List[`morpheus.pipeline.pipeline.Receiver`]
            All output receivers, ordered by output port and then by the order each port lists them.
        """
        return [receiver for out_port in self._output_ports for receiver in out_port._output_receivers]

    def get_all_output_stages(self) -> typing.List["StreamWrapper"]:
        """
        Get all output stages from this stage.

        Returns
        -------
        typing.List[`morpheus.pipeline.pipeline.StreamWrapper`]
            The parent stage of every output receiver. A stage appears once per receiver, so duplicates are possible.
        """
        return [x.parent for x in self.get_all_outputs()]

    @abstractmethod
    def supports_cpp_node(self) -> bool:
        """
        Specifies whether this Stage is capable of creating C++ nodes. During the build phase, this value will be
        combined with `CppConfig.get_should_use_cpp()` to determine whether or not a C++ node is created. This is an
        instance method to allow runtime decisions and derived classes to override base implementations.

        Returns
        -------
        bool
            True if this stage is able to build a C++ node, False otherwise.
        """
        pass

    def _build_cpp_node(self):
        """
        Specifies whether or not to build a C++ node. Only should be called during the build phase.

        Returns
        -------
        bool
            Truthy only when C++ execution is enabled globally (`CppConfig.get_should_use_cpp()`) and this stage
            reports `supports_cpp_node()`.
        """
        return CppConfig.get_should_use_cpp() and self.supports_cpp_node()

    def can_build(self, check_ports=False) -> bool:
        """
        Determines if all inputs have been built allowing this node to be built.

        Parameters
        ----------
        check_ports : bool, optional
            Check if we can build based on the input ports, by default False.

        Returns
        -------
        bool
            True if we can build, False otherwise.
        """
        # Can only build once
        if (self.is_built):
            return False

        if (not check_ports):
            # We can build if all input stages have been built. Easy and quick check. Works for non-circular pipelines
            return all(in_stage.is_built for in_stage in self.get_all_input_stages())

        # Otherwise decide based on the input ports: every input port must report `is_partial`
        return all(port.is_partial for port in self.input_ports)

    def build(self, builder: mrc.Builder, do_propagate=True):
        """
        Build this stage.

        Runs `_pre_build`, `_build` and `_post_build`, stores the resulting pairs on the output ports, and marks the
        stage as built. If `do_propagate` is set, it then builds every output stage whose `can_build()` is True.

        Parameters
        ----------
        builder : `mrc.Builder`
            MRC segment for this stage.
        do_propagate : bool, optional
            Whether to propagate to build output stages, by default True.

        Raises
        ------
        AssertionError
            If the stage is already built, is not attached to a pipeline, or `_post_build` returns a different
            number of pairs than there are output ports.
        """
        assert not self.is_built, "Can only build stages once!"
        assert self._pipeline is not None, "Must be attached to a pipeline before building!"

        # Pre-Build returns the input pairs for each port
        in_ports_pairs = self._pre_build(builder=builder)

        out_ports_pair = self._build(builder=builder, in_ports_streams=in_ports_pairs)

        # Allow stages to do any post build steps (i.e., for sinks, or timing functions)
        out_ports_pair = self._post_build(builder=builder, out_ports_pair=out_ports_pair)

        assert len(out_ports_pair) == len(self.output_ports), \
            "Build must return same number of output pairs as output ports"

        # Assign the output ports
        for port_idx, out_pair in enumerate(out_ports_pair):
            self.output_ports[port_idx]._out_stream_pair = out_pair

        self._is_built = True

        if (not do_propagate):
            return

        # Now build for any dependents
        for dep in self.get_all_output_stages():
            if (not dep.can_build()):
                continue

            dep.build(builder, do_propagate=do_propagate)

    def _pre_build(self, builder: mrc.Builder) -> typing.List[StreamPair]:
        """
        Collect the input `StreamPair` of every input port, in port order, via ``Receiver.get_input_pair``.
        """
        in_pairs: typing.List[StreamPair] = [x.get_input_pair(builder=builder) for x in self.input_ports]

        return in_pairs

    @abstractmethod
    def _build(self, builder: mrc.Builder, in_ports_streams: typing.List[StreamPair]) -> typing.List[StreamPair]:
        """
        This function is responsible for constructing this stage's internal `mrc.SegmentObject` object. The input
        of this function contains the returned value from the upstream stage.

        The input values are the `mrc.Builder` for this stage and a list of `StreamPair` tuples, one per input port,
        each containing an input `mrc.SegmentObject` object and the message data type.

        :meta public:

        Parameters
        ----------
        builder : `mrc.Builder`
            `mrc.Builder` object for the pipeline. This should be used to construct/attach the internal
            `mrc.SegmentObject`.
        in_ports_streams : `typing.List[morpheus.pipeline.pipeline.StreamPair]`
            List of tuples containing the input `mrc.SegmentObject` object and the message data type.

        Returns
        -------
        `typing.List[morpheus.pipeline.pipeline.StreamPair]`
            List of tuples containing the output `mrc.SegmentObject` object from this stage and the message data type.
        """
        pass

    def _post_build(self, builder: mrc.Builder, out_ports_pair: typing.List[StreamPair]) -> typing.List[StreamPair]:
        """
        Hook called by `build` after `_build`. The default implementation returns `out_ports_pair` unchanged.
        """
        return out_ports_pair

    def _start(self):
        """
        Hook for derived classes. No-op by default.
        """
        pass

    def stop(self):
        """
        Stages can implement this to perform cleanup steps when pipeline is stopped.
        """
        pass

    async def join(self):
        """
        Awaitable method that stages can implement this to perform cleanup steps when pipeline is stopped.
        Typically this is called after `stop` during a graceful shutdown, but may not be called if the pipeline is
        terminated.
        """
        pass

    def _create_ports(self, input_count: int, output_count: int):
        """
        Create `input_count` input ports and `output_count` output ports, numbered from 0. May only be called once.
        """
        assert len(self._input_ports) == 0 and len(self._output_ports) == 0, "Can only create ports once!"

        self._input_ports = [_pipeline.Receiver(parent=self, port_number=i) for i in range(input_count)]
        self._output_ports = [_pipeline.Sender(parent=self, port_number=i) for i in range(output_count)]

    def get_needed_columns(self):
        """
        Stages which need to have columns inserted into the dataframe, should populate the `self._needed_columns`
        dictionary with mapping of column names to `morpheus.common.TypeId`. This will ensure that the columns are
        allocated and populated with null values.

        Returns
        -------
        collections.OrderedDict
            A shallow copy of `self._needed_columns`. Mutating it does not affect the stage.
        """
        return self._needed_columns.copy()