## Data Engineering Capstone Project
#### Introduction
This project brings together crime records and property listing information for the same locality, São Paulo, Brazil. The result is a clean, reliable foundation for statistical modeling and business intelligence.

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [65]:
import os
import gc
import logging
from datetime import datetime
from sys import stdout
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql import functions as f

In [43]:
def normalize_columns_name(_df):
    """
    Receive a Spark data frame and return it with normalized column names (lower case, spaces replaced by underscores)
    """
    cols = _df.columns
    df_result = _df
    for c in cols:
        df_result = df_result.withColumnRenamed(c, c.lower().replace(' ', '_'))
    
    return df_result

def replace_ncols(_df):
    """
    Receive a Spark data frame and return it with the string values 'NULL', 'NaN' and 'NA' replaced by null
    """
    col_types = _df.dtypes
    df_result = _df
    
    for item in col_types:
        if item[1] == 'string':
            df_result = df_result.withColumn(item[0]\
                                            ,f.when(f.col(item[0]).isin('NULL','NaN', 'NA'), None)\
                                            .otherwise(f.col(item[0])) )
    return df_result
    

In [3]:
# Make sure Python's automatic garbage collection is enabled (it is on by default)
gc.enable()
JOB_NAME = 'spark_dend_capstone'

In [4]:
spark = SparkSession\
    .builder\
    .appName(JOB_NAME)\
    .master("local[*]")\
    .enableHiveSupport()\
    .getOrCreate()

In [5]:
# Spark logging is very verbose: only show ERROR messages from the org and akka loggers
logger_jvm = spark._jvm.org.apache.log4j
logger_jvm.LogManager.getLogger("org").setLevel(logger_jvm.Level.ERROR)
logger_jvm.LogManager.getLogger("akka").setLevel(logger_jvm.Level.ERROR)

### Step 1: Scope the Project and Gather Data

#### Describe and Gather Data
1. **Crime Data in Brazil:** comes from [Kaggle](https://www.kaggle.com/inquisitivecrow/crime-data-in-brazil). It contains crime data from 10 years of police work in São Paulo, Brazil: more than 16 million records, each with several attributes of the criminal occurrence.
2. **Current Properati Listing Information:** comes from [Kaggle](https://www.kaggle.com/properati-data/properties). Property attributes of 1.5 million Latin American listings.

#### Project Scope
The scope of this project is to build a Spark job that loads clean, reliable data into the data warehouse using a dimensional model.
Our analytical repository can be used by data scientists and market intelligence analysts to identify patterns and correlations between the datasets.
For example, a company's real estate division running a property pricing study could use data from our repository to check whether the surroundings of each property have a high incidence of crime, and how relevant those crimes are.
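A minimal sketch of that kind of analysis, assuming crime and property coordinates are plain (latitude, longitude) pairs in decimal degrees: it counts crimes within a radius of a property using the haversine great-circle distance. The helper names `haversine_km` and `crimes_near`, the radius and the property coordinate are hypothetical; the two crime coordinates are taken from the sample output shown later in this notebook. On the full warehouse tables the same logic would run in Spark rather than in a Python loop.

```python
import math

EARTH_RADIUS_KM = 6371.0  # mean Earth radius in kilometres


def haversine_km(lat1, lon1, lat2, lon2):
    """Great-circle distance in km between two points given in decimal degrees."""
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    d_phi = math.radians(lat2 - lat1)
    d_lambda = math.radians(lon2 - lon1)
    a = math.sin(d_phi / 2) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(d_lambda / 2) ** 2
    return 2 * EARTH_RADIUS_KM * math.asin(math.sqrt(a))


def crimes_near(property_latlon, crime_latlons, radius_km=1.0):
    """Count crime coordinates lying within radius_km of a property (hypothetical helper)."""
    p_lat, p_lon = property_latlon
    return sum(
        1 for c_lat, c_lon in crime_latlons
        if haversine_km(p_lat, p_lon, c_lat, c_lon) <= radius_km
    )


# Two crime coordinates from the sample output; the property location is hypothetical
crimes = [(-23.537971, -46.618818), (-23.388445, -46.769031)]
print(crimes_near((-23.54, -46.62), crimes, radius_km=1.0))  # prints 1
```

The first crime lies roughly 0.26 km from the hypothetical property and the second more than 20 km away, so only one falls inside the 1 km radius.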

In [32]:
READ_PATH_CRIME_DATA = './data/crime_data_br/*.csv'
READ_PATH_NEWS = './data/news_folhauol/*.csv'
READ_PATH_PROPERTIES = './data/properties_br/*.csv'

#### Read the Crime Data in Brazil dataset and drop duplicate rows

In [44]:
df_crime_crude = spark.read\
    .option('mergeSchema', 'true')\
    .option('header', 'true')\
    .option('inferSchema', 'true')\
    .csv(READ_PATH_CRIME_DATA)\
    .dropDuplicates()

df_crime_crude.printSchema()

root
 |-- NUM_BO: string (nullable = true)
 |-- ANO_BO: string (nullable = true)
 |-- ID_DELEGACIA: string (nullable = true)
 |-- NOME_DEPARTAMENTO: string (nullable = true)
 |-- NOME_SECCIONAL: string (nullable = true)
 |-- DELEGACIA: string (nullable = true)
 |-- NOME_DEPARTAMENTO_CIRC: string (nullable = true)
 |-- NOME_SECCIONAL_CIRC: string (nullable = true)
 |-- NOME_DELEGACIA_CIRC: string (nullable = true)
 |-- ANO: string (nullable = true)
 |-- MES: string (nullable = true)
 |-- DATA_OCORRENCIA_BO: string (nullable = true)
 |-- HORA_OCORRENCIA_BO: string (nullable = true)
 |-- FLAG_STATUS13: string (nullable = true)
 |-- RUBRICA: string (nullable = true)
 |-- DESDOBRAMENTO: string (nullable = true)
 |-- CONDUTA: string (nullable = true)
 |-- LATITUDE: string (nullable = true)
 |-- LONGITUDE: string (nullable = true)
 |-- CIDADE: string (nullable = true)
 |-- LOGRADOURO: string (nullable = true)
 |-- NUMERO_LOGRADOURO: string (nullable = true)
 |-- FLAG_STATUS22: string (nullabl

#### Rename columns to lower case, replacing spaces with underscores

In [45]:
df_crime_crude = normalize_columns_name(df_crime_crude)
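`normalize_columns_name` renames columns one at a time and does not check whether two raw headers end up with the same normalized name (for example `DATA OCORRENCIA` and `DATA_OCORRENCIA`); the data frame could then hold duplicate column names, and later references to them would be ambiguous. A small pure-Python sketch of a guard, assuming the same rule (lower case, spaces to underscores); `normalized_name_map` is a hypothetical helper that could be run on `df_crime_crude.columns` before renaming.

```python
def normalized_name_map(columns):
    """Map raw column names to normalized names, raising on collisions.

    Uses the same rule as normalize_columns_name: lower case, spaces -> '_'.
    """
    mapping = {}
    origin = {}  # normalized name -> first raw name that produced it
    for raw in columns:
        new = raw.lower().replace(' ', '_')
        if new in origin:
            raise ValueError(f"{origin[new]!r} and {raw!r} both normalize to {new!r}")
        origin[new] = raw
        mapping[raw] = new
    return mapping


print(normalized_name_map(['NUM_BO', 'ANO_BO']))
# {'NUM_BO': 'num_bo', 'ANO_BO': 'ano_bo'}
```

Calling it with `['DATA OCORRENCIA', 'DATA_OCORRENCIA']` raises `ValueError` instead of silently producing two `data_ocorrencia` columns.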

In [None]:
qtd_reg_crimes = df_crime_crude.count()
print('Total records df_crime_crude:', qtd_reg_crimes)

### Step 2: Explore and Assess the Data

#### Replace the strings 'NULL', 'NaN' and 'NA' with null

In [46]:
df_crime_crude = replace_ncols(df_crime_crude)
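`replace_ncols` matches the tokens exactly, so variants such as `null`, `nan` or ` NA ` (with surrounding spaces) are kept as text. The pure-Python sketch below mirrors that rule and adds an optional lenient mode that trims and upper-cases the value first; the lenient mode is an assumption, not something the original notebook does. In Spark the lenient comparison could be written as `f.upper(f.trim(f.col(name))).isin('NULL', 'NAN', 'NA')`.

```python
NULL_TOKENS = ('NULL', 'NaN', 'NA')  # tokens matched by replace_ncols


def to_null(value, lenient=False):
    """Return None for null-like tokens, otherwise the value unchanged.

    lenient=False reproduces the exact, case-sensitive match of replace_ncols;
    lenient=True (an assumption) also trims whitespace and ignores case.
    """
    if value is None:
        return None
    if lenient:
        upper_tokens = {t.upper() for t in NULL_TOKENS}
        return None if value.strip().upper() in upper_tokens else value
    return None if value in NULL_TOKENS else value


print([to_null(v) for v in ['NULL', 'null', 'S.PAULO']])
# [None, 'null', 'S.PAULO']
print([to_null(v, lenient=True) for v in ['NULL', 'null', ' NA ', 'S.PAULO']])
# [None, None, None, 'S.PAULO']
```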

#### Badly formatted records spill values into the extra columns `_c30`, `_c31` and `_c32`. These records are discarded because they represent a very low percentage of the data.

In [47]:
df_crime = df_crime_crude.where('(_c30 is null and _c31 is null and _c32 is null)').drop('_c30', '_c31', '_c32')
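To back the "very low percentage" claim with a number, the share of discarded rows is the count before the filter minus the count after it, divided by the count before (in this notebook, `qtd_reg_crimes` and `df_crime.count()`). The pure-Python sketch below shows the same rule on raw CSV text: a row is treated as malformed when any field past the expected number of columns is non-empty. `split_malformed`, the sample text and `expected_cols` are hypothetical.

```python
import csv
import io


def split_malformed(csv_text, expected_cols):
    """Split CSV data rows into well-formed rows and rows with values past expected_cols."""
    reader = csv.reader(io.StringIO(csv_text))
    next(reader)  # skip the header row
    good, bad = [], []
    for row in reader:
        # Extra non-empty fields mean the row spilled into the overflow columns
        if any(field.strip() for field in row[expected_cols:]):
            bad.append(row)
        else:
            good.append(row[:expected_cols])
    return good, bad


sample = "num_bo,ano_bo,cidade,,\n3807,2009,S.PAULO,,\n2897,2009,S.PAULO,x,y\n2919,2009,S.PAULO,,\n"
good, bad = split_malformed(sample, expected_cols=3)
discarded_pct = 100 * len(bad) / (len(good) + len(bad))
print(len(good), len(bad), round(discarded_pct, 1))  # 2 1 33.3
```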

In [48]:
df_crime.limit(10).toPandas()

| | num_bo | ano_bo | id_delegacia | nome_departamento | nome_seccional | delegacia | nome_departamento_circ | nome_seccional_circ | nome_delegacia_circ | ano | ... | logradouro | numero_logradouro | flag_status22 | descr_tipo_pessoa | cont_pessoa | sexo_pessoa | idade_pessoa | cor | descr_profissao | descr_grau_instrucao |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 3807 | 2009 | 30307 | DEMACRO | DEL.SEC.MOGI DAS CRUZES | 02º D.P. MOGI DAS CRUZES | DEMACRO | DEL.SEC.MOGI DAS CRUZES | 02º D.P. MOGI DAS CRUZES | 2009 | ... | AV ANCHIETA | 0 | C | Vítima | 1 | M | 41.0 | Preta | APOSENTADO(A) | 1 Grau incompleto |
| 1 | 2897 | 2009 | 10354 | DECAP | DEL.SEC.8º SAO MATEUS | 54º D.P. CID. TIRADENTES | DECAP | DEL.SEC.8º SAO MATEUS | 54º D.P. CID. TIRADENTES | 2009 | ... | DOUTOR GUILERME DE ABREU SODRE | 790 | C | Indiciado | 6 | M | | | | |
| 2 | 2919 | 2009 | 10344 | DECAP | DEL.SEC.6º SANTO AMARO | 80º D.P. VILA JOANIZA | DECAP | DEL.SEC.2º SUL | 35º D.P. JABAQUARA | 2009 | ... | AC AV ENG ARMANDO DE ARRUDA PEREIRA | 4678 | C | Vítima | 2 | F | 29.0 | Branca | CONSULTOR(A) | Superior completo |
| 3 | 3744 | 2009 | 10365 | DECAP | DEL.SEC.4º NORTE | 90º D.P. PQ. NOVO MUNDO | DECAP | DEL.SEC.4º NORTE | 90º D.P. PQ. NOVO MUNDO | 2009 | ... | AC MARGINAL DIREITA DO TIETE | 18 | C | Partes | 5 | M | 45.0 | Branca | ANALISTA DE SISTEMAS | Superior completo |
| 4 | 3363 | 2009 | 10308 | DECAP | DEL.SEC.5º LESTE | 52º D.P. PARQUE S.JORGE | DECAP | DEL.SEC.4º NORTE | 19º D.P. VILA MARIA | 2009 | ... | AC PONTE PRES JANIO QUADROS-AV MORVAN DI | 0 | C | Testemunha | 3 | M | 41.0 | Branca | AJUDANTE | 1 Grau incompleto |
| 5 | 2373 | 2009 | 130412 | DEINTER 1 - SAO JOSE DOS CAMPOS | DEL.SEC.TAUBATÉ | 01º D.P. PINDAMONHANGABA | DECAP | DEL.SEC.3º OESTE | 46º D.P. PERUS | 2009 | ... | AL AGENOR COUTO DE MAGALHAES | 0 | C | Vítima | 1 | F | 63.0 | Branca | | |
| 6 | 2286 | 2009 | 30609 | DEMACRO | DEL.SEC.CARAPICUIBA | 02º D.P. BARUERI | DEMACRO | DEL.SEC.CARAPICUIBA | 01º D.P. BARUERI | 2009 | ... | AL ARAGUAIA | 1850 | C | Vítima | 2 | M | 16.0 | Parda | ESTUDANTE | 1 Grau incompleto |
| 7 | 4665 | 2009 | 10342 | DECAP | DEL.SEC.1º CENTRO | 78º D.P. JARDINS | DECAP | DEL.SEC.1º CENTRO | 78º D.P. JARDINS | 2009 | ... | AL FRANCA | 184 | C | Representante | 1 | F | 36.0 | Branca | EMPRESARIO(A) | 2 Grau completo |
| 8 | 3205 | 2009 | 10333 | DECAP | DEL.SEC.3º OESTE | 33º D.P. PIRITUBA | DECAP | DEL.SEC.1º CENTRO | 78º D.P. JARDINS | 2009 | ... | AL FRANCA | 0 | C | Indiciado | 1 | M | | Branca | | |
| 9 | 4638 | 2009 | 10341 | DECAP | DEL.SEC.1º CENTRO | 77º D.P. SANTA CECILIA | DECAP | DEL.SEC.1º CENTRO | 77º D.P. SANTA CECILIA | 2009 | ... | AL NOTHMANN | 100 | C | Indiciado | 2 | M | 26.0 | Parda | | |


#### Normalize sexo_pessoa: map the value 'I' to F (female)

In [61]:
df_crime = df_crime.withColumn('sexo_pessoa'\
                               , f.when(f.col('sexo_pessoa') == 'I', 'F')\
                               .otherwise(f.col('sexo_pessoa')))

#### Data quality check: make sure the coordinates contain only numeric data, since non-numeric values will become null after the type cast

In [80]:
df_crime.select('latitude', 'longitude').where("latitude rlike '[a-z]'").distinct().show(10, False)

+--------+---------+
|latitude|longitude|
+--------+---------+
+--------+---------+
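The empty result only shows that no latitude contains a lowercase letter: `rlike '[a-z]'` is case-sensitive, is applied to `latitude` only, and does not catch other non-numeric forms such as a decimal comma (`-23,537971`), which would also turn into null when cast to double with ANSI mode disabled. A stricter check tests each value against a numeric pattern. The sketch below does it in plain Python; `COORD_RE` and `invalid_coordinates` are hypothetical names, and the same regular expression could be used, negated, with `rlike` on both columns in Spark SQL.

```python
import re

# Optional minus sign, integer part, optional '.' and fractional digits (e.g. -23.537971)
COORD_RE = re.compile(r'^-?\d+(\.\d+)?$')


def invalid_coordinates(values):
    """Return the non-null values that do not look like a decimal coordinate."""
    return [v for v in values if v is not None and not COORD_RE.match(v.strip())]


print(invalid_coordinates(['-23.537971', '-46.618818', '-23,537971', 'ABC', None]))
# ['-23,537971', 'ABC']
```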



#### Data dictionary: crime data

In [67]:
data_dict_crime = {
    "num_bo": "integer",
    "ano_bo": "integer",
    "id_delegacia": "integer",
    "nome_departamento": "string",
    "nome_seccional": "string",
    "delegacia": "string",
    "nome_departamento_circ": "string",
    "nome_seccional_circ": "string",
    "nome_delegacia_circ": "string",
    "ano": "integer",
    "mes": "integer",
    "data_ocorrencia_bo": "string",
    "hora_ocorrencia_bo": "string",
    "flag_status13": "string",
    "rubrica": "string",
    "desdobramento": "string",
    "conduta": "string",
    "latitude": "double",
    "longitude": "double",
    "cidade": "string",
    "logradouro": "string",
    "numero_logradouro": "string",
    "flag_status22": "string",
    "descr_tipo_pessoa": "string",
    "cont_pessoa": "string",
    "sexo_pessoa": "string",
    "idade_pessoa": "string",
    "cor": "string",
    "descr_profissao": "string",
    "descr_grau_instrucao": "string",
}

#### Cast every column to the type defined in the data dictionary

In [68]:
for k, v in data_dict_crime.items():
    df_crime = df_crime.withColumn(k, f.col(k).cast(v))
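With ANSI mode disabled (`spark.sql.ansi.enabled=false`, the default in Spark 3.x), `cast` returns null for values it cannot parse instead of raising an error, so this loop can silently discard data. One way to see the loss is to compare each column's null count before and after the cast. The plain-Python sketch below illustrates the idea with Python's own parsers, which are similar to but not identical with Spark's cast rules; `cast_loss` is a hypothetical helper.

```python
def cast_loss(values, caster):
    """Count non-null values that caster cannot convert (they would become null after a cast)."""
    lost = 0
    for v in values:
        if v is None:
            continue  # already null before the cast, not a loss
        try:
            caster(v)
        except ValueError:
            lost += 1
    return lost


# Values resembling the latitude column: one decimal comma, one text value, one null
print(cast_loss(['-23.388445', '-23,520864', 'ABC', None], float))  # 2
```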

#### Data quality check: validate types

In [69]:
df_crime.printSchema()

root
 |-- num_bo: integer (nullable = true)
 |-- ano_bo: integer (nullable = true)
 |-- id_delegacia: integer (nullable = true)
 |-- nome_departamento: string (nullable = true)
 |-- nome_seccional: string (nullable = true)
 |-- delegacia: string (nullable = true)
 |-- nome_departamento_circ: string (nullable = true)
 |-- nome_seccional_circ: string (nullable = true)
 |-- nome_delegacia_circ: string (nullable = true)
 |-- ano: integer (nullable = true)
 |-- mes: integer (nullable = true)
 |-- data_ocorrencia_bo: string (nullable = true)
 |-- hora_ocorrencia_bo: string (nullable = true)
 |-- flag_status13: string (nullable = true)
 |-- rubrica: string (nullable = true)
 |-- desdobramento: string (nullable = true)
 |-- conduta: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- cidade: string (nullable = true)
 |-- logradouro: string (nullable = true)
 |-- numero_logradouro: string (nullable = true)
 |-- flag_status22: string (nu

#### Persist the cleaned data frame as Parquet
To speed up later transformations, the data frame is written as Parquet files to a temporary directory and read back from there. We do this only at this point because the previous transformations are what made the data frame ready to consume.

In [83]:
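# Overwrite (rather than append) so re-running the notebook does not duplicate rows in the temporary directory
df_crime.repartition(5).write.mode('overwrite').parquet('./data/tmp_crime_data_br/')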

In [103]:
df_crime_temp = spark.read.parquet('./data/tmp_crime_data_br/*.parquet')
df_crime_temp.where("sexo_pessoa in ('F', 'M') and (latitude is not null or longitude is not null)")\
.createOrReplaceTempView('df_crime')

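#### Data quality check: validate the candidate key (num_bo, ano_bo, id_delegacia)
num_bo alone is not unique, so the check counts distinct combinations of num_bo, ano_bo and id_delegacia. The combination is unique only if both counts are equal.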

In [104]:
spark.sql('select count(*) all_records, count(distinct concat(num_bo,ano_bo,id_delegacia)) num_bo_distinct from df_crime').show()

+-----------+---------------+
|all_records|num_bo_distinct|
+-----------+---------------+
|    6465130|        2815645|
+-----------+---------------+
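The distinct count is well below the record count, so the combination is not unique per row; most likely each occurrence report appears once for every person involved in it (see `descr_tipo_pessoa` and `cont_pessoa`). A separate caveat: `concat` joins the values without a separator, so two different keys can produce the same string and deflate the distinct count. The sketch below shows such a collision with hypothetical values; in Spark SQL, `count(distinct num_bo, ano_bo, id_delegacia)` or `concat_ws('|', num_bo, ano_bo, id_delegacia)` keeps the column boundaries.

```python
def concat_key(num_bo, ano_bo, id_delegacia):
    """Key built like SQL concat(): values glued together with no separator."""
    return f"{num_bo}{ano_bo}{id_delegacia}"


def tuple_key(num_bo, ano_bo, id_delegacia):
    """Key that keeps the column boundaries, like counting distinct column tuples."""
    return (num_bo, ano_bo, id_delegacia)


# Two different (hypothetical) reports
report_a = (12009, 2010, 1)
report_b = (1, 2009, 20101)

print(concat_key(*report_a), concat_key(*report_b))  # 1200920101 1200920101
print(tuple_key(*report_a) == tuple_key(*report_b))  # False
```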



#### Create the police station dimension

In [77]:
df_police_station = spark.sql("\
          SELECT DISTINCT id_delegacia, nome_departamento, nome_seccional, delegacia\
          FROM df_crime\
          ")

#### Data quality check: make sure the data frame is not empty

In [78]:
df_police_station.count()

6441

In [115]:
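df = spark.sql("\
          SELECT \
            num_bo, \
            id_delegacia, \
            latitude, \
            longitude, \
            ano_bo, \
            ano, \
            mes, \
            data_ocorrencia_bo, \
            hora_ocorrencia_bo, \
            flag_status13, \
            rubrica, \
            desdobramento, \
            conduta, \
            cidade \
          FROM df_crime\
          ")
# Register the projection as a temporary view so that Spark SQL can query it as `df`
df.createOrReplaceTempView('df')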

In [113]:
df.where("num_bo = 449").limit(5).toPandas()

| | num_bo | id_delegacia | latitude | longitude | ano_bo | nome_departamento_circ | nome_seccional_circ | nome_delegacia_circ | ano | mes | data_ocorrencia_bo | hora_ocorrencia_bo | flag_status13 | rubrica | desdobramento | conduta | cidade |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 449 | 10392 | -23.388445 | -46.769031 | 2012 | DECAP | DEL.SEC.3º OESTE | 46º D.P. PERUS | 2012 | 3 | 20/03/2012 | 07:25 | C | Drogas sem autorização ou em desacordo (Art.33... | | | S.PAULO |
| 1 | 449 | 30306 | -23.520864 | -46.178951 | 2014 | DEMACRO | DEL.SEC.MOGI DAS CRUZES | 01º D.P. MOGI DAS CRUZES | 2013 | 11 | 07/11/2013 | 15:50 | C | Furto (art. 155) | | OUTROS | MOGI DAS CRUZES |
| 2 | 449 | 30211 | -23.543059 | -45.763427 | 2012 | DEMACRO | DEL.SEC.GUARULHOS | DEL.POL.STA ISABEL | 2012 | 2 | 15/02/2012 | 09:30 | C | Roubo (art. 157) | | OUTROS | S.ISABEL |
| 3 | 449 | 30432 | -23.535922 | -46.845701 | 2011 | DEMACRO | DEL.SEC.CARAPICUIBA | 02º D.P. CARAPICUIBA | 2011 | 4 | 14/04/2011 | 11:20 | C | Roubo (art. 157) | | ESTABELECIMENTO COMERCIAL | CARAPICUIBA |
| 4 | 449 | 10112 | -23.537971 | -46.618818 | 2011 | DECAP | DEL.SEC.1º CENTRO | 12º D.P. PARI | 2011 | 2 | 05/02/2011 | 14:00 | C | Roubo (art. 157) | | ESTABELECIMENTO COMERCIAL | S.PAULO |


In [114]:
spark.sql('select count(*) all_records, count(distinct concat(num_bo,ano_bo,id_delegacia, cidade)) num_bo_distinct from df').show()

+-----------+---------------+
|all_records|num_bo_distinct|
+-----------+---------------+
|    6465130|        2815645|
+-----------+---------------+



#### Create the crimes fact table

In [80]:
df_fact = spark.sql("\
          SELECT DISTINCT \
            num_bo, \
            id_delegacia, \
            latitude, \
            longitude, \
            ano_bo, \
            nome_departamento_circ, \
            nome_seccional_circ, \
            nome_delegacia_circ, \
            ano, \
            mes, \
            data_ocorrencia_bo, \
            hora_ocorrencia_bo, \
            flag_status13, \
            rubrica, \
            desdobramento, \
            conduta, \
            cidade, \
            logradouro, \
            numero_logradouro, \
            flag_status22, \
            descr_tipo_pessoa, \
            cont_pessoa, \
            sexo_pessoa, \
            idade_pessoa, \
            cor, \
            descr_profissao, \
            descr_grau_instrucao \
          FROM df_crime\
          ")

#### Read the Properties dataset

In [4]:
df_properties_crude = spark.read\
    .option('mergeSchema', 'true')\
    .option('header', 'true')\
    .option('quote', '"')\
    .option('escape', '"')\
    .csv(READ_PATH_PROPERTIES)
df_properties_crude = normalize_columns_name(df_properties_crude)

In [5]:
# Backticks are required: without them Spark parses lat-lon as the expression lat - lon
df_properties = df_properties_crude.where("`lat-lon` is not null")

In [11]:
df_properties.select('price_usd_per_m2','price_per_m2').limit(10).toPandas()

| | price_usd_per_m2 | price_per_m2 |
|---|---|---|
| 0 | 2877.2106944444445 | 9444.444444444443 |
| 1 | 3595.199440993789 | 11801.242236024846 |
| 2 | 3026.4511111111115 | 10000.0 |
| 3 | 2044.1819298245612 | 6754.3859649122805 |
| 4 | 1089.52242 | 3600.0 |
| 5 | 590.5269512195122 | 1951.219512195122 |
| 6 | 590.5269512195122 | 1951.219512195122 |
| 7 | 1101.8389671361504 | 3615.023474178404 |
| 8 | 4315.673132743363 | 14159.29203539823 |
| 9 | 2133.5609 | 7000.0 |


In [25]:
df_crime.limit(2).toPandas()

| | NUM_BO | ANO_BO | ID_DELEGACIA | NOME_DEPARTAMENTO | NOME_SECCIONAL | DELEGACIA | NOME_DEPARTAMENTO_CIRC | NOME_SECCIONAL_CIRC | NOME_DELEGACIA_CIRC | ANO | ... | LOGRADOURO | NUMERO_LOGRADOURO | FLAG_STATUS22 | DESCR_TIPO_PESSOA | CONT_PESSOA | SEXO_PESSOA | IDADE_PESSOA | COR | DESCR_PROFISSAO | DESCR_GRAU_INSTRUCAO |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 4 | 2013 | 900821 | DPPC- DEP.POL.PROT A CIDADANIA | DPPC-DEP.POL.PROT.A CIDADANIA | DIISP - 01ª DEL.POL. | DECAP | DEL.SEC.1º CENTRO | 03º D.P. CAMPOS ELISEOS | 2009 | ... | R VINTE E QUATRO DE MAIO | 35 | C | Declarante | 1 | F | 43 | | VENDEDOR(A) | 2 Grau completo |
| 1 | 23 | 2014 | 30613 | DEMACRO | DEL.SEC.CARAPICUIBA | DEL.DEF.MUL. BARUERI | DEMACRO | DEL.SEC.CARAPICUIBA | 01º D.P. BARUERI | 2009 | ... | EST DAS ROSAS | 0 | C | Autor | 3 | M | 49 | Parda | EMPRESARIO COMERCIAL | |
