In [1]:
#sc.stop()

In [2]:
from pyspark.sql import SparkSession
from pyspark import SparkContext

spark = SparkSession\
    .builder\
    .master("local[4]")\
    .appName("MLib Random Forests")\
    .getOrCreate()

sc = spark.sparkContext

In [3]:
sc._conf.getAll()

[('spark.app.startTime', '1638916856787'),
 ('spark.driver.port', '40499'),
 ('spark.local.dir', '/home/marco/claseBigData/ProyectoBD/tmp'),
 ('spark.executor.id', 'driver'),
 ('spark.driver.memory', '16g'),
 ('spark.app.name', 'PySparkShell'),
 ('spark.driver.host', '192.168.3.5'),
 ('spark.sql.catalogImplementation', 'hive'),
 ('spark.rdd.compress', 'True'),
 ('spark.app.id', 'local-1638916857453'),
 ('spark.serializer.objectStreamReset', '100'),
 ('spark.master', 'local[*]'),
 ('spark.submit.pyFiles', ''),
 ('spark.submit.deployMode', 'client'),
 ('spark.ui.showConsoleProgress', 'true')]

### Carga de Datos
[Link al Dataset](https://archive.ics.uci.edu/ml/machine-learning-databases/00222/bank.zip)

In [4]:
bankData = sc.textFile("clase/bank.csv")
bankData.cache()
bankData.take(10)

['"age";"job";"marital";"education";"default";"balance";"housing";"loan";"contact";"day";"month";"duration";"campaign";"pdays";"previous";"poutcome";"y"',
 '30;"unemployed";"married";"primary";"no";1787;"no";"no";"cellular";19;"oct";79;1;-1;0;"unknown";"no"',
 '33;"services";"married";"secondary";"no";4789;"yes";"yes";"cellular";11;"may";220;1;339;4;"failure";"yes"',
 '35;"management";"single";"tertiary";"no";1350;"yes";"no";"cellular";16;"apr";185;1;330;1;"failure";"yes"',
 '30;"management";"married";"tertiary";"no";1476;"yes";"yes";"unknown";3;"jun";199;4;-1;0;"unknown";"yes"',
 '59;"blue-collar";"married";"secondary";"no";0;"yes";"no";"unknown";5;"may";226;1;-1;0;"unknown";"no"',
 '35;"management";"single";"tertiary";"no";747;"no";"no";"cellular";23;"feb";141;2;176;3;"failure";"yes"',
 '36;"self-employed";"married";"tertiary";"no";307;"yes";"no";"cellular";14;"may";341;1;330;2;"other";"yes"',
 '39;"technician";"married";"secondary";"no";147;"yes";"no";"cellular";6;"may";151;2;-1;0;"

### Limpieza de datos

In [5]:
# Retirar encabezados
firstLine = bankData.first()
dataLines = bankData.filter(lambda l:l!=firstLine)
dataLines.count()

541

### Preprocesamiento de datos

In [6]:
# Cambia las etiquetas a numericas y crea un objeto Row

import math
from pyspark.ml.linalg import Vectors
from pyspark.sql import Row

def transformToNumeric(inputStr):
    
    attList = inputStr.replace("\"","").split(";")
    
    age = float(attList[0])
    # convertir a float
    outcome = 0.0 if attList[16] == "no" else 1.0
    
    # Crear variables de indicadores para soltero/casado
    
    single = 1.0 if attList[2] == "single" else 0.0
    married = 1.0 if attList[2] == "married" else 0.0
    divorced = 1.0 if attList[2] == "divorced" else 0.0
    
    # crear variables indicadores para educacion
    
    primary = 1.0 if attList[3] == "primary" else 0.0
    secondary = 1.0 if attList[3] == "secondary" else 0.0
    tertiary = 1.0 if attList[3] == "tertiary" else 0.0
    
    # convertir default a float
    
    default = 0.0 if attList[4] == "no" else 1.0
    
    # convertir balance a float
    
    balance = float(attList[5])
    
    # convertir prestamo a float
    
    loan = 0.0 if attList[7] == "no" else 1.0
    
    # crear una fila con los datos limpios y datos en buen formato
    values = Row( OUTCOME = outcome, \
                AGE = age, \
                SINGLE = single, \
                MARRIED = married, \
                DIVORCED = divorced, \
                PRIMARY = primary, \
                SECONDARY = secondary, \
                TERTIARY = tertiary, \
                DEFAULT = default, \
                BALANCE = balance, \
                LOAN = loan)
    return values

In [7]:
# cambiar vector
bankRows = dataLines.map(transformToNumeric)
bankData = spark.createDataFrame(bankRows)
bankData.show()

+-------+----+------+-------+--------+-------+---------+--------+-------+-------+----+
|OUTCOME| AGE|SINGLE|MARRIED|DIVORCED|PRIMARY|SECONDARY|TERTIARY|DEFAULT|BALANCE|LOAN|
+-------+----+------+-------+--------+-------+---------+--------+-------+-------+----+
|    0.0|30.0|   0.0|    1.0|     0.0|    1.0|      0.0|     0.0|    0.0| 1787.0| 0.0|
|    1.0|33.0|   0.0|    1.0|     0.0|    0.0|      1.0|     0.0|    0.0| 4789.0| 1.0|
|    1.0|35.0|   1.0|    0.0|     0.0|    0.0|      0.0|     1.0|    0.0| 1350.0| 0.0|
|    1.0|30.0|   0.0|    1.0|     0.0|    0.0|      0.0|     1.0|    0.0| 1476.0| 1.0|
|    0.0|59.0|   0.0|    1.0|     0.0|    0.0|      1.0|     0.0|    0.0|    0.0| 0.0|
|    1.0|35.0|   1.0|    0.0|     0.0|    0.0|      0.0|     1.0|    0.0|  747.0| 0.0|
|    1.0|36.0|   0.0|    1.0|     0.0|    0.0|      0.0|     1.0|    0.0|  307.0| 0.0|
|    0.0|39.0|   0.0|    1.0|     0.0|    0.0|      1.0|     0.0|    0.0|  147.0| 0.0|
|    0.0|41.0|   0.0|    1.0|     0.0|    0

### Hacer el analisis

In [8]:
# ver analisis descriptivo
bankData.describe().toPandas().head()

Unnamed: 0,summary,OUTCOME,AGE,SINGLE,MARRIED,DIVORCED,PRIMARY,SECONDARY,TERTIARY,DEFAULT,BALANCE,LOAN
0,count,541.0,541.0,541.0,541.0,541.0,541.0,541.0,541.0,541.0,541.0,541.0
1,mean,0.3974121996303142,41.26987060998152,0.2754158964879852,0.6155268022181146,0.1090573012939001,0.1534195933456562,0.4953789279112754,0.3142329020332717,0.022181146025878,1444.7818853974122,0.1626617375231053
2,stddev,0.4898154926233514,10.555374170161665,0.4471370479760759,0.4869207382098541,0.3119995822161848,0.360725025449802,0.5004413742994283,0.4646392600205975,0.1474086424402979,2423.2722735171924,0.3693983273588199
3,min,0.0,19.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,-1206.0,0.0
4,max,1.0,78.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,16873.0,1.0


In [9]:
# encuentra la correlacion entre los predictores y el objetivo
for i in bankData.columns:
    if not (isinstance(bankData.select(i).take(1)[0][0],str)):
        print("Correlation to OUTCOME for ",i, bankData.stat.corr("OUTCOME",i))

Correlation to OUTCOME for  OUTCOME 1.0
Correlation to OUTCOME for  AGE -0.1823210432736525
Correlation to OUTCOME for  SINGLE 0.46323284934360515
Correlation to OUTCOME for  MARRIED -0.3753241299133561
Correlation to OUTCOME for  DIVORCED -0.07812659940926987
Correlation to OUTCOME for  PRIMARY -0.12561548832677982
Correlation to OUTCOME for  SECONDARY 0.026392774894072973
Correlation to OUTCOME for  TERTIARY 0.08494840766635618
Correlation to OUTCOME for  DEFAULT -0.04536965206737378
Correlation to OUTCOME for  BALANCE 0.03657486611997681
Correlation to OUTCOME for  LOAN -0.030420586112717318


### Preparar datos para usar MLlib

In [10]:
def transformToLabeledPoint(row):
    lp = (row["OUTCOME"], Vectors.dense([row["AGE"],\
                                         row["BALANCE"],\
                                         row["DEFAULT"],\
                                         row["DIVORCED"],\
                                        row["LOAN"],\
                                        row["MARRIED"],\
                                        row["PRIMARY"],\
                                        row["SECONDARY"],\
                                        row["SINGLE"],\
                                        row["TERTIARY"]]))
    return lp

In [11]:
bankLp = bankData.rdd.map(transformToLabeledPoint)
bankDf = spark.createDataFrame(bankLp,["label","features"])
bankDf.select("label","features").show(10)
bankDf.cache()

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|[30.0,1787.0,0.0,...|
|  1.0|[33.0,4789.0,0.0,...|
|  1.0|[35.0,1350.0,0.0,...|
|  1.0|[30.0,1476.0,0.0,...|
|  0.0|[59.0,0.0,0.0,0.0...|
|  1.0|[35.0,747.0,0.0,0...|
|  1.0|[36.0,307.0,0.0,0...|
|  0.0|[39.0,147.0,0.0,0...|
|  0.0|[41.0,221.0,0.0,0...|
|  1.0|[43.0,-88.0,0.0,0...|
+-----+--------------------+
only showing top 10 rows



DataFrame[label: double, features: vector]

### Realizar aprendizaje de maquina

In [12]:
# Hacer PCA
from pyspark.ml.feature import PCA
bankPCA = PCA(k=3, inputCol="features",outputCol="pcaFeatures")
pcaModel = bankPCA.fit(bankDf)
pcaResult = pcaModel.transform(bankDf).select("label","pcaFeatures")
pcaResult.show(truncate = False)



+-----+------------------------------------------------------------+
|label|pcaFeatures                                                 |
+-----+------------------------------------------------------------+
|0.0  |[-1787.018897197381,28.86209683775529,-0.06459982604876241] |
|1.0  |[-4789.020177138492,29.922562636341947,-0.9830243513096373] |
|1.0  |[-1350.022213163262,34.10110809796688,0.8951427168301704]   |
|1.0  |[-1476.0189517184556,29.051333993596703,0.3952723868021948] |
|0.0  |[-0.037889185366442445,58.9897182000177,-0.7290792383661886]|
|1.0  |[-747.0223377634923,34.48829198181773,0.9045654956970108]   |
|1.0  |[-307.0230691022592,35.799850539655225,0.5170631523785976]  |
|0.0  |[-147.02501216176339,38.90107856650329,-0.8069627548799397] |
|0.0  |[-221.0262985348787,40.853633675694944,0.5373036365803221]  |
|1.0  |[87.9723868768871,43.062659441151055,-0.0670164287117152]   |
|0.0  |[-9374.023105550941,32.97645883799288,-0.9511484606914431]  |
|0.0  |[-264.02755731528384,42.824

In [13]:
# Indizacion necesaria como requisito previo para arboles de decision
from pyspark.ml.feature import StringIndexer
stringIndexer = StringIndexer(inputCol="label", outputCol="indexed")
si_model = stringIndexer.fit(pcaResult)
td = si_model.transform(pcaResult)
td.collect()

[Row(label=0.0, pcaFeatures=DenseVector([-1787.0189, 28.8621, -0.0646]), indexed=0.0),
 Row(label=1.0, pcaFeatures=DenseVector([-4789.0202, 29.9226, -0.983]), indexed=1.0),
 Row(label=1.0, pcaFeatures=DenseVector([-1350.0222, 34.1011, 0.8951]), indexed=1.0),
 Row(label=1.0, pcaFeatures=DenseVector([-1476.019, 29.0513, 0.3953]), indexed=1.0),
 Row(label=0.0, pcaFeatures=DenseVector([-0.0379, 58.9897, -0.7291]), indexed=0.0),
 Row(label=1.0, pcaFeatures=DenseVector([-747.0223, 34.4883, 0.9046]), indexed=1.0),
 Row(label=1.0, pcaFeatures=DenseVector([-307.0231, 35.7999, 0.5171]), indexed=1.0),
 Row(label=0.0, pcaFeatures=DenseVector([-147.025, 38.9011, -0.807]), indexed=0.0),
 Row(label=0.0, pcaFeatures=DenseVector([-221.0263, 40.8536, 0.5373]), indexed=0.0),
 Row(label=1.0, pcaFeatures=DenseVector([87.9724, 43.0627, -0.067]), indexed=1.0),
 Row(label=0.0, pcaFeatures=DenseVector([-9374.0231, 32.9765, -0.9511]), indexed=0.0),
 Row(label=0.0, pcaFeatures=DenseVector([-264.0276, 42.8248, -0

### Dividiendo datos en entrenamiento y test

In [14]:
(trainingData, testData) = td.randomSplit([0.7,0.3])
print("trainingData.count(): ",trainingData.count())
print("testData.count(): ",testData.count())

trainingData.count():  396
testData.count():  145


In [15]:
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Crear modelo
rmClassifier = RandomForestClassifier(labelCol="indexed", featuresCol="pcaFeatures")
rmModel = rmClassifier.fit(trainingData)

# Predecir con los datos de test
predictions = rmModel.transform(testData)
predictions.select("prediction","indexed","label","pcaFeatures").collect()

[Row(prediction=0.0, indexed=0.0, label=0.0, pcaFeatures=DenseVector([-14093.0337, 47.9412, -0.9569])),
 Row(prediction=0.0, indexed=0.0, label=0.0, pcaFeatures=DenseVector([-11494.0342, 49.61, -0.9162])),
 Row(prediction=1.0, indexed=0.0, label=0.0, pcaFeatures=DenseVector([-9374.0231, 32.9765, -0.9511])),
 Row(prediction=0.0, indexed=0.0, label=0.0, pcaFeatures=DenseVector([-8104.0336, 49.7873, -0.8708])),
 Row(prediction=0.0, indexed=0.0, label=0.0, pcaFeatures=DenseVector([-5887.0296, 44.2135, 0.4752])),
 Row(prediction=0.0, indexed=0.0, label=0.0, pcaFeatures=DenseVector([-4012.0313, 47.4169, 0.5121])),
 Row(prediction=0.0, indexed=0.0, label=0.0, pcaFeatures=DenseVector([-3571.025, 37.7029, 0.4812])),
 Row(prediction=0.0, indexed=0.0, label=0.0, pcaFeatures=DenseVector([-1831.0215, 32.8212, -0.8522])),
 Row(prediction=0.0, indexed=0.0, label=0.0, pcaFeatures=DenseVector([-1811.0266, 40.8265, -0.5194])),
 Row(prediction=0.0, indexed=0.0, label=0.0, pcaFeatures=DenseVector([-1808.0

In [16]:
# Evaluar Accuracy

evaluator = MulticlassClassificationEvaluator(predictionCol="prediction",\
                               labelCol="indexed", metricName="accuracy")
evaluator.evaluate(predictions)

0.6827586206896552

### Matriz de confusion

In [17]:
predictions.groupBy("indexed","prediction").count().show()

+-------+----------+-----+
|indexed|prediction|count|
+-------+----------+-----+
|    1.0|       1.0|   22|
|    0.0|       1.0|   10|
|    1.0|       0.0|   36|
|    0.0|       0.0|   77|
+-------+----------+-----+



In [18]:
sc.stop()

In [19]:
import sys
print(sys.executable)
print(sys.version)
print(sys.version_info)

/home/marco/anaconda3/bin/python
3.8.3 (default, Jul  2 2020, 16:21:59) 
[GCC 7.3.0]
sys.version_info(major=3, minor=8, micro=3, releaselevel='final', serial=0)
