In [0]:
from pyspark.sql import SparkSession 


In [0]:
spark = SparkSession.builder.appName("SVM_MLP").getOrCreate()

In [0]:
%fs ls /FileStore/tables

path,name,size
dbfs:/FileStore/tables/Modulo4/,Modulo4/,0
dbfs:/FileStore/tables/adult_data.csv,adult_data.csv,5608318
dbfs:/FileStore/tables/aula_2_TPD.ipynb,aula_2_TPD.ipynb,174755
dbfs:/FileStore/tables/cap2/,cap2/,0
dbfs:/FileStore/tables/cap3/,cap3/,0
dbfs:/FileStore/tables/cap4/,cap4/,0
dbfs:/FileStore/tables/cap5/,cap5/,0
dbfs:/FileStore/tables/cap6/,cap6/,0
dbfs:/FileStore/tables/iris_bezdekIris.csv,iris_bezdekIris.csv,4551
dbfs:/FileStore/tables/regressaoLinear.csv,regressaoLinear.csv,564


In [0]:
diretorioArvore="/FileStore/tables/iris_bezdekIris.csv"

In [0]:
#lendo arquivos armazenados CSV com o esquema definido
df_iris = spark.read.format('csv').options(inferSchema=True,header='false',delimiter=',').load(diretorioArvore)

In [0]:
df_iris.printSchema()

In [0]:
#modificando o nome das colunas existentes no cabeçalho 
df_iris = df_iris.selectExpr("_c0 as sep_len", "_c1 as sep_wid", "_c2 as pet_len", "_c3 as pet_wid", "_c4 as label")

In [0]:
df_iris.show(5)

Conhecendo o Banco de Dados

In [0]:
#encontrando as "estatísticas"
df_iris.describe(['sep_len','sep_wid','pet_len','pet_wid']).show()

In [0]:
#definindo a visão do dataframe para ser utilizado como uma tabela pelo SQL
df_iris.createOrReplaceTempView("irisTable")

In [0]:
display(spark.sql('select * from irisTable '))

sep_len,sep_wid,pet_len,pet_wid,label
5.1,3.5,1.4,0.2,Iris-setosa
4.9,3.0,1.4,0.2,Iris-setosa
4.7,3.2,1.3,0.2,Iris-setosa
4.6,3.1,1.5,0.2,Iris-setosa
5.0,3.6,1.4,0.2,Iris-setosa
5.4,3.9,1.7,0.4,Iris-setosa
4.6,3.4,1.4,0.3,Iris-setosa
5.0,3.4,1.5,0.2,Iris-setosa
4.4,2.9,1.4,0.2,Iris-setosa
4.9,3.1,1.5,0.1,Iris-setosa


Iniciando o Processo de Construção e Aplicação do SVM

In [0]:
from pyspark.ml.linalg import Vectors  
from pyspark.ml.feature import VectorAssembler 

In [0]:
vector_assembler = VectorAssembler(inputCols=["sep_len", "sep_wid", "pet_len", "pet_wid"],outputCol="features")
df_temp = vector_assembler.transform(df_iris)
df_temp.show(5)

In [0]:
df_temp.printSchema()

In [0]:
#removendo as colunas que não serão utilizadas
df_menor = df_temp.drop('sep_len', 'sep_wid', 'pet_len', 'pet_wid')
df_menor.show(5)

In [0]:
#codificação da coluna label 
from pyspark.ml.feature import StringIndexer 

l_indexer = StringIndexer(inputCol="label", outputCol="labelEncoder") 
df_final = l_indexer.fit(df_menor).transform(df_menor)

In [0]:
df_final.show(5)

In [0]:
df_final.printSchema()

Normalizando os dados

In [0]:
#normalizando os dados
from pyspark.ml.feature import MinMaxScaler  #biblioteca para colocar os valores entre 0 e 1
scaler = MinMaxScaler( inputCol="features", outputCol="scaledFeatures")  #criando o objeto para a escala
scalerModel = scaler.fit(df_final)

In [0]:
df_final = scalerModel.transform(df_final).drop('features').withColumnRenamed('scaledFeatures', 'features')

In [0]:
df_final.show()

Modificando o Dataset Para a Classificação Binária - SVF só classifica binário

In [0]:
import pyspark.sql.functions as F
df_SVM=df_final.where((F.col("labelEncoder") == 0) | (F.col("labelEncoder") == 1))  #Transforma o dataset em um problema de classificação binária

In [0]:
df_SVM.show(150)

In [0]:
#removendo as colunas que não serão utilizadas
df_SVM = df_SVM.drop('label')
df_SVM.show(130)

In [0]:
df_SVM=df_SVM.selectExpr('features',"labelEncoder as label")

In [0]:
df_SVM.show(5)

Dividindo o dataset entre treinamento e teste

In [0]:
#dividindo entre dados de treinamento e teste
(train, test) = df_SVM.randomSplit([0.7, 0.3])

In [0]:
train.show(100)

In [0]:
print("Dados para treinamento: ", train.count())
print("Dados para teste: ", test.count())

Definindo o Algoritmo

In [0]:
from pyspark.mllib.classification import SVMWithSGD, SVMModel  #modelo de svm
from pyspark.ml.evaluation import MulticlassClassificationEvaluator  #utilizada para encontrar as métricas de desempenho
from pyspark.mllib.linalg import Vectors  #vetores densos

In [0]:
from pyspark.mllib.util import MLUtils
df_train = MLUtils.convertVectorColumnsFromML(train, "features")
df_test = MLUtils.convertVectorColumnsFromML(test, "features")

In [0]:
df_train.show(5,False)

In [0]:
from pyspark.mllib.regression import LabeledPoint  #cria a "linha" (características e label) a ser utilizada

trainingData = df_train.rdd.map(lambda row:LabeledPoint(row.label,row.features))  #aplica o label ao treinamento
testingData = df_test.rdd.map(lambda row:LabeledPoint(row.label,row.features))  #aplica o label ao teste

In [0]:
for xs in trainingData.take(10):
        print(xs)

In [0]:
#contrução do modelo
modelSVM = SVMWithSGD.train(trainingData, iterations=100)

In [0]:
#realizando a previsão
labelsAndPreds = testingData.map(lambda p: (p.label, modelSVM.predict(p.features)))
trainErr = labelsAndPreds.filter(lambda lp: lp[0] != lp[1]).count() / float(testingData.count())
print("Erro na previsão: ",trainErr)