In [None]:
import polars as pl
import pandas as pd
import datetime as dt

# pl.Config.set_verbose()
# %env POLARS_MAX_THREADS=8

# Eager

In [None]:
# Eager (in-memory) preparation of the implicit-feedback train / validation /
# test frames from the ContentWise CW10M parquet files.
interactions_path = "../data/contentwise/data/contentwise/CW10M/interactions/*.parquet"
impressions_dl_path = "../data/contentwise/data/contentwise/CW10M/impressions-direct-link/*.parquet"

# Rows strictly before this date go to train; rows on/after it go to validation.
split_date = dt.date(2019, 4, 14)

# Explode the recommended list so every recommended series gets its own row.
impressions_dl = pl.read_parquet(impressions_dl_path).explode("recommended_series_list")

interactions = (
    pl.read_parquet(interactions_path)
    # Select 'clicks' only from all interactions
    .filter(pl.col("interaction_type") == 0)
    # Join indirectly positive actions with negative (impressions)
    .join(impressions_dl, on="recommendation_id", how="inner")
    # Mark positive interactions with 1 and negative with 0
    .with_column(
        pl.when(pl.col("series_id") == pl.col("recommended_series_list"))
        .then(1)
        .otherwise(0)
        .alias("target")
    )
    .rename(
        {
            "user_id": "user",
            "recommended_series_list": "item",
            "utc_ts_milliseconds": "timestamp",
        }
    )
    # Source column is `utc_ts_milliseconds`: cast to Datetime, then set the
    # time unit to milliseconds.
    .with_column(pl.col("timestamp").cast(pl.Datetime).dt.with_time_unit("ms"))
    # Handle (user, item) duplicates: keep one row per pair, with the summed
    # target and the latest timestamp ...
    .groupby(["user", "item"])
    .agg([pl.sum("target"), pl.max("timestamp")])
    # ... then clip the summed target back to a 0/1 flag.
    .with_column(pl.when(pl.col("target") > 0).then(1).otherwise(0).alias("target"))
    .sort("timestamp")
)

# Split data
train_data = interactions.filter(pl.col("timestamp") < split_date)
val_data = interactions.filter(pl.col("timestamp") >= split_date)

# Prepare user/item to idx mappers based on train data.
# `unique()` does not guarantee its output order, so the distinct ids are
# materialised once and the index is derived from that very column; building
# the id column and the idx column from two separate `unique()` calls would
# only line up if both calls happened to return the same order.
train_user_to_idx = train_data.select([pl.col("user").unique()]).with_column(
    (pl.col("user").rank().cast(pl.Int64) - 1).alias("user_idx")
)
train_item_to_idx = train_data.select([pl.col("item").unique()]).with_column(
    (pl.col("item").rank().cast(pl.Int64) - 1).alias("item_idx")
)

# Map user/item to idx. The inner joins drop validation rows whose user or
# item is absent from the train-derived mappers.
train_data = train_data.join(train_user_to_idx, on="user", how="inner")
train_data = train_data.join(train_item_to_idx, on="item", how="inner")
val_data = val_data.join(train_user_to_idx, on="user", how="inner")
val_data = val_data.join(train_item_to_idx, on="item", how="inner")

# Select valid columns: the idx columns replace the raw ids.
output_columns = [
    pl.col("user_idx").alias("user"),
    pl.col("item_idx").alias("item"),
    "target",
    "timestamp",
]
train_data = train_data.select(output_columns)
val_data = val_data.select(output_columns)
test_data = val_data  # test set == validation set (to change in the future!)

In [None]:
# (rows, columns) of the train / validation / test frames.
tuple(frame.shape for frame in (train_data, val_data, test_data))

In [None]:
# (rows, columns) of the user and item index mappers.
tuple(mapper.shape for mapper in (train_user_to_idx, train_item_to_idx))

In [None]:
# Preview the first rows of each split.
tuple(frame.head() for frame in (train_data, val_data, test_data))

In [None]:
# Object identities of the three splits; `test_data` was bound to `val_data`
# above, so the last two ids are expected to match.
tuple(map(id, (train_data, val_data, test_data)))

## BPR

In [None]:
# Eager (in-memory) preparation of BPR-style training triples
# (user, item_neg, item_pos) plus pointwise validation / test frames.
interactions_path = "../data/contentwise/data/contentwise/CW10M/interactions/*.parquet"
impressions_dl_path = "../data/contentwise/data/contentwise/CW10M/impressions-direct-link/*.parquet"

# Rows strictly before this date go to train; rows on/after it go to validation.
split_date = dt.date(2019, 4, 14)

# Explode the recommended list so every recommended series gets its own row.
impressions_dl = pl.read_parquet(impressions_dl_path).explode("recommended_series_list")

interactions = (
    pl.read_parquet(interactions_path)
    # Select 'clicks' only from all interactions
    .filter(pl.col("interaction_type") == 0)
    # Join indirectly positive actions with negative (impressions)
    .join(impressions_dl, on="recommendation_id", how="inner")
    # Mark positive interactions with 1 and negative with 0
    .with_column(
        pl.when(pl.col("series_id") == pl.col("recommended_series_list"))
        .then(1)
        .otherwise(0)
        .alias("target")
    )
    .rename(
        {
            "user_id": "user",
            "recommended_series_list": "item",
            "utc_ts_milliseconds": "timestamp",
        }
    )
    # Source column is `utc_ts_milliseconds`: cast to Datetime, then set the
    # time unit to milliseconds.
    .with_column(pl.col("timestamp").cast(pl.Datetime).dt.with_time_unit("ms"))
    # Handle (user, item) duplicates: keep one row per pair, with the summed
    # target and the latest timestamp ...
    .groupby(["user", "item"])
    .agg([pl.sum("target"), pl.max("timestamp")])
    # ... then clip the summed target back to a 0/1 flag.
    .with_column(pl.when(pl.col("target") > 0).then(1).otherwise(0).alias("target"))
    .sort("timestamp")
)

# Split data
train_data = interactions.filter(pl.col("timestamp") < split_date)
val_data = interactions.filter(pl.col("timestamp") >= split_date)

# Transform train data to be BPR specific: pair every negative row of a user
# with every positive row of the same user. Columns coming from the positive
# (right) side that clash with the left side get the `_right` suffix, e.g.
# `item_right`.
train_data_neg = train_data.filter(pl.col("target") == 0)
train_data_pos = train_data.filter(pl.col("target") == 1)
train_data = train_data_neg.join(train_data_pos, on="user", how="inner")

# Prepare user/item to idx mappers.
# `unique()` does not guarantee its output order, so the distinct ids are
# materialised once and the index is derived from that very column; building
# the id column and the idx column from two separate `unique()` calls would
# only line up if both calls happened to return the same order.
train_user_to_idx = train_data.select([pl.col("user").unique()]).with_column(
    (pl.col("user").rank().cast(pl.Int64) - 1).alias("user_idx")
)
# Items are indexed over the union of negative (`item`) and positive
# (`item_right`) ids; the resulting id column is named `item`.
train_item_to_idx = train_data.select(
    [pl.concat((pl.col("item"), pl.col("item_right"))).unique()]
).with_column((pl.col("item").rank().cast(pl.Int64) - 1).alias("item_idx"))

train_data = train_data.join(train_user_to_idx, on="user", how="inner")
train_data = train_data.join(train_item_to_idx, on="item", how="inner")
# Second item join maps the positive item; its idx lands in `item_idx_right`
# because `item_idx` already exists on the left side.
train_data = train_data.join(train_item_to_idx, left_on="item_right", right_on="item", how="inner")
val_data = val_data.join(train_user_to_idx, on="user", how="inner")
val_data = val_data.join(train_item_to_idx, on="item", how="inner")

# Select valid columns
train_data = train_data.select(
    [
        pl.col("user_idx").alias("user"),
        pl.col("item_idx").alias("item_neg"),
        pl.col("item_idx_right").alias("item_pos"),
    ]
)
val_data = val_data.select(
    [
        pl.col("user_idx").alias("user"),
        pl.col("item_idx").alias("item"),
        "target",
    ]
)
test_data = val_data  # test set == validation set (to change in the future!)

In [None]:
# (rows, columns) of the BPR train frame and the validation / test frames.
tuple(frame.shape for frame in (train_data, val_data, test_data))

In [None]:
# (rows, columns) of the user and item index mappers.
tuple(mapper.shape for mapper in (train_user_to_idx, train_item_to_idx))

In [None]:
# Preview the first rows of each split.
tuple(frame.head() for frame in (train_data, val_data, test_data))

# Lazy

In [None]:
# Lazy preparation of the implicit-feedback train / validation / test plans.
# Everything below only builds query plans; nothing runs until `.collect()`.
interactions_path = "../data/contentwise/data/contentwise/CW10M/interactions/*.parquet"
impressions_dl_path = "../data/contentwise/data/contentwise/CW10M/impressions-direct-link/*.parquet"

# Rows strictly before this date go to train; rows on/after it go to validation.
split_date = dt.date(2019, 4, 14)

# Explode the recommended list so every recommended series gets its own row.
impressions_dl = pl.scan_parquet(impressions_dl_path).explode("recommended_series_list")

interactions = (
    pl.scan_parquet(interactions_path)
    # Select 'clicks' only from all interactions
    .filter(pl.col("interaction_type") == 0)
    # Join indirectly positive actions with negative (impressions)
    .join(impressions_dl, on="recommendation_id", how="inner")
    # Mark positive interactions with 1 and negative with 0
    .with_column(
        pl.when(pl.col("series_id") == pl.col("recommended_series_list"))
        .then(1)
        .otherwise(0)
        .alias("target")
    )
    .rename(
        {
            "user_id": "user",
            "recommended_series_list": "item",
            "utc_ts_milliseconds": "timestamp",
        }
    )
    # Source column is `utc_ts_milliseconds`: cast to Datetime, then set the
    # time unit to milliseconds.
    .with_column(pl.col("timestamp").cast(pl.Datetime).dt.with_time_unit("ms"))
    # Handle (user, item) duplicates: keep one row per pair, with the summed
    # target and the latest timestamp ...
    .groupby(["user", "item"])
    .agg([pl.sum("target"), pl.max("timestamp")])
    # ... then clip the summed target back to a 0/1 flag.
    .with_column(pl.when(pl.col("target") > 0).then(1).otherwise(0).alias("target"))
    .sort("timestamp")
    # Both the train and the validation branch derive from this plan.
    .cache()
)

# Split data
train_data = interactions.filter(pl.col("timestamp") < split_date)
val_data = interactions.filter(pl.col("timestamp") >= split_date)

# Prepare user/item to idx mappers based on train data.
# `unique()` does not guarantee its output order, so the distinct ids are
# produced once and the index is derived from that very column; building the
# id column and the idx column from two separate `unique()` calls would only
# line up if both calls happened to return the same order.
train_user_to_idx = train_data.select([pl.col("user").unique()]).with_column(
    (pl.col("user").rank().cast(pl.Int64) - 1).alias("user_idx")
)
train_item_to_idx = train_data.select([pl.col("item").unique()]).with_column(
    (pl.col("item").rank().cast(pl.Int64) - 1).alias("item_idx")
)

# Map user/item to idx. The inner joins drop validation rows whose user or
# item is absent from the train-derived mappers.
train_data = train_data.join(train_user_to_idx, on="user", how="inner")
train_data = train_data.join(train_item_to_idx, on="item", how="inner")
val_data = val_data.join(train_user_to_idx, on="user", how="inner")
val_data = val_data.join(train_item_to_idx, on="item", how="inner")

# Select valid columns: the idx columns replace the raw ids.
output_columns = [
    pl.col("user_idx").alias("user"),
    pl.col("item_idx").alias("item"),
    "target",
]
train_data = train_data.select(output_columns)
val_data = val_data.select(output_columns)
test_data = val_data  # test set == validation set (to change in the future!)

In [None]:
# Execute each lazy plan and preview the first rows of the result.
tuple(plan.collect().head() for plan in (train_data, val_data, test_data))

# train_data.collect().write_parquet("train_data_implicit.parquet")
# val_data.collect().write_parquet("val_data_implicit.parquet")
# test_data.collect().write_parquet("test_data_implicit.parquet")

## BPR

In [None]:
# Lazy preparation of BPR-style training triples (user, item_neg, item_pos)
# plus pointwise validation / test plans. Everything below only builds query
# plans; nothing runs until `.collect()`.
interactions_path = "../data/contentwise/data/contentwise/CW10M/interactions/*.parquet"
impressions_dl_path = "../data/contentwise/data/contentwise/CW10M/impressions-direct-link/*.parquet"

# Rows strictly before this date go to train; rows on/after it go to validation.
split_date = dt.date(2019, 4, 14)

# Explode the recommended list so every recommended series gets its own row.
impressions_dl = pl.scan_parquet(impressions_dl_path).explode("recommended_series_list")

interactions = (
    pl.scan_parquet(interactions_path)
    # Select 'clicks' only from all interactions
    .filter(pl.col("interaction_type") == 0)
    # Join indirectly positive actions with negative (impressions)
    .join(impressions_dl, on="recommendation_id", how="inner")
    # Mark positive interactions with 1 and negative with 0
    .with_column(
        pl.when(pl.col("series_id") == pl.col("recommended_series_list"))
        .then(1)
        .otherwise(0)
        .alias("target")
    )
    .rename(
        {
            "user_id": "user",
            "recommended_series_list": "item",
            "utc_ts_milliseconds": "timestamp",
        }
    )
    # Source column is `utc_ts_milliseconds`: cast to Datetime, then set the
    # time unit to milliseconds.
    .with_column(pl.col("timestamp").cast(pl.Datetime).dt.with_time_unit("ms"))
    # Handle (user, item) duplicates: keep one row per pair, with the summed
    # target and the latest timestamp ...
    .groupby(["user", "item"])
    .agg([pl.sum("target"), pl.max("timestamp")])
    # ... then clip the summed target back to a 0/1 flag.
    .with_column(pl.when(pl.col("target") > 0).then(1).otherwise(0).alias("target"))
    .sort("timestamp")
    # Both the train and the validation branch derive from this plan.
    .cache()
)

# Split data
train_data = interactions.filter(pl.col("timestamp") < split_date)
val_data = interactions.filter(pl.col("timestamp") >= split_date)

# Transform train data to be BPR specific: pair every negative row of a user
# with every positive row of the same user. Columns coming from the positive
# (right) side that clash with the left side get the `_right` suffix, e.g.
# `item_right`.
train_data_neg = train_data.filter(pl.col("target") == 0)
train_data_pos = train_data.filter(pl.col("target") == 1)
train_data = train_data_neg.join(train_data_pos, on="user", how="inner")

# Prepare user/item to idx mappers.
# `unique()` does not guarantee its output order, so the distinct ids are
# produced once and the index is derived from that very column; building the
# id column and the idx column from two separate `unique()` calls would only
# line up if both calls happened to return the same order.
train_user_to_idx = train_data.select([pl.col("user").unique()]).with_column(
    (pl.col("user").rank().cast(pl.Int64) - 1).alias("user_idx")
)
# Items are indexed over the union of negative (`item`) and positive
# (`item_right`) ids; the resulting id column is named `item`.
train_item_to_idx = train_data.select(
    [pl.concat((pl.col("item"), pl.col("item_right"))).unique()]
).with_column((pl.col("item").rank().cast(pl.Int64) - 1).alias("item_idx"))

train_data = train_data.join(train_user_to_idx, on="user", how="inner")
train_data = train_data.join(train_item_to_idx, on="item", how="inner")
# Second item join maps the positive item; its idx lands in `item_idx_right`
# because `item_idx` already exists on the left side.
train_data = train_data.join(train_item_to_idx, left_on="item_right", right_on="item", how="inner")
val_data = val_data.join(train_user_to_idx, on="user", how="inner")
val_data = val_data.join(train_item_to_idx, on="item", how="inner")

# Select valid columns
train_data = train_data.select(
    [
        pl.col("user_idx").alias("user"),
        pl.col("item_idx").alias("item_neg"),
        pl.col("item_idx_right").alias("item_pos"),
    ]
)
val_data = val_data.select(
    [
        pl.col("user_idx").alias("user"),
        pl.col("item_idx").alias("item"),
        "target",
    ]
)
test_data = val_data  # test set == validation set (to change in the future!)

In [None]:
# Execute each lazy plan and preview the first rows of the result.
tuple(plan.collect().head() for plan in (train_data, val_data, test_data))

# train_data.collect().write_parquet("train_data_implicit_bpr.parquet")
# val_data.collect().write_parquet("val_data_implicit_bpr.parquet")
# test_data.collect().write_parquet("test_data_implicit_bpr.parquet")

# Read saved parquets

In [None]:
# Load the saved implicit-feedback training split and count its distinct
# users and items.
# NOTE(review): the only `write_parquet` call for this file in the notebook is
# commented out, so the file has to exist on disk already.
train_data = pl.read_parquet("train_data_implicit.parquet")
train_data["user"].n_unique(), train_data["item"].n_unique()

In [None]:
# Load the saved BPR training split and count its distinct users.
# NOTE(review): the only `write_parquet` call for this file in the notebook is
# commented out, so the file has to exist on disk already.
train_data_bpr = pl.read_parquet("train_data_implicit_bpr.parquet")
train_data_bpr["user"].n_unique()