# Copyright (c) 2021-2024, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
import os
import typing
from abc import abstractmethod
from functools import partial

import mrc
import pandas as pd
from mrc.core import operators as ops

from morpheus.common import FileTypes
from morpheus.config import Config
from morpheus.messages import UserMessageMeta
from morpheus.pipeline.preallocator_mixin import PreallocatorMixin
from morpheus.pipeline.single_output_source import SingleOutputSource
from morpheus.pipeline.stage_schema import StageSchema
from morpheus.utils.directory_watcher import DirectoryWatcher

logger = logging.getLogger(__name__)


class AutoencoderSourceStage(PreallocatorMixin, SingleOutputSource):
"""
    All AutoEncoder source stages must extend this class and implement the `files_to_dfs_per_user` abstract method.
    Feature columns can be managed by overriding the `derive_features` method; otherwise, all columns from the input
    data pass through to the next stage.

    Extend this class to load messages from files and dump their contents into a DFP pipeline immediately. Useful for
    testing the performance and accuracy of a pipeline.

Parameters
----------
c : `morpheus.config.Config`
Pipeline configuration instance.
input_glob : str
Input glob pattern to match files to read. For example, `./input_dir/*.json` would read all files with the
'json' extension in the directory input_dir.
watch_directory : bool, default = False
The watch directory option instructs this stage to not close down once all files have been read. Instead it will
read all files that match the 'input_glob' pattern, and then continue to watch the directory for additional
files. Any new files that are added that match the glob will then be processed.
max_files: int, default = -1
Max number of files to read. Useful for debugging to limit startup time. Default value of -1 is unlimited.
file_type : `morpheus.common.FileTypes`, default = 'FileTypes.Auto'.
Indicates what type of file to read. Specifying 'auto' will determine the file type from the extension.
Supported extensions: 'json', 'csv'
repeat: int, default = 1
How many times to repeat the dataset. Useful for extending small datasets in debugging.
sort_glob : bool, default = False
If true the list of files matching `input_glob` will be processed in sorted order.
recursive: bool, default = True
If true, events will be emitted for the files in subdirectories that match `input_glob`.
queue_max_size: int, default = 128
Maximum queue size to hold the file paths to be processed that match `input_glob`.
batch_timeout: float, default = 5.0
Timeout to retrieve batch messages from the queue.
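
    Examples
    --------
    A minimal usage sketch, assuming a hypothetical concrete subclass `MyAESourceStage`; the config values and
    feature columns below are illustrative only and not part of this module:

    >>> from morpheus.config import Config, ConfigAutoEncoder
    >>> from morpheus.pipeline import LinearPipeline
    >>> config = Config()
    >>> config.ae = ConfigAutoEncoder(feature_columns=["eventSource", "eventName"],
    ...                               userid_column_name="userIdentityaccountId")
    >>> pipeline = LinearPipeline(config)
    >>> source = pipeline.set_source(MyAESourceStage(config, input_glob="./input_dir/*.json", repeat=2))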
"""
def __init__(self,
c: Config,
input_glob: str,
watch_directory: bool = False,
max_files: int = -1,
file_type: FileTypes = FileTypes.Auto,
repeat: int = 1,
sort_glob: bool = False,
recursive: bool = True,
queue_max_size: int = 128,
batch_timeout: float = 5.0):
SingleOutputSource.__init__(self, c)
self._input_glob = input_glob
self._file_type = file_type
self._feature_columns = c.ae.feature_columns
self._user_column_name = c.ae.userid_column_name
self._userid_filter = c.ae.userid_filter
self._input_count = None
# Hold the max index we have seen to ensure sequential and increasing indexes
self._rows_per_user: typing.Dict[str, int] = {}
# Iterative mode will emit dataframes one at a time. Otherwise a list of dataframes is emitted. Iterative mode
# is good for interleaving source stages.
self._repeat_count = repeat
self._watcher = DirectoryWatcher(input_glob=input_glob,
watch_directory=watch_directory,
max_files=max_files,
sort_glob=sort_glob,
recursive=recursive,
queue_max_size=queue_max_size,
batch_timeout=batch_timeout)
@property
def input_count(self) -> int:
"""Return None for no max input count"""
return self._input_count if self._input_count is not None else 0
def compute_schema(self, schema: StageSchema):
schema.output_schema.set_type(UserMessageMeta)
def get_match_pattern(self, glob_split):
"""Return a file match pattern"""
dir_to_watch = os.path.dirname(glob_split[0])
match_pattern = self._input_glob.replace(dir_to_watch + "/", "", 1)
return match_pattern
@staticmethod
def repeat_df(df: pd.DataFrame, repeat_count: int) -> typing.List[pd.DataFrame]:
"""
        Repeat a dataframe `repeat_count` times, which is useful for extending small datasets when debugging. On each
        repetition, the `event_dt` and `eventTime` columns are shifted forward by the timespan of the original frame
        so that timestamps stay sequential.

Parameters
----------
df : pd.DataFrame
            Dataframe to be repeated.
repeat_count : int
Number of times the given dataframe should be repeated.
Returns
-------
df_array : typing.List[pd.DataFrame]
List of repeated dataframes.
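
        Examples
        --------
        A small illustrative sketch (the column values are made up for demonstration):

        >>> import pandas as pd
        >>> df = pd.DataFrame({"event_dt": pd.to_datetime(["2021-01-01 00:00:00", "2021-01-01 01:00:00"])})
        >>> df["eventTime"] = df["event_dt"].dt.strftime("%Y-%m-%dT%H:%M:%SZ")
        >>> repeated = AutoencoderSourceStage.repeat_df(df, repeat_count=2)
        >>> len(repeated)
        2
        >>> repeated[1]["event_dt"].iloc[0]  # shifted forward by the timespan of the original frame
        Timestamp('2021-01-01 01:00:00')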
"""
df_array = []
df_array.append(df)
for _ in range(1, repeat_count):
x = df.copy()
# Now increment the timestamps by the interval in the df
x["event_dt"] = x["event_dt"] + (x["event_dt"].iloc[-1] - x["event_dt"].iloc[0])
x["eventTime"] = x["event_dt"].dt.strftime("%Y-%m-%dT%H:%M:%SZ")
df_array.append(x)
# Set df for next iteration
df = x
return df_array
@staticmethod
def batch_user_split(x: typing.List[pd.DataFrame],
userid_column_name: str,
userid_filter: str,
datetime_column_name="event_dt"):
"""
Creates a dataframe for each userid.
Parameters
----------
x : typing.List[pd.DataFrame]
List of dataframes.
userid_column_name : str
Name of a dataframe column used for categorization.
userid_filter : str
            If not None, only rows matching the supplied userid are kept.
datetime_column_name : str
Name of the dataframe column used to sort the rows.
Returns
-------
user_dfs : typing.Dict[str, pd.DataFrame]
Dataframes, each of which is associated with a single userid.
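
        Examples
        --------
        A minimal sketch using made-up data:

        >>> import pandas as pd
        >>> df = pd.DataFrame({
        ...     "username": ["alice", "bob", "alice"],
        ...     "event_dt": pd.to_datetime(["2021-01-01", "2021-01-03", "2021-01-02"]),
        ... })
        >>> user_dfs = AutoencoderSourceStage.batch_user_split([df], "username", None)
        >>> sorted(user_dfs.keys())
        ['alice', 'bob']
        >>> len(user_dfs["alice"])
        2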
"""
combined_df = pd.concat(x)
if (datetime_column_name in combined_df):
# Convert to date_time column
# combined_df["event_dt"] = pd.to_datetime(combined_df["eventTime"])
# Set the index name so we can sort first by time then by index (to keep things all in order). Then restore
# the name
saved_index_name = combined_df.index.name
combined_df.index.name = "idx"
# Sort by time
combined_df = combined_df.sort_values(by=[datetime_column_name, "idx"])
combined_df.index.name = saved_index_name
logger.debug(
"CloudTrail loading complete. Total rows: %d. Timespan: %s",
len(combined_df),
str(combined_df.loc[combined_df.index[-1], datetime_column_name] -
combined_df.loc[combined_df.index[0], datetime_column_name]))
# Get the users in this DF
unique_users = combined_df[userid_column_name].unique()
user_dfs = {}
for user_name in unique_users:
if (userid_filter is not None and user_name != userid_filter):
continue
# Get just this users data and make a copy to remove link to grouped DF
user_df = combined_df[combined_df[userid_column_name] == user_name].copy()
user_dfs[user_name] = user_df
return user_dfs
@staticmethod
@abstractmethod
def files_to_dfs_per_user(x: typing.List[str],
userid_column_name: str,
feature_columns: typing.List[str],
userid_filter: str = None,
repeat_count: int = 1) -> typing.Dict[str, pd.DataFrame]:
"""
Stages that extend `AutoencoderSourceStage` must implement this abstract function
in order to convert messages in the files to dataframes per userid.
Parameters
----------
x : typing.List[str]
            List of file paths to load.
userid_column_name : str
Name of the column used for categorization.
feature_columns : typing.List[str]
Feature column names.
        userid_filter : str
            If not None, only rows matching the supplied userid are kept.
        repeat_count : int
            Number of times the given rows should be repeated.

Returns
-------
: typing.Dict[str, pd.DataFrame]
Dataframe per userid.
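
        Examples
        --------
        A simplified sketch of what an override might look like; a real implementation (for example a
        CloudTrail-based source stage) would also parse timestamps and handle the feature columns::

            @staticmethod
            def files_to_dfs_per_user(x, userid_column_name, feature_columns, userid_filter=None, repeat_count=1):
                dfs = []
                for file_name in x:
                    # Assumes each file already contains 'event_dt' (datetime) and 'eventTime' columns
                    df = pd.read_json(file_name)
                    dfs.extend(AutoencoderSourceStage.repeat_df(df, repeat_count))
                return AutoencoderSourceStage.batch_user_split(dfs, userid_column_name, userid_filter)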
"""
pass
@staticmethod
def derive_features(df: pd.DataFrame, feature_columns: typing.List[str]): # pylint: disable=unused-argument
"""
        Override this method to derive additional features from the input data. The default implementation returns
        the dataframe unchanged.

Parameters
----------
df : pd.DataFrame
A dataframe.
feature_columns : typing.List[str]
            Names of the columns to be derived.

        Returns
        -------
        df : pd.DataFrame
            Dataframe with both the original and the derived columns.
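
        Examples
        --------
        A minimal sketch of an override; the derived column and source column names below are illustrative only::

            @staticmethod
            def derive_features(df: pd.DataFrame, feature_columns: typing.List[str]):
                # Hypothetical derived feature: hour of day taken from the event timestamp
                df["event_hour"] = df["event_dt"].dt.hour
                return df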
"""
return df
def _add_derived_features(self, x: typing.Dict[str, pd.DataFrame]):
for user_name in x.keys():
x[user_name] = self.derive_features(x[user_name], None)
return x
def _build_user_metadata(self, x: typing.Dict[str, pd.DataFrame]):
user_metas = []
for user_name, user_df in x.items():
# See if we have seen this user before
if (user_name not in self._rows_per_user):
self._rows_per_user[user_name] = 0
# Combine the original index with itself so it shows up as a named column
user_df.index.name = "_index_" + (user_df.index.name or "")
user_df = user_df.reset_index()
# Now ensure the index for this user is correct
user_df.index = range(self._rows_per_user[user_name], self._rows_per_user[user_name] + len(user_df))
self._rows_per_user[user_name] += len(user_df)
# Now make a UserMessageMeta with the user name
meta = UserMessageMeta(user_df, user_name)
user_metas.append(meta)
return user_metas
def _build_source(self, builder: mrc.Builder) -> mrc.SegmentObject:
# The first source just produces filenames
return self._watcher.build_node(self.unique_name, builder)
def _post_build_single(self, builder: mrc.Builder, out_node: mrc.SegmentObject) -> mrc.SegmentObject:
# At this point, we have batches of filenames to process. Make a node for processing batches of
# filenames into batches of dataframes
post_node = builder.make_node(
self.unique_name + "-post",
ops.map(
partial(
self.files_to_dfs_per_user,
userid_column_name=self._user_column_name,
feature_columns=None, # Use None here to leave all columns in
userid_filter=self._userid_filter,
repeat_count=self._repeat_count)),
ops.map(self._add_derived_features),
# Now group the batch of dataframes into a single df, split by user, and send a single UserMessageMeta
# per user
ops.map(self._build_user_metadata),
# Finally flatten to single meta
ops.flatten())
builder.make_edge(out_node, post_node)
return super()._post_build_single(builder, post_node)