Improve performance of DataFrameEngine.df_like (#2029)
geoffreyangus committed May 13, 2022
1 parent 7e2b464 commit caf7fdc
Showing 3 changed files with 16 additions and 20 deletions.
ludwig/data/dataframe/dask.py (3 additions, 7 deletions)

@@ -48,13 +48,9 @@ def df_like(self, df: dd.DataFrame, proc_cols: Dict[str, dd.Series]):
         # Our goal is to preserve the index of the input dataframe but to drop
         # all its columns. Because to_frame() creates a column from the index,
         # we need to drop it immediately following creation.
-        dataset = df.index.to_frame(name=TMP_COLUMN).drop(columns=[TMP_COLUMN])
-        # TODO: address if following results in fragmented DataFrame
-        # TODO: see if we can get divisions. concat (instead of iterative join) should work if divs are known. Source:
-        # https://github.com/dask/dask/blob/5fbda77cfc5bc1b8f1453a2dbb034b048fc10726/dask/dataframe/multi.py#L1245
-        for col_name, col in proc_cols.items():
-            col.name = col_name
-            dataset = dataset.join(col, how="inner")  # inner join handles Series with dropped rows
+        dataset = df.index.to_frame(name=TMP_COLUMN).drop(columns=TMP_COLUMN)
+        for k, v in proc_cols.items():
+            dataset[k] = v
         return dataset

     def parallelize(self, data):
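The gist of the Dask change: instead of issuing one inner join per processed column, the new code builds an empty frame that keeps the input's index and attaches each column by plain assignment. A minimal runnable sketch of the new path, assuming a toy frame and a stand-in for Ludwig's TMP_COLUMN constant:

import dask.dataframe as dd
import pandas as pd

TMP_COLUMN = "__TMP_COLUMN__"  # stand-in for Ludwig's constant

pdf = pd.DataFrame({"a": [1, 2, 3]}, index=[10, 20, 30])
df = dd.from_pandas(pdf, npartitions=2)
proc_cols = {"a_processed": df["a"] * 2}  # hypothetical processed column

# Keep df's index, drop the column that to_frame() materializes from it,
# then attach each processed column by assignment rather than by join.
dataset = df.index.to_frame(name=TMP_COLUMN).drop(columns=TMP_COLUMN)
for k, v in proc_cols.items():
    dataset[k] = v

print(dataset.compute())  # index [10, 20, 30] preserved, one column

One behavioral consequence: the old inner join dropped rows missing from any Series, while assignment keeps the full index and leaves NaNs in their place, which is what the preprocessing.py changes below compensate for.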
ludwig/data/dataframe/pandas.py (2 additions, 13 deletions)

@@ -24,19 +24,8 @@ def __init__(self, **kwargs):
         super().__init__()

     def df_like(self, df, proc_cols):
-        # Our goal is to preserve the index of the input dataframe but to drop
-        # all its columns. Because to_frame() creates a column from the index,
-        # we need to drop it immediately following creation.
-        col_names, cols = zip(*proc_cols.items())
-        series_cols = []
-        for col in cols:
-            if type(col) not in {pd.Series, pd.DataFrame}:
-                series_cols.append(pd.Series(col))
-            else:
-                series_cols.append(col)
-        dataset = pd.concat(series_cols, join="inner", axis=1)  # inner join handles Series with dropped rows
-        dataset.columns = col_names
-        return dataset
+        # df argument unused for pandas, which can instantiate df directly
+        return pd.DataFrame(proc_cols)

     def parallelize(self, data):
         return data
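Constructing pd.DataFrame from a dict of Series aligns them on their indexes, i.e. an outer join: rows missing from one Series become NaN, and integer columns that receive a NaN are upcast to float64. A toy illustration (not Ludwig code):

import pandas as pd

# "x" lost row 2, e.g. to a DROP_ROW preprocessing step; "y" did not.
proc_cols = {
    "x": pd.Series([1, 2], index=[0, 1], dtype="int64"),
    "y": pd.Series([4.0, 5.0, 6.0], index=[0, 1, 2]),
}

dataset = pd.DataFrame(proc_cols)  # outer join on the index
print(dataset)
#      x    y
# 0  1.0  4.0
# 1  2.0  5.0
# 2  NaN  6.0
print(dataset["x"].dtype)  # float64, upcast to hold the NaN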
ludwig/data/preprocessing.py (11 additions, 0 deletions)

@@ -19,6 +19,7 @@

 import numpy as np
 import pandas as pd
+import torch

 from ludwig.backend import Backend, LOCAL_BACKEND
 from ludwig.constants import (
@@ -1132,13 +1133,23 @@ def build_dataset(
         if reshape is not None:
             proc_cols[proc_column] = backend.df_engine.map_objects(proc_cols[proc_column], lambda x: x.reshape(-1))

+    # Implements an outer join of proc_cols
     dataset = backend.df_engine.df_like(dataset_df, proc_cols)

     # At this point, there should be no missing values left in the dataframe, unless
     # the DROP_ROW preprocessing option was selected, in which case we need to drop those
     # rows.
     dataset = dataset.dropna()

+    # NaNs introduced by outer join change dtype of dataset cols (upcast to float64), so we need to cast them back.
+    col_name_to_dtype = {}
+    for col_name, col in proc_cols.items():
+        # if col is a list of list-like objects, we assume the internal dtype of each col[i] remains unchanged.
+        if type(col) == list and type(col[0]) in {list, np.ndarray, torch.Tensor}:
+            continue
+        col_name_to_dtype[col_name] = col.dtype
+    dataset = dataset.astype(col_name_to_dtype)
+
     return dataset, metadata
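Continuing the toy example above: after the outer join, dropna() removes the re-introduced NaN rows, and astype() with a per-column dtype map undoes the float64 upcast. The added loop skips list-of-list-like columns (lists of lists, np.ndarrays, or torch.Tensors), since a plain Python list carries no top-level .dtype to restore. A sketch under the same toy assumptions:

import pandas as pd

proc_cols = {
    "x": pd.Series([1, 2], index=[0, 1], dtype="int64"),
    "y": pd.Series([4.0, 5.0, 6.0], index=[0, 1, 2]),
}
dataset = pd.DataFrame(proc_cols)  # "x" upcast to float64, NaN at row 2

dataset = dataset.dropna()  # drop the rows the outer join re-introduced

# Cast each column back to its source dtype, mirroring the added loop.
col_name_to_dtype = {name: col.dtype for name, col in proc_cols.items()}
dataset = dataset.astype(col_name_to_dtype)
print(dataset["x"].dtype)  # int64 again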

