In [1]:
import os
import polars as pl
from unidecode import unidecode

In [2]:
URL_2023 = 'https://servicos.dpf.gov.br/dadosabertos/SINARM_CSV/OCORRENCIAS/OCORRENCIAS_ate_2023.csv'
URL_2024 = 'https://servicos.dpf.gov.br/dadosabertos/SINARM_CSV/OCORRENCIAS/OCORRENCIAS_2024.csv'
CONN_URL = f'postgresql://{os.environ['THOR_DBUSER']}:{os.environ['THOR_DBPASSWORD']}@{os.environ['THOR_DBHOST']}:5432/{os.environ['THOR_DATABASE']}'

In [3]:
def load_postgre(df: pl.DataFrame, schema:str , table:str) -> int:
    return df.write_database(f'{schema}.{table}',CONN_URL, if_table_exists='append', engine="sqlalchemy")

In [4]:
# df = pl.read_csv(URL_2023, separator=',', encoding='ISO-8859-1', schema_overrides=schema)
df = pl.read_csv(URL_2024, separator=';', encoding='ISO-8859-1')

In [5]:
df = df.with_columns(
        pl.col('MUNICIPIO').map_elements(lambda x: unidecode(x), return_dtype=pl.String),
        pl.col('TIPO_OCORRENCIA').map_elements(lambda x: unidecode(x), return_dtype=pl.String).str.strip_chars(),
        pl.col('CALIBRE_ARMA').str.strip_chars(),
        pl.col('ANO_OCORRENCIA').cast(pl.Int32),
        pl.col('MES_OCORRENCIA').cast(pl.Int32),
        pl.col('TOTAL').cast(pl.Int32),
        pl.when( pl.col('MAIS_1000_MIL_HAB') == 'S').then(1).otherwise(0).alias('MAIS_1000_MIL_HAB')
)
df = df.rename({col: col.lower() for col in df.columns})

In [None]:
load_postgre(df, 'raw', 'ocorrencias')

In [None]:
df_uf = df.with_columns([
    pl.concat_str(['uf']).hash().cast(pl.String).str.slice(0,10).cast(pl.Int64).alias('id_uf')
    ,pl.col('uf').str.strip_chars().alias('desc_uf')]
).select(pl.col('id_uf'), pl.col('desc_uf')).unique().sort(by='desc_uf')

load_postgre(df_uf, 'refined', 'dim_uf')
df_uf.head()

In [None]:
df_region = df.with_columns([
    pl.concat_str(['municipio']).hash().cast(pl.String).str.slice(0,10).cast(pl.Int64).alias('id_municipio')
    ,pl.col('municipio').str.strip_chars().alias('desc_municipio')]
).select(pl.col('id_municipio'), pl.col('desc_municipio')).unique().sort(by='desc_municipio')


load_postgre(df_region, 'refined', 'dim_municipio')
df_region.head()

In [None]:
df_occurrence = df.with_columns([
    pl.concat_str(['tipo_ocorrencia']).hash().cast(pl.String).str.slice(0,10).cast(pl.Int64).alias('id_ocorrencia')
    ,pl.col('tipo_ocorrencia').str.strip_chars().alias('desc_ocorrencia')]
).select(pl.col('id_ocorrencia'), pl.col('desc_ocorrencia')).unique().sort(by='desc_ocorrencia')


load_postgre(df_occurrence, 'refined', 'dim_ocorrencia')
df_occurrence.head()

In [None]:
df_weapons_brand = df.with_columns([
    pl.concat_str(['marca_arma']).hash().cast(pl.String).str.slice(0,10).cast(pl.Int64).alias('id_marca')
    ,pl.col('marca_arma').str.strip_chars().alias('desc_marca')]
).select(pl.col('id_marca'), pl.col('desc_marca')).unique().sort(by='desc_marca')


load_postgre(df_weapons_brand, 'refined', 'dim_marca')
df_weapons_brand.head()

In [None]:
df_weapons = df.with_columns([
    pl.concat_str(['especie_arma']).hash().cast(pl.String).str.slice(0,10).cast(pl.Int64).alias('id_arma')
    ,pl.col('especie_arma').str.strip_chars().alias('desc_arma')]
).select(pl.col('id_arma'), pl.col('desc_arma')).unique().sort(by='desc_arma')

load_postgre(df_weapons, 'refined', 'dim_arma')
df_weapons.head()

In [None]:
df_weapons_caliber = df.with_columns([
    pl.concat_str(['calibre_arma']).hash().cast(pl.String).str.slice(0,10).cast(pl.Int64).alias('id_calibre')
    ,pl.col('calibre_arma').str.strip_chars().alias('desc_calibre')]
).select(pl.col('id_calibre'), pl.col('desc_calibre')).unique().sort(by='desc_calibre')

load_postgre(df_weapons_caliber, 'refined', 'dim_calibre')
df_weapons_caliber.head()

In [None]:
df_final = (
    df.join(df_uf, left_on='uf', right_on='desc_uf', how='inner' )
    .join(df_region, left_on='municipio', right_on='desc_municipio', how='inner' )
    .join(df_occurrence, left_on='tipo_ocorrencia', right_on='desc_ocorrencia', how='inner' )
    .join(df_weapons_brand, left_on='marca_arma', right_on='desc_marca', how='inner' )
    .join(df_weapons, left_on='especie_arma', right_on='desc_arma', how='inner')
    .join(df_weapons_caliber, left_on='calibre_arma', right_on='desc_calibre', how='inner')
)

df_final = df_final.select(
    pl.concat_str(['ano_ocorrencia','mes_ocorrencia','id_uf','id_municipio','id_ocorrencia','id_marca','id_arma','id_calibre','mais_1000_mil_hab','total']).hash().cast(pl.String).str.slice(0,10).cast(pl.Int64).alias('id')
    , pl.col('ano_ocorrencia')
    , pl.col('mes_ocorrencia')
    , pl.col('id_uf')
    , pl.col('id_municipio')
    , pl.col('id_ocorrencia')
    , pl.col('id_marca')
    , pl.col('id_arma')
    , pl.col('id_calibre')
    , pl.col('mais_1000_mil_hab')
    , pl.col('total')
)

load_postgre(df_final, 'refined', 'fat_ocorrencias')
df_final.head()