# Setup Python for Spark

In [1]:
import os
import sys

In [2]:
# Setup the folder paths
# Where you downloaded the resource bundle
os.chdir("/Users/jlyang/Documents/Intern&Job/Spark_Python_Do_Big_Data_Analytics")
# Where you installed spark
os.environ['SPARK_HOME'] = '/Users/jlyang/Spark/spark-2.1.0-bin-hadoop2.7'

In [3]:
# Create a variable for our root path
SPARK_HOME = os.environ['SPARK_HOME']

In [4]:
# Add the following paths to the system path
sys.path.insert(0, os.path.join(SPARK_HOME,"python"))
sys.path.insert(0, os.path.join(SPARK_HOME,"python","lib"))
sys.path.insert(0, os.path.join(SPARK_HOME,"python","lib","pyspark.zip"))
sys.path.insert(0, os.path.join(SPARK_HOME,"python","lib","py4j-0.10.4-src.zip"))

In [5]:
# Initialize SparkSession
from pyspark.sql import SparkSession
from pyspark import SparkContext

In [6]:
# Create a Spark Session
SpSession = SparkSession.builder.master("local[2]").appName("jlyang_spark") \
            .config("spark.executor.memory", "1g") \
            .config("spark.cores.max","2") \
            .config("spark.sql.warehouse.dir", "/Users/jlyang/Spark/spark-warehouse")\
            .getOrCreate()

In [7]:
# Get the Spark Context from Spark Session    
SpContext = SpSession.sparkContext

# Load data from the data file

In [8]:
# Load the file into a RDD
ccRaw = SpContext.textFile('credit-card-default-1000.csv')
ccRaw.take(5)

[u'CUSTID,LIMIT_BAL,SEX,EDUCATION,MARRIAGE,AGE,PAY_1,PAY_2,PAY_3,PAY_4,PAY_5,PAY_6,BILL_AMT1,BILL_AMT2,BILL_AMT3,BILL_AMT4,BILL_AMT5,BILL_AMT6,PAY_AMT1,PAY_AMT2,PAY_AMT3,PAY_AMT4,PAY_AMT5,PAY_AMT6,DEFAULTED',
 u'530,20000,2,2,2,21,-1,-1,2,2,-2,-2,0,0,0,0,0,0,0,0,0,0,162000,0,0',
 u'38,60000,2,2,2,22,0,0,0,0,-2,-2,0,0,0,0,0,0,0,0,0,0,0,1576,0',
 u'43,10000,1,2,2,22,0,0,0,0,-2,-2,0,0,0,0,0,0,0,0,0,0,0,1500,0',
 u'47,20000,2,1,2,22,0,0,2,-1,0,-1,1131,291,582,291,0,291,291,582,0,0,130291,651,0']

In [9]:
# Remove header row
dataLines = ccRaw.filter(lambda x: 'CUSTID' not in x)
dataLines.count()

1002

# Prepare and augment the data

In [10]:
# Cleanup data. Remove the last two lines which are not 'CSV'
cleanedLines = dataLines.filter(lambda x: x.find('aaa') == -1)
cleanedLines.count()

1000

In [11]:
from pyspark.sql import Row
import numpy as np

In [12]:
# Convert into SQL Dataframe. In the process perform a few cleanups and changes required for future work
def convertToRow(instr):
    attList = instr.split(',')
    
    # PR#06 Round of age to range of 10s
    ageRound = round(float(attList[5]) / 10) * 10
    
    # Normalize sex to only 1 and 2
    sex = float(attList[2].replace('M', '1').replace('F', '2'))
    
    # Find average billed amount
    avgBillAmt = np.array(attList[12:18], dtype = 'float').mean().item()
    
    # Find average pay amount
    avgPayAmt = np.array(attList[18:24], dtype = 'float').mean().item()
    
    # Find average pay duration, required for PR#04
    # Make sure numbers are rounded and negative values are eliminated
    payDuration = np.array([x if float(x) > 0 else 0 for x in attList[6:12]], dtype = 'float')
    avgPayDuration = round(payDuration.mean().item())
    
    # Average percentage paid. Add this as an additional field to see if this field has any predictive capabilities
    perPay = round((avgPayAmt / (avgBillAmt + 1) * 100) / 25) * 25
    
    values = Row(CUSTID = attList[0], LIMIT_BAL = float(attList[1]), SEX = sex, EDUCATION = float(attList[3]), \
                 MARRIAGE = float(attList[4]), AGE = ageRound, AVG_PAY_DUR = avgPayDuration, \
                 AVG_BILL_AMT = avgBillAmt, AVG_PAY_AMT = avgPayAmt, PER_PAID = perPay, DEFAULTED = float(attList[24]))
    
    return values

In [13]:
# Cleanedup RDD
ccRows = cleanedLines.map(convertToRow)
ccRows.take(5)

[Row(AGE=20.0, AVG_BILL_AMT=0.0, AVG_PAY_AMT=27000.0, AVG_PAY_DUR=1.0, CUSTID=u'530', DEFAULTED=0.0, EDUCATION=2.0, LIMIT_BAL=20000.0, MARRIAGE=2.0, PER_PAID=2700000.0, SEX=2.0),
 Row(AGE=20.0, AVG_BILL_AMT=0.0, AVG_PAY_AMT=262.6666666666667, AVG_PAY_DUR=0.0, CUSTID=u'38', DEFAULTED=0.0, EDUCATION=2.0, LIMIT_BAL=60000.0, MARRIAGE=2.0, PER_PAID=26275.0, SEX=2.0),
 Row(AGE=20.0, AVG_BILL_AMT=0.0, AVG_PAY_AMT=250.0, AVG_PAY_DUR=0.0, CUSTID=u'43', DEFAULTED=0.0, EDUCATION=2.0, LIMIT_BAL=10000.0, MARRIAGE=2.0, PER_PAID=25000.0, SEX=1.0),
 Row(AGE=20.0, AVG_BILL_AMT=431.0, AVG_PAY_AMT=21969.166666666668, AVG_PAY_DUR=0.0, CUSTID=u'47', DEFAULTED=0.0, EDUCATION=1.0, LIMIT_BAL=20000.0, MARRIAGE=2.0, PER_PAID=5075.0, SEX=2.0),
 Row(AGE=20.0, AVG_BILL_AMT=3349.5, AVG_PAY_AMT=28651.5, AVG_PAY_DUR=0.0, CUSTID=u'70', DEFAULTED=0.0, EDUCATION=4.0, LIMIT_BAL=20000.0, MARRIAGE=2.0, PER_PAID=850.0, SEX=1.0)]

In [14]:
# Create a dataframe
ccDf = SpSession.createDataFrame(ccRows)
ccDf.cache()
ccDf.show(10)

+----+------------------+------------------+-----------+------+---------+---------+---------+--------+---------+---+
| AGE|      AVG_BILL_AMT|       AVG_PAY_AMT|AVG_PAY_DUR|CUSTID|DEFAULTED|EDUCATION|LIMIT_BAL|MARRIAGE| PER_PAID|SEX|
+----+------------------+------------------+-----------+------+---------+---------+---------+--------+---------+---+
|20.0|               0.0|           27000.0|        1.0|   530|      0.0|      2.0|  20000.0|     2.0|2700000.0|2.0|
|20.0|               0.0| 262.6666666666667|        0.0|    38|      0.0|      2.0|  60000.0|     2.0|  26275.0|2.0|
|20.0|               0.0|             250.0|        0.0|    43|      0.0|      2.0|  10000.0|     2.0|  25000.0|1.0|
|20.0|             431.0|21969.166666666668|        0.0|    47|      0.0|      1.0|  20000.0|     2.0|   5075.0|2.0|
|20.0|            3349.5|           28651.5|        0.0|    70|      0.0|      4.0|  20000.0|     2.0|    850.0|1.0|
|20.0|1025.3333333333333|            7358.0|        0.0|    79| 

In [15]:
ccDf.printSchema()

root
 |-- AGE: double (nullable = true)
 |-- AVG_BILL_AMT: double (nullable = true)
 |-- AVG_PAY_AMT: double (nullable = true)
 |-- AVG_PAY_DUR: double (nullable = true)
 |-- CUSTID: string (nullable = true)
 |-- DEFAULTED: double (nullable = true)
 |-- EDUCATION: double (nullable = true)
 |-- LIMIT_BAL: double (nullable = true)
 |-- MARRIAGE: double (nullable = true)
 |-- PER_PAID: double (nullable = true)
 |-- SEX: double (nullable = true)



In [16]:
import pandas as pd

In [17]:
# Add SEX_NAME to the data using SQL join. Required for PR#02
genderDf = SpSession.createDataFrame(pd.DataFrame({'SEX': [1.0, 2.0], 'SEX_NAME': ['Male', 'Female']}))
genderDf.show()

+---+--------+
|SEX|SEX_NAME|
+---+--------+
|1.0|    Male|
|2.0|  Female|
+---+--------+



In [18]:
ccDf1 = ccDf.join(genderDf, ccDf.SEX == genderDf.SEX).drop(genderDf.SEX)
ccDf1.show(5)

+----+-------------------+-----------------+-----------+------+---------+---------+---------+--------+--------+---+--------+
| AGE|       AVG_BILL_AMT|      AVG_PAY_AMT|AVG_PAY_DUR|CUSTID|DEFAULTED|EDUCATION|LIMIT_BAL|MARRIAGE|PER_PAID|SEX|SEX_NAME|
+----+-------------------+-----------------+-----------+------+---------+---------+---------+--------+--------+---+--------+
|70.0| -65.66666666666667|416.6666666666667|        0.0|   388|      1.0|      3.0|  80000.0|     1.0|  -650.0|1.0|    Male|
|60.0|-56043.166666666664|          57956.5|        0.0|   103|      1.0|      1.0| 480000.0|     1.0|  -100.0|1.0|    Male|
|60.0|                0.0|              0.0|        0.0|   932|      1.0|      1.0| 320000.0|     1.0|     0.0|1.0|    Male|
|60.0|                0.0|              0.0|        0.0|   948|      1.0|      2.0|  50000.0|     1.0|     0.0|1.0|    Male|
|60.0| 25828.333333333332|              0.0|        0.0|   602|      1.0|      3.0|  30000.0|     1.0|     0.0|1.0|    Male|


In [19]:
# Add ED_STR to the data using SQL join. Required for PR#03
eduDf = SpSession.createDataFrame(pd.DataFrame({'EDUCATION': [1.0, 2.0, 3.0, 4.0], 
                                                'ED_STR': ['Graduate', 'University', 'High School', 'Others']}))
eduDf.show()

+---------+-----------+
|EDUCATION|     ED_STR|
+---------+-----------+
|      1.0|   Graduate|
|      2.0| University|
|      3.0|High School|
|      4.0|     Others|
+---------+-----------+



In [20]:
ccDf2 = ccDf1.join(eduDf, ccDf1.EDUCATION == eduDf.EDUCATION).drop(eduDf.EDUCATION)
ccDf2.show(5)

+----+-------------------+-----------+-----------+------+---------+---------+---------+--------+--------+---+--------+--------+
| AGE|       AVG_BILL_AMT|AVG_PAY_AMT|AVG_PAY_DUR|CUSTID|DEFAULTED|EDUCATION|LIMIT_BAL|MARRIAGE|PER_PAID|SEX|SEX_NAME|  ED_STR|
+----+-------------------+-----------+-----------+------+---------+---------+---------+--------+--------+---+--------+--------+
|60.0|-56043.166666666664|    57956.5|        0.0|   103|      1.0|      1.0| 480000.0|     1.0|  -100.0|1.0|    Male|Graduate|
|60.0|                0.0|        0.0|        0.0|   932|      1.0|      1.0| 320000.0|     1.0|     0.0|1.0|    Male|Graduate|
|60.0|                0.0|        0.0|        0.0|   466|      1.0|      1.0| 230000.0|     1.0|     0.0|1.0|    Male|Graduate|
|60.0|            19763.0|        0.0|        1.0|    35|      1.0|      1.0| 500000.0|     1.0|     0.0|1.0|    Male|Graduate|
|60.0|              -87.5|        0.0|        0.0|    66|      1.0|      1.0| 200000.0|     1.0|    -0.0

In [21]:
# Add MARR_DESC to the data with SQL join. Required for PR#03
marrDf = SpSession.createDataFrame(pd.DataFrame({'MARRIAGE': [1.0, 2.0, 3.0], 
                                                 'MARR_DESC': ['Single', 'Married', 'Others']}))
marrDf.show()

+--------+---------+
|MARRIAGE|MARR_DESC|
+--------+---------+
|     1.0|   Single|
|     2.0|  Married|
|     3.0|   Others|
+--------+---------+



In [22]:
ccFinalDf = ccDf2.join(marrDf, ccDf2.MARRIAGE == marrDf.MARRIAGE).drop(marrDf.MARRIAGE)
ccFinalDf.show(5)

+----+-------------------+-----------+-----------+------+---------+---------+---------+--------+--------+---+--------+--------+---------+
| AGE|       AVG_BILL_AMT|AVG_PAY_AMT|AVG_PAY_DUR|CUSTID|DEFAULTED|EDUCATION|LIMIT_BAL|MARRIAGE|PER_PAID|SEX|SEX_NAME|  ED_STR|MARR_DESC|
+----+-------------------+-----------+-----------+------+---------+---------+---------+--------+--------+---+--------+--------+---------+
|60.0|-56043.166666666664|    57956.5|        0.0|   103|      1.0|      1.0| 480000.0|     1.0|  -100.0|1.0|    Male|Graduate|   Single|
|60.0|                0.0|        0.0|        0.0|   932|      1.0|      1.0| 320000.0|     1.0|     0.0|1.0|    Male|Graduate|   Single|
|60.0|                0.0|        0.0|        0.0|   466|      1.0|      1.0| 230000.0|     1.0|     0.0|1.0|    Male|Graduate|   Single|
|60.0|            19763.0|        0.0|        1.0|    35|      1.0|      1.0| 500000.0|     1.0|     0.0|1.0|    Male|Graduate|   Single|
|60.0|              -87.5|        

# Do analysis as required by the problem statement

In [23]:
# create a temp view
ccFinalDf.createOrReplaceTempView('CCDATA')

In [24]:
# PR#02 solution
SpSession.sql('SELECT SEX_NAME, count(*) AS Total, SUM(DEFAULTED) AS Defaults, ' + \
             'ROUND(SUM(DEFAULTED) * 100 / count(*)) AS PER_DEFAULT FROM CCDATA GROUP BY SEX_NAME').show()

+--------+-----+--------+-----------+
|SEX_NAME|Total|Defaults|PER_DEFAULT|
+--------+-----+--------+-----------+
|  Female|  591|   218.0|       37.0|
|    Male|  409|   185.0|       45.0|
+--------+-----+--------+-----------+



In [25]:
# PR#03 solution
SpSession.sql('SELECT MARR_DESC, ED_STR, COUNT(*) AS Total, SUM(DEFAULTED) AS Defaults, ' + \
             'ROUND(SUM(DEFAULTED) * 100 / count(*)) AS PER_DEFAULT FROM CCDATA GROUP BY MARR_DESC, ED_STR ' + \
             'ORDER BY 1, 2').show()

+---------+-----------+-----+--------+-----------+
|MARR_DESC|     ED_STR|Total|Defaults|PER_DEFAULT|
+---------+-----------+-----+--------+-----------+
|  Married|   Graduate|  268|    69.0|       26.0|
|  Married|High School|   55|    24.0|       44.0|
|  Married|     Others|    4|     2.0|       50.0|
|  Married| University|  243|    65.0|       27.0|
|   Others|   Graduate|    4|     4.0|      100.0|
|   Others|High School|    8|     6.0|       75.0|
|   Others| University|    7|     3.0|       43.0|
|   Single|   Graduate|  123|    71.0|       58.0|
|   Single|High School|   87|    52.0|       60.0|
|   Single|     Others|    3|     2.0|       67.0|
|   Single| University|  198|   105.0|       53.0|
+---------+-----------+-----+--------+-----------+



In [26]:
# PR#04 solution
SpSession.sql('SELECT AVG_PAY_DUR, COUNT(*) AS Total, SUM(DEFAULTED) AS Defaults, ' + \
             'ROUND(SUM(DEFAULTED) * 100 / count(*)) AS PER_DEFAULT FROM CCDATA GROUP BY AVG_PAY_DUR ORDER BY 1').show()

+-----------+-----+--------+-----------+
|AVG_PAY_DUR|Total|Defaults|PER_DEFAULT|
+-----------+-----+--------+-----------+
|        0.0|  739|   270.0|       37.0|
|        1.0|  239|   122.0|       51.0|
|        2.0|   16|     9.0|       56.0|
|        3.0|    6|     2.0|       33.0|
+-----------+-----+--------+-----------+



# Transform to a data frame for input to machine learning

In [27]:
# Perform first round correlation analysis
for i in ccDf.columns:
    if not(isinstance(ccDf.select(i).take(1)[0][0], unicode)):
        print('Correlation to DEFAULTED for {} is {}'.format(i, ccDf.stat.corr('DEFAULTED', i)))

Correlation to DEFAULTED for AGE is 0.516560220288
Correlation to DEFAULTED for AVG_BILL_AMT is 0.186007563785
Correlation to DEFAULTED for AVG_PAY_AMT is -0.16359608891
Correlation to DEFAULTED for AVG_PAY_DUR is 0.115134607604
Correlation to DEFAULTED for DEFAULTED is 1.0
Correlation to DEFAULTED for EDUCATION is 0.11056265057
Correlation to DEFAULTED for LIMIT_BAL is 0.10722031324
Correlation to DEFAULTED for MARRIAGE is -0.228912872874
Correlation to DEFAULTED for PER_PAID is -0.0268407104868
Correlation to DEFAULTED for SEX is -0.0836518221502


In [28]:
from pyspark.ml.linalg import Vectors

In [29]:
def transformToLabeledPoint(row):
    lp = (row['DEFAULTED'], Vectors.dense([row['AGE'], row['AVG_BILL_AMT'], row['AVG_PAY_AMT'], row['AVG_PAY_DUR'], \
                                          row['EDUCATION'], row['LIMIT_BAL'], row['MARRIAGE'], row['PER_PAID'], \
                                          row['SEX']]))
    return lp

In [30]:
ccLp = ccFinalDf.rdd.map(transformToLabeledPoint)
ccLp.take(5)

[(1.0,
  DenseVector([60.0, -56043.1667, 57956.5, 0.0, 1.0, 480000.0, 1.0, -100.0, 1.0])),
 (1.0, DenseVector([60.0, 0.0, 0.0, 0.0, 1.0, 320000.0, 1.0, 0.0, 1.0])),
 (1.0, DenseVector([60.0, 0.0, 0.0, 0.0, 1.0, 230000.0, 1.0, 0.0, 1.0])),
 (1.0, DenseVector([60.0, 19763.0, 0.0, 1.0, 1.0, 500000.0, 1.0, 0.0, 1.0])),
 (1.0, DenseVector([60.0, -87.5, 0.0, 0.0, 1.0, 200000.0, 1.0, -0.0, 1.0]))]

In [31]:
ccNormDf = SpSession.createDataFrame(ccLp, ['label', 'features'])
ccNormDf.select('*').show(10, truncate = False)
ccNormDf.cache()

+-----+------------------------------------------------------------------+
|label|features                                                          |
+-----+------------------------------------------------------------------+
|1.0  |[60.0,-56043.166666666664,57956.5,0.0,1.0,480000.0,1.0,-100.0,1.0]|
|1.0  |[60.0,0.0,0.0,0.0,1.0,320000.0,1.0,0.0,1.0]                       |
|1.0  |[60.0,0.0,0.0,0.0,1.0,230000.0,1.0,0.0,1.0]                       |
|1.0  |[60.0,19763.0,0.0,1.0,1.0,500000.0,1.0,0.0,1.0]                   |
|1.0  |[60.0,-87.5,0.0,0.0,1.0,200000.0,1.0,-0.0,1.0]                    |
|1.0  |[60.0,125.0,0.0,1.0,1.0,50000.0,1.0,0.0,1.0]                      |
|1.0  |[50.0,0.0,0.0,0.0,1.0,170000.0,1.0,0.0,1.0]                       |
|1.0  |[50.0,1270.6666666666667,0.0,0.0,1.0,290000.0,1.0,0.0,1.0]        |
|1.0  |[50.0,0.0,0.0,1.0,1.0,600000.0,1.0,0.0,1.0]                       |
|1.0  |[50.0,81.83333333333333,0.0,1.0,1.0,200000.0,1.0,0.0,1.0]         |
+-----+------------------

DataFrame[label: double, features: vector]

In [32]:
# Indexing needed as pre-req
from pyspark.ml.feature import StringIndexer
stringIndexer = StringIndexer(inputCol = 'label', outputCol = 'indexed')
si_model = stringIndexer.fit(ccNormDf)
td = si_model.transform(ccNormDf)
td.show(5)

+-----+--------------------+-------+
|label|            features|indexed|
+-----+--------------------+-------+
|  1.0|[60.0,-56043.1666...|    1.0|
|  1.0|[60.0,0.0,0.0,0.0...|    1.0|
|  1.0|[60.0,0.0,0.0,0.0...|    1.0|
|  1.0|[60.0,19763.0,0.0...|    1.0|
|  1.0|[60.0,-87.5,0.0,0...|    1.0|
+-----+--------------------+-------+
only showing top 5 rows



In [33]:
# split into training and testing data
trainingData, testData = td.randomSplit([0.7, 0.3])

In [34]:
trainingData.count()

700

In [35]:
testData.count()

300

# PR#05 Do predictions to predict defaults

In [36]:
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [37]:
evaluator = MulticlassClassificationEvaluator(predictionCol = 'prediction', labelCol = 'indexed', 
                                              metricName = 'accuracy')

In [38]:
# Create the Dicision Tree model (with hyper-parameter tuning)
paramGrid = ParamGridBuilder().addGrid(DecisionTreeClassifier.maxDepth, [4, 5, 6]).build()

In [39]:
dtClassifier = DecisionTreeClassifier(labelCol = 'indexed', featuresCol = 'features')

In [40]:
dtClassifier_cv = CrossValidator(estimator = DecisionTreeClassifier(), estimatorParamMaps = paramGrid,
                                evaluator = MulticlassClassificationEvaluator(), numFolds = 2)

In [41]:
dtModel = dtClassifier_cv.fit(trainingData)

In [42]:
# Predict on the test data
predictions = dtModel.transform(testData)
predictions.select('prediction', 'indexed', 'label', 'features').show()

+----------+-------+-----+--------------------+
|prediction|indexed|label|            features|
+----------+-------+-----+--------------------+
|       0.0|    0.0|  0.0|[20.0,415.0,415.0...|
|       0.0|    0.0|  0.0|[20.0,4689.5,4777...|
|       0.0|    0.0|  0.0|[30.0,12.83333333...|
|       0.0|    0.0|  0.0|[30.0,4673.166666...|
|       0.0|    0.0|  0.0|[30.0,5737.166666...|
|       0.0|    0.0|  0.0|[30.0,6613.166666...|
|       0.0|    0.0|  0.0|[30.0,7506.5,4709...|
|       0.0|    0.0|  0.0|[30.0,9349.5,1049...|
|       0.0|    0.0|  0.0|[30.0,12637.5,800...|
|       0.0|    0.0|  0.0|[30.0,12778.16666...|
|       0.0|    0.0|  0.0|[30.0,17449.0,130...|
|       0.0|    0.0|  0.0|[30.0,17651.0,140...|
|       0.0|    0.0|  0.0|[30.0,21045.33333...|
|       0.0|    0.0|  0.0|[30.0,22045.16666...|
|       0.0|    0.0|  0.0|[30.0,27967.0,573...|
|       0.0|    0.0|  0.0|[30.0,29458.5,214...|
|       0.0|    0.0|  0.0|[30.0,41517.66666...|
|       0.0|    0.0|  0.0|[30.0,46249.5,

In [43]:
print('Results of Decision Tree: {}'.format(evaluator.evaluate(predictions)))

Results of Decision Tree: 0.736666666667


In [44]:
# Create the Random Forest model
rfClassifier = RandomForestClassifier(labelCol = 'indexed', featuresCol = 'features')
rfModel = rfClassifier.fit(trainingData)

In [45]:
# Predict on the test data
predictions = rfModel.transform(testData)
predictions.select('prediction', 'indexed', 'label', 'features').show()

+----------+-------+-----+--------------------+
|prediction|indexed|label|            features|
+----------+-------+-----+--------------------+
|       0.0|    0.0|  0.0|[20.0,415.0,415.0...|
|       0.0|    0.0|  0.0|[20.0,4689.5,4777...|
|       0.0|    0.0|  0.0|[30.0,12.83333333...|
|       0.0|    0.0|  0.0|[30.0,4673.166666...|
|       0.0|    0.0|  0.0|[30.0,5737.166666...|
|       0.0|    0.0|  0.0|[30.0,6613.166666...|
|       0.0|    0.0|  0.0|[30.0,7506.5,4709...|
|       0.0|    0.0|  0.0|[30.0,9349.5,1049...|
|       0.0|    0.0|  0.0|[30.0,12637.5,800...|
|       0.0|    0.0|  0.0|[30.0,12778.16666...|
|       0.0|    0.0|  0.0|[30.0,17449.0,130...|
|       0.0|    0.0|  0.0|[30.0,17651.0,140...|
|       0.0|    0.0|  0.0|[30.0,21045.33333...|
|       0.0|    0.0|  0.0|[30.0,22045.16666...|
|       0.0|    0.0|  0.0|[30.0,27967.0,573...|
|       0.0|    0.0|  0.0|[30.0,29458.5,214...|
|       0.0|    0.0|  0.0|[30.0,41517.66666...|
|       0.0|    0.0|  0.0|[30.0,46249.5,

In [46]:
print('Results of Random Forest: {}'.format(evaluator.evaluate(predictions)))

Results of Random Forest: 0.753333333333


# Group data into 4 groups based on the said parameters

In [47]:
# Filter only columns needed for clustering
ccClustDf = ccFinalDf.select('SEX', 'EDUCATION', 'MARRIAGE', 'AGE', 'CUSTID')

In [48]:
# Do centering and scaling for the values
summStats = ccClustDf.describe().toPandas()
summStats

Unnamed: 0,summary,SEX,EDUCATION,MARRIAGE,AGE,CUSTID
0,count,1000.0,1000.0,1000.0,1000.0,1000.0
1,mean,1.591,1.769,1.608,35.42,500.5
2,stddev,0.4918952743836445,0.7212206036707212,0.5259397423779768,9.85498456217566,288.8194360957493
3,min,1.0,1.0,1.0,20.0,1.0
4,max,2.0,4.0,3.0,80.0,999.0


In [49]:
meanValues = summStats.iloc[1, 1:].values.tolist()
stdValues = summStats.iloc[2, 1:].values.tolist()

In [50]:
bcMeans = SpContext.broadcast(meanValues)
bcStdDev = SpContext.broadcast(stdValues)

In [51]:
def centerAndScale(inRow):
    global bcMeans
    global bcStdDev
    
    meanArray = bcMeans.value
    stdArray = bcStdDev.value
    
    retArray = []
    for i in range(len(meanArray)):
        retArray.append((float(inRow[i]) - float(meanArray[i])) / float(stdArray[i]))
    return Row(CUSTID = inRow[4], features = Vectors.dense(retArray))

In [52]:
ccMap = ccClustDf.rdd.map(centerAndScale)
ccMap.take(5)

[Row(CUSTID=u'103', features=DenseVector([-1.2015, -1.0662, -1.156, 2.4942, -1.3763])),
 Row(CUSTID=u'932', features=DenseVector([-1.2015, -1.0662, -1.156, 2.4942, 1.494])),
 Row(CUSTID=u'466', features=DenseVector([-1.2015, -1.0662, -1.156, 2.4942, -0.1195])),
 Row(CUSTID=u'35', features=DenseVector([-1.2015, -1.0662, -1.156, 2.4942, -1.6117])),
 Row(CUSTID=u'66', features=DenseVector([-1.2015, -1.0662, -1.156, 2.4942, -1.5044]))]

In [53]:
# Create a data frame with the features
ccFinalClustDf = SpSession.createDataFrame(ccMap)
ccFinalClustDf.cache()
ccFinalClustDf.show(10, truncate = False)

+------+----------------------------------------------------------------------------------------------------+
|CUSTID|features                                                                                            |
+------+----------------------------------------------------------------------------------------------------+
|103   |[-1.2014752545458702,-1.0662479636412228,-1.156025968395157,2.4941693053828113,-1.376292417758966]  |
|932   |[-1.2014752545458702,-1.0662479636412228,-1.156025968395157,2.4941693053828113,1.494013027076714]   |
|466   |[-1.2014752545458702,-1.0662479636412228,-1.156025968395157,2.4941693053828113,-0.11945179474889138]|
|35    |[-1.2014752545458702,-1.0662479636412228,-1.156025968395157,2.4941693053828113,-1.611733636394462]  |
|66    |[-1.2014752545458702,-1.0662479636412228,-1.156025968395157,2.4941693053828113,-1.504400139663574]  |
|553   |[-1.2014752545458702,-1.0662479636412228,-1.156025968395157,2.4941693053828113,0.1817744702700521]  |
|603   |[-

In [54]:
# Perform clustering
from pyspark.ml.clustering import KMeans
kmeans = KMeans(k = 4, seed = 1)
model = kmeans.fit(ccFinalClustDf)

In [55]:
predictions = model.transform(ccFinalClustDf)
predictions.select('*').show()

+------+--------------------+----------+
|CUSTID|            features|prediction|
+------+--------------------+----------+
|   103|[-1.2014752545458...|         3|
|   932|[-1.2014752545458...|         3|
|   466|[-1.2014752545458...|         3|
|    35|[-1.2014752545458...|         3|
|    66|[-1.2014752545458...|         3|
|   553|[-1.2014752545458...|         3|
|   603|[-1.2014752545458...|         3|
|   576|[-1.2014752545458...|         3|
|   452|[-1.2014752545458...|         3|
|    91|[-1.2014752545458...|         3|
|   406|[-1.2014752545458...|         3|
|   451|[-1.2014752545458...|         3|
|   395|[-1.2014752545458...|         3|
|    18|[-1.2014752545458...|         3|
|   917|[-1.2014752545458...|         3|
|   260|[-1.2014752545458...|         3|
|   724|[-1.2014752545458...|         3|
|   686|[-1.2014752545458...|         3|
|   207|[-1.2014752545458...|         3|
|   627|[-1.2014752545458...|         3|
+------+--------------------+----------+
only showing top