# Artigo no Linkedin

## Classificação usando Spark - RandomForestClassifier

### Dados

Dados de Acidentes da Polícia Rodoviária Federal

# 1. INICIALIZAÇÃO DO NOTEBOOK

In [None]:
####################################
#  Imports

# Pandas
import pandas as pd

# Utilidades
from datetime import date, datetime, timedelta

# Importar o PySpark
import pyspark

# pyspark machine learning
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.classification import RandomForestClassifier

# pyspark SQL
from pyspark.sql.functions import when, col, trim, countDistinct, udf
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, BooleanType, DoubleType, LongType

# import SparkSession
from pyspark.sql import SparkSession

# retirar mensagens de warnings
import warnings
warnings.filterwarnings("ignore")


In [None]:
%%time
####################################
# PySpark session - SparkSession

# Local master with 4 threads; getOrCreate() reuses an already running session
spark = (
    SparkSession.builder
    .master('local[4]')
    .appName("ClassifierCrash")
    .getOrCreate()
)

# 2. TRATAMENTO DOS DADOS

In [None]:
%%time
####################################
# Number of years of accident data to process
# 1 = 2016 | 2 = 2016 and 2017 | ... | 7 = 2016 through 2022

qtd_anos_processamento = 7

In [None]:
%%time
####################################
# Schema definition - fields of the CSV files that will be loaded
# Note: "target" is not a CSV field; it is declared here and populated later

# (column name, Spark type) pairs in schema order; every column is nullable
_campos_acidente = [
    ("id", IntegerType()),
    ("data_inversa", StringType()),
    ("dia_semana", StringType()),
    ("horario", StringType()),
    ("uf", StringType()),
    ("br", IntegerType()),
    ("km", StringType()),
    ("municipio", StringType()),
    ("causa_acidente", StringType()),
    ("tipo_acidente", StringType()),
    ("classificacao_acidente", StringType()),
    ("fase_dia", StringType()),
    ("sentido_via", StringType()),
    ("condicao_metereologica", StringType()),
    ("tipo_pista", StringType()),
    ("tracado_via", StringType()),
    ("uso_solo", StringType()),
    ("pessoas", IntegerType()),
    ("mortos", IntegerType()),
    ("feridos_leves", IntegerType()),
    ("feridos_graves", IntegerType()),
    ("ilesos", IntegerType()),
    ("ignorados", IntegerType()),
    ("feridos", IntegerType()),
    ("veiculos", IntegerType()),
    ("target", IntegerType()),
]

acidente_schema = StructType(
    [StructField(_nome, _tipo, True) for _nome, _tipo in _campos_acidente]
)

In [None]:
%%time
####################################
#  Procedures e Funções

def _carrega_spark_dataframe(_ano, df=None, mySchema=None, _separador=",", _enconding="latin1"):
    """Load one year of accident records and append it to an accumulated DataFrame.

    Reads ``./dados/datatran{_ano}.csv`` (first line treated as header) through
    the module-level ``spark`` session and prints start/end timestamps plus
    the row counts of the loaded file and of the accumulated result.

    Args:
        _ano: Year used to build the CSV file name.
        df: DataFrame accumulated so far, or None when loading the first file.
        mySchema: Schema applied to the CSV reader.
        _separador: Field separator of the CSV file.
        _enconding: Character encoding of the CSV file.

    Returns:
        The newly loaded DataFrame when ``df`` is None, otherwise the union of
        ``df`` and the newly loaded DataFrame.
    """
    print(f"Início da carga do arquivo de acidentes de {_ano}....", datetime.today())

    # Configure the CSV reader, then load the file for the requested year
    leitor = (
        spark.read.format("csv")
        .schema(mySchema)
        .option("header", "True")
        .option("sep", _separador)
        .option("encoding", _enconding)
    )
    dftmp = leitor.load(f"./dados/datatran{_ano}.csv")

    # Identity check: None must be tested with `is`, not `==`
    df = dftmp if df is None else df.union(dftmp)

    # Report after loading; each count() triggers a Spark job
    print(f"Fim da carga do arquivo de acidentes de {_ano}....", datetime.today())
    print("Total de registros carregados...", dftmp.count())
    print("Total de registros acumulados...", df.count())

    return df
    

In [None]:
%%time
# Load the yearly files into a single DataFrame
# _carrega_spark_dataframe arguments: year of the records, accumulated
# DataFrame, schema, separator, encoding

# Years handled by this notebook, in load order; qtd_anos_processamento
# selects how many of them (from the first) are loaded
_anos_carga = ("2016", "2017", "2018", "2019", "2020", "2021", "2022")

# Fail here with a clear message instead of leaving dft undefined
if qtd_anos_processamento < 1:
    raise ValueError("qtd_anos_processamento deve ser >= 1")

dft = None
for _ano in _anos_carga[:qtd_anos_processamento]:
    dft = _carrega_spark_dataframe(_ano, dft, acidente_schema, ";", "latin1")

In [None]:
%%time
####################################
# Bind the loaded DataFrame to the working name used by the following steps.
# This is a second reference to the same DataFrame object, not a copy.

sparkDF = dft

In [None]:
%%time
####################################
# Populate the target column

# target: 1 = severe accident (at least one death or one seriously injured
# person) | 0 = non-severe accident
sparkDF = sparkDF.withColumn(
    "target",
    when(col("mortos") >= 1, 1)
    .when(col("feridos_graves") >= 1, 1)
    .otherwise(0),
)


In [None]:
%%time
####################################
# Remove rows that have a null in any column - via na.drop()

print("Retirada de registros que tem campos nulos ....")
_total_antes = sparkDF.count()
print("Total de registros no Dataframe antes da limpeza = ", _total_antes)

sparkDF = sparkDF.na.drop()

_total_depois = sparkDF.count()
print("Total de registros no Dataframe após a limpeza = ", _total_depois)

In [None]:
%%time
####################################
# Remove records that will not take part in the classification
# Keep only the records of accidents with victims

print("Retirada de registros de acidentes sem vítimas e ignorados ....")
print("Total de registros no Dataframe antes da limpeza = ", sparkDF.count())

# One filter per excluded classification, reporting the row count after each
for _classificacao in ('Ignorados', 'Sem Vítimas'):
    sparkDF = sparkDF.filter(col("classificacao_acidente") != _classificacao)
    print(f"Total de registros no Dataframe após a limpeza de '{_classificacao}' = ", sparkDF.count())


In [None]:
%%time
####################################
# Categorical columns - names of the columns that will be index-encoded

categoricalColumns = [
    "dia_semana",
    "causa_acidente",
    "tipo_acidente",
    "classificacao_acidente",
    "fase_dia",
    "sentido_via",
    "condicao_metereologica",
    "tipo_pista",
    "tracado_via",
    "uso_solo",
    "pessoas",
    "veiculos",
]


In [None]:
%%time
####################################
# Encode the values of the categorical columns

# For each column: fit a StringIndexer, add "<column>_encoded" and cast the
# resulting index to int
for _coluna in categoricalColumns:
    _coluna_saida = _coluna + "_encoded"
    _indexador = StringIndexer(inputCol=_coluna, outputCol=_coluna_saida)
    sparkDF = _indexador.fit(sparkDF).transform(sparkDF)
    sparkDF = sparkDF.withColumn(_coluna_saida, col(_coluna_saida).cast('int'))
    
    

In [None]:
%%time
####################################
# Print the DataFrame schema (it now includes the *_encoded columns)

sparkDF.printSchema()

In [None]:
%%time
####################################
# Build the DataFrame that keeps only the encoded columns plus the target

# Column names are derived from categoricalColumns so this selection cannot
# drift from the list used by the encoding step
encoded_df = sparkDF.select(
    *[_coluna + "_encoded" for _coluna in categoricalColumns],
    "target",
)


In [None]:
%%time
####################################
# Prepare feature extraction

from pyspark.ml.feature import VectorAssembler

# Input columns are derived from categoricalColumns so the feature list
# cannot drift from the list used by the encoding step.
# NOTE(review): classificacao_acidente is used as a feature while target is
# derived from mortos / feridos_graves; presumably that classification already
# reflects fatalities — confirm it does not leak the label into the features.
featureAssembler = VectorAssembler(
    inputCols=[_coluna + "_encoded" for _coluna in categoricalColumns],
    outputCol="features",
)



In [None]:
%%time
####################################
# Apply the assembler: adds the "features" vector column to encoded_df

output = featureAssembler.transform(encoded_df)

# Schema preview only: the renamed DataFrame is not assigned, so `output`
# keeps the column name "target"
output.withColumnRenamed("target","labels").printSchema()


In [None]:
%%time
####################################
# Show the first 5 rows of the assembled features next to the target

output.select("features","target").show(5)


In [None]:
%%time
####################################
# Label preparation

# Return type of the labelling UDF: a struct with a single integer field
udf_result = StructType([StructField('target',IntegerType())])

# Values are integers so they match the IntegerType declared in udf_result.
# NOTE(review): the keys are strings, but the "target" column built earlier
# holds the integers 0/1 — confirm which key type the lookup should use.
target_dict = {'Não Grave': 0, 'Grave': 1}


In [None]:
%%time
# função
def assign_labels(target):
    """Map a target value to a one-field Row for the labelling UDF.

    Args:
        target: Key looked up in the module-level ``target_dict``.

    Returns:
        A Row holding the mapped label converted to int.

    Raises:
        KeyError: If ``target`` is not a key of ``target_dict``.
    """
    # Row is not imported at file level; importing it here keeps the function
    # usable without touching the import cell
    from pyspark.sql import Row

    # int(): the UDF return schema declares an integer field
    return Row(int(target_dict[target]))

In [None]:
%%time

# Wrap assign_labels as a Spark UDF whose declared return type is udf_result
assign_labels_udf = udf(assign_labels, udf_result)

# Schema preview only: the result is not assigned, so `output` is unchanged.
# NOTE(review): printSchema() presumably does not execute the UDF — confirm
# by running an action (e.g. show) on the resulting DataFrame.
output.withColumn('labels', assign_labels_udf('target')).drop('target').printSchema()


# 3. SPARK - RANDOM FOREST CLASSIFIER

In [None]:
%%time
####################################
# Repeated train/test split, training and evaluation

# Number of rounds
m = 5

# Per-round result accumulators
resultado = []
l_start_fit_spark = []
l_stop_fit_spark = []
l_start_predict_spark = []
l_stop_predict_spark = []
l_acuracia = []
l_total_registros = []
l_rodada = []

# Number of records in the cleaned DataFrame
total_registros = sparkDF.count()

# Classifier with default hyper-parameters
rf = RandomForestClassifier(featuresCol = 'features', labelCol = 'target')

# The evaluator does not change between rounds, so it is built once.
# metricName is set explicitly: the evaluator's default metric is "f1",
# while the value is stored and reported as accuracy.
evaluator = MulticlassClassificationEvaluator(
    labelCol="target",
    predictionCol="prediction",
    metricName="accuracy",
)

# Repeat the experiment m times to average training time and accuracy
for i in range(m):
    # New random 70/30 split on every round (no seed is fixed)
    train, test = output.randomSplit([0.7, 0.3])

    # Training
    start_fit_spark = datetime.today()
    print("Start : RandomForestClassifier ....", start_fit_spark)
    rfModel = rf.fit(train)
    stop_fit_spark = datetime.today()
    # Print the recorded timestamp so the log matches the stored value
    print("Stop  : RandomForestClassifier ....", stop_fit_spark)

    # Prediction
    # NOTE(review): transform() is presumably lazy, so this interval may not
    # include the actual prediction work (it would run inside evaluate()) —
    # confirm before relying on the predict timings.
    start_predict_spark = datetime.today()
    print("Start : RandomForestClassifier Transform ....", start_predict_spark)
    predictions = rfModel.transform(test)
    stop_predict_spark = datetime.today()
    print("Stop  : RandomForestClassifier Transform ....", stop_predict_spark)

    # Performance evaluation (accuracy)
    acucacia = evaluator.evaluate(predictions)

    # Store the results of this round
    l_start_fit_spark.append(start_fit_spark)
    l_stop_fit_spark.append(stop_fit_spark)
    l_start_predict_spark.append(start_predict_spark)
    l_stop_predict_spark.append(stop_predict_spark)
    l_acuracia.append(acucacia)
    l_total_registros.append(total_registros)
    l_rodada.append(qtd_anos_processamento)

    

In [None]:
%%time
####################################
# Build a pandas DataFrame with the per-round results collected by the loop

# Output column name -> list filled during the rounds (order = column order)
_colunas_resultado = {
    'rodada': l_rodada,
    'total_registros': l_total_registros,
    'start_fit': l_start_fit_spark,
    'stop_fit': l_stop_fit_spark,
    'start_predict': l_start_predict_spark,
    'stop_predict': l_stop_predict_spark,
    'acuracia': l_acuracia,
}

df_resultado = pd.DataFrame(
    zip(*_colunas_resultado.values()),
    columns=list(_colunas_resultado),
)

    

In [None]:
%%time
####################################
# Add a column with the training (fit) duration of each round

df_resultado['tempo_fit'] = df_resultado['stop_fit'] - df_resultado['start_fit']



In [None]:
%%time
####################################
# Display the results table

df_resultado