In [None]:
# Import python packages
import streamlit as st
import pandas as pd
import numpy as np
import random
import datetime
# We can also use Snowpark for our analyses!
from snowflake.snowpark.context import get_active_session
import lightgbm as lgb
from snowflake.snowpark import functions as F
import logging
# Module-level logger for this notebook; only ERROR and above are emitted.
logger = logging.getLogger(__name__)
logger.setLevel(logging.ERROR)
# Obtain the currently active Snowpark session; every Snowflake call below goes through it.
session = get_active_session()
# Echo the session's current database and schema so it is clear where tables will be written.
print(f"Database: {session.get_current_database()}, Schema: {session.get_current_schema()}")

In [None]:
# Sizes handed to generate_users() / generate_offers().
NUM_USERS = 1000
NUM_OFFERS = 50
# Passed as avg_offers / max_offers to generate_interactions(): each user is shown
# between max(1, AVG_OFFERS_PER_USER // 2) and MAX_OFFERS_PER_USER offers.
AVG_OFFERS_PER_USER = 10
MAX_OFFERS_PER_USER = 20
# NOTE(review): the three names below are not referenced anywhere in the visible
# notebook; the deployment cells define their own lowercase model_name /
# service_name with different values. Confirm which set is intended.
MODEL_NAME = "credit_card_ltr_model"
MODEL_VERSION = "v1"
SERVICE_NAME = "credit_card_ltr_service"

In [None]:
def generate_users(n):
    """Build a DataFrame of ``n`` randomly generated users.

    Each row has USER_ID ("user_0000", "user_0001", ...), AGE (18-75),
    INCOME (normally distributed around an age-dependent mean, floored at
    20000 and rounded to 2 decimals), CREDIT_SCORE (normally distributed
    around a mean that grows with age and income, clamped to 300-850) and
    NUM_EXISTING_CARDS (0-8).

    Draws come from both ``random`` and ``np.random``; seed both for
    repeatable output.
    """

    def _one_user(idx):
        # Draw order matters for seeded reproducibility: age, income, score, cards.
        years = random.randint(18, 75)
        income_mean = 60000 + (years - 18) * 500
        earnings = max(20000, np.random.normal(loc=income_mean, scale=20000))
        score_mean = 500 + (years / 80) * 200 + (earnings / 150000) * 150
        raw_score = int(np.random.normal(loc=score_mean, scale=75))
        # Clamp into the 300-850 range.
        score = max(300, min(850, raw_score))
        cards_held = random.randint(0, 8)
        return {
            "USER_ID": f"user_{idx:04d}",
            "AGE": years,
            "INCOME": round(earnings, 2),
            "CREDIT_SCORE": score,
            "NUM_EXISTING_CARDS": cards_held,
        }

    return pd.DataFrame([_one_user(idx) for idx in range(n)])

In [None]:
def generate_offers(n):
    """Build a DataFrame of ``n`` randomly generated credit card offers.

    Each row has OFFER_ID ("offer_000", ...), CARD_NAME, ISSUER,
    MIN_CREDIT_SCORE, ANNUAL_FEE, REWARD_TYPE, BASE_REWARD_RATE (0.0 for
    'low_apr' offers, otherwise uniform in [0.005, 0.03] rounded to 3
    decimals) and REQUIRED_INCOME (derived from the score floor and fee plus
    random jitter, never below 15000).

    All draws come from the ``random`` module.
    """
    issuer_pool = ["amex", "visa", "chase", "mastercard"]
    score_floor_pool = [400, 450, 500, 550, 600, 650, 700, 750, 800, 850]
    fee_pool = [10, 50, 100, 150, 200, 250, 300, 350, 400, 450, 500]
    reward_pool = ["cashback", "points", "miles", "low_apr"]

    rows = []
    for idx in range(n):
        brand = random.choice(issuer_pool)
        label = f"{brand} Card {random.randint(1,100)}"
        score_floor = random.choice(score_floor_pool)
        yearly_fee = random.choice(fee_pool)
        reward_kind = random.choice(reward_pool)
        # Low-APR cards carry no reward rate, so no uniform draw is made for them.
        if reward_kind == 'low_apr':
            reward_rate = 0.0
        else:
            reward_rate = round(random.uniform(0.005, 0.03), 3)
        jitter = random.randint(-5000, 15000)
        income_floor = max(15000, score_floor * 50 + yearly_fee * 100 + jitter)
        rows.append({
            "OFFER_ID": f"offer_{idx:03d}",
            "CARD_NAME": label,
            "ISSUER": brand,
            "MIN_CREDIT_SCORE": score_floor,
            "ANNUAL_FEE": yearly_fee,
            "REWARD_TYPE": reward_kind,
            "BASE_REWARD_RATE": reward_rate,
            "REQUIRED_INCOME": round(income_floor, 2),
        })
    return pd.DataFrame(rows)

In [None]:
def generate_interactions(users_df, offers_df, avg_offers, max_offers):
    """Generates synthetic user-offer interactions and relevance scores.

    One query is created per user. For each query a random subset of offers
    is "shown", and for every shown offer a click / apply outcome is
    simulated with probabilities that rise when the user meets the offer's
    credit-score and income requirements or holds fewer than 3 cards.

    Args:
        users_df: DataFrame with USER_ID, CREDIT_SCORE, INCOME and
            NUM_EXISTING_CARDS columns.
        offers_df: DataFrame with OFFER_ID, MIN_CREDIT_SCORE and
            REQUIRED_INCOME columns.
        avg_offers: Half of this value (floored, at least 1) is the lower
            bound on the number of offers shown per user.
        max_offers: Upper bound on the number of offers shown per user. It is
            capped at the number of available offers so sampling without
            replacement cannot fail.

    Returns:
        DataFrame with columns QUERY_ID, USER_ID, OFFER_ID, TIMESTAMP,
        CLICKED, APPLIED and RELEVANCE (0 = ignored, 1 = clicked,
        2 = applied). The columns are present even when no rows are produced.
    """
    columns = ["QUERY_ID", "USER_ID", "OFFER_ID", "TIMESTAMP", "CLICKED", "APPLIED", "RELEVANCE"]
    all_offer_ids = offers_df['OFFER_ID'].tolist()
    if not all_offer_ids:
        # Nothing can be shown; return an empty, correctly-shaped frame.
        return pd.DataFrame(columns=columns)

    # Build the id -> offer lookup once instead of filtering the offers frame
    # for every single interaction. setdefault keeps the first row per id,
    # matching the previous ".iloc[0]" behaviour for duplicate ids.
    offers_by_id = {}
    for offer_row in offers_df.to_dict("records"):
        offers_by_id.setdefault(offer_row["OFFER_ID"], offer_row)

    # random.sample cannot draw more items than exist, so cap both bounds.
    most_shown = min(max_offers, len(all_offer_ids))
    fewest_shown = min(max(1, avg_offers // 2), most_shown)

    interactions = []
    for query_counter, user in enumerate(users_df.to_dict("records")):
        user_id = user['USER_ID']
        num_offers_shown = random.randint(fewest_shown, most_shown)
        shown_offers = random.sample(all_offer_ids, num_offers_shown)
        query_id = f"query_{query_counter:05d}"
        for offer_id in shown_offers:
            offer = offers_by_id[offer_id]
            # Random moment within roughly the last 90 days.
            timestamp = datetime.datetime.now() - datetime.timedelta(days=random.randint(0, 90), hours=random.randint(0, 23))
            # Determine relevance (simplified logic)
            relevance = 0
            prob_click = 0.1  # Base click probability
            prob_apply = 0.05  # Base apply probability (conditional on click)

            # Increase probability based on user/offer match
            if user["CREDIT_SCORE"] >= offer["MIN_CREDIT_SCORE"]:
                prob_click += 0.15
                prob_apply += 0.1
            if user["INCOME"] >= offer["REQUIRED_INCOME"]:
                prob_click += 0.1
                prob_apply += 0.05
            if user['NUM_EXISTING_CARDS'] < 3:
                prob_click += 0.05
                prob_apply += 0.02

            clicked = 0
            applied = 0
            if random.random() < prob_click:
                clicked = 1
                relevance = 1
                # An application is only possible after a click.
                if random.random() < prob_apply:
                    applied = 1
                    relevance = 2  # Higher relevance for application

            interactions.append({
                "QUERY_ID": query_id,
                "USER_ID": user_id,
                "OFFER_ID": offer_id,
                "TIMESTAMP": timestamp,
                "CLICKED": clicked,
                "APPLIED": applied,
                "RELEVANCE": relevance,  # Target variable for LTR
            })

    return pd.DataFrame(interactions, columns=columns)

In [None]:
def load_data_to_snowflake(session, df, table_name):
    """Write ``df`` to the table ``table_name`` through ``session``.

    The frame is converted with ``session.create_dataframe`` and saved in
    "overwrite" mode. A progress message is printed before and after the write.
    """
    print(f"Loading data into Snowflake table: {table_name}")
    # No explicit schema is passed to create_dataframe here.
    writer = session.create_dataframe(df).write
    writer.mode("overwrite").save_as_table(table_name)
    print(f"Successfully loaded {len(df)} rows into {table_name}.")

In [None]:
# The generators draw from both `random` and `np.random`; seed both here so
# re-running this cell (or Restart & Run All) produces the same users, offers
# and click/apply outcomes. TIMESTAMP is still derived from the current time.
RANDOM_SEED = 42
random.seed(RANDOM_SEED)
np.random.seed(RANDOM_SEED)

users_df = generate_users(NUM_USERS)
offers_df = generate_offers(NUM_OFFERS)
interactions_df = generate_interactions(users_df, offers_df, AVG_OFFERS_PER_USER, MAX_OFFERS_PER_USER)

In [None]:
interactions_df.head()

In [None]:
# Write each synthetic frame to its table, in the order users, offers, interactions.
for _frame, _table in (
    (users_df, "USERS"),
    (offers_df, "OFFERS"),
    (interactions_df, "INTERACTIONS"),
):
    load_data_to_snowflake(session, _frame, _table)

### Feature Engineering

In [None]:
# Snowpark handles for the three tables written above.
users_sp_df, offers_sp_df, interactions_sp_df = (
    session.table(name) for name in ("USERS", "OFFERS", "INTERACTIONS")
)

In [None]:
interactions_sp_df.show()

In [None]:
users_sp_df.show()

In [None]:
offers_sp_df.show()

In [None]:
# Attach user attributes, then offer attributes, to every interaction row.
# Right-hand columns whose names clash get the "_USER" / "_OFFER" suffix.
user_match = interactions_sp_df["USER_ID"] == users_sp_df["USER_ID"]
offer_match = interactions_sp_df["OFFER_ID"] == offers_sp_df["OFFER_ID"]
joined_df = (
    interactions_sp_df
    .join(users_sp_df, user_match, rsuffix="_USER")
    .join(offers_sp_df, offer_match, rsuffix="_OFFER")
)

In [None]:
joined_df.show()

In [None]:
# Project the joined rows into the modelling table: identifiers, outcome
# columns, raw user/offer attributes, user-vs-offer comparison features and
# hand-rolled one-hot flags. Column order is kept stable because it defines
# the order of the model inputs later on.
feature_eng_df = joined_df.select(
    "QUERY_ID",
    "USER_ID",
    "OFFER_ID",
    "CLICKED",
    "APPLIED",
    "RELEVANCE",
    "AGE",
    "INCOME",
    "CREDIT_SCORE",
    "NUM_EXISTING_CARDS",
    F.col("MIN_CREDIT_SCORE").alias("OFFER_MIN_CREDIT_SCORE"),
    F.col("ANNUAL_FEE").alias("OFFER_ANNUAL_FEE"),
    F.col("BASE_REWARD_RATE").alias("OFFER_BASE_REWARD_RATE"),
    F.col("REQUIRED_INCOME").alias("OFFER_REQUIRED_INCOME"),
    F.col("REWARD_TYPE").alias("OFFER_REWARD_TYPE"),
    F.col("ISSUER").alias("OFFER_ISSUER"),
    (F.col("CREDIT_SCORE") - F.col("MIN_CREDIT_SCORE")).alias("CREDIT_SCORE_DIFF"),
    (F.col("INCOME") - F.col("REQUIRED_INCOME")).alias("INCOME_DIFF"),
    # Guard the denominator (REQUIRED_INCOME), not the numerator, so the
    # division can never hit zero; fall back to a neutral ratio of 1.0.
    F.iff(F.col("REQUIRED_INCOME") > 0, F.col("INCOME") / F.col("REQUIRED_INCOME"), F.lit(1.0)).alias("INCOME_RATIO"),
    # Simple one-hot encoding placeholders (more robust methods exist in Snowpark ML)
    F.iff(F.col("REWARD_TYPE") == 'cashback', F.lit(1), F.lit(0)).alias("IS_CASHBACK"),
    F.iff(F.col("REWARD_TYPE") == 'points', F.lit(1), F.lit(0)).alias("IS_POINTS"),
    F.iff(F.col("REWARD_TYPE") == 'miles', F.lit(1), F.lit(0)).alias("IS_MILES"),
    F.iff(F.col("REWARD_TYPE") == 'low_apr', F.lit(1), F.lit(0)).alias("IS_LOW_APR"),
    # Read the source column ISSUER (as the reward flags read REWARD_TYPE)
    # rather than the OFFER_ISSUER alias defined earlier in this same select.
    F.iff(F.col("ISSUER") == 'amex', F.lit(1), F.lit(0)).alias("IS_AMEX"),
    F.iff(F.col("ISSUER") == 'visa', F.lit(1), F.lit(0)).alias("IS_VISA"),
    F.iff(F.col("ISSUER") == 'chase', F.lit(1), F.lit(0)).alias("IS_CHASE"),
    F.iff(F.col("ISSUER") == 'mastercard', F.lit(1), F.lit(0)).alias("IS_MASTERCARD")
)


In [None]:
# Save the engineered features to the ENGINEERED_FEATURES table (overwriting
# any existing one), then rebind feature_eng_df to that table so later cells
# read from it instead of the join/select expression above.
feature_eng_df.write.mode("overwrite").save_as_table("ENGINEERED_FEATURES")
feature_eng_df = session.table("ENGINEERED_FEATURES")

In [None]:
feature_eng_df.show()

In [None]:
identifier_cols = ["QUERY_ID", "USER_ID", "OFFER_ID"]
categorical_cols = ["OFFER_REWARD_TYPE", "OFFER_ISSUER"]
target_col = "RELEVANCE"
# RELEVANCE is derived directly from CLICKED / APPLIED in generate_interactions
# (1 = clicked, 2 = applied). Keeping them as features would leak the label
# into the model, so they are excluded alongside ids and raw categoricals.
leakage_cols = ["CLICKED", "APPLIED"]

excluded_cols = {target_col, *identifier_cols, *categorical_cols, *leakage_cols}
missing_cols = excluded_cols - set(feature_eng_df.columns)
if missing_cols:
    # The previous list.remove() calls failed loudly on a missing column; keep that.
    raise ValueError(f"Expected columns missing from ENGINEERED_FEATURES: {sorted(missing_cols)}")

# Build a fresh list in table-column order; re-running this cell is idempotent.
feature_cols = [col_name for col_name in feature_eng_df.columns if col_name not in excluded_cols]

In [None]:
feature_cols

In [None]:
feature_eng_df.select(*feature_cols).show()

### Prepare data for training

In [None]:
print("Preparing data for LightGBM training...")
# Rows of one query must be contiguous for LightGBM's group-based ranking, hence the sort.
training_data_pd = feature_eng_df.sort("QUERY_ID").to_pandas()
# Handle potential NaN/Inf values introduced during engineering
training_data_pd = training_data_pd.replace([np.inf, -np.inf], np.nan)
# Simple imputation: fill NaN with 0 (consider more sophisticated methods)
training_data_pd[feature_cols] = training_data_pd[feature_cols].fillna(0)
X_train = training_data_pd[feature_cols]
y_train = training_data_pd[target_col]
# Calculate group sizes for LTR
# group_data contains the number of items (offers) for each query (user).
# sort=False keeps groups in the order they appear in the frame, so the sizes
# line up with the rows of X_train even if pandas would order the QUERY_ID
# strings differently from the database sort above.
group_data = training_data_pd.groupby("QUERY_ID", sort=False).size().tolist()
# Every row must belong to exactly one group, otherwise the ranker input is misaligned.
assert sum(group_data) == len(X_train), "group sizes do not add up to the number of training rows"
print(f"Training data shape: X={X_train.shape}, y={y_train.shape}")
print(f"Number of groups (queries): {len(group_data)}")
print(f"Group sizes sample: {group_data[:5]}...")

### Train model

In [None]:
group_data

In [None]:
print("Training LightGBM LGBMRanker model...")
# Hyperparameters gathered in one place; extend this dict to tune the model.
ranker_params = dict(
    objective="lambdarank",  # Standard LTR objective
    metric="ndcg",           # Normalized Discounted Cumulative Gain
    n_estimators=100,
    learning_rate=0.05,
    num_leaves=31,
    random_state=42,
    n_jobs=-1,
)
ranker = lgb.LGBMRanker(**ranker_params)

# The evaluation set is the training data itself (for simplicity), so the
# reported metric is not a held-out estimate.
ranker.fit(
    X_train,
    y_train,
    group=group_data,
    eval_set=[(X_train, y_train)],
    eval_group=[group_data],
)
print("LGBMRanker training complete.")

In [None]:
ranker

In [None]:
from sklearn.metrics import mean_squared_error, ndcg_score, r2_score

preds = ranker.predict(X_train)

# Retained from the original cell. A lambdarank model outputs uncalibrated
# ranking scores, so MSE / R^2 against the 0-2 relevance labels are only a
# rough sanity check, not a measure of ranking quality.
mse = mean_squared_error(y_train, preds)
r2 = r2_score(y_train, preds)
print(mse)
print(f"R^2 (raw scores vs. labels): {r2:.4f}")

# Ranking quality: NDCG computed separately for each query, then averaged.
# group_data gives the number of consecutive rows belonging to each query.
y_true_all = y_train.to_numpy()
ndcg_per_query = []
row_start = 0
for group_size in group_data:
    row_stop = row_start + group_size
    # NDCG is only meaningful with more than one candidate in the query.
    if group_size > 1:
        ndcg_per_query.append(
            ndcg_score([y_true_all[row_start:row_stop]], [preds[row_start:row_stop]])
        )
    row_start = row_stop

mean_ndcg = float(np.mean(ndcg_per_query)) if ndcg_per_query else float("nan")
print(f"Mean per-query NDCG on training data: {mean_ndcg:.4f}")

### Deploy the model to the model registry

In [None]:
from snowflake.ml.registry import registry

In [None]:
reg = registry.Registry(session=session)

In [None]:
# Names and sizing used by the registry / compute pool / service cells below.
model_name = "ltr_model"
image_repo_name = "images"
cp_name = "kipi_ltr_cpu"
# Kept as a string because it is interpolated into SQL; converted with int() for create_service.
num_spcs_nodes = "4"
spcs_instance_family = "CPU_X64_L"
service_name = "kipi_ltr_service"
# Strip double quotes from the session's current database/schema names before
# using them as prefixes.
current_database = session.get_current_database().replace('"', '')
current_schema = session.get_current_schema().replace('"', '')
# Fully qualified <database>.<schema>.<name> forms of the repository and service.
extended_image_repo_name = f"{current_database}.{current_schema}.{image_repo_name}"
extended_service_name = f'{current_database}.{current_schema}.{service_name}'

In [None]:
!pip list

In [None]:
# Register the trained ranker under model_name, passing X_train as sample input.
# No version name is given, so one is presumably auto-generated per call — TODO confirm.
# NOTE(review): lightgbm is pinned to 4.5.0 here; confirm it matches the
# version installed in this notebook (see the `!pip list` output above).
_ = reg.log_model(
    ranker,
    model_name=model_name,
    sample_input_data=X_train,
    pip_requirements=["lightgbm==4.5.0"]
)
     

In [None]:
m = reg.get_model(model_name)
version_df = m.show_versions()
version_df.head(100)

In [None]:
# Pick the most recently created version. The row order of show_versions() is
# not relied upon: sort by creation time when that column is available and
# fall back to the returned order otherwise.
if "created_on" in version_df.columns:
    versions_by_age = version_df.sort_values("created_on")
else:
    versions_by_age = version_df
last_version_name = versions_by_age['name'].iloc[-1]
pip_model = m.version(last_version_name)
pip_model

In [None]:
# Recreate the compute pool from scratch: stop everything on it (if it exists),
# drop it, create it with a fixed node count (min_nodes == max_nodes), then
# describe it.
# NOTE(review): this is destructive on every run — anything else using the
# same pool name is stopped. Confirm that is intended outside a demo.
session.sql(f"alter compute pool if exists {cp_name} stop all").collect()
session.sql(f"drop compute pool if exists {cp_name}").collect()
session.sql(f"create compute pool {cp_name} min_nodes={num_spcs_nodes} max_nodes={num_spcs_nodes} instance_family={spcs_instance_family} auto_resume=True auto_suspend_secs=300").collect()
session.sql(f"describe compute pool {cp_name}").show()

In [None]:
# Create the image repository that is passed to create_service() as image_repo.
# NOTE(review): CREATE OR REPLACE recreates the repository on every run, which
# presumably discards any images already stored in it — confirm, or consider
# CREATE ... IF NOT EXISTS.
session.sql(f"create or replace image repository {extended_image_repo_name}").collect()

In [None]:
# Deploy the selected model version as a service on the compute pool, using
# the fully qualified service and image-repository names. max_instances is
# the node count converted from its string form.
# NOTE(review): "ALLOW_ALL_INTEGRATION" is not created anywhere in the visible
# notebook — it must already exist in the account.
pip_model.create_service(
    service_name=extended_service_name,
    service_compute_pool=cp_name,
    image_repo=extended_image_repo_name,
    ingress_enabled=True,
    max_instances=int(num_spcs_nodes),
    build_external_access_integration="ALLOW_ALL_INTEGRATION"
)

In [None]:
pip_model.list_services()

In [None]:
# Flatten the JSON returned by system$get_service_status into status/message columns.
# NOTE(review): this uses the unqualified service_name, whereas the service was
# created with extended_service_name; it resolves only while the session's
# current database/schema are unchanged.
session.sql(f"SELECT VALUE:status::VARCHAR as SERVICESTATUS, VALUE:message::VARCHAR as SERVICEMESSAGE FROM TABLE(FLATTEN(input => parse_json(system$get_service_status('{service_name}')), outer => true)) f").show(100)

In [None]:
# Call the deployed service's "predict" function on the full training frame.
# NOTE(review): uses the unqualified service_name (the service was created as
# extended_service_name); scoring all of X_train also displays the whole result.
pip_model.run(X_train, function_name="predict", service_name=service_name)

In [None]:
# Read the ingress URL from the first endpoint row of the service.
# NOTE(review): indexing [0] raises IndexError if the service exposes no endpoint.
session.sql(f"show endpoints in service {service_name}").collect()[0]["ingress_url"]

In [None]:
pip_model.show_functions()

In [None]:
feature_columns = [feature.name for feature in pip_model.show_functions()[0]["signature"].inputs]
feature_columns

In [None]:
sample_data = X_train.iloc[0]

In [None]:
print(sample_data.tolist())