In [111]:
# Start to use pyspark
from pyspark.sql import SparkSession

from pyspark.sql import functions as f
from pyspark.sql.functions import isnull, count, when, col

import numpy as np

import warnings
warnings.simplefilter('ignore')

In [1]:
spark = SparkSession.\
            builder.\
            appName('Cathay Project').\
            getOrCreate()

In [170]:
df = spark.read.csv('bank-additional/bank-additional-full.csv',
                    sep=';',
                    header=True,
                    inferSchema=True)

df.show(5, False)

+---+---------+-------+-----------+-------+-------+----+---------+-----+-----------+--------+--------+-----+--------+-----------+------------+--------------+-------------+---------+-----------+---+
|age|job      |marital|education  |default|housing|loan|contact  |month|day_of_week|duration|campaign|pdays|previous|poutcome   |emp.var.rate|cons.price.idx|cons.conf.idx|euribor3m|nr.employed|y  |
+---+---------+-------+-----------+-------+-------+----+---------+-----+-----------+--------+--------+-----+--------+-----------+------------+--------------+-------------+---------+-----------+---+
|56 |housemaid|married|basic.4y   |no     |no     |no  |telephone|may  |mon        |261     |1       |999  |0       |nonexistent|1.1         |93.994        |-36.4        |4.857    |5191.0     |no |
|57 |services |married|high.school|unknown|no     |no  |telephone|may  |mon        |149     |1       |999  |0       |nonexistent|1.1         |93.994        |-36.4        |4.857    |5191.0     |no |
|37 |servi

In [171]:
# change column nama from '.' to '_'
df = df.toDF(*[c.replace('.', '_') for c in df.columns])

In [164]:
def data_overview():
    """
    input: dataset
    output: data summary
    """
    print('Rows:', df.count())
    print('Columns:', len(df.columns))
    print('Missing', df.select([count(when(isnull(c), c)).alias(c) for c in df.columns]).\
                        toPandas().sum().sum())
    for c in df.columns:
        print('{:15}: '.format(c), df.select(c).distinct().count(), '\t', 
              [row[c] for row in df.select(c).distinct().collect()[:5]])
        
data_overview()

Rows: 41188
Columns: 21
Missing 0
age            :  78 	 [31, 85, 65, 53, 78]
job            :  12 	 ['management', 'retired', 'unknown', 'self-employed', 'student']
marital        :  4 	 ['unknown', 'divorced', 'married', 'single']
education      :  8 	 ['high.school', 'unknown', 'basic.6y', 'professional.course', 'university.degree']
default        :  3 	 ['unknown', 'no', 'yes']
housing        :  3 	 ['unknown', 'no', 'yes']
loan           :  3 	 ['unknown', 'no', 'yes']
contact        :  2 	 ['cellular', 'telephone']
month          :  10 	 ['jun', 'aug', 'may', 'sep', 'mar']
day_of_week    :  5 	 ['fri', 'thu', 'tue', 'wed', 'mon']
duration       :  1544 	 [471, 463, 148, 496, 833]
campaign       :  42 	 [31, 34, 28, 26, 27]
pdays          :  27 	 [26, 27, 12, 22, 1]
previous       :  8 	 [1, 6, 3, 5, 4]
poutcome       :  3 	 ['success', 'failure', 'nonexistent']
emp_var_rate   :  10 	 [-1.7, -0.1, 1.4, -1.8, -3.0]
cons_price_idx :  26 	 [92.201, 93.369, 92.713, 94.215, 92.963]
con

In [165]:
df.printSchema()

root
 |-- age: integer (nullable = true)
 |-- job: string (nullable = true)
 |-- marital: string (nullable = true)
 |-- education: string (nullable = true)
 |-- default: string (nullable = true)
 |-- housing: string (nullable = true)
 |-- loan: string (nullable = true)
 |-- contact: string (nullable = true)
 |-- month: string (nullable = true)
 |-- day_of_week: string (nullable = true)
 |-- duration: integer (nullable = true)
 |-- campaign: integer (nullable = true)
 |-- pdays: integer (nullable = true)
 |-- previous: integer (nullable = true)
 |-- poutcome: string (nullable = true)
 |-- emp_var_rate: double (nullable = true)
 |-- cons_price_idx: double (nullable = true)
 |-- cons_conf_idx: double (nullable = true)
 |-- euribor3m: double (nullable = true)
 |-- nr_employed: double (nullable = true)
 |-- y: integer (nullable = false)



In [172]:
# Change Yes/No to 1/0
df = df.withColumn('y', when(df.y == 'yes', 1).otherwise(0))

## EDA

In [112]:
import plotly.offline as py
py.init_notebook_mode(connected=True)
import plotly.graph_objs as go
import plotly.tools as tls
import plotly.figure_factory as ff

In [173]:
# Get label
lab = df.groupBy('y').count().toPandas()['y'] # type: pd.Series
# Get count
values = df.groupBy('y').count().toPandas()['count'] # type: pd.Series

trace = go.Pie(labels=lab, values=values,
               marker=dict(colors=['lime', 'royalblue'],
                           line=dict(color='white',
                                     width=1.3)),
               rotation=90,
               hole=.5)

layout = go.Layout(dict(title='Subscribe Term Deposit'),
                        plot_bgcolor="rgb(243,243,243)",
                        paper_bgcolor="rgb(243,243,243)")

fig = go.Figure(data=[trace], layout=layout)
py.iplot(fig)

In [174]:
from pyspark.ml.stat import Correlation
from pyspark.ml.feature import VectorAssembler

vector_col = 'cor_features'
assembler = VectorAssembler(inputCols=[d[0] for d in df.dtypes if d[1] != 'string'], 
                            outputCol=vector_col)
df_vector = assembler.transform(df).select(vector_col)

# Get correlation matrix
matrix = Correlation.corr(dataset=df_vector, column=vector_col)

In [196]:
trace = ff.create_annotated_heatmap(z=np.round(matrix.collect()[0][0].toArray().tolist(), 2),
                                    x=[d[0] for d in df.dtypes if d[1] != 'string'],
                                    y=[d[0] for d in df.dtypes if d[1] != 'string'],
                                    colorscale='Viridis',
                                    hoverinfo='z')

fig = go.Figure(data=trace)
py.iplot(fig)

## Job 

In [198]:
# value counts
df.filter(df['job'] == 'unknown').groupBy('job', 'y').count().show()

+-------+---+-----+
|    job|  y|count|
+-------+---+-----+
|unknown|  0|  293|
|unknown|  1|   37|
+-------+---+-----+



In [199]:
# Filter unknown value
df = df.filter(df['job'] != 'unknown')

In [201]:
import plotly.tools as tls

In [233]:
# Count plot
grouped = df.groupBy('job', 'y').count()


trace1 = go.Bar(x=[row['job'] for row in grouped.filter(grouped['y'] == 0).collect()],
                y=[row['count'] for row in grouped.filter(grouped['y'] == 0).collect()],
                name='No',showlegend=True)

trace2 = go.Bar(x=[row['job'] for row in grouped.filter(grouped['y'] == 1).collect()],
                y=[row['count'] for row in grouped.filter(grouped['y'] == 1).collect()],
                name='Yes')

# Mean Plot
grouped_mean = df.groupBy('job').agg({'y': 'mean'})
trace3 = go.Bar(x=[row['job'] for row in grouped_mean.collect()],
                y=[round(row['avg(y)'], 2) for row in grouped_mean.collect()],
                name='Mean Subscription Rate', showlegend=False)

fig = tls.make_subplots(rows=1, cols=2,
                        subplot_titles=('Count Plot of Job',
                                        'Mean Subscription Rate of Job'))
fig.append_trace(trace1, 1, 1)
fig.append_trace(trace2, 1, 1)
fig.append_trace(trace3, 1, 2)

py.iplot(fig)

In [275]:
trace1 = ff.create_distplot([df.filter(df['y'] == 1).rdd.map(lambda x: x['age']).collect(),
                            df.filter(df['y'] == 0).rdd.map(lambda x: x['age']).collect()],
                            group_labels=['Subscription 1',
                                          'Subscription 0'],
                            show_rug=False)

grouped_mean = df.groupBy('age').agg({'y': 'mean'}).orderBy('age')
trace2 = go.Scatter(x=grouped_mean.rdd.map(lambda x: x['age']).collect(),
                    y=grouped_mean.rdd.map(lambda x: x['avg(y)']).collect(),
                    mode='lines+markers',
                    showlegend=False)

fig = tls.make_subplots(rows=1, cols=2,
                        subplot_titles=('Count Plot of Age',
                                        'Mean Subscription Rate vs. age'))

# If you want to make distplot in the subplots,
# you need to plot if separately after make ff.creat_distplot
fig.add_trace(go.Histogram(trace1['data'][1],
                           marker_color='blue',
                           marker={'opacity':0.3},
                           showlegend=True), 1, 1)
fig.add_trace(go.Histogram(trace1['data'][0],
                           marker_color='red',
                           marker={'opacity':0.4},
                           showlegend=True), 1, 1)
fig.append_trace(trace2, 1, 2)
fig.show()

## Martial

In [292]:
df = df.filter(df.marital != 'unknown')

## Education

In [300]:
from pyspark.sql.types import StringType
change_education = f.udf(lambda x: 'university.degree' if x == 'unknown' else x, StringType())
df = df.withColumn('education', change_education(df.education))

## Age

In [306]:
change_age = f.udf(lambda x: 'old' if x >= 60 else 'mid' if x >= 30 else 'low')
df = df.withColumn('age_group', change_age(df.age))

In [309]:
df = df.drop('age')

## Duration 

In [312]:
df = df.drop('duration')

## Campaign

In [316]:
df = df.withColumn('campaign', df.campaign-1)

## pdays

In [330]:
from pyspark.sql.types import IntegerType
change_pdays = f.udf(lambda x: 0 if x==999 else 1, IntegerType())
df = df.withColumn('pdays', change_pdays(df.pdays))

## emp.var.rate

In [333]:
df = df.drop('emp_var_rate')

## StandardScaler 

In [334]:
categorical_features = [d[0] for d in df.dtypes if (d[1] == 'string') & (d[0] != 'y')]
continuous_features = [d[0] for d in df.dtypes if (d[1] != 'string') & (d[0] != 'y')]

In [428]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, StandardScaler

# Every categorical features have own indexer
indexers = [StringIndexer(inputCol=c, outputCol='{}_indexed'.format(c)) for c in categorical_features]
# Depend each indexer to build encoder
encoders = [OneHotEncoder(inputCol=indexer.getOutputCol(), 
                          outputCol='{}_encoded'.format(indexer.getOutputCol())) for indexer in indexers]
assembler = VectorAssembler(inputCols=[encoder.getOutputCol() for encoder in encoders]+continuous_features,
                            outputCol='features')

pipeline = Pipeline(stages=indexers+encoders+[assembler])

model = pipeline.fit(df)
data = model.transform(df)

data = data.withColumn('label', col('y'))
data = data.select('features', 'label')

In [445]:
data.show(5)

+--------------------+-----+--------------------+
|            features|label|     indexedFeatures|
+--------------------+-----+--------------------+
|(49,[8,10,16,18,2...|    0|(49,[8,10,16,18,2...|
|(49,[3,10,13,19,2...|    0|(49,[3,10,13,19,2...|
|(49,[3,10,13,18,2...|    0|(49,[3,10,13,18,2...|
|(49,[0,10,17,18,2...|    0|(49,[0,10,17,18,2...|
|(49,[3,10,13,18,2...|    0|(49,[3,10,13,18,2...|
+--------------------+-----+--------------------+
only showing top 5 rows



In [448]:
trainingData.show()

+--------------------+-----+--------------------+
|            features|label|     indexedFeatures|
+--------------------+-----+--------------------+
|(49,[0,10,12,18,2...|    0|(49,[0,10,12,18,2...|
|(49,[0,10,12,18,2...|    0|(49,[0,10,12,18,2...|
|(49,[0,10,12,18,2...|    0|(49,[0,10,12,18,2...|
|(49,[0,10,12,18,2...|    0|(49,[0,10,12,18,2...|
|(49,[0,10,12,18,2...|    0|(49,[0,10,12,18,2...|
|(49,[0,10,12,18,2...|    0|(49,[0,10,12,18,2...|
|(49,[0,10,12,18,2...|    0|(49,[0,10,12,18,2...|
|(49,[0,10,12,18,2...|    0|(49,[0,10,12,18,2...|
|(49,[0,10,12,18,2...|    0|(49,[0,10,12,18,2...|
|(49,[0,10,12,18,2...|    0|(49,[0,10,12,18,2...|
|(49,[0,10,12,18,2...|    0|(49,[0,10,12,18,2...|
|(49,[0,10,12,18,2...|    0|(49,[0,10,12,18,2...|
|(49,[0,10,12,18,2...|    0|(49,[0,10,12,18,2...|
|(49,[0,10,12,18,2...|    0|(49,[0,10,12,18,2...|
|(49,[0,10,12,18,2...|    0|(49,[0,10,12,18,2...|
|(49,[0,10,12,18,2...|    0|(49,[0,10,12,18,2...|
|(49,[0,10,12,18,2...|    0|(49,[0,10,12,18,2...|


In [344]:
data.select('label')

DataFrame[label: int]

In [432]:
# Features
from pyspark.ml.feature import VectorIndexer
# Automatically identify categorical features, and index them.
# Set maxCategories so feature with > distinct values are treated as continuous.
featureIndexer = VectorIndexer(inputCol='features',
                               outputCol='indexedFeatures',
                               maxCategories=15).fit(data)

data = featureIndexer.transform(data)

In [451]:
data.show(20)

+--------------------+-----+--------------------+
|            features|label|     indexedFeatures|
+--------------------+-----+--------------------+
|(49,[8,10,16,18,2...|    0|(49,[8,10,16,18,2...|
|(49,[3,10,13,19,2...|    0|(49,[3,10,13,19,2...|
|(49,[3,10,13,18,2...|    0|(49,[3,10,13,18,2...|
|(49,[0,10,17,18,2...|    0|(49,[0,10,17,18,2...|
|(49,[3,10,13,18,2...|    0|(49,[3,10,13,18,2...|
|(49,[3,10,14,19,2...|    0|(49,[3,10,14,19,2...|
|(49,[0,10,15,18,2...|    0|(49,[0,10,15,18,2...|
|(49,[1,10,12,19,2...|    0|(49,[1,10,12,19,2...|
|(49,[2,11,15,18,2...|    0|(49,[2,11,15,18,2...|
|(49,[3,11,13,18,2...|    0|(49,[3,11,13,18,2...|
|(49,[1,10,12,19,2...|    0|(49,[1,10,12,19,2...|
|(49,[3,11,13,18,2...|    0|(49,[3,11,13,18,2...|
|(49,[1,11,13,18,2...|    0|(49,[1,11,13,18,2...|
|(49,[8,16,18,20,2...|    0|(49,[8,16,18,20,2...|
|(49,[1,10,17,18,2...|    0|(49,[1,10,17,18,2...|
|(49,[5,10,14,19,2...|    0|(49,[5,10,14,19,2...|
|(49,[1,10,17,18,2...|    0|(49,[1,10,17,18,2...|


In [452]:
# Split the data into training and test sets
trainingData, testData = data.randomSplit([0.8, 0.2])

trainingData.show(5, False)
testData.show(5, False)

+----------------------------------------------------------------------------------------------------------------------------------+-----+--------------------------------------------------------------------------------------------------------------------------------+
|features                                                                                                                          |label|indexedFeatures                                                                                                                 |
+----------------------------------------------------------------------------------------------------------------------------------+-----+--------------------------------------------------------------------------------------------------------------------------------+
|(49,[0,10,12,18,20,22,24,26,34,38,40,42,45,46,47,48],[1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,2.0,93.918,-42.7,4.968,5228.1]) |0    |(49,[0,10,12,18,20,22,24,26,34,38,40,42,45,46,47,48],[1.0,

In [570]:
from pyspark.mllib.evaluation import MulticlassMetrics, BinaryClassificationMetrics
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier, GBTClassifier

logr = LogisticRegression(featuresCol='indexedFeatures',
                          labelCol='label')
rf = RandomForestClassifier(featuresCol='indexedFeatures',
                            labelCol='label')
gbt = GBTClassifier(featuresCol='indexedFeatures',
                    labelCol='label')


In [566]:
model = gbt.fit(trainingData)

In [575]:
predictions.withColumn('label', col('label').cast(FloatType())).select('prediction', 'label')


DataFrame[prediction: double, label: float]

In [556]:
from pyspark.mllib.evaluation import MulticlassMetrics

In [591]:
from sklearn.metrics import roc_curve

In [597]:
def eval_result(name, pred):
    """
    input: name
    output: metric
    """
    evaluator = MulticlassClassificationEvaluator(labelCol='label', 
                                                  predictionCol='prediction',
                                                  metricName=name)
    return evaluator.evaluate(pred)
    

def select_model_prediction(algo, train, test, cf='coefficient'):

    model = algo.fit(train)
    predictions = model.transform(test)
    pred = predictions.select('label', 'prediction')
    probabilities = predictions.select('label', 'probability')
    
    # Categorical Features
    binary_metadata = predictions.select("features").schema[0].metadata.\
                              get('ml_attr').get('attrs').get('binary')
    # Numerical Features
    numeric_metadata = predictions.select("features").schema[0].metadata.\
                               get('ml_attr').get('attrs').get('numeric')
    all_metadata = [m['name'] for m in binary_metadata + numeric_metadata]
    
    if cf == 'coefficient':
        coefficients = np.array(model.coefficients).tolist() # For Logistic Regression
    elif cf == 'features':
        coefficients = np.array(model.featureImportances).tolist() # For tree
        
    coef_sumry = spark.createDataFrame([*zip(all_metadata, coefficients)],
                                          ['feature', 'coefficients']).orderBy('coefficients')
    
    accuracy = eval_result('accuracy', pred)
    pred_and_label = predictions.withColumn('label', col('label').cast(FloatType())).select('prediction', 'label')
    metrics = MulticlassMetrics(pred_and_label.rdd.map(tuple))
    #f1 = eval_result('f1', pred)
    #precision = eval_result('weightedPrecision', pred)
    #recall = eval_result('weightedRecall', pred)
    
    print(type(algo).__name__)
    print('-' * 80)
    print('Accuracy: {:5.2f}'.format(accuracy))
    print('F1: {:11.2f}'.format(metrics.fMeasure()))
    print('Precision: {:4.2f}'.format(metrics.precision()))
    print('Recall: {:7.2f}'.format(metrics.recall()))
    
    pred_and_label = predictions.withColumn('label', col('label').cast(FloatType())).select('prediction', 'label')
    conf_matrix = metrics.confusionMatrix().toArray()
            
    preds = predictions.select('label','probability').\
                        rdd.map(lambda row: (float(row['probability'][1]), float(row['label']))).\
                        collect()
    
    y_score, y_true = zip(*preds)
    fpr, tpr, thresholds = roc_curve(y_true, y_score, pos_label = 1)
        
    # plot confusion matrix
    trace1 = go.Heatmap(z=conf_matrix,
                        x=['Not Subscribe', 'Subscribe'],
                        y=['Not Subscribe', 'Subscribe'],
                        showscale=False, colorscale='Picnic',
                        name='Confusion_matrix')
    
    # plot roc curve
    trace2 = go.Scatter(x=fpr,y=tpr,
                        line=dict(color=('rgb(205, 12, 24)'), width=2))
    
    trace3 = go.Scatter(x=[0, 1], y=[0, 1],
                        line=dict(color=('rgb(205, 12, 24)'), width=2,
                                  dash='dot'))
    
    trace4 = go.Bar(y=list(map(lambda x: x['feature'], coef_sumry.select('feature').collect())),
                    x=list(map(lambda x: x['coefficients'], coef_sumry.select('coefficients').collect())),
                    orientation='h',
                    marker=dict(color=list(map(lambda x: x['coefficients'], coef_sumry.select('coefficients').collect())),
                                colorscale='Picnic',
                                line=dict(width=0.6, color='black')))
    
    fig = tls.make_subplots(rows=2, cols=2, specs=[[{}, {}], [{'colspan': 2}, None]],
                            subplot_titles=('Confusion Matrix',
                                            'Receiver Operating Characteristic'))
    fig.append_trace(trace1, 1, 1)
    fig.append_trace(trace2, 1, 2)
    fig.append_trace(trace3, 1, 2)
    fig.append_trace(trace4, 2, 1)
    
    fig['layout'].update(showlegend=False, title='Model performance',
                         autosize=False, height=900, width=1000,
                         plot_bgcolor='rgba(240, 240, 240, 0.95)',
                         paper_bgcolor='rgba(240, 240, 240, 0.95)',
                         margin=dict(b=195))
    fig['layout']['xaxis2'].update(dict(title='false positive rate'))
    fig['layout']['yaxis2'].update(dict(title='true positive rate'))
    fig['layout']['xaxis3'].update(dict(showgrid=True, tickfont=dict(size=10)),
                                   tickangle=0)
    py.iplot(fig)

In [595]:
select_model_prediction(logr, trainingData, testData)

LogisticRegression
--------------------------------------------------------------------------------
Accuracy:  0.89
F1:        0.89
Precision: 0.89
Recall:    0.89


In [598]:
select_model_prediction(rf, trainingData, testData, 'features')

RandomForestClassifier
--------------------------------------------------------------------------------
Accuracy:  0.89
F1:        0.89
Precision: 0.89
Recall:    0.89


In [599]:
select_model_prediction(gbt, trainingData, testData, 'features')

GBTClassifier
--------------------------------------------------------------------------------
Accuracy:  0.90
F1:        0.90
Precision: 0.90
Recall:    0.90


In [475]:
model.summary.areaUnderROC

0.7903145535048696

In [416]:
np.array(model.coefficients).shape

(49,)

In [616]:
from pyspark.sql.types import FloatType, StructField, StructType
spark.createDataFrame(model.featureImportances.toArray().tolist(), FloatType()).show()

+------------+
|       value|
+------------+
|0.0022826074|
| 0.016603746|
| 0.016905045|
|0.0035631713|
|0.0032998493|
| 0.013040811|
| 0.004505583|
| 0.004194423|
|  0.00350487|
| 0.008660103|
|0.0115516465|
|0.0027491874|
| 0.023483954|
|0.0074201147|
| 0.007899232|
| 0.011400304|
|0.0030640136|
| 0.003966118|
| 0.016594259|
|         0.0|
+------------+
only showing top 20 rows



In [612]:
type(model.featureImportances)

pyspark.ml.linalg.SparseVector