In [1]:
import pandas as pd
import pyarrow as pa
from pyarrow import feather

## TPCx-BB query 1

This notebook follows the reference implementation of query 1 from the RAPIDS gpu-bdb repository: https://github.com/rapidsai/gpu-bdb/blob/main/tpcx_bb/queries/q01/tpcx_bb_query_01.py

In [2]:
# Columns read from each feather file; only these are referenced by the query.
item_cols = ["i_item_sk", "i_category_id"]
ss_cols = ["ss_item_sk", "ss_store_sk", "ss_ticket_number"]

# Query 1 parameters.
# Item categories to keep (used with `i_category_id.isin(...)`).
q01_i_category_id_IN = [1, 2, 3]
# Stores to keep (used with `ss_store_sk.isin(...)`).
# -- sf1 -> 11 stores, 90k sales in 820k lines
q01_ss_store_sk_IN = [10, 20, 33, 40, 50]
# An item pair is kept only when its count is strictly greater than this value.
q01_viewed_together_count = 50
# Maximum number of result rows.
q01_limit = 100

In [3]:
# Load the two input tables from the scale-10 feather files, reading only the
# columns named in item_cols / ss_cols.
item_df = pd.read_feather("data/scale-10/item.feather", columns=item_cols)
ss_df = pd.read_feather("data/scale-10/store_sales.feather", columns=ss_cols)

In [4]:
item_df.head()

Unnamed: 0,i_item_sk,i_category_id
0,1,5.0
1,2,1.0
2,3,7.0
3,4,3.0
4,5,2.0


In [5]:
ss_df.head()

Unnamed: 0,ss_item_sk,ss_store_sk,ss_ticket_number
0,34435,40.0,1
1,18239,40.0,1
2,49325,40.0,1
3,57451,40.0,1
4,80011,40.0,1


In [6]:
item_df.dtypes

i_item_sk          int32
i_category_id    float64
dtype: object

In [7]:
ss_df.dtypes

ss_item_sk            int32
ss_store_sk         float64
ss_ticket_number      int32
dtype: object

In [8]:
len(ss_df)

28800991

In [9]:
%%time 
# SELECT DISTINCT ss_item_sk,ss_ticket_number
# FROM store_sales s, item i
# -- Only products in certain categories sold in specific stores are considered,
# WHERE s.ss_item_sk = i.i_item_sk
# AND i.i_category_id IN ({q01_i_category_id_IN})
# AND s.ss_store_sk IN ({q01_ss_store_sk_IN})

# Sales rows from the selected stores, projected to (item, ticket). The store
# key is only needed for the filter itself.
f_ss_df = ss_df.loc[
    ss_df["ss_store_sk"].isin(q01_ss_store_sk_IN),
    ["ss_item_sk", "ss_ticket_number"],
].reset_index(drop=True)

# Item keys belonging to the selected categories.
f_item_df = item_df.loc[
    item_df["i_category_id"].isin(q01_i_category_id_IN),
    ["i_item_sk"],
].reset_index(drop=True)

# Inner join on the item key (s.ss_item_sk = i.i_item_sk), then drop the
# redundant i_item_sk column so only (item, ticket) remains.
ss_item_join = f_item_df.merge(
    f_ss_df, how="inner", left_on=["i_item_sk"], right_on=["ss_item_sk"]
)[["ss_item_sk", "ss_ticket_number"]]

CPU times: user 332 ms, sys: 284 ms, total: 616 ms
Wall time: 622 ms


In [10]:
ss_item_join

Unnamed: 0,ss_item_sk,ss_ticket_number
0,2,46630
1,2,113987
2,2,224047
3,2,277952
4,2,314134
...,...,...
646524,102000,1272102
646525,102000,1417665
646526,102000,1654229
646527,102000,2102112


In [11]:
# SELECT DISTINCT ss_item_sk, ss_ticket_number: collapse repeated (item, ticket)
# rows so an item appears at most once per ticket before pairs are built.
ss_item_join = ss_item_join.drop_duplicates()

In [12]:
def get_pairs(
    df,
    col_name="ss_item_sk",
    merge_col="ss_ticket_number",
    pair_col="ss_item_sk",
    output_col_1="item_sk_1",
    output_col_2="item_sk_2",
):
    """Return every ordered pair of ``pair_col`` values that share a ``merge_col`` value.

    ``df`` is merged with itself on ``merge_col``; the two copies of
    ``pair_col`` are told apart by the ``_t1`` / ``_t2`` suffixes. Only rows
    whose first value is strictly smaller than the second are kept, which
    removes self-pairs and mirrored duplicates.

    Parameters
    ----------
    df : DataFrame
        Must contain ``merge_col`` and ``pair_col`` as separate columns.
    col_name : str
        Not referenced by the function body; kept so existing callers that
        pass it keep working.
    merge_col : str
        Column the self-merge is performed on.
    pair_col : str
        Column whose values are paired up.
    output_col_1, output_col_2 : str
        Names given to the smaller and larger element of each pair.

    Returns
    -------
    DataFrame
        Two columns (``output_col_1``, ``output_col_2``) with a fresh
        0..n-1 index: one row per pair per matching ``merge_col`` row
        combination.
    """
    first = f"{pair_col}_t1"
    second = f"{pair_col}_t2"

    joined = df.merge(df, on=merge_col, suffixes=("_t1", "_t2"))

    # Strict "<" keeps each unordered pair once and discards (x, x).
    is_ordered = joined[first] < joined[second]
    ordered = joined.loc[is_ordered, [first, second]].reset_index(drop=True)

    return ordered.rename(columns={first: output_col_1, second: output_col_2})


In [13]:
# Self-join the (item, ticket) rows on ticket number: one output row for each
# pair of different items (smaller key first) that share a ticket.
pair_df = get_pairs(ss_item_join)

In [14]:
pair_df.dtypes

item_sk_1    int32
item_sk_2    int32
dtype: object

In [15]:
%%time
# SELECT item_sk_1, item_sk_2, COUNT(*) AS cnt
# FROM
# (
#    ...
# )
# GROUP BY item_sk_1, item_sk_2
# -- 'frequently'
# HAVING cnt > {q01_viewed_together_count}
# ORDER BY cnt DESC, item_sk_1, item_sk_2

# GROUP BY item_sk_1, item_sk_2 with COUNT(*) AS cnt: size() yields one count
# per pair, and reset_index(name=...) turns it back into a three-column frame.
grouped_df = (
    pair_df.groupby(["item_sk_1", "item_sk_2"]).size().reset_index(name="cnt")
)

# HAVING cnt > q01_viewed_together_count (strictly greater than).
grouped_df = grouped_df.loc[
    grouped_df["cnt"] > q01_viewed_together_count
].reset_index(drop=True)


CPU times: user 105 ms, sys: 5.31 ms, total: 110 ms
Wall time: 108 ms


In [16]:
grouped_df.dtypes

item_sk_1    int64
item_sk_2    int64
cnt          int64
dtype: object

In [17]:
# ORDER BY cnt DESC, item_sk_1, item_sk_2: highest counts first, ties broken by
# the two item keys in ascending order.
grouped_df = grouped_df.sort_values(
    by=["cnt", "item_sk_1", "item_sk_2"], ascending=[False, True, True]
)

In [18]:
grouped_df

Unnamed: 0,item_sk_1,item_sk_2,cnt
7,49885,77827,57
2,25003,75205,54
1,17035,94801,52
8,57751,80629,52
0,8575,63991,51
3,30337,77719,51
4,34351,63991,51
5,42751,61627,51
6,47911,93625,51


In [19]:
# Renumber the sorted rows from 0 and keep at most q01_limit of them.
grouped_df = grouped_df.reset_index(drop=True).head(q01_limit)

In [20]:
grouped_df

Unnamed: 0,item_sk_1,item_sk_2,cnt
0,49885,77827,57
1,25003,75205,54
2,17035,94801,52
3,57751,80629,52
4,8575,63991,51
5,30337,77719,51
6,34351,63991,51
7,42751,61627,51
8,47911,93625,51


## Time the full query

In [1]:
def get_pairs(
    df,
    col_name="ss_item_sk",
    merge_col="ss_ticket_number",
    pair_col="ss_item_sk",
    output_col_1="item_sk_1",
    output_col_2="item_sk_2",
):
    """Return every ordered pair of ``pair_col`` values that share a ``merge_col`` value.

    ``df`` is merged with itself on ``merge_col``; the two copies of
    ``pair_col`` are told apart by the ``_t1`` / ``_t2`` suffixes. Only rows
    whose first value is strictly smaller than the second are kept, which
    removes self-pairs and mirrored duplicates.

    Parameters
    ----------
    df : DataFrame
        Must contain ``merge_col`` and ``pair_col`` as separate columns.
    col_name : str
        Not referenced by the function body; kept so existing callers that
        pass it keep working.
    merge_col : str
        Column the self-merge is performed on.
    pair_col : str
        Column whose values are paired up.
    output_col_1, output_col_2 : str
        Names given to the smaller and larger element of each pair.

    Returns
    -------
    DataFrame
        Two columns (``output_col_1``, ``output_col_2``) with a fresh
        0..n-1 index: one row per pair per matching ``merge_col`` row
        combination.
    """
    first = f"{pair_col}_t1"
    second = f"{pair_col}_t2"

    joined = df.merge(df, on=merge_col, suffixes=("_t1", "_t2"))

    # Strict "<" keeps each unordered pair once and discards (x, x).
    is_ordered = joined[first] < joined[second]
    ordered = joined.loc[is_ordered, [first, second]].reset_index(drop=True)

    return ordered.rename(columns={first: output_col_1, second: output_col_2})


In [2]:
# Columns read from each feather file; only these are referenced by the query.
item_cols = ["i_item_sk", "i_category_id"]
ss_cols = ["ss_item_sk", "ss_store_sk", "ss_ticket_number"]

# Query 1 parameters.
# Item categories to keep (used with `i_category_id.isin(...)`).
q01_i_category_id_IN = [1, 2, 3]
# Stores to keep (used with `ss_store_sk.isin(...)`).
# -- sf1 -> 11 stores, 90k sales in 820k lines
q01_ss_store_sk_IN = [10, 20, 33, 40, 50]
# An item pair is kept only when its count is strictly greater than this value.
q01_viewed_together_count = 50
# Maximum number of result rows.
q01_limit = 100

In [3]:
# Select the "block" data manager through the pandas option before the first
# timed run.
# NOTE(review): `mode.data_manager` appears to be an experimental pandas option;
# confirm it exists in the pandas version this notebook is run with.
pd.options.mode.data_manager = "block"

In [4]:
# Reload both tables, convert each to the "block" manager and copy the result.
# NOTE(review): `_as_manager` is a private pandas method (leading underscore),
# so it may change or disappear between pandas versions.
item_df = pd.read_feather("data/scale-10/item.feather", columns=item_cols)._as_manager("block").copy()
ss_df = pd.read_feather("data/scale-10/store_sales.feather", columns=ss_cols)._as_manager("block").copy()

In [5]:
%%timeit
# Full query 1 pipeline, timed end to end. The statements match the other
# %%timeit cell in this notebook so the two timings can be compared.

# Sales rows from the selected stores, projected to (item, ticket).
f_ss_df = ss_df.loc[ss_df["ss_store_sk"].isin(q01_ss_store_sk_IN)][
    ["ss_item_sk", "ss_ticket_number"]
].reset_index(drop=True)

# Item keys belonging to the selected categories.
f_item_df = item_df.loc[item_df["i_category_id"].isin(q01_i_category_id_IN)][
    ["i_item_sk"]
].reset_index(drop=True)

# Join sales to items on the item key, then keep only (item, ticket).
ss_item_join = f_item_df.merge(
    f_ss_df, left_on=["i_item_sk"], right_on=["ss_item_sk"]
)
ss_item_join = ss_item_join[["ss_item_sk", "ss_ticket_number"]]

# Remove repeated (item, ticket) rows.
ss_item_join = ss_item_join.drop_duplicates()

# Self-join on ticket number to list item pairs that share a ticket.
pair_df = get_pairs(ss_item_join)

# Count occurrences of each pair; the size() result column is named 0 until
# it is renamed to "cnt".
grouped_df = (
    pair_df.groupby(["item_sk_1", "item_sk_2"])
    .size()
    .reset_index()
    .rename(columns={0: "cnt"})
)
# Keep pairs whose count is strictly greater than the threshold.
grouped_df = grouped_df[grouped_df["cnt"] > q01_viewed_together_count].reset_index(
    drop=True
)

# Order by count descending, then by both item keys ascending; renumber the
# rows and keep at most q01_limit of them.
grouped_df = grouped_df.sort_values(
    by=["cnt", "item_sk_1", "item_sk_2"], ascending=[False, True, True]
)
grouped_df = grouped_df.reset_index(drop=True)
grouped_df = grouped_df.head(q01_limit)

646 ms ± 25.4 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [6]:
# Switch the pandas option to the "array" data manager before the second
# timed run.
# NOTE(review): `mode.data_manager` appears to be an experimental pandas option;
# confirm it exists in the pandas version this notebook is run with.
pd.options.mode.data_manager = "array"

In [7]:
# Reload both tables, convert each to the "array" manager and copy the result.
# NOTE(review): `_as_manager` is a private pandas method (leading underscore),
# so it may change or disappear between pandas versions.
item_df = pd.read_feather("data/scale-10/item.feather", columns=item_cols)._as_manager("array").copy()
ss_df = pd.read_feather("data/scale-10/store_sales.feather", columns=ss_cols)._as_manager("array").copy()

In [8]:
%%timeit
# Full query 1 pipeline, timed end to end. The statements match the other
# %%timeit cell in this notebook so the two timings can be compared.

# Sales rows from the selected stores, projected to (item, ticket).
f_ss_df = ss_df.loc[ss_df["ss_store_sk"].isin(q01_ss_store_sk_IN)][
    ["ss_item_sk", "ss_ticket_number"]
].reset_index(drop=True)

# Item keys belonging to the selected categories.
f_item_df = item_df.loc[item_df["i_category_id"].isin(q01_i_category_id_IN)][
    ["i_item_sk"]
].reset_index(drop=True)

# Join sales to items on the item key, then keep only (item, ticket).
ss_item_join = f_item_df.merge(
    f_ss_df, left_on=["i_item_sk"], right_on=["ss_item_sk"]
)
ss_item_join = ss_item_join[["ss_item_sk", "ss_ticket_number"]]

# Remove repeated (item, ticket) rows.
ss_item_join = ss_item_join.drop_duplicates()

# Self-join on ticket number to list item pairs that share a ticket.
pair_df = get_pairs(ss_item_join)

# Count occurrences of each pair; the size() result column is named 0 until
# it is renamed to "cnt".
grouped_df = (
    pair_df.groupby(["item_sk_1", "item_sk_2"])
    .size()
    .reset_index()
    .rename(columns={0: "cnt"})
)
# Keep pairs whose count is strictly greater than the threshold.
grouped_df = grouped_df[grouped_df["cnt"] > q01_viewed_together_count].reset_index(
    drop=True
)

# Order by count descending, then by both item keys ascending; renumber the
# rows and keep at most q01_limit of them.
grouped_df = grouped_df.sort_values(
    by=["cnt", "item_sk_1", "item_sk_2"], ascending=[False, True, True]
)
grouped_df = grouped_df.reset_index(drop=True)
grouped_df = grouped_df.head(q01_limit)

634 ms ± 40.5 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
