In [231]:
# Import pyspark library
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.ml.feature import StringIndexer, VectorAssembler, MinMaxScaler, StandardScaler, IndexToString
from pyspark.ml import Pipeline
from pyspark.ml.classification import *
from pyspark.ml.evaluation import *
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.mllib.evaluation import MulticlassMetrics

# import mlflow library
import mlflow
from mlflow.types.schema import Schema, ColSpec
from mlflow.models.signature import ModelSignature, infer_signature
from mlflow.tracking import MlflowClient

from sklearn.metrics import classification_report, confusion_matrix, roc_auc_score
from sklearn import metrics

import os

In [232]:
spark = SparkSession.builder.master('local[*]').getOrCreate()
spark

schema = StructType([ \
    StructField("age", IntegerType(), True),\
    StructField("attrition", StringType(), True),\
    StructField("business_travel", StringType(), True),\
    StructField("daily_rate", IntegerType(), True),\
    StructField("department", StringType(), True),\
    StructField("distance_from_home", IntegerType(), True),\
    StructField("education", IntegerType(), True),\
    StructField("education_field", StringType(), True),\
    StructField("employee_count", IntegerType(), True),\
    StructField("employee_number", IntegerType(), True),\
    StructField("employment_satisfaction", IntegerType(), True),\
    StructField("gender", StringType(), True),\
    StructField("hourly_rate", IntegerType(), True),\
    StructField("job_involvement", IntegerType(), True),\
    StructField("job_level", IntegerType(), True),\
    StructField("job_role", StringType(), True),\
    StructField("marital_status", StringType(), True)\
    StructField("monthly_income", IntegerType(), True)\
    StructField("monthly_rate", IntegerType(), True)\
    StructField("num_companies_worked", IntegerType(), True)\
    StructField("over_18", StringType(), True)\
    StructField("over_time", StringType(), True)\
    StructField("percent_salary_hike", IntegerType(), True)\
    StructField("performance_rating", IntegerType(), True)\
    StructField("relationship_satisfaction", IntegerType(), True)\
    StructField("standard_hours", IntegerType(), True)\
    StructField("stock_option_level", IntegerType(), True)\
    StructField("total_working_years", IntegerType(), True)\
    StructField("training_times_last_year", IntegerType(), True)\
    StructField("work_life_balance", IntegerType(), True)\
    StructField("years_at_company", IntegerType(), True)\
    StructField("years_in_current_role", IntegerType(), True)\
    StructField("years_since_last_promotion", IntegerType(), True)\
    StructField("years_with_curr_manager", IntegerType(), True)\
    ])

In [233]:
df = spark.read.csv("WA_Fn-UseC_-HR-Employee-Attrition.csv",inferSchema=True,header=True)

In [234]:
df.limit(6).toPandas()

Unnamed: 0,Age,Attrition,BusinessTravel,DailyRate,Department,DistanceFromHome,Education,EducationField,EmployeeCount,EmployeeNumber,...,RelationshipSatisfaction,StandardHours,StockOptionLevel,TotalWorkingYears,TrainingTimesLastYear,WorkLifeBalance,YearsAtCompany,YearsInCurrentRole,YearsSinceLastPromotion,YearsWithCurrManager
0,41,Yes,Travel_Rarely,1102,Sales,1,2,Life Sciences,1,1,...,1,80,0,8,0,1,6,4,0,5
1,49,No,Travel_Frequently,279,Research & Development,8,1,Life Sciences,1,2,...,4,80,1,10,3,3,10,7,1,7
2,37,Yes,Travel_Rarely,1373,Research & Development,2,2,Other,1,4,...,2,80,0,7,3,3,0,0,0,0
3,33,No,Travel_Frequently,1392,Research & Development,3,4,Life Sciences,1,5,...,3,80,0,8,3,3,8,7,3,0
4,27,No,Travel_Rarely,591,Research & Development,2,1,Medical,1,7,...,4,80,1,6,3,3,2,2,2,2
5,32,No,Travel_Frequently,1005,Research & Development,2,2,Life Sciences,1,8,...,3,80,0,8,2,2,7,7,3,6


In [235]:
df.printSchema()

root
 |-- Age: integer (nullable = true)
 |-- Attrition: string (nullable = true)
 |-- BusinessTravel: string (nullable = true)
 |-- DailyRate: integer (nullable = true)
 |-- Department: string (nullable = true)
 |-- DistanceFromHome: integer (nullable = true)
 |-- Education: integer (nullable = true)
 |-- EducationField: string (nullable = true)
 |-- EmployeeCount: integer (nullable = true)
 |-- EmployeeNumber: integer (nullable = true)
 |-- EnvironmentSatisfaction: integer (nullable = true)
 |-- Gender: string (nullable = true)
 |-- HourlyRate: integer (nullable = true)
 |-- JobInvolvement: integer (nullable = true)
 |-- JobLevel: integer (nullable = true)
 |-- JobRole: string (nullable = true)
 |-- JobSatisfaction: integer (nullable = true)
 |-- MaritalStatus: string (nullable = true)
 |-- MonthlyIncome: integer (nullable = true)
 |-- MonthlyRate: integer (nullable = true)
 |-- NumCompaniesWorked: integer (nullable = true)
 |-- Over18: string (nullable = true)
 |-- OverTime: string 

In [236]:
df.groupBy("Attrition").count().show()

+---------+-----+
|Attrition|count|
+---------+-----+
|       No| 1233|
|      Yes|  237|
+---------+-----+



In [237]:
# Under Sampling
major_df = df.filter(col("Attrition")=="No")
minor_df = df.filter(col("Attrition")=="Yes")

ratio = int(major_df.count()/minor_df.count())
print("ratio: {}".format(ratio))

sampled_majority_df = major_df.sample(False, 1/ratio)
df = sampled_majority_df.unionAll(minor_df)
df.groupBy("Attrition").count().show()

ratio: 5
+---------+-----+
|Attrition|count|
+---------+-----+
|       No|  241|
|      Yes|  237|
+---------+-----+



# Over Sampling
major_df = df.filter(col("Attrition")=="No")
minor_df = df.filter(col("Attrition")=="Yes")

ratio = int(major_df.count()/minor_df.count())
print("ratio: {}".format(ratio))

a = range(ratio)

oversampled_df = minor_df.withColumn("dummy", explode(array([lit(x) for x in a]))).drop('dummy')
# combine both oversampled minority rows and previous majority rows 
df = major_df.unionAll(oversampled_df)
df.groupBy("Attrition").count().show()

In [238]:
col_list = ["Age","DailyRate","DistanceFromHome","HourlyRate","JobInvolvement","MonthlyIncome","MonthlyRate","NumCompaniesWorked","PercentSalaryHike","StockOptionLevel","TotalWorkingYears","TrainingTimesLastYear","YearsAtCompany","YearsInCurrentRole","YearsSinceLastPromotion","YearsWithCurrManager"]
col_list_fs = ["Age","DailyRate","DistanceFromHome","JobInvolvement","MonthlyIncome","NumCompaniesWorked","PercentSalaryHike","StockOptionLevel","TotalWorkingYears","TrainingTimesLastYear","YearsInCurrentRole","YearsSinceLastPromotion","YearsWithCurrManager"]
input_columns = col_list
#input_columns
#df.limit(6).toPandas()

dependent_var = "Attrition"

In [239]:
# change label (class variable) to string type to prep for reindexing
# Pyspark is expecting a zero indexed integer for the label column. 
# Just in case our data is not in that format... we will treat it by using the StringIndexer built in method
renamed = df.withColumn("label_str", df[dependent_var].cast(StringType())) #Rename and change to string type
indexer = StringIndexer(inputCol="label_str", outputCol="label") #Pyspark is expecting the this naming convention 
indexed = indexer.fit(renamed).transform(renamed)

2022/10/16 03:04:07 INFO mlflow.utils.autologging_utils: Created MLflow autologging run with ID '7d62aa42acba488f8e8fcb23b5181b6e', which will track hyperparameters, performance metrics, model artifacts, and lineage information for the current pyspark.ml workflow


In [240]:
# Convert all string type data in the input column list to numeric
# Otherwise the Algorithm will not be able to process it

# Also we will use these lists later on
numeric_inputs = []
string_inputs = []
for column in input_columns:
    # First identify the string vars in your input column list
    if str(indexed.schema[column].dataType) == 'StringType':
        # Set up your String Indexer function
        indexer = StringIndexer(inputCol=column, outputCol=column+"_num") 
        # Then call on the indexer you created here
        indexed = indexer.fit(indexed).transform(indexed)
        # Rename the column to a new name so you can disinguish it from the original
        new_col_name = column+"_num"
        # Add the new column name to the string inputs list
        string_inputs.append(new_col_name)
    else:
        # If no change was needed, take no action 
        # And add the numeric var to the num list
        numeric_inputs.append(column)

In [241]:
# Treat for skewness
# Flooring and capping
# Plus if right skew take the log +1
# if left skew do exp transformation
# This is best practice

# create empty dictionary d
d = {}
# Create a dictionary of quantiles from your numeric cols
# I'm doing the top and bottom 1% but you can adjust if needed
for col in numeric_inputs: 
    d[col] = indexed.approxQuantile(col,[0.01,0.99],0.25) #if you want to make it go faster increase the last number

#Now check for skewness for all numeric cols
for col in numeric_inputs:
    skew = indexed.agg(skewness(indexed[col])).collect() #check for skewness
    skew = skew[0][0]
    # If skewness is found,
    # This function will make the appropriate corrections
    if skew > 1: # If right skew, floor, cap and log(x+1)
        indexed = indexed.withColumn(col, \
        log(when(df[col] < d[col][0],d[col][0])\
        .when(indexed[col] > d[col][1], d[col][1])\
        .otherwise(indexed[col] ) +1).alias(col))
        print(col+" has been treated for positive (right) skewness. (skew =)",skew,")")
    elif skew < -1: # If left skew floor, cap and exp(x)
        indexed = indexed.withColumn(col, \
        exp(when(df[col] < d[col][0],d[col][0])\
        .when(indexed[col] > d[col][1], d[col][1])\
        .otherwise(indexed[col] )).alias(col))
        print(col+" has been treated for negative (left) skewness. (skew =",skew,")")

MonthlyIncome has been treated for positive (right) skewness. (skew =) 1.5239105561644637 )
StockOptionLevel has been treated for positive (right) skewness. (skew =) 1.2560670945509036 )
TotalWorkingYears has been treated for positive (right) skewness. (skew =) 1.2796925402059103 )
YearsAtCompany has been treated for positive (right) skewness. (skew =) 1.9805957824792155 )
YearsInCurrentRole has been treated for positive (right) skewness. (skew =) 1.0671915805104129 )
YearsSinceLastPromotion has been treated for positive (right) skewness. (skew =) 1.975622411574525 )


In [242]:
# Now check for negative values in the dataframe. 
# Produce a warning if there are negative values in the dataframe that Naive Bayes cannot be used. 
# Note: we only need to check the numeric input values since anything that is indexed won't have negative values

# Calculate the mins for all columns in the df
minimums = df.select([min(c).alias(c) for c in df.columns if c in numeric_inputs]) 
# Create an array for all mins and select only the input cols
min_array = minimums.select(array(numeric_inputs).alias("mins")) 
# Collect golobal min as Python object
df_minimum = min_array.select(array_min(min_array.mins)).collect() 
# Slice to get the number itself
df_minimum = df_minimum[0][0] 

# If there are ANY negative vals found in the df, print a warning message
if df_minimum < 0:
    print("WARNING: The Naive Bayes Classifier will not be able to process your dataframe as it contains negative values")
else:
    print("No negative values were found in your dataframe.")

No negative values were found in your dataframe.


In [243]:
# Before we correct for negative values that may have been found above, 
# We need to vectorize our df
# becauase the function that we use to make that correction requires a vector. 
# Now create your final features list
features_list = numeric_inputs + string_inputs
# Create your vector assembler object
assembler = VectorAssembler(inputCols=features_list,outputCol='features')
# And call on the vector assembler to transform your dataframe
output = assembler.transform(indexed).select('features','label')

In [244]:
output.show()

+--------------------+-----+
|            features|label|
+--------------------+-----+
|[49.0,279.0,8.0,6...|  0.0|
|[32.0,1005.0,2.0,...|  0.0|
|[38.0,371.0,2.0,4...|  0.0|
|[24.0,673.0,11.0,...|  0.0|
|[53.0,1282.0,5.0,...|  0.0|
|[30.0,125.0,9.0,8...|  0.0|
|[35.0,1229.0,8.0,...|  0.0|
|[26.0,1443.0,23.0...|  0.0|
|[37.0,1115.0,1.0,...|  0.0|
|[36.0,1223.0,8.0,...|  0.0|
|[32.0,548.0,1.0,6...|  0.0|
|[36.0,132.0,6.0,5...|  0.0|
|[35.0,776.0,1.0,3...|  0.0|
|[46.0,945.0,5.0,8...|  0.0|
|[23.0,541.0,2.0,6...|  0.0|
|[32.0,1093.0,6.0,...|  0.0|
|[24.0,1353.0,3.0,...|  0.0|
|[58.0,682.0,10.0,...|  0.0|
|[32.0,827.0,1.0,7...|  0.0|
|[37.0,1040.0,2.0,...|  0.0|
+--------------------+-----+
only showing top 20 rows



In [245]:
# Create the mix max scaler object 
# This is what will correct for negative values
# I like to use a high range like 1,000 
#     because I only see one decimal place in the final_data.show() call
scaler = MinMaxScaler(inputCol="features", outputCol="scaledFeatures",min=0,max=1000)
print("Features scaled to range: [%f, %f]" % (scaler.getMin(), scaler.getMax()))

# Compute summary statistics and generate MinMaxScalerModel
scalerModel = scaler.fit(output)

# rescale each feature to range [min, max].
scaled_data = scalerModel.transform(output)
final_data = scaled_data.select('label','scaledFeatures')
# Rename to default value
final_data = final_data.withColumnRenamed("scaledFeatures","features")
final_data.show()

2022/10/16 03:04:11 INFO mlflow.utils.autologging_utils: Created MLflow autologging run with ID 'bd78db57de47446e8d0261e00b77aafa', which will track hyperparameters, performance metrics, model artifacts, and lineage information for the current pyspark.ml workflow


Features scaled to range: [0.000000, 1000.000000]
+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|[738.095238095238...|
|  0.0|[333.333333333333...|
|  0.0|[476.190476190476...|
|  0.0|[142.857142857142...|
|  0.0|[833.333333333333...|
|  0.0|[285.714285714285...|
|  0.0|[404.761904761904...|
|  0.0|[190.476190476190...|
|  0.0|[452.380952380952...|
|  0.0|[428.571428571428...|
|  0.0|[333.333333333333...|
|  0.0|[428.571428571428...|
|  0.0|[404.761904761904...|
|  0.0|[666.666666666666...|
|  0.0|[119.047619047619...|
|  0.0|[333.333333333333...|
|  0.0|[142.857142857142...|
|  0.0|[952.380952380952...|
|  0.0|[333.333333333333...|
|  0.0|[452.380952380952...|
+-----+--------------------+
only showing top 20 rows



In [246]:
train,test = final_data.randomSplit([0.7,0.3])

In [247]:
train.count()

347

In [248]:
test.count()

131

In [249]:
# First - Read in dependencies
from pyspark.ml.classification import *
from pyspark.ml.evaluation import *
from pyspark.sql.functions import *
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

In [250]:
# Set up our evaluation objects
Bin_evaluator = BinaryClassificationEvaluator(rawPredictionCol='prediction') #labelCol='label'
# Bin_evaluator = BinaryClassificationEvaluator() #labelCol='label'
MC_evaluator = MulticlassClassificationEvaluator(metricName="accuracy") # redictionCol="prediction",
MC_evaluator_acc = MulticlassClassificationEvaluator(predictionCol='prediction', labelCol='label', metricName='accuracy')
MC_evaluator_f1 = MulticlassClassificationEvaluator(predictionCol='prediction', labelCol='label', metricName='f1')

In [251]:
mlflow.set_tracking_uri(uri="http://127.0.0.1:8001/")

mlflow.set_experiment(experiment_name = "employee_attrition_log_model")
client = MlflowClient()

In [252]:
# First tell Spark which classifier you want to use
mlflow.pyspark.ml.autolog()

with mlflow.start_run(run_name="logistic_regression") as run:
    classifier = LogisticRegression()

    # Then Set up your parameter grid for the cross validator to conduct hyperparameter tuning
    paramGrid = (ParamGridBuilder() \
                .addGrid(classifier.regParam, [0.1, 0.01]) \
                .addGrid(classifier.maxIter, [10, 15,20])
                .build())

    # Then set up the Cross Validator which requires all of the following parameters:
    crossval = CrossValidator(estimator=classifier,
                            estimatorParamMaps=paramGrid,
                            evaluator=MC_evaluator,
                            numFolds=3) # 3 + is best practice, but its just taking more times to train

    # Then fit your model
    fitModel = crossval.fit(train)

    # Collect the best model and
    # print the coefficient matrix
    # These values should be compared relative to eachother
    # And intercepts can be prepared to other models
    BestModel = fitModel.bestModel
    print("Intercept: " + str(BestModel.interceptVector))
    print("Coefficients: \n" + str(BestModel.coefficientMatrix))

    # You can extract the best model from this run like this if you want
    LR_BestModel = BestModel

    # Next you need to generate predictions on the test dataset
    # fitModel automatically uses the best model 
    # so we don't need to use BestModel here
    predictions = fitModel.transform(test)

    # Now print the accuracy rate of the model or AUC for a binary classifier
    accuracy = (MC_evaluator_acc.evaluate(predictions))*100
    print("accuracy:", accuracy)
    f1_score = (MC_evaluator_f1.evaluate(predictions))
    print("f1 score:", f1_score)
    #weighted_precision = MC_evaluator_prec.evaluate(predictions)
    #print("weighted precision:", weighted_precision)
    #weighted_recall = MC_evaluator_rec.evaluate(predictions)
    #print("weighted recall:", weighted_recall)
    #weighted_true_positive_rate = MC_evaluator_wtpr.evaluate(predictions)
    #print("weighted true positive rate:", weighted_true_positive_rate)
    #weighted_false_positice_rate = MC_evaluator_wfpr.evaluate(predictions)
    #print("weighted false positive rate:", weighted_false_positive_rate)
    auc = (Bin_evaluator.evaluate(predictions))
    print("auc:", auc)

    # Load the Summary
    trainingSummary = LR_BestModel.summary

    accuracy = trainingSummary.accuracy
    falsePositiveRate = trainingSummary.weightedFalsePositiveRate
    truePositiveRate = trainingSummary.weightedTruePositiveRate
    fMeasure = trainingSummary.weightedFMeasure()
    precision = trainingSummary.weightedPrecision
    recall = trainingSummary.weightedRecall

    ########### Track results in MLflow UI ################

    y_true = predictions.select(['label']).collect()
    y_pred = predictions.select(['prediction']).collect()
    y_pred_proba = predictions.select(['probability']).collect()

    print(classification_report(y_true, y_pred))
    print(confusion_matrix(y_true, y_pred))

    conf_matrix= confusion_matrix(y_true, y_pred)

    mlflow.spark.log_model(artifact_path = "model",
            spark_model = BestModel,
            input_example = df.limit(4).toPandas(),
            #code_paths = 'D:\testcase08\ready monitoring\atp_small_user_log.ipynb',
            #signature = infer_signature(test, fitModel.transform(test)),
            #signature = ModelSignature(inputs = input_schema, outputs = output_schema),
            registered_model_name = "employee_attrition_logreg")

    # Extract params of Best Model
    paramMap = BestModel.extractParamMap()

    # Log parameters to the client
    for key, val in paramMap.items():
        if 'maxIter' in key.name:
            client.log_param(run.info.run_id, "Max Iter", val)
    for key, val in paramMap.items():
        if 'regParam' in key.name:
            client.log_param(run.info.run_id, "Reg Param", val)

    #client.log_artifact(run.info.run_id, "Confusion Matrix", conf_matrix)

    mlflow.log_artifact("tugbes_paper_baru.ipynb")
    #mlflow.log_artifact(conf_matrix)

    # Log metrics to the client
    client.log_metric(run.info.run_id, "Accuracy", accuracy)
    client.log_metric(run.info.run_id, "F1 Score", f1_score)
    client.log_metric(run.info.run_id, "AUC", auc)
    #client.log_metric(run.info.run_id, "Weighted Precision", weighted_precision)
    #client.log_metric(run.info.run_id, "Weighted Recall", weighted_recall)
    #client.log_metric(run.info.run_id, "Weighted True Positive Rate", weighted_true_positive_rate)
    #client.log_metric(run.info.run_id, "Weighted False Positive Rate", weighted_false_positive_rate

    #test_1 = test.toPandas()

    #mlflow.whylogs.log_pandas(test_1)

    # Set a runs status to finished (best practice)
    client.set_terminated(run.info.run_id)



Intercept: [3.805882116296268]
Coefficients: 
DenseMatrix([[-8.41700043e-04, -5.49173546e-04,  1.56483716e-03,
               1.21430249e-04, -1.80879249e-03, -1.52565485e-03,
              -1.92388482e-04,  1.29247454e-03,  5.23021079e-05,
              -1.26579340e-03, -7.94031107e-04, -1.15757783e-03,
              -2.18807040e-03, -7.51426334e-04,  1.74983546e-03,
              -6.44618010e-04]])
accuracy: 67.17557251908397
f1 score: 0.672176624768027
auc: 0.676906779661017
              precision    recall  f1-score   support

         0.0       0.61      0.73      0.67        59
         1.0       0.74      0.62      0.68        72

    accuracy                           0.67       131
   macro avg       0.68      0.68      0.67       131
weighted avg       0.68      0.67      0.67       131

[[43 16]
 [27 45]]


Registered model 'employee_attrition_logreg' already exists. Creating a new version of this model...
2022/10/16 03:04:33 INFO mlflow.tracking._model_registry.client: Waiting up to 300 seconds for model version to finish creation.                     Model name: employee_attrition_logreg, version 14
Created version '14' of model 'employee_attrition_logreg'.


In [253]:
# zip input_columns qith feature importance scores and create df

# First convert featureimportance scores from numpy array to list
coeff_array = BestModel.coefficientMatrix.toArray()
coeff_scores = []
for x in coeff_array[0]:
    coeff_scores.append(float(x))
# Then zip with input_columns list and create a df

#data_schema = [StructField("feature", StringType(), True),StructField("coeff", FloatType(), True)]
#final_struc = StructType(fields=data_schema)
#column = input_columns
#result = spark.createDataFrame(zip(input_columns,coeff_scores), column)

input_columns
#result.show()
#result = spark.createDataFrame(zip(input_columns,coeff_scores), schema=['feature','coeff'])
#result.show(100)

['Age',
 'DailyRate',
 'DistanceFromHome',
 'HourlyRate',
 'JobInvolvement',
 'MonthlyIncome',
 'MonthlyRate',
 'NumCompaniesWorked',
 'PercentSalaryHike',
 'StockOptionLevel',
 'TotalWorkingYears',
 'TrainingTimesLastYear',
 'YearsAtCompany',
 'YearsInCurrentRole',
 'YearsSinceLastPromotion',
 'YearsWithCurrManager']

In [254]:
coeff_scores

[-0.0008417000425776801,
 -0.0005491735462164814,
 0.001564837163566775,
 0.00012143024934070559,
 -0.0018087924907009979,
 -0.0015256548497179165,
 -0.00019238848217411914,
 0.0012924745350110707,
 5.230210785524785e-05,
 -0.0012657934021481046,
 -0.0007940311072291798,
 -0.0011575778252562918,
 -0.002188070396149016,
 -0.0007514263343524888,
 0.001749835461896528,
 -0.0006446180099079643]

In [255]:
# Load the Summary
trainingSummary = LR_BestModel.summary

# General Describe
trainingSummary.predictions.describe().show()

# Obtain the objective per iteration
objectiveHistory = trainingSummary.objectiveHistory
print(" ")
print("objectiveHistory: (scaled loss + regularization) at each iteration")
for objective in objectiveHistory:
    print(objective)

# for multiclass, we can inspect metrics on a per-label basis
print(" ")
print("False positive rate by label:")
for i, rate in enumerate(trainingSummary.falsePositiveRateByLabel):
    print("label %d: %s" % (i, rate))

print(" ")
print("True positive rate by label:")
for i, rate in enumerate(trainingSummary.truePositiveRateByLabel):
    print("label %d: %s" % (i, rate))

print(" ")
print("Precision by label:")
for i, prec in enumerate(trainingSummary.precisionByLabel):
    print("label %d: %s" % (i, prec))

print(" ")
print("Recall by label:")
for i, rec in enumerate(trainingSummary.recallByLabel):
    print("label %d: %s" % (i, rec))

print(" ")
print("F-measure by label:")
for i, f in enumerate(trainingSummary.fMeasureByLabel()):
    print("label %d: %s" % (i, f))

# Generate confusion matrix and print (includes accuracy)
accuracy = trainingSummary.accuracy
falsePositiveRate = trainingSummary.weightedFalsePositiveRate
truePositiveRate = trainingSummary.weightedTruePositiveRate
fMeasure = trainingSummary.weightedFMeasure()
precision = trainingSummary.weightedPrecision
recall = trainingSummary.weightedRecall
print(" ")
print("Accuracy: %s\nFPR: %s\nTPR: %s\nF-measure: %s\nPrecision: %s\nRecall: %s"
      % (accuracy, falsePositiveRate, truePositiveRate, fMeasure, precision, recall))

+-------+------------------+-------------------+
|summary|             label|         prediction|
+-------+------------------+-------------------+
|  count|               347|                347|
|   mean|0.4755043227665706| 0.4495677233429395|
| stddev|0.5001207563529201|0.49816842324132116|
|    min|               0.0|                0.0|
|    max|               1.0|                1.0|
+-------+------------------+-------------------+

 
objectiveHistory: (scaled loss + regularization) at each iteration
0.6919466236310897
0.651042093567163
0.5754645277661793
0.5519654264709095
0.5437304767618284
0.5404854443624808
0.5387722675779191
0.5384635579913163
0.5383951539956239
0.5383521873504922
0.5383271294446412
 
False positive rate by label:
label 0: 0.2909090909090909
label 1: 0.21428571428571427
 
True positive rate by label:
label 0: 0.7857142857142857
label 1: 0.7090909090909091
 
Precision by label:
label 0: 0.7486910994764397
label 1: 0.75
 
Recall by label:
label 0: 0.78571428571

In [256]:
LR_BestModel.summary

<pyspark.ml.classification.BinaryLogisticRegressionTrainingSummary at 0x2153136d460>