Improve performance of DataFrameEngine.df_like #2029

Merged: 22 commits (speedup-dask-df-like into master), May 13, 2022

Commits (22)
4f743e1  with type logs for debugging (geoffreyangus, May 12, 2022)
cecc57e  cleanup (geoffreyangus, May 12, 2022)
39169cd  implements fix (geoffreyangus, May 12, 2022)
d282633  changes (geoffreyangus, May 12, 2022)
0cf3add  logs for gh test (geoffreyangus, May 12, 2022)
7d47db3  [pre-commit.ci] auto fixes from pre-commit.com hooks (pre-commit-ci[bot], May 12, 2022)
6ce1813  more logs for comet test (geoffreyangus, May 12, 2022)
370c9f7  Merge branch 'speedup-dask-df-like' of https://github.com/ludwig-ai/l… (geoffreyangus, May 12, 2022)
707a377  updated to check np.ndarray objects before casting (geoffreyangus, May 13, 2022)
45bd172  with debugging statements (geoffreyangus, May 13, 2022)
e1cd1ce  [pre-commit.ci] auto fixes from pre-commit.com hooks (pre-commit-ci[bot], May 13, 2022)
6649bec  astype on whole dataframe (geoffreyangus, May 13, 2022)
0ecfb5b  Merge branch 'speedup-dask-df-like' of https://github.com/ludwig-ai/l… (geoffreyangus, May 13, 2022)
1a863cb  Merge branch 'master' into speedup-dask-df-like (geoffreyangus, May 13, 2022)
d9a3747  remove some print statements and added back tests (geoffreyangus, May 13, 2022)
3b7a75d  removes extraneous import (geoffreyangus, May 13, 2022)
d0cb9ed  remove workaround code in comet (geoffreyangus, May 13, 2022)
8e98efc  faster sampling (geoffreyangus, May 13, 2022)
21c4411  more precise condition for skipping dtype (geoffreyangus, May 13, 2022)
e30e3f3  cleanup comment (geoffreyangus, May 13, 2022)
9582742  add type (geoffreyangus, May 13, 2022)
b2f7308  removed extraneous print statements (geoffreyangus, May 13, 2022)
10 changes: 3 additions & 7 deletions ludwig/data/dataframe/dask.py

@@ -48,13 +48,9 @@ def df_like(self, df: dd.DataFrame, proc_cols: Dict[str, dd.Series]):
         # Our goal is to preserve the index of the input dataframe but to drop
         # all its columns. Because to_frame() creates a column from the index,
         # we need to drop it immediately following creation.
-        dataset = df.index.to_frame(name=TMP_COLUMN).drop(columns=[TMP_COLUMN])
-        # TODO: address if following results in fragmented DataFrame
-        # TODO: see if we can get divisions. concat (instead of iterative join) should work if divs are known. Source:
-        # https://github.com/dask/dask/blob/5fbda77cfc5bc1b8f1453a2dbb034b048fc10726/dask/dataframe/multi.py#L1245
-        for col_name, col in proc_cols.items():
-            col.name = col_name
-            dataset = dataset.join(col, how="inner")  # inner join handles Series with dropped rows
+        dataset = df.index.to_frame(name=TMP_COLUMN).drop(columns=TMP_COLUMN)
+        for k, v in proc_cols.items():
+            dataset[k] = v
         return dataset

     def parallelize(self, data):
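This is the core of the speedup: instead of building one shuffle-prone inner join per processed column, the new code assigns each Series as a column on a frame that carries only the input's index, letting Dask align lazily. A minimal sketch of the approach, not Ludwig's code: it assumes dask[dataframe] is installed, and the toy data, the text_len column, and the local TMP_COLUMN stand-in are invented for illustration.

    # Minimal sketch (not Ludwig's code) of the column-assignment approach.
    import dask.dataframe as dd
    import pandas as pd

    TMP_COLUMN = "__TMP_COLUMN__"  # stand-in for Ludwig's TMP_COLUMN constant

    pdf = pd.DataFrame({"text": ["a", "bb", "ccc", "dddd"]})
    df = dd.from_pandas(pdf, npartitions=2)

    # Processed columns derived from the input share its index/divisions.
    proc_cols = {"text_len": df["text"].str.len()}

    # Keep only the input's index: to_frame() creates a column from the
    # index, so drop that column immediately after creation.
    dataset = df.index.to_frame(name=TMP_COLUMN).drop(columns=TMP_COLUMN)

    # Column assignment aligns on the index without adding one join to the
    # task graph per column, which is what made the old
    # dataset.join(col, how="inner") loop slow.
    for k, v in proc_cols.items():
        dataset[k] = v

    print(dataset.compute())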
15 changes: 2 additions & 13 deletions ludwig/data/dataframe/pandas.py

@@ -24,19 +24,8 @@ def __init__(self, **kwargs):
         super().__init__()

     def df_like(self, df, proc_cols):
-        # Our goal is to preserve the index of the input dataframe but to drop
-        # all its columns. Because to_frame() creates a column from the index,
-        # we need to drop it immediately following creation.
-        col_names, cols = zip(*proc_cols.items())
-        series_cols = []
-        for col in cols:
-            if type(col) not in {pd.Series, pd.DataFrame}:
-                series_cols.append(pd.Series(col))
-            else:
-                series_cols.append(col)
-        dataset = pd.concat(series_cols, join="inner", axis=1)  # inner join handles Series with dropped rows
-        dataset.columns = col_names
-        return dataset
+        # df argument unused for pandas, which can instantiate df directly
+        return pd.DataFrame(proc_cols)

     def parallelize(self, data):
         return data
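For pandas, pd.DataFrame(proc_cols) already does the alignment work of the removed pd.concat loop: constructing a DataFrame from a dict of Series aligns on the union of the indices, i.e. an outer join. A small illustration of that alignment with invented data (not from the PR), including the row that comes back as NaN when a Series had rows dropped:

    import pandas as pd

    a = pd.Series([1, 2, 3], index=[0, 1, 2])
    b = pd.Series([10, 30], index=[0, 2])  # row 1 dropped during preprocessing

    dataset = pd.DataFrame({"a": a, "b": b})  # outer join on the indices
    print(dataset)
    #    a     b
    # 0  1  10.0
    # 1  2   NaN   <- dropped row reintroduced as NaN (upcasts b to float64)
    # 2  3  30.0

    # Downstream, build_dataset calls dropna() to remove such rows again.
    print(dataset.dropna())

Note the side effect visible in column b: inserting NaN upcasts an integer column to float64. That upcast is exactly what the preprocessing.py change below compensates for.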
11 changes: 11 additions & 0 deletions ludwig/data/preprocessing.py

@@ -19,6 +19,7 @@

 import numpy as np
 import pandas as pd
+import torch

 from ludwig.backend import Backend, LOCAL_BACKEND
 from ludwig.constants import (
@@ -1132,13 +1133,23 @@ def build_dataset(
         if reshape is not None:
             proc_cols[proc_column] = backend.df_engine.map_objects(proc_cols[proc_column], lambda x: x.reshape(-1))

+    # Implements an outer join of proc_cols
     dataset = backend.df_engine.df_like(dataset_df, proc_cols)

     # At this point, there should be no missing values left in the dataframe, unless
     # the DROP_ROW preprocessing option was selected, in which case we need to drop those
     # rows.
     dataset = dataset.dropna()

+    # NaNs introduced by outer join change dtype of dataset cols (upcast to float64), so we need to cast them back.
+    col_name_to_dtype = {}
+    for col_name, col in proc_cols.items():
+        # if col is a list of list-like objects, we assume the internal dtype of each col[i] remains unchanged.
+        if type(col) == list and type(col[0]) in {list, np.ndarray, torch.Tensor}:
+            continue
+        col_name_to_dtype[col_name] = col.dtype
+    dataset = dataset.astype(col_name_to_dtype)
+
    return dataset, metadata
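The dropna-then-astype round trip can be seen end to end in a few lines. In this invented example (not Ludwig code), an int64 column is upcast to float64 by the NaN the outer join inserts; after dropna(), astype() with the recorded per-column dtypes restores it. Columns that are plain Python lists of list-like objects (lists, np.ndarray, torch.Tensor) have no .dtype to record, and the NaN insertion doesn't touch their elements' dtypes, which is presumably why the PR skips them.

    import pandas as pd

    proc_cols = {
        "label": pd.Series([0, 1], index=[0, 2], dtype="int64"),  # row 1 dropped
        "feat": pd.Series([0.5, 1.5, 2.5], index=[0, 1, 2]),
    }

    dataset = pd.DataFrame(proc_cols)  # outer join; "label" upcast to float64
    dataset = dataset.dropna()         # drop the NaN row the join reintroduced

    # Record each column's original dtype and cast back in a single pass,
    # mirroring the astype-on-whole-dataframe approach from the PR.
    col_name_to_dtype = {name: col.dtype for name, col in proc_cols.items()}
    dataset = dataset.astype(col_name_to_dtype)
    print(dataset.dtypes)  # label: int64, feat: float64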