In [33]:
import pandas as pd
from pyspark.ml import Pipeline
from pyspark.sql import SparkSession
from pyspark.ml.feature import Tokenizer
from pyspark.sql import Row

from pyspark.sql.functions import *

INPUT_PATH = "../data/external/acidentes*.csv"
OUTPUT_PATH = "../data/processed/"
VERSION = "v1"

In [34]:
spark = SparkSession.builder.appName("ds-classify").getOrCreate()
df = spark.read.option('encoding', 'ISO-8859-1').csv(INPUT_PATH, header=True, sep=';')

In [3]:
df.show()

24/02/22 10:30:39 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


+------+------+------------+------------+--------+---+---+-----+--------------------+--------------------+--------------------+----------------------+-----------+-----------+----------------------+----------+-----------------+--------+----------+---------------+--------------------+----------------------+--------------+-------------+-----+-------------+------+-------------+--------------+------+------------+------------+--------+---------+---------------+
|    id| pesid|data_inversa|  dia_semana| horario| uf| br|   km|           municipio|      causa_acidente|       tipo_acidente|classificacao_acidente|   fase_dia|sentido_via|condicao_metereologica|tipo_pista|      tracado_via|uso_solo|id_veiculo|   tipo_veiculo|               marca|ano_fabricacao_veiculo|tipo_envolvido|estado_fisico|idade|         sexo|ilesos|feridos_leves|feridos_graves|mortos|    latitude|   longitude|regional|delegacia|            uop|
+------+------+------------+------------+--------+---+---+-----+----------------

In [35]:
df = df.withColumn("idade", when(df.idade == "NA", None) \
      .otherwise(df.idade))

In [36]:
from pyspark.sql.types import IntegerType
df = df.withColumn("idade", col("idade").cast(IntegerType()))

In [4]:
df.schema.__dict__

{'fields': [StructField('id', StringType(), True),
  StructField('pesid', StringType(), True),
  StructField('data_inversa', StringType(), True),
  StructField('dia_semana', StringType(), True),
  StructField('horario', StringType(), True),
  StructField('uf', StringType(), True),
  StructField('br', StringType(), True),
  StructField('km', StringType(), True),
  StructField('municipio', StringType(), True),
  StructField('causa_acidente', StringType(), True),
  StructField('tipo_acidente', StringType(), True),
  StructField('classificacao_acidente', StringType(), True),
  StructField('fase_dia', StringType(), True),
  StructField('sentido_via', StringType(), True),
  StructField('condicao_metereologica', StringType(), True),
  StructField('tipo_pista', StringType(), True),
  StructField('tracado_via', StringType(), True),
  StructField('uso_solo', StringType(), True),
  StructField('id_veiculo', StringType(), True),
  StructField('tipo_veiculo', StringType(), True),
  StructField('mar

In [5]:
dataset = df.select(col("id"), col("data_inversa"), col("uf"), col("sentido_via"), col("tipo_acidente"),  col("km"), col("causa_acidente"), col("br"), col("fase_dia"), col("estado_fisico"), col("condicao_metereologica"), col("tipo_pista"),col("ano_fabricacao_veiculo"), col("tipo_veiculo"), col("idade"), col("sexo"), col("mortos"))

In [6]:
dataset.show()

+------+------------+---+-----------+--------------------+-----+--------------------+---+-----------+-------------+----------------------+----------+----------------------+---------------+-----+-------------+------+
|    id|data_inversa| uf|sentido_via|       tipo_acidente|   km|      causa_acidente| br|   fase_dia|estado_fisico|condicao_metereologica|tipo_pista|ano_fabricacao_veiculo|   tipo_veiculo|idade|         sexo|mortos|
+------+------------+---+-----------+--------------------+-----+--------------------+---+-----------+-------------+----------------------+----------+----------------------+---------------+-----+-------------+------+
|182256|  2019-01-01| CE|Decrescente|Atropelamento de ...|136,9|    Animais na Pista|116|  Amanhecer|        Ileso|                 Vento|   Simples|                  2012|       Caminhão|   35|    Masculino|     0|
|182263|  2019-01-01| MT|Decrescente|            Incêndio|599,5|Defeito Mecânico ...|158|  Amanhecer|        Ileso|        Garoa/Chuvisc

In [7]:
import holidays

def get_holiday_name_by_state(date, state):
    # Ajuste para garantir que a biblioteca holidays reconheça o estado
    # Se o estado não for fornecido ou não for válido, a função usará os feriados nacionais
    if state not in holidays.Brazil.subdivisions:
        state = None
    
    br_holidays = holidays.Brazil(subdiv=state) if state else holidays.Brazil()
    return br_holidays.get(date, default=None)
# Registrando a função como uma UDF
get_holiday_name_by_state_udf = udf(get_holiday_name_by_state, StringType())

In [8]:
from pyspark.sql.types import DateType

dataset = dataset.withColumn("feriado", get_holiday_name_by_state_udf(col("data_inversa"), col("uf")))

dataset = dataset.withColumn("date", col("data_inversa").cast(DateType()).alias('date')).select(
    '*',
    year("date").alias('ano'), 
    month("date").alias('mes'), 
    dayofmonth("date").alias('dia')
).drop("data_inversa")

In [9]:
dataset.show()



+------+---+-----------+--------------------+-----+--------------------+---+-----------+-------------+----------------------+----------+----------------------+---------------+-----+-------------+------+--------------------+----------+----+---+---+
|    id| uf|sentido_via|       tipo_acidente|   km|      causa_acidente| br|   fase_dia|estado_fisico|condicao_metereologica|tipo_pista|ano_fabricacao_veiculo|   tipo_veiculo|idade|         sexo|mortos|             feriado|      date| ano|mes|dia|
+------+---+-----------+--------------------+-----+--------------------+---+-----------+-------------+----------------------+----------+----------------------+---------------+-----+-------------+------+--------------------+----------+----+---+---+
|182256| CE|Decrescente|Atropelamento de ...|136,9|    Animais na Pista|116|  Amanhecer|        Ileso|                 Vento|   Simples|                  2012|       Caminhão|   35|    Masculino|     0|Confraternização ...|2019-01-01|2019|  1|  1|
|182263|

                                                                                

In [10]:
dataset.count()

                                                                                

783053

In [11]:
dataset.groupBy("mortos").count().show()



+------+------+
|mortos| count|
+------+------+
|     0|755972|
|     1| 27081|
+------+------+



                                                                                

In [28]:
pd.set_option('display.max_rows', 500)

(dataset
    .filter(col('causa_acidente').isin(['Ingestão de Álcool', 'Ingestão de álcool pelo condutor'])) 
    .groupBy("causa_acidente")
    .count()
).toPandas()

                                                                                

Unnamed: 0,causa_acidente,count
0,Ingestão de Álcool,23723
1,Ingestão de álcool pelo condutor,27592


In [29]:
dataset = dataset.filter(col('causa_acidente').isin(['Ingestão de Álcool', 'Ingestão de álcool pelo condutor'])) 

In [30]:
# Ingestão de álcool pelo condutor
# Ingestão de Álcool	

In [32]:
dataset.write.mode('overwrite').parquet(f"{OUTPUT_PATH}/dataset_{VERSION}")


24/02/22 10:43:16 WARN MemoryManager: Total allocation exceeds 95,00% (1.020.054.720 bytes) of heap memory
Scaling row group sizes to 95,00% for 8 writers
                                                                                

In [None]:
df.group