<a href="https://colab.research.google.com/github/martin-fabbri/recsys-pipeline/blob/main/tiktok-pipeline/1_feature_backfill.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## <span style='color:#ff5f27'> 📝 Imports</span>

In [1]:
!rm *.py
!wget https://raw.githubusercontent.com/martin-fabbri/recsys-pipeline/refs/heads/main/tiktok-pipeline/features.py

!pip install -U hopsworks[python] --quiet
!pip install -U mimesis==15.1.0 --quiet
!pip install great-expectations==0.18.21 --quiet

rm: cannot remove '*.py': No such file or directory
--2024-12-08 20:57:10--  https://raw.githubusercontent.com/martin-fabbri/recsys-pipeline/refs/heads/main/tiktok-pipeline/features.py
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.110.133, 185.199.108.133, 185.199.109.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.110.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 5649 (5.5K) [text/plain]
Saving to: ‘features.py’


2024-12-08 20:57:11 (54.9 MB/s) - ‘features.py’ saved [5649/5649]

[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m90.6/90.6 kB[0m [31m2.9 MB/s[0m eta [36m0:00:00[0m
[?25h  Installing build dependencies ... [?25l[?25hdone
  Getting requirements to build wheel ... [?25l[?25hdone
  Preparing metadata (pyproject.toml) ... [?25l[?25hdone
  Preparing metadata (setup.py) ... [?25l[?25hdone
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m44.2

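# 🚨🚨🚨 ⚠️ Restart the Colab Session before continuing ⚠️ 🚨🚨🚨

Use *Runtime → Restart session* so that the packages installed above are loaded in place of any versions the runtime had already imported.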

In [1]:
!pip install -U confluent-kafka==2.3.0 --quiet

In [2]:
import pandas as pd
import numpy as np
import great_expectations as ge
from great_expectations.core import ExpectationSuite, ExpectationConfiguration

from features import generate_users
from features import generate_video_content
from features import generate_interactions

## <span style='color:#ff5f27'> ⚙️ Data Generation</span>


### <span style='color:#ff5f27'> 👥 Users Data Generation</span>

In [3]:
# Number of synthetic users to generate (named like `num_interactions` below so
# the dataset sizes are easy to find and tune).
num_users = 25_000

# generate_users comes from features.py. Its output is kept as-is because it is
# also passed to generate_interactions further down.
user_data = generate_users(num_users)

data_users_df = pd.DataFrame(user_data)
data_users_df.head()

Unnamed: 0,user_id,gender,age,country
0,TV993Z,Male,34,Slovenia
1,NZ829N,Other,51,Réunion
2,OH399H,Male,87,North Korea
3,ZA924B,Other,75,Ukraine
4,BB098H,Female,37,Slovenia


### <span style='color:#ff5f27'> 🎥 Content Data Generation</span>


In [4]:
# Number of synthetic videos to generate. A named value avoids the old comment
# restating the literal and drifting out of sync with it.
num_videos = 25_000

# NOTE(review): historical=True presumably spreads upload dates over past years
# (the sample output shows 2022-2024 dates). Confirm in features.py.
video_data = generate_video_content(num_videos, historical=True)

data_video_df = pd.DataFrame(video_data)
data_video_df.head()

Unnamed: 0,video_id,category,views,likes,video_length,upload_date
0,0UB28M,Cooking,4585,3705,10,2022-12-16
1,3QP51I,News,275058,183395,14,2024-03-20
2,4AO64Y,Music,16291,10751,143,2023-01-09
3,6IC08K,Lifestyle,76423,48329,179,2023-08-23
4,6GB88V,Cooking,65921,52733,155,2023-06-05


### <span style='color:#ff5f27'> 🔗 Interactions Generation</span>


In [5]:
# Sample synthetic interaction events that link the users and videos generated above.
num_interactions = 1_000_000
interactions = generate_interactions(
    num_interactions,
    user_data,
    video_data,
)

data_interactions_df = pd.DataFrame(interactions)
data_interactions_df.head()

Unnamed: 0,interaction_id,user_id,video_id,interaction_type,watch_time
0,4469-63-8719,SH230G,6OZ03M,skip,144
1,9820-96-3187,GI189N,3TQ70H,skip,42
2,7590-89-3136,PP972F,3DP31B,skip,179
3,6686-12-5885,JB793K,4WO16T,like,229
4,9444-05-6953,BG606F,0BN95U,skip,110


## <span style="color:#ff5f27">👮🏻‍♂️ Great Expectations </span>

In [6]:
# Wrap the users frame in a Great Expectations dataset (legacy pandas API).
ge_users_df = ge.from_pandas(data_users_df)

# Take the dataset's suite and rename it so it is recognisable once attached to
# the "users" feature group.
expectation_suite_users = ge_users_df.get_expectation_suite()
expectation_suite_users.expectation_suite_name = "user_data_suite"

# Rule 1: plausible ages only, 12 to 120.
expectation_suite_users.add_expectation(
    ExpectationConfiguration(
        expectation_type="expect_column_values_to_be_between",
        kwargs=dict(column="age", min_value=12, max_value=120),
    )
)

# Rule 2: completeness. Add one not-null expectation per column.
for column in ge_users_df.columns:
    expectation_suite_users.add_expectation(
        ExpectationConfiguration(
            expectation_type="expect_column_values_to_not_be_null",
            kwargs=dict(column=column),
        )
    )

# Rule 3: gender must come from a closed vocabulary.
expectation_suite_users.add_expectation(
    ExpectationConfiguration(
        expectation_type="expect_column_distinct_values_to_be_in_set",
        kwargs=dict(column="gender", value_set=["Male", "Female", "Other"]),
    )
)

{"expectation_type": "expect_column_distinct_values_to_be_in_set", "kwargs": {"column": "gender", "value_set": ["Male", "Female", "Other"]}, "meta": {}}

In [7]:
# Create a Great Expectations DataFrame from the pandas DataFrame
ge_video_df = ge.from_pandas(data_video_df)

# Start from the dataset's suite and give it a stable name. This suite is
# attached to the "videos" feature group.
expectation_suite_videos = ge_video_df.get_expectation_suite()
expectation_suite_videos.expectation_suite_name = "video_data_suite"

# Expectations: Columns should not have null values. This mirrors the users and
# interactions suites and, in particular, covers the `video_id` primary key.
for column in ge_video_df.columns:
    expectation_suite_videos.add_expectation(
        ExpectationConfiguration(
            expectation_type="expect_column_values_to_not_be_null",
            kwargs={"column": column}
        )
    )

# Expectation: Views, Likes, and Video Length should be non-negative
# (no upper bound: max_value=None).
for column in ["views", "likes", "video_length"]:
    expectation_suite_videos.add_expectation(
        ExpectationConfiguration(
            expectation_type="expect_column_values_to_be_between",
            kwargs={"column": column, "min_value": 0, "max_value": None}
        )
    )

# Expectation: Valid date format for upload_date
expectation_suite_videos.add_expectation(
    ExpectationConfiguration(
        expectation_type="expect_column_values_to_be_dateutil_parseable",
        kwargs={"column": "upload_date"}
    )
)

{"expectation_type": "expect_column_values_to_be_dateutil_parseable", "kwargs": {"column": "upload_date"}, "meta": {}}

In [8]:
# Create a Great Expectations DataFrame from the pandas DataFrame
ge_interactions_df = ge.from_pandas(data_interactions_df)

# Start from the dataset's suite and give it a stable name. This suite is
# attached to the "interactions" feature group.
expectation_suite_interactions = ge_interactions_df.get_expectation_suite()
expectation_suite_interactions.expectation_suite_name = "interactions_data_suite"

# Expectations: Non-null values in all columns. This includes interaction_id,
# user_id and video_id, which form the feature group's primary key.
for column in ge_interactions_df.columns:
    expectation_suite_interactions.add_expectation(
        ExpectationConfiguration(
            expectation_type="expect_column_values_to_not_be_null",
            kwargs={"column": column}
        )
    )

# Expectation: interaction_type must be one of the known event types. The
# ranking label later treats 'view'/'like'/'share'/'comment' as positive, so a
# new type added here must also be classified there.
expectation_suite_interactions.add_expectation(
    ExpectationConfiguration(
        expectation_type="expect_column_distinct_values_to_be_in_set",
        kwargs={
            "column": "interaction_type",
            "value_set": ['like', 'dislike', 'view', 'comment', 'share', 'skip']
        }
    )
)

# Expectation: Non-negative watch time. min_value=0 is inclusive, so a watch
# time of exactly 0 passes; this is not a strictly-positive check.
expectation_suite_interactions.add_expectation(
    ExpectationConfiguration(
        expectation_type="expect_column_values_to_be_between",
        kwargs={"column": "watch_time", "min_value": 0}
    )
)

{"expectation_type": "expect_column_values_to_be_between", "kwargs": {"column": "watch_time", "min_value": 0}, "meta": {}}

## <span style="color:#ff5f27">🔮 Connect to Hopsworks Feature Store </span>

In [9]:
import hopsworks

# Interactive login (prompts for an API key when none is configured), then a
# handle to the project's feature store used by all cells below.
project = hopsworks.login()
fs = project.get_feature_store()

Copy your Api Key (first register/login): https://c.app.hopsworks.ai/account/api/generated

Paste it here: ··········

Logged in to project, explore it here https://c.app.hopsworks.ai:443/p/1196184


## <span style="color:#ff5f27">🪄 Feature Group Creation </span>


In [10]:
# Get or create version 1 of the online-enabled "users" feature group, keyed on
# user_id and validated on insert by expectation_suite_users.
# NOTE(review): user_id is the sole primary key. If generate_users can emit
# duplicate ids, those rows will collide on insert. Consider a uniqueness check.
users_fg = fs.get_or_create_feature_group(
    name="users",
    version=1,
    description="Users data.",
    primary_key=["user_id"],
    online_enabled=True,
    expectation_suite=expectation_suite_users,
)

# Upload the generated users frame.
users_fg.insert(data_users_df)
print('Done ✅')

Validation succeeded.
Validation Report saved successfully, explore a summary at https://c.app.hopsworks.ai:443/p/1196184/fs/1183821/fg/1380739


Uploading Dataframe: 100.00% |██████████| Rows 25000/25000 | Elapsed Time: 00:01 | Remaining Time: 00:00


Launching job: users_1_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://c.app.hopsworks.ai:443/p/1196184/jobs/named/users_1_offline_fg_materialization/executions
Done ✅


In [11]:
# Get or create version 1 of the online-enabled "videos" feature group, keyed on
# video_id and validated on insert by expectation_suite_videos.
videos_fg = fs.get_or_create_feature_group(
    name="videos",
    version=1,
    description="Videos data.",
    primary_key=["video_id"],
    online_enabled=True,
    expectation_suite=expectation_suite_videos,
)

# Upload the generated videos frame.
videos_fg.insert(data_video_df)
print('Done ✅')

Feature Group created successfully, explore it at 
https://c.app.hopsworks.ai:443/p/1196184/fs/1183821/fg/1380741
Validation succeeded.
Validation Report saved successfully, explore a summary at https://c.app.hopsworks.ai:443/p/1196184/fs/1183821/fg/1380741


Uploading Dataframe: 100.00% |██████████| Rows 25000/25000 | Elapsed Time: 00:01 | Remaining Time: 00:00


Launching job: videos_1_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://c.app.hopsworks.ai:443/p/1196184/jobs/named/videos_1_offline_fg_materialization/executions
Done ✅


In [12]:
# Get or create version 1 of the online-enabled "interactions" feature group.
# The composite key (interaction_id, user_id, video_id) lets the same user/video
# pair appear in many events. Rows are validated by expectation_suite_interactions.
interactions_fg = fs.get_or_create_feature_group(
    name="interactions",
    version=1,
    description="Interactions data.",
    primary_key=["interaction_id", "user_id", "video_id"],
    online_enabled=True,
    expectation_suite=expectation_suite_interactions,
)

# Upload the generated interaction events.
interactions_fg.insert(data_interactions_df)
print('Done ✅')

Feature Group created successfully, explore it at 
https://c.app.hopsworks.ai:443/p/1196184/fs/1183821/fg/1381734
Validation succeeded.
Validation Report saved successfully, explore a summary at https://c.app.hopsworks.ai:443/p/1196184/fs/1183821/fg/1381734


Uploading Dataframe: 100.00% |██████████| Rows 1000000/1000000 | Elapsed Time: 00:41 | Remaining Time: 00:00


Launching job: interactions_1_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://c.app.hopsworks.ai:443/p/1196184/jobs/named/interactions_1_offline_fg_materialization/executions
Done ✅


## <span style="color:#ff5f27">🪄 Ranking Feature Group </span>


In [None]:
# Build the ranking training set: each interaction enriched with its video's and
# its user's features, plus a binary engagement label.

# Interaction types that count as positive engagement (label = 1). The remaining
# validated types ('dislike', 'skip') get label = 0.
positive_interactions = ['view', 'like', 'share', 'comment']

video_interactions_df = pd.merge(
    data_interactions_df,
    data_video_df,
    on='video_id',
    how='inner',
)

# Built as a chain that returns a new frame, not via in-place mutation, so the
# cell reads top-down and re-running it always yields the same result.
ranking_df = (
    pd.merge(video_interactions_df, data_users_df, on='user_id', how='inner')
    .assign(
        label=lambda df: np.where(
            df['interaction_type'].isin(positive_interactions), 1, 0
        )
    )
    # Interaction-level columns are dropped; interaction_type is now encoded in `label`.
    .drop(columns=['interaction_id', 'interaction_type', 'watch_time'])
)

# NOTE(review): a user can presumably interact with the same video more than
# once, so (user_id, video_id) may not be unique here even though it is the
# ranking feature group's primary key. Confirm how duplicates should be resolved.
ranking_df.head()

In [None]:
# Get or create version 1 of the online-enabled "ranking" feature group, keyed on
# (user_id, video_id). Unlike the other feature groups, no expectation suite is attached.
# NOTE(review): ranking_df may contain several rows per (user_id, video_id) pair.
# Verify how the feature store resolves duplicate keys on insert.
ranking_fg = fs.get_or_create_feature_group(
    name="ranking",
    version=1,
    description="Ranking Data.",
    primary_key=["user_id", "video_id"],
    online_enabled=True,
)

# Upload the assembled ranking training rows.
ranking_fg.insert(ranking_df)
print('Done ✅')

---