# Artigo no Linkedin

## Classificação usando Spark - RandonForestClassifier

### Dados

Dados de Acidentes da Polícia Rodoviária Federal

# 1. TRATAMENTO DOS DADOS

In [1]:
####################################
#  Imports

# 
import pandas as pd
import numpy as np

# utilidades
from datetime import date, datetime, timedelta
#
# retirar mensagens de warnings
import warnings
warnings.filterwarnings("ignore")

In [2]:
####################################
#  Imports

# Importar o PySpark
import pyspark

# 
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# SQL

from pyspark.sql.functions import when, col, trim, countDistinct
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# import SparkSession
from pyspark.sql import SparkSession


In [3]:
####################################
# Sessão Pyspark - SparkSession

# Sessão
spark = SparkSession.builder \
    .master('local[4]') \
    .appName("ClassifierCrash") \
    .getOrCreate()

23/03/02 18:55:34 WARN Utils: Your hostname, Luiss-MacBook-Air.local resolves to a loopback address: 127.0.0.1; using 192.168.0.188 instead (on interface en0)
23/03/02 18:55:34 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/03/02 18:55:35 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/03/02 18:55:36 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [4]:
####################################
# Quantidade de anos de dados de acidentes a serem processados
# 1 = 2016 | 2 = 2016 e 2017 | ... | 7 = 2016 até 2021

qtd_anos_processamento = 7

In [5]:
####################################
#  Definição do Schema - Campos dos CSVs que serão carregados
# Observação: target não é campo do CSV

acidente_schema = StructType([
         StructField("id",IntegerType(),True),
         StructField("data_inversa",StringType(),True),
         StructField("dia_semana",StringType(),True),
         StructField("horario",StringType(),True),
         StructField("uf",StringType(),True),
         StructField("br",IntegerType(),True),
         StructField("km",StringType(),True),
         StructField("municipio",StringType(),True),
         StructField("causa_acidente",StringType(),True),
         StructField("tipo_acidente",StringType(),True),
         StructField("classificacao_acidente",StringType(),True),
         StructField("fase_dia",StringType(),True),
         StructField("sentido_via",StringType(),True),
         StructField("condicao_metereologica",StringType(),True),
         StructField("tipo_pista",StringType(),True),
         StructField("tracado_via",StringType(),True),
         StructField("uso_solo",StringType(),True),
         StructField("pessoas",IntegerType(),True),
         StructField("mortos",IntegerType(),True),
         StructField("feridos_leves",IntegerType(),True),
         StructField("feridos_graves",IntegerType(),True),
         StructField("ilesos",IntegerType(),True),
         StructField("ignorados",IntegerType(),True),
         StructField("feridos",IntegerType(),True),
         StructField("veiculos",IntegerType(),True),
         StructField("target",IntegerType(),True)
         ])

In [6]:
####################################
#  Procedures e Funções

def _carrega_spark_dataframe(_ano, df=None, mySchema=None, _separador=",", _enconding="latin1"):
    print(f"Início da carga do arquivo de acidentes de {_ano}....", datetime.today())
    
    # Carregar o arquivo para o spark dataframe
    dftmp = spark.read.format("csv").schema(mySchema).option("header","True").option("sep",_separador).option("encoding",_enconding).load(f"./dados/datatran{_ano}.csv")
    # Verificar se foi passado dataframe
    if df==None:
        df = dftmp
    else:
        df = df.union(dftmp)
    
    # print após carga
    print(f"Fim da carga do arquivo de acidentes de {_ano}....", datetime.today())
    print("Total de registros carregados...",dftmp.count())
    print("Total de registros acumulados...",df.count())
    # delete de dataframe temporário
    del dftmp
    # retornar o dataframe concatenado
    return df
    

In [7]:
# Realização da carga do arquivos para dataframe
# parâmetros: ano dos regitros, dataframe, separador, encoding

if qtd_anos_processamento >= 1:
    dft = _carrega_spark_dataframe("2016", None, acidente_schema, ";","latin1")
if qtd_anos_processamento >= 2:
    dft = _carrega_spark_dataframe("2017", dft, acidente_schema, ";","latin1")
if qtd_anos_processamento >= 3:
    dft = _carrega_spark_dataframe("2018", dft, acidente_schema, ";","latin1")
if qtd_anos_processamento >= 4:
    dft = _carrega_spark_dataframe("2019", dft, acidente_schema, ";","latin1")
if qtd_anos_processamento >= 5:
    dft = _carrega_spark_dataframe("2020", dft, acidente_schema, ";","latin1")
if qtd_anos_processamento >= 6:
    dft = _carrega_spark_dataframe("2021", dft, acidente_schema, ";","latin1")
if qtd_anos_processamento >= 7:
    dft = _carrega_spark_dataframe("2022", dft, acidente_schema, ";","latin1")

Início da carga do arquivo de acidentes de 2016.... 2023-03-02 18:55:37.773111
Fim da carga do arquivo de acidentes de 2016.... 2023-03-02 18:55:39.888112


                                                                                

Total de registros carregados... 96363
Total de registros acumulados... 96363
Início da carga do arquivo de acidentes de 2017.... 2023-03-02 18:55:44.834036
Fim da carga do arquivo de acidentes de 2017.... 2023-03-02 18:55:44.908935
Total de registros carregados... 89567
Total de registros acumulados... 185930
Início da carga do arquivo de acidentes de 2018.... 2023-03-02 18:55:46.339525
Fim da carga do arquivo de acidentes de 2018.... 2023-03-02 18:55:46.417866
Total de registros carregados... 69332
Total de registros acumulados... 255262
Início da carga do arquivo de acidentes de 2019.... 2023-03-02 18:55:47.744995
Fim da carga do arquivo de acidentes de 2019.... 2023-03-02 18:55:47.812930
Total de registros carregados... 67556
Total de registros acumulados... 322818
Início da carga do arquivo de acidentes de 2020.... 2023-03-02 18:55:49.545980
Fim da carga do arquivo de acidentes de 2020.... 2023-03-02 18:55:49.609612
Total de registros carregados... 63576
Total de registros acumula

In [8]:
####################################
# Copiar DataFrame

sparkDF = dft

In [9]:
####################################
#  Fazer limpeza e atualização em campos

# Marcar a coluna target - 1 = Acidente Grave | 2 = Acidente não grave
sparkDF = sparkDF.withColumn("target", when(sparkDF.mortos >= 1, 1) \
      .when(sparkDF.feridos_graves >=1, 1) \
      .otherwise(0))


In [10]:
####################################
# Retirar os campos com colunas vazias - usar o na.drop()

print("Retirada de registros que tem campos nulos ....")
print("Total de registros no Dataframe antes da limpeza = ", sparkDF.count())
sparkDF = sparkDF.na.drop()
print("Total de registros no Dataframe após a limpeza = ", sparkDF.count())

Retirada de registros que tem campos nulos ....
Total de registros no Dataframe antes da limpeza =  515380


23/03/02 18:55:55 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.

Total de registros no Dataframe após a limpeza =  514449


                                                                                

In [11]:
####################################
#  Fazer limpeza e atualização em campos

# Filtrar - deixar somente registros com vítimas
print("Retirada de registros de acidentes sem vítimas e ignorados ....")
print("Total de registros no Dataframe antes da limpeza = ", sparkDF.count())
sparkDF = sparkDF.filter(col("classificacao_acidente") != 'Ignorados')
print("Total de registros no Dataframe após a limpeza de 'Ignorados' = ", sparkDF.count())
sparkDF = sparkDF.filter(col("classificacao_acidente") != 'Sem Vítimas')

print("Total de registros no Dataframe após a limpeza de 'Sem Vítimas' = ", sparkDF.count())

#[nan 'COM VÍTIMAS FERIDAS' 'SEM VÍTIMAS' 'COM VÍTIMAS FATAIS' 'IGNORADO'
 
 
 
    

Retirada de registros de acidentes sem vítimas e ignorados ....


                                                                                

Total de registros no Dataframe antes da limpeza =  514449


                                                                                

Total de registros no Dataframe após a limpeza de 'Ignorados' =  514449




Total de registros no Dataframe após a limpeza de 'Sem Vítimas' =  388017


                                                                                

In [12]:
####################################
# Colunas categóricas - Lista das colunas

categoricalColumns = [ "dia_semana"
                      #,"horario"
                      #,"br"
                      ,"causa_acidente"
                      ,"tipo_acidente"
                      ,"classificacao_acidente"
                      ,"fase_dia"
                      ,"sentido_via"
                      ,"condicao_metereologica"
                      ,"tipo_pista"
                      ,"tracado_via"
                      ,"uso_solo"
                      ,"pessoas"
                      ,"veiculos"
                      #,"hora"
                     ]


In [13]:
####################################
# Encode dos dados das Colunas categóricas

# loop 
for categoricalCol in categoricalColumns:
    stringIndexer = StringIndexer(inputCol = categoricalCol, outputCol = categoricalCol+"_encoded").fit(sparkDF)
    sparkDF = stringIndexer.transform(sparkDF)
    sparkDF = sparkDF.withColumn(categoricalCol+"_encoded", sparkDF[categoricalCol+"_encoded"].cast('int'))
    
    

                                                                                

In [14]:
sparkDF.printSchema()

root
 |-- id: integer (nullable = true)
 |-- data_inversa: string (nullable = true)
 |-- dia_semana: string (nullable = true)
 |-- horario: string (nullable = true)
 |-- uf: string (nullable = true)
 |-- br: integer (nullable = true)
 |-- km: string (nullable = true)
 |-- municipio: string (nullable = true)
 |-- causa_acidente: string (nullable = true)
 |-- tipo_acidente: string (nullable = true)
 |-- classificacao_acidente: string (nullable = true)
 |-- fase_dia: string (nullable = true)
 |-- sentido_via: string (nullable = true)
 |-- condicao_metereologica: string (nullable = true)
 |-- tipo_pista: string (nullable = true)
 |-- tracado_via: string (nullable = true)
 |-- uso_solo: string (nullable = true)
 |-- pessoas: integer (nullable = true)
 |-- mortos: integer (nullable = true)
 |-- feridos_leves: integer (nullable = true)
 |-- feridos_graves: integer (nullable = true)
 |-- ilesos: integer (nullable = true)
 |-- ignorados: integer (nullable = true)
 |-- feridos: integer (nullable

In [15]:
####################################
# Criar DataFrame com enconded

encoded_df =  sparkDF.select("dia_semana_encoded"
                      ,"causa_acidente_encoded"
                      ,"tipo_acidente_encoded"
                      ,"classificacao_acidente_encoded"
                      ,"fase_dia_encoded"
                      ,"sentido_via_encoded"
                      ,"condicao_metereologica_encoded"
                      ,"tipo_pista_encoded"
                      ,"tracado_via_encoded"
                      ,"uso_solo_encoded"
                      ,"pessoas_encoded"
                      ,"veiculos_encoded"
                      #,"hora_encoded"
                      ,"target")


In [16]:
# mostrar 5 primeiras linhas
encoded_df.count()

                                                                                

388017

In [17]:
####################################
# Preparar features extraction

from pyspark.ml.feature import VectorAssembler

featureAssembler = VectorAssembler(inputCols=["dia_semana_encoded"
                      ,"causa_acidente_encoded"
                      ,"tipo_acidente_encoded"
                      ,"classificacao_acidente_encoded"
                      ,"fase_dia_encoded"
                      ,"sentido_via_encoded"
                      ,"condicao_metereologica_encoded"
                      ,"tipo_pista_encoded"
                      ,"tracado_via_encoded"
                      ,"uso_solo_encoded"
                      ,"pessoas_encoded"
                      ,"veiculos_encoded"
                      #,"hora_encoded"
                                             ],outputCol="features")



In [18]:
####################################
# Assembler 

output = featureAssembler.transform(encoded_df)
#output2 = labelsAssembler.transform(encoded_df)

output.withColumnRenamed("target","labels").printSchema()


root
 |-- dia_semana_encoded: integer (nullable = true)
 |-- causa_acidente_encoded: integer (nullable = true)
 |-- tipo_acidente_encoded: integer (nullable = true)
 |-- classificacao_acidente_encoded: integer (nullable = true)
 |-- fase_dia_encoded: integer (nullable = true)
 |-- sentido_via_encoded: integer (nullable = true)
 |-- condicao_metereologica_encoded: integer (nullable = true)
 |-- tipo_pista_encoded: integer (nullable = true)
 |-- tracado_via_encoded: integer (nullable = true)
 |-- uso_solo_encoded: integer (nullable = true)
 |-- pessoas_encoded: integer (nullable = true)
 |-- veiculos_encoded: integer (nullable = true)
 |-- labels: integer (nullable = false)
 |-- features: vector (nullable = true)



In [19]:
####################################
# Mostrar o resultado do assembler 

output.select("features","target").show(5)


+--------------------+------+
|            features|target|
+--------------------+------+
|(12,[0,1,4,6,9],[...|     0|
|(12,[0,1,4,6,7,9]...|     0|
|[9.0,5.0,20.0,0.0...|     1|
|[9.0,3.0,14.0,0.0...|     0|
|[9.0,3.0,3.0,0.0,...|     0|
+--------------------+------+
only showing top 5 rows



In [20]:
# labels
# using custom schema to make sure "species" column is "string" datatype. 
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, BooleanType, DoubleType, LongType

#udf_result = StructType([StructField('target',StringType())])
udf_result = StructType([StructField('target',IntegerType())])

target_dict = {'Não Grave': '0', 'Grave': '1'}


In [21]:
from pyspark.sql import functions as F
def assign_labels(target):
    return Row(target_dict[target])

In [22]:

assign_labels_udf = F.udf(assign_labels, udf_result)
output.withColumn('labels', assign_labels_udf('target')).drop('target').printSchema()
# since "species" column is labeled under "labels", droped species column to avoid redundancy



root
 |-- dia_semana_encoded: integer (nullable = true)
 |-- causa_acidente_encoded: integer (nullable = true)
 |-- tipo_acidente_encoded: integer (nullable = true)
 |-- classificacao_acidente_encoded: integer (nullable = true)
 |-- fase_dia_encoded: integer (nullable = true)
 |-- sentido_via_encoded: integer (nullable = true)
 |-- condicao_metereologica_encoded: integer (nullable = true)
 |-- tipo_pista_encoded: integer (nullable = true)
 |-- tracado_via_encoded: integer (nullable = true)
 |-- uso_solo_encoded: integer (nullable = true)
 |-- pessoas_encoded: integer (nullable = true)
 |-- veiculos_encoded: integer (nullable = true)
 |-- features: vector (nullable = true)
 |-- labels: struct (nullable = true)
 |    |-- target: integer (nullable = true)



# 4. SPARK - RANDOM FOREST CLASSIFIER

In [23]:
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
m = 5

In [24]:
####################################
# Separar em treino e teste

resultado = []
l_start_fit_spark = []
l_stop_fit_spark = []
l_start_predict_spark = []
l_stop_predict_spark = []
l_acuracia = []
l_total_registros = []
l_rodada = []

total_registros = sparkDF.count()

# classificador
#rf = RandomForestClassifier(featuresCol = 'features', labelCol = 'target', numTrees = 500, maxDepth = 5)
rf = RandomForestClassifier(featuresCol = 'features', labelCol = 'target')

# loop repetindo o total usado para o teste não spark
for i in range(m):
    # split
    train, test = output.randomSplit([0.7, 0.3])
    
    #train.show(5)
    
    #Training Model
    start_fit_spark =  datetime.today()
    print("Start : RandomForestClassifier ....",start_fit_spark)
    rfModel = rf.fit(train)
    stop_fit_spark =  datetime.today()
    print("Stop  : RandomForestClassifier ....", datetime.today())

    
    #Prediction
    start_predict_spark =  datetime.today()
    print("Start : RandomForestClassifier Transform ....",start_predict_spark)
    predictions = rfModel.transform(test)
    stop_predict_spark =  datetime.today() 
    print("Stop  : RandomForestClassifier Transform ....",stop_predict_spark)
    
    
    #Evaluating the performance
    evaluator = MulticlassClassificationEvaluator()
    evaluator.setLabelCol("target")
    evaluator.setPredictionCol("prediction")
    acucacia = evaluator.evaluate(predictions)

    # guardar resultado
    #rodada = [start_fit, stop_fit, start_proba, stop_proba, start_predict, stop_predict, acuracia_teste]
    #resultado.append(rodada)
    l_start_fit_spark.append(start_fit_spark)
    l_stop_fit_spark.append(stop_fit_spark)
    #l_start_proba.append(start_proba)
    #l_stop_proba.append(stop_proba)
    l_start_predict_spark.append(start_predict_spark)
    l_stop_predict_spark.append(stop_predict_spark)
    l_acuracia.append(acucacia)
    l_total_registros.append(total_registros)
    l_rodada.append(qtd_anos_processamento)

    

                                                                                

Start : RandomForestClassifier .... 2023-03-02 18:56:42.274547


                                                                                

Stop  : RandomForestClassifier .... 2023-03-02 18:57:09.263407
Start : RandomForestClassifier Transform .... 2023-03-02 18:57:09.263746
Stop  : RandomForestClassifier Transform .... 2023-03-02 18:57:09.370358


                                                                                

Start : RandomForestClassifier .... 2023-03-02 18:57:14.293908


                                                                                

Stop  : RandomForestClassifier .... 2023-03-02 18:57:34.556063
Start : RandomForestClassifier Transform .... 2023-03-02 18:57:34.556247
Stop  : RandomForestClassifier Transform .... 2023-03-02 18:57:34.670888


                                                                                

Start : RandomForestClassifier .... 2023-03-02 18:57:38.342312


                                                                                

Stop  : RandomForestClassifier .... 2023-03-02 18:57:57.179783
Start : RandomForestClassifier Transform .... 2023-03-02 18:57:57.180160
Stop  : RandomForestClassifier Transform .... 2023-03-02 18:57:57.310763


                                                                                

Start : RandomForestClassifier .... 2023-03-02 18:58:01.065718


                                                                                

Stop  : RandomForestClassifier .... 2023-03-02 18:58:20.406903
Start : RandomForestClassifier Transform .... 2023-03-02 18:58:20.407244
Stop  : RandomForestClassifier Transform .... 2023-03-02 18:58:20.489025


                                                                                

Start : RandomForestClassifier .... 2023-03-02 18:58:25.400701


                                                                                

Stop  : RandomForestClassifier .... 2023-03-02 18:58:43.821245
Start : RandomForestClassifier Transform .... 2023-03-02 18:58:43.821589
Stop  : RandomForestClassifier Transform .... 2023-03-02 18:58:43.918550


                                                                                

In [25]:
df_resultado = pd.DataFrame(zip(l_rodada, l_total_registros, l_start_fit_spark, l_stop_fit_spark, l_start_predict_spark, l_stop_predict_spark, l_acuracia),
                            columns = ['rodada','total_registros', 'start_fit', 'stop_fit', 'start_predict', 'stop_predict', 'acuracia'])

    

In [26]:
from dateutil.relativedelta import relativedelta

df_resultado['tempo_fit'] = df_resultado['stop_fit'] - df_resultado['start_fit']



In [28]:
df_resultado

Unnamed: 0,rodada,total_registros,start_fit,stop_fit,start_predict,stop_predict,acuracia,tempo_fit
0,7,388017,2023-03-02 18:56:42.274547,2023-03-02 18:57:09.263356,2023-03-02 18:57:09.263746,2023-03-02 18:57:09.370358,0.701159,0 days 00:00:26.988809
1,7,388017,2023-03-02 18:57:14.293908,2023-03-02 18:57:34.556006,2023-03-02 18:57:34.556247,2023-03-02 18:57:34.670888,0.705421,0 days 00:00:20.262098
2,7,388017,2023-03-02 18:57:38.342312,2023-03-02 18:57:57.179708,2023-03-02 18:57:57.180160,2023-03-02 18:57:57.310763,0.700493,0 days 00:00:18.837396
3,7,388017,2023-03-02 18:58:01.065718,2023-03-02 18:58:20.406856,2023-03-02 18:58:20.407244,2023-03-02 18:58:20.489025,0.700597,0 days 00:00:19.341138
4,7,388017,2023-03-02 18:58:25.400701,2023-03-02 18:58:43.821198,2023-03-02 18:58:43.821589,2023-03-02 18:58:43.918550,0.700028,0 days 00:00:18.420497
