Skip to content

Commit

Permalink
TEST-#3672: do fair 'join' at OmniSci's join bench (#3721)
Browse files Browse the repository at this point in the history
Signed-off-by: Dmitry Chigarev <dmitry.chigarev@intel.com>
  • Loading branch information
dchigarev committed Nov 27, 2021
1 parent 62620cb commit 9396f23
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 11 deletions.
70 changes: 59 additions & 11 deletions asv_bench/benchmarks/omnisci/benchmarks.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,21 +28,63 @@
trigger_import,
get_benchmark_shapes,
)
from ..utils.common import random_state
import numpy as np
import pandas


class TimeJoin:
param_names = ["shape", "how"]
param_names = ["shape", "how", "is_equal_keys"]
params = [
get_benchmark_shapes("omnisci.TimeJoin"),
["left", "inner"],
[True, False],
]

def setup(self, shape, how):
self.df1 = generate_dataframe(ASV_USE_IMPL, "int", *shape, RAND_LOW, RAND_HIGH)
self.df2 = generate_dataframe(ASV_USE_IMPL, "int", *shape, RAND_LOW, RAND_HIGH)
def setup(self, shape, how, is_equal_keys):
self.df1, self.df2 = (
generate_dataframe(
ASV_USE_IMPL,
"int",
*frame_shape,
RAND_LOW,
RAND_HIGH,
cache_prefix=f"{i}-th_frame_to_join",
)
for i, frame_shape in enumerate((shape, shape))
)

if is_equal_keys:
# When the frames have default indices to join on: RangeIndex(frame_length),
# OmniSci backend performs join on the internal meta-column called 'rowid'.
# There is a bug in the engine that makes such joins fail. To avoid joining
# on the meta-column we explicitly specify a non-default index to join on.
# https://github.com/modin-project/modin/issues/3740
# Generating a new object for every index to avoid shared index objects:
self.df1.index = pandas.RangeIndex(1, len(self.df1) + 1)
self.df2.index = pandas.RangeIndex(1, len(self.df2) + 1)
else:
# Intersection rate indicates how many common join-keys `self.df1`
# and `self.df2` have in terms of percentage.
indices_intersection_rate = 0.5

frame_length = len(self.df1)
intersect_size = int(frame_length * indices_intersection_rate)

intersect_part = random_state.choice(
self.df1.index, size=intersect_size, replace=False
)
non_intersect_part = np.arange(
start=frame_length, stop=frame_length + (frame_length - intersect_size)
)
new_index = np.concatenate([intersect_part, non_intersect_part])

random_state.shuffle(new_index)
self.df1.index = new_index

trigger_import(self.df1, self.df2)

def time_join(self, shape, how):
def time_join(self, shape, how, is_equal_keys):
# join dataframes on index to get the predictable shape
execute(self.df1.join(self.df2, how=how, lsuffix="left_"))

Expand All @@ -57,7 +99,7 @@ class TimeMerge:
def setup(self, shapes, how):
gen_unique_key = how == "inner"
self.dfs = []
for shape in shapes:
for i, shape in enumerate(shapes):
self.dfs.append(
generate_dataframe(
ASV_USE_IMPL,
Expand All @@ -66,6 +108,7 @@ def setup(self, shapes, how):
RAND_LOW,
RAND_HIGH,
gen_unique_key=gen_unique_key,
cache_prefix=f"{i}-th_frame_to_merge",
)
)
trigger_import(*self.dfs)
Expand All @@ -86,11 +129,16 @@ class TimeAppend:
params = [get_benchmark_shapes("omnisci.TimeAppend")]

def setup(self, shapes):
self.df1 = generate_dataframe(
ASV_USE_IMPL, "int", *shapes[0], RAND_LOW, RAND_HIGH
)
self.df2 = generate_dataframe(
ASV_USE_IMPL, "int", *shapes[1], RAND_LOW, RAND_HIGH
self.df1, self.df2 = (
generate_dataframe(
ASV_USE_IMPL,
"int",
*shape,
RAND_LOW,
RAND_HIGH,
cache_prefix=f"{i}-th_frame_to_append",
)
for i, shape in enumerate(shapes)
)
trigger_import(self.df1, self.df2)

Expand Down
6 changes: 6 additions & 0 deletions asv_bench/benchmarks/utils/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,7 @@ def generate_dataframe(
groupby_ncols: Optional[int] = None,
count_groups: Optional[int] = None,
gen_unique_key: bool = False,
cache_prefix: str = None,
) -> Union[pd.DataFrame, pandas.DataFrame]:
"""
Generate DataFrame with caching.
Expand Down Expand Up @@ -341,6 +342,8 @@ def generate_dataframe(
Count of groups in groupby columns.
gen_unique_key : bool, default: False
Generate `col1` column where all elements are unique.
cache_prefix : str, optional
Prefix to add to the cache key of the requested frame.
Returns
-------
Expand Down Expand Up @@ -369,6 +372,9 @@ def generate_dataframe(
gen_unique_key,
)

if cache_prefix is not None:
cache_key = (cache_prefix, *cache_key)

if cache_key in dataframes_cache:
return dataframes_cache[cache_key]

Expand Down

0 comments on commit 9396f23

Please sign in to comment.