In [None]:
# Import python packages
import streamlit as st
import numpy as np
import pandas as pd
from snowflake.snowpark.functions import(
col, count, avg, max as max_, min as min_, dateadd, current_date, lit, sum as sum_, 
coalesce, datediff, any_value, when, array_unique_agg, array_size, regexp_replace, iff, nullifzero
)
from snowflake.ml.feature_store import FeatureStore, Entity, FeatureView, CreationMode

# Snowpark is available for analysis as well: reuse the session that is already
# active in this notebook runtime rather than opening a new connection.
from snowflake.snowpark.context import get_active_session

session = get_active_session()

## Data preparation

#### Call quality

In [None]:
# Per-user call-quality aggregates from dev.public.legacy_call_end:
#   * all rows with call_date <= today, and
#   * a "_7d" variant restricted to call_date >= today - 7 days, i.e. today plus the
#     7 prior dates (8 calendar dates, assuming call_date is a DATE).
# Every feature is cast to FLOAT; the placeholder ID '000-00-000-000000000' is excluded.
CALL_QUALITY_SQL = """
SELECT 
    USER_ID_HEX,
    CAST(COUNT_IF(COALESCE(num_bad_mos_periods, 0) > 0) AS FLOAT) AS calls_with_bad_mos,
    CAST(AVG(computed_mos) AS FLOAT) AS average_mos,
    CAST(MAX(RTP_SETUP_TIME) AS FLOAT) AS max_rtp_setup_time,
    CAST(COUNT_IF(CASE WHEN call_date >= CURRENT_DATE() - INTERVAL '7 DAYS' THEN COALESCE(num_bad_mos_periods, 0) > 0 ELSE FALSE END) AS FLOAT) AS calls_with_bad_mos_7d,
    CAST(AVG(CASE WHEN call_date >= CURRENT_DATE() - INTERVAL '7 DAYS' THEN computed_mos END) AS FLOAT) AS average_mos_7d,
    CAST(MAX(CASE WHEN call_date >= CURRENT_DATE() - INTERVAL '7 DAYS' THEN RTP_SETUP_TIME END) AS FLOAT) AS max_rtp_setup_time_7d
FROM dev.public.legacy_call_end
WHERE call_date <= CURRENT_DATE()
    AND USER_ID_HEX != '000-00-000-000000000'
GROUP BY USER_ID_HEX
"""

user_features_call_quality_df = session.sql(CALL_QUALITY_SQL)
user_features_call_quality_df.show()

#### Call rating

In [None]:
# Per-user call-rating statistics (count/avg/max/min) from
# dev.public.call_ratings_combined_sources, overall and for the trailing window
# date_utc >= today - 7 days. Only positive ratings are kept and the placeholder
# ID '000-00-000-000000000' is excluded. The _7d AVG/MAX/MIN are NULL for users
# with no ratings in the window.
CALL_RATING_SQL = """
SELECT 
    user_id_hex,
    CAST(COUNT(call_rating) AS FLOAT) AS call_rating_count,
    CAST(AVG(call_rating) AS FLOAT) AS avg_call_rating,
    CAST(MAX(call_rating) AS FLOAT) AS max_call_rating,
    CAST(MIN(call_rating) AS FLOAT) AS min_call_rating,
    CAST(COUNT(CASE WHEN date_utc >= CURRENT_DATE() - INTERVAL '7 DAYS' THEN call_rating END) AS FLOAT) AS call_rating_count_7d,
    CAST(AVG(CASE WHEN date_utc >= CURRENT_DATE() - INTERVAL '7 DAYS' THEN call_rating END) AS FLOAT) AS avg_call_rating_7d,
    CAST(MAX(CASE WHEN date_utc >= CURRENT_DATE() - INTERVAL '7 DAYS' THEN call_rating END) AS FLOAT) AS max_call_rating_7d,
    CAST(MIN(CASE WHEN date_utc >= CURRENT_DATE() - INTERVAL '7 DAYS' THEN call_rating END) AS FLOAT) AS min_call_rating_7d
FROM dev.public.call_ratings_combined_sources
WHERE date_utc <= CURRENT_DATE()
    AND call_rating > 0
    AND user_id_hex != '000-00-000-000000000'
GROUP BY user_id_hex
"""

user_features_call_rating_df = session.sql(CALL_RATING_SQL)
user_features_call_rating_df.show()

#### Data usage

In [None]:
# Per-user mobile data usage (MB), total and for the trailing window
# date_utc >= today - 7 days, from the daily T-Mobile cost table. Usage rows are
# keyed by username and mapped to user_id_hex through user_profiles.latest_username.
# NOTE(review): unlike the other feature queries this one neither CASTs to FLOAT
# nor excludes the placeholder ID '000-00-000-000000000'; align it in a new
# feature-view version if consistent typing/filtering is wanted.
DATA_USAGE_SQL = """
SELECT 
    up.user_id_hex,
    SUM(c.mb_usage) AS data_usage_mb,
    SUM(CASE WHEN c.date_utc >= CURRENT_DATE() - INTERVAL '7 DAYS' THEN c.mb_usage ELSE 0 END) AS data_usage_mb_7d
FROM dev.public.cost_user_daily_tmobile_cost c
JOIN dev.public.user_profiles up ON c.username = up.latest_username
WHERE c.date_utc <= CURRENT_DATE()
GROUP BY up.user_id_hex
"""

user_features_data_usage_df = session.sql(DATA_USAGE_SQL)
user_features_data_usage_df.show()

#### Sessions

In [None]:
# Per-user engagement from the daily app-time/session metrics: total minutes in
# app, total sessions, both also for the trailing window date_utc >= today - 7 days,
# and tenure in days since user_profiles.registered_at. Metrics rows are keyed by
# username and mapped to user_id_hex through user_profiles.latest_username.
SESSIONS_SQL = """
SELECT 
    up.user_id_hex,
    CAST(SUM(m.time_in_app_mins_per_day) AS FLOAT) AS time_in_app_mins,
    CAST(DATEDIFF(day, ANY_VALUE(up.registered_at), CURRENT_DATE()) AS FLOAT) AS tenure_days,
    CAST(SUM(m.num_sessions) AS FLOAT) AS session_count,
    CAST(SUM(CASE WHEN m.date_utc >= CURRENT_DATE() - INTERVAL '7 DAYS' THEN m.time_in_app_mins_per_day ELSE 0 END) AS FLOAT) AS time_in_app_mins_7d,
    CAST(SUM(CASE WHEN m.date_utc >= CURRENT_DATE() - INTERVAL '7 DAYS' THEN m.num_sessions ELSE 0 END) AS FLOAT) AS session_count_7d
FROM dev.public.metrics_daily_userlevel_app_time_sessions m
JOIN dev.public.user_profiles up ON m.username = up.latest_username
WHERE m.date_utc <= CURRENT_DATE()
GROUP BY up.user_id_hex
"""

user_features_sessions_df = session.sql(SESSIONS_SQL)
user_features_sessions_df.show()

#### NPS ratings

In [None]:
# Per-user NPS survey statistics (count/avg/max), overall and for the trailing
# window date_utc >= today - 7 days, excluding the placeholder ID.
# NOTE(review): the counts use COUNT(*) / COUNT(1-per-row), so rows with a NULL
# score are counted while AVG/MAX ignore them — confirm that is intended.
NPS_RATING_SQL = """
SELECT 
    user_id_hex,
    CAST(COUNT(*) AS FLOAT) AS nps_count,
    CAST(AVG(score) AS FLOAT) AS nps_avg_rating,
    CAST(MAX(score) AS FLOAT) AS nps_max_rating,
    CAST(COUNT(CASE WHEN date_utc >= CURRENT_DATE() - INTERVAL '7 DAYS' THEN 1 END) AS FLOAT) AS nps_count_7d,
    CAST(AVG(CASE WHEN date_utc >= CURRENT_DATE() - INTERVAL '7 DAYS' THEN score END) AS FLOAT) AS nps_avg_rating_7d,
    CAST(MAX(CASE WHEN date_utc >= CURRENT_DATE() - INTERVAL '7 DAYS' THEN score END) AS FLOAT) AS nps_max_rating_7d
FROM dev.public.nps_combined_sources
WHERE date_utc <= CURRENT_DATE()
    AND user_id_hex != '000-00-000-000000000'
GROUP BY user_id_hex
"""

user_features_nps_rating_df = session.sql(NPS_RATING_SQL)
user_features_nps_rating_df.show()

## Feature store

#### Create FS

In [None]:
# One-time setup, kept commented out (presumably already run): creates the
# feature store if it does not exist yet. Regular runs only connect to the
# existing store in the next cell.
# fs = FeatureStore(
#     session=session,
#     database="dev",
#     name="user_activity_feature_store",
#     default_warehouse="ds_wh_medium",
#     creation_mode=CreationMode.CREATE_IF_NOT_EXIST
# )

#### Connect to FS

In [None]:
# Attach to the existing feature store `user_activity_feature_store` in database
# `dev`, running its queries on the ds_wh_medium warehouse.
# FAIL_IF_NOT_EXIST is the library default and is spelled out here: this cell
# raises instead of silently creating a new store when the name is wrong.
fs = FeatureStore(
    session=session,
    database="dev",
    name="user_activity_feature_store",
    default_warehouse="ds_wh_medium",
    creation_mode=CreationMode.FAIL_IF_NOT_EXIST,
)

#### Create and register entities

In [None]:
# One-time setup, kept commented out: defines the "user" entity with join key
# user_id_hex and registers it in the feature store. The next cell fetches the
# already-registered entity instead.
# entity = Entity(
#     name="user",
#     join_keys=["user_id_hex"],
#     desc="user entity"
# )
# fs.register_entity(entity)

#### Get existing entities

In [None]:
# Look up the "user" entity registered during one-time setup; every feature
# view below is keyed on it.
USER_ENTITY_NAME = "user"
entity = fs.get_entity(USER_ENTITY_NAME)

#### Create and register feature views

In [None]:
# Register one feature view per feature family. Every view is keyed on the
# `user` entity, refreshed every 24 hours from its Snowpark query, and
# registered as version "1".
# NOTE(review): bump `version` when a feature query changes — registering a
# changed definition under an existing name/version is presumably skipped or
# rejected by the feature store; confirm against the installed snowflake-ml version.


def _register_user_feature_view(name, feature_df, desc, version="1", refresh_freq="24 hours"):
    """Build a FeatureView on the user entity, register it, and return it.

    Args:
        name: Feature view name in the feature store.
        feature_df: Snowpark DataFrame whose query defines the features.
        desc: Human-readable description stored with the view.
        version: Version label to register the view under.
        refresh_freq: Refresh schedule passed to ``FeatureView``.

    Returns:
        The (draft) ``FeatureView`` object that was passed to
        ``fs.register_feature_view``.
    """
    feature_view = FeatureView(
        name=name,
        entities=[entity],
        feature_df=feature_df,
        refresh_freq=refresh_freq,
        desc=desc,
    )
    fs.register_feature_view(feature_view=feature_view, version=version)
    return feature_view


# Call quality
user_features_call_quality_fv = _register_user_feature_view(
    "user_features_call_quality",
    user_features_call_quality_df,
    "features about user call quality",
)

# Call rating
user_features_call_rating_fv = _register_user_feature_view(
    "user_features_call_rating",
    user_features_call_rating_df,
    "features about user call rating",
)

# Data usage
user_features_data_usage_fv = _register_user_feature_view(
    "user_features_data_usage",
    user_features_data_usage_df,
    "features about user data usage",
)

# User sessions
user_features_sessions_fv = _register_user_feature_view(
    "user_features_sessions",
    user_features_sessions_df,
    "features about user sessions",
)

# NPS Rating
user_features_nps_rating_fv = _register_user_feature_view(
    "user_features_nps_rating",
    user_features_nps_rating_df,
    "features about user NPS Rating",
)

## Model training

In [None]:
# Modelling dependencies: scikit-learn baselines and preprocessing, the two
# gradient-boosting libraries, and Snowflake experiment tracking for run logging.
from lightgbm import LGBMRegressor
from sklearn.compose import ColumnTransformer
from sklearn.ensemble import GradientBoostingRegressor, RandomForestRegressor
from sklearn.linear_model import Lasso, LinearRegression, Ridge
from sklearn.metrics import mean_squared_error
from sklearn.model_selection import RandomizedSearchCV, train_test_split
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.tree import DecisionTreeRegressor
from snowflake.ml.experiment import ExperimentTracking
from xgboost import XGBRegressor

# NOTE(review): "forecastint" looks like a typo for "forecasting". The name is
# kept as-is so new runs land in the existing experiment (and run lookups later
# in this notebook presumably keep resolving); rename deliberately if desired.
EXPERIMENT_NAME = "Baseline_user_activity_forecastint_models"

exp = ExperimentTracking(session=session)
exp.set_experiment(EXPERIMENT_NAME)

#### Spine DataFrame (entity keys + training label)

In [None]:
# Label spine: one row per user with `active_days_in_week`, the number of
# distinct dates in the last full week (the 7 dates before today; today is
# excluded because it is still in progress) on which the user spent more than
# 1 minute in the app.
# The previous filter (`>= dateadd('day', -14, current_date())`, no upper bound)
# spanned 15 calendar dates, so the "week" label could exceed 7.
# COUNT(DISTINCT date) also keeps the label <= 7 if the daily table ever holds
# several rows per user and day (assumes date_utc is a DATE).
# NOTE(review): generate_training_set below is called without a spine timestamp,
# so features are read as of now. Their 7-day columns overlap this label window.
# If the goal is forecasting, treat offline scores as optimistic until the label
# is taken from a period after the feature cut-off.
# NOTE(review): the "> 1 minute" activity threshold is a magic number — confirm.
LABEL_WINDOW_DAYS = 7

spine_df = session.sql(f"""
SELECT
    up.user_id_hex,
    COUNT(DISTINCT IFF(m.time_in_app_mins_per_day > 1, m.date_utc, NULL)) AS active_days_in_week
FROM dev.public.metrics_daily_userlevel_app_time_sessions m
JOIN dev.public.user_profiles up ON m.username = up.latest_username
WHERE m.date_utc >= DATEADD('day', -{LABEL_WINDOW_DAYS}, CURRENT_DATE())
    AND m.date_utc < CURRENT_DATE()
GROUP BY up.user_id_hex
""")
spine_df.show()

#### Get training dataset from FS

In [None]:
# Build the training set: join the spine (user_id_hex + label) to the
# call-quality and call-rating feature views on the entity key, then pull it
# into pandas as `df` for scikit-learn.
user_features_call_quality = fs.get_feature_view(name="user_features_call_quality", version="1")
user_features_call_rating = fs.get_feature_view(name="user_features_call_rating", version="1")

training_sdf = fs.generate_training_set(
    spine_df=spine_df,
    features=[user_features_call_quality, user_features_call_rating],
    # The API takes a list of label column names; a bare string is not the documented type.
    spine_label_cols=["active_days_in_week"],
)
df = training_sdf.to_pandas()

# The spine and both feature views are grouped by user, so the joins must not duplicate users.
assert df["USER_ID_HEX"].is_unique, "duplicate users in training set"
df.head()

#### Split training and testing dataset

In [None]:
# Features are every column except the entity key and the label.
X = df.drop(columns=["USER_ID_HEX", "ACTIVE_DAYS_IN_WEEK"])
y = df["ACTIVE_DAYS_IN_WEEK"]

# Split the data into training and testing sets (80% train, 20% test)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
assert len(X_train) + len(X_test) == len(df)

# NOTE: NaN features are expected. The _7d AVG/MAX columns are NULL for users
# with no rows in the window, and users missing from a feature view presumably
# get NULLs from the join. Impute them inside the model pipeline (fitted on
# X_train only) rather than with a global fillna here.

# DataFrame.count() returns per-column non-null counts, not the number of rows;
# len() gives the sample count these messages describe.
train_count = len(X_train)
test_count = len(X_test)
print(f"  Training set: {train_count} samples")
print(f"  Test set: {test_count} samples\n")

#### Preprocessing

In [None]:
# Shared preprocessing for every candidate model: median-impute, then
# standardise all numeric feature columns.
# Imputation is needed because several features are NULL for users without
# recent activity. StandardScaler passes NaN through, and estimators such as
# LinearRegression and GradientBoostingRegressor reject NaN at fit time.
# Keeping the imputer inside the pipeline fits it on training folds only and
# ships it with any logged model.
from sklearn.impute import SimpleImputer

numerical_cols = X.select_dtypes(include=["number"]).columns.tolist()

numeric_steps = Pipeline(steps=[
    ("impute", SimpleImputer(strategy="median")),
    ("scale", StandardScaler()),
])

# remainder="drop" (the default) is spelled out: non-numeric columns are discarded.
preprocess = ColumnTransformer(
    [("num", numeric_steps, numerical_cols)],
    remainder="drop",
)

#### Base training

In [None]:
# Candidate models as (name, estimator, hyper-parameter lists). Keys carry the
# `model__` prefix because each estimator becomes the "model" step of a
# Pipeline; an empty dict means "fit once with defaults, no search".
# Stochastic estimators get a fixed seed so Restart & Run All reproduces scores.
RANDOM_STATE = 42

baseline_models = [
    # Linear models
    ("LinearRegression", LinearRegression(), {}),
    ("Ridge", Ridge(), {'model__alpha': [0.1, 1.0, 10.0, 100.0]}),
    ("Lasso", Lasso(), {'model__alpha': [0.1, 1.0, 10.0, 100.0]}),

    # Random Forest variations
    ("RandomForest", RandomForestRegressor(random_state=RANDOM_STATE), {
        'model__n_estimators': [50, 100, 200],
        'model__max_depth': [5, 10, 15, None],
        'model__min_samples_split': [2, 5, 10]
    }),

    # Gradient Boosting
    ("GradientBoosting", GradientBoostingRegressor(random_state=RANDOM_STATE), {
        'model__n_estimators': [50, 100, 200],
        'model__learning_rate': [0.01, 0.1, 0.2],
        'model__max_depth': [3, 5, 7]
    }),

    # XGBoost
    ("XGBoost", XGBRegressor(verbosity=0, random_state=RANDOM_STATE), {
        'model__n_estimators': [50, 100, 200],
        'model__learning_rate': [0.01, 0.1, 0.2],
        'model__max_depth': [3, 5, 7]
    }),

    # LightGBM (verbose=-1 silences its training log, like verbosity=0 for XGBoost)
    ("LightGBM", LGBMRegressor(random_state=RANDOM_STATE, verbose=-1), {
        'model__n_estimators': [50, 100, 200],
        'model__learning_rate': [0.01, 0.1, 0.2],
        'model__max_depth': [3, 5, 7]
    })
]

In [None]:
# Fit every baseline, score it on the held-out test set, and log one experiment
# run per model. `results` collects (name, test_mse, fitted_pipeline).
from sklearn.base import clone
from sklearn.model_selection import ParameterGrid

SEARCH_N_ITER = 10
SEARCH_CV_FOLDS = 3


def _fit_baseline(model, param_grid):
    """Fit one candidate inside a fresh preprocess + model pipeline.

    Args:
        model: Unfitted scikit-learn compatible regressor.
        param_grid: Hyper-parameter lists keyed by ``model__<param>``; empty to
            fit once with the estimator's current settings.

    Returns:
        ``(fitted_pipeline, best_params)``; ``best_params`` is ``{}`` when no
        search was run.
    """
    # Clone both steps. Pipeline.fit fits its steps in place, so without this the
    # no-search path would fit the global `preprocess` object and the estimator
    # stored in `baseline_models`, coupling the result to shared state.
    pipeline = Pipeline(steps=[
        ("preprocess", clone(preprocess)),
        ("model", clone(model)),
    ])
    if not param_grid:
        pipeline.fit(X_train, y_train)
        return pipeline, {}

    # Never request more draws than the grid holds (sklearn would warn and cap anyway).
    n_iter = min(SEARCH_N_ITER, len(ParameterGrid(param_grid)))
    search = RandomizedSearchCV(
        estimator=pipeline,
        param_distributions=param_grid,
        n_iter=n_iter,
        cv=SEARCH_CV_FOLDS,
        scoring='neg_mean_squared_error',
        n_jobs=-1,
        verbose=1,
        random_state=42,
    )
    search.fit(X_train, y_train)
    return search.best_estimator_, search.best_params_


def _log_run(name, mse, fitted_pipeline, best_params):
    """Log one experiment run: test MSE, model type, searched params, and the fitted model."""
    with exp.start_run():
        exp.log_metric("mse", mse)
        exp.log_param("model_type", name)
        for param_name, param_value in best_params.items():
            exp.log_param(param_name, param_value)
        exp.log_model(model=fitted_pipeline, model_name=f"{name}_model", sample_input_data=X_train.head())


results = []
for name, model, param_grid in baseline_models:
    print(f"Training with RandomizedSearch: {name}")

    fitted_pipeline, best_params = _fit_baseline(model, param_grid)
    mse = mean_squared_error(y_test, fitted_pipeline.predict(X_test))

    if param_grid:
        print(f"{name} -> Best Params: {best_params}")
    print(f"{name} -> Test MSE: {mse:.4f}")

    results.append((name, mse, fitted_pipeline))
    _log_run(name, mse, fitted_pipeline, best_params)

## Model registry

In [None]:
# Inspect the manifest of a previously logged run.
# NOTE(review): "HOT_KIWI_4" appears to be a run name from an earlier execution,
# so this cell only works where that run exists — update it after re-training.
run_name = "HOT_KIWI_4"
exp.list_artifacts(run_name, artifact_path="manifest.yaml")

In [None]:
from snowflake.ml.registry import Registry

# Model registry located in database `dev`, schema `data_science`.
registry = Registry(
    session=session,
    database_name="dev",
    schema_name="data_science",
)

In [None]:
# NOTE(review): as written this would pass the `LinearRegression` class itself,
# not a fitted model. If re-enabled, pass a fitted pipeline instead (e.g. the
# best entry of `results` from the training loop).
# # Log the model
# model_version = registry.log_model(
#     model=LinearRegression,
#     model_name="base_linear_regression_model",
#     version_name="v1.0",
#     comment="Base version of the linear regression model",
#     sample_input_data=X.head()
# )
# print(f"Model '{model_version.model_name}' version '{model_version.version_name}' registered successfully.")

## Inference