In [1]:
import numpy as np
import pandas as pd

from pyspark.sql.functions import lit
from pyspark.sql import SparkSession, SQLContext, Row

from pyspark.sql.functions import col, when
from pyspark.ml.linalg import Vectors, SparseVector
from pyspark.ml.feature import Normalizer, VectorIndexer, IndexToString, StringIndexer

from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

### Lendo Dados

In [10]:
# Load the heart-disease CSV, keeping only the two predictors (age, cp) and
# the target column. inferSchema=True makes Spark parse the numeric columns
# as numbers; without it every column is loaded as a string.
spark = SparkSession.builder.appName("dataset").getOrCreate()
dataset = (
    spark.read.csv("heart.csv", header=True, inferSchema=True)
    .select("age", "cp", "target")
)
dataset.show(5, truncate=False)

+---+---+------+
|age|cp |target|
+---+---+------+
|63 |3  |1     |
|37 |2  |1     |
|41 |1  |1     |
|56 |1  |1     |
|57 |0  |1     |
+---+---+------+
only showing top 5 rows



### Normalizando Dados

In [12]:
# Pack (age, cp) into one dense vector per row, carrying the target along as
# "label". The Normalizer then rescales each row vector to unit L1 norm
# (p=1.0), so the two components of every vector sum to 1.
# NOTE(review): this normalization is per row, across two features with
# different units; a per-column scaler may have been intended.
data_vector = (
    dataset.rdd
    .map(lambda r: Row(data=Vectors.dense(r["age"], r["cp"]), label=r["target"]))
    .toDF()
)
normalizer = Normalizer(p=1.0, inputCol="data", outputCol="features")
normalized = normalizer.transform(data_vector)
features = normalized.drop("data")
features.show(5, truncate=False)

+-----+-----------------------------------------+
|label|features                                 |
+-----+-----------------------------------------+
|1    |[0.9545454545454546,0.045454545454545456]|
|1    |[0.9487179487179487,0.05128205128205128] |
|1    |[0.9761904761904762,0.023809523809523808]|
|1    |[0.9824561403508771,0.017543859649122806]|
|1    |[1.0,0.0]                                |
+-----+-----------------------------------------+
only showing top 5 rows



### Extraindo Valores Normalizados

In [13]:
def extract(row):
    """Flatten a row holding ``label`` and a ``features`` vector into a tuple.

    Returns ``(label, f0, f1, ...)``, with the vector entries converted to
    plain Python floats via ``toArray().tolist()``.
    """
    values = row.features.toArray().tolist()
    return tuple([row.label, *values])

# Turn each normalized row back into separate (label, age, cp) columns.
data_frame = (
    features.rdd
    .map(extract)
    .toDF(schema=["label", "age", "cp"])
)
data_frame.show(5, truncate=False)

+-----+------------------+--------------------+
|label|age               |cp                  |
+-----+------------------+--------------------+
|1    |0.9545454545454546|0.045454545454545456|
|1    |0.9487179487179487|0.05128205128205128 |
|1    |0.9761904761904762|0.023809523809523808|
|1    |0.9824561403508771|0.017543859649122806|
|1    |1.0               |0.0                 |
+-----+------------------+--------------------+
only showing top 5 rows



### Formato LIBSVM

In [14]:
def libsvm_converter(row):
    """Build a ``Row(label, features)`` from a flattened (label, age, cp) row.

    ``features`` is a size-3 SparseVector holding age, cp and a constant 1
    in slots 0, 1 and 2 respectively.
    """
    # NOTE(review): the constant third slot carries no information for a
    # tree model; it is kept here to preserve the original feature layout.
    slot_values = [row["age"], row["cp"], 1]
    slot_indices = list(range(len(slot_values)))
    sparse = SparseVector(len(slot_values), slot_indices, slot_values)
    return Row(label=row["label"], features=sparse)

# Build the modelling DataFrame of (label, features) rows.
# SQLContext is the legacy pre-SparkSession entry point (deprecated in
# Spark 3.0); the SparkSession created when loading the data already
# provides createDataFrame, so use it directly.
libsvm = data_frame.rdd.map(libsvm_converter)
data = spark.createDataFrame(libsvm)
data.show(5, truncate=False)

+---------------------------------------------------------+-----+
|features                                                 |label|
+---------------------------------------------------------+-----+
|(3,[0,1,2],[0.9545454545454546,0.045454545454545456,1.0])|1    |
|(3,[0,1,2],[0.9487179487179487,0.05128205128205128,1.0]) |1    |
|(3,[0,1,2],[0.9761904761904762,0.023809523809523808,1.0])|1    |
|(3,[0,1,2],[0.9824561403508771,0.017543859649122806,1.0])|1    |
|(3,[0,1,2],[1.0,0.0,1.0])                                |1    |
+---------------------------------------------------------+-----+
only showing top 5 rows



### Dados de Treino / Teste

In [15]:
# Index the label and the feature vector. Both indexers are fitted on the
# full dataset, before the split, so every label value present in the data
# is known to the pipeline. VectorIndexer uses this pass to decide which
# vector slots to treat as categorical.
label_estimator = StringIndexer(inputCol="label", outputCol="indexedLabel")
feature_estimator = VectorIndexer(inputCol="features", outputCol="indexedFeatures")
labelIndexer = label_estimator.fit(data)
featureIndexer = feature_estimator.fit(data)

# 70/30 train/test split with a fixed seed, for a reproducible split.
train, test = data.randomSplit([0.7, 0.3], seed=10)

### Treinando o Modelo

In [16]:
# Random forest trained on the indexed label/features. Bootstrap sampling and
# feature subsetting are random. Without an explicit seed, PySpark derives
# the default from a Python string hash, which is randomized per process, so
# a fresh kernel could grow a different forest. Fix the seed (same value as
# the train/test split) so re-running the notebook reproduces the accuracy.
rf = RandomForestClassifier(
    labelCol="indexedLabel",
    featuresCol="indexedFeatures",
    seed=10,
)
# Map numeric predictions back to the label values seen by labelIndexer.
labelConverter = IndexToString(inputCol="prediction", outputCol="predictedLabel", labels=labelIndexer.labels)
pipeline = Pipeline(stages=[labelIndexer, featureIndexer, rf, labelConverter])

model = pipeline.fit(train)

### Classificando Dados de Teste

In [17]:
# Score the held-out split and show the true label next to the predicted one.
predictions = model.transform(test)
resumed_predictions = predictions.select(["label", "predictedLabel"])
resumed_predictions.show(n=5, truncate=False)

+-----+--------------+
|label|predictedLabel|
+-----+--------------+
|0    |1             |
|1    |1             |
|1    |1             |
|1    |1             |
|1    |1             |
+-----+--------------+
only showing top 5 rows



### Desempenho

In [18]:
# Test-set accuracy. It compares "indexedLabel" with the forest's raw
# "prediction" column; both are in the StringIndexer's index space, because
# the forest was trained on indexedLabel.
evaluator = MulticlassClassificationEvaluator(
    labelCol="indexedLabel",
    predictionCol="prediction",
    metricName="accuracy",
)

accuracy = evaluator.evaluate(predictions)
print(f"Accuracy = {accuracy}")

Accuracy = 0.7216494845360825
