## Colocando dataset no ambiente Spark

In [1]:
from pyspark import SparkContext
from pyspark.sql import SQLContext
import pyspark
from pyspark.sql import Row
import csv 
from pyspark.ml import Pipeline
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.feature import OneHotEncoderEstimator, StringIndexer, VectorAssembler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.classification import LogisticRegression
import matplotlib.pyplot as plt
import numpy as np
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [2]:
sc = SparkContext('local','example')
sql_c = SQLContext(sc)
df =  sql_c.read.csv('C:\\Users\\lucas\\Documents\\Itau_Analytics\\Big Data\\consumer_complaints_fim.csv',header=True, sep=";")



In [3]:
df.columns

['_c0',
 'product',
 'issue',
 'company',
 'state',
 'submitted_via',
 'timely_response',
 'consumer_disputed?']

## Importancia da variavel

In [4]:
import pandas as pd
import numpy as np


# Calculate information value
def calc_iv(df, feature, target, pr=False):
    """
    Set pr=True to enable printing of output.
    
    Output: 
      * iv: float,
      * data: pandas.DataFrame
    """
    
    lst = []

    df[feature] = df[feature].fillna("NULL")

    for i in range(df[feature].nunique()):
        val = list(df[feature].unique())[i]
        lst.append([feature,                                                        # Variable
                    val,                                                            # Value
                    df[df[feature] == val].count()[feature],                        # All
                    df[(df[feature] == val) & (df[target] == 'No')].count()[feature],  # Good (think: Fraud == 0)
                    df[(df[feature] == val) & (df[target] == 'Yes')].count()[feature]]) # Bad (think: Fraud == 1)

    data = pd.DataFrame(lst, columns=['Variable', 'Value', 'All', 'Good', 'Bad'])

    data['Share'] = data['All'] / data['All'].sum()
    data['Bad Rate'] = data['Bad'] / data['All']
    data['Distribution Good'] = (data['All'] - data['Bad']) / (data['All'].sum() - data['Bad'].sum())
    data['Distribution Bad'] = data['Bad'] / data['Bad'].sum()
    data['WoE'] = np.log(data['Distribution Good'] / data['Distribution Bad'])

    data = data.replace({'WoE': {np.inf: 0, -np.inf: 0}})

    data['IV'] = data['WoE'] * (data['Distribution Good'] - data['Distribution Bad'])

    data = data.sort_values(by=['Variable', 'Value'], ascending=[True, True])
    data.index = range(len(data.index))

    if pr:
        print(data)
        print('IV = ', data['IV'].sum())


    iv = data['IV'].sum()
    # print(iv)

    return iv, data

In [5]:
dfp=df.toPandas()


In [6]:
dfp.describe()

Unnamed: 0,date_received,product,sub_product,issue,sub_issue,consumer_complaint_narrative,company_public_response,company,state,zipcode,tags,consumer_consent_provided,submitted_via,date_sent_to_company,company_response_to_consumer,timely_response,consumer_disputed?,complaint_id
count,686898,625019,623154.0,622797,622654.0,622595.0,556699.0,556222,556091,556035,556012.0,522231.0,489617,489461,489450,489448,489444,489444
unique,62631,3819,1640.0,1267,324.0,65471.0,1068.0,4029,263,26982,33828.0,32653.0,191,1586,26,18,14,489443
top,""";""Company chooses not to provide a public res...",Mortgage,,"Loan modification,collection,foreclosure",,,,Bank of America,CA,Yes,,,Web,03/12/2014,Closed with explanation,Yes,No,Racketeering - XXXX
freq,19200,186475,158510.0,97191,370263.0,489276.0,436987.0,52510,71751,40183,422854.0,432499.0,294812,917,353580,477535,392805,2


In [7]:
calc_iv(dfp, 'timely_response', 'consumer_disputed?', pr=False)

(0.14273259342478248,
            Variable                                              Value  \
 0   timely_response                                Case number : XXXX    
 1   timely_response                Creditor Class Medical/Health Care    
 2   timely_response   Excessive Fees, such as Forced -Placed Insura...   
 3   timely_response   Filing forged/faked documents in courts and p...   
 4   timely_response                                         I XXXX gt    
 5   timely_response   Ignoring customer complaints and """" qualifi...   
 6   timely_response   Participate in the active concealment of orig...   
 7   timely_response                                              XXXX    
 8   timely_response                                    XXXX/XXXX/XXXX    
 9   timely_response     admitted to breaking in/and claims dual roles    
 10  timely_response   also XXXX where my only daughter/child and gr...   
 11  timely_response   especially when I was in the VEHICLE OF MY CH...   
 12

In [8]:
dfp['consumer_disputed?'].describe()

count     489444
unique        14
top           No
freq      392805
Name: consumer_disputed?, dtype: object

In [10]:
df_certo=dfp.loc[dfp['consumer_disputed?'].isin(['Yes','No'])]

In [11]:
df_certo.describe()

Unnamed: 0,date_received,product,sub_product,issue,sub_issue,consumer_complaint_narrative,company_public_response,company,state,zipcode,tags,consumer_consent_provided,submitted_via,date_sent_to_company,company_response_to_consumer,timely_response,consumer_disputed?,complaint_id
count,489431,489431,489431.0,489431,489431.0,489431.0,489431.0,489431,489431,489431.0,489431.0,489431.0,489431,489431,489431,489431,489431,489431
unique,1608,11,47.0,95,69.0,280.0,11.0,3376,63,26896.0,4.0,5.0,6,1557,8,2,2,489431
top,06/26/2014,Mortgage,,"Loan modification,collection,foreclosure",,,,Bank of America,CA,,,,Web,03/12/2014,Closed with explanation,Yes,No,465948
freq,916,171617,137947.0,92048,309609.0,489151.0,436985.0,52510,71751,4316.0,422854.0,432499.0,294812,917,353580,477535,392805,1


In [12]:
calc_iv(df_certo, 'timely_response', 'consumer_disputed?', pr=False)

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: http://pandas.pydata.org/pandas-docs/stable/indexing.html#indexing-view-versus-copy


(0.007951261416124414,
           Variable Value     All    Good    Bad     Share  Bad Rate  \
 0  timely_response    No   11896   10513   1383  0.024306  0.116258   
 1  timely_response   Yes  477535  382292  95243  0.975694  0.199447   
 
    Distribution Good  Distribution Bad       WoE        IV  
 0           0.026764          0.014313  0.625892  0.007793  
 1           0.973236          0.985687 -0.012712  0.000158  )

In [13]:
calc_iv(df_certo, 'submitted_via', 'consumer_disputed?', pr=False)

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: http://pandas.pydata.org/pandas-docs/stable/indexing.html#indexing-view-versus-copy


(0.027804499509781894,
         Variable        Value     All    Good    Bad     Share  Bad Rate  \
 0  submitted_via        Email     344     273     71  0.000703  0.206395   
 1  submitted_via          Fax    8118    6371   1747  0.016587  0.215201   
 2  submitted_via        Phone   40026   33285   6741  0.081781  0.168416   
 3  submitted_via  Postal mail   36752   31398   5354  0.075091  0.145679   
 4  submitted_via     Referral  109379   90696  18683  0.223482  0.170810   
 5  submitted_via          Web  294812  230782  64030  0.602357  0.217189   
 
    Distribution Good  Distribution Bad       WoE        IV  
 0           0.000695          0.000735 -0.055674  0.000002  
 1           0.016219          0.018080 -0.108609  0.000202  
 2           0.084737          0.069764  0.194433  0.002911  
 3           0.079933          0.055410  0.366435  0.008986  
 4           0.230893          0.193354  0.177434  0.006661  
 5           0.587523          0.662658 -0.120344  0.009042  )

In [14]:
calc_iv(df_certo, 'product', 'consumer_disputed?', pr=False)

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: http://pandas.pydata.org/pandas-docs/stable/indexing.html#indexing-view-versus-copy


(0.03319812684036684,
    Variable                    Value     All    Good    Bad     Share  \
 0   product  Bank account or service   56874   45954  10920  0.116204   
 1   product            Consumer Loan   17329   13548   3781  0.035406   
 2   product              Credit card   58576   46569  12007  0.119682   
 3   product         Credit reporting   79371   66972  12399  0.162170   
 4   product          Debt collection   83574   69178  14396  0.170757   
 5   product          Money transfers    3150    2748    402  0.006436   
 6   product                 Mortgage  171617  132311  39306  0.350646   
 7   product  Other financial service     450     365     85  0.000919   
 8   product              Payday loan    3157    2670    487  0.006450   
 9   product             Prepaid card    1613    1381    232  0.003296   
 10  product             Student loan   13720   11109   2611  0.028033   
 
     Bad Rate  Distribution Good  Distribution Bad       WoE        IV  
 0   0.192003  

## Separando as variaveis

In [4]:
df=df.select('product','issue','company',
 'state',
 'submitted_via',
 'timely_response',
 'consumer_disputed?')
df = df.na.drop()

## Pre-processamento com pipeline

In [5]:
from numpy import array



#parse data

categoricalColumns=['product','issue','company',                 
 'state',
 'submitted_via',
 'timely_response']

stages = []

for categoricalCol in categoricalColumns:
    stringIndexer = StringIndexer(inputCol = categoricalCol, outputCol = categoricalCol + 'Index')
    encoder = OneHotEncoderEstimator(inputCols=[stringIndexer.getOutputCol()], outputCols=[categoricalCol + "classVec"])
    stages += [stringIndexer, encoder]

    
label_stringIdx = StringIndexer(inputCol = 'consumer_disputed?', outputCol = 'label')
stages += [label_stringIdx]
assemblerInputs = [c + "classVec" for c in categoricalColumns] 
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
stages += [assembler]

In [6]:
pipeline = Pipeline(stages = stages)
pipelineModel = pipeline.fit(df)
df = pipelineModel.transform(df)
selectedCols = ['label', 'features']
df = df.select(selectedCols)
df.printSchema()

root
 |-- label: double (nullable = false)
 |-- features: vector (nullable = true)



## Treino e teste split

In [19]:
train, test = df.randomSplit([0.85, 0.15], seed = 2018)


In [20]:
df.select('features')

DataFrame[features: vector]

## Regressao logistica

In [21]:
lr = LogisticRegression(featuresCol = 'features', labelCol = 'label', maxIter=10)
lrModel = lr.fit(train)

In [22]:
predictions = lrModel.transform(test)
predictions.select('label', 'rawPrediction', 'prediction', 'probability').show(10)

+-----+--------------------+----------+--------------------+
|label|       rawPrediction|prediction|         probability|
+-----+--------------------+----------+--------------------+
|  0.0|[0.92221513545515...|       0.0|[0.71549323973741...|
|  0.0|[0.92221513545515...|       0.0|[0.71549323973741...|
|  0.0|[0.92221513545515...|       0.0|[0.71549323973741...|
|  0.0|[1.36118998174428...|       0.0|[0.79595303286145...|
|  0.0|[1.36118998174428...|       0.0|[0.79595303286145...|
|  0.0|[1.36118998174428...|       0.0|[0.79595303286145...|
|  0.0|[1.36118998174428...|       0.0|[0.79595303286145...|
|  0.0|[1.45832628379906...|       0.0|[0.81127655076969...|
|  0.0|[1.48339910852791...|       0.0|[0.81508544708186...|
|  0.0|[1.48339910852791...|       0.0|[0.81508544708186...|
+-----+--------------------+----------+--------------------+
only showing top 10 rows



In [23]:
evaluator = BinaryClassificationEvaluator()
print("Test Area Under ROC: " + str(evaluator.evaluate(predictions, {evaluator.metricName: "areaUnderROC"})))

Test Area Under ROC: 0.5616130018907806


## Arvore de decisao

In [24]:

dt = DecisionTreeClassifier(featuresCol = 'features', labelCol = 'label', maxDepth = 3)
dtModel = dt.fit(train)
predictions = dtModel.transform(test)
predictions.select('label', 'rawPrediction', 'prediction', 'probability').show(10)

+-----+---------------+----------+--------------------+
|label|  rawPrediction|prediction|         probability|
+-----+---------------+----------+--------------------+
|  0.0|[3717.0,1147.0]|       0.0|[0.76418585526315...|
|  0.0|[3717.0,1147.0]|       0.0|[0.76418585526315...|
|  0.0|[3717.0,1147.0]|       0.0|[0.76418585526315...|
|  0.0| [1973.0,456.0]|       0.0|[0.81226842321943...|
|  0.0| [1973.0,456.0]|       0.0|[0.81226842321943...|
|  0.0| [1973.0,456.0]|       0.0|[0.81226842321943...|
|  0.0| [1973.0,456.0]|       0.0|[0.81226842321943...|
|  0.0| [1973.0,456.0]|       0.0|[0.81226842321943...|
|  0.0| [1973.0,456.0]|       0.0|[0.81226842321943...|
|  0.0| [1973.0,456.0]|       0.0|[0.81226842321943...|
+-----+---------------+----------+--------------------+
only showing top 10 rows



In [25]:
evaluator = BinaryClassificationEvaluator()
print("Test Area Under ROC: " + str(evaluator.evaluate(predictions, {evaluator.metricName: "areaUnderROC"})))

Test Area Under ROC: 0.5874124124124125


## Random Forest

In [26]:

rf = RandomForestClassifier(featuresCol = 'features', labelCol = 'label')
rfModel = rf.fit(train)
predictions = rfModel.transform(test)

In [27]:
predictions = rfModel.transform(test)
predictions.select( 'label', 'rawPrediction', 'prediction', 'probability').show(10)

+-----+--------------------+----------+--------------------+
|label|       rawPrediction|prediction|         probability|
+-----+--------------------+----------+--------------------+
|  0.0|[15.6905042865477...|       0.0|[0.78452521432738...|
|  0.0|[15.6905042865477...|       0.0|[0.78452521432738...|
|  0.0|[15.6905042865477...|       0.0|[0.78452521432738...|
|  0.0|[15.9773352419499...|       0.0|[0.79886676209749...|
|  0.0|[15.9773352419499...|       0.0|[0.79886676209749...|
|  0.0|[15.9773352419499...|       0.0|[0.79886676209749...|
|  0.0|[15.9773352419499...|       0.0|[0.79886676209749...|
|  0.0|[16.0803682338124...|       0.0|[0.80401841169062...|
|  0.0|[16.0752516476657...|       0.0|[0.80376258238328...|
|  0.0|[16.0752516476657...|       0.0|[0.80376258238328...|
+-----+--------------------+----------+--------------------+
only showing top 10 rows



In [28]:

evaluator = BinaryClassificationEvaluator()
print("Test Area Under ROC: " + str(evaluator.evaluate(predictions, {evaluator.metricName: "areaUnderROC"})))

Test Area Under ROC: 0.5975864753642531


## Gradiente Boost

In [29]:
from pyspark.ml.classification import GBTClassifier

gbt = GBTClassifier(maxIter=10)
gbtModel = gbt.fit(train)
predictions = gbtModel.transform(test)
predictions.select( 'label', 'rawPrediction', 'prediction', 'probability').show(10)

+-----+--------------------+----------+--------------------+
|label|       rawPrediction|prediction|         probability|
+-----+--------------------+----------+--------------------+
|  0.0|[0.58051478134439...|       0.0|[0.76151974141210...|
|  0.0|[0.58051478134439...|       0.0|[0.76151974141210...|
|  0.0|[0.58051478134439...|       0.0|[0.76151974141210...|
|  0.0|[0.69286120143209...|       0.0|[0.79990847097634...|
|  0.0|[0.69286120143209...|       0.0|[0.79990847097634...|
|  0.0|[0.69286120143209...|       0.0|[0.79990847097634...|
|  0.0|[0.69286120143209...|       0.0|[0.79990847097634...|
|  0.0|[0.69286120143209...|       0.0|[0.79990847097634...|
|  0.0|[0.69286120143209...|       0.0|[0.79990847097634...|
|  0.0|[0.69286120143209...|       0.0|[0.79990847097634...|
+-----+--------------------+----------+--------------------+
only showing top 10 rows



In [30]:
evaluator = BinaryClassificationEvaluator()
print("Test Area Under ROC: " + str(evaluator.evaluate(predictions, {evaluator.metricName: "areaUnderROC"})))

Test Area Under ROC: 0.6150539428317208
