This multinomial logistic regression is based on <https://runawayhorse001.github.io/LearningApacheSpark/classification.html>.

The relevant material starts at section 11.2, "Multinomial logistic regression".

<b>TODO: wrap the preprocessing steps and the model in a single Spark ML Pipeline, which is easier to log and reload with MLflow.</b>

In [0]:
import sys
# Show the Python version this notebook runs on (rendered as the cell output).
sys.version_info

# Training the model

In [0]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
from pyspark.dbutils import DBUtils
import numpy as np
import pandas as pd

# DBUtils handle bound to this Spark session.
dbutils = DBUtils(spark)

# Session-level Spark settings, applied in this order.
_SPARK_SETTINGS = (
    ("spark.sql.repl.eagerEval.enabled", True),  # render DataFrames eagerly
    ("spark.databricks.io.cache.enabled", True),  # Databricks disk (delta) caching
    ("spark.sql.adaptive.skewJoin.enabled", True),  # AQE handling of skewed joins
    ("spark.sql.autoBroadcastJoinThreshold", 10485760),  # broadcast threshold: 10 MiB
    ("spark.sql.execution.arrow.pyspark.enabled", "true"),  # Arrow for Spark <-> pandas
    ("spark.databricks.optimizer.rangeJoin.binSize", 1000),  # range-join optimizer bin size
)
for _conf_key, _conf_value in _SPARK_SETTINGS:
    spark.conf.set(_conf_key, _conf_value)

In [0]:
# Spark SQL helpers used throughout the notebook (col is needed for filtering).
from pyspark.sql.functions import isnan, when, count, col

# Load the training snapshot from Delta and report its row count.
TRAIN_PATH = "dbfs:/mnt/bastore/modelling/recommendations/data_for_model/train"
df_train = spark.read.format('delta').load(TRAIN_PATH)
df_train.count()



In [0]:

# Label column ("service") followed by the columns used as model inputs.
_model_columns = [
    "service", "Country", "Seniority_max", "Product_sum", "Upsell_avg",
    "SubBreak_sum", "Retention_sum", "RetIntention_sum", "Discount_sum",
    "Payments_sum", "Fline", "Hline", "ExHline", "ExFline",
    "Seniority_Hline", "Seniority_Fline", "Seniority_ExFline", "Seniority_ExHline",
]
df_train = df_train.select(*_model_columns)

# Drop every row that has a null/NaN in any selected column.
df_train1 = df_train.na.drop(how="any")

# Class distribution after cleaning.
df_train1.groupBy('service').count()

service,count
PAS,6089
Freebook,1485
Online,15522


In [0]:
# Split into two binary sub-problems that share the smallest class, "Freebook";
# each is balanced separately further down.
newdata1 = df_train1.filter(df_train1.service.isin("Online", "Freebook"))
newdata2 = df_train1.filter(df_train1.service.isin("PAS", "Freebook"))

# Feature / label views of each sub-problem.
# NOTE(review): X1, Y1, X2, Y2 are not used in the visible notebook.
X1, Y1 = newdata1.drop("service"), newdata1.select("service")
X2, Y2 = newdata2.drop("service"), newdata2.select("service")



In [0]:
# resample(newdata2, 1, "service", "Freebook")

# Diagnostic: how large a fraction of the non-Freebook rows in newdata1 must be
# kept to match the number of Freebook rows (the same arithmetic resample() uses).
_is_freebook = col('service') == "Freebook"
pos = newdata1.filter(_is_freebook)
neg = newdata1.filter(col('service') != "Freebook")
total_pos, total_neg = pos.count(), neg.count()
fraction = float(total_pos * 1) / float(total_neg)
# data_balanced_over1

In [0]:
from pyspark.sql.functions import col
def resample(base_features, ratio, class_field, base_class, seed=None):
    """Down-sample all rows whose class is not ``base_class``.

    Rows with ``class_field == base_class`` are kept in full. All other rows are
    sampled without replacement with fraction
    ``count(base_class) * ratio / count(other)``. The result therefore holds
    roughly ``ratio`` other rows per ``base_class`` row. This function only
    under-samples; it never duplicates rows.

    Args:
        base_features: Spark DataFrame containing the ``class_field`` column.
        ratio: desired number of other rows per ``base_class`` row.
        class_field: name of the label column.
        base_class: label value whose rows are all kept.
        seed: optional seed for ``DataFrame.sample``, for reproducible runs
            (default None keeps the previous, unseeded behaviour).

    Returns:
        DataFrame with the sampled other rows followed by all ``base_class`` rows.
        If there are fewer other rows than requested, all of them are kept.
    """
    pos = base_features.filter(col(class_field) == base_class)
    neg = base_features.filter(col(class_field) != base_class)
    total_pos = pos.count()
    total_neg = neg.count()
    if total_neg == 0:
        # Nothing to sample from; avoids a ZeroDivisionError.
        return pos
    # Spark rejects fractions > 1 when sampling without replacement, so cap at 1.0
    # (keep every other row) instead of failing.
    fraction = min(1.0, float(total_pos * ratio) / float(total_neg))
    sampled = neg.sample(False, fraction, seed)
    return sampled.union(pos)
 
# Balance each binary sub-problem against the Freebook class.
data_balanced_over1 = resample(newdata1, ratio=1, class_field="service", base_class="Freebook")
data_balanced_over2 = resample(newdata2, ratio=1, class_field="service", base_class="Freebook")

# Keep only the Online rows from the first subset (its Freebook rows are already
# in the second subset), then stack both into the training set.
_online_only = col("service") == "Online"
newdata3 = data_balanced_over1.filter(_online_only)
Train = newdata3.union(data_balanced_over2)

In [0]:
#index country
from pyspark.ml.feature import IndexToString,StringIndexer, VectorIndexer


# Map Country strings to numeric indices. The fitted indexer is reused on the
# test set later, but it is fitted on the down-sampled Train, so some countries
# (or nulls) may never be seen here. handleInvalid="keep" sends them to an
# extra index instead of raising an error at scoring time.
# NOTE(review): the index is used as a single ordinal numeric feature; one-hot
# encoding would be the usual choice for a nominal column — confirm intent.
featureIndexer = StringIndexer(inputCol="Country", outputCol="CountryIndex",
                               handleInvalid="keep").fit(Train)

# StringIndexer appends CountryIndex as the LAST column; Country itself is dropped.
Train1 = featureIndexer.transform(Train).drop('Country')

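Spark ML estimators read all features from a single vector column, so the feature columns are packed into one `features` vector below.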

In [0]:
#option 1 following website
#transform dataset to dataframe
from pyspark.ml.linalg import Vectors # !!!!caution: not from pyspark.mllib.linalg import Vectors
from pyspark.ml import Pipeline
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
def transData(data):
    """Reshape ``data`` into the (features, label) layout used for training.

    Column 0 becomes ``label``; every remaining column, in its current order,
    is packed into a dense ``features`` vector.
    """
    def _row_to_pair(row):
        label_value, feature_values = row[0], row[1:]
        return [Vectors.dense(feature_values), label_value]

    return data.rdd.map(_row_to_pair).toDF(['features', 'label'])

In [0]:
# Pack the features into a vector and preview 10 rows without truncation.
Train2 = transData(Train1)
Train2.select("features", "label").show(n=10, truncate=False)

In [0]:
from pyspark.ml.feature import StringIndexer
# Encode the service label as a numeric "indexedLabel" column.
_label_indexer_spec = StringIndexer(inputCol='label', outputCol='indexedLabel')
labelIndexer = _label_indexer_spec.fit(Train2)
Train3 = labelIndexer.transform(Train2)



In [0]:
from pyspark.ml.classification import LogisticRegression
import numpy as np
from scipy.sparse import csr_matrix
# Logistic regression on the packed feature vector and the indexed label.
logr = LogisticRegression(labelCol='indexedLabel', featuresCol='features')

In [0]:
# Convert indexed labels back to original labels.
#labelConverter = IndexToString(inputCol="prediction", #outputCol="predictedLabel", labels=labelIndexer.labels)
#Train4 = labelConverter.transform(Train3)
#Train4.show()

In [0]:
# Fit the classifier on the indexed training data.
model = logr.fit(dataset=Train3)

In [0]:
# Show which column the fitted model treats as the label.
model.explainParam(param='labelCol')

# Model evaluation

In [0]:
# One row of coefficients per class.
coefficient_matrix = model.coefficientMatrix
print(coefficient_matrix)

In [0]:
# Summary of the fitted model on its training data; its per-label and weighted
# metrics are printed below.
trainingSummary = model.summary

In [0]:
# Per-label training metrics. Each heading is paired with a zero-argument getter,
# so every metric is fetched only right before it is printed.
_per_label_sections = (
    ("False positive rate by label:", lambda: trainingSummary.falsePositiveRateByLabel),
    ("True positive rate by label:", lambda: trainingSummary.truePositiveRateByLabel),
    ("Precision by label:", lambda: trainingSummary.precisionByLabel),
    ("Recall by label:", lambda: trainingSummary.recallByLabel),
    ("F-measure by label:", lambda: trainingSummary.fMeasureByLabel()),
)

for _heading, _fetch in _per_label_sections:
    print(_heading)
    for _label_idx, _metric_value in enumerate(_fetch()):
        print("label %d: %s" % (_label_idx, _metric_value))

In [0]:
# Overall accuracy plus label-weighted aggregate metrics on the training data.
(accuracy,
 falsePositiveRate,
 truePositiveRate,
 fMeasure,
 precision,
 recall) = (
    trainingSummary.accuracy,
    trainingSummary.weightedFalsePositiveRate,
    trainingSummary.weightedTruePositiveRate,
    trainingSummary.weightedFMeasure(),
    trainingSummary.weightedPrecision,
    trainingSummary.weightedRecall,
)
_report_template = "Accuracy: %s\nFPR: %s\nTPR: %s\nF-measure: %s\nPrecision: %s\nRecall: %s"
print(_report_template % (accuracy, falsePositiveRate, truePositiveRate, fMeasure, precision, recall))

In [0]:
# Score the training data and map numeric predictions back to service names.
predictions = model.transform(Train3)
labelConverter = IndexToString(
    inputCol="prediction",
    outputCol="predictedLabel",
    labels=labelIndexer.labels,
)
prediction = labelConverter.transform(predictions)
prediction.show(n=5, truncate=False)

In [0]:
import matplotlib.pyplot as plt
import numpy as np
import itertools

def plot_confusion_matrix(cm, classes,
                          normalize=False,
                          title='Confusion matrix',
                          cmap=plt.cm.Blues):
    """Print a confusion matrix and draw it on the current matplotlib figure.

    Args:
        cm: 2-D array-like; rows are true labels and columns are predicted
            labels (the layout returned by ``sklearn.metrics.confusion_matrix``).
        classes: tick labels in the same order as the rows/columns of ``cm``.
            Any iterable is accepted (e.g. a ``map`` object). It is turned into
            a list first because ``len()`` is taken and it is used twice.
        normalize: if True, divide each row by its total. Rows whose total is
            zero are shown as 0 instead of NaN.
        title: plot title.
        cmap: matplotlib colormap for the image.
    """
    classes = list(classes)
    cm = np.asarray(cm)
    if normalize:
        row_totals = cm.sum(axis=1, keepdims=True).astype('float')
        cm = np.divide(cm.astype('float'), row_totals,
                       out=np.zeros(cm.shape, dtype='float'),
                       where=row_totals != 0)
        print("Normalized confusion matrix")
    else:
        print('Confusion matrix, without normalization')

    print(cm)

    plt.imshow(cm, interpolation='nearest', cmap=cmap)
    plt.title(title)
    plt.colorbar()
    tick_marks = np.arange(len(classes))
    plt.xticks(tick_marks, classes, rotation=45)
    plt.yticks(tick_marks, classes)

    # Integer counts use 'd'; anything else uses 2 decimals, since
    # format(x, 'd') raises on floats.
    is_integer = np.issubdtype(cm.dtype, np.integer)
    fmt = 'd' if (is_integer and not normalize) else '.2f'
    # Use white text on cells darker than half the maximum so numbers stay readable.
    thresh = cm.max() / 2.
    for i, j in itertools.product(range(cm.shape[0]), range(cm.shape[1])):
        plt.text(j, i, format(cm[i, j], fmt),
                 horizontalalignment="center",
                 color="white" if cm[i, j] > thresh else "black")

    plt.tight_layout()
    plt.ylabel('True label')
    plt.xlabel('Predicted label')

In [0]:
# Class names ordered by frequency in the scored training data (most frequent first).
class_temp = (
    predictions.select("label")
    .groupBy("label")
    .count()
    .sort('count', ascending=False)
    .toPandas()
)
class_temp = class_temp["label"].values.tolist()
# A list, not a bare map() iterator: plot_confusion_matrix calls len() on it and
# uses it twice, which fails on a map object.
class_names = [str(name) for name in class_temp]


In [0]:
from sklearn.metrics import confusion_matrix
# Collect true and predicted labels in ONE Spark job, so both columns are
# guaranteed to come from the same rows in the same order.
_label_pairs = prediction.select("label", "predictedLabel").toPandas()
y_true = _label_pairs["label"]
y_pred = _label_pairs["predictedLabel"]

# Without `labels`, sklearn orders rows/columns alphabetically. Use the label
# indexer's order instead (frequency-descending under StringIndexer's default
# stringOrderType), so the matrix lines up with frequency-ordered class names.
cnf_matrix = confusion_matrix(y_true, y_pred, labels=labelIndexer.labels)
cnf_matrix

In [0]:
#plt.figure()
#plot_confusion_matrix(cnf_matrix, classes=class_names, normalize=True,
#                      title='Normalized confusion matrix')
#
#plt.show()

# Scoring the test set

In [0]:
# Load the test snapshot. na.fill(0) replaces nulls in numeric columns only.
# NOTE(review): training used na.drop instead of fill(0) — confirm the
# difference is intended.
TEST_PATH = "dbfs:/mnt/bastore/modelling/recommendations/data_for_model/test"
df_test = spark.read.format('delta').load(TEST_PATH)
df_test1 = df_test.na.fill(0)

In [0]:
# Preview the raw test DataFrame (before na.fill) as the cell output.
df_test

ConsumerID,FK_Time,FK_Time_Date,FK_Time_Insert,FK_Consumer,Country,Seniority_max,Product_sum,Upsell_avg,SubBreak_sum,RetIntention_sum,Retention_sum,Web_sum_Y1,Web_sum_Y2,Web_sum_DY1Y2,PAS_sum_Y1,PAS_sum_Y2,PAS_sum_DY1Y2,Freebook_sum,Newsletter_sum,Payments_sum,Discount_sum,Payments_sum_Y4,Payments_change,Web_sum_Y1_Fin,Web_sum_Y1_PortF,FB_Heal,FB_Fin,Fline,ExFline,Seniority_ExFline,Web_sum_Y1_Hline,Web_sum_Y2_Hline,Web_sum_DY1Y2_Hline,Newsletter_sum_Hline,Hline,Seniority_Fline,Newsletter_sum_Fline,Seniority_Hline,Seniority_ExHline,ExHline,Telemarketing_sum,ComMail_sum,Survey_sum,CampaignType_Fline,FK_Time2,webv_bool_inf,freeb_bool_inf,pas_Bool_inf,ServiceGroup
,20231201,,2024-01-14 18:33:...,481,BE,375,2,-225,0,0,0,184,,,0,,,2,0,273,0,,10,,,,,0,0,0,,,,,0,0,,0,6,1,,,,,20231201,1,1,0,Ni
,20231201,,2024-01-14 18:33:...,673,BE,578,2,0,0,0,0,195,,,0,,,1,0,273,0,,10,,,,,0,0,0,,,,,0,0,,0,0,0,,,,,20231201,1,1,1,ALL
,20231201,,2024-01-14 18:33:...,1005,BE,373,2,0,0,0,0,0,,,0,,,0,0,270,0,,8,,,,,0,0,0,,,,,0,0,,0,0,0,,,,,20231201,1,1,0,Ni
,20231201,,2024-01-14 18:33:...,1417,BE,74,2,0,0,0,0,0,,,3,,,1,0,273,0,,10,,,,,0,0,0,,,,,0,0,,0,0,0,,,,,20231201,1,1,1,ALL
,20231201,,2024-01-14 18:33:...,1829,BE,247,1,0,0,0,0,85,,,8,,,1,0,173,0,,4,,,,,0,0,0,,,,,0,0,,0,0,0,,,,,20231201,1,1,1,ALL
,20231201,,2024-01-14 18:33:...,2096,BE,198,4,0,0,2,0,0,,,0,,,0,0,426,0,,8,,,,,0,1,2,,,,,1,0,,63,0,0,,,,,20231201,1,0,1,Ni
,20231201,,2024-01-14 18:33:...,2156,BE,567,1,0,0,0,0,0,,,0,,,0,0,177,0,,11,,,,,0,0,0,,,,,0,0,,0,0,0,,,,,20231201,1,0,1,Ni
,20231201,,2024-01-14 18:33:...,2542,BE,575,1,-41,0,0,0,328,,,15,,,0,0,255,0,,-7,,,,,0,0,0,,,,,0,0,,0,0,0,,,,,20231201,1,1,1,ALL
,20231201,,2024-01-14 18:33:...,2748,BE,672,4,0,0,1,0,410,,,0,,,2,0,386,0,,67,,,,,0,0,0,,,,,0,0,,0,0,0,,,,,20231201,1,1,1,ALL
,20231201,,2024-01-14 18:33:...,2811,BE,77,4,0,0,0,2,0,,,0,,,5,0,436,0,,1,,,,,0,0,0,,,,,1,0,,77,0,0,,,,,20231201,1,1,0,Ni


In [0]:
from pyspark.sql.types import IntegerType
df_test2 = featureIndexer.transform(df_test1).drop('Country')

# The feature order MUST match training. transData() packs Train1's columns
# after 'service': the 16 numeric columns below, followed by CountryIndex
# (StringIndexer appends it as the last column). Putting CountryIndex first
# would shift every feature by one position relative to the learned coefficients.
# transDataTest() reads positions 0-16 as features and 17/18 as the keys.
df_test3 = df_test2.select(
    "Seniority_max", "Product_sum", "Upsell_avg", "SubBreak_sum",
    "Retention_sum", "RetIntention_sum", "Discount_sum", "Payments_sum",
    "Fline", "Hline", "ExHline", "ExFline",
    "Seniority_Hline", "Seniority_Fline", "Seniority_ExFline", "Seniority_ExHline",
    "CountryIndex",
    col("FK_Consumer").cast(IntegerType()),
    col("FK_Time").cast(IntegerType()),
)


In [0]:

def transDataTest(data):
    """Reshape the scoring DataFrame into (features, FK_Consumer, FK_Time) rows.

    Positions 0-16 are packed into a dense ``features`` vector; positions 17 and
    18 are passed through unchanged as the consumer and time keys.
    """
    def _row_to_record(row):
        return [Vectors.dense(row[:17]), row[17], row[18]]

    return data.rdd.map(_row_to_record).toDF(['features', 'FK_Consumer', 'FK_Time'])
# Score the test set and translate numeric predictions back to service names.
Test = transDataTest(df_test3)
Test_pred = model.transform(Test)
labelConverter = IndexToString(
    inputCol="prediction",
    outputCol="predictedLabel",
    labels=labelIndexer.labels,
)
Test_pred1 = labelConverter.transform(Test_pred)
Test_pred1.show(n=10, truncate=False)

In [0]:
from pyspark.ml.functions import vector_to_array

# Position i of the probability vector belongs to labelIndexer.labels[i]. That
# order comes from class frequencies in the randomly balanced training set, so
# it must be looked up rather than hard-coded. An unexpected label set raises
# KeyError here instead of silently mislabelling the probabilities.
_probability_columns = ("Freebook", "PAS", "Online")  # output column order
_label_position = {name: idx for idx, name in enumerate(labelIndexer.labels)}

df_result = Test_pred1.select(col('probability'), col('FK_Consumer').cast('int'), col('FK_Time').cast('int'), col('predictedLabel'))
df_servicegroup = df_test.select(col('serviceGroup'), col('FK_Consumer'), col('FK_Time'))
df_result1 = df_result.join(df_servicegroup, ["FK_Consumer", "FK_Time"])
df_result2 = df_result1.withColumn("xs", vector_to_array("probability"))
df_result3 = df_result2.select(
    ["FK_Consumer", "FK_Time", "serviceGroup", "predictedLabel"]
    + [col("xs")[_label_position[name]].alias(name) for name in _probability_columns]
)



In [0]:
# Persist the scored results, replacing any previous run.
RESULTS_PATH = "dbfs:/mnt/bastore/modelling/recommendations/results_from_model/results_recoom"
(df_result3.write
    .format("delta")
    .mode("overwrite")
    .save(RESULTS_PATH))