In [1]:
!pip install pyspark
!pip install spark-sklearn
from pyspark.sql import SparkSession
from pyspark import SparkFiles
spark_session = SparkSession.builder.master('local[*]').appName('sparkML').getOrCreate()

Collecting pyspark
[?25l  Downloading https://files.pythonhosted.org/packages/f0/26/198fc8c0b98580f617cb03cb298c6056587b8f0447e20fa40c5b634ced77/pyspark-3.0.1.tar.gz (204.2MB)
[K     |████████████████████████████████| 204.2MB 60kB/s 
[?25hCollecting py4j==0.10.9
[?25l  Downloading https://files.pythonhosted.org/packages/9e/b6/6a4fb90cd235dc8e265a6a2067f2a2c99f0d91787f06aca4bcf7c23f3f80/py4j-0.10.9-py2.py3-none-any.whl (198kB)
[K     |████████████████████████████████| 204kB 44.9MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.0.1-py2.py3-none-any.whl size=204612243 sha256=2316391971d734c4ee286a33ca6a649e94879f2a0456b30c4073fa2de58d9dfa
  Stored in directory: /root/.cache/pip/wheels/5e/bd/07/031766ca628adec8435bb40f0bd83bb676ce65ff4007f8e73f
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9 pyspark-3.0.1
Coll

In [2]:
url = 'http://www.ppgia.pucpr.br/~jean.barddal/bigdata/sparkml/giveMeLoanKaggle.csv'
spark_session.sparkContext.addFile(url)
df = spark_session.read.csv("file://"+SparkFiles.get("giveMeLoanKaggle.csv"), header=True, inferSchema= True, nullValue='?')

In [3]:
df.printSchema()

root
 |-- RevolvingUtilizationOfUnsecuredLines: double (nullable = true)
 |-- age: integer (nullable = true)
 |-- NumberOfTime30-59DaysPastDueNotWorse: integer (nullable = true)
 |-- DebtRatio: double (nullable = true)
 |-- MonthlyIncome: integer (nullable = true)
 |-- NumberOfOpenCreditLinesAndLoans: integer (nullable = true)
 |-- NumberOfTimes90DaysLate: integer (nullable = true)
 |-- NumberRealEstateLoansOrLines: integer (nullable = true)
 |-- NumberOfTime60-89DaysPastDueNotWorse: integer (nullable = true)
 |-- NumberOfDependents: integer (nullable = true)
 |-- isFraud: integer (nullable = true)



In [4]:
from pyspark.sql.functions import col
df.groupBy('NumberOfDependents').count().orderBy(col('count').desc()).show()

+------------------+-----+
|NumberOfDependents|count|
+------------------+-----+
|                 0|86903|
|                 1|26316|
|                 2|19522|
|                 3| 9483|
|              null| 3923|
|                 4| 2862|
|                 5|  746|
|                 6|  158|
|                 7|   51|
|                 8|   24|
|                 9|    5|
|                10|    5|
|                13|    1|
|                20|    1|
+------------------+-----+



In [5]:
from pyspark.ml.feature import Imputer
imputer = Imputer(strategy='mean', inputCols=(['MonthlyIncome','NumberOfDependents']), outputCols=(['OUT_MonthlyIncome','OUT_NumberOfDependents']) )

In [6]:
model = imputer.fit(df)
df_normalizado=model.transform(df)

In [7]:
df_normalizado = df_normalizado[['RevolvingUtilizationOfUnsecuredLines','age','NumberOfTime30-59DaysPastDueNotWorse','DebtRatio','NumberOfOpenCreditLinesAndLoans','NumberOfTimes90DaysLate','NumberRealEstateLoansOrLines','NumberOfTime60-89DaysPastDueNotWorse','OUT_MonthlyIncome','OUT_NumberOfDependents','isFraud']]
df_normalizado.show(5)

+------------------------------------+---+------------------------------------+---------+-------------------------------+-----------------------+----------------------------+------------------------------------+-----------------+----------------------+-------+
|RevolvingUtilizationOfUnsecuredLines|age|NumberOfTime30-59DaysPastDueNotWorse|DebtRatio|NumberOfOpenCreditLinesAndLoans|NumberOfTimes90DaysLate|NumberRealEstateLoansOrLines|NumberOfTime60-89DaysPastDueNotWorse|OUT_MonthlyIncome|OUT_NumberOfDependents|isFraud|
+------------------------------------+---+------------------------------------+---------+-------------------------------+-----------------------+----------------------------+------------------------------------+-----------------+----------------------+-------+
|                            0.766127| 45|                                   2| 0.802982|                             13|                      0|                           6|                                   0|      

In [8]:
cols_treinamento = list(df_normalizado.columns)
cols_treinamento.remove('isFraud')
print(cols_treinamento)

['RevolvingUtilizationOfUnsecuredLines', 'age', 'NumberOfTime30-59DaysPastDueNotWorse', 'DebtRatio', 'NumberOfOpenCreditLinesAndLoans', 'NumberOfTimes90DaysLate', 'NumberRealEstateLoansOrLines', 'NumberOfTime60-89DaysPastDueNotWorse', 'OUT_MonthlyIncome', 'OUT_NumberOfDependents']


In [9]:
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(inputCols=cols_treinamento, outputCol='features_treinamento')
df_normalizado = assembler.transform(df_normalizado)

In [10]:
df_normalizado.show(5)

+------------------------------------+---+------------------------------------+---------+-------------------------------+-----------------------+----------------------------+------------------------------------+-----------------+----------------------+-------+--------------------+
|RevolvingUtilizationOfUnsecuredLines|age|NumberOfTime30-59DaysPastDueNotWorse|DebtRatio|NumberOfOpenCreditLinesAndLoans|NumberOfTimes90DaysLate|NumberRealEstateLoansOrLines|NumberOfTime60-89DaysPastDueNotWorse|OUT_MonthlyIncome|OUT_NumberOfDependents|isFraud|features_treinamento|
+------------------------------------+---+------------------------------------+---------+-------------------------------+-----------------------+----------------------------+------------------------------------+-----------------+----------------------+-------+--------------------+
|                            0.766127| 45|                                   2| 0.802982|                             13|                      0|         

# Logistic Regeression (Holdout - 10 Folds)

In [32]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml import Pipeline


train, test = df_normalizado.randomSplit([0.7,0.3], seed=10)
lr = LogisticRegression(featuresCol='features_treinamento', labelCol='isFraud')
pipeline = Pipeline(stages=[lr])
paramGrid = ParamGridBuilder()\
    .addGrid(lr.regParam, [0.1, 0.01])\
    .build()


crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=BinaryClassificationEvaluator(labelCol="isFraud"),
                          numFolds=10) 

cvModel = crossval.fit(train)




# Logistic Regeression Results

In [19]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.mllib.evaluation import MulticlassMetrics

avaliador = MulticlassClassificationEvaluator(labelCol='isFraud', predictionCol='prediction', metricName='accuracy')

predicoes_lr = cvModel.transform(test)
predicoes_lr = predicoes_lr.withColumn('isFraud', predicoes_lr ['isFraud'].cast('double'))
predictions_and_labels = predicoes_lr.rdd.map(lambda lp: (lp['prediction'], lp['isFraud']))

calculadora = MulticlassMetrics(predictions_and_labels)


print(f'Tx de acerto = {100.0*calculadora.accuracy}%')
print(f'Recall ponderado = {100.0*calculadora.weightedRecall}%')
print(f'Precisão ponderada = {100.0*calculadora.weightedPrecision}%')


Tx de acerto = 93.4097995545657%
Recall ponderado = 93.4097995545657%
Precisão ponderada = 90.99310663566538%
[[4.1904e+04 2.9000e+01]
 [2.9300e+03 3.7000e+01]]


# Decision Tree Classifier (Holdout - 10 Folds)

In [24]:
from pyspark.ml.classification import DecisionTreeClassifier

dt = DecisionTreeClassifier(featuresCol='features_treinamento', labelCol='isFraud')
pipeline = Pipeline(stages=[dt])
paramGrid = ParamGridBuilder().build()
     


crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=BinaryClassificationEvaluator(labelCol="isFraud"),
                          numFolds=10) 

cvModel = crossval.fit(train)



# Decision Tree Results


In [None]:
avaliador = MulticlassClassificationEvaluator(labelCol='isFraud', predictionCol='prediction', metricName='accuracy')

predicoes_dt = cvModel.transform(test)
predicoes_dt = predicoes_rf.withColumn('isFraud', predicoes_dt ['isFraud'].cast('double'))
predictions_and_labels = predicoes_dt.rdd.map(lambda dt: (rf['prediction'], dt['isFraud']))

calculadora = MulticlassMetrics(predictions_and_labels)


print(f'Tx de acerto = {100.0*calculadora.accuracy}%')
print(f'Recall ponderado = {100.0*calculadora.weightedRecall}%')
print(f'Precisão ponderada = {100.0*calculadora.weightedPrecision}%')

# Random Forest Classifier (Holdout - 10 Folds)

In [27]:
from pyspark.ml.classification import RandomForestClassifier
rf = RandomForestClassifier(featuresCol='features_treinamento', labelCol='isFraud', numTrees=50)
pipeline = Pipeline(stages=[rf])
paramGrid = ParamGridBuilder().build()

crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=BinaryClassificationEvaluator(labelCol="isFraud"),
                          numFolds=10) 

cvModel = crossval.fit(train)


# Random Forest Results

In [30]:
avaliador = MulticlassClassificationEvaluator(labelCol='isFraud', predictionCol='prediction', metricName='accuracy')

predicoes_rf = cvModel.transform(test)
predicoes_rf = predicoes_rf.withColumn('isFraud', predicoes_rf ['isFraud'].cast('double'))
predictions_and_labels = predicoes_rf.rdd.map(lambda rf: (rf['prediction'], rf['isFraud']))

calculadora = MulticlassMetrics(predictions_and_labels)


print(f'Tx de acerto = {100.0*calculadora.accuracy}%')
print(f'Recall ponderado = {100.0*calculadora.weightedRecall}%')
print(f'Precisão ponderada = {100.0*calculadora.weightedPrecision}%')

Tx de acerto = 93.63474387527839%
Recall ponderado = 93.63474387527839%
Precisão ponderada = 91.792161264034%
