### Alunos:
- **Laianna Lana Virginio da Silva** - *llvs2@cin.ufpe.br*
- **Lucas Natan Correia Couri** - *lncc2@cin.ufpe.br*

# Bibliotecas

In [1]:
# O Google Colab é executado em um servidor remoto

!apt-get install openjdk-8-jdk-headless -qq > /dev/null # Instalando o Java

!wget -q https://dlcdn.apache.org/spark/spark-3.2.1/spark-3.2.1-bin-hadoop3.2.tgz # Baixando o Spark
!tar xf spark-3.2.1-bin-hadoop3.2.tgz # Descompacta o arquivo spark-3.1.1-bin-hadoop3.2.tgz.

!pip install -q findspark # Instalando o spark

In [2]:
import os # Para conseguir “manipular” o terminal e interagir como ele, você pode usar a biblioteca os.
import findspark

In [3]:
# Environment configuration: tell Spark where the Java 8 JDK lives and where the
# Spark distribution was extracted (the /content directory of the Colab runtime).
os.environ.update(
    JAVA_HOME="/usr/lib/jvm/java-8-openjdk-amd64",
    SPARK_HOME="/content/spark-3.2.1-bin-hadoop3.2",
)

In [4]:
# With SPARK_HOME set, findspark locates the Spark distribution and makes its bundled
# `pyspark` package importable. Passing SPARK_HOME explicitly (or None when unset, which
# falls back to findspark's own lookup) mirrors the library's default behavior.
findspark.init(spark_home=os.environ.get("SPARK_HOME"))

In [5]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as func
from pyspark.sql.functions import sum
from pyspark.sql.types import *
from pyspark.sql.functions import col

In [6]:
# Local Spark session using all available cores; getOrCreate() returns the existing
# session instead of building a new one if the cell is re-run.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("Spark DataFrames parte 2")
    .getOrCreate()
)
spark

# Carregando os Dados

In [7]:
# Mount Google Drive at /content/drive so the dataset directory below is readable.
from google.colab import drive

drive.mount(mountpoint="/content/drive")

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [8]:
diretorio = os.path.join("/content/drive/MyDrive/Colab Notebooks", "11. PD em Larga Escala", "dados")  # dataset folder on the mounted Drive

In [9]:
# Load the dataset from Parquet: one string `label` column and one ML vector `features` column.
pt7_parquet = spark.read.parquet(f"{diretorio}/pt7-hash.parquet")

# Alternative (multi-label) dataset, currently disabled:
#multi_parquet = spark.read.format("parquet").load(f"{diretorio}/pt7-multilabel").withColumnRenamed("text64byte", "words")

# Parte 1 (DataFrames)

In [10]:
# Peek at the first five rows, then display the schema as the cell's output.
pt7_parquet.show(n=5)
pt7_parquet.schema

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  .ao|(262144,[1758,215...|
|  .ao|(262144,[1758,215...|
|  .ao|(262144,[561,1758...|
|  .ao|(262144,[1758,215...|
|  .ao|(262144,[1758,215...|
+-----+--------------------+
only showing top 5 rows



StructType(List(StructField(label,StringType,true),StructField(features,VectorUDT,true)))

In [11]:
pt7_parquet.select(col("features")).show(5)  # only the feature vectors

+--------------------+
|            features|
+--------------------+
|(262144,[1758,215...|
|(262144,[1758,215...|
|(262144,[561,1758...|
|(262144,[1758,215...|
|(262144,[1758,215...|
+--------------------+
only showing top 5 rows



In [12]:
pt7_parquet.filter(col("label") == ".ao").show(5)  # rows whose label is ".ao"; filter() is an alias of where()

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  .ao|(262144,[1758,215...|
|  .ao|(262144,[1758,215...|
|  .ao|(262144,[561,1758...|
|  .ao|(262144,[1758,215...|
|  .ao|(262144,[1758,215...|
+-----+--------------------+
only showing top 5 rows



# Parte 2 (Classificação com Random Forest)

In [13]:
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
#from pyspark.ml.classification import RandomForestClassificationModel

#from pyspark.mllib.evaluation import BinaryClassificationMetrics

from pyspark.ml.feature import StringIndexer
#from pyspark.sql.functions import col, when

SEED: int = 42  # shared random seed for the data splits

In [14]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer

## Codificando o Rótulo (label) para Inteiro

In [15]:
# Learn the string -> index mapping for `label`, writing it to `indexedLabel`.
# It is fitted on the whole dataset (not only the training split) so every label
# value present in the data receives an index; the result is a fitted StringIndexerModel.
label_indexador = StringIndexer(outputCol="indexedLabel", inputCol="label").fit(dataset=pt7_parquet)

## Separando o Dataset em Treino e Teste

In [16]:
# randomSplit normalizes weights that do not sum to 1, so [0.01, 0.3] keeps about
# 0.01 / 0.31 ≈ 3.2% of the rows in `a` (568 of 17014 in the recorded run); `b` gets the rest.
# NOTE(review): if a 1% sample was intended, the weights should be [0.01, 0.99].
a, b = pt7_parquet.randomSplit([0.01, 0.3], SEED)

In [17]:
treino, teste = a.randomSplit([0.7, 0.3], SEED)  # 70/30 train/test split of the subsample `a`

In [18]:
print(f"Total:  {a.count()}", f"Treino: {treino.count()}", f"Teste:  {teste.count()}", sep="\n")  # subsample size and its split sizes

Total:  568
Treino: 408
Teste:  160


In [19]:
# Compare the full dataset with the subsample `a` that was actually split into treino/teste:
# treino + teste add up to the subsample, not to the full dataset, so both totals are shown.
print(
    f"Total (dataset completo): {pt7_parquet.count()}\n"
    f"Total (amostra):          {a.count()}\n"
    f"Treino: {treino.count()}\nTeste:  {teste.count()}"
)

Total:  17014
Treino: 408
Teste:  160


## Classificador

In [20]:
# Random forest with 10 trees over the `features` vectors, predicting the indexed label.
# `seed` is fixed to SEED so the bootstrap samples and feature subsets are reproducible across
# kernel restarts; otherwise PySpark derives the default seed from hash() of the class name,
# which Python randomizes per process.
rf = RandomForestClassifier(labelCol="indexedLabel", featuresCol="features", numTrees=10, seed=SEED)

In [21]:
# Map the numeric `prediction` back to the original string label, using the label order learned
# by `label_indexador`. labelsArray[0] is that order for the single input column;
# StringIndexerModel.labels is deprecated since Spark 3.1 in favor of labelsArray.
label_conversor = IndexToString(
    inputCol="prediction",
    outputCol="predictedLabel",
    labels=label_indexador.labelsArray[0],
)

In [22]:
# Stage order: fitted label indexer -> random forest -> index-to-label converter.
pipeline = Pipeline().setStages([label_indexador, rf, label_conversor])

In [23]:
# Fit the pipeline on the training split. `label_indexador` is already a fitted model, so it is
# passed through unchanged; only the random forest is trained here.
modelo = pipeline.fit(dataset=treino)

In [24]:
print(f"{modelo}")  # prints only the PipelineModel identifier (uid), not a summary of its stages

PipelineModel_a7a93ad36691


In [25]:
# Score the held-out split: adds indexedLabel, rawPrediction, probability, prediction and predictedLabel.
predicao = modelo.transform(dataset=teste)

In [26]:
# Show a single scored row as an example.
predicao.show(n=1)

+-----+--------------------+------------+--------------------+--------------------+----------+--------------+
|label|            features|indexedLabel|       rawPrediction|         probability|prediction|predictedLabel|
+-----+--------------------+------------+--------------------+--------------------+----------+--------------+
|  .ao|(262144,[188,452,...|         3.0|[3.01520696828720...|[0.30152069682872...|       0.0|           .br|
+-----+--------------------+------------+--------------------+--------------------+----------+--------------+
only showing top 1 row



## Salvando e Carregando o Modelo

In [28]:
# Persist the fitted pipeline (indexer + forest + label converter) under `diretorio` (disabled).
# NOTE(review): save() raises if the target path already exists; use
# modelo.write().overwrite().save(...) to replace a previous save.
#modelo.save(f"{diretorio}/modelo")

In [None]:
# Reload the saved pipeline (disabled). `modelo` is a PipelineModel (the result of Pipeline.fit),
# so it must be reloaded with PipelineModel.load (from pyspark.ml import PipelineModel),
# not RandomForestClassificationModel.load.
#modelo_load = PipelineModel.load(f"{diretorio}/modelo")

## Acurácia

In [None]:
# Accuracy on the test split: fraction of rows whose predicted index equals the indexed true label.
avaliador_acuracia = MulticlassClassificationEvaluator(
    metricName="accuracy",
    labelCol="indexedLabel",
    predictionCol="prediction",
)
acuracia = avaliador_acuracia.evaluate(predicao)

print(f"Acurácia = {acuracia}")

## F1

In [None]:
# F1 on the test split. Spark's "f1" metric for multiclass data is the F-measure
# averaged over classes, weighted by each class's frequency.
evaluator_f1 = MulticlassClassificationEvaluator(
    metricName="f1",
    labelCol="indexedLabel",
    predictionCol="prediction",
)

f1 = evaluator_f1.evaluate(predicao)
print(f"F1 = {f1}")

## Sumário

In [None]:
# The fitted pipeline keeps the stage order of `pipeline`:
#   0 = StringIndexerModel, 1 = RandomForestClassificationModel, 2 = IndexToString.
# The random-forest model is therefore stage 1 (stage 2 is only the label converter).
rf_model = modelo.stages[1]
print(rf_model)  # summary only (not the individual trees)

## Curva ROC

In [None]:
# ROC evaluation, disabled. BinaryClassificationEvaluator (not imported in this notebook) is meant
# for binary labels, while this label has more than two classes (e.g. indexedLabel 3.0 appears in
# the `predicao.show` output). Even for a binary task it would need labelCol="indexedLabel"
# (numeric) and rawPredictionCol="rawPrediction" instead of the string `label` column and the
# hard `prediction` column used below.
#avaliador_roc = BinaryClassificationEvaluator(labelCol = "label", rawPredictionCol = "prediction", metricName = 'areaUnderROC')
#asc_roc = avaliador_roc.evaluate(predicao)

#print(f"Área Sob a Curva ROC = {asc_roc}")

# Teste de Escrita de Arquivo

In [29]:
# Scratch check that the working directory is writable: (over)writes demofile3.txt with a
# fixed message. The `with` block guarantees the file is flushed and closed even if write() raises.
with open("demofile3.txt", "w", encoding="utf-8") as f:
    f.write("Woops! I have deleted the content!")