In [1]:
from pyspark.sql.functions import col, regexp_replace, year, substring, to_date, regexp_extract, asc, desc, split, when, count, trim
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType
from pyspark.sql import functions as F
from pyspark.sql import SparkSession
from dotenv import load_dotenv
import geopandas as gpd
import pandas as pd
import shapely
import calendar
import requests
import locale
import boto3
import os

In [2]:
URL = "https://dadosabertos.poa.br/api/3/action/datastore_search?resource_id=a46aaaca-8cc1-4082-aa78-ce9f859e2df5&limit=30000"
REQUEST_TIMEOUT_S = 60  # seconds; keeps a stalled connection from hanging the notebook forever

load_dotenv()

# AWS CONFIGS
ACCESS_KEY_ID = os.getenv('AWS_ACCESS_KEY_ID')
SECRET_ACCESS_KEY = os.getenv('AWS_SECRET_ACCESS_KEY')
REGION_NAME = os.getenv('AWS_REGION')
BUCKET_NAME = 'traffic-accident-porto-alegre'

# PYSPARK CONFIGS
spark = SparkSession.builder \
    .appName("Traffic Accident Analysis") \
    .getOrCreate()

# Fetch the dataset. Errors are deliberately allowed to propagate: every later cell
# depends on `records`, so continuing after a failed download would only produce
# confusing NameErrors/KeyErrors further down.
response = requests.get(URL, timeout=REQUEST_TIMEOUT_S)
response.raise_for_status()
data = response.json()
records = data["result"]["records"]

# Check if the data is retrieved correctly from URL
if records:
    print("- Endpoint connected! A SAMPLE FROM THE ENDPOINT: ", records[0])
else:
    print("- No records found.")

print("- Number of records:", len(records))

# The URL caps the page size (limit=30000). Warn when the dataset has outgrown it,
# otherwise rows would be silently missing from every downstream result.
total_available = data["result"].get("total")
if total_available is not None and total_available > len(records):
    print(f"! WARNING: only {len(records)} of {total_available} records were fetched; raise 'limit' or paginate.")

{'_id': 1, 'data_extracao': '2023-05-01T01:36:51', 'longitude': -51.178292, 'latitude': -30.066858, 'idacidente': 643556, 'data': '2018-01-02T00:00:00', 'hora': '18:00:00.0000000', 'idade': 37, 'sexo': 'MASCULINO', 'sit_vitima': 'CONDUTOR', 'log1': 'AV ROCIO', 'log2': 'R TEN ALPOIM', 'predial1': 0, 'regiao': 'LESTE', 'tipo_acid': 'ABALROAMENTO', 'auto': 0, 'taxi': 1, 'onibus_urb': 0, 'onibus_met': 0, 'onibus_int': 0, 'caminhao': 0, 'moto': 1, 'carroca': 0, 'bicicleta': 0, 'outro': 0, 'lotacao': 0, 'dia_sem': 'TERÇA-FEIRA', 'periododia': 'NOITE', 'fx_et': '36 A 45', 'tipo_veic': 'MOTOCICLETA', 'consorcio': ''}
Number of records: 29482


In [3]:
# Defining the DataFrame schema. The field order here also defines the tuple order
# used to build each row below.
schema = StructType([
    StructField("_id", IntegerType()),
    StructField("data", StringType()),
    StructField("hora", StringType()),
    StructField("idade", IntegerType()),
    StructField("sexo", StringType()),
    StructField("sit_vitima", StringType()),
    StructField("log1", StringType()),
    StructField("regiao", StringType()),
    StructField("tipo_acid", StringType()),
    StructField("auto", IntegerType()),
    StructField("taxi", IntegerType()),
    StructField("onibus_urb", IntegerType()),
    StructField("onibus_met", IntegerType()),
    StructField("onibus_int", IntegerType()),
    StructField("caminhao", IntegerType()),
    StructField("moto", IntegerType()),
    StructField("carroca", IntegerType()),
    StructField("bicicleta", IntegerType()),
    StructField("outro", IntegerType()),
    StructField("lotacao", IntegerType()),
    StructField("dia_sem", StringType()),
    StructField("periododia", StringType()),
    StructField("fx_et", StringType()),
])

try:
    # Build each row from the schema's own field names, so the tuple layout can never
    # drift out of sync with the schema. A missing key still raises KeyError.
    data_tuples = [tuple(record[field.name] for field in schema.fields) for record in records]

    # Create the DataFrame with the updated schema and data_tuples
    df = spark.createDataFrame(data_tuples, schema)

    # Check if both the numbers in the JSON data and DataFrame are the same
    print("records in the JSON data: ", len(records))
    print("records in the DataFrame: ", df.count())
    print("A RANDOM SAMPLE FROM DATAFRAME:")
    df.sample(False, 0.1).show(5)

except Exception as e:
    print("An error occurred:", str(e))
    # Re-raise: later cells need `df`, and swallowing the error would only turn it into
    # a misleading NameError in the cleaning cell.
    raise

records in the JSON data:  29482
records in the DataFrame:  29482
+---+-------------------+----------------+-----+---------+----------+--------------------+------+-------------+----+----+----------+----------+----------+--------+----+-------+---------+-----+-------+-------------+----------+-------+
|_id|               data|            hora|idade|     sexo|sit_vitima|                log1|regiao|    tipo_acid|auto|taxi|onibus_urb|onibus_met|onibus_int|caminhao|moto|carroca|bicicleta|outro|lotacao|      dia_sem|periododia|  fx_et|
+---+-------------------+----------------+-----+---------+----------+--------------------+------+-------------+----+----+----------+----------+----------+--------+----+-------+---------+-----+-------+-------------+----------+-------+
|  4|2018-01-05T00:00:00|18:00:00.0000000|   34|MASCULINO|  CONDUTOR|           R REGENTE| LESTE|      COLISÃO|   1|   0|         0|         0|         0|       0|   1|      0|        0|    0|      0|  SEXTA-FEIRA|     NOITE|26 A 35

In [7]:
# ___DATA CLEANING
class DataCleaning:
    """Clean the Porto Alegre traffic-accident Spark DataFrame and sanity-check the result.

    Every cleaning step replaces ``self.df`` with a transformed DataFrame, so the steps
    are meant to run once, in the order used by ``execute_cleaning``. The ``check_*``
    methods only print their findings; they never modify ``self.df``.

    An earlier step that filled null/empty values with 'N/A' (and its test) was
    intentionally dropped: nulls are kept as nulls.
    """

    # (pattern, replacement) pairs applied in order to 'log1'. The patterns are Java
    # regexes because they are evaluated by Spark's regexp_replace.
    LOG1_REPLACEMENTS = (
        (r'^(\bPROTASIO ALVES\b)', 'AV PROTASIO ALVES'),
        # A literal 'R' is required here: in Java regex '\R' is the line-break matcher,
        # which can never match at the start of a street name.
        (r'^(R A AV ASSIS BRASIL\b)', 'AV ASSIS BRASIL'),
    )

    # (pattern, replacement) pairs applied in order to derive 'log_cleaned' from 'log1':
    # strip street-type prefixes and connectives, then rename specific streets
    # (presumably to match the names in the EixosLogradouros shapefile - confirm).
    LOG_CLEANED_RULES = (
        ('^(R |AV |ESTR |TUN |BC |TRAV |AL |LG |PSG |PCA |VDT |PRQ |AC |DOS |DAS |DO |DA )\\s*', ''),
        ('^(DOS |DAS |DA |DO |TL |DESEMBARGADOR )', ''),
        ('^(NS DA CONCEICAO-AC AV )', ''),
        ('^(QUATRO JARDIM )', ''),
        # Repeats the rule two entries up; it only has an effect if removing
        # 'QUATRO JARDIM ' exposes this prefix.
        ('^(NS DA CONCEICAO-AC AV )', ''),
        ('^ESTRELA$', 'ESTRELA PARQUE BELEM'),
        ('^ANDRE DA ROCHA$', 'DES ANDRE DA ROCHA'),
        ('^SEVERO DULLI$', 'SEVERO DULLIUS'),
        ('^A I VILA BOM JESUS$', 'A E VILA BOM JESUS'),
        ('^A VILA N S FATIMA-DIVINEIA$', 'C VILA N S FATIMA-DIVINEIA'),
        ('^HUGO CANDAL$', 'DES HUGO CANDAL'),
    )

    # 'log_cleaned' values with no street correspondency; rows carrying them are dropped.
    # Shared by remove_lines_with_no_log_correspondency and check_log_removed so the
    # removal and its test cannot drift apart.
    NO_CORRESPONDENCY_LOGS = (
        'LEGALIDADE E DA DEMOCRACIA',
        'DA LEGALIDADE E DA DEMOCRACIA',
        '',
        'RTL CARLOS GOMES-PLINIO B MILANO',
        'LE MERCADO PUBLICO CENTRAL-GALERIA',
        # NOTE(review): LOG_CLEANED_RULES renames 'HUGO CANDAL' to 'DES HUGO CANDAL'
        # first, so this entry never matches - confirm which behaviour is intended.
        'HUGO CANDAL',
        'MORENO LOUREIRO LIMA',
        'OTILIA K DE ARAUJO-VIAMAO',
        'DIR QUATRO MIL NOVECENTOS',
        'nossa sra aparecida',
        # NOTE(review): double space kept as written in the removal list (trim() only
        # strips the ends); the old test list used a single space - confirm which
        # variant appears in the data.
        'ALBERTO  BINS',
        'TRES MIL CINQUENTA TRES',
        'LUZ',
        '7137',
        'VA NOVE CEFER UM',
        'DOIS TEN ARY TARRAGO-PAULO M COELHO',
        'RP E UM VILA NOVA SANTA ROSA',
    )

    # Patterns that must no longer match any 'log_cleaned' value after cleaning.
    LOG_CLEANED_CHECK_PATTERNS = (
        '^(DOS |DAS |DA |DO |TL |DESEMBARGADOR )',
        '^(NS DA CONCEICAO-AC AV )',
        '^(QUATRO JARDIM )',
        '^ESTRELA$',
        '^ANDRE DA ROCHA$',
        '^SEVERO DULLI$',
        '^A I VILA BOM JESUS$',
        '^MORENO LOUREIRO LIMA$',
        '^A VILA N S FATIMA-DIVINEIA$',
        '^HUGO CANDAL$',
    )

    def __init__(self, df):
        """Wrap the raw Spark DataFrame to be cleaned; it is kept in ``self.df``."""
        self.df = df

    # ___CLEANING STEPS

    def change_id_field_name(self):
        """Rename the API's '_id' column to 'id'."""
        self.df = self.df.withColumnRenamed("_id", "id")

    def extract_day_from_date(self):
        """Convert 'data' (e.g. '2018-01-02T00:00:00') to a date built from its first 10 characters.

        Values Spark cannot parse become null (with Spark's ANSI mode off).
        """
        self.df = self.df.withColumn('data', F.to_date(F.substring(F.col('data'), 1, 10)))

    def extract_first_8_digits_from_hora(self):
        """Keep only the first 8 characters of 'hora' (e.g. '18:00:00.0000000' -> '18:00:00')."""
        self.df = self.df.withColumn('hora', F.substring(F.col('hora'), 1, 8))

    def remove_whitespaces_from_log1(self):
        """Trim leading and trailing spaces from the street name 'log1'."""
        self.df = self.df.withColumn('log1', F.trim(F.col('log1')))

    def replace_log1_values(self):
        """Normalise known 'log1' spellings using LOG1_REPLACEMENTS, in order."""
        for pattern, replacement in self.LOG1_REPLACEMENTS:
            self.df = self.df.withColumn('log1', regexp_replace(col('log1'), pattern, replacement))

    def remove_decimal_age_value(self):
        """Strip a trailing '.0' from 'idade'.

        NOTE(review): regexp_replace also turns 'idade' into a string column. That is
        likely what keeps ages as '37' instead of '37.0' in the CSV when the column has
        nulls (toPandas() yields floats for nullable integers), so do not cast it back
        without checking the export.
        """
        self.df = self.df.withColumn('idade', regexp_replace(col('idade'), r"\.0$", ""))

    def create_new_column_log_correspondency(self):
        """Add 'log_cleaned', derived from 'log1' by applying LOG_CLEANED_RULES in order."""
        cleaned = col('log1')
        for pattern, replacement in self.LOG_CLEANED_RULES:
            cleaned = regexp_replace(cleaned, pattern, replacement)
        self.df = self.df.withColumn('log_cleaned', cleaned)

    def remove_whitespaces_from_log_cleaned(self):
        """Trim leading and trailing spaces from 'log_cleaned'."""
        self.df = self.df.withColumn('log_cleaned', F.trim(F.col('log_cleaned')))

    def remove_lines_with_no_log_correspondency(self):
        """Drop rows whose 'log_cleaned' is listed in NO_CORRESPONDENCY_LOGS.

        Rows whose 'log_cleaned' is null are dropped as well, because ``~isin(...)``
        evaluates to null for them and filter() keeps only true rows.
        """
        self.df = self.df.filter(~col('log_cleaned').isin(*self.NO_CORRESPONDENCY_LOGS))

    def create_pyspark_dataframe_csv(self, path='cl_data.csv'):
        """Collect the cleaned DataFrame to pandas and write it as CSV to ``path`` (no index)."""
        df_pandas = self.df.toPandas()
        df_pandas.to_csv(path, index=False)

    def create_pd_geo_dataframe_csv_and_compare(self, shapefile_path="EixosLogradouros/EixosLogradouros.shp"):
        """Load the street-axis shapefile and return it with the geometry column as WKT strings.

        Despite its name, this method neither writes a CSV nor compares anything yet;
        it returns the converted frame so a caller can do either.
        """
        gdf = gpd.read_file(shapefile_path)

        def get_wkt(geom):
            # isinstance() is False for None, so missing geometries map to None.
            return geom.wkt if isinstance(geom, shapely.geometry.base.BaseGeometry) else None

        df_pandas_geo = gdf.drop('geometry', axis=1)  # drop() already returns a new object
        df_pandas_geo['geometry'] = gdf['geometry'].apply(get_wkt)
        return df_pandas_geo

    # ___TESTS

    def check_first_column_name(self):
        """Print whether the first column is named 'id'."""
        if self.df.columns[0] == "id":
            print("- The name of the first column corresponds to 'id'.")
        else:
            print("- The name of the first column does not correspond to 'id'.")

    def check_date_extraction(self):
        """Report 'data' values that are null or not 10 characters long.

        A failed to_date() parse shows up as null, so null is the failure signal after
        extract_day_from_date; comparing against ' ' or 'N/A' can never match a date.
        """
        bad_dates = self.df.filter(F.col('data').isNull() | (F.length(F.col('data')) != 10))
        bad_count = bad_dates.count()
        if bad_count == 0:
            print("- First 10 digits extracted from the 'data' column.")
        else:
            print("! ERROR: 'data' is not extracted correctly. COUNTER:", bad_count)
            print("- Sample of 5 random records where 'data' is not extracted correctly:")
            bad_dates.sample(False, 0.1).show(5, truncate=False)

    def check_hour_extraction(self):
        """Report whether 'hora' holds 8-character values, and alert on nulls."""
        eight_digits_count = self.df.filter(F.length(F.col('hora')) == 8).count()
        null_count = self.df.filter(F.col('hora').isNull()).count()

        if eight_digits_count > 0:
            print("- Eight digits extracted from the 'hora' column for the majority of records.")
        else:
            print("! ERROR: No eight-digit values found in the 'hora' column.")

        if null_count > 0:
            print("- ALERT: Some values in the 'hora' column are counted as null. COUNTER: ", null_count)
            print("- Sample of 5 random records where 'hora' is not extracted correctly:")
            # Include the null rows the alert is about; a length comparison alone drops them.
            bad_hours = self.df.filter(
                F.col('hora').isNull() | (F.length(F.col('hora')) != 8) | (F.col('hora') == " ")
            )
            bad_hours.sample(False, 0.1).show(5, truncate=False)

    def check_whitespaces_removed_log1(self):
        """Report 'log1' values that still start or end with whitespace."""
        untrimmed = self.df.filter(F.col('log1').rlike(r'^\s+|\s+$')).count()
        if untrimmed == 0:
            print("- Leading and trailing whitespaces removed from the 'log1' column.")
        else:
            print("! ERROR: Leading and trailing whitespaces are not removed correctly. COUNTER: ", untrimmed)

    def check_new_column_log_cleaned(self):
        """Print whether the 'log_cleaned' column exists."""
        if 'log_cleaned' in self.df.columns:
            print("- The 'log_cleaned' column has been successfully created.")
        else:
            print("! ERROR: The 'log_cleaned' column has not been created yet.")

    def check_log_cleaned_alterations(self):
        """Print, per pattern in LOG_CLEANED_CHECK_PATTERNS, whether any 'log_cleaned' value still matches."""
        for pattern in self.LOG_CLEANED_CHECK_PATTERNS:
            test_count = self.df.filter(col('log_cleaned').rlike(pattern)).count()
            if test_count == 0:
                print(f"- Pattern '{pattern}' has been successfully applied to 'log_cleaned' column.")
            else:
                print(f"! ERROR: Pattern '{pattern}' has not been applied correctly to 'log_cleaned' column.")

    def check_whitespaces_removed_log_cleaned(self):
        """Report 'log_cleaned' values that still start or end with whitespace."""
        untrimmed = self.df.filter(F.col('log_cleaned').rlike(r'^\s+|\s+$')).count()
        if untrimmed == 0:
            print("- Leading and trailing whitespaces removed from the 'log_cleaned' column.")
        else:
            print("! ERROR: Leading and trailing whitespaces are not removed correctly. COUNTER: ", untrimmed)

    def check_log_removed(self):
        """Report any NO_CORRESPONDENCY_LOGS values that are still present in 'log_cleaned'."""
        remaining_values_df = (
            self.df.filter(col('log_cleaned').isin(*self.NO_CORRESPONDENCY_LOGS))
            .select('log_cleaned')
            .distinct()
        )
        remaining_count = remaining_values_df.count()
        if remaining_count == 0:
            print("- All rows have been removed correctly.")
        else:
            print(f"ALERT: >{remaining_count}< undesired values still remain on DataFrame.")
            print("These values can only conflict if there is a connection with the file for mapping accidents by address in the city of Porto Alegre: ")
            print("_sample of remaining undesired values:_")
            for row in remaining_values_df.collect():
                print(row.log_cleaned)

    # ___ORCHESTRATION

    def execute_cleaning(self):
        """Run every cleaning step in order, then write the CSV and load the shapefile."""
        self.extract_day_from_date()
        self.change_id_field_name()
        self.extract_first_8_digits_from_hora()
        self.remove_whitespaces_from_log1()
        self.replace_log1_values()
        self.remove_decimal_age_value()
        self.create_new_column_log_correspondency()
        self.remove_whitespaces_from_log_cleaned()
        self.remove_lines_with_no_log_correspondency()
        self.create_pyspark_dataframe_csv()
        self.create_pd_geo_dataframe_csv_and_compare()

    def run_tests(self):
        """Run all printed sanity checks against the cleaned ``self.df``."""
        self.check_date_extraction()
        self.check_hour_extraction()
        self.check_whitespaces_removed_log1()
        self.check_first_column_name()
        self.check_new_column_log_cleaned()
        self.check_whitespaces_removed_log_cleaned()
        self.check_log_cleaned_alterations()
        self.check_log_removed()

    def run_data_cleaning(self):
        """Clean the data, then run the checks.

        Raises:
            Exception: whatever a cleaning step or check raised. The error is printed and
                then re-raised so that a later cell (e.g. the S3 upload) does not run on a
                stale or missing CSV.
        """
        try:
            self.execute_cleaning()
            self.run_tests()
        except Exception as e:
            print("!!! Error occurred during data cleaning:", str(e))
            raise

# Run every cleaning step and then the printed sanity checks on the DataFrame built in the
# previous cell. The cleaned result stays available as dc.df; the cleaning steps also write
# 'cl_data.csv', which the upload cell below sends to S3.
dc = DataCleaning(df)

dc.run_data_cleaning()

- First 10 digits extracted from the 'data' column.
- Eight digits extracted from the 'hora' column for the majority of records.
- Leading and trailing whitespaces removed from the 'log1' column.
- The name of the first column corresponds to 'id'.
- The 'log_cleaned' column has been successfully created.
- Leading and trailing whitespaces removed from the 'log_cleaned' column.
- Pattern '^(DOS |DAS |DA |DO |TL |DESEMBARGADOR )' has been successfully applied to 'log_cleaned' column.
- Pattern '^(NS DA CONCEICAO-AC AV )' has been successfully applied to 'log_cleaned' column.
- Pattern '^(QUATRO JARDIM )' has been successfully applied to 'log_cleaned' column.
- Pattern '^(NS DA CONCEICAO-AC AV )' has been successfully applied to 'log_cleaned' column.
- Pattern '^ESTRELA$' has been successfully applied to 'log_cleaned' column.
- Pattern '^ANDRE DA ROCHA$' has been successfully applied to 'log_cleaned' column.
- Pattern '^SEVERO DULLI$' has been successfully applied to 'log_cleaned' column.

In [5]:
# Upload the cleaned CSV to S3.
# Configure the AWS keys before running this cell: they are read from the
# AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY and AWS_REGION environment variables
# (loaded with load_dotenv) in the configuration cell.
local_file_path = 'cl_data.csv'  # CSV written by the data-cleaning cell
s3_file_name = 'data.csv'        # object key inside BUCKET_NAME

s3_client = boto3.client(
    's3',
    aws_access_key_id=ACCESS_KEY_ID,
    aws_secret_access_key=SECRET_ACCESS_KEY,
    region_name=REGION_NAME,
)
s3_client.upload_file(local_file_path, BUCKET_NAME, s3_file_name)

print("- File successfully uploaded to AWS S3.")

File successfully uploaded to AWS S3.
