In [0]:
%load_ext autoreload
%autoreload 2
import os
import pandas as pd
import matplotlib.pyplot as plt
import pyspark.sql.functions as F
from pyspark.sql import Window
from pyspark.sql.types import ArrayType, FloatType

from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.feature import StringIndexer, OneHotEncoder
from pyspark.ml import Pipeline

from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, classification_report, f1_score, precision_score, recall_score
from sklearn.preprocessing import OneHotEncoder
from sklearn.compose import ColumnTransformer

import lightgbm as lgb

from components.data_prep import *
from components.preprocess import *
from components.config import *
from components.helper import *
from model.nn import*

# Raw inputs (Databricks mounts) used throughout the notebook.
# cdp_path is the local FUSE form of the CPA root; it is listed with os.listdir in a later cell.
cdp_path = '/dbfs/mnt/cdpprod/Customer_Profile_Aggregates/'
cfa_overall_path = 'dbfs:/mnt/proddatalake/prod/CFA/CFA_overall_lvl'
rcx_enums_path = 'dbfs:/mnt/edhprodenrich/Enterprise Data Store/Secured/data/Cleanse/RCX/RCX_enums/CurrentState'
rcx_loyalty_path = "dbfs:/mnt/edhprodenrich/Enterprise Data Store/Secured/data/Cleanse/RCX/RCX_loyalty_activities/CurrentState"

# Provides MLifeID and guest_id; joined on MLifeID later to attach guest_id to RCX player rows.
mgm_reward = spark.read.parquet(cfa_overall_path)
# RCX Delta "CurrentState" tables; both are passed to get_train_daily_new further down.
rc = spark.read.format("delta").load(rcx_enums_path)
la = spark.read.format("delta").load(rcx_loyalty_path)

In [0]:
# Property-name fragment that selects the regional property. Later cells match it with
# `contains` against CPA `property_name` and RCX `site_name`.
regional = 'Borgata'

In [0]:
# Latest CPA snapshot folder: the lexicographically greatest entry under cdp_path.
# NOTE(review): assumes every entry is a date-named folder. An entry beginning with a
# letter or '_' (e.g. '_SUCCESS') sorts after digit-led names and would be picked; confirm.
yesterday = str(max(os.listdir(cdp_path)))

# CPA rows since 2016 with convention trips removed (rows with a null segment are kept).
# Site is 'Vegas' for SiteGroup LAS and 'Region' otherwise.
trip_data = (
    spark.read.parquet('dbfs:/mnt/cdpprod/Customer_Profile_Aggregates/' + yesterday + '/')
    .withColumn('Site', F.when(F.col('SiteGroup') == 'LAS', 'Vegas').otherwise('Region'))
    .filter(F.col('Mnth') >= '2016-01-01')
    .filter((F.col('TripRvMgmt_Segment') != 'Convention') | F.col('TripRvMgmt_Segment').isNull())
)
# Trip key of the form "<Guest_ID>_<TripStart>_<TripEnd>".
trip_data = trip_data.withColumn(
    "TripID",
    F.concat(F.col("Guest_ID"), F.lit('_'), F.col("TripStart"), F.lit('_'), F.col("TripEnd")),
)

# Non-BetMGM trips starting on or before 2024-12-31. This uses `<=` so trips starting on
# 2024-12-31 are kept, consistent with the inclusive `between` 2024 window below.
# The previous `<` silently dropped that day.
CPA = trip_data.where("property_name != 'BetMGM' and tripstart <= '2024-12-31'").select(
    'guest_id', 'Property_Name', 'Department', 'TripStart', 'TripEnd',
    'TripStartMlifeTier', 'TripGamingDays', 'TripID',
)
# One row per trip: the count of non-null Department values plus the max of each
# trip-level attribute.
# NOTE(review): property_name is the max over the trip's rows, so a multi-property trip
# is attributed to one property by string order. Confirm this is intended.
trip_CPA = CPA.groupBy('guest_id', 'TripStart', 'TripEnd', 'TripID').agg(
    F.count('Department').alias('dept_num'),
    F.max('Property_Name').alias('property_name'),
    F.max('TripGamingDays').alias('TripGamingDays'),
    F.max('TripStartMlifeTier').alias('TripStartMlifeTier'),
)
trip_borgata = trip_CPA.where(F.col('property_name').contains(regional))
trip_borgata_2022 = trip_borgata.where('TripStart between "2022-01-01" and "2022-12-31"')
trip_borgata_2023 = trip_borgata.where('TripStart between "2023-01-01" and "2023-12-31"')
trip_borgata_2024 = trip_borgata.where('TripStart between "2024-01-01" and "2024-12-31"')

# Alternative cohort (not used in this section): guests with a regional trip in 2022 or 2023.
hist_spec = trip_borgata_2022.union(trip_borgata_2023).select('guest_id').distinct()
# Cohort used: guests with regional trips in both 2024 and 2023. A semi join tests
# membership without the per-guest trip-by-trip explosion of an inner join. The
# resulting distinct guest_id set is unchanged.
trip_spec = (
    trip_borgata_2024.select('guest_id').distinct()
    .join(trip_borgata_2023.select('guest_id'), on='guest_id', how='leftsemi')
)

In [0]:
# Guests with regional trips in both 2023 and 2024 (semi join = membership test, no
# trip-by-trip explosion; same distinct guest_id set as the inner join + distinct).
trip_spec_2024 = (
    trip_borgata_2024.select('guest_id').distinct()
    .join(trip_borgata_2023.select('guest_id'), on='guest_id', how='leftsemi')
)
# 2024 regional trips by guests who had no 2023 regional trip.
new_trip = trip_borgata_2024.join(trip_spec_2024, on='guest_id', how='leftanti')

tc_2023 = spark.read.parquet('dbfs:/mnt/proddatalake/dev/RCX/TC_2023.parquet')
tc_2024 = spark.read.parquet('dbfs:/mnt/proddatalake/dev/RCX/TC_2024.parquet')

# Per-player 2024 tier credit: the overall total, and the part earned at sites whose
# name contains `regional`.
tc_2024 = tc_2024.groupBy('playerid').agg(
    F.sum('tiercredit').alias('total_tc'),
    F.sum(F.when(F.col('site_name').contains(regional), F.col('tiercredit')).otherwise(F.lit(0))).alias('regional_tc'),
)
# Attach guest_id by matching RCX playerid to MLifeID.
tc_2024 = tc_2024.join(mgm_reward, F.col('playerid') == F.col('MLifeID'), how='inner').select(*tc_2024.columns, 'guest_id')

# Filtering regional-dominant players: more than 80% of 2024 tier credit earned at the
# regional property. `regional_tc` is an absolute credit sum, so the threshold must be
# applied to the share. Comparing the raw sum with 0.8 admitted almost anyone with any
# regional play. The share is null (and the row filtered out) when total_tc <= 0, which
# avoids division by zero.
REGIONAL_SHARE_THRESHOLD = 0.8
regional_share = F.when(F.col('total_tc') > 0, F.col('regional_tc') / F.col('total_tc'))
regional_dominant_players = (
    tc_2024.where(regional_share > REGIONAL_SHARE_THRESHOLD)
    .select('guest_id')
    # If several MLifeIDs map to one guest_id, the join below would otherwise duplicate trips.
    .distinct()
)
trip = new_trip.join(regional_dominant_players, on='guest_id', how='inner')

In [0]:
# Ordinal rank per tier name. Any other value (including null) falls through to 5.
_TIER_RANKS = [('Sapphire', 1), ('Pearl', 2), ('Gold', 3), ('Platinum', 4)]


def _tier_rank(col_name):
    """Build the chained F.when(...) ranking of the tier name in `col_name`, defaulting to 5."""
    ranked = None
    for tier_name, rank in _TIER_RANKS:
        matches = F.col(col_name) == tier_name
        ranked = F.when(matches, F.lit(rank)) if ranked is None else ranked.when(matches, F.lit(rank))
    return ranked.otherwise(5)


# Tier changes assigned in 2024, keyed by guest_id via the MLifeID mapping.
TM = spark.read.parquet('dbfs:/mnt/proddatalake/dev/RCX/Tier_History.parquet').where('year(change_assigned) = 2024')
TM = TM.join(mgm_reward, TM.playerid == mgm_reward.MLifeID, how='inner').select('guest_id','change_assigned','tier_before_change','tier_after_change')

# Restrict to guests in the modelling cohort `trip` and add numeric tier ranks.
_cohort_guest_ids = trip.select('guest_id').distinct()
TM = (
    TM.join(_cohort_guest_ids, on='guest_id', how='inner')
    .select('guest_id', 'change_assigned', 'tier_before_change', 'tier_after_change')
    .withColumn('tier_before', _tier_rank('tier_before_change'))
    .withColumn('tier_after', _tier_rank('tier_after_change'))
)
# Number of distinct cohort guests with at least one upward tier change in 2024.
TM.where('tier_before < tier_after').select(F.countDistinct('guest_id')).display()

In [0]:
temp = TC_trip_formulation_daily_model(2024, trip, spark)

# Tier-credit columns that are summed under their own names.
_summed_tc_columns = ('TotalTierCredit', 'gaming_tc', 'non_gaming_tc')

# One row per (guest, calendar year, tier-change date) with trip counts, summed tier
# credits and the min/max of the tier and reason fields.
temp_train = (
    temp.groupby('guest_id', 'calendar_year', 'change_assigned')
    .agg(
        F.count('*').alias('trip_num'),
        *[F.sum(col).alias(col) for col in _summed_tc_columns],
        F.min('tier').alias('tier'),
        F.max('tier_before').alias('pro_tier'),
        F.max('mt_reason').alias('reason_code'),
        F.max('mt_subreason').alias('subreason_code'),
        F.max('mth_reason').alias('mth_reason_code'),
        F.max('mth_subreason').alias('mth_subreason_code'),
        F.max('tier_after').alias('tier_after'),
    )
    # Starting tier: pro_tier when present, otherwise the integer parsed from characters
    # 2-3 of `tier` (the exact `tier` code format is assumed; confirm).
    .withColumn('trip_tier', F.coalesce('pro_tier', F.substring("tier", 2, 2).cast('int')))
    # Tier-credit target per starting tier (presumably the next tier's threshold).
    # Tiers other than 1-3 get -1 and are dropped.
    .withColumn(
        'target_TC',
        F.when(F.col('trip_tier') == 1, 20000)
        .when(F.col('trip_tier') == 2, 75000)
        .when(F.col('trip_tier') == 3, 200000)
        .otherwise(-1),
    )
    .filter('target_tc != -1')
)

In [0]:
year = 2024
TC = spark.read.parquet(f'dbfs:/mnt/proddatalake/dev/RCX/TC_{year}.parquet')
temp_train_daily = get_train_daily_new(TC, year, temp, temp_train, mgm_reward, rc, la)

# De-duplicate BEFORE the running windows. Previously distinct() ran afterwards. Duplicate
# rows then received different running row counts, so they could survive distinct() as
# separate rows, and they inflated the denominators of the percentages below.
temp_train_daily = temp_train_daily.distinct()

# Cumulative frame per guest, ordered by transactiondate (rows up to and including the current one).
# NOTE(review): rows sharing a transactiondate have no tiebreaker, so their relative order
# (and hence their running values) is not deterministic.
window_spec = Window.partitionBy('train_guest_id').orderBy('transactiondate').rowsBetween(Window.unboundedPreceding, 0)

# Running share of the guest's rows so far with TripLodgingStatus == 1 (named "lodger")
# and == 2 (named "local"). The mean of a 0/1 indicator over the frame equals matches / rows so far.
temp_train_daily = temp_train_daily.withColumn(
    "lodger_percentage",
    F.avg(F.when(F.col('TripLodgingStatus') == 1, 1).otherwise(0)).over(window_spec),
).withColumn(
    "local_percentage",
    F.avg(F.when(F.col('TripLodgingStatus') == 2, 1).otherwise(0)).over(window_spec),
)

# Guests whose cumulative TC exceeded the target on a day other than their tier-change
# day (or who have no change day) are treated as inconsistent and removed entirely.
error = temp_train_daily.where('cuml_tc > target_tc and (transactiondate != change_assigned or change_assigned is null)').select('train_guest_id').distinct()
new_train_daily = temp_train_daily.join(error, on='train_guest_id', how='leftanti')

In [0]:
# Debug: show the daily rows of one hard-coded guest that appears in `error`.
# NOTE(review): leftover debugging cell. It displays an individual guest's records, so
# consider removing it (or clearing its output) before sharing the notebook.
temp_train_daily.join(error, on = 'train_guest_id', how='inner').where('train_guest_id = 78151882').display()

In [0]:
# Use the Unity Catalog model registry for the models:/ URIs below.
mlflow.set_registry_uri("databricks-uc")

# Registered production scaler. It is only referenced by the apply_minmax alternative
# (reuse instead of refit).
scaler_model_path = "models:/data_science_mart.tierimminent_cleaned.minmax_scaler_borgata_new@active"

# Fit a new min-max scaler on the cleaned daily rows and get the scaled frame.
minmax_scalar_new, train = train_minmax(new_train_daily, FEATURE_NAMES_NEW)
# train = apply_minmax(new_train_daily, FEATURE_NAMES_NEW, scaler_model_path)

# Write the scaled frame to parquet and read it back as `t`.
(
    train.write
    .mode("overwrite")
    .parquet('/mnt/proddatalake/dev/TierImminent/data/BR_new_scaled_test.parquet')
)
t = spark.read.parquet('dbfs:/mnt/proddatalake/dev/TierImminent/data/BR_new_scaled_test.parquet')

# Exclude each guest's tier-change day itself; keep all rows for guests with no change.
train_final = t.where('(change_assigned != transactiondate) or change_assigned is null')

In [0]:
# Summary statistics (count, mean, stddev, min, max) of the scaled training frame `t`.
t.describe().display()

In [0]:
# Load the registered Borgata LSTM model (alias @active), mapping its tensors onto the CPU.
loaded_model = mlflow.pytorch.load_model(
    "models:/data_science_mart.tierimminent_cleaned.borgata_lstm_model_new@active",
    map_location=torch.device('cpu'),
)

In [0]:
from tqdm import tqdm

# Day-by-day backtest. For each remaining_days value i (365 down to 1), score every guest
# with a row at i, using a sequence of up to SEQ_LEN rows: day i plus the rows with the
# next-larger remaining_days. Collect probabilities, labels, ids and i for evaluation.
SEQ_LEN = 6

outputs_ls = []  # not filled in this cell; kept so any later reference still resolves
labels_ls = []
remaining_ls = []
new_outputs_ls = []
id_ls = []
# NOTE(review): `tier` is not populated in this cell; kept only for compatibility.
tier = pd.DataFrame(columns=['id','remaining_dyas','tier'])
features_scaled = [name + '_scaled' for name in FEATURE_NAMES_NEW]

# Put dropout / batch-norm layers (if any) into inference mode. torch.no_grad() alone
# only disables gradient tracking.
loaded_model.eval()

# Per-day pieces, concatenated once after the loop. np.append inside the loop re-copied
# the whole accumulated array on every iteration.
X_parts, y_parts, l_parts = [], [], []
for i in tqdm(range(365, 0, -1)):
    cus_list = train_final.filter(train_final.remaining_days == i).select(F.col('train_guest_id').alias('id')).distinct()
    temp_df = train_final.join(cus_list, (train_final.train_guest_id == cus_list.id) & (train_final.remaining_days >= i), how='inner')
    # Ascending remaining_days: rn 1 is day i, followed by the nearest larger remaining_days values.
    temp_df = temp_df.withColumn('rn', F.row_number().over(Window.partitionBy('train_guest_id').orderBy('remaining_days'))).where(f'rn <= {SEQ_LEN}')

    train_np, label, length, train_id = create_sequences_and_labels(temp_df, features_scaled, SEQ_LEN)
    if len(label) == 0:
        # Nothing to score for this remaining_days value.
        continue

    X_parts.append(train_np)
    y_parts.append(np.ravel(label))
    l_parts.append(np.ravel(length))

    test_X = torch.tensor(train_np, dtype=torch.float32).cpu()
    len_X = torch.tensor(length, dtype=torch.float32).cpu()
    with torch.no_grad():
        # reshape(-1) instead of squeeze(): squeeze() turns a one-guest batch into a 0-d
        # tensor, which list.extend cannot iterate.
        # NOTE(review): assumes one logit per guest, i.e. output (batch,) or (batch, 1) — confirm.
        new_outputs = torch.sigmoid(loaded_model(test_X, len_X).reshape(-1))

    new_outputs_ls.extend(new_outputs.tolist())
    labels_ls.extend(label)
    id_ls.extend(train_id)
    remaining_ls.extend([i] * len(label))

# Same final arrays the incremental np.append accumulation produced. y and l start from
# an empty float64 array, so they get the same dtype promotion as before.
X = np.concatenate(X_parts, axis=0) if X_parts else None
y = np.concatenate([np.array([])] + y_parts)
l = np.concatenate([np.array([])] + l_parts)

In [0]:
# Imported here because the top import cell does not bring these names in explicitly.
from sklearn.metrics import roc_auc_score, roc_curve

# ROC curve of the backtest probabilities against their labels, with a sample of
# thresholds annotated along the curve.
fpr, tpr, thresholds = roc_curve(labels_ls, new_outputs_ls)
auc_score = roc_auc_score(labels_ls, new_outputs_ls)

fig, ax = plt.subplots()
ax.plot(fpr, tpr, label=f"ROC Curve (AUC = {auc_score:.2f})")
ax.plot([0, 1], [0, 1], 'k--', label="Random Guess")
ax.set_xlabel("False Positive Rate")
ax.set_ylabel("True Positive Rate")
ax.set_title("ROC Curve")
ax.legend(loc="best")
ax.grid()
# Annotate about 20 evenly spaced thresholds. max(1, ...) keeps the step positive when
# there are fewer than 20 thresholds; a step of 0 made range() raise ValueError.
threshold_step = max(1, len(thresholds) // 20)
for i in range(0, len(thresholds), threshold_step):
    ax.text(fpr[i], tpr[i], f'{thresholds[i]:.2f}', fontsize=8, color='red')
#plt.savefig("/Workspace/Users/609399@mgmresorts.com/Tier Imminent/model_performance_metric/sim_2024_roc.png")
plt.show()
plt.close(fig)

In [0]:
# NOTE(review): this cell recomputes and redraws exactly the same ROC plot as the
# preceding cell. Consider deleting one of the two.
from sklearn.metrics import roc_auc_score, roc_curve

# ROC curve of the backtest probabilities against their labels, with sampled thresholds annotated.
fpr, tpr, thresholds = roc_curve(labels_ls, new_outputs_ls)
auc_score = roc_auc_score(labels_ls, new_outputs_ls)

fig, ax = plt.subplots()
ax.plot(fpr, tpr, label=f"ROC Curve (AUC = {auc_score:.2f})")
ax.plot([0, 1], [0, 1], 'k--', label="Random Guess")
ax.set_xlabel("False Positive Rate")
ax.set_ylabel("True Positive Rate")
ax.set_title("ROC Curve")
ax.legend(loc="best")
ax.grid()
# Annotate about 20 evenly spaced thresholds. max(1, ...) avoids a zero step, which made
# range() raise ValueError when there were fewer than 20 thresholds.
threshold_step = max(1, len(thresholds) // 20)
for i in range(0, len(thresholds), threshold_step):
    ax.text(fpr[i], tpr[i], f'{thresholds[i]:.2f}', fontsize=8, color='red')
#plt.savefig("/Workspace/Users/609399@mgmresorts.com/Tier Imminent/model_performance_metric/sim_2024_roc.png")
plt.show()
plt.close(fig)