Skip to content

Commit

Permalink
FEAT-#6398: Improved performance of list-like objects insertion into DataFrames
Browse files Browse the repository at this point in the history

Signed-off-by: Andrey Pavlenko <andrey.a.pavlenko@gmail.com>
  • Loading branch information
AndreyPavlenko committed Jul 24, 2023
1 parent d6eb589 commit 5559bae
Show file tree
Hide file tree
Showing 8 changed files with 347 additions and 25 deletions.
21 changes: 21 additions & 0 deletions modin/core/storage_formats/pandas/query_compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -2730,6 +2730,7 @@ def getitem_row_array(self, key):
)

def setitem(self, axis, key, value):
    # A list-like value is first turned into a single-column query
    # compiler; any other value is forwarded unchanged.
    wrapped = self._wrap_column_data(value)
    return self._setitem(axis=axis, key=key, value=wrapped, how=None)

def _setitem(self, axis, key, value, how="inner"):
Expand Down Expand Up @@ -2895,6 +2896,7 @@ def _compute_duplicated(df): # pragma: no cover
# return a new one from here and let the front end handle the inplace
# update.
def insert(self, loc, column, value):
value = self._wrap_column_data(value)
if isinstance(value, type(self)):
value.columns = [column]
return self.insert_item(axis=1, loc=loc, value=value, how=None)
Expand Down Expand Up @@ -2927,6 +2929,25 @@ def insert(df, internal_indices=[]): # pragma: no cover
)
return self.__constructor__(new_modin_frame)

def _wrap_column_data(self, data):
"""
If the data is list-like, create a single column query compiler.
Parameters
----------
data : any
Returns
-------
data or PandasQueryCompiler
"""
if is_list_like(data):
return self.from_pandas(
pandas.DataFrame(pandas.Series(data, index=self.index)),
data_cls=type(self._modin_frame),
)
return data

# END Insert

def explode(self, column):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -509,7 +509,7 @@ def _maybe_update_proxies(self, dtypes, new_parent=None):
super()._maybe_update_proxies(dtypes, new_parent)
if self._partitions is None:
return
table = self._partitions[0][0].get()
table = self._partitions[0][0].get(lazy=True)

Check warning on line 512 in modin/experimental/core/execution/native/implementations/hdk_on_native/dataframe/dataframe.py

View check run for this annotation

Codecov / codecov/patch

modin/experimental/core/execution/native/implementations/hdk_on_native/dataframe/dataframe.py#L512

Added line #L512 was not covered by tests
if isinstance(table, pyarrow.Table):
super()._maybe_update_proxies(dtypes, new_parent=table)

Expand Down Expand Up @@ -1635,6 +1635,9 @@ def insert(self, loc, column, value):
assert column not in self._table_cols
assert 0 <= loc <= len(self.columns)

if is_list_like(value):
return self._insert_list(loc, column, value)

Check warning on line 1639 in modin/experimental/core/execution/native/implementations/hdk_on_native/dataframe/dataframe.py

View check run for this annotation

Codecov / codecov/patch

modin/experimental/core/execution/native/implementations/hdk_on_native/dataframe/dataframe.py#L1638-L1639

Added lines #L1638 - L1639 were not covered by tests

exprs = self._index_exprs()
for i in range(0, loc):
col = self.columns[i]
Expand All @@ -1655,6 +1658,171 @@ def insert(self, loc, column, value):
force_execution_mode=self._force_execution_mode,
)

def _insert_list(self, loc, name, value):
    """
    Insert a list-like value.

    Depending on the state of this frame, the value is either turned into
    a new single-column frame, inserted directly into the first partition,
    or attached through a join by rowid of a left and a right frame.

    Parameters
    ----------
    loc : int
        Position of the new column.
    name : str
        Name of the new column.
    value : list
        Column data.

    Returns
    -------
    HdkOnNativeDataframe
    """
    ncols = len(self.columns)

    # No columns yet: the new frame consists of the list column only
    # (built with ``add_index=True``).
    if ncols == 0:
        return self._list_to_df(name, value, True)

    # The first partition reports ``lazy``: insert straight into it.
    if self._partitions and self._partitions[0][0].lazy:
        return self._insert_list_col(loc, name, value)

    if loc == 0 or loc == ncols:
        # Inserting at either end: pick the join input on that side
        # (0 - left for the front, 1 - right for the back).
        in_idx = 0 if loc == 0 else 1
        if (
            isinstance(self._op, JoinNode)
            and self._op.by_rowid
            and self._op.input[in_idx]._partitions
            and self._op.input[in_idx]._partitions[0][0].lazy
        ):
            # This frame is already a rowid join and the input on the
            # insertion side has a lazy first partition: recurse into
            # that input instead of adding one more join level.
            lhs = self._op.input[0]
            rhs = self._op.input[1]
            if loc == 0:
                lhs = lhs._insert_list(0, name, value)
            else:
                # -1 is resolved to "after the last column" by
                # `_insert_list_col`.
                rhs = rhs._insert_list(-1, name, value)
        else:
            # Join this frame with a fresh single-column frame placed on
            # the insertion side.
            # NOTE(review): for loc == 0 the left frame is the one built
            # with ``add_index=False`` - confirm the index columns of
            # this frame are preserved by the resulting join.
            lhs = self if loc == ncols else self._list_to_df(name, value, False)
            rhs = self if loc == 0 else self._list_to_df(name, value, False)
    elif isinstance(self._op, JoinNode) and self._op.by_rowid:
        # Inserting in the middle of an existing rowid join: recurse into
        # whichever input holds the position, translating `loc` for the
        # right input.
        left_len = len(self._op.input[0].columns)
        if loc < left_len:
            lhs = self._op.input[0]._insert_list(loc, name, value)
            rhs = self._op.input[1]
        else:
            lhs = self._op.input[0]
            rhs = self._op.input[1]._insert_list(loc - left_len, name, value)
    else:
        # Split the columns at `loc` into two projections of this frame.
        # The left one starts from the index expressions and receives the
        # new column at its end; the right one holds the remaining columns.
        lexprs = self._index_exprs()
        rexprs = OrderedDict()
        for i, col in enumerate(self.columns):
            (lexprs if i < loc else rexprs)[col] = self.ref(col)
        lhs = self.__constructor__(
            columns=self.columns[0:loc],
            dtypes=self._dtypes_for_exprs(lexprs),
            op=TransformNode(self, lexprs),
            index=self._index_cache,
            index_cols=self._index_cols,
            force_execution_mode=self._force_execution_mode,
        )._insert_list(loc, name, value)
        rhs = self.__constructor__(
            columns=self.columns[loc:],
            dtypes=self._dtypes_for_exprs(rexprs),
            op=TransformNode(self, rexprs),
            force_execution_mode=self._force_execution_mode,
        )

    # Glue both sides together by rowid and describe the result frame.
    op = self._join_by_rowid_op(lhs, rhs)
    return self._insert_list_col(loc, name, value, op=op)

Check warning on line 1729 in modin/experimental/core/execution/native/implementations/hdk_on_native/dataframe/dataframe.py

View check run for this annotation

Codecov / codecov/patch

modin/experimental/core/execution/native/implementations/hdk_on_native/dataframe/dataframe.py#L1728-L1729

Added lines #L1728 - L1729 were not covered by tests

def _insert_list_col(self, idx, name, value, op=None):
    """
    Insert a list-like column.

    Parameters
    ----------
    idx : int
        Position of the new column, ``-1`` means after the last column.
    name : str
        Name of the new column.
    value : list
        Column data.
    op : DFAlgNode, default: None
        Operation of the resulting frame. If not specified and the first
        partition is lazy, the value is inserted into that partition and
        the result gets a ``FrameNode`` operation.

    Returns
    -------
    HdkOnNativeDataframe
    """
    cols = self.columns.tolist()
    if idx == -1:
        idx = len(cols)
    cols.insert(idx, name)

    part = None
    if op is None and self._partitions and self._partitions[0][0].lazy:
        part = np.array([[self._partitions[0][0].insert(idx, name, value)]])

    # The dtypes also cover the index columns, which precede the data
    # columns, so the position has to be shifted.
    if self._index_cols:
        idx += len(self._index_cols)
    dtypes = self._dtypes.tolist()
    # NOTE(review): the dtype is derived from the first element only and
    # relies on `value[0]` being a positional lookup - confirm for
    # list-likes that are not plain sequences.
    dtypes.insert(idx, get_dtype(None if len(value) == 0 else type(value[0])))

    df = self.copy(partitions=part, columns=cols, dtypes=dtypes, op=op)
    # `part` is a NumPy array, so check it against None explicitly rather
    # than truth-testing it: the truth value of a one-element array is
    # delegated to the wrapped partition object. It is only set when no
    # `op` was given.
    if part is not None:
        df._op = FrameNode(df)
    return df

Check warning on line 1763 in modin/experimental/core/execution/native/implementations/hdk_on_native/dataframe/dataframe.py

View check run for this annotation

Codecov / codecov/patch

modin/experimental/core/execution/native/implementations/hdk_on_native/dataframe/dataframe.py#L1760-L1763

Added lines #L1760 - L1763 were not covered by tests

def _list_to_df(self, name, values, add_index):
    """
    Build a single-column frame holding the list-like value.

    Parameters
    ----------
    name : str
        Name of the only column of the new frame.
    values : list
        Column data.
    add_index : bool
        Whether to carry over the index columns of this frame. They are
        not carried over if the index of this frame is materialized and
        trivial.

    Returns
    -------
    HdkOnNativeDataframe
    """
    index_cols = self._index_cols if add_index else None
    columns = Index([name])
    # NOTE(review): the dtype is derived from the first element only.
    dtype = get_dtype(type(values[0]) if len(values) != 0 else None)

    without_index = index_cols is None or (
        self.has_materialized_index and self._is_trivial_index(self.index)
    )
    if without_index:
        idx = None
        index_cols = None
        dtypes = pd.Series([dtype], index=columns)
    else:
        idx = self._get_index
        # Keep the dtypes of the index columns and append the new one.
        dtypes = self._dtypes[0 : len(index_cols)]
        dtypes[name] = dtype

    column_data = OrderedDict({name: values})
    partition = self._partition_mgr_cls._partition_class(column_data, idx)
    result = HdkOnNativeDataframe(
        partitions=np.array([[partition]]),
        index=self._index_cache,
        index_cols=index_cols,
        columns=columns,
        dtypes=dtypes,
    )
    result._op = FrameNode(result)
    return result

Check warning on line 1802 in modin/experimental/core/execution/native/implementations/hdk_on_native/dataframe/dataframe.py

View check run for this annotation

Codecov / codecov/patch

modin/experimental/core/execution/native/implementations/hdk_on_native/dataframe/dataframe.py#L1801-L1802

Added lines #L1801 - L1802 were not covered by tests

@staticmethod
def _join_by_rowid_op(lhs, rhs):
    """
    Build a JoinNode that joins two frames by the rowid column.

    Parameters
    ----------
    lhs : HdkOnNativeDataframe
        Left frame, its index expressions are used for the result.
    rhs : HdkOnNativeDataframe
        Right frame.

    Returns
    -------
    JoinNode
    """
    # Start from the index expressions of the left frame, then reference
    # the columns of the left frame followed by those of the right frame.
    out_exprs = lhs._index_exprs()
    for frame in (lhs, rhs):
        for col in frame.columns:
            out_exprs[col] = frame.ref(col)
    rowid_eq = lhs._build_equi_join_condition(
        rhs, [ROWID_COL_NAME], [ROWID_COL_NAME]
    )
    return JoinNode(lhs, rhs, exprs=out_exprs, condition=rowid_eq)

Check warning on line 1824 in modin/experimental/core/execution/native/implementations/hdk_on_native/dataframe/dataframe.py

View check run for this annotation

Codecov / codecov/patch

modin/experimental/core/execution/native/implementations/hdk_on_native/dataframe/dataframe.py#L1824

Added line #L1824 was not covered by tests

def cat_codes(self):
"""
Extract codes for a category column.
Expand Down Expand Up @@ -2122,6 +2290,11 @@ def to_arrow(result, op=frame._op, tables=[], frames=iter(input)):

def _build_index_cache(self):
"""Materialize index and store it in the cache."""
if self._partitions and not self._index_cols:
nrows = self._partitions[0][0]._length_cache
self.set_index_cache(Index.__new__(RangeIndex, data=range(nrows)))
return

Check warning on line 2296 in modin/experimental/core/execution/native/implementations/hdk_on_native/dataframe/dataframe.py

View check run for this annotation

Codecov / codecov/patch

modin/experimental/core/execution/native/implementations/hdk_on_native/dataframe/dataframe.py#L2293-L2296

Added lines #L2293 - L2296 were not covered by tests

obj = self._execute()

if self._index_cols is None:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -836,6 +836,28 @@ def __init__(
self.exprs = exprs
self.condition = condition

@property
def by_rowid(self):
    """
    Tell whether this node joins its inputs by the rowid column.

    The join condition must be an ``=`` operation with every operand
    being a reference to the rowid column.

    Returns
    -------
    bool
    """
    cond = self.condition
    if not isinstance(cond, OpExpr):
        return False
    if cond.op != "=":
        return False
    for operand in cond.operands:
        if not isinstance(operand, InputRefExpr):
            return False
        if operand.column != ColNameCodec.ROWID_COL_NAME:
            return False
    return True

@_inherit_docstrings(DFAlgNode.require_executed_base)
def require_executed_base(self) -> bool:
    # The answer is exactly `by_rowid`, i.e. True only for a join by the
    # rowid column. The docstring comes from the decorator above.
    return self.by_rowid

Check warning on line 859 in modin/experimental/core/execution/native/implementations/hdk_on_native/df_algebra.py

View check run for this annotation

Codecov / codecov/patch

modin/experimental/core/execution/native/implementations/hdk_on_native/df_algebra.py#L857-L859

Added lines #L857 - L859 were not covered by tests

def copy(self):
"""
Make a shallow copy of the node.
Expand Down

0 comments on commit 5559bae

Please sign in to comment.