In [1]:
import os
import sys

import dill
import numpy as np
import pandas as pd
from datasets import load_dataset
from feast import FeatureStore
from loguru import logger
from pydantic import BaseModel
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder, StandardScaler
from features.tfm import (
    categories_pipeline_steps,
    description_pipeline_steps,
    flatten_string_array_col,
    price_parse_dtype,
    price_pipeline_steps,
    rating_agg_pipeline_steps,
    reshape_2d_to_1d,
    title_pipeline_steps,
    todense,
    tokenizer,
)
from pyspark.sql import SparkSession
from datetime import datetime

sys.path.insert(0, "..")
from data_prep_utils import chunk_transform, handle_dtypes
from id_mapper import IDMapper, map_indice

In [2]:
class Args(BaseModel):
    run_name: str = "000-prep-data"
    testing: bool = True
    notebook_persist_dp: str = None
    random_seed: int = 41

    user_col: str = "user_id"
    item_col: str = "parent_asin"
    rating_col: str = "rating"
    timestamp_col: str = "event_timestamp"

    tfm_chunk_size: int = 5000

    sequence_length: int = 10

    def init(self):
        self.notebook_persist_dp = os.path.abspath(f"data/{self.run_name}")
        if not self.testing:
            os.makedirs(self.notebook_persist_dp, exist_ok=True)

        return self


args = Args().init()

print(args.model_dump_json(indent=2))

{
  "run_name": "000-prep-data",
  "testing": true,
  "notebook_persist_dp": "/home/duong/Documents/datn1/src/feature_engineer/data/000-prep-data",
  "random_seed": 41,
  "user_col": "user_id",
  "item_col": "parent_asin",
  "rating_col": "rating",
  "timestamp_col": "event_timestamp",
  "tfm_chunk_size": 5000,
  "sequence_length": 10
}


In [None]:
pvc_path = os.getenv("PVC_PATH")
# pvc_path = "/home/duong/Documents/datn1/data"
if not pvc_path:
    raise ValueError("PVC_PATH environment variable not set")
idm_persist_fp = f"{pvc_path}/idm.json"
train_persist_fp = f"{pvc_path}/train_features.parquet"
val_persist_fp = f"{pvc_path}/val_features.parquet"


metadata_raw_df = pd.read_parquet(f"{pvc_path}/raw_meta.parquet")

In [4]:
metadata_raw_df

Unnamed: 0,main_category,title,average_rating,rating_number,features,description,price,images,videos,store,categories,details,parent_asin,bought_together,subtitle,author
0,Toys & Games,"KUNGOON Happy Anniversary Balloon Banner,Weddi...",4.5,241,[],[],,{'hi_res': ['https://m.media-amazon.com/images...,"{'title': ['Pretty Cool!', 'Product assembly a...",Kunggo,[],"{""Package Dimensions"": ""10.12 x 8.03 x 0.51 in...",B08GPM7CQN,,,
1,Toys & Games,Gothic Mothman Plushie Doll with Bright Red Ey...,1.3,2,[🦋 Mothman’s bright red eyes could stare you d...,[🦋 Description: Mothman’s bright red eyes coul...,18.99,{'hi_res': ['https://m.media-amazon.com/images...,"{'title': [], 'url': [], 'user_id': []}",Felicy,"[Toys & Games, Stuffed Animals & Plush Toys, P...","{""Item Weight"": ""2.47 ounces"", ""Manufacturer r...",B09X9XW42H,,,
2,Toys & Games,Melody Jane Dollhouse Builders DIY 1:24 Scale ...,4.2,67,[1:24 Scale - Plastic - Approximate cut out si...,[],,"{'hi_res': [None, 'https://m.media-amazon.com/...",{'title': ['Cutemini wooden window double door...,Melody Jane Dolls Houses,"[Toys & Games, Dolls & Accessories, Dollhouse ...","{""Item Weight"": ""0.48 ounces"", ""Manufacturer r...",B01I9QET6M,,,
3,Toys & Games,Traxxas Stampede 4X4: 1/10 Scale 4wd Monster T...,4.5,48,[Waterproof electronics for all-weather drivin...,[Stampede 4X4 is built Traxxas Tough to withst...,,{'hi_res': ['https://m.media-amazon.com/images...,{'title': ['Traxxas Slash 2WD Short Course Rac...,Traxxas,"[Toys & Games, Remote & App Controlled Vehicle...","{""Product Dimensions"": ""15.63 x 13.39 x 8.94 i...",B019XEEX1A,,,
4,Toys & Games,Hot Wheels Monster Truck 1:24 Scale 2022 Bone ...,4.8,17699,[Designed in 1:24 scale with durable die-cast ...,[The Hot Wheels Monster Trucks 1:24 scale die-...,27.98,{'hi_res': ['https://m.media-amazon.com/images...,{'title': ['Hot Wheels 1:24 Scale Monster Truc...,Hot Wheels,"[Toys & Games, Preschool, Pre-Kindergarten Toys]","{""Product Dimensions"": ""5 x 6.27 x 5.5 inches""...",B09G7K3JWQ,,,
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
890869,Toys & Games,Dollhouse Miniature 1:12 Scale Fire Place Acce...,4.6,2,[],[Unless stated otherwise this item is 1:12 sca...,16.09,{'hi_res': ['https://m.media-amazon.com/images...,"{'title': [], 'url': [], 'user_id': []}",Melody Jane Dolls Houses,"[Toys & Games, Dolls & Accessories, Dollhouse ...","{""Product Dimensions"": ""2.99 x 2.52 x 0.08 inc...",B00BGO1PDU,,,
890870,Sports & Outdoors,Hacko Games Pride Deck Poker Cards,4.6,5,[Custom deck of playing cards],[Pride is a fantastically color card system. A...,,{'hi_res': ['https://m.media-amazon.com/images...,"{'title': [], 'url': [], 'user_id': []}",Hacko Games,"[Toys & Games, Games & Accessories, Card Games...","{""Item Package Dimensions L x W x H"": ""3.54 x ...",B07T16B3W1,,,
890871,Toys & Games,Mini Squee-Z-Bubs & Bubbles (Sold Individually...,3.7,7,"[Toysmith 774546 Mini Squee-z Bubbles, Educati...",[Toysmith 774546 Mini Squee-z Bubbles. Toysmit...,,{'hi_res': ['https://m.media-amazon.com/images...,"{'title': [], 'url': [], 'user_id': []}",Toysmith,"[Toys & Games, Sports & Outdoor Play, Bubbles,...","{""Product Dimensions"": ""2 x 4.2 x 1.1 inches"",...",B002IOZ92K,,,
890872,Toys & Games,Sentosphère Aquarellum Junior Butterflies & Fl...,4.6,141,"[Complete kit., Paint without going over the l...",[Fantastic. A few drops of paint and any child...,,{'hi_res': ['https://m.media-amazon.com/images...,"{'title': [], 'url': [], 'user_id': []}",Sentosphère,"[Toys & Games, Arts & Crafts, Craft Kits, Pain...","{""Product Dimensions"": ""10.43 x 7.68 x 1.18 in...",B06XJVLKDD,,,


In [5]:
store = FeatureStore(repo_path="../../feature_store", fs_yaml_file="feature_store.yaml")



In [6]:
# Trigger Spark Session Creation
dummy_entity_df = pd.DataFrame(
    {
        "user_id": ["dummy_item"],
        "parent_asin": ["dummy_item"],
        "event_timestamp": [datetime.now()],
    }
)
features = ["train_feature_view:rating"]
try:
    # This call will initialize the Spark session
    store.get_historical_features(
        entity_df=dummy_entity_df, features=features, full_feature_names=True
    ).to_df()
    print("Spark session initialized by Feast")
except Exception as e:
    print(f"Error initializing Spark session: {e}")
    raise

# Step 2: Get the Spark session
spark = SparkSession.getActiveSession()
if spark is None:
    raise RuntimeError("No active Spark session found after initialization.")

# Step 3: Read raw Parquet files with column pruning
columns_to_select = ["user_id", "parent_asin", "rating", "timestamp"]

train_path = store.get_feature_view("train_feature_view").source.path
val_path = store.get_feature_view("val_feature_view").source.path
print(f"Train feature view path: {train_path}")
print(f"Val feature view path: {val_path}")

train_df = spark.read.parquet(train_path).select(*columns_to_select).toPandas()
val_df = spark.read.parquet(val_path).select(*columns_to_select).toPandas()

25/06/28 17:16:02 WARN Utils: Your hostname, duong resolves to a loopback address: 127.0.1.1; using 192.168.1.103 instead (on interface enp3s0)
25/06/28 17:16:02 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


:: loading settings :: url = jar:file:/home/duong/Documents/datn1/.venv/lib/python3.11/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/duong/.ivy2/cache
The jars for the packages stored in: /home/duong/.ivy2/jars
org.apache.hadoop#hadoop-aws added as a dependency
org.apache.hadoop#hadoop-common added as a dependency
com.amazonaws#aws-java-sdk-bundle added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-863c1b02-06ae-4632-b608-e694db29da33;1.0
	confs: [default]
	found org.apache.hadoop#hadoop-aws;3.3.4 in central
	found com.amazonaws#aws-java-sdk-bundle;1.12.262 in central
	found org.wildfly.openssl#wildfly-openssl;1.0.7.Final in central
	found org.apache.hadoop#hadoop-common;3.3.4 in central
	found org.apache.hadoop.thirdparty#hadoop-shaded-protobuf_3_7;1.1.1 in central
	found org.apache.hadoop#hadoop-annotations;3.3.4 in central
	found org.apache.hadoop.thirdparty#hadoop-shaded-guava;1.1.1 in central
	found com.google.guava#guava;27.0-jre in central
	found com.google.guava#failureaccess;1.0 in central
	found com.google.guava#listenablefuture;9999.0-emp

Spark session initialized by Feast




Train feature view path: s3a://recsys-ops/feature-store/train/train.parquet/
Val feature view path: s3a://recsys-ops/feature-store/val/val.parquet/


  if not is_datetime64tz_dtype(pser.dtype):
  if not is_datetime64tz_dtype(pser.dtype):


In [7]:
# Kiểm tra kích thước của DataFrames
print("📊 Kích thước DataFrame:")
print(f"Train shape: {train_df.shape}")
print(f"Val shape: {val_df.shape}")

# Kiểm tra timestamp
print("\n⏰ Thông tin timestamp:")
print("Train timestamp range:")
print(f"- Min: {train_df['timestamp'].min()}")
print(f"- Max: {train_df['timestamp'].max()}")

print("\nVal timestamp range:")
print(f"- Min: {val_df['timestamp'].min()}")
print(f"- Max: {val_df['timestamp'].max()}")

📊 Kích thước DataFrame:
Train shape: (90700, 4)
Val shape: (861, 4)

⏰ Thông tin timestamp:
Train timestamp range:
- Min: 2002-11-04 08:54:18
- Max: 2021-08-11 00:12:16.369000

Val timestamp range:
- Min: 2021-08-11 02:21:01.740000
- Max: 2022-06-14 05:27:26.678000


📊 Kích thước DataFrame:
Train shape: (86979, 5)
Val shape: (4582, 5)

⏰ Thông tin timestamp:
Train timestamp range:
- Min: 2002-11-04 08:54:18
- Max: 2020-11-04 05:00:47.204000

Val timestamp range:
- Min: 2020-11-04 08:02:53.871000
- Max: 2022-06-14 05:27:26.678000

In [8]:
full_df = (
    pd.concat([train_df, val_df], axis=0)
    .pipe(handle_dtypes)
    .rename(columns={"timestamp": args.timestamp_col})  # đổi tên cột
    .assign(
        timestamp_unix=lambda df: pd.to_datetime(
            df[args.timestamp_col], utc=True
        ).apply(lambda x: int(x.timestamp()))
    )
)
full_df

Unnamed: 0,user_id,parent_asin,rating,event_timestamp,timestamp_unix
0,AF2UBRLFZTLECH44DEMVKDWS7Z5A,B00DQC2FPM,5.0,2015-01-02 17:54:49.000,1420221289
1,AEW3V4ZWDGC6G33C6I5YSTNVJXSQ,B0BFGWHBB5,5.0,2015-06-14 19:21:09.000,1434309669
2,AEW3V4ZWDGC6G33C6I5YSTNVJXSQ,B09NPJGN9N,5.0,2015-06-14 19:21:11.000,1434309671
3,AEW3V4ZWDGC6G33C6I5YSTNVJXSQ,B08GGHG8S6,5.0,2015-06-14 19:21:18.000,1434309678
4,AEW3V4ZWDGC6G33C6I5YSTNVJXSQ,B006EFMSSM,5.0,2015-06-14 19:21:22.000,1434309682
...,...,...,...,...,...
856,AFKQD5SINKETJBCBOTWOCWMSOA5Q,B0979WX1W4,5.0,2021-12-13 05:43:30.735,1639374210
857,AFJCPOPHR46UYP7S4YR4YXYMZHQA,B08XQMJCXL,5.0,2022-03-07 23:43:01.126,1646696581
858,AGNNDSTERX7WEWRHQGBVUW5EORUQ,B0BHT45FW9,5.0,2022-01-20 01:23:03.152,1642641783
859,AH2SXPFOHZKKPEPBTFL7K4ZLUVVQ,B078X1Q2HC,5.0,2021-10-05 13:01:25.980,1633438885


In [9]:
# Sorted to make sure that even rerun we get same idm mapping
unique_user_ids = sorted(train_df[args.user_col].unique())
unique_item_ids = sorted(train_df[args.item_col].unique())
idm = IDMapper()
idm.fit(unique_user_ids, unique_item_ids)
train_df = train_df.pipe(map_indice, idm, args.user_col, args.item_col)
val_df = val_df.pipe(map_indice, idm, args.user_col, args.item_col)

In [10]:
if os.path.exists(idm_persist_fp):
    os.remove(idm_persist_fp)
idm.save(idm_persist_fp)
idm = IDMapper().load(idm_persist_fp)

## Load feature from feature store

In [11]:
item_features = [
    "parent_asin_feature_view:parent_asin_rating_cnt_365d",
    "parent_asin_feature_view:parent_asin_rating_avg_prev_rating_365d",
    "parent_asin_feature_view:parent_asin_rating_cnt_90d",
    "parent_asin_feature_view:parent_asin_rating_avg_prev_rating_90d",
    "parent_asin_feature_view:parent_asin_rating_cnt_30d",
    "parent_asin_feature_view:parent_asin_rating_avg_prev_rating_30d",
    "parent_asin_feature_view:parent_asin_rating_cnt_7d",
    "parent_asin_feature_view:parent_asin_rating_avg_prev_rating_7d",
    "parent_asin_feature_view:main_category",
    "parent_asin_feature_view:categories",
    "parent_asin_feature_view:price",
]
features_df = store.get_historical_features(
    full_df[[args.item_col, args.timestamp_col]].drop_duplicates(), item_features
).to_df()

# Check for duplicates
check_columns = [
    args.item_col,
    args.timestamp_col,
    "parent_asin_rating_cnt_365d",
    "parent_asin_rating_avg_prev_rating_365d",
    "parent_asin_rating_cnt_90d",
    "parent_asin_rating_avg_prev_rating_90d",
    "parent_asin_rating_cnt_30d",
    "parent_asin_rating_avg_prev_rating_30d",
    "parent_asin_rating_cnt_7d",
    "parent_asin_rating_avg_prev_rating_7d",
    "main_category",
    "price",
]
assert features_df.duplicated(subset=check_columns).sum() == 0, features_df[
    features_df.duplicated(subset=check_columns)
][check_columns].to_string()

  if is_categorical_dtype(series.dtype):
  elif is_datetime64tz_dtype(s.dtype):
                                                                                

In [12]:
full_features_df = pd.merge(
    full_df, features_df, on=[args.item_col, args.timestamp_col], how="left"
).pipe(map_indice, idm, args.user_col, args.item_col)
full_features_df

Unnamed: 0,user_id,parent_asin,rating,event_timestamp,timestamp_unix,parent_asin_rating_cnt_365d,parent_asin_rating_avg_prev_rating_365d,parent_asin_rating_cnt_90d,parent_asin_rating_avg_prev_rating_90d,parent_asin_rating_cnt_30d,parent_asin_rating_avg_prev_rating_30d,parent_asin_rating_cnt_7d,parent_asin_rating_avg_prev_rating_7d,main_category,categories,price,user_indice,item_indice
0,AF2UBRLFZTLECH44DEMVKDWS7Z5A,B00DQC2FPM,5.0,2015-01-02 17:54:49.000,1420221289,10,4.700000,4,5.000000,2,5.000000,1,5.0,Toys & Games,"[Toys & Games, Building Toys, Building Sets]",187.5,3195,1261
1,AEW3V4ZWDGC6G33C6I5YSTNVJXSQ,B0BFGWHBB5,5.0,2015-06-14 19:21:09.000,1434309669,14,4.785714,5,4.600000,1,5.000000,1,5.0,Toys & Games,"[Toys & Games, Preschool, Pre-Kindergarten Toys]",83.27,2759,3767
2,AEW3V4ZWDGC6G33C6I5YSTNVJXSQ,B09NPJGN9N,5.0,2015-06-14 19:21:11.000,1434309671,13,5.000000,7,5.000000,1,5.000000,0,,Toys & Games,"[Toys & Games, Preschool, Pre-Kindergarten Toy...",9.69,2759,3535
3,AEW3V4ZWDGC6G33C6I5YSTNVJXSQ,B08GGHG8S6,5.0,2015-06-14 19:21:18.000,1434309678,30,4.666667,7,4.714286,3,4.666667,0,,Toys & Games,"[Toys & Games, Sports & Outdoor Play, Toy Spor...",,2759,3198
4,AEW3V4ZWDGC6G33C6I5YSTNVJXSQ,B006EFMSSM,5.0,2015-06-14 19:21:22.000,1434309682,24,4.833333,4,5.000000,2,5.000000,0,,Toys & Games,"[Toys & Games, Toy Figures & Playsets, Play Fi...",,2759,862
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
91556,AFKQD5SINKETJBCBOTWOCWMSOA5Q,B0979WX1W4,5.0,2021-12-13 05:43:30.735,1639374210,0,,0,,0,,0,,Toys & Games,"[Toys & Games, Preschool, Pre-Kindergarten Toys]",,4754,3432
91557,AFJCPOPHR46UYP7S4YR4YXYMZHQA,B08XQMJCXL,5.0,2022-03-07 23:43:01.126,1646696581,2,5.000000,0,,0,,0,,Toys & Games,"[Toys & Games, Santa App]",39.95,4612,3334
91558,AGNNDSTERX7WEWRHQGBVUW5EORUQ,B0BHT45FW9,5.0,2022-01-20 01:23:03.152,1642641783,6,4.166667,1,5.000000,1,5.000000,0,,Toys & Games,"[Toys & Games, Preschool, Pre-Kindergarten Toy...",17.98,8030,3790
91559,AH2SXPFOHZKKPEPBTFL7K4ZLUVVQ,B078X1Q2HC,5.0,2021-10-05 13:01:25.980,1633438885,3,5.000000,0,,0,,0,,Toys & Games,"[Toys & Games, Sports & Outdoor Play, Play Set...",49.36,9325,2499


In [13]:
user_features = [
    "user_feature_view:user_rating_cnt_90d",
    "user_feature_view:user_rating_avg_prev_rating_90d",
    "user_feature_view:user_rating_list_10_recent_asin",
    "user_feature_view:user_rating_list_10_recent_asin_timestamp",
    "user_feature_view:item_sequence_ts",
    "user_feature_view:item_sequence_ts_bucket",
]

features_df = store.get_historical_features(
    full_df[[args.user_col, args.timestamp_col]].drop_duplicates(), user_features
).to_df()

check_columns = [
    args.user_col,
    args.timestamp_col,
    "user_rating_cnt_90d",
    "user_rating_avg_prev_rating_90d",
    "user_rating_list_10_recent_asin",
    "user_rating_list_10_recent_asin_timestamp",
]

assert features_df.duplicated(subset=check_columns).sum() == 0, features_df[
    features_df.duplicated(subset=check_columns)
][check_columns].to_string()

  if is_categorical_dtype(series.dtype):
  elif is_datetime64tz_dtype(s.dtype):
                                                                                

In [14]:
features_df

Unnamed: 0,user_id,event_timestamp,user_rating_cnt_90d,user_rating_avg_prev_rating_90d,user_rating_list_10_recent_asin,user_rating_list_10_recent_asin_timestamp,item_sequence_ts,item_sequence_ts_bucket
0,AE25OGR4Y5KVKPI24XYNEUZXHP2A,2019-03-30 12:47:10.261,4,4.250000,"B00X0XGSVY,B08K9TMZSV,B0C82XC86B,B003VYAJMG","2019-01-18T00:20:37.081Z,2019-01-18T00:26:15.8...","[-1, -1, -1, -1, -1, -1, 1547745637, 154774597...","[-1, -1, -1, -1, -1, -1, 5, 5, 5, 5]"
1,AE25WA7EBPSL75RANIXDVUHTOUHA,2016-05-06 20:32:31.000,2,4.500000,"B0064CTIC4,B00ILA29PU,B00NHQI2MC","2015-04-20T23:39:45.000Z,2016-02-26T00:05:36.0...","[-1, -1, -1, -1, -1, -1, -1, 1429547985, 14564...","[-1, -1, -1, -1, -1, -1, -1, 6, 5, 0]"
2,AE26XR22W3UOWZAJC7WTHKVHF57A,2016-12-26 15:57:10.000,1,4.000000,B00NW2Q6ZG,2016-12-26T22:41:56.000Z,"[-1, -1, -1, -1, -1, -1, -1, -1, -1, 1482766916]","[-1, -1, -1, -1, -1, -1, -1, -1, -1, 1]"
3,AE27ZXMWTIFQOIEFCURLQWGVOYVQ,2017-09-18 01:07:52.799,0,,"B09XN13ZTY,B000LCD2GQ,B000ARW60U,B001PNG8SY","2012-02-15T00:46:19.000Z,2013-07-25T11:33:50.0...","[-1, -1, -1, -1, -1, -1, 1329241579, 137472683...","[-1, -1, -1, -1, -1, -1, 8, 7, 7, 7]"
4,AE2AEUQYYQIOKVMVMUOX3B5TVHYQ,2021-01-11 05:43:46.795,2,5.000000,"B00SUED2J4,B08K13QG5M,B07N29HQMN,B0BHTGCF5W","2019-04-15T01:13:32.493Z,2019-04-15T01:15:59.3...","[-1, -1, -1, -1, -1, -1, 1555265612, 155526575...","[-1, -1, -1, -1, -1, -1, 6, 6, 5, 5]"
...,...,...,...,...,...,...,...,...
91552,AHZUGNMXZS6QEGBMVOIX6HDAPGHA,2018-10-11 12:14:24.913,2,4.500000,"B095392GMZ,1223080412,B001W2WKS0,B00SI63N48,B0...","2018-04-13T19:29:01.477Z,2018-04-13T19:41:11.4...","[1523622541, 1523623271, 1523626301, 152569784...","[5, 5, 5, 5, 5, 5, 5, 5, 5, 5]"
91553,AHZXVZECCDWVAHMZO5ORUMYQ4Y6Q,2013-01-14 20:10:49.000,3,3.666667,"B004H1V5S4,B0BZHVZQ3J,B002NPBT50","2012-12-28T08:46:27.000Z,2012-12-28T08:48:03.0...","[-1, -1, -1, -1, -1, -1, -1, 1356659187, 13566...","[-1, -1, -1, -1, -1, -1, -1, 4, 4, 4]"
91554,AHZXVZECCDWVAHMZO5ORUMYQ4Y6Q,2016-01-02 04:39:41.000,6,5.000000,"B0BZHVZQ3J,B002NPBT50,B0016LKFXE,B000UEL172,B0...","2012-12-28T08:48:03.000Z,2012-12-28T08:50:45.0...","[1356659283, 1356659445, 1358194249, 136591676...","[7, 7, 6, 6, 4, 4, 4, 0, 0, 0]"
91555,AHZXVZECCDWVAHMZO5ORUMYQ4Y6Q,2016-12-30 13:23:34.000,1,4.000000,"B0016LKFXE,B000UEL172,B00U2UO1LM,B00D8STBHY,B0...","2013-01-15T03:10:49.000Z,2013-04-14T12:19:26.0...","[1358194249, 1365916766, 1449276314, 144927633...","[7, 7, 6, 6, 6, 5, 5, 5, 5, 0]"


In [15]:
full_features_df = pd.merge(
    full_features_df, features_df, on=[args.user_col, args.timestamp_col], how="left"
)
full_features_df

Unnamed: 0,user_id,parent_asin,rating,event_timestamp,timestamp_unix,parent_asin_rating_cnt_365d,parent_asin_rating_avg_prev_rating_365d,parent_asin_rating_cnt_90d,parent_asin_rating_avg_prev_rating_90d,parent_asin_rating_cnt_30d,...,categories,price,user_indice,item_indice,user_rating_cnt_90d,user_rating_avg_prev_rating_90d,user_rating_list_10_recent_asin,user_rating_list_10_recent_asin_timestamp,item_sequence_ts,item_sequence_ts_bucket
0,AF2UBRLFZTLECH44DEMVKDWS7Z5A,B00DQC2FPM,5.0,2015-01-02 17:54:49.000,1420221289,10,4.700000,4,5.000000,2,...,"[Toys & Games, Building Toys, Building Sets]",187.5,3195,1261,3,5.0,"B09QM5JMCD,B09PH8Z5R8,B000084JMC,B09NPJGN9N","2014-01-04T01:00:02.000Z,2015-01-03T00:39:56.0...","[-1, -1, -1, -1, -1, -1, 1388772002, 142022039...","[-1, -1, -1, -1, -1, -1, 5, 1, 1, 1]"
1,AEW3V4ZWDGC6G33C6I5YSTNVJXSQ,B0BFGWHBB5,5.0,2015-06-14 19:21:09.000,1434309669,14,4.785714,5,4.600000,1,...,"[Toys & Games, Preschool, Pre-Kindergarten Toys]",83.27,2759,3767,0,,,,"[-1, -1, -1, -1, -1, -1, -1, -1, -1, -1]","[-1, -1, -1, -1, -1, -1, -1, -1, -1, -1]"
2,AEW3V4ZWDGC6G33C6I5YSTNVJXSQ,B09NPJGN9N,5.0,2015-06-14 19:21:11.000,1434309671,13,5.000000,7,5.000000,1,...,"[Toys & Games, Preschool, Pre-Kindergarten Toy...",9.69,2759,3535,1,5.0,B0BFGWHBB5,2015-06-15T02:21:09.000Z,"[-1, -1, -1, -1, -1, -1, -1, -1, -1, 1434309669]","[-1, -1, -1, -1, -1, -1, -1, -1, -1, 0]"
3,AEW3V4ZWDGC6G33C6I5YSTNVJXSQ,B08GGHG8S6,5.0,2015-06-14 19:21:18.000,1434309678,30,4.666667,7,4.714286,3,...,"[Toys & Games, Sports & Outdoor Play, Toy Spor...",,2759,3198,2,5.0,"B0BFGWHBB5,B09NPJGN9N","2015-06-15T02:21:09.000Z,2015-06-15T02:21:11.000Z","[-1, -1, -1, -1, -1, -1, -1, -1, 1434309669, 1...","[-1, -1, -1, -1, -1, -1, -1, -1, 0, 0]"
4,AEW3V4ZWDGC6G33C6I5YSTNVJXSQ,B006EFMSSM,5.0,2015-06-14 19:21:22.000,1434309682,24,4.833333,4,5.000000,2,...,"[Toys & Games, Toy Figures & Playsets, Play Fi...",,2759,862,3,5.0,"B0BFGWHBB5,B09NPJGN9N,B08GGHG8S6","2015-06-15T02:21:09.000Z,2015-06-15T02:21:11.0...","[-1, -1, -1, -1, -1, -1, -1, 1434309669, 14343...","[-1, -1, -1, -1, -1, -1, -1, 0, 0, 0]"
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
91556,AFKQD5SINKETJBCBOTWOCWMSOA5Q,B0979WX1W4,5.0,2021-12-13 05:43:30.735,1639374210,0,,0,,0,...,"[Toys & Games, Preschool, Pre-Kindergarten Toys]",,4754,3432,0,,"B0033BM3K8,B0BT9H7VF3,B01ASVCZ44,B01N9KR2SZ,B0...","2015-05-28T23:08:50.000Z,2016-12-11T20:26:20.0...","[1432829330, 1481462780, 1494592575, 151508893...","[8, 8, 7, 7, 7, 7, 6, 6, 6, 5]"
91557,AFJCPOPHR46UYP7S4YR4YXYMZHQA,B08XQMJCXL,5.0,2022-03-07 23:43:01.126,1646696581,2,5.000000,0,,0,...,"[Toys & Games, Santa App]",39.95,4612,3334,0,,"B000W3TD4Y,B09M7XZ33P,B084K4J39K,B01N64HQ1X,B0...","2010-12-26T21:58:56.000Z,2011-12-13T07:56:13.0...","[-1, -1, -1, -1, -1, 1293375536, 1323737773, 1...","[-1, -1, -1, -1, -1, 9, 9, 8, 7, 6]"
91558,AGNNDSTERX7WEWRHQGBVUW5EORUQ,B0BHT45FW9,5.0,2022-01-20 01:23:03.152,1642641783,6,4.166667,1,5.000000,1,...,"[Toys & Games, Preschool, Pre-Kindergarten Toy...",17.98,8030,3790,0,,"B09NXVL2P2,B06XRGBBXP,B00IL7IFOM,B004K6KM8K,B0...","2018-06-07T09:41:33.364Z,2018-11-16T02:25:15.8...","[-1, -1, 1528339293, 1542309915, 1562260149, 1...","[-1, -1, 7, 7, 6, 6, 6, 6, 6, 6]"
91559,AH2SXPFOHZKKPEPBTFL7K4ZLUVVQ,B078X1Q2HC,5.0,2021-10-05 13:01:25.980,1633438885,3,5.000000,0,,0,...,"[Toys & Games, Sports & Outdoor Play, Play Set...",49.36,9325,2499,0,,"B00B2B051A,B01L8JF64G,B0BFXK2HJW,B012CRQ7S2,B0...","2015-10-27T21:10:20.000Z,2015-10-27T21:10:58.0...","[-1, -1, -1, 1445955020, 1445955058, 148536225...","[-1, -1, -1, 8, 8, 7, 7, 7, 7, 7]"


In [16]:
def convert_asin_to_idx(inp: str, sequence_length=10, padding_value=-1):
    if inp is None:
        return [padding_value] * sequence_length
    asins = inp.split(",")
    indices = [idm.get_item_index(item_id) for item_id in asins]
    padding_needed = sequence_length - len(indices)
    output = np.pad(
        indices,
        (padding_needed, 0),  # Add padding at the beginning
        "constant",
        constant_values=padding_value,
    )
    return output


full_features_df = full_features_df.assign(
    item_sequence=lambda df: df["user_rating_list_10_recent_asin"].apply(
        convert_asin_to_idx
    )
)
full_features_df

Unnamed: 0,user_id,parent_asin,rating,event_timestamp,timestamp_unix,parent_asin_rating_cnt_365d,parent_asin_rating_avg_prev_rating_365d,parent_asin_rating_cnt_90d,parent_asin_rating_avg_prev_rating_90d,parent_asin_rating_cnt_30d,...,price,user_indice,item_indice,user_rating_cnt_90d,user_rating_avg_prev_rating_90d,user_rating_list_10_recent_asin,user_rating_list_10_recent_asin_timestamp,item_sequence_ts,item_sequence_ts_bucket,item_sequence
0,AF2UBRLFZTLECH44DEMVKDWS7Z5A,B00DQC2FPM,5.0,2015-01-02 17:54:49.000,1420221289,10,4.700000,4,5.000000,2,...,187.5,3195,1261,3,5.0,"B09QM5JMCD,B09PH8Z5R8,B000084JMC,B09NPJGN9N","2014-01-04T01:00:02.000Z,2015-01-03T00:39:56.0...","[-1, -1, -1, -1, -1, -1, 1388772002, 142022039...","[-1, -1, -1, -1, -1, -1, 5, 1, 1, 1]","[-1, -1, -1, -1, -1, -1, 3587, 3558, 102, 3535]"
1,AEW3V4ZWDGC6G33C6I5YSTNVJXSQ,B0BFGWHBB5,5.0,2015-06-14 19:21:09.000,1434309669,14,4.785714,5,4.600000,1,...,83.27,2759,3767,0,,,,"[-1, -1, -1, -1, -1, -1, -1, -1, -1, -1]","[-1, -1, -1, -1, -1, -1, -1, -1, -1, -1]","[-1, -1, -1, -1, -1, -1, -1, -1, -1, 4143]"
2,AEW3V4ZWDGC6G33C6I5YSTNVJXSQ,B09NPJGN9N,5.0,2015-06-14 19:21:11.000,1434309671,13,5.000000,7,5.000000,1,...,9.69,2759,3535,1,5.0,B0BFGWHBB5,2015-06-15T02:21:09.000Z,"[-1, -1, -1, -1, -1, -1, -1, -1, -1, 1434309669]","[-1, -1, -1, -1, -1, -1, -1, -1, -1, 0]","[-1, -1, -1, -1, -1, -1, -1, -1, -1, 3767]"
3,AEW3V4ZWDGC6G33C6I5YSTNVJXSQ,B08GGHG8S6,5.0,2015-06-14 19:21:18.000,1434309678,30,4.666667,7,4.714286,3,...,,2759,3198,2,5.0,"B0BFGWHBB5,B09NPJGN9N","2015-06-15T02:21:09.000Z,2015-06-15T02:21:11.000Z","[-1, -1, -1, -1, -1, -1, -1, -1, 1434309669, 1...","[-1, -1, -1, -1, -1, -1, -1, -1, 0, 0]","[-1, -1, -1, -1, -1, -1, -1, -1, 3767, 3535]"
4,AEW3V4ZWDGC6G33C6I5YSTNVJXSQ,B006EFMSSM,5.0,2015-06-14 19:21:22.000,1434309682,24,4.833333,4,5.000000,2,...,,2759,862,3,5.0,"B0BFGWHBB5,B09NPJGN9N,B08GGHG8S6","2015-06-15T02:21:09.000Z,2015-06-15T02:21:11.0...","[-1, -1, -1, -1, -1, -1, -1, 1434309669, 14343...","[-1, -1, -1, -1, -1, -1, -1, 0, 0, 0]","[-1, -1, -1, -1, -1, -1, -1, 3767, 3535, 3198]"
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
91556,AFKQD5SINKETJBCBOTWOCWMSOA5Q,B0979WX1W4,5.0,2021-12-13 05:43:30.735,1639374210,0,,0,,0,...,,4754,3432,0,,"B0033BM3K8,B0BT9H7VF3,B01ASVCZ44,B01N9KR2SZ,B0...","2015-05-28T23:08:50.000Z,2016-12-11T20:26:20.0...","[1432829330, 1481462780, 1494592575, 151508893...","[8, 8, 7, 7, 7, 7, 6, 6, 6, 5]","[569, 3911, 2098, 2342, 1859, 1340, 1305, 4073..."
91557,AFJCPOPHR46UYP7S4YR4YXYMZHQA,B08XQMJCXL,5.0,2022-03-07 23:43:01.126,1646696581,2,5.000000,0,,0,...,39.95,4612,3334,0,,"B000W3TD4Y,B09M7XZ33P,B084K4J39K,B01N64HQ1X,B0...","2010-12-26T21:58:56.000Z,2011-12-13T07:56:13.0...","[-1, -1, -1, -1, -1, 1293375536, 1323737773, 1...","[-1, -1, -1, -1, -1, 9, 9, 8, 7, 6]","[-1, -1, -1, -1, -1, 363, 3508, 3004, 2336, 1562]"
91558,AGNNDSTERX7WEWRHQGBVUW5EORUQ,B0BHT45FW9,5.0,2022-01-20 01:23:03.152,1642641783,6,4.166667,1,5.000000,1,...,17.98,8030,3790,0,,"B09NXVL2P2,B06XRGBBXP,B00IL7IFOM,B004K6KM8K,B0...","2018-06-07T09:41:33.364Z,2018-11-16T02:25:15.8...","[-1, -1, 1528339293, 1542309915, 1562260149, 1...","[-1, -1, 7, 7, 6, 6, 6, 6, 6, 6]","[-1, -1, 3542, 2367, 1450, 734, 827, 586, 1008..."
91559,AH2SXPFOHZKKPEPBTFL7K4ZLUVVQ,B078X1Q2HC,5.0,2021-10-05 13:01:25.980,1633438885,3,5.000000,0,,0,...,49.36,9325,2499,0,,"B00B2B051A,B01L8JF64G,B0BFXK2HJW,B012CRQ7S2,B0...","2015-10-27T21:10:20.000Z,2015-10-27T21:10:58.0...","[-1, -1, -1, 1445955020, 1445955058, 148536225...","[-1, -1, -1, 8, 8, 7, 7, 7, 7, 7]","[-1, -1, -1, 1096, 2261, 3776, 1972, 2741, 193..."


In [17]:
val_timestamp = val_df["timestamp"].min()
train_df_length = train_df.shape[0]
train_df = full_features_df.loc[lambda df: df[args.timestamp_col].lt(val_timestamp)]
assert train_df.shape[0] == train_df_length

val_df_length = val_df.shape[0]
val_df = full_features_df.loc[lambda df: df[args.timestamp_col].ge(val_timestamp)]
assert val_df.shape[0] == val_df_length

## Transform Pipeline

In [18]:
# Define the transformations for the columns
rating_agg_cols = [
    "parent_asin_rating_cnt_365d",
    "parent_asin_rating_avg_prev_rating_365d",
    "parent_asin_rating_cnt_90d",
    "parent_asin_rating_avg_prev_rating_90d",
    "parent_asin_rating_cnt_30d",
    "parent_asin_rating_avg_prev_rating_30d",
    "parent_asin_rating_cnt_7d",
    "parent_asin_rating_avg_prev_rating_7d",
]

tfm = [
    ("main_category", OneHotEncoder(handle_unknown="ignore"), ["main_category"]),
    # ("title", Pipeline(title_pipeline_steps()), ["title"]),
    # ("description", Pipeline(description_pipeline_steps()), "description"),
    (
        "categories",
        Pipeline(categories_pipeline_steps()),
        "categories",
    ),  # Count Vectorizer for multi-label categorical
    (
        "price",
        Pipeline(price_pipeline_steps()),
        "price",
    ),  # Normalizing price
    (
        "rating_agg",
        Pipeline(rating_agg_pipeline_steps()),
        rating_agg_cols,
    ),
]
meta_cols = ["main_category", "title", "description", "categories", "price"]
cols = meta_cols + rating_agg_cols
cols

['main_category',
 'title',
 'description',
 'categories',
 'price',
 'parent_asin_rating_cnt_365d',
 'parent_asin_rating_avg_prev_rating_365d',
 'parent_asin_rating_cnt_90d',
 'parent_asin_rating_avg_prev_rating_90d',
 'parent_asin_rating_cnt_30d',
 'parent_asin_rating_avg_prev_rating_30d',
 'parent_asin_rating_cnt_7d',
 'parent_asin_rating_avg_prev_rating_7d']

In [19]:
def check_dup(df):
    assert (
        df[[args.user_col, args.item_col, args.timestamp_col]].duplicated().sum() == 0
    )


check_dup(train_df)
check_dup(val_df)

In [20]:
train_df.to_parquet(train_persist_fp, index=False)
val_df.to_parquet(val_persist_fp, index=False)

In [21]:
# papermill_description=fit-tfm-pipeline
preprocessing_pipeline = ColumnTransformer(
    transformers=tfm, remainder="drop"  # Drop any columns not specified in transformers
)

# Create a pipeline object
item_metadata_pipeline = Pipeline(
    steps=[
        ("preprocessing", preprocessing_pipeline),
        (
            "normalizer",
            StandardScaler(),
        ),  # Normalize the numerical outputs since it's an important preconditions for any Deep Learning models
    ]
)

# Fit the pipeline
# Drop duplicated item so that the Pipeline only fit the unique item features
fit_df = train_df.drop_duplicates(subset=[args.item_col])
item_metadata_pipeline.fit(fit_df)

0,1,2
,steps,"[('preprocessing', ...), ('normalizer', ...)]"
,transform_input,
,memory,
,verbose,False

0,1,2
,transformers,"[('main_category', ...), ('categories', ...), ...]"
,remainder,'drop'
,sparse_threshold,0.3
,n_jobs,
,transformer_weights,
,verbose,False
,verbose_feature_names_out,True
,force_int_remainder_cols,'deprecated'

0,1,2
,categories,'auto'
,drop,
,sparse_output,True
,dtype,<class 'numpy.float64'>
,handle_unknown,'ignore'
,min_frequency,
,max_categories,
,feature_name_combiner,'concat'

0,1,2
,func,<function fla...x7efedc7125c0>
,inverse_func,
,validate,False
,accept_sparse,False
,check_inverse,True
,feature_names_out,
,kw_args,
,inv_kw_args,

0,1,2
,input,'content'
,encoding,'utf-8'
,decode_error,'strict'
,strip_accents,
,lowercase,True
,preprocessor,
,tokenizer,<function tok...x7efedc45da80>
,stop_words,
,token_pattern,
,ngram_range,"(1, ...)"

0,1,2
,func,<function tod...x7efedc75bd80>
,inverse_func,
,validate,False
,accept_sparse,False
,check_inverse,True
,feature_names_out,
,kw_args,
,inv_kw_args,

0,1,2
,func,<function pri...x7efedc45dbc0>
,inverse_func,
,validate,False
,accept_sparse,False
,check_inverse,True
,feature_names_out,
,kw_args,{'pattern': '\\b((?:\\d+\\.\\d*)|(?:\\d+))\\b'}
,inv_kw_args,

0,1,2
,missing_values,
,strategy,'constant'
,fill_value,0
,copy,True
,add_indicator,False
,keep_empty_features,False

0,1,2
,feature_range,"(0, ...)"
,copy,True
,clip,False

0,1,2
,missing_values,
,strategy,'constant'
,fill_value,0
,copy,True
,add_indicator,False
,keep_empty_features,False

0,1,2
,copy,True
,with_mean,True
,with_std,True

0,1,2
,copy,True
,with_mean,True
,with_std,True


In [22]:
print(rating_agg_cols)
print(fit_df[rating_agg_cols].dtypes)
print(fit_df[rating_agg_cols].head(3))

['parent_asin_rating_cnt_365d', 'parent_asin_rating_avg_prev_rating_365d', 'parent_asin_rating_cnt_90d', 'parent_asin_rating_avg_prev_rating_90d', 'parent_asin_rating_cnt_30d', 'parent_asin_rating_avg_prev_rating_30d', 'parent_asin_rating_cnt_7d', 'parent_asin_rating_avg_prev_rating_7d']
parent_asin_rating_cnt_365d                  int64
parent_asin_rating_avg_prev_rating_365d    float64
parent_asin_rating_cnt_90d                   int64
parent_asin_rating_avg_prev_rating_90d     float64
parent_asin_rating_cnt_30d                   int64
parent_asin_rating_avg_prev_rating_30d     float64
parent_asin_rating_cnt_7d                    int64
parent_asin_rating_avg_prev_rating_7d      float64
dtype: object
   parent_asin_rating_cnt_365d  parent_asin_rating_avg_prev_rating_365d  \
0                           10                                 4.700000   
1                           14                                 4.785714   
2                           13                                 5

In [23]:
# Kiểm tra số lượng unique items trước khi mapping
print("Số lượng unique items trong train_df:", len(train_df[args.item_col].unique()))

# Kiểm tra mapping trong IDMapper
print("\nSố lượng items trong IDMapper:", len(idm.item_to_index))

# Kiểm tra xem có item nào không có trong training set không
all_items = set(train_df[args.item_col].unique())
test_items = set(val_df[args.item_col].unique())
missing_items = test_items - all_items
print("\nSố lượng items trong validation không có trong training:", len(missing_items))
if len(missing_items) > 0:
    print("Ví dụ một số items bị thiếu:", list(missing_items)[:5])

Số lượng unique items trong train_df: 4143

Số lượng items trong IDMapper: 4143

Số lượng items trong validation không có trong training: 0


In [24]:
with open(f"{pvc_path}/item_metadata_pipeline.dill", "wb") as f:
    dill.dump(item_metadata_pipeline, f)