In [0]:
# Installing unidecode, because we'll deal with encoding
!pip install unidecode

[43mNote: you may need to restart the kernel using %restart_python or dbutils.library.restartPython() to use updated packages.[0m
Collecting unidecode
  Obtaining dependency information for unidecode from https://files.pythonhosted.org/packages/84/b7/6ec57841fb67c98f52fc8e4a2d96df60059637cba077edc569a302a8ffc7/Unidecode-1.3.8-py3-none-any.whl.metadata
  Downloading Unidecode-1.3.8-py3-none-any.whl.metadata (13 kB)
Downloading Unidecode-1.3.8-py3-none-any.whl (235 kB)
[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/235.5 kB[0m [31m?[0m eta [36m-:--:--[0m
[2K   [91m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m[90m╺[0m[90m━[0m [32m225.3/235.5 kB[0m [31m6.5 MB/s[0m eta [36m0:00:01[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m235.5/235.5 kB[0m [31m5.4 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: unidecode
Successfully installed unidecode-1.3.8
[43mNote: you may need to restart the kernel using %restart_python o

In [0]:
# unicode import
from unidecode import unidecode
# regex import
import re

# PySpark imports
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, isnan, median
from pyspark.sql.window import Window

## Opening the CSV file in PySpark and displaying it.

In [0]:
%python
# Location and format of the input file
file_location = "/FileStore/tables/CrimesPR.csv"
file_type = "csv"

# Reader settings for the CSV source
infer_schema = "True"          # let Spark guess each column's type
first_row_is_header = "True"   # first line holds the column names
delimiter = ";"
encoding = "ISO-8859-1"

# NOTE(review): the raw figures appear to use "." as a thousands separator
# (e.g. "1.151"); with schema inference such columns may be read as decimals.
# Confirm against the source file.

# The options below are CSV-specific; for other file types they are ignored.
df = (
    spark.read.format(file_type)
    .options(
        inferSchema=infer_schema,
        header=first_row_is_header,
        sep=delimiter,
        encoding=encoding,
    )
    .load(file_location)
)

display(df)

Ano,Localidade,Índice de Desenvolvimento Humano Municipal (IDHM),"Produto Interno Bruto (PIB) per Capita (R$ 1,00)",Crimes de Ameaça,Crimes de Estelionato,Crimes de Estupro,Crimes de Furto,Crimes de Lesão Corporal,Crimes de Roubo,Furtos de Veículos,Ocorrências Envolvendo Tráfico de Drogas,Ocorrências Envolvendo Uso/Consumo de Drogas,Perturbação do Sossego/Tranquilidade,Roubos de Veículos
2018,Estado do Paraná,,38.773,110.917,29.856,6.691,172.033,64.417,60.096,17.678,11.869,16.172,28.994,7.885
2018,Abatiá,,21.529,101.0,3,5,88.0,58.0,20,4,5,11,28,4
2018,Adrianópolis,,43.677,43.0,13,9,31.0,32.0,2,2,1,3,3,-
2018,Agudos do Sul,,22.088,105.0,12,5,100.0,52.0,19,5,5,4,8,6
2018,Almirante Tamandaré,,13.168,1.151,174,68,1.129,660.0,639,176,44,81,157,44
2018,Altamira do Paraná,,33.206,34.0,5,2,25.0,22.0,1,2,-,-,10,-
2018,Alto Paraíso,,34.432,29.0,6,3,22.0,26.0,1,1,12,1,8,-
2018,Alto Paraná,,22.431,173.0,38,17,160.0,111.0,40,17,14,11,28,1
2018,Alto Piquiri,,30.9,90.0,15,10,71.0,46.0,5,7,10,10,18,3
2018,Altônia,,17.247,159.0,24,8,169.0,81.0,6,7,14,5,33,2



## Transforming the data so it can be used in the analyses below

In [0]:
# Get the Spark session for this notebook (application name "DataCleaning")
spark = (
    SparkSession.builder
    .appName("DataCleaning")
    .getOrCreate()
)

def clean_data(df):
    """
    Clean the raw crimes DataFrame so it can be queried.

    Steps, in order:
      1. Normalise the column names (lowercase, trimmed, accents removed,
         whitespace -> "_", every other non-alphanumeric character removed).
      2. Replace the "-" placeholder with null.
      3. Drop three columns (IDHM, PIB per capita, roubos de veiculos).
      4. Cast every remaining column except ``localidade`` to float.
      5. Fill null/NaN values with the median of the same column within the
         same ``localidade``. When a locality has no valid value for a column,
         fall back to the column-wide approximate median.
      6. Drop rows that still contain nulls and sort by (localidade, ano).

    Parameters
    ----------
    df : pyspark.sql.DataFrame
        Raw frame. After renaming it must contain ``localidade`` and ``ano``.

    Returns
    -------
    pyspark.sql.DataFrame
        Cleaned frame, sorted by ``localidade`` then ``ano``.
    """
    def clean_column_name(name):
        """Return `name` lowercased, trimmed, de-accented and reduced to [a-zA-Z0-9_]."""
        name = unidecode(name.lower().strip())  # lowercase, trim, remove accents
        name = re.sub(r'\s+', '_', name)  # whitespace runs -> underscore
        return re.sub(r'[^a-zA-Z0-9_]', '', name)  # remove remaining special characters

    # Rename every column in one go
    df = df.toDF(*[clean_column_name(raw_name) for raw_name in df.columns])

    # "-" marks a missing figure in the source; turn it into a real null
    df = df.replace("-", None)

    # Columns discarded entirely (judged to have too many null values)
    df = df.drop(
        'indice_de_desenvolvimento_humano_municipal_idhm',
        'produto_interno_bruto_pib_per_capita_r_100',
        'roubos_de_veiculos',
    )

    # Every column except the locality name is numeric
    value_cols = [name for name in df.columns if name != 'localidade']

    # NOTE(review): figures such as "1.151" look like they use "." as a thousands
    # separator; the float cast reads them as decimals. Confirm and normalise
    # before this step if so.
    for col_name in value_cols:
        df = df.withColumn(col_name, col(col_name).cast('float'))

    window_spec = Window.partitionBy('localidade')

    # Column-wide approximate medians, computed with a single call instead of one
    # Spark job per column. A column with no valid value yields an empty list.
    overall = df.approxQuantile(value_cols, [0.5], 0.01) if value_cols else []
    overall_medians = dict(zip(value_cols, overall))

    def impute(col_name):
        """Build the expression that fills null/NaN in `col_name`."""
        value = col(col_name)
        is_missing = value.isNull() | isnan(value)
        # Median of the valid values of this column within the row's localidade
        local_median = median(when(~is_missing, value)).over(window_spec)
        filled = when(~is_missing, value).when(local_median.isNotNull(), local_median)
        quantiles = overall_medians[col_name]
        if quantiles:
            # Locality has no valid value at all: use the column-wide median
            filled = filled.otherwise(quantiles[0])
        return filled.alias(col_name)

    # One select keeps the column order and lets all window expressions share a pass
    df = df.select([
        col(name) if name == 'localidade' else impute(name)
        for name in df.columns
    ])

    # Dropping remaining Null values
    df = df.dropna()

    # Sort last: the data has time order, and the window step above would not
    # preserve an ordering applied earlier
    return df.orderBy(['localidade', 'ano'])

# Clean/transform the raw frame. `df` is rebound to the cleaned result, which is
# what the later cells (temp view, Parquet export) operate on.
df = clean_data(df)

# Print a preview of the cleaned frame
df.show()

+------+-------------------+------------------+---------------------+-----------------+-----------------+------------------------+---------------+------------------+----------------------------------------+-------------------------------------------+-----------------------------------+
|   ano|         localidade|  crimes_de_ameaca|crimes_de_estelionato|crimes_de_estupro|  crimes_de_furto|crimes_de_lesao_corporal|crimes_de_roubo|furtos_de_veiculos|ocorrencias_envolvendo_trafico_de_drogas|ocorrencias_envolvendo_usoconsumo_de_drogas|perturbacao_do_sossegotranquilidade|
+------+-------------------+------------------+---------------------+-----------------+-----------------+------------------------+---------------+------------------+----------------------------------------+-------------------------------------------+-----------------------------------+
|2018.0|             Abatiá|             101.0|                  3.0|              5.0|             88.0|                    58.0|         

In [0]:
# List the normalised column names; these are the identifiers used by the SQL cells
print(df.columns)

['ano', 'localidade', 'crimes_de_ameaca', 'crimes_de_estelionato', 'crimes_de_estupro', 'crimes_de_furto', 'crimes_de_lesao_corporal', 'crimes_de_roubo', 'furtos_de_veiculos', 'ocorrencias_envolvendo_trafico_de_drogas', 'ocorrencias_envolvendo_usoconsumo_de_drogas', 'perturbacao_do_sossegotranquilidade']



## Creating a Table from the Previous PySpark DataFrame

In [0]:
# Register the cleaned DataFrame as a temporary view so the %sql cells can query it

# Name under which the view is registered (referenced by the SQL queries below)
temp_table_name = "CrimesPR_csv"

# Creates the view, replacing any existing temp view with the same name
df.createOrReplaceTempView(temp_table_name)


## Number of cities in the State of Paraná: 400 distinct localities − 1 = 399 (the state-level row "Estado do Paraná" is not a city).

In [0]:
%sql

/* Count the distinct localidade values in the temp view */

SELECT COUNT(DISTINCT localidade) AS cont_localidade
FROM CrimesPR_csv

cont_localidade
400



## Top 10 cities in State of Paraná with the highest "Rape Crimes", along with their Minimum, Maximum and Standard Deviation.

In [0]:
%sql

/* Ten cities with the highest mean crimes_de_estupro, with min, max and
   standard deviation per city. The 'Estado do Paraná' row is the state-level
   entry, not a city, so it is kept out of the ranking. */

SELECT
  localidade,
  AVG(crimes_de_estupro) AS avg_estupro,
  MIN(crimes_de_estupro) AS min_estupro,
  MAX(crimes_de_estupro) AS max_estupro,
  STD(crimes_de_estupro) AS std_estupro
FROM `CrimesPR_csv`
WHERE localidade <> 'Estado do Paraná'
GROUP BY localidade
ORDER BY avg_estupro DESC
LIMIT 10

localidade,avg_estupro,min_estupro,max_estupro,std_estupro
Curitiba,660.1691666642824,1.0149999856948853,875.0,326.78588052108125
Londrina,253.0,219.0,290.0,25.219040425836976
Ponta Grossa,214.5,197.0,232.0,15.016657417681207
Cascavel,179.83333333333334,153.0,206.0,23.498226883462223
Maringá,173.66666666666666,133.0,222.0,33.134071085012586
Colombo,148.33333333333334,119.0,168.0,21.275964529643925
Foz do Iguaçu,147.66666666666666,136.0,157.0,8.041558721209876
Guarapuava,136.66666666666666,120.0,144.0,9.458682078739445
São José dos Pinhais,135.0,95.0,171.0,29.698484809834994
Araucária,114.83333333333331,95.0,129.0,11.61751551178937



## Top 10 cities in State of Paraná with the highest "Theft Crimes" (crimes de furto), along with their Minimum, Maximum and Standard Deviation.

In [0]:
%sql

/* Ten cities with the highest mean crimes_de_furto, with min, max and
   standard deviation per city. The 'Estado do Paraná' row is the state-level
   entry, not a city, so it is kept out of the ranking. */

SELECT
  localidade,
  AVG(crimes_de_furto) AS avg_furto,
  MIN(crimes_de_furto) AS min_furto,
  MAX(crimes_de_furto) AS max_furto,
  STD(crimes_de_furto) AS std_furto
FROM `CrimesPR_csv`
WHERE localidade <> 'Estado do Paraná'
GROUP BY localidade
ORDER BY avg_furto DESC
LIMIT 10

localidade,avg_furto,min_furto,max_furto,std_furto
Castro,789.0,643.0,925.0,99.5248712634184
Cianorte,751.3333333333334,577.0,988.0,170.8948994752818
Pontal do Paraná,654.0713333288828,1.1799999475479126,995.0,505.9883586424109
Medianeira,630.0,526.0,743.0,88.94942383174833
Palmas,626.5043333371481,1.0260000228881836,859.0,316.0775140799172
Santo Antônio da Platina,610.1666666666666,545.0,723.0,73.37688100939333
Jacarezinho,608.3333333333334,439.0,839.0,173.715476186397
Telêmaco Borba,606.506500005722,1.0390000343322754,942.0,332.5394028460164
Piraquara,598.8461666703224,1.037999987602234,967.0,465.78724090524594
Irati,578.0,457.0,698.0,98.69751769928158



## Top 10 cities in State of Paraná with the highest "Drug Trafficking Occurrences", along with their Minimum, Maximum and Standard Deviation.

In [0]:
%sql

/* Ten cities with the highest mean ocorrencias_envolvendo_trafico_de_drogas,
   with min, max and standard deviation per city. The 'Estado do Paraná' row is
   the state-level entry, not a city, so it is kept out of the ranking. */

SELECT
  localidade,
  AVG(ocorrencias_envolvendo_trafico_de_drogas) AS avg_trafico,
  MIN(ocorrencias_envolvendo_trafico_de_drogas) AS min_trafico,
  MAX(ocorrencias_envolvendo_trafico_de_drogas) AS max_trafico,
  STD(ocorrencias_envolvendo_trafico_de_drogas) AS std_trafico
FROM `CrimesPR_csv`
WHERE localidade <> 'Estado do Paraná'
GROUP BY localidade
ORDER BY avg_trafico DESC
LIMIT 10

localidade,avg_trafico,min_trafico,max_trafico,std_trafico
Londrina,658.3985000054041,1.3910000324249268,944.0,333.00613388491604
Maringá,494.5,400.0,633.0,91.7098686074732
Cascavel,450.0,399.0,542.0,48.588064377993085
Foz do Iguaçu,351.5,279.0,368.0,35.613199800074135
Ponta Grossa,292.6666666666667,235.0,343.0,41.577237362127214
Colombo,288.6666666666667,188.0,456.0,95.30512403153708
Guarapuava,265.0,209.0,351.0,54.81240735453972
Piraquara,262.8333333333333,142.0,373.0,93.54232553591272
Apucarana,252.33333333333331,165.0,329.0,55.05512389112086
São José dos Pinhais,238.5,200.0,308.0,41.93208795182992



## Top 10 cities in State of Paraná with the highest "Vehicle Thefts", along with their Minimum, Maximum and Standard Deviation.

In [0]:
%sql

/* Ten cities with the highest mean furtos_de_veiculos, with min, max and
   standard deviation per city. The 'Estado do Paraná' row is the state-level
   entry, not a city, so it is kept out of the ranking. */

SELECT
  localidade,
  AVG(furtos_de_veiculos) AS avg_furtos_veiculos,
  MIN(furtos_de_veiculos) AS min_furtos_veiculos,
  MAX(furtos_de_veiculos) AS max_furtos_veiculos,
  STD(furtos_de_veiculos) AS std_furtos_veiculos
FROM `CrimesPR_csv`
WHERE localidade <> 'Estado do Paraná'
GROUP BY localidade
ORDER BY avg_furtos_veiculos DESC
LIMIT 10


localidade,avg_furtos_veiculos,min_furtos_veiculos,max_furtos_veiculos,std_furtos_veiculos
Maringá,696.1666666666666,510.0,912.0,162.35321575708522
Cascavel,629.8333333333334,457.0,754.0,114.3283283647
Foz do Iguaçu,587.0,413.0,763.0,142.22657979435488
Londrina,489.6721666653951,1.4950000047683716,888.0,388.1439463983128
Colombo,358.5,320.0,405.0,29.85799725366723
São José dos Pinhais,343.5,254.0,514.0,90.9411897876864
Ponta Grossa,343.1666666666667,247.0,441.0,76.47853729424135
Toledo,238.5,169.0,337.0,71.91314205345223
Apucarana,192.5,167.0,221.0,19.46021582614129
Arapongas,191.83333333333331,136.0,223.0,31.307613557514514



## Top 10 cities in State of Paraná with the highest "Disturbing the Peace/Tranquility" occurrences, along with their Minimum, Maximum and Standard Deviation.

In [0]:
%sql

/* Ten cities with the highest mean perturbacao_do_sossegotranquilidade, with
   min, max and standard deviation per city. The 'Estado do Paraná' row is the
   state-level entry, not a city, so it is kept out of the ranking. */

SELECT
  localidade,
  AVG(perturbacao_do_sossegotranquilidade) AS avg_sossego_tranq,
  MIN(perturbacao_do_sossegotranquilidade) AS min_sossego_tranq,
  MAX(perturbacao_do_sossegotranquilidade) AS max_sossego_tranq,
  STD(perturbacao_do_sossegotranquilidade) AS std_sossego_tranq
FROM `CrimesPR_csv`
WHERE localidade <> 'Estado do Paraná'
GROUP BY localidade
ORDER BY avg_sossego_tranq DESC
LIMIT 10


localidade,avg_sossego_tranq,min_sossego_tranq,max_sossego_tranq,std_sossego_tranq
Cascavel,706.3591666618983,1.1549999713897705,961.0,358.6130784177245
Francisco Beltrão,646.3333333333334,430.0,926.0,192.7264036572744
Londrina,558.7086666623751,1.093999981880188,950.0,445.0636567004879
Maringá,528.6666666666666,340.0,688.0,145.70609687541102
Piraquara,508.5,192.0,734.0,179.68500215655175
Colombo,497.3653333385785,1.1920000314712524,868.0,328.99932280237385
Paranaguá,492.5,340.0,772.0,167.02784199048972
São José dos Pinhais,472.8519999980927,1.1119999885559082,998.0,344.84232719629915
Guaratuba,466.0,236.0,695.0,180.92318812136824
Pato Branco,456.3333333333333,304.0,694.0,172.7039856710512



## What are the mean numbers of Drug Trafficking and Drug Use occurrences in Curitiba?

In [0]:
%sql

/* Mean drug-trafficking and drug use/consumption occurrences for Curitiba */
SELECT
  localidade,
  AVG(ocorrencias_envolvendo_trafico_de_drogas)    AS avg_trafico_curitiba,
  AVG(ocorrencias_envolvendo_usoconsumo_de_drogas) AS avg_druguse_curitiba
FROM CrimesPR_csv
WHERE localidade = 'Curitiba'
GROUP BY localidade

localidade,avg_trafico_curitiba,avg_druguse_curitiba
Curitiba,1.734333336353302,3.14300004641215



## Concerning the last question, what about specifically in year 2020?


In [0]:
%sql

/* Drug-trafficking and drug use/consumption occurrences for Curitiba in 2020 */
SELECT
  localidade,
  ocorrencias_envolvendo_trafico_de_drogas    AS trafico_curitiba,
  ocorrencias_envolvendo_usoconsumo_de_drogas AS druguse_curitiba
FROM CrimesPR_csv
WHERE ano = 2020
  AND localidade = 'Curitiba'

localidade,trafico_curitiba,druguse_curitiba
Curitiba,1.975000023841858,3.380000114440918


# Data Quality Issues Investigations

1. Next, we look at descriptive statistics for Rape Crimes in **Curitiba** in order to find inconsistencies in the data.

In [0]:
%sql

/* Mean, minimum and maximum of crimes_de_estupro for Curitiba */
SELECT
  localidade,
  AVG(crimes_de_estupro) AS avg_estupro,
  MIN(crimes_de_estupro) AS min_estupro_curitiba,
  MAX(crimes_de_estupro) AS max_estupro_curitiba
FROM CrimesPR_csv
WHERE localidade = 'Curitiba'
GROUP BY localidade


localidade,avg_estupro,min_estupro_curitiba,max_estupro_curitiba
Curitiba,660.1691666642824,1.0149999856948853,875.0



2. Next, we look at descriptive statistics for Drug Trafficking Occurrences in **Curitiba** in order to find inconsistencies in the data.

In [0]:
%sql

/* Mean, minimum and maximum of ocorrencias_envolvendo_trafico_de_drogas for Curitiba */
SELECT
  localidade,
  AVG(ocorrencias_envolvendo_trafico_de_drogas) AS avg_traffic_curitiba,
  MIN(ocorrencias_envolvendo_trafico_de_drogas) AS min_traffic_curitiba,
  MAX(ocorrencias_envolvendo_trafico_de_drogas) AS max_traffic_curitiba
FROM CrimesPR_csv
WHERE localidade = 'Curitiba'
GROUP BY localidade


localidade,avg_traffic_curitiba,min_traffic_curitiba,max_traffic_curitiba
Curitiba,1.734333336353302,1.378000020980835,2.132999897003174



3. Next, we look at descriptive statistics for Bodily Injury Crimes in **Curitiba** in order to find inconsistencies in the data.

In [0]:
%sql

/* Mean, minimum and maximum of crimes_de_lesao_corporal for Curitiba */
SELECT
  localidade,
  AVG(crimes_de_lesao_corporal) AS avg_lesion_curitiba,
  MIN(crimes_de_lesao_corporal) AS min_lesion_curitiba,
  MAX(crimes_de_lesao_corporal) AS max_lesion_curitiba
FROM CrimesPR_csv
WHERE localidade = 'Curitiba'
GROUP BY localidade


localidade,avg_lesion_curitiba,min_lesion_curitiba,max_lesion_curitiba
Curitiba,7.769500017166138,6.984000205993652,8.48900032043457



4. Next, we look at descriptive statistics for Bodily Injury Crimes in **São José dos Pinhais** in order to find inconsistencies in the data.

In [0]:
%sql

/* Mean, minimum and maximum of crimes_de_lesao_corporal for São José dos Pinhais */
SELECT
  localidade,
  AVG(crimes_de_lesao_corporal) AS avg_lesion_SJP,
  MIN(crimes_de_lesao_corporal) AS min_lesion_SJP,
  MAX(crimes_de_lesao_corporal) AS max_lesion_SJP
FROM CrimesPR_csv
WHERE localidade = 'São José dos Pinhais'
GROUP BY localidade


localidade,avg_lesion_SJP,min_lesion_SJP,max_lesion_SJP
São José dos Pinhais,1.418999989827474,1.2300000190734863,1.6740000247955322


In [0]:

# With this registered as a temp view, it will only be available to this particular notebook. If you'd like other users to be able to query this table, you can also create a table from the DataFrame.
# Once saved, this table will persist across cluster restarts as well as allow various users across different notebooks to query this data.
# To do so, choose your table name and uncomment the bottom line.

#permanent_table_name = "crimespr_etl_parquet"

#df.write.format("parquet").saveAsTable(permanent_table_name)

In [0]:
# Save the cleaned DataFrame in Parquet format, replacing any previous export.
# The target carries a ".parquet" suffix so its name matches the format written
# (the previous ".csv" suffix was misleading for Parquet data).
df.write.mode("overwrite").parquet("./cleaned_crimespr.parquet")