-
Notifications
You must be signed in to change notification settings - Fork 647
/
pipeline.py
385 lines (352 loc) · 16 KB
/
pipeline.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
# Licensed to Modin Development Team under one or more contributor license agreements.
# See the NOTICE file distributed with this work for additional information regarding
# copyright ownership. The Modin Development Team licenses this file to you under the
# Apache License, Version 2.0 (the "License"); you may not use this file except in
# compliance with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed under
# the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific language
# governing permissions and limitations under the License.
"""Module houses ``PandasQueryPipeline`` and ``PandasQuery`` classes, that implement a batch pipeline protocol for Modin Dataframes."""
from typing import Callable, Optional
import numpy as np
import modin.pandas as pd
from modin.core.storage_formats.pandas import PandasQueryCompiler
from modin.error_message import ErrorMessage
from modin.core.execution.ray.implementations.pandas_on_ray.dataframe.dataframe import (
PandasOnRayDataframe,
)
from modin.config import NPartitions
from modin.utils import get_current_execution
class PandasQuery:
    """
    Internal representation of a single query in a pipeline.

    Each instance wraps one function that the batch pipeline applies to the
    partitions of a dataframe, together with flags describing how the result
    is partitioned and whether it is reported as an output.

    Parameters
    ----------
    func : Callable
        The function to apply to the dataframe.
    is_output : bool, default: False
        Whether this query is an output query and should be passed both to the next query, and
        directly to postprocessing.
    repartition_after : bool, default: False
        Whether to repartition after this query is computed. Currently, repartitioning is only
        supported if there is 1 partition prior to repartitioning.
    fan_out : bool, default: False
        Whether to fan out this node. If True and only 1 partition is passed as input, the partition
        is replicated `PandasQueryPipeline.num_partitions` (default: `NPartitions.get`) times, and
        the function is called on each. The `reduce_fn` must also be specified.
    pass_partition_id : bool, default: False
        Whether to pass the numerical partition id to the query.
    reduce_fn : Callable, default: None
        The reduce function to apply if `fan_out` is set to True. This takes the
        `PandasQueryPipeline.num_partitions` (default: `NPartitions.get`) partitions that result from
        this query, and combines them into 1 partition.
    output_id : int, default: None
        An id to assign to this node if it is an output.

    Notes
    -----
    `func` must be a function that is applied along an axis of the dataframe.
    Use `pandas` for any module level functions inside `func` since it operates directly on
    partitions.
    """

    def __init__(
        self,
        func: Callable,
        is_output: bool = False,
        repartition_after: bool = False,
        fan_out: bool = False,
        pass_partition_id: bool = False,
        reduce_fn: Optional[Callable] = None,
        output_id: Optional[int] = None,
    ):
        # The work itself and how its result is surfaced as an output.
        self.func = func
        self.is_output = is_output
        self.output_id = output_id

        # How this query changes the partitioning of the data flowing through it.
        self.fan_out = fan_out
        self.reduce_fn = reduce_fn
        self.repartition_after = repartition_after
        self.pass_partition_id = pass_partition_id

        # Sub-queries that feed this query; only populated for output nodes.
        self.operators = None
class PandasQueryPipeline(object):
    """
    Internal representation of a query pipeline.

    This object keeps track of the functions that compose to form a query pipeline.

    Parameters
    ----------
    df : modin.pandas.Dataframe
        The dataframe to perform this pipeline on.
    num_partitions : int, optional
        The number of partitions to maintain for the batched dataframe.
        If not specified (or falsy, e.g. ``0``), the value is assumed equal to
        ``NPartitions.get()``.

    Notes
    -----
    Only row-parallel pipelines are supported. All queries will be applied along the row axis.
    """

    def __init__(self, df, num_partitions: Optional[int] = None):
        self._check_supported_execution(df)
        ErrorMessage.single_warning(
            "The Batch Pipeline API is an experimental feature and still under development in Modin."
        )
        self.df = df
        self.num_partitions = num_partitions if num_partitions else NPartitions.get()
        self.outputs = []  # List of output queries.
        # Queries added since the most recent output query; they become that
        # output's `operators` once an output query is added.
        self.query_list = []
        # Flag to indicate that `output_id` has been specified for a node.
        self.is_output_id_specified = False

    @staticmethod
    def _check_supported_execution(df):
        """
        Reject dataframes that are not backed by ``PandasOnRay`` execution.

        Parameters
        ----------
        df : modin.pandas.DataFrame
            The dataframe to check.

        Notes
        -----
        Rejection goes through ``ErrorMessage.not_implemented``. The frame type is
        only inspected when the current execution is ``PandasOnRay``.
        """
        if get_current_execution() != "PandasOnRay" or not isinstance(
            df._query_compiler._modin_frame, PandasOnRayDataframe
        ):  # pragma: no cover
            ErrorMessage.not_implemented(
                "Batch Pipeline API is only implemented for `PandasOnRay` execution."
            )

    def update_df(self, df):
        """
        Update the dataframe to perform this pipeline on.

        Parameters
        ----------
        df : modin.pandas.DataFrame
            The new dataframe to perform this pipeline on.
        """
        self._check_supported_execution(df)
        self.df = df

    def add_query(
        self,
        func: Callable,
        is_output: bool = False,
        repartition_after: bool = False,
        fan_out: bool = False,
        pass_partition_id: bool = False,
        reduce_fn: Optional[Callable] = None,
        output_id: Optional[int] = None,
    ):
        """
        Add a query to the current pipeline.

        Parameters
        ----------
        func : Callable
            DataFrame query to perform.
        is_output : bool, default: False
            Whether this query should be designated as an output query. If `True`, the output of
            this query is passed both to the next query and directly to postprocessing.
        repartition_after : bool, default: False
            Whether the dataframe should be repartitioned after this query. Currently,
            repartitioning is only supported if there is 1 partition prior.
        fan_out : bool, default: False
            Whether to fan out this node. If True and only 1 partition is passed as input, the
            partition is replicated `self.num_partitions` (default: `NPartitions.get`) times,
            and the function is called on each. The `reduce_fn` must also be specified.
        pass_partition_id : bool, default: False
            Whether to pass the numerical partition id to the query.
        reduce_fn : Callable, default: None
            The reduce function to apply if `fan_out` is set to True. This takes the
            `self.num_partitions` (default: `NPartitions.get`) partitions that result from this
            query, and combines them into 1 partition.
        output_id : int, default: None
            An id to assign to this node if it is an output. Ids must be unique.

        Raises
        ------
        ValueError
            If `output_id` is given for a non-output node, if output ids are given for
            some output nodes but not others, if `output_id` duplicates an existing
            output's id, or if `fan_out` is set without a `reduce_fn`.

        Notes
        -----
        Use `pandas` for any module level functions inside `func` since it operates directly on
        partitions.
        """
        # All validation happens before any state is mutated.
        if not is_output and output_id is not None:
            raise ValueError("Output ID cannot be specified for non-output node.")
        if is_output:
            if not self.is_output_id_specified and output_id is not None and self.outputs:
                raise ValueError("Output ID must be specified for all nodes.")
            if output_id is None and self.is_output_id_specified:
                raise ValueError("Output ID must be specified for all nodes.")
            # Results are keyed by output id, so a duplicate would silently
            # overwrite an earlier output's result.
            if output_id is not None and any(
                out.output_id == output_id for out in self.outputs
            ):
                raise ValueError(
                    f"Output ID {output_id!r} is already in use; output IDs must be unique."
                )
        if fan_out and reduce_fn is None:
            # The fan-out reducer always calls `reduce_fn`; fail here rather than
            # inside a worker at compute time.
            raise ValueError("`reduce_fn` must be specified when `fan_out=True`.")

        query = PandasQuery(
            func,
            is_output,
            repartition_after,
            fan_out,
            pass_partition_id,
            reduce_fn,
            output_id,
        )
        self.query_list.append(query)
        if is_output:
            if output_id is not None:
                self.is_output_id_specified = True
            query.operators = self.query_list[:-1]
            self.outputs.append(query)
            self.query_list = []

    @staticmethod
    def _make_reducer(reduce_fn, replicas):
        """
        Build the function that merges a fanned-out partition with its replicas.

        A factory is used so that `reduce_fn` and `replicas` are bound now. The
        reducer runs only when the call queue is drained, which may happen after
        the caller's loop variables have moved on to later nodes.

        Parameters
        ----------
        reduce_fn : Callable
            Receives a list of pandas DataFrames: the partition's own result first,
            then each replica's result in partition-id order.
        replicas : list of PandasOnRayDataframeVirtualPartition
            The partitions computed for partition ids ``1..num_partitions-1``.

        Returns
        -------
        Callable
            A function of one pandas DataFrame returning ``reduce_fn``'s result.
        """

        def reducer(df):
            df_inputs = [df]
            for replica in replicas:
                df_inputs.append(replica.to_pandas())
            return reduce_fn(df_inputs)

        return reducer

    def _fan_out(self, node, partitions):
        """
        Apply a fan-out node: replicate the single input partition and reduce.

        `node.func` is always called with the partition id (0 for the input
        partition, ``1..num_partitions-1`` for the replicas).

        Parameters
        ----------
        node : PandasQuery
            The fan-out query.
        partitions : list of PandasOnRayDataframeVirtualPartition
            The input partitions; only a single partition is supported.

        Returns
        -------
        list of PandasOnRayDataframeVirtualPartition
            A single partition with the reduction queued on it.
        """
        if len(partitions) > 1:
            ErrorMessage.not_implemented(
                "Fan out is only supported with DataFrames with 1 partition."
            )
        source = partitions[0].force_materialization()
        # Snapshot the blocks before `node.func` runs so that every replica
        # starts from the same input data.
        block_partitions = source.list_of_block_partitions
        primary = source.add_to_apply_calls(node.func, 0)
        primary.drain_call_queue(num_splits=1)
        replicas = []
        for partition_id in range(1, self.num_partitions):
            replica = type(primary)(
                block_partitions,
                full_axis=primary.full_axis,
            ).add_to_apply_calls(node.func, partition_id)
            replica.drain_call_queue(num_splits=1)
            replicas.append(replica)
        return [primary.add_to_apply_calls(self._make_reducer(node.reduce_fn, replicas))]

    def _repartition(self, node, partitions):
        """
        Apply `node.func` to the single input partition, then split it into row slices.

        Parameters
        ----------
        node : PandasQuery
            The query with ``repartition_after=True``.
        partitions : list of PandasOnRayDataframeVirtualPartition
            The input partitions; only a single partition is supported.

        Returns
        -------
        list of PandasOnRayDataframeVirtualPartition
            `self.num_partitions` partitions. Each holds ``len // num_partitions``
            rows, and the last also takes the remainder.
        """
        if len(partitions) > 1:
            ErrorMessage.not_implemented(
                "Dynamic repartitioning is currently only supported for DataFrames with 1 partition."
            )
        # The single input partition has partition id 0.
        id_args = (0,) if node.pass_partition_id else ()
        source = (
            partitions[0].add_to_apply_calls(node.func, *id_args).force_materialization()
        )
        # Bind locally so the closure below does not capture (and serialize) `self`.
        num_partitions = self.num_partitions

        def mask_partition(df, i):
            new_length = len(df.index) // num_partitions
            if i == num_partitions - 1:
                return df.iloc[i * new_length :]
            return df.iloc[i * new_length : (i + 1) * new_length]

        return [
            type(source)(
                source.list_of_block_partitions,
                full_axis=source.full_axis,
            ).add_to_apply_calls(mask_partition, i)
            for i in range(num_partitions)
        ]

    def _complete_nodes(self, list_of_nodes, partitions):
        """
        Run a sub-query end to end.

        Parameters
        ----------
        list_of_nodes : list of PandasQuery
            The functions that compose this query.
        partitions : list of PandasOnRayDataframeVirtualPartition
            The partitions that compose the dataframe that is input to this sub-query.
            The list itself is not modified.

        Returns
        -------
        list of PandasOnRayDataframeVirtualPartition
            The partitions that result from computing the functions represented by `list_of_nodes`.
        """
        for node in list_of_nodes:
            if node.fan_out:
                partitions = self._fan_out(node, partitions)
            elif node.repartition_after:
                partitions = self._repartition(node, partitions)
            elif node.pass_partition_id:
                partitions = [
                    part.add_to_apply_calls(node.func, i)
                    for i, part in enumerate(partitions)
                ]
            else:
                partitions = [part.add_to_apply_calls(node.func) for part in partitions]
        return partitions

    @staticmethod
    def _build_dataframe(row_partitions):
        """
        Assemble a Modin DataFrame from drained row partitions.

        Parameters
        ----------
        row_partitions : list of PandasOnRayDataframeVirtualPartition
            One entry per row partition; each exposes its block partitions.

        Returns
        -------
        modin.pandas.DataFrame
        """
        block_partitions = np.array(
            [row.list_of_block_partitions for row in row_partitions]
        )
        partition_mgr_class = PandasOnRayDataframe._partition_mgr_cls
        index, internal_rows = partition_mgr_class.get_indices(0, block_partitions)
        columns, internal_cols = partition_mgr_class.get_indices(1, block_partitions)
        result_modin_frame = PandasOnRayDataframe(
            block_partitions,
            index,
            columns,
            row_lengths=list(map(len, internal_rows)),
            column_widths=list(map(len, internal_cols)),
        )
        return pd.DataFrame(query_compiler=PandasQueryCompiler(result_modin_frame))

    def compute_batch(
        self,
        postprocessor: Optional[Callable] = None,
        pass_partition_id: Optional[bool] = False,
        pass_output_id: Optional[bool] = False,
    ):
        """
        Run the completed pipeline + any postprocessing steps end to end.

        Parameters
        ----------
        postprocessor : Callable, default: None
            A postprocessing function to be applied to each output partition.
            The order of arguments passed is `df` (the partition), `output_id`
            (if `pass_output_id=True`), and `partition_id` (if `pass_partition_id=True`).
        pass_partition_id : bool, default: False
            Whether or not to pass the numerical partition id to the postprocessing function.
        pass_output_id : bool, default: False
            Whether or not to pass the output ID associated with output queries to the
            postprocessing function.

        Returns
        -------
        list or dict
            If output ids are specified, a dictionary mapping output id to the resulting
            dataframe is returned. Otherwise, a list of the resulting dataframes is returned
            in the order the outputs were added. An empty list is returned (with a
            warning) when no outputs were added.

        Raises
        ------
        ValueError
            If `pass_output_id` is True but no output ids were specified.
        """
        if len(self.outputs) == 0:
            ErrorMessage.single_warning(
                "No outputs to compute. Returning an empty list. Please specify outputs by calling `add_query` with `is_output=True`."
            )
            return []
        if not self.is_output_id_specified and pass_output_id:
            raise ValueError(
                "`pass_output_id` is set to True, but output ids have not been specified. "
                + "To pass output ids, please specify them using the `output_id` kwarg with pipeline.add_query"
            )
        outs = {} if self.is_output_id_specified else []
        modin_frame = self.df._query_compiler._modin_frame
        partitions = modin_frame._partition_mgr_cls.row_partitions(
            modin_frame._partitions
        )
        for node in self.outputs:
            # Each output's (un-postprocessed) partitions feed the next output's queries.
            partitions = self._complete_nodes(node.operators + [node], partitions)
            for part in partitions:
                part.drain_call_queue(num_splits=1)
            if postprocessor:
                output_partitions = []
                for partition_id, partition in enumerate(partitions):
                    args = []
                    if pass_output_id:
                        args.append(node.output_id)
                    if pass_partition_id:
                        args.append(partition_id)
                    output_partitions.append(
                        partition.add_to_apply_calls(postprocessor, *args)
                    )
            else:
                output_partitions = [
                    part.add_to_apply_calls(lambda df: df) for part in partitions
                ]
            # Ensures our result df is block partitioned.
            for part in output_partitions:
                part.drain_call_queue(num_splits=self.num_partitions)
            if self.is_output_id_specified:
                outs[node.output_id] = output_partitions
            else:
                outs.append(output_partitions)

        if self.is_output_id_specified:
            return {
                output_id: self._build_dataframe(row_partitions)
                for output_id, row_partitions in outs.items()
            }
        return [self._build_dataframe(row_partitions) for row_partitions in outs]