### Set up Spark and load data

In [1]:
# Spark SQL / ML imports used throughout the notebook.
# NOTE(review): Vectors, DenseVector, StringIndexer, VectorIndexer, Pipeline and
# MulticlassClassificationEvaluator appear unused in this notebook — confirm before pruning.
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.mllib.linalg import Vectors, DenseVector
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, StringIndexer, VectorIndexer
from pyspark.ml.classification import (LogisticRegression,
                                       RandomForestClassifier,
                                       GBTClassifier)
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import (BinaryClassificationEvaluator,
                                   MulticlassClassificationEvaluator)

# Local-mode Spark session used to read the modelling data below.
spark = (SparkSession.builder
         .master("local")
         .appName("Music")
         .getOrCreate())

  return f(*args, **kwds)


In [2]:
import os

# Path to the modelling table. A relative path resolves against the notebook's
# working directory (the error output of this cell shows it expanded that way),
# so allow an override via DF_MODEL_FINAL_PATH when running from elsewhere.
DATA_PATH = os.environ.get('DF_MODEL_FINAL_PATH', '../data/df_model_final.csv')

# Cached because the frame is reused for feature selection and assembly below.
df = spark.read.csv(DATA_PATH, header=True, inferSchema=True).cache()
df.show(5)

AnalysisException: 'Path does not exist: file:/Users/hli/Dropbox/Turner/Tools/DS/DS501/HW/capstone/Capstone_Music_Box_Spark/music-box/data/df_model_final.csv;'

In [None]:
# Echo the loaded Spark DataFrame so its repr is displayed as the cell output.
df

### Prepare training data

In [None]:
# Every column except the user id and the target is used as a model feature.
NON_FEATURE_COLS = ['uid', 'label']
missing_cols = [c for c in NON_FEATURE_COLS if c not in df.columns]
assert not missing_cols, 'expected columns not found: {}'.format(missing_cols)
selected_features = [c for c in df.columns if c not in NON_FEATURE_COLS]
selected_features

In [None]:
# training data: pack all feature columns into the single vector column
# ("features") that the Spark ML estimators below read.
assembler = VectorAssembler(
    inputCols=selected_features,
    outputCol="features")
data = assembler.transform(df).select('label', 'features')

# 70/30 train/test split. Both splits are cached: cross-validation scans `train`
# once per fold and grid point, and caching avoids recomputing the assembly and
# sampling on every action while keeping the sampled rows fixed across actions.
SPLIT_SEED = 1
(train, test) = data.randomSplit([0.7, 0.3], seed=SPLIT_SEED)
train = train.cache()
test = test.cache()

### Fit logistic regression

In [None]:
# Logistic regression tuned by 4-fold cross-validation, scored with
# BinaryClassificationEvaluator() at its default settings.
lr = LogisticRegression(labelCol="label", featuresCol="features")

# Run cross-validation, and choose the best set of parameters.
paramGrid = ParamGridBuilder() \
    .addGrid(lr.maxIter, [10, 20, 30, 40]) \
    .addGrid(lr.regParam, [0.3, 0.1, 0.01]) \
    .build()

# seed fixes the random fold assignment so the selected parameters do not
# change from run to run (same seed value as the RF/GBT models).
crossval = CrossValidator(estimator=lr,
                          estimatorParamMaps=paramGrid,
                          evaluator=BinaryClassificationEvaluator(),
                          numFolds=4,
                          seed=42)

cvModel = crossval.fit(train)
# NOTE(review): params are read through the private _java_obj handle, presumably
# because the Python model lacks these getters in the Spark version used — confirm.
print('The best maxIter: {}'.format(cvModel.bestModel._java_obj.getMaxIter()))
print('The best regParam: {}'.format(cvModel.bestModel._java_obj.getRegParam()))

In [None]:
import matplotlib.pyplot as plt, pandas as pd, numpy as np
%matplotlib inline
lrCoeffs = pd.DataFrame(list(zip(selected_features, cvModel.bestModel.coefficients)), columns = ['feature', 'coeff']).sort_values(by="coeff", ascending=False)
ax =  lrCoeffs.plot.barh(figsize=(8, 10))
t = np.arange(lrCoeffs.shape[0])
ax.set_yticks(t)
ax.set_yticklabels(lrCoeffs['feature'])
ax.set_title("Coefficient of Logistic Regression")
plt.show()

### Predict on the train and test sets

In [None]:
#### predict and evaluate performance

# Score both splits with the tuned logistic-regression model (transform is lazy).
predictions_train = cvModel.transform(train)
predictions_test = cvModel.transform(test)

# Select example rows to display.
predictions_train.select("probability", "prediction", "label", "features").show(5)

# Bring the columns used by the ROC / gains cells into pandas.
result_cols = ["probability", "prediction", "label"]
res_train = predictions_train.select(*result_cols).toPandas()
res_test = predictions_test.select(*result_cols).toPandas()


### Evaluation helpers: ROC curves and cumulative gains / lift charts

In [None]:
%matplotlib inline
from sklearn.metrics import roc_curve, auc, roc_auc_score
import sklearn.metrics
import pandas as pd
import numpy as np


# helper
def plot_roc_curve(y_train, y_train_pred, y_test, y_test_pred):
    """
    Draw the train and test ROC curves on one new figure, each legend entry
    carrying that split's AUC, plus a dashed chance diagonal.

    y_train, y_test: true labels.
    y_train_pred, y_test_pred: one score per row (callers pass the index-1
    entry of the model's probability vector).
    """
    line_width = 2
    splits = [
        (y_train, y_train_pred, 'green', 'ROC Train (AUC = %0.4f)'),
        (y_test, y_test_pred, 'darkorange', 'ROC Test (AUC = %0.4f)'),
    ]

    # Compute every metric before opening the figure.
    curves = []
    for y_true, y_score, colour, label_fmt in splits:
        area = roc_auc_score(y_true, y_score)
        fpr, tpr, _ = roc_curve(y_true, y_score)
        curves.append((fpr, tpr, colour, label_fmt % area))

    plt.figure()
    for fpr, tpr, colour, label in curves:
        plt.plot(fpr, tpr, color=colour, lw=line_width, label=label)
    plt.plot([0, 1], [0, 1], color='navy', lw=line_width, linestyle='--')

    plt.xlim([0.0, 1.0])
    plt.ylim([0.0, 1.05])
    plt.xlabel('False Positive Rate')
    plt.ylabel('True Positive Rate')
    plt.title('Receiver operating characteristic example')
    plt.legend(loc="lower right")
    plt.show()


def plot_cumulative_gains(lift: pd.DataFrame):
    """
    Plot a cumulative gains chart and a lift chart side by side.

    `lift` must provide the columns 'numCases', 'TP', 'randP',
    'proportionOfCases', 'lift' and 'baseline' (as produced by
    calc_cumulative_gains).

    Reference: https://analyticsgyanblog.wordpress.com/category/lift-curve/
    http://www2.cs.uregina.ca/~dbd/cs831/notes/lift_chart/lift_chart.html
    """
    fig, ax = plt.subplots(1, 2)
    fig.set_figwidth(10)

    # Left panel: positives captured in the top-k cases vs. the random-ordering expectation.
    ax[0].plot(lift['numCases'], lift['TP'], 'r-', label='gains')
    ax[0].plot(lift['numCases'], lift['randP'], 'b-', label='baseline')
    ax[0].set_xlabel('Number of Cases')
    ax[0].set_ylabel('Gains')
    ax[0].legend()
    ax[0].set_title('Cumulative Gain Chart')

    # Right panel: lift against the constant baseline. The figure is rendered
    # once, after both panels are complete (no premature fig.show()).
    ax[1].plot(lift['proportionOfCases'], lift['lift'], 'r-', label='lift curve')
    ax[1].plot(lift['proportionOfCases'], lift['baseline'], 'b-', label='baseline')
    ax[1].set_ylabel('Lift Curve')
    ax[1].set_xlabel('Cumulative Proportion of Cases')
    ax[1].legend()
    ax[1].set_title('Lift Chart')
    plt.tight_layout()
    
def calc_cumulative_gains(df: pd.DataFrame, actual_col: str, predicted_col: str, probability_col: str,
                          plot: bool = True) -> pd.DataFrame:
    """
    Build the cumulative gains / lift table for scored rows and, by default, plot it.

    Rows are ranked by `probability_col`, highest first. For every cut-off k = 1..n
    the table records the positives captured in the top k rows (TP), the number a
    random ordering would capture (randP = k * base rate), and the resulting lift.

    Parameters
    ----------
    df : scored rows; not modified (a sorted copy is returned).
    actual_col : true label column; a row is positive when it equals 1 (or True).
    predicted_col : unused; kept so existing calls keep working.
    probability_col : score used for ranking.
    plot : when True (default), draw the charts via plot_cumulative_gains.

    Returns
    -------
    The sorted copy of `df` with added columns numCases, TP, randP,
    proportionOfChurnCaptured, proportionOfCases, lift and baseline.
    With 0/1 labels and no positives, proportionOfChurnCaptured and lift are NaN.

    Raises
    ------
    ValueError if `df` has no rows.

    Reference: https://stackoverflow.com/questions/42699243/how-to-build-a-lift-chart-a-k-a-gains-chart-in-python
    """
    numCases = df.shape[0]
    if numCases == 0:
        raise ValueError('calc_cumulative_gains needs at least one row')

    # Stable sort: rows with tied scores keep their input order, so the curve is reproducible.
    lift = df.sort_values(by=probability_col, ascending=False, kind='mergesort')
    numChurn = int((lift[actual_col] == 1).sum())
    randProb = numChurn / numCases

    lift['numCases'] = range(1, numCases + 1)
    # Gain Chart: multiply instead of cumsum-ing a constant to avoid float drift.
    lift['TP'] = lift[actual_col].cumsum()
    lift['randP'] = lift['numCases'] * randProb
    # Lift Chart
    lift['proportionOfChurnCaptured'] = lift['TP'] / numChurn
    lift['proportionOfCases'] = lift['numCases'] / numCases
    lift['lift'] = lift['proportionOfChurnCaptured'] / lift['proportionOfCases']
    lift['baseline'] = 1
    if plot:
        plot_cumulative_gains(lift)
    return lift



In [None]:
# Plot ROC curves for the training set and the test set.
# After toPandas() each 'probability' entry is a per-class vector; keep only the
# index-1 score. The dtype check makes the cell safe to re-run: once converted,
# the column is float and is left alone (re-indexing a float would crash).
for res in (res_train, res_test):
    if res['probability'].dtype == object:
        res['probability'] = [v[1] for v in res['probability']]

y_train, y_train_pred = res_train['label'], res_train['probability']
y_test, y_test_pred = res_test['label'], res_test['probability']

plot_roc_curve(y_train, y_train_pred, y_test, y_test_pred)

In [None]:
# Cumulative gains / lift for the logistic-regression test predictions; relies on
# the previous cell having reduced each 'probability' entry to its index-1 score.
model_gain = calc_cumulative_gains(res_test, 'label',"prediction", "probability")

In [None]:
# Preview the top of the gains / lift table (highest-scored cases first) rather
# than rendering one row per test-set case.
model_gain.head(10)

### Random Forest

In [None]:
# Random forest: only numTrees is tuned; the other hyper-parameters are fixed here.
rf = RandomForestClassifier(labelCol="label", featuresCol="features",
                            featureSubsetStrategy="auto",
                            impurity='gini',
                            maxDepth=20,
                            minInstancesPerNode=10,
                            maxBins=16,
                            seed=42)
# Run cross-validation, and choose the best set of parameters.
paramGrid = ParamGridBuilder() \
    .addGrid(rf.numTrees, [50, 100, 150]) \
    .build()

# seed fixes the random fold assignment (same value as the estimator's seed), so
# the selected numTrees does not change from run to run because of fold sampling.
crossval = CrossValidator(estimator=rf,
                          estimatorParamMaps=paramGrid,
                          evaluator=BinaryClassificationEvaluator(),
                          numFolds=4,
                          seed=42)

cvModel = crossval.fit(train)
# NOTE(review): read via the private _java_obj handle — confirm whether the
# Python model exposes this getter in the Spark version used.
print('The best numTrees: {}'.format(cvModel.bestModel._java_obj.getNumTrees()))

In [None]:
# Rank the tuned forest's feature importances (zip keeps the VectorAssembler
# input order) from most to least important.
importances = cvModel.bestModel.featureImportances.toArray()
rfImp = pd.DataFrame(list(zip(selected_features, importances)),
                     columns=['feature', 'imp']).sort_values(by="imp", ascending=False)

# Bars sit at positions 0..n-1; relabel those ticks with the feature names.
ax = rfImp.plot.barh(figsize=(8, 10))
ax.set_yticks(np.arange(rfImp.shape[0]))
ax.set_yticklabels(rfImp['feature'])
ax.set_title("RF: feature importance")
plt.show()

In [None]:
#### predict and evaluate performance

# Score train and test with the tuned random forest.
predictions_train = cvModel.transform(train)
predictions_test = cvModel.transform(test)

# Select example rows to display (train first, then test).
for scored in (predictions_train, predictions_test):
    scored.select("probability", "prediction", "label", "features").show(5)

# Bring the columns used by the ROC / gains cells into pandas.
res_train = predictions_train.select("probability", "prediction", "label").toPandas()
res_test = predictions_test.select("probability", "prediction", "label").toPandas()

In [None]:
# Plot ROC curves for the training set and the test set.
# Reduce each per-class probability vector to its index-1 score. Only convert
# while the column still holds vectors (object dtype), so re-running is safe.
for res in (res_train, res_test):
    if res['probability'].dtype == object:
        res['probability'] = [v[1] for v in res['probability']]

y_train, y_train_pred = res_train['label'], res_train['probability']
y_test, y_test_pred = res_test['label'], res_test['probability']

plot_roc_curve(y_train, y_train_pred, y_test, y_test_pred)

In [None]:
# Cumulative gains / lift for the random-forest test predictions; relies on the
# previous cell having reduced each 'probability' entry to its index-1 score.
model_gain = calc_cumulative_gains(res_test, 'label',"prediction", "probability")

### Gradient Boosting Trees

In [None]:
# Train a GBT model.
gbt = GBTClassifier(labelCol="label", featuresCol="features", seed=42)

# Run cross-validation over maxIter and maxDepth, and choose the best set of parameters.
paramGrid = ParamGridBuilder() \
    .addGrid(gbt.maxIter, [20, 50, 100]) \
    .addGrid(gbt.maxDepth, [2, 3, 4]) \
    .build()

# seed fixes the random fold assignment (same value as the estimator's seed), so
# the selected parameters do not change from run to run because of fold sampling.
crossval = CrossValidator(estimator=gbt,
                          estimatorParamMaps=paramGrid,
                          evaluator=BinaryClassificationEvaluator(),
                          numFolds=4,
                          seed=42)

cvModel = crossval.fit(train)
# NOTE(review): read via the private _java_obj handle — confirm whether the
# Python model exposes these getters in the Spark version used.
print('The best maxIter: {}'.format(cvModel.bestModel._java_obj.getMaxIter()))
print('The best maxDepth: {}'.format(cvModel.bestModel._java_obj.getMaxDepth()))

In [None]:
# Rank the tuned GBT model's feature importances (zip keeps the VectorAssembler
# input order). Stored as gbtImp so the random forest's rfImp table is not
# overwritten with GBT numbers.
gbtImp = pd.DataFrame(list(zip(selected_features, cvModel.bestModel.featureImportances.toArray())),
                      columns=['feature', 'imp']).sort_values(by="imp", ascending=False)

# Bars sit at positions 0..n-1; relabel those ticks with the feature names.
ax = gbtImp.plot.barh(figsize=(8, 10))
ax.set_yticks(np.arange(gbtImp.shape[0]))
ax.set_yticklabels(gbtImp['feature'])
ax.set_title("GBT: feature importance")
plt.show()

In [None]:
#### predict and evaluate performance

preview_cols = ["probability", "prediction", "label", "features"]
result_cols = preview_cols[:-1]

# Train split: score with the tuned GBT model, show example rows, collect to pandas.
predictions_train = cvModel.transform(train)
predictions_train.select(*preview_cols).show(5)
res_train = predictions_train.select(*result_cols).toPandas()

# Test split: same three steps.
predictions_test = cvModel.transform(test)
predictions_test.select(*preview_cols).show(5)
res_test = predictions_test.select(*result_cols).toPandas()

In [None]:
# Plot ROC curves for the training set and the test set.
# Reduce each per-class probability vector to its index-1 score; the object-dtype
# guard skips columns that were already converted, so the cell can be re-run.
for res in (res_train, res_test):
    if res['probability'].dtype == object:
        res['probability'] = [v[1] for v in res['probability']]

y_train, y_train_pred = res_train['label'], res_train['probability']
y_test, y_test_pred = res_test['label'], res_test['probability']

plot_roc_curve(y_train, y_train_pred, y_test, y_test_pred)

In [None]:
# Cumulative gains / lift for the GBT test predictions; relies on the previous
# cell having reduced each 'probability' entry to its index-1 score.
model_gain = calc_cumulative_gains(res_test, 'label',"prediction", "probability")

In [None]:
# Export the test-set scores (run top to bottom, these are the GBT results).
# The index is kept so rows can be matched against the index of gain.csv.
res_test.to_csv('res_test.csv')

In [None]:
# Export the cumulative gains / lift table; when the notebook is run top to
# bottom, model_gain was last assigned from the GBT test predictions.
model_gain.to_csv('gain.csv')