## <span style='color:#ff5f27'> 📝 Imports

In [1]:
# !pip install -r requirements.txt --quiet

In [2]:
import pandas as pd
import numpy as np
import great_expectations as ge
from great_expectations.core import ExpectationSuite, ExpectationConfiguration

from features.users import generate_users
from features.videos import generate_video_content
from features.interactions import generate_interactions, generate_user_interactions_window_agg, generate_video_interactions_window_agg
from streaming import config

## <span style='color:#ff5f27'> ⚙️ Data Generation</span>


### <span style='color:#ff5f27'> 👥 Users Data Generation</span>

In [3]:
user_data = generate_users(
    config.USERS_AMOUNT_HISTORICAL,
    historical=True,
)

data_users_df = pd.DataFrame(user_data)
data_users_df.registration_date = data_users_df.registration_date.apply(pd.to_datetime)
data_users_df.head()

Unnamed: 0,user_id,gender,age,country,registration_date,registration_month
0,GS445X,Other,40,Belarus,2022-05-20 20:54:52,2022-05
1,YO732J,Female,61,Ghana,2022-07-02 20:54:52,2022-07
2,HS787T,Male,70,Northern Mariana Islands,2023-05-20 20:54:52,2023-05
3,IP580S,Other,83,Turks & Caicos Islands,2023-01-11 20:54:52,2023-01
4,NS105R,Other,27,Guatemala,2024-01-11 20:54:52,2024-01


In [4]:
data_users_df.shape

(25000, 6)

### <span style='color:#ff5f27'> 🎥 Content Data Generation</span>


In [5]:
# Generate data for videos
video_data = generate_video_content(
    config.VIDEO_AMOUNT_HISTORICAL, 
    historical=True,
)

data_video_df = pd.DataFrame(video_data)
data_video_df.upload_date = data_video_df.upload_date.apply(pd.to_datetime)

data_video_df.head()

Unnamed: 0,video_id,category,views,likes,video_length,upload_date,upload_month
0,4HQ12L,Entertainment,47916,19644,112,2023-02-24 20:54:59,2023-02
1,3MF02Z,News,62115,60914,216,2024-02-24 20:54:59,2024-02
2,4EI11Y,Cooking,17374,13810,239,2022-05-18 20:54:59,2022-05
3,8WD06G,Entertainment,43801,7394,144,2022-07-03 20:54:59,2022-07
4,2LB34K,Lifestyle,249173,221179,98,2023-08-11 20:54:59,2023-08


### <span style='color:#ff5f27'> 🔗 Interactions Generation</span>


In [6]:
# Generate interactions
interactions = generate_interactions(
    config.INTERACTIONS_AMOUNT_HISTORICAL, 
    user_data, 
    video_data,
)

data_interactions_df = pd.DataFrame(interactions)
data_interactions_df.interaction_date = data_interactions_df.interaction_date.apply(pd.to_datetime)
data_interactions_df.head()

Unnamed: 0,interaction_id,user_id,video_id,video_category,interaction_type,watch_time,interaction_date,interaction_day
0,1595-89-0473,NA356P,6SJ03H,Technology,skip,23,2023-09-14 20:54:59,2023-09-14
1,7757-13-8816,UA822Z,1FL37K,News,skip,89,2024-04-17 20:54:59,2024-04-17
2,5562-88-9089,FF706Z,1PG56V,Lifestyle,view,77,2023-10-25 20:54:59,2023-10-25
3,3462-40-9731,CJ496Y,1YB05J,Travel,view,77,2024-02-10 20:54:59,2024-02-10
4,4299-85-4264,PX985V,0LB67C,Sports,skip,11,2023-08-10 20:54:59,2023-08-10


In [7]:
user_interactions_window_agg = generate_user_interactions_window_agg(
    config.INTERACTIONS_AMOUNT_HISTORICAL, 
    user_data, 
    video_data,
)

In [8]:
user_interactions_window_agg_df = pd.DataFrame(user_interactions_window_agg)
user_interactions_window_agg_df.window_end_time = user_interactions_window_agg_df.window_end_time.apply(pd.to_datetime)
user_interactions_window_agg_df

Unnamed: 0,user_id,video_category,window_end_time,interaction_day,like_count,dislike_count,view_count,comment_count,share_count,skip_count,total_watch_time
0,QV478O,Music,2024-04-22 10:54:59,2024-04-22,45,46,81,54,96,39,30
1,QZ513C,Lifestyle,2023-11-16 06:54:59,2023-11-16,92,5,80,46,89,56,55
2,UC726A,Comedy,2023-05-26 07:54:59,2023-05-26,83,44,44,61,55,74,28
3,QA573V,Education,2023-03-28 02:54:59,2023-03-28,51,65,94,58,10,68,22
4,QK229F,Music,2023-11-27 07:54:59,2023-11-27,5,74,61,59,26,34,46
...,...,...,...,...,...,...,...,...,...,...,...
99995,OR564V,Dance,2022-11-12 08:54:59,2022-11-12,97,43,30,45,37,74,12
99996,XE198Y,Education,2022-11-05 08:54:59,2022-11-05,25,21,5,71,24,57,47
99997,WU874Q,Cooking,2022-12-30 01:54:59,2022-12-30,59,61,49,13,88,56,96
99998,XM733L,Dance,2022-08-07 03:54:59,2022-08-07,47,4,93,41,83,26,83


In [9]:
video_interactions_window_agg = generate_video_interactions_window_agg(    
    config.INTERACTIONS_AMOUNT_HISTORICAL, 
    user_data, 
    video_data,
)

In [10]:
video_interactions_window_agg_df = pd.DataFrame(video_interactions_window_agg)
video_interactions_window_agg_df.window_end_time = video_interactions_window_agg_df.window_end_time.apply(pd.to_datetime)
video_interactions_window_agg_df

Unnamed: 0,video_id,window_end_time,interaction_day,like_count,dislike_count,view_count,comment_count,share_count,skip_count,total_watch_time
0,3WE98R,2023-04-25 12:54:59,2023-04-25,80,39,60,78,77,52,39
1,4AL18F,2023-11-16 00:54:59,2023-11-16,50,1,19,61,26,23,47
2,9OY55W,2022-09-04 16:54:59,2022-09-04,88,77,81,20,68,24,8
3,6UX43P,2023-05-07 21:54:59,2023-05-07,28,28,4,23,1,34,17
4,1SC30Q,2023-12-17 17:54:59,2023-12-17,30,67,28,75,86,79,47
...,...,...,...,...,...,...,...,...,...,...
99995,4ZO52H,2023-07-31 22:54:59,2023-07-31,42,52,75,19,31,85,19
99996,5HC37E,2023-10-23 21:54:59,2023-10-23,43,89,26,55,33,13,28
99997,7YY10R,2023-10-10 20:54:59,2023-10-10,95,65,50,65,15,31,77
99998,0KO81D,2024-04-06 19:54:59,2024-04-06,100,72,31,91,28,75,80


## <span style="color:#ff5f27">👮🏻‍♂️ Great Expectations </span>

In [11]:
# Create a Great Expectations DataFrame from the pandas DataFrame
ge_users_df = ge.from_pandas(data_users_df)

# Initialize the expectation suite
expectation_suite_users = ge_users_df.get_expectation_suite()
expectation_suite_users.expectation_suite_name = "user_data_suite"

# Expectation: Age should be between 0 and 120
expectation_suite_users.add_expectation(
    ExpectationConfiguration(
        expectation_type="expect_column_values_to_be_between",
        kwargs={"column": "age", "min_value": 12, "max_value": 100}
    )
)

# Expectations: Columns should not have null values
for column in ge_users_df.columns:
    expectation_suite_users.add_expectation(
        ExpectationConfiguration(
            expectation_type="expect_column_values_to_not_be_null",
            kwargs={"column": column}
        )
    )

# Expectation: Gender should only contain specific values
expectation_suite_users.add_expectation(
    ExpectationConfiguration(
        expectation_type="expect_column_distinct_values_to_be_in_set",
        kwargs={"column": "gender", "value_set": ["Male", "Female", "Other"]}
    )
)

{"meta": {}, "kwargs": {"column": "gender", "value_set": ["Male", "Female", "Other"]}, "expectation_type": "expect_column_distinct_values_to_be_in_set"}

In [12]:
# Create a Great Expectations DataFrame from the pandas DataFrame
ge_video_df = ge.from_pandas(data_video_df)

# Initialize the expectation suite
expectation_suite_videos = ge_video_df.get_expectation_suite()
expectation_suite_videos.expectation_suite_name = "video_data_suite"

# Expectation: Views, Likes, and Video Length should be non-negative
for column in ["views", "likes", "video_length"]:
    expectation_suite_videos.add_expectation(
        ExpectationConfiguration(
            expectation_type="expect_column_values_to_be_between",
            kwargs={"column": column, "min_value": 0, "max_value": None}
        )
    )

# Expectation: Valid date format for upload_date
expectation_suite_videos.add_expectation(
    ExpectationConfiguration(
        expectation_type="expect_column_values_to_be_dateutil_parseable",
        kwargs={"column": "upload_date"}
    )
)

{"meta": {}, "kwargs": {"column": "upload_date"}, "expectation_type": "expect_column_values_to_be_dateutil_parseable"}

In [13]:
# Create a Great Expectations DataFrame from the pandas DataFrame
ge_interactions_df = ge.from_pandas(data_interactions_df)

# Initialize the expectation suite
expectation_suite_interactions = ge_interactions_df.get_expectation_suite()
expectation_suite_interactions.expectation_suite_name = "interactions_data_suite"

# Expectations: Non-null values in all columns
for column in ge_interactions_df.columns:
    expectation_suite_interactions.add_expectation(
        ExpectationConfiguration(
            expectation_type="expect_column_values_to_not_be_null",
            kwargs={"column": column}
        )
    )

# Expectation: Validate interaction types
expectation_suite_interactions.add_expectation(
    ExpectationConfiguration(
        expectation_type="expect_column_distinct_values_to_be_in_set",
        kwargs={
            "column": "interaction_type",
            "value_set": ['like', 'dislike', 'view', 'comment', 'share', 'skip']
        }
    )
)

# Expectation: Positive watch time
expectation_suite_interactions.add_expectation(
    ExpectationConfiguration(
        expectation_type="expect_column_values_to_be_between",
        kwargs={"column": "watch_time", "min_value": 0}
    )
)

{"meta": {}, "kwargs": {"column": "watch_time", "min_value": 0}, "expectation_type": "expect_column_values_to_be_between"}

## <span style="color:#ff5f27">🔮 Connect to Hopsworks Feature Store </span>

In [14]:
import hopsworks

project = hopsworks.login()

fs = project.get_feature_store()

Connected. Call `.close()` to terminate connection gracefully.

Logged in to project, explore it here https://c.app.hopsworks.ai:443/p/398
Connected. Call `.close()` to terminate connection gracefully.


## <span style="color:#ff5f27">🪄 Feature Group Creation </span>


In [15]:
users_fg = fs.get_or_create_feature_group(
    name="users",
    description="Users data.",
    version=1,
    primary_key=["user_id"],
    partition_key=["registration_month"],
    event_time="registration_date",
    online_enabled=True,
    expectation_suite=expectation_suite_users,
)

users_fg.insert(data_users_df)
print('Done ✅')

Feature Group created successfully, explore it at 
https://c.app.hopsworks.ai:443/p/398/fs/335/fg/740715
Validation succeeded.
Validation Report saved successfully, explore a summary at https://c.app.hopsworks.ai:443/p/398/fs/335/fg/740715


Uploading Dataframe: 0.00% |          | Rows 0/25000 | Elapsed Time: 00:00 | Remaining Time: ?

Launching job: users_1_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://c.app.hopsworks.ai/p/398/jobs/named/users_1_offline_fg_materialization/executions
Done ✅


In [16]:
videos_fg = fs.get_or_create_feature_group(
    name="videos",
    description="Videos data.",
    version=1,
    primary_key=["video_id"],
    partition_key=["upload_month"],
    online_enabled=True,
    event_time="upload_date",
    expectation_suite=expectation_suite_videos,
)

videos_fg.insert(data_video_df)
print('Done ✅')

Feature Group created successfully, explore it at 
https://c.app.hopsworks.ai:443/p/398/fs/335/fg/739708
Validation failed.
Validation Report saved successfully, explore a summary at https://c.app.hopsworks.ai:443/p/398/fs/335/fg/739708


Uploading Dataframe: 0.00% |          | Rows 0/25000 | Elapsed Time: 00:00 | Remaining Time: ?

Launching job: videos_1_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://c.app.hopsworks.ai/p/398/jobs/named/videos_1_offline_fg_materialization/executions
Done ✅


In [18]:
interactions_fg = fs.get_or_create_feature_group(
    name="interactions",
    description="Interactions data.",
    version=1,
    primary_key=["interaction_id", "user_id", "video_id"],
    partition_key = ["interaction_day"],
    online_enabled=True,
    event_time="interaction_date",
    expectation_suite=expectation_suite_interactions,
)

interactions_fg.insert(data_interactions_df)
print('Done ✅')

Feature Group created successfully, explore it at 
https://c.app.hopsworks.ai:443/p/398/fs/335/fg/739711
Validation succeeded.
Validation Report saved successfully, explore a summary at https://c.app.hopsworks.ai:443/p/398/fs/335/fg/739711


Uploading Dataframe: 0.00% |          | Rows 0/100000 | Elapsed Time: 00:00 | Remaining Time: ?

Launching job: interactions_1_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://c.app.hopsworks.ai/p/398/jobs/named/interactions_1_offline_fg_materialization/executions
Done ✅


In [None]:
# user_window_agg_1h_fg = fs.get_or_create_feature_group(
#     "user_window_agg_1h",
#     version=1,
#     statistics_config=False,
#     primary_key=["user_id"],
#     partition_key=["interaction_day"],
#     event_time="window_end_time",
#     online_enabled=True,
#     stream=True,
# )

# user_window_agg_1h_fg.insert(user_interactions_window_agg_df)

In [None]:
# video_window_agg_1h_fg = fs.get_or_create_feature_group(
#     "video_window_agg_1h",
#     version=1,
#     statistics_config=False,
#     primary_key=["video_id"],
#     partition_key=["interaction_day"],
#     event_time="window_end_time",
#     online_enabled=True,
#     stream=True,
# )

# video_window_agg_1h_fg.insert(video_interactions_window_agg_df)

## <span style="color:#ff5f27">🪄 Ranking Feature Group </span>


In [None]:
video_interactions_df = pd.merge(
    data_interactions_df, 
    data_video_df, 
    on='video_id', 
    how='inner',
)

ranking_df = pd.merge(
    video_interactions_df, 
    data_users_df, 
    on='user_id', 
    how='inner',
)

ranking_df['label'] = np.where(
    ranking_df.interaction_type.isin(['view', 'like', 'share', 'comment']), 
    1, 
    0,
)

ranking_df.drop(
    ['interaction_id', 'interaction_type', 'watch_time', 'interaction_date', 'upload_date', 'registration_date'], 
    axis=1,
    inplace=True,
)

ranking_df.head()

In [None]:
ranking_fg = fs.get_or_create_feature_group(
    name="ranking",
    description="Ranking Data.",
    version=1,
    primary_key=["user_id", "video_id"],
    partition_key = ["interaction_day"],    
    online_enabled=True, # why online?
)

ranking_fg.insert(ranking_df)
print('Done ✅')

---