/
mutate.py
195 lines (157 loc) · 6.02 KB
/
mutate.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
# Copyright (c) 2022-2024, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import typing
from inspect import getsourcelines
import numpy as np
from merlin.core.dispatch import DataFrameType
from merlin.schema import ColumnSchema
from merlin.schema import Schema
from nvtabular.ops.operator import ColumnSelector
from nvtabular.ops.operator import Operator
from morpheus.utils.nvt.decorators import annotate
class MutateOp(Operator):
    """
    NVTabular operator that applies a user-supplied function to a dataframe and then guarantees that every
    declared output column is present in the result, adding zero-filled columns for any that are missing.
    """

    def __init__(self,
                 func: typing.Callable,
                 output_columns: typing.Optional[typing.List] = None,
                 dependencies: typing.Optional[typing.List] = None,
                 label: typing.Optional[str] = None):
        """
        Initialize MutateOp class.

        Parameters
        ----------
        func : Callable
            Function to perform mutation operation. It is called as ``func(col_selector, df)`` and must return
            the resulting dataframe.
        output_columns : Optional[List], optional
            List of ``(column_name, dtype)`` pairs describing the columns this operator produces, by default None
            (treated as an empty list).
        dependencies : Optional[List], optional
            List of dependencies, by default None (treated as an empty list). Each entry is either a column name
            (``str``) or an object exposing ``output_schema.column_names``.
        label : Optional[str], optional
            Explicit label for MutateOp, by default None (a label is then derived from ``func``).
        """
        super().__init__()

        # NOTE(review): within this class ``_dependencies`` is only consulted by ``_remove_deps``; confirm whether
        # the base Operator's ``dependencies`` property is expected to expose it as well.
        self._dependencies = dependencies or []
        self._func = func
        self._label = label
        self._output_columns = output_columns or []

    def _remove_deps(self, column_selector: ColumnSelector):
        """
        Remove dependencies from column selector.

        Parameters
        ----------
        column_selector : ColumnSelector
            Instance of ColumnSelector from which dependencies will be removed.

        Returns
        -------
        ColumnSelector
            Result of ``column_selector.filter_columns`` applied with a selector built from the dependency
            column names.
        """
        # String dependencies are used as-is; other dependencies contribute their output schema's column names.
        to_skip = ColumnSelector(
            [dep if isinstance(dep, str) else dep.output_schema.column_names for dep in self._dependencies])
        return column_selector.filter_columns(to_skip)

    @property
    def label(self):
        """
        Get the label of the MutateOp instance.

        The label is resolved in this order:

        1. The explicit ``label`` passed to the constructor, if any.
        2. ``"MutateOp: <name>"`` when ``func`` is a named (non-lambda) function.
        3. The stripped source text of ``func`` when it is a lambda whose source line contains exactly one
           lambda expression.
        4. The fallback ``"MutateOp"``.

        Returns
        -------
        str
            The label of the MutateOp instance.
        """
        if self._label is not None:
            return self._label

        # Callables such as ``functools.partial`` objects or callable class instances have no ``__name__``;
        # treat them as anonymous rather than letting a diagnostic property raise.
        name = getattr(self._func, "__name__", None)
        if name is None:
            return "MutateOp"

        # if we have a named function (not a lambda) return the function name
        name = name.split(".")[-1]
        if name != "<lambda>":
            return f"MutateOp: {name}"

        try:
            # otherwise get the lambda source code from the inspect module if possible.
            # Only the first source line is inspected; it is split on ">>" (the operator-chaining syntax) so that
            # a lambda embedded in a chained expression can be isolated.
            source = getsourcelines(self._func)[0][0]
            lambdas = [op.strip() for op in source.split(">>") if "lambda " in op]
            if len(lambdas) == 1 and lambdas[0].count("lambda") == 1:
                return lambdas[0]
        except Exception:  # pylint: disable=broad-except
            # we can fail to load the source in distributed environments. Since the
            # label is mainly used for diagnostics, don't worry about the error here and
            # fallback to the default labelling
            pass

        # Failed to figure out the source
        return "MutateOp"

    # pylint: disable=arguments-renamed
    @annotate("MutateOp", color="darkgreen", domain="nvt_python")
    def transform(self, col_selector: ColumnSelector, df: DataFrameType) -> DataFrameType:
        """
        Apply the transformation function on the dataframe.

        Parameters
        ----------
        col_selector : ColumnSelector
            Instance of ColumnSelector, forwarded unchanged to ``func``.
        df : DataFrameType
            Input dataframe.

        Returns
        -------
        DataFrameType
            The dataframe returned by ``func``, with any declared output column that it lacks added via
            ``np.zeros`` of the declared dtype.
        """
        df = self._func(col_selector, df)

        # If our dataframe doesn't contain the expected output columns, even after processing, we add dummy columns.
        # This could occur if our JSON data doesn't always contain columns we expect to be expanded.
        existing_columns = set(df.columns)
        new_cols = {
            col_name: np.zeros(df.shape[0], dtype=dtype)
            for col_name, dtype in self._output_columns if col_name not in existing_columns
        }
        df = df.assign(**new_cols)

        return df

    def column_mapping(self, col_selector: ColumnSelector) -> typing.Dict[str, str]:
        """
        Generate a column mapping.

        Every declared output column is mapped to the full list of selected input column names, since ``func``
        may read any of them.

        Parameters
        ----------
        col_selector : ColumnSelector
            Instance of ColumnSelector.

        Returns
        -------
        Dict[str, str]
            Dictionary of column mappings, keyed by output column name.
        """
        column_mapping = {}
        for col_name, _ in self._output_columns:
            column_mapping[col_name] = col_selector.names

        return column_mapping

    def compute_output_schema(
        self,
        input_schema: Schema,
        col_selector: ColumnSelector,
        prev_output_schema: typing.Optional[Schema] = None,
    ) -> Schema:
        """
        Compute the output schema.

        Starts from the base Operator's computed output schema and adds one ``ColumnSchema`` per declared
        ``(column_name, dtype)`` output column.

        Parameters
        ----------
        input_schema : Schema
            The input schema.
        col_selector : ColumnSelector
            Instance of ColumnSelector.
        prev_output_schema : Optional[Schema], optional
            Previous output schema, by default None.

        Returns
        -------
        Schema
            The output schema.
        """
        output_schema = super().compute_output_schema(input_schema, col_selector, prev_output_schema)

        # Add new columns to the output schema
        for col, dtype in self._output_columns:
            output_schema += Schema([ColumnSchema(col, dtype=dtype)])

        return output_schema