In [2]:
import os
import sys

from pyspark.sql import SparkSession

from pyspark.ml import Pipeline

from pyspark.ml.feature import Tokenizer
from pyspark.ml.feature import StopWordsRemover
from pyspark.ml.feature import HashingTF,IDF
from pyspark.ml.feature import StringIndexer


#from pyspark.ml.classification import NaiveBayes
from pyspark.ml.classification import LinearSVC, OneVsRest

from pyspark.ml.evaluation import MulticlassClassificationEvaluator


from pyspark.ml.tuning import ParamGridBuilder,CrossValidator

import pandas as pd

https://spark.apache.org/docs/3.1.2/

https://spark.apache.org/docs/3.1.2/ml-classification-regression.html#linear-support-vector-machine

https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.ml.classification.LinearSVC.html

https://machinelearningmastery.com/one-vs-rest-and-one-vs-one-for-multi-class-classification/

https://github.com/tonifuc3m/svm-pyspark/blob/master/svm-spark.ipynb

https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.ml.classification.OneVsRest.html#pyspark.ml.classification.OneVsRest



https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.ml.evaluation.MulticlassClassificationEvaluator.html#multiclassclassificationevaluator


https://spark.apache.org/docs/3.1.2/api/java/org/apache/spark/ml/evaluation/MulticlassClassificationEvaluator.html




https://stackoverflow.com/questions/48260412/environment-variables-pyspark-python-and-pyspark-driver-python

In [3]:
# Tell Spark where the JDK lives and make both the PySpark workers and the
# driver use the same Python interpreter that is running this kernel.
os.environ.update({
    'JAVA_HOME': '/opt/jdk',
    'PYSPARK_PYTHON': sys.executable,
    'PYSPARK_DRIVER_PYTHON': sys.executable,
})

In [4]:

# Build (or reuse) a SparkSession running in local mode on all available cores.
spark = (
    SparkSession.builder
    .appName('app_name')
    .master('local[*]')
    .config('spark.sql.execution.arrow.pyspark.enabled', True)
    .config('spark.sql.session.timeZone', 'UTC')
    .config('spark.driver.memory', '12G')
    .config('spark.ui.showConsoleProgress', True)
    .config('spark.sql.repl.eagerEval.enabled', True)
    .getOrCreate()
)

# Handle to the SparkContext that backs the session.
sc = spark.sparkContext

21/10/31 16:50:34 WARN Utils: Your hostname, abner-Lenovo-G40-70 resolves to a loopback address: 127.0.1.1; using 192.168.15.20 instead (on interface wlp2s0)
21/10/31 16:50:34 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
21/10/31 16:50:36 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [5]:
# Echo the session object as the cell's output to confirm it was created.
spark

In [6]:
# Paths of the test and training CSV files.
# Alternative inputs kept for reference (presumably smaller samples for quick
# runs - confirm): 'menordata_test.csv' / 'menordata_training.csv'
caminho_df_test = 'data_test.csv'
caminho_df_training = 'data_training.csv'

# Both files are read the same way: first row is the header, column types are
# inferred from the data. Test is read first, then training.
df_test, df_training = (
    spark.read.csv(caminho, header=True, inferSchema=True)
    for caminho in (caminho_df_test, caminho_df_training)
)



In [7]:
# Print the inferred column names and types of the test set.
df_test.printSchema()

root
 |-- origem: string (nullable = true)
 |-- topico: string (nullable = true)
 |-- arquivo: integer (nullable = true)
 |-- texto: string (nullable = true)



In [8]:
# Print the inferred column names and types of the training set.
df_training.printSchema()

root
 |-- origem: string (nullable = true)
 |-- topico: string (nullable = true)
 |-- arquivo: integer (nullable = true)
 |-- texto: string (nullable = true)



In [9]:
# Encode the string topic as a numeric "label" column; handleInvalid='keep'
# assigns unseen topics to an extra index instead of raising.
label_stringIdx = StringIndexer(
    inputCol="topico",
    outputCol="label",
    handleInvalid='keep',
)

# Split the raw text into word tokens.
tokenization = Tokenizer(
    inputCol="texto",
    outputCol="palavras",
)

# Drop stop words from the token list.
remover_stopword = StopWordsRemover(
    inputCol="palavras",
    outputCol="palavras_filtradas",
)

# Hash the remaining tokens into a term-frequency vector.
hashingTF = HashingTF(
    inputCol="palavras_filtradas",
    outputCol="tf_features",
)

# Re-weight the term frequencies by inverse document frequency.
idf = IDF(
    inputCol="tf_features",
    outputCol="tf_idf_features",
)

In [10]:
# Base linear SVM wrapped in a One-vs-Rest reduction so it can be trained on
# the multi-class "label" column using the TF-IDF vectors as features.
lsvc = LinearSVC()
ovr = OneVsRest(
    classifier=lsvc,
    featuresCol='tf_idf_features',
    labelCol='label',
)

# End-to-end pipeline: label indexing, text featurisation, then the classifier.
pipelineLSVC = Pipeline(
    stages=[
        label_stringIdx,
        tokenization,
        remover_stopword,
        hashingTF,
        idf,
        ovr,
    ]
)

In [11]:
# Hyper-parameter grid for the base SVM: every combination of the two
# maxIter values and the two regParam values (2 x 2 = 4 parameter maps).
lsvcparamGrid = (
    ParamGridBuilder()
    .addGrid(lsvc.maxIter, [10, 100])
    .addGrid(lsvc.regParam, [0.01, 10.0])
    .build()
)

In [12]:
# 3-fold cross-validation of the whole pipeline over the parameter grid.
# The evaluator is left on its default metric and default label column.
# An explicit seed fixes the random fold assignment so that a
# Restart Kernel -> Run All reproduces the same model selection.
lsvcCV = CrossValidator(
    estimator=pipelineLSVC,
    estimatorParamMaps=lsvcparamGrid,
    evaluator=MulticlassClassificationEvaluator(predictionCol="prediction"),
    numFolds=3,
    seed=42,
)

In [13]:
# Run the cross-validated grid search on the training set.
# NOTE(review): this is the long-running step - the log timestamps recorded in
# this notebook suggest it took over an hour; confirm before re-running.
lsvcCVModel = lsvcCV.fit(df_training)

21/10/31 16:51:24 WARN DAGScheduler: Broadcasting large task binary with size 4.1 MiB
21/10/31 16:51:26 WARN DAGScheduler: Broadcasting large task binary with size 4.1 MiB
21/10/31 16:51:27 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
21/10/31 16:51:27 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS
21/10/31 16:51:28 WARN DAGScheduler: Broadcasting large task binary with size 4.1 MiB
21/10/31 16:51:28 WARN DAGScheduler: Broadcasting large task binary with size 4.1 MiB
21/10/31 16:51:29 WARN DAGScheduler: Broadcasting large task binary with size 4.1 MiB
21/10/31 16:51:30 WARN DAGScheduler: Broadcasting large task binary with size 4.1 MiB
21/10/31 16:51:31 WARN DAGScheduler: Broadcasting large task binary with size 4.1 MiB
21/10/31 16:51:31 WARN DAGScheduler: Broadcasting large task binary with size 4.1 MiB
21/10/31 16:51:32 WARN DAGScheduler: Broadcasting large task binary with size 4.1 MiB
21/10/31 16:51:

In [14]:
# Apply the fitted cross-validation model to the test set; the result carries
# the "prediction" and "label" columns selected in the next step.
df_test_lsvcCVModel = lsvcCVModel.transform(df_test)

In [15]:
# Keep only the two columns the evaluation needs and cache them. Every later
# action on this DataFrame (the distinct-label collect and each
# evaluator.evaluate call) would otherwise re-run the whole fitted pipeline
# over the test set.
df_predictionAndLabels = df_test_lsvcCVModel.select('prediction', 'label').cache()

In [16]:
# Collect the distinct label values present in the test predictions.
# The de-duplication is done with DataFrame operations so the rows never have
# to be shipped through a Python lambda on an RDD; only the distinct values
# are brought back to the driver.
labels = [
    row.label
    for row in df_predictionAndLabels.select('label').distinct().collect()
]

21/10/31 18:13:22 WARN DAGScheduler: Broadcasting large task binary with size 10.1 MiB


In [17]:
# Evaluator reused for all metrics below; the metric name and target label
# are supplied per call through a param map.
evaluator = MulticlassClassificationEvaluator(
    predictionCol='prediction',
    labelCol='label',
)

In [18]:
# Build the metrics table:
#   - one row per class: (label, precision, recall, F1, None, None)
#   - one final summary row: (None, None, None, None, accuracy, macroF1)
vec_fMeasureByLabel = []
metric_list = []

for label in sorted(labels):
    # Same evaluator, three per-label metrics; only the metric name changes.
    precisionByLabel, recallByLabel, fMeasureByLabel = (
        evaluator.evaluate(
            df_predictionAndLabels,
            {evaluator.metricName: metric_name, evaluator.metricLabel: label},
        )
        for metric_name in ("precisionByLabel", "recallByLabel", "fMeasureByLabel")
    )

    metric_tuple_one = (label, precisionByLabel, recallByLabel, fMeasureByLabel, None, None)
    metric_list.append(metric_tuple_one)

    vec_fMeasureByLabel.append(fMeasureByLabel)

# Number of classes evaluated (one F1 value was stored per class).
qtdClasses = len(vec_fMeasureByLabel)

accuracy = evaluator.evaluate(df_predictionAndLabels, {evaluator.metricName: "accuracy"})

# Macro F1 is the unweighted mean of the per-class F1 scores; it is undefined
# (NaN) when there are no classes, instead of raising ZeroDivisionError.
macroF1 = sum(vec_fMeasureByLabel) / qtdClasses if qtdClasses else float("nan")

metric_tuple_two = (None, None, None, None, accuracy, macroF1)
metric_list.append(metric_tuple_two)

21/10/31 18:13:25 WARN DAGScheduler: Broadcasting large task binary with size 10.1 MiB
21/10/31 18:13:28 WARN DAGScheduler: Broadcasting large task binary with size 10.1 MiB
21/10/31 18:13:30 WARN DAGScheduler: Broadcasting large task binary with size 10.1 MiB
21/10/31 18:13:35 WARN DAGScheduler: Broadcasting large task binary with size 10.1 MiB
21/10/31 18:13:36 WARN DAGScheduler: Broadcasting large task binary with size 10.1 MiB
21/10/31 18:13:38 WARN DAGScheduler: Broadcasting large task binary with size 10.1 MiB
21/10/31 18:13:39 WARN DAGScheduler: Broadcasting large task binary with size 10.1 MiB
21/10/31 18:13:41 WARN DAGScheduler: Broadcasting large task binary with size 10.1 MiB
21/10/31 18:13:42 WARN DAGScheduler: Broadcasting large task binary with size 10.1 MiB
21/10/31 18:13:44 WARN DAGScheduler: Broadcasting large task binary with size 10.1 MiB


In [19]:
# Turn the collected metric rows into a table and save it next to the notebook.
# Per-class rows leave accuracy/macroF1 empty; the last row holds only those two.
colunas_metricas = ['class', 'precision', 'recall', 'F1', 'accuracy', 'macroF1']
dt_metrics = pd.DataFrame(metric_list)
dt_metrics.columns = colunas_metricas
dt_metrics.to_csv('metrics_svm.csv', index=False)
