Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Improved performance of Dask preprocessing by adding parallelism #1193

Merged
merged 9 commits into the base branch from the pull-request branch
Jun 4, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions ludwig/backend/dask.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,10 @@ def shutdown(self):


class DaskBackend(LocalTrainingMixin, Backend):
def __init__(self, data_format=PARQUET, **kwargs):
def __init__(self, data_format=PARQUET, engine=None, **kwargs):
super().__init__(data_format=data_format, **kwargs)
self._df_engine = DaskEngine()
engine = engine or {}
self._df_engine = DaskEngine(**engine)
if data_format != PARQUET:
raise ValueError(
f'Data format {data_format} is not supported when using the Dask backend. '
Expand Down
2 changes: 1 addition & 1 deletion ludwig/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,4 +121,4 @@

HDF5 = 'hdf5'
PARQUET = 'parquet'

SRC = 'dataset_src'
21 changes: 12 additions & 9 deletions ludwig/data/dataframe/dask.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import dask
import dask.array as da
import dask.dataframe as dd
from dask.diagnostics import ProgressBar

from ludwig.constants import NAME, PROC_COLUMN
from ludwig.data.dataset.parquet import ParquetDataset
Expand All @@ -38,8 +39,9 @@ def set_scheduler(scheduler):


class DaskEngine(DataFrameEngine):
def __init__(self):
self._parallelism = multiprocessing.cpu_count()
def __init__(self, parallelism=None, persist=False):
self._parallelism = parallelism or multiprocessing.cpu_count()
self._persist = persist

def set_parallelism(self, parallelism):
self._parallelism = parallelism
Expand All @@ -54,7 +56,7 @@ def parallelize(self, data):
return data.repartition(self.parallelism)

def persist(self, data):
return data.persist()
return data.persist() if self._persist else data

def compute(self, data):
return data.compute()
Expand All @@ -74,12 +76,13 @@ def reduce_objects(self, series, reduce_fn):
return series.reduction(reduce_fn, aggregate=reduce_fn, meta=('data', 'object')).compute()[0]

def to_parquet(self, df, path):
df.to_parquet(
path,
engine='pyarrow',
write_index=False,
schema='infer',
)
with ProgressBar():
df.to_parquet(
path,
engine='pyarrow',
write_index=False,
schema='infer',
)

@property
def array_lib(self):
Expand Down
Loading