## Spark-Project
## Author: Veera Marni
## Email: vmarni@iu.edu
----------------------------------------------------------------------------------------------------------

### Data Description

This research employed a binary variable, default payment (Yes = 1, No = 0), as the response variable. This study reviewed the literature and used the following 23 variables as explanatory variables:

1. X1: Amount of the given credit (NT dollar): it includes both the individual consumer credit and his/her family (supplementary) credit. 
2. X2: Gender (1 = male; 2 = female). 
3. X3: Education (1 = graduate school; 2 = university; 3 = high school; 4 = others). 
4. X4: Marital status (1 = married; 2 = single; 3 = others). 
5. X5: Age (year). 
6. X6 - X11: History of past payment. We tracked the past monthly payment records (from April to September, 2005) as follows: X6 = the repayment status in September, 2005; X7 = the repayment status in August, 2005; . . .;X11 = the repayment status in April, 2005. The measurement scale for the repayment status is: -1 = pay duly; 1 = payment delay for one month; 2 = payment delay for two months; . . .; 8 = payment delay for eight months; 9 = payment delay for nine months and above. 
7. X12-X17: Amount of bill statement (NT dollar). X12 = amount of bill statement in September, 2005; X13 = amount of bill statement in August, 2005; . . .; X17 = amount of bill statement in April, 2005. 
8. X18-X23: Amount of previous payment (NT dollar). X18 = amount paid in September, 2005; X19 = amount paid in August, 2005; . . .;X23 = amount paid in April, 2005. 
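
As a small illustration of the repayment-status scale in item 6, the sketch below maps a code to its meaning. The function name `describe_repayment_status` is hypothetical and not part of the original notebook; any code outside the documented scale is reported as undocumented rather than guessed.

```python
def describe_repayment_status(code):
    """Translate a repayment-status code (X6-X11) into its documented meaning."""
    if code == -1:
        return "pay duly"
    if 1 <= code <= 8:
        unit = "month" if code == 1 else "months"
        return "payment delay for {} {}".format(code, unit)
    if code == 9:
        return "payment delay for nine months and above"
    # The data description does not define any other value.
    return "undocumented code"


for code in (-1, 1, 2, 9, 0):
    print("{:>2}: {}".format(code, describe_repayment_status(code)))
```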

### Data Loading

In [1]:
import pyspark
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession, SQLContext
from pyspark.sql.types import *

conf = SparkConf().setAppName('MyAppName').setMaster('local')
sc = SparkContext(conf=conf)

sqlContext = SQLContext(sc)


In [31]:
# Read the CSV as an RDD of text lines, with at least 4 partitions.
default_card_clients_rdd = sc.textFile('./default_of_credit_card_clients.csv', 4)

# The first line holds the column names; give the label column a clearer name.
header = default_card_clients_rdd.first().split(',')
header[-1] = "next_month_payment_status"

# Isolate the header line (the only line containing 'ID') and subtract it.
# Note: subtract() shuffles the RDD, so the original row order is not preserved.
default_card_clients_rdd_header = default_card_clients_rdd. \
                                        filter(lambda line: 'ID' in line)
default_card_clients_rdd_Noheader = default_card_clients_rdd. \
                                        subtract(default_card_clients_rdd_header)

# Parse every remaining line into a list of integers.
default_card_clients_rdd_Noheader = default_card_clients_rdd_Noheader.map(
                                        lambda line: [int(val) for val in line.split(',')])

# Build a schema in which every column is a nullable integer.
fields = [StructField(name=field_name, dataType=IntegerType(),
                      nullable=True) for field_name in header]
schema = StructType(fields)

# print header
# print schema
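
The loading logic above can be mirrored in plain Python, without Spark, to see what each step does. This is only a sketch: the sample lines are a hypothetical, trimmed-down stand-in for the real file (including the original name of the last column), and the header is dropped by comparing each line with the header line itself rather than by searching for `'ID'`.

```python
# Hypothetical four-column sample; the real file has 25 columns.
raw_lines = [
    "ID,LIMIT_BAL,AGE,default payment next month",
    "1,20000,24,1",
    "2,120000,26,1",
    "3,90000,34,0",
]

header_line = raw_lines[0]
header = header_line.split(",")
header[-1] = "next_month_payment_status"  # same rename as in the notebook

# Keep every line that is not the header, then parse each field as an integer.
rows = [[int(val) for val in line.split(",")]
        for line in raw_lines if line != header_line]

# Pair column names with values, one dict per record.
records = [dict(zip(header, row)) for row in rows]
print(header)
print(records[0])
```

In Spark, the same idea would be a single `filter` comparing each line with the header line, which avoids the shuffle that `subtract` triggers and keeps the rows in file order.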

In [3]:
dfcc = sqlContext.createDataFrame(
            data=default_card_clients_rdd_Noheader,schema=schema)
dfcc.printSchema()

root
 |-- ID: integer (nullable = true)
 |-- LIMIT_BAL: integer (nullable = true)
 |-- SEX: integer (nullable = true)
 |-- EDUCATION: integer (nullable = true)
 |-- MARRIAGE: integer (nullable = true)
 |-- AGE: integer (nullable = true)
 |-- PAY_0: integer (nullable = true)
 |-- PAY_2: integer (nullable = true)
 |-- PAY_3: integer (nullable = true)
 |-- PAY_4: integer (nullable = true)
 |-- PAY_5: integer (nullable = true)
 |-- PAY_6: integer (nullable = true)
 |-- BILL_AMT1: integer (nullable = true)
 |-- BILL_AMT2: integer (nullable = true)
 |-- BILL_AMT3: integer (nullable = true)
 |-- BILL_AMT4: integer (nullable = true)
 |-- BILL_AMT5: integer (nullable = true)
 |-- BILL_AMT6: integer (nullable = true)
 |-- PAY_AMT1: integer (nullable = true)
 |-- PAY_AMT2: integer (nullable = true)
 |-- PAY_AMT3: integer (nullable = true)
 |-- PAY_AMT4: integer (nullable = true)
 |-- PAY_AMT5: integer (nullable = true)
 |-- PAY_AMT6: integer (nullable = true)
 |-- next_month_payment_status: integer (nullable = true)

In [4]:
dfcc.createOrReplaceTempView('default_card_clients_rdd_Noheader')

results = sqlContext.sql(
    "SELECT LIMIT_BAL+ID FROM default_card_clients_rdd_Noheader where ID=1")

results.show()

+----------------+
|(LIMIT_BAL + ID)|
+----------------+
|           20001|
+----------------+



### Data Transformation

Here we generate new features, the monthly due amounts (bill amount minus amount paid), which are used in the models that follow.

In [5]:
dfcc.createOrReplaceTempView("table1")

dfcc_monthly_dues = sqlContext.sql("""
    SELECT
        ID, LIMIT_BAL,
        EDUCATION,
        MARRIAGE,
        BILL_AMT1 - PAY_AMT1 AS DUE_AMT1,
        BILL_AMT2 - PAY_AMT2 AS DUE_AMT2,
        BILL_AMT3 - PAY_AMT3 AS DUE_AMT3,
        BILL_AMT4 - PAY_AMT4 AS DUE_AMT4,
        BILL_AMT5 - PAY_AMT5 AS DUE_AMT5,
        BILL_AMT6 - PAY_AMT6 AS DUE_AMT6,
        next_month_payment_status AS DPNM
    FROM table1
""")

dfcc_monthly_dues.take(3)

[Row(ID=1753, LIMIT_BAL=60000, EDUCATION=2, MARRIAGE=1, DUE_AMT1=-25736, DUE_AMT2=30500, DUE_AMT3=0, DUE_AMT4=-52829, DUE_AMT5=11425, DUE_AMT6=19003, DPNM=0),
 Row(ID=8961, LIMIT_BAL=50000, EDUCATION=3, MARRIAGE=3, DUE_AMT1=24302, DUE_AMT2=21241, DUE_AMT3=16368, DUE_AMT4=8121, DUE_AMT5=8292, DUE_AMT6=8462, DPNM=0),
 Row(ID=18693, LIMIT_BAL=130000, EDUCATION=2, MARRIAGE=3, DUE_AMT1=3143, DUE_AMT2=1574, DUE_AMT3=0, DUE_AMT4=-12677, DUE_AMT5=-17905, DUE_AMT6=30582, DPNM=1)]
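
Each `DUE_AMTk` column is the bill for month k minus the payment recorded for that month. The sketch below reproduces that arithmetic in plain Python; the helper name `monthly_dues` and the bill and payment amounts are made up for illustration.

```python
def monthly_dues(bill_amounts, pay_amounts):
    """Return bill minus payment for each month, in the same order."""
    if len(bill_amounts) != len(pay_amounts):
        raise ValueError("bill and payment lists must have the same length")
    return [bill - pay for bill, pay in zip(bill_amounts, pay_amounts)]


# Hypothetical values for BILL_AMT1..6 and PAY_AMT1..6 of one client.
bills = [3913, 3102, 689, 0, 0, 0]
payments = [0, 689, 0, 0, 0, 5000]

dues = monthly_dues(bills, payments)
print(dues)
```

A negative value, as in several rows of the output above, means the payment exceeded the bill for that month.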

In [6]:
from pyspark.mllib.regression import LabeledPoint

def parserPoint(line):
    # Last column (DPNM) is the label; all preceding columns are features.
    return LabeledPoint(line[-1], line[:-1])

dfcc_monthly_dues_rdd = dfcc_monthly_dues.rdd
parsed_dfcc_monthly_dues = dfcc_monthly_dues_rdd.map(parserPoint)
print(parsed_dfcc_monthly_dues.first())

(0.0,[1753.0,60000.0,2.0,1.0,-25736.0,30500.0,0.0,-52829.0,11425.0,19003.0])
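
`parserPoint` treats the last column as the label and everything before it as features. The sketch below replays that slicing on the first row of the `take(3)` output above, using plain Python lists and no Spark dependency; the helper name `split_label_features` is hypothetical.

```python
def split_label_features(row):
    """Mimic parserPoint: the last value is the label, the rest are features."""
    values = [float(v) for v in row]
    return values[-1], values[:-1]


# First row from the take(3) output: ID, LIMIT_BAL, EDUCATION, MARRIAGE,
# DUE_AMT1..DUE_AMT6, DPNM.
first_row = (1753, 60000, 2, 1, -25736, 30500, 0, -52829, 11425, 19003, 0)

label, features = split_label_features(first_row)
print(label)
print(features)
```

Because `ID` is still the first column at this point, it ends up in the feature vector (1753.0). The Hypothesis Testing section later drops it before building the labeled points.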


In [7]:
# Random 80/20 split; the sizes are approximate because rows are sampled at random.
train, test = parsed_dfcc_monthly_dues.randomSplit(
                                        weights=[0.8,0.2])
print(train.count())
print(test.count())
print(type(train))
print(train.first())

24091
5909
<class 'pyspark.rdd.PipelinedRDD'>
(0.0,[1753.0,60000.0,2.0,1.0,-25736.0,30500.0,0.0,-52829.0,11425.0,19003.0])
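
The counts above (24091 and 5909) are close to, but not exactly, 80% and 20% of the 30000 rows, and later cells report slightly different counts. The sketch below is a simplified pure-Python model of a weighted random split, not Spark's implementation: it assumes each row is sent to the training set independently with probability 0.8. The helper name `random_split` is hypothetical.

```python
import random


def random_split(rows, train_weight=0.8, seed=None):
    """Send each row to train with probability train_weight, else to test."""
    rng = random.Random(seed)
    train, test = [], []
    for row in rows:
        (train if rng.random() < train_weight else test).append(row)
    return train, test


rows = list(range(30000))  # stand-in for the 30000 client records

train_a, test_a = random_split(rows, seed=42)
train_b, test_b = random_split(rows, seed=42)
train_c, test_c = random_split(rows, seed=7)

# Sizes land near 24000 / 6000 but are not exact.
print("{} {}".format(len(train_a), len(test_a)))
# The same seed reproduces the same split; a different seed does not.
print(train_a == train_b)
print(train_a == train_c)
```

PySpark's `randomSplit` also accepts a `seed` argument. Passing the same seed in every cell would make the splits, and therefore the test errors, comparable across models.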


### Model Learning

Logistic regression is used because this is a binary classification problem.

In [9]:
from pyspark.mllib.classification import LogisticRegressionWithLBFGS

lr = LogisticRegressionWithLBFGS.train(
            data=train, intercept=True, iterations=5)

In [10]:
print("Intercept: {}".format(lr.intercept))
print("Weights: {}".format(lr.weights))

Intercept:  0.870893134514
Weights:  [-1.89203185143e-05,-2.23713734051e-06,-0.264932385385,-0.581250940679,-1.61309015366e-09,1.11197994454e-07,2.73880992407e-08,1.85438746928e-07,1.75456075931e-07,2.17450429914e-07]
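
To see how the intercept and weights turn into a prediction, the sketch below applies the logistic function to the first training sample printed earlier. It is plain Python rather than a Spark call; the helper name `predict_probability` is hypothetical, and a cut-off of 0.5 is assumed for turning the probability into a label.

```python
import math

# Intercept and weights as printed by the notebook.
intercept = 0.870893134514
weights = [-1.89203185143e-05, -2.23713734051e-06, -0.264932385385,
           -0.581250940679, -1.61309015366e-09, 1.11197994454e-07,
           2.73880992407e-08, 1.85438746928e-07, 1.75456075931e-07,
           2.17450429914e-07]

# Features of the first training sample (ID is still included here).
features = [1753.0, 60000.0, 2.0, 1.0, -25736.0, 30500.0, 0.0,
            -52829.0, 11425.0, 19003.0]


def predict_probability(weights, intercept, features):
    """Logistic model: sigmoid of the weighted sum plus the intercept."""
    margin = intercept + sum(w * x for w, x in zip(weights, features))
    return 1.0 / (1.0 + math.exp(-margin))


probability = predict_probability(weights, intercept, features)
prediction = 1 if probability > 0.5 else 0  # 0.5 is an assumed cut-off
print("P(default) = {:.3f}, predicted label = {}".format(probability, prediction))
```

For this sample the probability falls below 0.5, so the predicted label is 0, which matches the sample's label of 0.0. Most of the margin comes from the intercept and the EDUCATION and MARRIAGE terms; the weights on the amount columns are tiny because those columns are not scaled.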


### Model Evaluation

In [11]:
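# Pair each true label with the model's prediction.
predictions = test.map(lambda p:
                       (p.label, lr.predict(p.features)))
# Test error = fraction of pairs where label and prediction differ.
test_err = predictions.filter(
                    lambda lp: lp[0] != lp[1]).count()/float(test.count())
print("Test Err: {}".format(test_err))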

Test Err:  0.217464884075
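
The test error is the fraction of test rows whose predicted label differs from the true label. The sketch below computes the same quantity in plain Python; the helper name `error_rate` and the (label, prediction) pairs are hypothetical and not taken from the notebook run.

```python
def error_rate(label_prediction_pairs):
    """Fraction of pairs whose prediction differs from the label."""
    pairs = list(label_prediction_pairs)
    if not pairs:
        raise ValueError("need at least one (label, prediction) pair")
    wrong = sum(1 for label, pred in pairs if label != pred)
    return wrong / float(len(pairs))


# Hypothetical (label, prediction) pairs: two of the five are misclassified.
pairs = [(0.0, 0), (1.0, 0), (0.0, 0), (1.0, 1), (0.0, 1)]
print("Test Err: {}".format(error_rate(pairs)))
```

An error of about 0.217 corresponds to an accuracy of about 78%. Comparing it with the share of non-defaulters in the test set would show how much the model improves on always predicting "no default".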


### Hypothesis Testing

#### Data Preparation

In [18]:
def parserPoint(line):
    # Last column (DPNM) is the label; all preceding columns are features.
    return LabeledPoint(line[-1], line[:-1])

# Drop ID: it is a row identifier, not a predictor.
col = dfcc_monthly_dues.columns
drop_col = ['ID']
final_col = [c for c in col if c not in drop_col]
print("Final features: {}".format(final_col))

dfcc_monthly_dues_rdd = dfcc_monthly_dues.select(final_col).rdd
train, test = dfcc_monthly_dues_rdd.map(
                parserPoint).randomSplit(weights=[0.8,0.2])

print("\n")
print("Train: {}".format(train.count()))
print("Test : {}".format(test.count()))
print("sample --")
print(train.first())

Final features:  ['LIMIT_BAL', 'EDUCATION', 'MARRIAGE', 'DUE_AMT1', 'DUE_AMT2', 'DUE_AMT3', 'DUE_AMT4', 'DUE_AMT5', 'DUE_AMT6', 'DPNM']


Train:  23989
Test :  6011
sample --
(0.0,[60000.0,2.0,1.0,-25736.0,30500.0,0.0,-52829.0,11425.0,19003.0])


#### Data Modelling

In [20]:
from pyspark.mllib.classification import LogisticRegressionWithLBFGS

logit_model = LogisticRegressionWithLBFGS.train(
                    data=train, intercept=True)

#### Model Evaluation

In [21]:
predictions = test.map(lambda p: (p.label,
                                  logit_model.predict(p.features)))
# Test error = fraction of pairs where label and prediction differ.
test_err = predictions.filter(lambda lp:
                              lp[0] != lp[1]).count()/float(test.count())

print("Test err: {}".format(test_err))

Test err:  0.22725004159


#### Hypothesis Testing

In [23]:
from pyspark.mllib.stat import Statistics
import pandas as pd
pd.set_option('display.max_colwidth', 30)

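# On an RDD of LabeledPoint, chiSqTest runs one chi-squared independence
# test per feature against the label.
chi = Statistics.chiSqTest(train)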


records = [(result.statistic, 
            result.pValue) for result in chi]

chi_df = pd.DataFrame(
    data=records, index=final_col[:-1] , columns=["Statistic","p-value"])

chi_df 

| Feature | Statistic | p-value |
|---|---|---|
| LIMIT_BAL | 843.870715 | 0.0 |
| EDUCATION | 134.471122 | 0.0 |
| MARRIAGE | 33.346437 | 2.721829e-07 |
| DUE_AMT1 | 19297.884229 | 0.8469954 |
| DUE_AMT2 | 19089.437952 | 0.808756 |
| DUE_AMT3 | 18649.504659 | 0.8942534 |
| DUE_AMT4 | 18426.948821 | 0.8864715 |
| DUE_AMT5 | 17929.078264 | 0.8943814 |
| DUE_AMT6 | 17736.33333 | 0.8245731 |


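In the table above, LIMIT_BAL, EDUCATION and MARRIAGE have p-values at or near zero, so the chi-squared test rejects independence between these features and the label. DUE_AMT1 through DUE_AMT6 all have p-values above 0.8, so the test gives no evidence that they are associated with the label. The next section refits the model without the categorical features EDUCATION and MARRIAGE to check whether this costs any modelling power.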
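
The statistic reported for each feature is Pearson's chi-squared statistic for independence between that feature and the label. The sketch below computes it by hand in plain Python for a small hypothetical sample; the helper name `chi_squared_statistic` is not part of Spark, and each distinct feature value is treated as its own category.

```python
from collections import Counter


def chi_squared_statistic(pairs):
    """Pearson chi-squared statistic and degrees of freedom for
    (feature_value, label) pairs; each distinct value is a category."""
    pairs = list(pairs)
    total = float(len(pairs))
    observed = Counter(pairs)
    feature_totals = Counter(feat for feat, _ in pairs)
    label_totals = Counter(lab for _, lab in pairs)

    statistic = 0.0
    for feat, feat_count in feature_totals.items():
        for lab, lab_count in label_totals.items():
            # Count expected in this cell if feature and label were independent.
            expected = feat_count * lab_count / total
            statistic += (observed[(feat, lab)] - expected) ** 2 / expected
    dof = (len(feature_totals) - 1) * (len(label_totals) - 1)
    return statistic, dof


# Hypothetical data: a two-valued categorical feature against a 0/1 label.
pairs = ([(1, 0)] * 10 + [(1, 1)] * 20 +
         [(2, 0)] * 20 + [(2, 1)] * 10)
statistic, dof = chi_squared_statistic(pairs)
print("statistic = {:.4f}, degrees of freedom = {}".format(statistic, dof))
```

A small p-value means the observed counts deviate from what independence would predict. Because every distinct value becomes a category, a continuous column such as DUE_AMT1 produces a very large and sparse table, which may explain why its statistic is large while its p-value stays high.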

### Model after removing categorical features

#### Data Preparation

In [24]:
def parserPoint(line):
    # Last column (DPNM) is the label; all preceding columns are features.
    return LabeledPoint(line[-1], line[:-1])

# Drop ID together with the categorical features EDUCATION and MARRIAGE.
col = dfcc_monthly_dues.columns
drop_col = ['ID','EDUCATION','MARRIAGE']
final_col = [c for c in col if c not in drop_col]
print("Final features: {}".format(final_col))

dfcc_monthly_dues_rdd = dfcc_monthly_dues.select(
                        final_col).rdd
train, test = dfcc_monthly_dues_rdd.map(
                parserPoint).randomSplit(weights=[0.8,0.2])

print("\n")
print("Train: {}".format(train.count()))
print("Test : {}".format(test.count()))
print("sample --")
print(train.first())

Final features:  ['LIMIT_BAL', 'DUE_AMT1', 'DUE_AMT2', 'DUE_AMT3', 'DUE_AMT4', 'DUE_AMT5', 'DUE_AMT6', 'DPNM']


Train:  24056
Test :  5944
sample --
(0.0,[60000.0,-25736.0,30500.0,0.0,-52829.0,11425.0,19003.0])


#### Data Modelling

In [26]:
from pyspark.mllib.classification import LogisticRegressionWithLBFGS

logit_model = LogisticRegressionWithLBFGS.train(
                data=train, intercept=True)

#### Model Evaluation

In [27]:
predictions = test.map(lambda p: (
                p.label, logit_model.predict(p.features)))
# Test error = fraction of pairs where label and prediction differ.
test_err = predictions.filter(
            lambda lp: lp[0] != lp[1]).count()/float(test.count())

print("Test err: {}".format(test_err))

Test err:  0.215847913863


#### Hypothesis Testing

In [29]:
from pyspark.mllib.stat import Statistics
import pandas as pd
pd.set_option('display.max_colwidth', 30)

chi = Statistics.chiSqTest(train)


records = [(result.statistic, result.pValue) for result in chi]

chi_df = pd.DataFrame(data=records, 
                      index=final_col[:-1] , columns=["Statistic","p-value"])

print(final_col)
chi_df 

['LIMIT_BAL', 'DUE_AMT1', 'DUE_AMT2', 'DUE_AMT3', 'DUE_AMT4', 'DUE_AMT5', 'DUE_AMT6', 'DPNM']


| Feature | Statistic | p-value |
|---|---|---|
| LIMIT_BAL | 824.884543 | 0.0 |
| DUE_AMT1 | 19364.913137 | 0.875616 |
| DUE_AMT2 | 19130.905306 | 0.706639 |
| DUE_AMT3 | 18735.049287 | 0.81291 |
| DUE_AMT4 | 18467.556856 | 0.876335 |
| DUE_AMT5 | 17958.994734 | 0.916342 |
| DUE_AMT6 | 17684.354716 | 0.843363 |


### Conclusion

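From the above we can conclude that EDUCATION and MARRIAGE can be dropped without loss of modelling power: the test error is 0.216 without them and 0.227 with them, although the two models were evaluated on different random splits. In the chi-squared test on the remaining features, only LIMIT_BAL has a p-value near zero; DUE_AMT1, which is generated from BILL_AMT1 and PAY_AMT1, and the other due-amount features all have p-values above 0.7.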