In [0]:
# Install the PySparkling package through the Databricks library utility.
# NOTE(review): only the package name is given, no release is pinned, so the
# installed version can change between runs — consider pinning one.
dbutils.library.installPyPI('h2o-pysparkling-2.4')
# NOTE(review): star import; H2OContext used in the next cell presumably comes from here — confirm.
from pysparkling import * # Import PySparkling
import h2o
import pandas as pd
# Switch off Spark's Arrow execution setting for this session (it is the
# setting behind Arrow-accelerated pandas conversions).
spark.conf.set("spark.sql.execution.arrow.enabled", "false")
# NOTE(review): star import; nothing visible in this notebook calls these functions by name.
from pyspark.sql.functions import *
import warnings
# Silence every Python warning for the rest of the notebook.
warnings.filterwarnings("ignore")
from pysparkling.ml import H2OAutoML
from pyspark.ml import Pipeline
from pyspark.ml.feature import SQLTransformer
# Star import; StringType used by filter_data presumably comes from here.
from pyspark.sql.types import *
from datetime import datetime,date 

In [0]:
# Reuse the existing H2OContext or create one; the handle is kept in `hc`.
hc = H2OContext.getOrCreate()

In [0]:
def read_data():
  '''Load the training table that holds the rows of every location.

  Returns:
    The Spark DataFrame for table ``churn.train_data_2019_2020_all_loc``.
  '''
  return spark.table('churn.train_data_2019_2020_all_loc')

def filter_data(all_data_df, location_id):
  '''Build the training frame of a single location.

  Args:
    all_data_df: Spark DataFrame with the training rows of all locations.
    location_id: value matched against the ``location_id`` column.

  Returns:
    A Spark DataFrame restricted to the requested location, without the
    columns that are not needed for training, with the purchase-recency
    column renamed and ``tag_churn`` cast to string.
  '''
  # Columns that are not needed in the train set.
  unused_columns = ('user_id', 'location_id', 'location_name', 'updated_on')

  location_rows = all_data_df[all_data_df.location_id == location_id]
  renamed = (
    location_rows
    .drop(*unused_columns)
    .withColumnRenamed('num_days_before_end2019_purchased', 'num_days_before_end_purchased')
  )

  # Type conversion: the label column is stored back as a string.
  churn_as_string = renamed["tag_churn"].cast(StringType())
  return renamed.withColumn('tag_churn', churn_as_string)

def get_feature_importance(leader_board_pd, model_rank=2):
  '''Print the variable importance of one model picked from the leaderboard.

  Args:
    leader_board_pd: pandas DataFrame of the AutoML leaderboard; it must have
      a "model_id" column with one row per model.
    model_rank: zero-based row position of the model to inspect. Defaults to
      2, the position this notebook has always used.
      NOTE(review): presumably chosen to skip ensemble models at the top of
      the leaderboard — confirm.

  Returns:
    None; everything is printed.
  '''
  n_models = len(leader_board_pd)
  if n_models == 0:
    # Nothing was trained, so there is no model to look up.
    print('Leaderboard is empty, no feature importance to show')
    return

  # Clamp to the last row so a short leaderboard does not raise a KeyError,
  # and read by position (.iloc) so the lookup does not depend on index labels.
  position = min(model_rank, n_models - 1)
  model_id = leader_board_pd["model_id"].iloc[position]
  m = h2o.get_model(model_id)

  print('Plot Feature Importance---')
  #print(m.varimp_plot()) #Variable importance

  print('Feature importance Values------')
  print(m.varimp(use_pandas=True))

  print(f'*** PLOTTING {model_id}***')
  #print(m.plot())
  
def get_metrics(leader_board_pd):
  '''Print and return threshold-based metrics of the leaderboard's first model.

  The model is fetched by the id stored under label 0 of the "model_id"
  column. The threshold is the one returned by
  find_threshold_by_max_metric(metric='F1'); all other metrics are read at
  that threshold.

  Args:
    leader_board_pd: pandas DataFrame of the leaderboard with a "model_id" column.

  Returns:
    Tuple (threshold, precision, recall, accuracy, f1, specificity, mcc).
    precision, recall, accuracy and specificity are multiplied by 100.
  '''
  model = h2o.get_model(leader_board_pd["model_id"][0])

  print("Model Perfomance---")
  print(model.model_performance())
  print("--------------------")

  threshold = model.find_threshold_by_max_metric(metric='F1')

  def score(metric_method):
    # Element [0][1] of what the metric method returns for the chosen threshold.
    return metric_method(thresholds=threshold)[0][1]

  precision = score(model.precision) * 100
  recall = score(model.recall) * 100
  accuracy = score(model.accuracy) * 100
  f1 = score(model.F1)
  specificity = 100 * score(model.tnr)
  mcc = score(model.mcc)

  report = [
    'Saving these metrics for leader model',
    f'Threshold to consider for mx f1 metric based is {threshold}',
    f'The model sensitivity/recall or total positive rate ie model will catch {recall} % of customers who will actually churn',
    f'The model specificity or True negative rate ie model will catch {specificity} % of customers who will actually Notchurn',
    f'Precision ie Out of all customers it predicted as it will churn {precision} % of them will actually churn',
    f'Overall accuracy is {accuracy}%',
    f'Max F1 score {f1}',
    f'Mcc score is {mcc}',
  ]
  for line in report:
    print(line)

  return threshold, precision, recall, accuracy, f1, specificity, mcc
  
def save_model(model, location_name, updated_on, best_model_df):
  '''Persist the trained model and append its metrics row to the metrics table.

  Args:
    model: fitted model exposing ``write().overwrite().save(path)``.
    location_name: location label; spaces are stripped before it is used in the path.
    updated_on: value whose ``str()`` form becomes part of the file name.
    best_model_df: pandas DataFrame holding the metrics to store.
  '''
  # --- model ---
  compact_name = location_name.replace(" ", "")
  print('Saving Model For:', compact_name)
  model_path = "".join([
    "/dbfs/FileStore/df/Churn_Models/h2o_leader_model_train_",
    compact_name,
    "_",
    str(updated_on),
    ".model",
  ])
  model.write().overwrite().save(model_path)

  # --- metrics ---
  # Rows are appended to the Delta table, earlier runs are kept.
  metrics_writer = spark.createDataFrame(best_model_df).write.format('delta').mode('append')
  metrics_writer.saveAsTable('churn.h2o_leader_model_train_metrics')
  
  
def train_model(train_df,location_id,location_name):
  '''Run H2O AutoML on one location's training frame and return the result.

  The estimator is fitted directly on ``train_df``. The pass-through
  SQLTransformer and the Pipeline that used to be built here were never
  fitted or returned, so they have been dropped.

  Args:
    train_df: Spark DataFrame to train on; "tag_churn" is the label column.
    location_id: not used by the body; kept so existing callers keep working.
    location_name: not used by the body; kept so existing callers keep working.

  Returns:
    Tuple (model, lb): the object returned by ``H2OAutoML.fit`` and the
    leaderboard returned by ``getLeaderboard()``.
  '''
  automlEstimator = H2OAutoML(maxModels=26,seed=1,splitRatio=0.75,nfolds=5,balanceClasses=True,labelCol="tag_churn")

  # Train the model
  model = automlEstimator.fit(train_df)

  # Get leaderboard
  lb = automlEstimator.getLeaderboard()
  return model, lb
  
def main_training_function():
  '''Train, evaluate and save one AutoML model per location.

  Steps:
    1. Read the location ids and names from
       ``churn.all_location_inputs_train_test`` (collected once).
    2. Read the training data of all locations.
    3. For every location whose id is greater than 0: filter the data, train,
       print feature importance, compute the metrics, and save the model
       together with a one-row metrics frame built from the leaderboard's
       first row.

  Returns:
    None.
  '''
  # Get all the location ids and names with a single collect
  inputs_df=spark.sql('select * from churn.all_location_inputs_train_test')
  location_rows = inputs_df.collect()
  all_locations_ids = [row["location_id"] for row in location_rows]
  print('Total Locations=',len(all_locations_ids))

  # First name seen per id; replaces the extra Spark query that was run for
  # every location and took the first matching row.
  location_names = {}
  for row in location_rows:
    location_names.setdefault(row["location_id"], row["location_name"])

  # Get all locations train data
  all_data_df=read_data()

  # Train separate models for each location in this loop
  for loc_id in all_locations_ids:
    if int(loc_id) <= 0:
      continue

    location_name = location_names[loc_id]
    print(f'Start Training Model For Location ID: {loc_id} and Location Name: {location_name}')

    # Filter and prepare train data by location_id
    train_df=filter_data(all_data_df, loc_id)

    # Model training
    model, leader_board = train_model(train_df,loc_id,location_name)
    leader_board_pd=leader_board.toPandas()

    get_feature_importance(leader_board_pd)
    threshold,precision,recall,accuracy,f1,specificity,mcc = get_metrics(leader_board_pd)

    updated_on=date.today()
    # assign() returns a new frame, so the columns are not written onto a
    # slice of leader_board_pd (the SettingWithCopyWarning pattern).
    best_model_df = leader_board_pd.head(1).assign(
      location_name=location_name,
      updated_on=updated_on,
      threshold=threshold,
      precision=precision,
      recall=recall,
      accuracy=accuracy,
      f1=f1,
      specificity=specificity,
      mcc=mcc,
    )

    save_model(model, location_name, updated_on,best_model_df)

  print('DONE....')
    
# def main_train_function_for_debugging():
#   '''Use this function to debug/test for one hardcoded location only'''  
#   # HARDCODE THE INPUTS HERE
#   location_id = 9
#   location_name = 'testrun_for_Qatar'
#   print(f'Start Training Model For Location ID: {location_id} and Location Name: {location_name}')

#   # Get all locations train data
#   all_data_df=read_data()

#   # Filter and prepare train data by location_id
#   train_df=filter_data(all_data_df, location_id)

#  # Model training and saving
#   model, leader_board = train_model(train_df,location_id,location_name)
  
#   try:
#     model_id=leader_board["model_id"][2]
#     print('model_id ',model_id)
#     feature_imp_model = get_feature_importance(model_id)

#     threshold,precision,recall,accuracy,f1,specificity,mcc = get_metrics(model) 
#     leader_board = leader_board.toPandas()
#     updated_on=date.today() 
#     best_model_df=leader_board[:1]
#     best_model_df['location_name']=location_name
#     best_model_df['updated_on']=updated_on
#     best_model_df['threshold']=threshold
#     best_model_df['precision']=precision
#     best_model_df['recall']=recall
#     best_model_df['accuracy']=accuracy
#     best_model_df['f1']=f1
#     best_model_df['specificity']=specificity
#     best_model_df['mcc']=mcc
#   except Exception as e:
#     print(e)
#   try:    
#     save_model(model, location_name, updated_on,best_model_df)
#   except Exception as e:
#     print('saving: ', e)
  
#   print('DONE....')
#   return model, leader_board#best_model_df, feature_imp_model


In [0]:
# Run the per-location training loop defined in the cell above.
main_training_function()
# model, leader_board = main_train_function_for_debugging() # for testing/debugging purpose

In [0]:
#predicted = model.transform(for_predictions)
# updated_on=date.today()

In [0]:
# %sql
# DROP TABLE IF EXISTS churn.h2o_leader_model_train_metrics