Implement hash_join for merges (#57970)
phofl committed Mar 24, 2024
1 parent e51039a commit 669ddfb
Showing 7 changed files with 116 additions and 19 deletions.
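The change adds a hash-join fast path to merge(): for an inner join on numeric keys where the right side's keys are unique, pandas builds a hash table over the right keys and probes it with the left keys, instead of factorizing both sides and running the general join algorithm. A minimal example of a merge that qualifies (illustrative only; the fast path is internal and transparent to callers):

import numpy as np
import pandas as pd

left = pd.DataFrame({"a": np.random.randint(0, 1_000, 100_000)})
right = pd.DataFrame({"a": np.arange(1_000), "b": np.arange(1_000)})  # unique keys
out = pd.merge(left, right, how="inner")  # eligible for the new hash join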
17 changes: 17 additions & 0 deletions asv_bench/benchmarks/join_merge.py
@@ -328,6 +328,23 @@ def time_i8merge(self, how):
merge(self.left, self.right, how=how)


class UniqueMerge:
params = [4_000_000, 1_000_000]
param_names = ["unique_elements"]

def setup(self, unique_elements):
N = 1_000_000
self.left = DataFrame({"a": np.random.randint(1, unique_elements, (N,))})
self.right = DataFrame({"a": np.random.randint(1, unique_elements, (N,))})
uniques = self.right.a.drop_duplicates()
self.right["a"] = concat(
[uniques, Series(np.arange(0, -(N - len(uniques)), -1))], ignore_index=True
)

def time_unique_merge(self, unique_elements):
merge(self.left, self.right, how="inner")
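The setup deliberately makes the right side's keys fully unique, which is the precondition for the hash-join path this benchmark exercises: it keeps the first occurrence of each key and fills the remaining slots with fresh non-positive values. A quick sanity check, run inside the benchmark module (illustrative, not part of the commit):

# Hypothetical standalone check that setup() produced hash-join-eligible keys:
bench = UniqueMerge()
bench.setup(4_000_000)
assert bench.right["a"].is_unique          # unique build side
assert bench.right["a"].dtype.kind == "i"  # integer key dtype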


class MergeDatetime:
params = [
[
1 change: 1 addition & 0 deletions doc/source/whatsnew/v3.0.0.rst
@@ -286,6 +286,7 @@ Performance improvements
- Performance improvement in :meth:`RangeIndex.join` returning a :class:`RangeIndex` instead of a :class:`Index` when possible. (:issue:`57651`, :issue:`57752`)
- Performance improvement in :meth:`RangeIndex.reindex` returning a :class:`RangeIndex` instead of a :class:`Index` when possible. (:issue:`57647`, :issue:`57752`)
- Performance improvement in :meth:`RangeIndex.take` returning a :class:`RangeIndex` instead of a :class:`Index` when possible. (:issue:`57445`, :issue:`57752`)
- Performance improvement in :func:`merge` if hash-join can be used (:issue:`57970`)
- Performance improvement in ``DataFrameGroupBy.__len__`` and ``SeriesGroupBy.__len__`` (:issue:`57595`)
- Performance improvement in indexing operations for string dtypes (:issue:`56997`)
- Performance improvement in unary methods on a :class:`RangeIndex` returning a :class:`RangeIndex` instead of a :class:`Index` when possible. (:issue:`57825`)
8 changes: 7 additions & 1 deletion pandas/_libs/hashtable.pyi
@@ -16,7 +16,7 @@ def unique_label_indices(
class Factorizer:
count: int
uniques: Any
def __init__(self, size_hint: int) -> None: ...
def __init__(self, size_hint: int, uses_mask: bool = False) -> None: ...
def get_count(self) -> int: ...
def factorize(
self,
@@ -25,6 +25,9 @@
na_value=...,
mask=...,
) -> npt.NDArray[np.intp]: ...
def hash_inner_join(
self, values: np.ndarray, mask=...
) -> tuple[np.ndarray, np.ndarray]: ...

class ObjectFactorizer(Factorizer):
table: PyObjectHashTable
@@ -216,6 +219,9 @@ class HashTable:
mask=...,
ignore_na: bool = True,
) -> tuple[np.ndarray, npt.NDArray[np.intp]]: ... # np.ndarray[subclass-specific]
def hash_inner_join(
self, values: np.ndarray, mask=...
) -> tuple[np.ndarray, np.ndarray]: ...

class Complex128HashTable(HashTable): ...
class Complex64HashTable(HashTable): ...
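Both new stubs describe the same contract: build the table over one side's keys, then probe it with the other side's values; the result is a pair of position arrays, one into the table's keys and one into the probe values. A sketch against the internal API (private and subject to change; the values here are hypothetical):

import numpy as np
from pandas._libs import hashtable as ht

rizer = ht.Int64Factorizer(4)
rizer.factorize(np.array([10, 20, 30, 40], dtype=np.int64))  # build over right keys
ridx, lidx = rizer.hash_inner_join(np.array([20, 20, 99], dtype=np.int64))
# ridx -> positions among the factorized (right) keys: [1, 1]
# lidx -> positions in the probe (left) values: [0, 1]; 99 has no match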
7 changes: 5 additions & 2 deletions pandas/_libs/hashtable.pyx
@@ -70,7 +70,7 @@ cdef class Factorizer:
cdef readonly:
Py_ssize_t count

def __cinit__(self, size_hint: int):
def __cinit__(self, size_hint: int, uses_mask: bool = False):
self.count = 0

def get_count(self) -> int:
@@ -79,13 +79,16 @@
def factorize(self, values, na_sentinel=-1, na_value=None, mask=None) -> np.ndarray:
raise NotImplementedError

def hash_inner_join(self, values, mask=None):
raise NotImplementedError


cdef class ObjectFactorizer(Factorizer):
cdef public:
PyObjectHashTable table
ObjectVector uniques

def __cinit__(self, size_hint: int):
def __cinit__(self, size_hint: int, uses_mask: bool = False):
self.table = PyObjectHashTable(size_hint)
self.uniques = ObjectVector()

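The base class raises NotImplementedError, and ObjectFactorizer only gains the widened __cinit__ signature, so object-dtype keys never take the hash-join path (merge.py additionally gates on numeric dtype kinds). A quick illustration of the fallback behavior (internal API; a hedged sketch, not part of the commit):

import numpy as np
from pandas._libs import hashtable as ht

rizer = ht.ObjectFactorizer(4)
rizer.factorize(np.array(["a", "b"], dtype=object))
try:
    rizer.hash_inner_join(np.array(["a"], dtype=object))
except NotImplementedError:
    pass  # object keys keep using the factorize-based join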
50 changes: 48 additions & 2 deletions pandas/_libs/hashtable_class_helper.pxi.in
@@ -557,6 +557,49 @@ cdef class {{name}}HashTable(HashTable):
self.table.vals[k] = i
self.na_position = na_position

@cython.wraparound(False)
@cython.boundscheck(False)
def hash_inner_join(self, const {{dtype}}_t[:] values, const uint8_t[:] mask = None) -> tuple[ndarray, ndarray]:
cdef:
Py_ssize_t i, n = len(values)
{{c_type}} val
khiter_t k
Int64Vector locs = Int64Vector()
Int64Vector self_locs = Int64Vector()
Int64VectorData *l
Int64VectorData *sl
int8_t na_position = self.na_position

l = &locs.data
sl = &self_locs.data

if self.uses_mask and mask is None:
raise NotImplementedError # pragma: no cover

with nogil:
for i in range(n):
if self.uses_mask and mask[i]:
if self.na_position == -1:
continue
if needs_resize(l.size, l.capacity):
with gil:
locs.resize(locs.data.capacity * 4)
self_locs.resize(self_locs.data.capacity * 4)
append_data_int64(l, i)
append_data_int64(sl, na_position)
else:
val = {{to_c_type}}(values[i])
k = kh_get_{{dtype}}(self.table, val)
if k != self.table.n_buckets:
if needs_resize(l.size, l.capacity):
with gil:
locs.resize(locs.data.capacity * 4)
self_locs.resize(self_locs.data.capacity * 4)
append_data_int64(l, i)
append_data_int64(sl, self.table.vals[k])

return self_locs.to_array(), locs.to_array()
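Stripped of the khash lookups and the nogil vector bookkeeping, the loop above is a plain build-and-probe hash join: the table (built earlier over the right keys) is probed once per left value, and both positions are recorded on a hit. A pure-Python equivalent of the algorithm (a sketch, not the actual implementation):

def hash_inner_join_py(right_keys, left_values):
    # Build phase: map each (unique) right key to its position.
    table = {val: pos for pos, val in enumerate(right_keys)}
    right_locs, left_locs = [], []
    # Probe phase: look up every left value; record both positions on a hit.
    for i, val in enumerate(left_values):
        pos = table.get(val)
        if pos is not None:
            right_locs.append(pos)
            left_locs.append(i)
    return right_locs, left_locs

hash_inner_join_py([10, 20, 30], [20, 20, 99])  # -> ([1, 1], [0, 1])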

@cython.boundscheck(False)
def lookup(self, const {{dtype}}_t[:] values, const uint8_t[:] mask = None) -> ndarray:
# -> np.ndarray[np.intp]
@@ -879,8 +922,8 @@ cdef class {{name}}Factorizer(Factorizer):
{{name}}HashTable table
{{name}}Vector uniques

def __cinit__(self, size_hint: int):
self.table = {{name}}HashTable(size_hint)
def __cinit__(self, size_hint: int, uses_mask: bool = False):
self.table = {{name}}HashTable(size_hint, uses_mask=uses_mask)
self.uniques = {{name}}Vector()

def factorize(self, const {{c_type}}[:] values,
@@ -911,6 +954,9 @@
self.count = len(self.uniques)
return labels

def hash_inner_join(self, const {{c_type}}[:] values, const uint8_t[:] mask = None) -> tuple[np.ndarray, np.ndarray]:
return self.table.hash_inner_join(values, mask)

{{endfor}}


51 changes: 37 additions & 14 deletions pandas/core/reshape/merge.py
@@ -1780,7 +1780,10 @@ def get_join_indexers_non_unique(
np.ndarray[np.intp]
Indexer into right.
"""
lkey, rkey, count = _factorize_keys(left, right, sort=sort)
lkey, rkey, count = _factorize_keys(left, right, sort=sort, how=how)
if count == -1:
# hash join
return lkey, rkey
if how == "left":
lidx, ridx = libjoin.left_outer_join(lkey, rkey, count, sort=sort)
elif how == "right":
@@ -2385,7 +2388,10 @@


def _factorize_keys(
lk: ArrayLike, rk: ArrayLike, sort: bool = True
lk: ArrayLike,
rk: ArrayLike,
sort: bool = True,
how: str | None = None,
) -> tuple[npt.NDArray[np.intp], npt.NDArray[np.intp], int]:
"""
Encode left and right keys as enumerated types.
Expand All @@ -2401,6 +2407,9 @@ def _factorize_keys(
sort : bool, defaults to True
If True, the encoding is done such that the unique elements in the
keys are sorted.
how : str, optional
Used to determine if we can use hash-join. If not given, then just factorize
keys.
Returns
-------
Expand All @@ -2409,7 +2418,8 @@ def _factorize_keys(
np.ndarray[np.intp]
Right (resp. left if called with `key='right'`) labels, as enumerated type.
int
Number of unique elements in union of left and right labels.
Number of unique elements in union of left and right labels. -1 if we used
a hash-join.
See Also
--------
@@ -2527,28 +2537,41 @@

klass, lk, rk = _convert_arrays_and_get_rizer_klass(lk, rk)

rizer = klass(max(len(lk), len(rk)))
rizer = klass(
max(len(lk), len(rk)),
uses_mask=isinstance(rk, (BaseMaskedArray, ArrowExtensionArray)),
)

if isinstance(lk, BaseMaskedArray):
assert isinstance(rk, BaseMaskedArray)
llab = rizer.factorize(lk._data, mask=lk._mask)
rlab = rizer.factorize(rk._data, mask=rk._mask)
lk_data, lk_mask = lk._data, lk._mask
rk_data, rk_mask = rk._data, rk._mask
elif isinstance(lk, ArrowExtensionArray):
assert isinstance(rk, ArrowExtensionArray)
# we can only get here with numeric dtypes
# TODO: Remove when we have a Factorizer for Arrow
llab = rizer.factorize(
lk.to_numpy(na_value=1, dtype=lk.dtype.numpy_dtype), mask=lk.isna()
)
rlab = rizer.factorize(
rk.to_numpy(na_value=1, dtype=lk.dtype.numpy_dtype), mask=rk.isna()
)
lk_data = lk.to_numpy(na_value=1, dtype=lk.dtype.numpy_dtype)
rk_data = rk.to_numpy(na_value=1, dtype=lk.dtype.numpy_dtype)
lk_mask, rk_mask = lk.isna(), rk.isna()
else:
# Argument 1 to "factorize" of "ObjectFactorizer" has incompatible type
# "Union[ndarray[Any, dtype[signedinteger[_64Bit]]],
# ndarray[Any, dtype[object_]]]"; expected "ndarray[Any, dtype[object_]]"
llab = rizer.factorize(lk) # type: ignore[arg-type]
rlab = rizer.factorize(rk) # type: ignore[arg-type]
lk_data, rk_data = lk, rk # type: ignore[assignment]
lk_mask, rk_mask = None, None

hash_join_available = how == "inner" and not sort and lk.dtype.kind in "iufb"
if hash_join_available:
rlab = rizer.factorize(rk_data, mask=rk_mask)
if rizer.get_count() == len(rlab):
ridx, lidx = rizer.hash_inner_join(lk_data, lk_mask)
return lidx, ridx, -1
else:
llab = rizer.factorize(lk_data, mask=lk_mask)
else:
llab = rizer.factorize(lk_data, mask=lk_mask)
rlab = rizer.factorize(rk_data, mask=rk_mask)

assert llab.dtype == np.dtype(np.intp), llab.dtype
assert rlab.dtype == np.dtype(np.intp), rlab.dtype

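Putting the pieces together: the fast path fires only when how == "inner", sort is False, the key dtype kind is one of "iufb" (signed/unsigned integer, float, bool), and the right keys factorize to all-unique labels; the -1 count then tells the caller that the returned arrays are final positional indexers rather than factorized labels. A hedged sketch exercising the private helper (internal API, may change; the outputs shown are what the logic above implies):

import numpy as np
from pandas.core.reshape.merge import _factorize_keys

lk = np.array([2, 2, 3, 5], dtype=np.int64)
rk = np.array([1, 2, 3, 4], dtype=np.int64)  # unique right keys
lidx, ridx, count = _factorize_keys(lk, rk, sort=False, how="inner")
# count == -1 signals the hash join ran:
# lidx -> [0, 1, 2] (rows of lk that matched), ridx -> [1, 1, 2] (matching rows of rk)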
1 change: 1 addition & 0 deletions scripts/run_stubtest.py
@@ -44,6 +44,7 @@
"pandas._libs.hashtable.HashTable.set_na",
"pandas._libs.hashtable.HashTable.sizeof",
"pandas._libs.hashtable.HashTable.unique",
"pandas._libs.hashtable.HashTable.hash_inner_join",
# stubtest might be too sensitive
"pandas._libs.lib.NoDefault",
"pandas._libs.lib._NoDefault.no_default",
