In [150]:
from pathlib import Path
from joblib import Parallel, delayed
from tqdm.notebook import tqdm
import dill
import pandas as pd
import polars as pl

In [2]:
DATA_FOLDER = Path("/home/sirpantene/data/otto/")

In [164]:
with open(DATA_FOLDER / "test.jsonl") as f:
    print(f.readline())
    print(f.readline())
    print(f.readline())
    print(f.readline())

{"session":12899779,"events":[{"aid":59625,"ts":1661724000278,"type":"clicks"}]}

{"session":12899780,"events":[{"aid":1142000,"ts":1661724000378,"type":"clicks"},{"aid":582732,"ts":1661724058352,"type":"clicks"},{"aid":973453,"ts":1661724109199,"type":"clicks"},{"aid":736515,"ts":1661724136868,"type":"clicks"},{"aid":1142000,"ts":1661724155248,"type":"clicks"}]}

{"session":12899781,"events":[{"aid":141736,"ts":1661724000559,"type":"clicks"},{"aid":199008,"ts":1661724022851,"type":"clicks"},{"aid":57315,"ts":1661724170835,"type":"clicks"},{"aid":194067,"ts":1661724246188,"type":"clicks"},{"aid":199008,"ts":1661780623778,"type":"clicks"},{"aid":199008,"ts":1661781274081,"type":"clicks"},{"aid":199008,"ts":1661781409993,"type":"carts"},{"aid":199008,"ts":1661804151788,"type":"clicks"},{"aid":199008,"ts":1662060028567,"type":"clicks"},{"aid":199008,"ts":1662060064706,"type":"clicks"},{"aid":918667,"ts":1662060160406,"type":"clicks"}]}

{"session":12899782,"events":[{"aid":1669402,"ts":16

In [24]:
TRAIN_PROCESSED = DATA_FOLDER / "train_parquet"
TRAIN_PROCESSED.mkdir(parents=True, exist_ok=True)

TEST_PROCESSED = DATA_FOLDER / "test_parquet"
TEST_PROCESSED.mkdir(parents=True, exist_ok=True)

In [31]:
save = True
type_map = {
    "clicks": 0,
    "carts": 1,
    "orders": 2
}

def read_chunk(chunk, e, path):
    start = str(e*chunksize).zfill(9)
    end = str(e*chunksize+chunksize).zfill(9)
    
    print(f"{start}_{end}.parquet")
    event_dict = {
        'session': [],
        'aid': [],
        'ts': [],
        'type': [],
    }
    
    for session, events in zip(chunk['session'].tolist(), chunk['events'].tolist()):
        for event in events:
            event_dict['session'].append(session)
            event_dict['aid'].append(event['aid'])
            event_dict['ts'].append(event['ts'])
            event_dict['type'].append(type_map[event['type']])
    
    # save DataFrame

    event_df = pd.DataFrame(event_dict)
    if save == True:
        event_df.to_parquet(path / f"{start}_{end}.parquet")
 

In [32]:
chunksize = 100_000
chunks = pd.read_json(
    DATA_FOLDER / "test.jsonl",
    lines=True, chunksize=chunksize
)

res = Parallel(n_jobs=16)(
    delayed(read_chunk)(chunk, e, path=TEST_PROCESSED)
    for e, chunk in enumerate(tqdm(chunks))
)

0it [00:00, ?it/s]

In [34]:
df_train = pl.read_parquet(TRAIN_PROCESSED, use_pyarrow=True)
df_test = pl.read_parquet(TEST_PROCESSED, use_pyarrow=True)

In [35]:
df_train

session,aid,ts,type
i64,i64,i64,i64
0,1517085,1659304800025,0
0,1563459,1659304904511,0
0,1309446,1659367439426,0
0,16246,1659367719997,0
0,1781822,1659367871344,0
0,1152674,1659367885796,0
0,1649869,1659369893840,1
0,461689,1659369898050,1
0,305831,1659370027105,2
0,461689,1659370027105,2


In [36]:
df_test

session,aid,ts,type
i64,i64,i64,i64
12899779,59625,1661724000278,0
12899780,1142000,1661724000378,0
12899780,582732,1661724058352,0
12899780,973453,1661724109199,0
12899780,736515,1661724136868,0
12899780,1142000,1661724155248,0
12899781,141736,1661724000559,0
12899781,199008,1661724022851,0
12899781,57315,1661724170835,0
12899781,194067,1661724246188,0


In [39]:
item_stats = (
    df_train
    .groupby("aid")
    .agg([
        pl.n_unique("session").alias("uniq_sessions"),
        pl.min("ts").cast(
            pl.Datetime(time_unit="ms", time_zone="Etc/GMT-2")
        ).alias("min_ts"),
    ])
    .sort("uniq_sessions", reverse=True)
)

In [37]:
item_stats_test = (
    df_test
    .groupby("aid")
    .agg([
        pl.n_unique("session").alias("uniq_sessions"),
        pl.min("ts").cast(
            pl.Datetime(time_unit="ms", time_zone="Etc/GMT-2")
        ).alias("min_ts"),
    ])
    .sort("uniq_sessions", reverse=True)
)

In [42]:
item_stats.select([
    pl.min("min_ts").alias("first"),
    pl.max("min_ts").alias("last"),
])

first,last
"datetime[ms, Etc/GMT-2]","datetime[ms, Etc/GMT-2]"
2022-08-01 00:00:00.025 +02,2022-08-28 23:53:49.990 +02


In [44]:
item_stats_test.select([
    pl.min("min_ts").alias("first"),
    pl.max("min_ts").alias("last"),
])

first,last
"datetime[ms, Etc/GMT-2]","datetime[ms, Etc/GMT-2]"
2022-08-29 00:00:00.278 +02,2022-09-04 23:59:41.349 +02


In [54]:
(
    item_stats["aid"].n_unique(), 
    item_stats_test["aid"].n_unique(), 
    (
        item_stats.select(["aid"])
        .join(item_stats_test.select(["aid"]), on="aid", how="outer")
    ).shape,
    (
        item_stats.filter(pl.col("uniq_sessions") < 5).select(["aid"])
        .join(item_stats_test.select(["aid"]), on="aid", how="inner")
    ).shape
)

(1855603, 783486, (1855603, 1), (22977, 1))

In [55]:
session_stats = (
    df_train
    .groupby("session")
    .agg([
        pl.n_unique("aid").alias("uniq_aids"),
        pl.min("ts").cast(
            pl.Datetime(time_unit="ms", time_zone="Etc/GMT-2")
        ).alias("min_ts"),
    ])
    .sort("uniq_aids", reverse=True)
)

In [56]:
session_stats

session,uniq_aids,min_ts
i64,u32,"datetime[ms, Etc/GMT-2]"
10185698,486,2022-08-19 10:21:15.261 +02
10448829,485,2022-08-20 04:51:13.302 +02
10809532,484,2022-08-21 10:21:07.066 +02
3289838,484,2022-08-04 13:48:35.840 +02
10765542,484,2022-08-21 04:51:05.079 +02
3847585,484,2022-08-05 10:32:27.666 +02
9358412,484,2022-08-16 18:21:41.076 +02
9475221,483,2022-08-16 23:31:13.003 +02
10327806,482,2022-08-19 18:21:35.721 +02
9495566,482,2022-08-17 04:51:05.161 +02


In [60]:
df_train.filter(pl.col("session") == 6944129)

session,aid,ts,type
i64,i64,i64,i64
6944129,1051188,1660232260586,0
6944129,1051188,1660232291810,1
6944129,1051188,1660232633459,0


In [96]:
pldf_user_type_stats = (
    df_train
    .groupby(["session", "type"])
    .agg([
        pl.n_unique("aid").alias("uniq_aids"),
    ])
    .sort("uniq_aids", reverse=True)
    .pivot(
        values="uniq_aids", index="session", columns="type"
    )
    .rename({
        "0": "uniq_clicks",
        "1": "uniq_carts",
        "2": "uniq_orders",
    })
    .fill_null(0)
)

In [97]:
df_train.filter(pl.col("session") == 7249857)

session,aid,ts,type
i64,i64,i64,i64
7249857,922824,1660297875116,0
7249857,922824,1660297925095,0
7249857,922824,1660298015195,1


In [98]:
pldf_user_type_stats.filter(pl.col("session") == 7249857)

session,uniq_clicks,uniq_carts,uniq_orders
i64,u32,u32,u32
7249857,1,1,0


In [159]:
def data_stats(df_train):
    
    pldf_user_type_stats = (
        df_train
        .groupby(["session", "type"])
        .agg([
            pl.n_unique("aid").alias("uniq_aids"),
        ])
        .sort("uniq_aids", reverse=True)
        .pivot(
            values="uniq_aids", index="session", columns="type"
        )
        .rename({
            "0": "uniq_clicks",
            "1": "uniq_carts",
            "2": "uniq_orders",
        })
        .fill_null(0)
    )
    
    pldf_user_stats = (
        df_train
        .groupby("session")
        .agg([
            pl.n_unique("aid").alias("uniq_aids"),
            pl.min("ts").cast(
                pl.Datetime(time_unit="ms", time_zone="Etc/GMT-2")
            ).alias("min_ts"),
            pl.max("ts").cast(
                pl.Datetime(time_unit="ms", time_zone="Etc/GMT-2")
            ).alias("max_ts"),
        ])
        .sort("uniq_aids", reverse=True)
        .join(pldf_user_type_stats, on="session", how="inner")
    )
    
    pldf_item_type_stats = (
        df_train
        .groupby(["aid", "type"])
        .agg([
            pl.n_unique("session").alias("uniq_sessions"),
        ])
        .sort("uniq_sessions", reverse=True)
        .pivot(
            values="uniq_sessions", index="aid", columns="type"
        )
        .rename({
            "0": "uniq_clicks",
            "1": "uniq_carts",
            "2": "uniq_orders",
        })
        .fill_null(0)
    )

    pldf_item_stats = (
        df_train
        .groupby("aid")
        .agg([
            pl.n_unique("session").alias("uniq_sessions"),
            pl.min("ts").cast(
                pl.Datetime(time_unit="ms", time_zone="Etc/GMT-2")
            ).alias("min_ts"),
            pl.max("ts").cast(
                pl.Datetime(time_unit="ms", time_zone="Etc/GMT-2")
            ).alias("max_ts"),
        ])
        .sort("uniq_sessions", reverse=True)
        .join(pldf_item_type_stats, on="aid", how="inner")
    )

    return pldf_user_stats, pldf_item_stats

In [160]:
df_session_stats, df_aid_stats = data_stats(df_train)

In [163]:
df_aid_stats.filter(pl.col("uniq_carts") > 0)

aid,uniq_sessions,min_ts,max_ts,uniq_clicks,uniq_carts,uniq_orders
i64,u32,"datetime[ms, Etc/GMT-2]","datetime[ms, Etc/GMT-2]",u32,u32,u32
108125,68279,2022-08-01 00:00:08.809 +02,2022-08-28 23:59:41.358 +02,68256,2778,561
1460571,59727,2022-08-01 00:00:51.935 +02,2022-08-28 23:59:47.172 +02,59685,4700,1541
29735,53525,2022-08-01 00:04:13.141 +02,2022-08-28 23:59:11.228 +02,53461,6852,1752
184976,51196,2022-08-01 00:00:30.963 +02,2022-08-28 23:59:38.664 +02,51140,3369,818
95488,50261,2022-08-01 00:01:03.620 +02,2022-08-28 23:58:38.520 +02,50260,132,0
1502122,50131,2022-08-01 00:00:03.838 +02,2022-08-28 23:57:22.033 +02,50114,3784,0
1733943,48604,2022-08-01 00:02:04.388 +02,2022-08-28 23:59:48.904 +02,48539,7918,2807
959208,48195,2022-08-01 00:01:29.514 +02,2022-08-28 23:58:26.706 +02,48150,1271,432
322370,45826,2022-08-01 00:02:13.815 +02,2022-08-28 23:59:15.123 +02,45664,6114,0
231487,44171,2022-08-01 00:00:01.999 +02,2022-08-28 23:58:20.779 +02,43810,8816,3951


In [161]:
df_session_stats

session,uniq_aids,min_ts,max_ts,uniq_clicks,uniq_carts,uniq_orders
i64,u32,"datetime[ms, Etc/GMT-2]","datetime[ms, Etc/GMT-2]",u32,u32,u32
10185698,486,2022-08-19 10:21:15.261 +02,2022-08-19 12:28:08.560 +02,486,0,0
10448829,485,2022-08-20 04:51:13.302 +02,2022-08-20 06:13:54.091 +02,485,0,0
10765542,484,2022-08-21 04:51:05.079 +02,2022-08-21 06:02:23.664 +02,484,0,0
10809532,484,2022-08-21 10:21:07.066 +02,2022-08-21 12:03:08.060 +02,484,0,0
9358412,484,2022-08-16 18:21:41.076 +02,2022-08-16 19:26:47.597 +02,484,0,0
3289838,484,2022-08-04 13:48:35.840 +02,2022-08-04 14:24:07.171 +02,484,0,0
3847585,484,2022-08-05 10:32:27.666 +02,2022-08-05 12:03:38.178 +02,484,0,0
9475221,483,2022-08-16 23:31:13.003 +02,2022-08-17 00:57:03.564 +02,483,0,0
10327806,482,2022-08-19 18:21:35.721 +02,2022-08-19 19:28:20.649 +02,482,0,0
9495566,482,2022-08-17 04:51:05.161 +02,2022-08-17 06:19:10.368 +02,482,0,0


In [162]:
df_session_stats.filter(pl.col("uniq_carts") > 0)

session,uniq_aids,min_ts,max_ts,uniq_clicks,uniq_carts,uniq_orders
i64,u32,"datetime[ms, Etc/GMT-2]","datetime[ms, Etc/GMT-2]",u32,u32,u32
4164784,403,2022-08-05 20:16:28.148 +02,2022-08-18 18:39:30.617 +02,401,9,0
107112,399,2022-08-01 06:25:31.214 +02,2022-08-23 11:16:56.503 +02,397,2,0
8433998,394,2022-08-14 17:35:57.340 +02,2022-08-26 10:27:26.187 +02,393,21,10
97015,386,2022-08-01 05:54:54.216 +02,2022-08-19 14:35:09.115 +02,386,5,4
1459773,386,2022-08-02 11:14:38.354 +02,2022-08-18 20:59:32.814 +02,385,3,1
1448805,379,2022-08-02 11:00:14.623 +02,2022-08-20 19:25:58.156 +02,379,4,0
9740555,373,2022-08-17 20:01:44.438 +02,2022-08-28 22:11:11.291 +02,370,48,3
341942,369,2022-08-01 11:05:28.249 +02,2022-08-28 22:42:06.813 +02,369,2,0
2199030,369,2022-08-03 03:20:15.879 +02,2022-08-27 21:47:21.310 +02,369,7,0
4562379,365,2022-08-06 16:00:03.388 +02,2022-08-07 01:15:44.040 +02,364,4,0


In [108]:
df_session_stats.filter(pl.col("uniq_orders") > 0)

session,uniq_aids,min_ts,uniq_clicks,uniq_carts,uniq_orders
i64,u32,"datetime[ms, Etc/GMT-2]",u32,u32,u32
8433998,394,2022-08-14 17:35:57.340 +02,393,21,10
97015,386,2022-08-01 05:54:54.216 +02,386,5,4
1459773,386,2022-08-02 11:14:38.354 +02,385,3,1
9740555,373,2022-08-17 20:01:44.438 +02,370,48,3
1392204,365,2022-08-02 09:43:13.195 +02,363,21,3
5275857,364,2022-08-07 21:21:56.254 +02,362,14,2
8884222,359,2022-08-15 16:05:25.091 +02,356,39,10
179643,355,2022-08-01 08:20:01.014 +02,355,2,2
3152028,353,2022-08-04 10:06:57.338 +02,353,8,4
257979,349,2022-08-01 09:43:53.429 +02,349,6,2


In [112]:
def warm_users_and_models(pldf_user_stats, pldf_model_stats):
#     min_actions = self.dataset_params["min_actions"]
#     min_users = self.dataset_params["min_users"]
#     min_transactions = self.dataset_params["min_transactions"]
    
    min_actions = 5
    min_users = 5
    min_transactions = 1

    pldf_train_users = (
        pldf_user_stats
        .filter(pl.col("uniq_aids") >= min_actions)
        .filter(
            (pl.col("uniq_carts") >= min_transactions) |
            (pl.col("uniq_orders") >= min_transactions)
        )
        .select(["session"])
    )

    pldf_train_models = (
        pldf_model_stats
        .filter(pl.col("uniq_sessions") >= min_users)
        .filter(
            (pl.col("uniq_carts") >= min_transactions) |
            (pl.col("uniq_orders") >= min_transactions)
        )
        .select(["aid"])
    )

    return pldf_train_users, pldf_train_models

In [113]:
df_train_sessions, df_train_aids = warm_users_and_models(
    df_session_stats, df_aid_stats
)

In [114]:
df_train_sessions

session
i64
4164784
107112
8433998
97015
1459773
1448805
9740555
341942
2199030
4562379


In [115]:
df_train_aids

aid
i64
108125
1460571
29735
184976
95488
1502122
1733943
959208
322370
231487


In [120]:
print("Num lost items for test sessions")

(
    item_stats_test["aid"].n_unique(),
    (
        df_train_aids.select(["aid"])
        .join(item_stats_test.select(["aid"]), on="aid", how="inner")
    ).shape,
)

Num lost items for test sessions


(783486, (619802, 1))

In [158]:
N_LAST_ACTIONS = 200

df_train_interactions_warm = (
    df_train
    .join(df_train_sessions, on="session", how="inner")
    .join(df_train_aids, on="aid", how="inner")
    .sort("ts")
    .with_columns([pl.lit(1).alias("ones"),])
    .with_columns([pl.col("ones").cumsum().over("session").alias("user_interaction_rank"),])
    .with_columns([pl.max("user_interaction_rank").over("session").alias("user_interaction_total"),])
    .with_columns([
        (
            pl.col("user_interaction_total") - N_LAST_ACTIONS
        ).alias("split_user_interaction_rank"),
    ])
)

In [None]:
df_train_interactions_warm

In [123]:
from tasks.data.dataset.mappers import EntityEncoder

In [139]:
session_encoder = EntityEncoder(ext_name="session", int_name="uiid")
aid_encoder = EntityEncoder(ext_name="aid", int_name="miid")

In [140]:
session_encoder.set_external_ids(df_train_interactions_warm["session"])
aid_encoder.set_external_ids(df_train_interactions_warm["aid"])

In [141]:
df_train_interactions_warm = (
    df_train_interactions_warm
    .join(session_encoder.mapping_df, on="session", how="left")  # all users
    .join(aid_encoder.mapping_df, on="aid", how="inner")  # only known models
)

In [142]:
df_train_interactions_warm

session,aid,ts,type,uiid,miid
i64,i64,i64,i64,i64,i64
0,1517085,1659304800025,0,0,951831
0,1309446,1659367439426,0,0,821605
0,16246,1659367719997,0,0,10217
0,1781822,1659367871344,0,0,1117991
0,1649869,1659369893840,1,0,1035175
0,461689,1659369898050,1,0,289459
0,305831,1659370027105,2,0,191473
0,461689,1659370027105,2,0,289459
0,362233,1659370064916,0,0,226883
0,1649869,1659370067686,0,0,1035175


In [135]:
from tasks.jobs import Splitter

In [136]:
splitter = Splitter()

In [143]:
n_valid_users = 40000

df_train_interactions, df_valid_interactions = splitter.user_split(
    df_train_interactions_warm,
    user_col="session",
    test_size=n_valid_users
)

In [146]:
def build_input_sequences(pldf_interactions):
    return (
        pldf_interactions
        .sort("ts")
        .groupby(["session", "uiid"])
        .agg([
            pl.list("miid").alias("miid_seq"),
            pl.list("ts").alias("ts_seq"),
            pl.list("type").alias("transaction_seq"),
#             pl.list("transaction_weight").alias("transaction_seq"),
        ])
    )

In [147]:
df_train_seq = build_input_sequences(df_train_interactions)
df_valid_seq = build_input_sequences(df_valid_interactions)

In [149]:
DATASET_PATH = DATA_FOLDER / "dataset_v0"
DATASET_PATH.mkdir(parents=True, exist_ok=True)

In [152]:
with open(DATASET_PATH / "session_encoder.dill", "wb") as f:
    dill.dump(session_encoder, f)

with open(DATASET_PATH / "aid_encoder.dill", "wb") as f:
    dill.dump(aid_encoder, f)
    
df_train_seq.write_parquet(DATASET_PATH / "input_train.parquet", use_pyarrow=True)
df_valid_seq.write_parquet(DATASET_PATH / "input_valid.parquet", use_pyarrow=True)
