In [0]:
%pip install mlflow xgboost
# NOTE(review): the packages above are installed without version pins; consider pinning them for reproducible runs.

%load_ext autoreload
%autoreload 2
# Enables autoreload; learn more at https://docs.databricks.com/en/files/workspace-modules.html#autoreload-for-python-modules
# To disable autoreload, run %autoreload 0

# NOTE(review): the restart below presumably resets the Python state, including the autoreload
# extension loaded above — confirm whether %load_ext / %autoreload should instead run in a cell after the restart.
%restart_python

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql import DataFrame, functions as F, types as T, Window

import builtins
from datetime import datetime
from typing import Optional, Dict, Union, List, Tuple, Any
import math
import random


import pandas as pd
import numpy as np
import sklearn

from xgboost.spark import SparkXGBClassifier, SparkXGBRegressor
import mlflow

from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark.mllib.evaluation import MulticlassMetrics


from pyspark.ml.feature import BucketedRandomProjectionLSH
from pyspark.ml.linalg import Vectors, DenseVector, SparseVector, VectorUDT
from pyspark.ml import Pipeline, PipelineModel


from pyspark.ml.tuning import CrossValidatorModel, TrainValidationSplitModel, ParamGridBuilder, CrossValidator, TrainValidationSplit
from pyspark.storagelevel import StorageLevel

import matplotlib.pyplot as plt

from pyspark.sql.functions import round
import mlflow.spark
from mlflow.artifacts import download_artifacts


In [0]:
from src.config import *
from src.sampling import *
from src.tracking import *
from src.tuning import * 

In [0]:
# --- Run configuration -------------------------------------------------------
# Boolean churn column that is turned into the binary `label` further down.
LABEL_COL = "churn7"
# Last day (inclusive) of the window selected from the features table.
DATE_FILTER = "2025-10-26"
# Number of days before DATE_FILTER that the window reaches back.
DATE_INTERVAL = 30

# Legacy payer-split switch, currently disabled:
#   None --> no split, "0" --> non-payer, "1,2" --> payer
# payer_split = "1,2"
# payer_split = None
# payer_split = "0,1"

In [0]:
#spark.sql(f""" describe table {FEATURES_TABLE_NAME}""").display()

In [0]:
# Load the feature rows for the DATE_INTERVAL-day window ending on DATE_FILTER,
# keeping only rows whose label column is populated.
# The label column is quoted with backticks (an identifier). Single quotes would
# make it a string literal, which is never null, so the filter would do nothing.
df = (
    spark.sql(f"""select * from {FEATURES_TABLE_NAME}
                                where `{LABEL_COL}` is not null
                                and date between date_sub('{DATE_FILTER}',{DATE_INTERVAL}) AND '{DATE_FILTER}' """)
    # Round-trip `market` through a temporary name; the values are unchanged and
    # the net effect is that `market` becomes the last column of the frame.
    .withColumn('market_name', col('market'))
    .drop('market')
    .withColumnRenamed('market_name', 'market')
)

In [0]:
from pyspark.sql.types import StringType, NumericType, BooleanType

# Columns that must never be used as model inputs: identifiers, date/bookkeeping
# columns and every churn target.
drop_cols = ['judi','date','ts_last_updated','processed_date','churn3','churn5','churn7','churn14']

churn_labels = []

# Split the remaining columns by Spark data type. Columns that are neither
# string nor numeric (e.g. booleans) end up in neither list.
string_features = [
    fld.name
    for fld in df.schema.fields
    if fld.name not in drop_cols and isinstance(fld.dataType, StringType)
]
numerical_features = [
    fld.name
    for fld in df.schema.fields
    if fld.name not in drop_cols and isinstance(fld.dataType, NumericType)
]
        
    

In [0]:
### Churn feature set: every column of `df` plus a binary `label`
### (1 where LABEL_COL is True, 0 for everything else).
is_churned = F.col(LABEL_COL) == True
churn_features = df.withColumn('label', F.when(is_churned, 1).otherwise(0))

In [0]:
#churn_features.display()

In [0]:
# Switches controlling which training sets are generated.
split_payers = True
upsample = True
undersample = True

# Sampling options shared by every get_stratified_sets call below.
sampling_kwargs = dict(undersample=undersample, upsample=upsample)

# Sets built on the full population are always produced.
unioned_sets = get_stratified_sets(churn_features, split=None, **sampling_kwargs)
all_sets = unioned_sets

if split_payers:
    # payer_type_cd values that define each segment.
    payers = ['P', 'S']
    non_payers = ['N']

    payers_sets = get_stratified_sets(
        churn_features.filter(col('payer_type_cd').isin(payers)),
        split='payers',
        **sampling_kwargs,
    )
    nonpayers_sets = get_stratified_sets(
        churn_features.filter(col('payer_type_cd').isin(non_payers)),
        split='nonpayers',
        **sampling_kwargs,
    )

    # Order matters for the index-based slicing used later: unsplit, payers, non-payers.
    all_sets = unioned_sets + payers_sets + nonpayers_sets

In [0]:

"""

split_payers = True


if split_payers:
    payers = ['P','S']
    non_payers= ['N']

    # Split nonpayer and payer separately
    strat_train_payer, strat_val_payer, strat_test_payer = stratified_sampling(churn_features.filter(col('payer_type_cd').isin(payers)), P_TEST=0.2, P_VAL=0.2)

    strat_train_nonpayer, strat_val_nonpayer, strat_test_nonpayer = stratified_sampling(churn_features.filter(col('payer_type_cd').isin(non_payers)), P_TEST=0.2, P_VAL=0.2)


    strat_train, strat_val, strat_test = stratified_sampling(churn_features, P_TEST=0.2, P_VAL=0.2) 
    

### This is just the stratified sampling, without a payer/nonpayer split
else:
    strat_train, strat_val, strat_test = stratified_sampling(churn_features, P_TEST=0.2, P_VAL=0.2)    

#Base info 
stratified_info =  {
                        'sampling':'stratified', 
                        'split':None,
                        'P_TEST':0.2,
                        'P_VAL':0.2,
                        'P_TRAIN':0.6,
                        'strategy':None
                    }

all_sets = [{
                'dataset': strat_train,
                'dataset_info': {**stratified_info, 'type':'training'},
                'relavent_test_set':strat_test,
                'relavent_val_set':strat_val,
            },      
            #{
            #    'dataset': strat_val, 
            #    'dataset_info':{**stratified_info, 'type':'validation'}
            #}, 
            #{
            #    'dataset': strat_test,
            #    'dataset_info': {**stratified_info, 'type':'testing'}
            #}
        ]

#stratified_sets= all_sets

split_info = {
    'sampling':'stratified', 
    'P_TEST':0.2,
    'P_VAL':0.2,
    'P_TRAIN':0.6,
},

if split_payers:

    # non_payer
    non_payer_slug = {
                'dataset':strat_train_nonpayer, 
                'dataset_info':{
                    **stratified_info,
                    'split':'nonpayer', 
                    'type':'training'
                    },
                'relavent_test_set': strat_test_nonpayer,
                'relavent_val_set': strat_val_nonpayer,
            }
    all_sets.append(non_payer_slug)
    
    #payers
    payers_slug = {
                'dataset':strat_train_payer,
                'dataset_info':{
                    **stratified_info, 
                    'split':'payer', 
                    'type':'training'
                    },
                'relavent_test_set': strat_test_payer,
                'relavent_val_set': strat_val_payer,
            }
    all_sets.append(payers_slug)

"""

In [0]:
"""
# Upsampling
upsample=True
undersample = True

#upsample="True"
#undersample = "True"

# Now deal with player splits
if split_payers: #if split_payers=="True"
    if upsample==True: #if upsample=="True"
        strat_train_up_payer, train_up_payer_info = upsample_minority(strat_train_payer,split='payer')
        all_sets.append(
            {
                'dataset':strat_train_up_payer, 
                'dataset_info':train_up_payer_info,
                'relevant_test_set': strat_test_payer,
                'relevant_val_set': strat_val_payer,
            }
        )


        strat_train_up_nonpayer, train_up_nonpayer_info = upsample_minority(strat_train_nonpayer,split='nonpayer')
        all_sets.append(
            {
                'dataset':strat_train_up_nonpayer, 
                'dataset_info':train_up_nonpayer_info,
                'relevant_test_set': strat_test_nonpayer,
                'relevant_val_set': strat_val_nonpayer,
            }
        )
        

        # Union for upsampling - remove splits
        strat_train_up = strat_train_up_payer.union(strat_train_up_nonpayer)
        train_up_info = {'majority_label': 0,
                            'minority_label': 1,
                            'strategy':'sampling',
                            'sampling': 'upsample',
                            'split':None,
                            'type':"training"}
        all_sets.append(
            {
                'dataset':strat_train_up, 
                'dataset_info':train_up_info,
                'relevant_test_set': strat_test,
                'relevant_val_set': strat_val,

            }
        )


    if undersample==True: #if undersample=="True":
        strat_train_under_payer, train_under_payer_info = undersample_majority(strat_train_payer,split='payer') 
        all_sets.append(
            {
                'dataset':strat_train_under_payer, 
                'dataset_info':train_under_payer_info,
                'relevant_test_set':strat_test_payer,
                'relevant_val_set':strat_val_payer,
            }
        )



        strat_train_under_nonpayer, train_under_nonpayer_info = undersample_majority(strat_train_nonpayer,split='nonpayer')
        all_sets.append(
            {
                'dataset':strat_train_under_nonpayer, 
                'dataset_info':train_under_nonpayer_info,
                'relevant_test_set':strat_test_nonpayer,
                'relevant_val_set':strat_val_nonpayer,
            }
        )

        # Union for undersampling - remove split
        strat_train_under = strat_train_under_payer.union(strat_train_under_nonpayer)
        train_under_info = {
                                'majority_label': 0,
                                'minority_label': 1,
                                'strategy':'sampling',
                                'sampling':'undersample',
                                'split':None,
                                'type':"training",
                            }
        
        all_sets.append(
            {
                'dataset':strat_train_under, 
                'dataset_info':train_under_info,
                'relevant_test_set':strat_test,
                'relevant_val_set':strat_val,
            })

else:
    if upsample==True: # if upsample=="True"
        ## Upsampling
        strat_train_up, train_up_info = upsample_minority(strat_train)
        train_up_info['type']='training'
        all_sets.append(
            {
                'dataset':strat_train_up, 
                'dataset_info':train_up_info,
                'relevant_test_set':strat_test,
                'relevant_val_set':strat_val,
            }
        )

    if undersample==True: # if undersample=="True"
        ## Undersampling
        strat_train_under, train_under_info = undersample_majority(strat_train)
        train_under_info['type'] = 'training'
        all_sets.append(
            {
                'dataset':strat_train_under, 
                'dataset_info':train_under_info,
                'relevant_test_set':strat_test,
                'relevant_val_set':strat_val
            }
        )
"""

In [0]:
def get_safe_works_repartition(df, max_workers=32):
    """Repartition ``df`` so its partition count matches the available task slots.

    The slot count is ``spark.executor.cores`` (default ``"1"``) multiplied by the
    number of executors, clamped to the range ``[1, max_workers]``.

    Args:
        df: Object exposing ``repartition(n)`` (a Spark DataFrame in this notebook).
        max_workers: Upper bound on the number of partitions/workers. Defaults to
            32, the cap that was previously hard-coded.

    Returns:
        A tuple ``(repartitioned_df, safe_workers)`` where ``safe_workers`` is the
        partition count that was applied.
    """
    # `from pyspark.sql.functions import *` shadows max/min with column functions,
    # so the Python versions are taken from the `builtins` module. `__builtins__`
    # is avoided because it is a plain dict in any module other than `__main__`.
    conf = spark.sparkContext.getConf()
    cores_per_exec = int(conf.get("spark.executor.cores", "1"))

    # executors = all JVMs except the driver
    num_exec = spark._jsc.sc().getExecutorMemoryStatus().size() - 1

    # Never fewer than one slot, even when no executor is registered.
    slots = builtins.max(1, cores_per_exec * builtins.max(1, num_exec))
    safe_workers = builtins.max(1, builtins.min(slots, max_workers))

    # Match the number of partitions to the number of workers.
    return df.repartition(safe_workers), safe_workers

In [0]:
# Unnecessary because we only have 1 worker?

# Replace every training dataset with its repartitioned version; `safe_workers`
# keeps the value returned for the last set processed.
for entry in all_sets:
    entry['dataset'], safe_workers = get_safe_works_repartition(entry['dataset'])


In [0]:
# For XGBoost we don't need to standardize any features.

# One StringIndexer per string column, each writing to "<column>_index".
indexed_cols = [f"{name}_index" for name in string_features]
indexers = [
    StringIndexer(inputCol=name, outputCol=indexed_name, handleInvalid="keep")
    for name, indexed_name in zip(string_features, indexed_cols)
]

# Model inputs: the raw numeric columns followed by the indexed string columns.
inputs = numerical_features + indexed_cols
vec_assembler = VectorAssembler(
    inputCols=inputs,
    outputCol='features',
    handleInvalid='keep',
)

# Metric(s) handed to XGBoost.
# eval_metrics = ["auc", "aucpr", "logloss"]
eval_metrics = ["aucpr"]

# Overrides the worker count returned by the repartition loop with a single worker.
safe_workers = 1

xgb = SparkXGBClassifier(
    features_col="features",
    label_col="label",
    num_workers=safe_workers,
    eval_metric=eval_metrics,
)

# Full pipeline: index strings -> assemble feature vector -> XGBoost classifier.
pipeline = Pipeline(stages=[*indexers, vec_assembler, xgb])

In [0]:
'''
spec = {
    # "n_estimators": ("int_uniform", 50, 1000),
    "max_depth":  ("int_uniform", 8, 8), # Originally "max_depth":  ("int_uniform", 4, 8),
    #"gamma": ("uniform", 0.0, 0.2),
    #"learning_rate": ("uniform", 0.01,0.5),
    # "subsample": ("uniform", 0.7, 0.9),
    #"colsample_bytree": ("uniform", 0.7, 0.9),
    # "min_child_weight": ("int_uniform", 1, 5),
    #"reg_alpha": ("uniform", 0.0, 0.1),
    #"reg_lambda": ("int_uniform", 1, 10),
    #"colsample_bylevel": ("uniform", 0, 0.6),
}

# build random xgb param map
xgb_param_maps = build_random_param_maps(xgb, spec, n_samples=40, seed=7)


cv_xgb = CrossValidator(
    estimator=pipeline,
    estimatorParamMaps=xgb_param_maps,
    numFolds=5,
    seed=7,
    # parallelism=150
)

'''

In [0]:
# Search space for the random hyper-parameter sampler.
spec = dict(
    max_depth=("int_uniform", 8, 8),  # Originally "max_depth":  ("int_uniform", 4, 8)
)

# Build the random XGBoost param maps.
# NOTE(review): the range is pinned to a single value while 40 samples are requested —
# confirm whether build_random_param_maps de-duplicates, otherwise identical candidates get fitted.
xgb_param_maps = build_random_param_maps(xgb, spec, n_samples=40, seed=7)

# 5-fold cross-validation over the whole pipeline.
# NOTE(review): no evaluator is passed here; presumably run_spark_ml_training sets one — verify.
cv_xgb = CrossValidator(
    estimator=pipeline,
    estimatorParamMaps=xgb_param_maps,
    numFolds=5,
    seed=7,
    # parallelism=150
)

In [0]:
import logging

# Surface MLflow's INFO-level messages in the notebook output.
logger = logging.getLogger("mlflow")
logger.setLevel("INFO")

In [0]:
# Run-level metadata attached to every training run as tags.
extra_tags = dict(
    label=LABEL_COL,
    safe_workers=safe_workers,
    date_filter=DATE_FILTER,
    date_interval=DATE_INTERVAL,
    source_table_name=FEATURES_TABLE_NAME,
)

# Each set gets the shared tags plus its own dataset_info; on a key clash the
# dataset_info value wins because it is applied last.
for entry in all_sets:
    merged_tags = dict(extra_tags)
    merged_tags.update(entry['dataset_info'])
    entry['extra_tags'] = merged_tags

In [0]:
# Select the MLflow experiment that subsequent runs are logged to.
# EXPERIMENT_NAME is not defined in this notebook; presumably it comes from the src.* star imports — verify.
mlflow.set_experiment(EXPERIMENT_NAME)

In [0]:
### TODO: add a run_id of the form f"XGB_{days_back}_{split}_{sampling}_{churn_label}"

In [0]:
#temp = all_sets[0]['dataset'].select('total_rounds_l7d','label').limit(20).toPandas()

In [0]:
"""
temp = pd.DataFrame([[x,random.random()] for x in range(20)],columns = ['features','feature_importances']).sort_values('feature_importances',ascending=False)

"""

In [0]:
"""
def test_function(df):

    features = pd.Series(df['feature_importances'], index=df['features'])

    fig,ax = plt.subplots()
    features.plot.barh(ax=ax)
    ax.set_title("XGBoost Feature Importances")
    ax.set_ylabel("Feature Name")
    plt.xlabel("Feature Importance")

    return fig

fig_importances = test_function(temp)"""

In [0]:
"""fig = plt.figure()
plt.barh(temp['features_importances'],temp['feature_importances'])
plt.xlabel("Feature Importance")
plt.ylabel("Feature Name")
plt.title("XGBoost Feature Importances")
#plt.tight_layout()"""

In [0]:
"""
import pandas as pd

forest_importances = pd.Series(temp['feature_importances'], index=temp['features'])

fig, ax = plt.subplots()
forest_importances.plot.barh(ax=ax)
ax.set_title("Feature importances using MDI")
ax.set_ylabel("Mean decrease in impurity")
fig.tight_layout()"""

In [0]:
# Inspect the entry at index 6 — the first set the training loop below iterates over (all_sets[6:]).
all_sets[6]

In [0]:
# Collected outputs, one entry per trained set.
results_list = []
best_estimators = []

# Manual resume point: only the sets from index 6 onward (6, 7, 8) are still to be trained.
# NOTE(review): on a fresh Restart & Run All this skips all_sets[0:6] — confirm that is intended.
for run_set in all_sets[6:]:
    results, best_estimator = run_spark_ml_training(
        estimator=cv_xgb,
        train_df=run_set["dataset"],
        test_df=run_set["relevant_test_set"],
        val_df=run_set["relevant_val_set"],
        extra_tags=run_set["extra_tags"],
    )
    results_list.append(results)
    best_estimators.append(best_estimator)



In [0]:
# Close whichever MLflow run is still active (e.g. one left open by an interrupted training loop).
mlflow.end_run()

In [0]:
### WARNING mlflow.utils.environment: Encountered an unexpected error while inferring pip requirements (model URI: dbfs:/databricks/mlflow-tracking/762753878692720/76b0863a5e5947378a19c9fec1384fcd/artifacts/models/best_model/sparkml, flavor: spark). Fall back to return ['pyspark==4.0.0']. Set logging level to DEBUG to see the full traceback.


In [0]:
# The original call was left unterminated, which is a SyntaxError.
# Set MODEL_URI to the logged Spark model to inspect, e.g. "runs:/<run_id>/<artifact_path>";
# while it is None the cell is a no-op so the notebook still runs top to bottom.
MODEL_URI = None
loaded_model = mlflow.spark.load_model(MODEL_URI) if MODEL_URI else None

In [0]:
### 

# import mlflow

# model_uri = 'runs:/77facbd0a5f044ce807b92e5a9df96e3/best_model/spark-model'


In [0]:
# run_spark_ml_training(cv_xgb, strat_train_up, test_df = strat_test, val_df = strat_val, extra_tags = train_up_info)

In [0]:
#####

# Modify and test the tracking functions (log all to the mlflow experiment, vs. the notebook)




###