# Projeto de Implementação de Data Warehouse para Análises de Ocorrências Policiais no Estado do Rio de Janeiro

### Introdução:

A coleta e análise de dados relacionados a ocorrências policiais desempenham um papel crítico na manutenção da segurança pública. As forças de segurança enfrentam desafios complexos, incluindo a prevenção e investigação de crimes, a gestão de recursos e o monitoramento de tendências criminais.


#### Objetivo:
O objetivo deste projeto é criar a arquitetura de um DW que abrange a estrutura, o design e a organização dos componentes do DW, incluindo as tabelas de dimensão, a tabela fato, as transformações de dados, a modelagem de dados, entre outros aspectos. O Data Warehouse é construído seguindo o modelo estrela, tabelas de dimensão e tabela fato.

Este projeto faz parte do curso ministrado pelo **M.Sc Prof. Claudio Bonel**- https://www.youtube.com/watch?v=R8Admbp2_O4&list=PLPP4r1UqnhGqfkpnMFfqWe3eK_rjWnP9C, disponível no canal do YouTube. O curso original visava criar um Data Warehouse usando SQLite como banco de dados de destino para o Data Warehouse. No entanto, optei por adotar o PostgreSQL como meu banco de dados escolhido e seguir uma abordagem passo a passo, começando primeiro por toda criação de tabelas e carga no stage antes de criar o Data Warehouse propriamente dito.

Neste estudo de caso, vou demonstrar como implementei essa solução, fornecendo detalhes sobre a criação de um pipeline ETL personalizado e seu impacto na qualidade da análise de dados.

## Importância de um Data Warehouse nas Análises de Ocorrências Policiais

**1- Consolidação de Dados** : A consolidação desses dados em um único repositório facilita a análise e a identificação de tendências e padrões.

**2 - Histórico e Tendências**: Com um DW, é possível armazenar históricos de ocorrências policiais ao longo do tempo. Compreender essas tendências é fundamental para direcionar os esforços de prevenção e resposta policial.

**3 - Apoio à Tomada de Decisões**: O DW fornece às autoridades policiais uma visão abrangente e baseada em dados das ocorrências passadas e presentes. 

**4 - Avaliação de Eficácia**: Um DW permite que as agências de segurança pública avaliem a eficácia de suas políticas e ações. 

**5 - Compartilhamento de Informações**: Um DW pode ser configurado para permitir o compartilhamento seguro de dados entre agências de segurança pública, promovendo uma abordagem mais colaborativa e eficiente para lidar com o crime.

In [1]:
# importar as bibliotecas necessárias
import pandas as pd
import psycopg2
from datetime import datetime
from sqlalchemy import create_engine, inspect

In [2]:
# Configurações de conexão com o banco de dados OcorrenciasStage
db_config_stage = {
    "host": "localhost",
    "database": "OcorrenciasStage",
    "user": "postgres",
    "password": "admin123",
}

conexao_stage = psycopg2.connect(**db_config_stage)
cursor_stage = conexao_stage.cursor()

# Se a conexão for bem-sucedida, imprima uma mensagem de sucesso
print("Conexão com o Stage bem-sucedida.")

Conexão com o Stage bem-sucedida.


## Extrair dados com a tecnica Web Scraping do site do IBGE 

Nesta etapa, será necessário extrair os dados dos municipios e código diretamente do site do IBGE.A extração visa obter informações atualizadas e confiáveis sobre municípios brasileiros para uso em análises, pesquisas e tomada de decisões. 

In [3]:
# Coletando dados do site do IBGE para registrar os dados dos municípios do Rio de Janeiro
url = 'https://www.ibge.gov.br/explica/codigos-dos-municipios.php#RJ'

DadosIBGE = pd.DataFrame(pd.read_html(url, match="Municípios do Rio de Janeiro")[0])

#Renomear tabelas do arquivo csv
DadosIBGE = DadosIBGE.rename(columns={'Municípios do Rio de Janeiro':'nmMunic','Códigos':'codMunic'})

#Alterar nome do indice
DadosIBGE.index.name = 'idMunic'

#Alterar para que o indece começe em 1
DadosIBGE.index = DadosIBGE.index + 1

# Adicionar uma coluna de data para armazenar o dia e a hora da carga de dados
dtCarga = datetime.today().strftime('%d/%m/%Y %H:%M')

DadosIBGE['dtCarga'] = dtCarga

# Mostrar os dados já transformados
display(DadosIBGE)

Unnamed: 0_level_0,nmMunic,codMunic,dtCarga
idMunic,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1
1,Angra dos Reis,3300100,07/10/2023 14:37
2,Aperibé,3300159,07/10/2023 14:37
3,Araruama,3300209,07/10/2023 14:37
4,Areal,3300225,07/10/2023 14:37
5,Armação dos Búzios,3300233,07/10/2023 14:37
...,...,...,...
88,Três Rios,3306008,07/10/2023 14:37
89,Valença,3306107,07/10/2023 14:37
90,Varre-Sai,3306156,07/10/2023 14:37
91,Vassouras,3306206,07/10/2023 14:37


## Etapa 1 - Criar etapa de carga denominada Stage

Essa etapa "Stage" serve para isolar, limpar, transformar e validar os dados brutos, garantindo que eles estejam prontos para análises e relatórios no DW. Essa abordagem melhora a qualidade dos dados e a eficiência do processo de integração.

### Tabela Municipio

In [4]:
# Nome do esquema e tabela no PostgreSQL para OcorrenciasStage
esquema_stage = "public"
tabela_stage = "tbMunicipio"

# Criar tabela se não existir
cursor_stage.execute(f"""
    CREATE TABLE IF NOT EXISTS "{esquema_stage}"."{tabela_stage}" (
        "idMunic" SERIAL PRIMARY KEY,
        "nmMunic" TEXT,
        "codMunic" INTEGER,
        "dtCarga" TIMESTAMP
    );
    
    CREATE INDEX IF NOT EXISTS idx_tbMunic_codMunic ON "{esquema_stage}"."{tabela_stage}" ("codMunic");
    
""")

# Inserir os dados diretamente na tabela PostgreSQL
for _, row in DadosIBGE.iterrows():
    cursor_stage.execute(f"""
        INSERT INTO "{esquema_stage}"."{tabela_stage}" ("nmMunic", "codMunic", "dtCarga")
        VALUES (%s, %s, %s);
    """, (row['nmMunic'], row['codMunic'], row['dtCarga']))
conexao_stage.commit()


print('Tabela Municipio criada e carga efetuada no Banco de Dados - Carga Stage!')

Tabela Municipio criada e carga efetuada no Banco de Dados - Carga Stage!


#### Conferindo se a tabela foi populada corretamente, selecionando os primeiros 10 registros

In [5]:
# Consulta SQL para selecionar os primeiros 10 registros da tabela
consulta_sql = """
    SELECT * FROM "tbMunicipio"
    LIMIT 10;
"""

# Execute a consulta SQL e carregue os resultados em um DataFrame
df = pd.read_sql(consulta_sql, conexao_stage)
conexao_stage.commit()

# Exibir os resultados
print("Primeiros 10 registros da tabela:")
display(df)

Primeiros 10 registros da tabela:




Unnamed: 0,idMunic,nmMunic,codMunic,dtCarga
0,1,Angra dos Reis,3300100,2023-10-07 14:27:00
1,2,Aperibé,3300159,2023-10-07 14:27:00
2,3,Araruama,3300209,2023-10-07 14:27:00
3,4,Areal,3300225,2023-10-07 14:27:00
4,5,Armação dos Búzios,3300233,2023-10-07 14:27:00
5,6,Arraial do Cabo,3300258,2023-10-07 14:27:00
6,7,Barra Mansa,3300407,2023-10-07 14:27:00
7,8,Barra do Piraí,3300308,2023-10-07 14:27:00
8,9,Belford Roxo,3300456,2023-10-07 14:27:00
9,10,Bom Jardim,3300506,2023-10-07 14:27:00


### Tabela DP (Departamento de Polícia)

In [7]:
# Criar a tabela tbDP e carregar do arquivo csv 
esquema = "public"
tabela = "tbDP"
arquivo_csv = "C:/Thuany/Projetos/Projeto Banco de dados e Data Warehouse/Arquivos/Dados/DP.csv"

# Limpar a transação atual, se houver
cursor_stage.execute("ROLLBACK;")


# Carregar os dados do arquivo CSV para um DataFrame, sem definir uma coluna como índice
DadosDP = pd.read_csv(arquivo_csv, index_col=None)

# Adicionar uma coluna de data para armazenar o dia e a hora da carga de dados
dtCarga = datetime.today().strftime('%d/%m/%Y %H:%M')

# Criar tabela se não existir
cursor_stage.execute(f"""
    CREATE TABLE IF NOT EXISTS "{esquema}"."{tabela}" (
        "codDP" INTEGER,
        "nmDP" VARCHAR(80),
        "enderecoDP" VARCHAR(150),
        "dtCarga" TIMESTAMP
    );
    
    CREATE INDEX IF NOT EXISTS idx_tbDP_codDP ON "{esquema}"."{tabela}"("codDP");
    
""")

conexao_stage.commit()

# Inserir os dados na tabela PostgreSQL
for _, row in DadosDP.iterrows():
    cursor_stage.execute(f"""
        INSERT INTO "{esquema}"."{tabela}" ("codDP", "nmDP", "enderecoDP", "dtCarga")
        VALUES (%s, %s, %s, %s);
    """, (row['codDP'], row['nmDP'], row['enderecoDP'], dtCarga))
conexao_stage.commit()

print('Tabela DP criada e carga efetuada no Banco de Dados - Carga Stage!')

Tabela DP criada e carga efetuada no Banco de Dados - Carga Stage!


#### Conferindo se a tabela foi populada corretamente, selecionando os primeiros 10 registros

In [8]:
# Consulta SQL para selecionar os primeiros 10 registros da tabela
consulta_sql = """
    SELECT * FROM "tbDP"
    LIMIT 10;
"""

# Execute a consulta SQL e carregue os resultados em um DataFrame
df = pd.read_sql(consulta_sql, conexao_stage)
conexao_stage.commit()

# Exibir os resultados
print("Primeiros 10 registros da tabela:")
display(df)

Primeiros 10 registros da tabela:




Unnamed: 0,codDP,nmDP,enderecoDP,dtCarga
0,1,001ª DP - Praça Mauá,Atendendo provisoriamente na sede da 4ª DP - P...,2023-10-07 14:39:00
1,4,004ª DP - Praça da República,"Av. Presidente Vargas, 1100 - Centro, Rio de J...",2023-10-07 14:39:00
2,5,005ª DP - Mem de Sá,"Avenida Gomes Freire, 320 - Centro, Rio de Jan...",2023-10-07 14:39:00
3,6,006ª DP - Cidade Nova,"Rua Professor Clementino Fraga, 77 - Centro, R...",2023-10-07 14:39:00
4,7,007ª DP - Santa Teresa,"Rua Francisco de Castro, 5 - Santa Teresa, Rio...",2023-10-07 14:39:00
5,9,009ª DP - Catete,"Rua Pedro Américo, 1 - Catete, Rio de Janeiro ...",2023-10-07 14:39:00
6,10,010ª DP - Botafogo,"Rua Bambina, 140 - Botafogo, Rio de Janeiro - ...",2023-10-07 14:39:00
7,11,011ª DP - Rocinha,"Rua Bertha Lutz, 80 - Gávea, Rio de Janeiro - ...",2023-10-07 14:39:00
8,12,012ª DP - Copacabana,"Rua Hilário de Gouveia, 102 - Copacabana, Rio ...",2023-10-07 14:39:00
9,13,013ª DP - Ipanema,"Avenida Nossa Senhora de Copacabana, 1260 - Co...",2023-10-07 14:39:00


### Tabela Responsáveis DP (Departamento de Polícia)

In [13]:
# Criar a tabela tbRespDP e carregar do arquivo csv 
esquema = "public"
tabela = "tbRespDP"
arquivo_csv = "C:/Thuany/Projetos/Projeto Banco de dados e Data Warehouse/Arquivos/Dados/ResponsavelDP.csv"

# Limpar a transação atual, se houver
cursor_stage.execute("ROLLBACK;")

# Carregar os dados do arquivo CSV para um DataFrame, sem definir uma coluna como índice
DadosRespDP = pd.read_csv(arquivo_csv, index_col=None)

# Adicionar uma coluna de data para armazenar o dia e a hora da carga de dados
dtCarga = datetime.today().strftime('%d/%m/%Y %H:%M')

# Criar tabela se não existir
cursor_stage.execute(f"""
    CREATE TABLE IF NOT EXISTS "{esquema}"."{tabela}" (
        "codDP" INTEGER,
        "nmResponsavel" VARCHAR(80),
        "dtCarga" TIMESTAMP
    );
    
    CREATE INDEX IF NOT EXISTS idx_tbRespDP_codDP ON "{esquema}"."{tabela}"("codDP");
    
""")

# Inserir os dados na tabela PostgreSQL
for _, row in DadosRespDP.iterrows():
    cursor_stage.execute(f"""
        INSERT INTO "{esquema}"."{tabela}" ("codDP", "nmResponsavel", "dtCarga")
        VALUES (%s, %s, %s);
    """, (row['codDP'], row['nmResponsavel'], dtCarga))
conexao_stage.commit()

print('Tabela Responsáveis DP criada e carga efetuada no Banco de Dados - Carga Stage!')

Tabela Responsáveis DP criada e carga efetuada no Banco de Dados - Carga Stage!


#### Conferindo se a tabela foi populada corretamente, selecionando os primeiros 10 registros

In [14]:
# Consulta SQL para selecionar os primeiros 10 registros da tabela
consulta_sql = """
    SELECT * FROM "tbRespDP"
    LIMIT 10;
"""

# Execute a consulta SQL e carregue os resultados em um DataFrame
df = pd.read_sql(consulta_sql, conexao_stage)
conexao_stage.commit()

# Exibir os resultados
print("Primeiros 10 registros da tabela:")
display(df)

Primeiros 10 registros da tabela:




Unnamed: 0,codDP,nmResponsavel,dtCarga
0,1,Delegado de Polícia José Luiz Silva Duarte,2023-10-07 14:41:00
1,4,Delegada de Polícia Patricia de Paiva Aguiar,2023-10-07 14:41:00
2,5,Delegado de Polícia Bruno Gilaberte Freitas,2023-10-07 14:41:00
3,6,Delegado de Polícia Fabio Luiz Da Silva Souza,2023-10-07 14:41:00
4,7,Delegado de Polícia Carlos Alberto Meirelles D...,2023-10-07 14:41:00
5,9,Delegada de Polícia Maria Aparecida Salgado Ma...,2023-10-07 14:41:00
6,10,Delegado de Polícia Alexandre Herdy Barros Silva,2023-10-07 14:41:00
7,11,Delegada de Polícia Flávia Goes Monteiro Romer...,2023-10-07 14:41:00
8,12,Delegada de Polícia Bianca Rodrigues Xavier Li...,2023-10-07 14:41:00
9,13,Delegado de Polícia Felipe Santoro da Silva,2023-10-07 14:41:00


###  Tabela BPM (Batalhão da Policia Militar)

In [16]:
# Criar a tabela tbBPM e carregar do arquivo csv 
esquema = "public"
tabela = "tbBPM"
arquivo_csv = "C:/Thuany/Projetos/Projeto Banco de dados e Data Warehouse/Arquivos/Dados/BPM.csv"

# Limpar a transação atual, se houver
cursor_stage.execute("ROLLBACK;")

# Carregar os dados do arquivo CSV para um DataFrame, sem definir uma coluna como índice
DadosBPM = pd.read_csv(arquivo_csv, index_col=None)

# Adicionar uma coluna de data para armazenar o dia e a hora da carga de dados
dtCarga = datetime.today().strftime('%d/%m/%Y %H:%M')

# Limpar a transação atual, se houver
cursor_stage.execute("ROLLBACK;")

# Criar tabela se não existir
cursor_stage.execute(f"""
    CREATE TABLE IF NOT EXISTS "{esquema}"."{tabela}" (
        "codBPM" INTEGER,
        "nmBPM" VARCHAR(10),
        "enderecoBPM" VARCHAR(150),
        "dtCarga" TIMESTAMP
    );
    
    CREATE INDEX IF NOT EXISTS idx_tbBPM_codBPM ON "{esquema}"."{tabela}"("codBPM");
    
""")

# Inserir os dados na tabela PostgreSQL
for _, row in DadosBPM.iterrows():
    cursor_stage.execute(f"""
        INSERT INTO "{esquema}"."{tabela}" ("codBPM", "nmBPM", "enderecoBPM", "dtCarga")
        VALUES (%s, %s, %s, %s);
    """, (row['codBPM'], row['nmBPM'], row['enderecoBPM'], dtCarga))
conexao_stage.commit()

print('Tabela BPM criada e carga efetuada no Banco de Dados - Carga Stage!')

Tabela BPM criada e carga efetuada no Banco de Dados - Carga Stage!


#### Conferindo se a tabela foi populada corretamente, selecionando os primeiros 10 registros

In [17]:
# Consulta SQL para selecionar os primeiros 10 registros da tabela
consulta_sql = """
    SELECT * FROM "tbBPM"
    LIMIT 10;
"""

# Execute a consulta SQL e carregue os resultados em um DataFrame
df = pd.read_sql(consulta_sql, conexao_stage)
conexao_stage.commit()

# Exibir os resultados
print("Primeiros 10 registros da tabela:")
display(df)

Primeiros 10 registros da tabela:




Unnamed: 0,codBPM,nmBPM,enderecoBPM,dtCarga
0,2,2º BPM,"R. Álvaro Ramos, 155",2023-10-07 14:41:00
1,3,3º BPM,"R. Lucídio Lago, 181",2023-10-07 14:41:00
2,4,4º BPM,"R. Francisco Eugênio, 228",2023-10-07 14:41:00
3,5,5º BPM,"Praça Cel. Assunção, S/N",2023-10-07 14:41:00
4,6,6º BPM,"R. Barão de Mesquita, 625",2023-10-07 14:41:00
5,7,7º BPM,"R. Dr. Alfredo Backer, 367",2023-10-07 14:41:00
6,8,8º BPM,"Rua Tenente Coronel Cardoso, s/n",2023-10-07 14:41:00
7,9,9º BPM,"R. Tacaratu, 94",2023-10-07 14:41:00
8,10,10º BPM,"Rod. Lúcio Meira, 47000",2023-10-07 14:41:00
9,11,11º BPM,"R. Voluntários da Pátria, 474",2023-10-07 14:41:00


### Tabela AreaBPM (Batalhão da Policia Militar)

In [18]:
# Criar a tabela tbAreaBPM e carregar do arquivo csv 
esquema = "public"
tabela = "tbAreaBPM"
arquivo_csv = "C:/Thuany/Projetos/Projeto Banco de dados e Data Warehouse/Arquivos/Dados/areaBPM.csv"

# Limpar a transação atual, se houver
cursor_stage.execute("ROLLBACK;")

# Carregar os dados do arquivo CSV para um DataFrame, sem definir uma coluna como índice
DadosAreaBPM = pd.read_csv(arquivo_csv, index_col=None)

# Adicionar uma coluna de data para armazenar o dia e a hora da carga de dados
dtCarga = datetime.today().strftime('%d/%m/%Y %H:%M')

# Limpar a transação atual, se houver
cursor_stage.execute("ROLLBACK;")

# Criar tabela se não existir
cursor_stage.execute(f"""
    CREATE TABLE IF NOT EXISTS "{esquema}"."{tabela}" (
        "codBPM" INTEGER,
        "areaBPM" REAL,
        "dtCarga" TIMESTAMP
    );
    CREATE INDEX IF NOT EXISTS idx_tbAreaBPM_codBPM ON "{esquema}"."{tabela}"("codBPM");   
    
""")

# Inserir os dados na tabela PostgreSQL
for _, row in DadosAreaBPM.iterrows():
    cursor_stage.execute(f"""
        INSERT INTO "{esquema}"."{tabela}" ("codBPM", "areaBPM", "dtCarga")
        VALUES (%s, %s, %s);
    """, (row['codBPM'], row['areaBPM'], dtCarga))

conexao_stage.commit()

print('Tabela AreaBPM criada e carga efetuada no Banco de Dados - Carga Stage!')

Tabela AreaBPM criada e carga efetuada no Banco de Dados - Carga Stage!


#### Conferindo se a tabela foi populada corretamente, selecionando os primeiros 10 registros

In [19]:
# Consulta SQL para selecionar os primeiros 10 registros da tabela
consulta_sql = """
    SELECT * FROM "tbAreaBPM"
    LIMIT 10;
"""

# Execute a consulta SQL e carregue os resultados em um DataFrame
df = pd.read_sql(consulta_sql, conexao_stage)
conexao_stage.commit()

# Exibir os resultados
print("Primeiros 10 registros da tabela:")
display(df)

Primeiros 10 registros da tabela:




Unnamed: 0,codBPM,areaBPM,dtCarga
0,5,15.4,2023-10-07 14:42:00
1,4,17.1,2023-10-07 14:42:00
2,2,15.0,2023-10-07 14:42:00
3,23,25.2,2023-10-07 14:42:00
4,19,5.1,2023-10-07 14:42:00
5,31,175.4,2023-10-07 14:42:00
6,6,55.2,2023-10-07 14:42:00
7,22,14.8,2023-10-07 14:42:00
8,16,33.1,2023-10-07 14:42:00
9,3,39.0,2023-10-07 14:42:00


### Tabela Ocorrências 

In [20]:
arquivo_csv = "C:/Thuany/Projetos/Projeto Banco de dados e Data Warehouse/Arquivos/Dados/Ocorrencias.csv"

# Carregar os dados do arquivo CSV para um DataFrame
DadosOcorrencias = pd.read_csv(arquivo_csv, index_col=None)

DadosOcorrencias

Unnamed: 0,codDP,codBPM,ano,mes,mes_ano,Regiao,codMunic,Ocorrencia,SomaQtde
0,1,5,2018,1,2018m01,1,3304557,ameaca,7
1,1,5,2018,1,2018m01,1,3304557,apreensao_drogas,3
2,1,5,2018,1,2018m01,1,3304557,estelionato,81
3,1,5,2018,1,2018m01,1,3304557,estupro,1
4,1,5,2018,1,2018m01,1,3304557,extorsao,3
...,...,...,...,...,...,...,...,...,...
124000,168,33,2018,12,2018m12,3,3304409,posse_drogas,8
124001,168,33,2018,12,2018m12,3,3304409,recuperacao_veiculos,1
124002,168,33,2018,12,2018m12,3,3304409,roubo_rua,1
124003,168,33,2018,12,2018m12,3,3304409,roubo_transeunte,1


In [23]:
# Criar a tabela tbOcorrencias e carregar do arquivo CSV
esquema = "public"
tabela = "tbOcorrencias"

# Limpar a transação atual, se houver
cursor_stage.execute("ROLLBACK;")

# Criar tabela se não existir
cursor_stage.execute(f"""
    CREATE TABLE IF NOT EXISTS "{esquema}"."{tabela}" (
        "codDP" INTEGER,
        "codBPM" INTEGER,
        "ano" INTEGER,
        "mes" INTEGER,
        "mes_ano" CHAR(7),
        "Regiao" INTEGER,
        "codMunic" INTEGER,
        "Ocorrencia" VARCHAR(50),
        "SomaQtde" INTEGER,
        "dtCarga" TIMESTAMP
    );
           
    CREATE INDEX IF NOT EXISTS idx_tbOcorrencias_codDP ON "{esquema}"."{tabela}"("codDP");
    CREATE INDEX IF NOT EXISTS idx_tbOcorrencias_codBPM ON "{esquema}"."{tabela}"("codBPM");
    CREATE INDEX IF NOT EXISTS idx_tbOcorrencias_codMunic ON "{esquema}"."{tabela}"("codMunic");
    
""")

# Adicionar uma coluna de data para armazenar o dia e a hora da carga de dados
dtCarga = datetime.today().strftime('%d/%m/%Y %H:%M')

# Lista das colunas que você deseja indexar
colunas_para_indexar = ["codDP", "codBPM", "codMunic"]

# Inserir os dados na tabela PostgreSQL
for _, row in DadosOcorrencias.iterrows():
    cursor_stage.execute(f"""
        INSERT INTO "{esquema}"."{tabela}" ("codDP", "codBPM", "ano", "mes", "mes_ano", "Regiao", "codMunic", "Ocorrencia", "SomaQtde", "dtCarga")
        VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
    """, (row['codDP'], row['codBPM'], row['ano'], row['mes'], row['mes_ano'], row['Regiao'], row['codMunic'], row['Ocorrencia'], row['SomaQtde'], dtCarga))

conexao_stage.commit()

print('Tabela Ocorrências criada e carga efetuada no Banco de Dados - Carga Stage!')

Tabela Ocorrências criada e carga efetuada no Banco de Dados - Carga Stage!


#### Conferindo se a tabela foi populada corretamente, selecionando os primeiros 10 registros

In [24]:
# Consulta SQL para selecionar os primeiros 10 registros da tabela
consulta_sql = """
    SELECT * FROM "tbOcorrencias"
    LIMIT 10;
"""

# Execute a consulta SQL e carregue os resultados em um DataFrame
df = pd.read_sql(consulta_sql, conexao_stage)
conexao_stage.commit()
conexao_stage.close()

# Exibir os resultados
print("Primeiros 10 registros da tabela:")
display(df)

Primeiros 10 registros da tabela:




Unnamed: 0,codDP,codBPM,ano,mes,mes_ano,Regiao,codMunic,Ocorrencia,SomaQtde,dtCarga
0,1,5,2018,1,2018m01,1,3304557,ameaca,7,2023-10-07 14:43:00
1,1,5,2018,1,2018m01,1,3304557,apreensao_drogas,3,2023-10-07 14:43:00
2,1,5,2018,1,2018m01,1,3304557,estelionato,81,2023-10-07 14:43:00
3,1,5,2018,1,2018m01,1,3304557,estupro,1,2023-10-07 14:43:00
4,1,5,2018,1,2018m01,1,3304557,extorsao,3,2023-10-07 14:43:00
5,1,5,2018,1,2018m01,1,3304557,furto_bicicleta,2,2023-10-07 14:43:00
6,1,5,2018,1,2018m01,1,3304557,furto_celular,23,2023-10-07 14:43:00
7,1,5,2018,1,2018m01,1,3304557,furto_coletivo,14,2023-10-07 14:43:00
8,1,5,2018,1,2018m01,1,3304557,furto_transeunte,52,2023-10-07 14:43:00
9,1,5,2018,1,2018m01,1,3304557,furto_veiculos,5,2023-10-07 14:43:00


## Etapa 2 - Criar etapa de carga no Data Warehouse

In [25]:
# Configurações de conexão com o Data Warehouse
db_config_dw = {
    "host": "localhost",
    "database": "OcorrenciasDW",
    "user": "postgres",
    "password": "admin123",
}

conexao_DW = psycopg2.connect(**db_config_dw)
cursor_DW = conexao_DW.cursor()

# Se a conexão for bem-sucedida, imprima uma mensagem de sucesso
print("Conexão com o Data Warehouse bem-sucedida.")

Conexão com o Data Warehouse bem-sucedida.


### Tabela Dimensão Municipio 

In [26]:
# Renomear colunas para criar uma dimensão chamada Municipios
DadosIBGE = DadosIBGE.rename(columns={'Municípios do Rio de Janeiro':'nmMunic','Códigos':'codMunic'})

# Alterar o nome do índice para parametrizar com as demais dimensões
DadosIBGE.index.name = 'idMunic'

# Nome do esquema e tabela no PostgreSQL para OcorrenciasStage
esquema_DW = "public"
tabela_DW = "dMunicipio"

# Limpar a transação atual no Data Warehouse, se houver
cursor_DW.execute("ROLLBACK;")

# Criar tabela se não existir no Data Warehouse
cursor_DW.execute(f"""
    CREATE TABLE IF NOT EXISTS "{esquema_DW}"."{tabela_DW}" (
        "idMunic" SERIAL PRIMARY KEY,
        "nmMunic" TEXT,
        "codMunic" INTEGER
    );
    
    CREATE INDEX IF NOT EXISTS idx_dMunicipio_idMunic ON "{esquema_DW}"."{tabela_DW}" ("idMunic");;
""")

# Inserir os dados diretamente na tabela no Data Warehouse
for _, row in DadosIBGE.iterrows():
    cursor_DW.execute(f"""
        INSERT INTO "{esquema_DW}"."{tabela_DW}" ("nmMunic", "codMunic")
        VALUES (%s, %s);
    """, (row['nmMunic'], row['codMunic']))

# Comitar a transação no Data Warehouse
conexao_DW.commit()
conexao_DW.close()

print('Tabela dMunicipio criada e carga efetuada no Banco de Dados - Carga Data Warehouse!')

Tabela dMunicipio criada e carga efetuada no Banco de Dados - Carga Data Warehouse!


#### Abrir conexão com o banco Stage

In [27]:
# Conexão com Stage para criar a query e salvar os dados
conexao_stage = psycopg2.connect(**db_config_stage)
cursor_stage = conexao_stage.cursor()

# Se a conexão for bem-sucedida, imprima uma mensagem de sucesso
print("Conexão com o Stage bem-sucedida.")

Conexão com o Stage bem-sucedida.


### Tabela Dimensão DP

Montar query para juntar as informações de duas tabelas **(tbDP e tbRespDP)** para a criação da tabela dimensão 'DP' com as informações completas

In [28]:
# Nesta etapa iremos juntar informações de mais uma tabela para criarmos uma tabela dimensão DP completa.
# Certifique-se de que qualquer transação anterior seja encerrada
cursor_stage.execute("ROLLBACK;")

# Nesta etapa iremos juntar informações de mais uma tabela para criarmos uma tabela dimensão DP completa.
query = '''
       SELECT 
        "codDP",
        "nmDP",
        "enderecoDP",
        "nmResponsavel"
        FROM
    (
        SELECT
            a."codDP",
            a."nmDP",
            a."enderecoDP",
            b."nmResponsavel",
            max(a."dtCarga")
        FROM "public"."tbDP" a
        JOIN "public"."tbRespDP" b
        ON a."codDP" = b."codDP"
        WHERE a."dtCarga" = (SELECT max(x."dtCarga") FROM "public"."tbDP" x)
        GROUP BY
            a."codDP",
            a."nmDP",
            a."enderecoDP",
            b."nmResponsavel"
) a;

'''
conexao_stage.commit()

# Executar a consulta SQL e salvar o resultado em um DataFrame
df1 = pd.read_sql(query, conexao_stage)
df1



Unnamed: 0,codDP,nmDP,enderecoDP,nmResponsavel
0,124,124ª DP - Saquarema,"Rua Doutor Luiz Januário, 201 - Campo Aviacao,...",Delegado de Polícia Andre Luiz Salvador Bueno
1,21,021ª DP - Bonsucesso,"Avenida dos Democráticos, 1322 - Bonsucesso, R...",Delegado de Polícia Hilton Pinho Alonso
2,64,064ª DP - São João de Meriti,"Avenida Doutor Arruda Negreiro, s/nº - São Mat...",Delegado de Polícia Evaristo Pontes Magalhães
3,81,081ª DP - Itaipu,"Avenida Francisco da Cruz, 6666 - Itaipu, Nite...",Delegado de Polícia Fábio Oliveira Barucke
4,105,105ª DP - Petrópolis,"Avenida Barão do Rio Branco, 3099 - Retiro, Pe...",Delegado de Polícia João Valentim dos Santos Neto
...,...,...,...,...
132,127,127ª DP - Armação de Búzios,"Avenida Parque, s/n - Village de Búzios, Búzio...",Delegado de Polícia Rodrigo Bichara Moreira
133,93,093ª DP - Volta Redonda,"Avenida Lucas Evangelista de Oliveira Franco, ...",Delegado de Polícia Edezio de Castro Ramos Junior
134,40,040ª DP - Honório Gurgel,"Rua Guarama, 15 - Rocha Miranda, Rio de Janeir...",Delegada de Polícia Márcia Beck Simões
135,32,032ª DP - Taquara,"Rua Professora Francisca Piragibe, 80 - Taquar...",Delegado de Polícia Alessandro Petralanda Santos


In [29]:
conexao_stage.close()

#### Abrir conexão com o Data Warehouse

In [30]:
conexao_DW = psycopg2.connect(**db_config_dw)
cursor_DW = conexao_DW.cursor()
# Se a conexão for bem-sucedida, imprima uma mensagem de sucesso
print("Conexão com o Data Warehouse bem-sucedida.")

Conexão com o Data Warehouse bem-sucedida.


### Criar tabela Dimensão DP  

In [31]:
# Nome do esquema e tabela no PostgreSQL para Data Warehouse
esquema_DW = "public"
tabela_DW = "dDP"

# ROLLBACK para encerrar a transação atual
conexao_DW.rollback()

# Agora você pode executar os comandos SQL novamente
# Criar tabela se não existir
cursor_DW.execute(f"""
    CREATE TABLE IF NOT EXISTS "{esquema_DW}"."{tabela_DW}" (
        "idDP" SERIAL PRIMARY KEY,
        "codDP" INTEGER,
        "nmDP" VARCHAR(50),
        "enderecoDP" VARCHAR(200),
        "nmResponsavel" VARCHAR (150)
    );
    
    
    CREATE INDEX IF NOT EXISTS idx_idDP ON "{esquema_DW}"."{tabela_DW}" ("idDP");
""")


# Inserir os dados diretamente na tabela no Data Warehouse
for _, row in df1.iterrows():
    cursor_DW.execute(f"""
        INSERT INTO "{esquema_DW}"."{tabela_DW}" ("codDP", "nmDP", "enderecoDP", "nmResponsavel")
        VALUES (%s, %s, %s, %s);
    """, (row['codDP'], row['nmDP'], row['enderecoDP'], row['nmResponsavel']))

# Comitar a transação no Data Warehouse
conexao_DW.commit()
conexao_DW.close()


print('Tabela dDP criada e carga efetuada no Banco de Dados - Carga Data Warehouse!')

Tabela dDP criada e carga efetuada no Banco de Dados - Carga Data Warehouse!


#### Abrir conexão com o banco Stage

In [35]:
# Conexão com Stage para criar a query e salvar os dados
conexao_stage = psycopg2.connect(**db_config_stage)
cursor_stage = conexao_stage.cursor()

# Se a conexão for bem-sucedida, imprima uma mensagem de sucesso
print("Conexão com o Stage bem-sucedida.")

Conexão com o Stage bem-sucedida.


### Tabela Dimensão BPM


Montar query para juntar as informações de duas tabelas **(tbBPM e tbAreaBPM)** para a criação da tabela dimensão 'BPM' com as informações completas

In [36]:
# Nesta etapa iremos juntar informações de mais uma tabela para criarmos uma tabela dimensão DP completa.
# Certifique-se de que qualquer transação anterior seja encerrada
cursor_stage.execute("ROLLBACK;")

# Nesta etapa iremos juntar informações de mais uma tabela para criarmos uma tabela dimensão DP completa.
query = '''
       SELECT
    "codBPM",
    "nmBPM",
    "enderecoBPM",
    "areaBPM"
FROM 
(
    SELECT 
        a."codBPM",
        a."nmBPM",
        a."enderecoBPM",
        b."areaBPM",
        max(a."dtCarga")
    FROM "public"."tbBPM" a
    JOIN "public"."tbAreaBPM" b
    ON a."codBPM" = b."codBPM"
    WHERE a."dtCarga" = (SELECT MAX(x."dtCarga") FROM "public"."tbBPM" x)
    GROUP BY
        a."codBPM",
        a."nmBPM",
        a."enderecoBPM",
        b."areaBPM"
    ) a
'''
conexao_stage.commit()

# Executar a consulta SQL e salvar o resultado em um DataFrame
df2 = pd.read_sql(query, conexao_stage)
df2



Unnamed: 0,codBPM,nmBPM,enderecoBPM,areaBPM
0,2,2º BPM,"R. Álvaro Ramos, 155",15.0
1,3,3º BPM,"R. Lucídio Lago, 181",39.0
2,4,4º BPM,"R. Francisco Eugênio, 228",17.1
3,5,5º BPM,"Praça Cel. Assunção, S/N",15.4
4,6,6º BPM,"R. Barão de Mesquita, 625",55.2
5,7,7º BPM,"R. Dr. Alfredo Backer, 367",247.7
6,8,8º BPM,"Rua Tenente Coronel Cardoso, s/n",6635.7
7,9,9º BPM,"R. Tacaratu, 94",33.6
8,10,10º BPM,"Rod. Lúcio Meira, 47000",4243.6
9,11,11º BPM,"R. Voluntários da Pátria, 474",4041.1


In [37]:
conexao_stage.close()

#### Abrir conexão com o Data Warehouse

In [38]:
conexao_DW = psycopg2.connect(**db_config_dw)
cursor_DW = conexao_DW.cursor()
# Se a conexão for bem-sucedida, imprima uma mensagem de sucesso
print("Conexão com o Data Warehouse bem-sucedida.")

Conexão com o Data Warehouse bem-sucedida.


In [39]:
# Nome do esquema e tabela no PostgreSQL para Data Warehouse
esquema_DW = "public"
tabela_DW = "dBPM"

# ROLLBACK para encerrar a transação atual
conexao_DW.rollback()

# Agora você pode executar os comandos SQL novamente
# Criar tabela se não existir
cursor_DW.execute(f"""
    CREATE TABLE IF NOT EXISTS "{esquema_DW}"."{tabela_DW}" (
        "idBPM" SERIAL PRIMARY KEY,
        "codBPM" INTEGER,
        "nmBPM" VARCHAR (7),
        "enderecoBPM" VARCHAR (200),
        "areaBPM" REAL 

);

CREATE INDEX IF NOT EXISTS idx_idBPM ON "{esquema_DW}"."{tabela_DW}" ("idBPM");
""")

# Inserir os dados diretamente na tabela no Data Warehouse
for _, row in df2.iterrows():
    cursor_DW.execute(f"""
        INSERT INTO "{esquema_DW}"."{tabela_DW}" ("codBPM", "nmBPM","enderecoBPM","areaBPM" )
        VALUES (%s, %s, %s, %s);
    """, (row['codBPM'], row['nmBPM'], row['enderecoBPM'], row['areaBPM']))

# Comitar a transação no Data Warehouse
conexao_DW.commit()
conexao_DW.close()


print('Tabela dBPM criada e carga efetuada no Banco de Dados - Carga Data Warehouse!')

Tabela dBPM criada e carga efetuada no Banco de Dados - Carga Data Warehouse!


#### Abrir conexão com o banco Stage

In [43]:
# Conexão com Stage para criar a query e salvar os dados
conexao_stage = psycopg2.connect(**db_config_stage)
cursor_stage = conexao_stage.cursor()

# Se a conexão for bem-sucedida, imprima uma mensagem de sucesso
print("Conexão com o Stage bem-sucedida.")

Conexão com o Stage bem-sucedida.


In [44]:
# Nesta etapa iremos juntar informações de mais uma tabela para criarmos uma tabela dimensão periodo completa.
# Certifique-se de que qualquer transação anterior seja encerrada
cursor_stage.execute("ROLLBACK;")

# Nesta etapa iremos juntar informações de mais uma tabela para criarmos uma tabela dimensão DP completa.
query = '''
       WITH RECURSIVE data(d) AS (
    SELECT '2018-01-01'::timestamp
    UNION ALL
    SELECT d + interval '1 month'
    FROM data
    WHERE d < current_date
)
SELECT 
    to_char(d, 'DD/MM/YYYY') AS data,
    CAST(EXTRACT(MONTH FROM d) AS INTEGER) AS mes,
    CAST(EXTRACT(YEAR FROM d) AS INTEGER) AS ano,
    CASE
        WHEN EXTRACT(MONTH FROM d) IN (1, 2, 3) THEN 1 
        WHEN EXTRACT(MONTH FROM d) IN (4, 5, 6) THEN 2
        WHEN EXTRACT(MONTH FROM d) IN (7, 8, 9) THEN 3
        ELSE 4
    END AS trimestre,
    CASE
        WHEN EXTRACT(MONTH FROM d) IN (1, 2, 3, 4, 5, 6) THEN 1
        ELSE 2
    END AS semestre
FROM data;

'''
conexao_stage.commit()


# Executar a consulta SQL e salvar o resultado em um DataFrame
df3 = pd.read_sql(query, conexao_stage)
df3



Unnamed: 0,data,mes,ano,trimestre,semestre
0,01/01/2018,1,2018,1,1
1,01/02/2018,2,2018,1,1
2,01/03/2018,3,2018,1,1
3,01/04/2018,4,2018,2,1
4,01/05/2018,5,2018,2,1
...,...,...,...,...,...
66,01/07/2023,7,2023,3,2
67,01/08/2023,8,2023,3,2
68,01/09/2023,9,2023,3,2
69,01/10/2023,10,2023,4,2


In [45]:
conexao_stage.close()

#### Abrir conexão com o Data Warehouse

In [46]:
conexao_DW = psycopg2.connect(**db_config_dw)
cursor_DW = conexao_DW.cursor()
# Se a conexão for bem-sucedida, imprima uma mensagem de sucesso
print("Conexão com o Data Warehouse bem-sucedida.")

Conexão com o Data Warehouse bem-sucedida.


### Tabela Dimensao Periodo


In [47]:
# Nome do esquema e tabela no PostgreSQL para Data Warehouse
esquema_DW = "public"
tabela_DW = "dPeriodo"

# ROLLBACK para encerrar a transação atual
conexao_DW.rollback()

# Agora você pode executar os comandos SQL novamente
# Criar tabela se não existir
cursor_DW.execute(f"""
    CREATE TABLE IF NOT EXISTS "{esquema_DW}"."{tabela_DW}" (
        "idPeriodo" SERIAL PRIMARY KEY,
        "data" DATE,
        "mes" INTEGER,
        "ano" INTEGER,
        "trimestre" INTEGER,
        "semestre" INTEGER

);

CREATE INDEX IF NOT EXISTS idx_idperiodo ON "{esquema_DW}"."{tabela_DW}" ("idPeriodo");
""")

# Inserir os dados diretamente na tabela no Data Warehouse
for _, row in df3.iterrows():
    cursor_DW.execute(f"""
        INSERT INTO "{esquema_DW}"."{tabela_DW}" ("data", "mes","ano","trimestre","semestre" )
        VALUES (%s, %s, %s, %s, %s);
    """, (row['data'], row['mes'], row['ano'], row['trimestre'], row['semestre']))

# Comitar a transação no Data Warehouse
conexao_DW.commit()
conexao_DW.close()

print('Tabela dPeriodo criada e carga efetuada no Banco de Dados - Carga Data Warehouse!')

Tabela dPeriodo criada e carga efetuada no Banco de Dados - Carga Data Warehouse!


#### Abrir conexão com o banco Stage

In [48]:
# Conexão com Stage para criar a query e salvar os dados
conexao_stage = psycopg2.connect(**db_config_stage)
cursor_stage = conexao_stage.cursor()

# Se a conexão for bem-sucedida, imprima uma mensagem de sucesso
print("Conexão com o Stage bem-sucedida.")

Conexão com o Stage bem-sucedida.


### Etapa 3 -  Criar dataframe com resultado da query de carga de dados para a montar a tabela fOcorrencias no Data Warehouse

In [49]:
#definição da query de carga de dados na tabela fato
queryfOcorrencias = '''
     SELECT
        a."codDP",
        a."codBPM",
        a."ano",
        a."mes",
        a."Regiao",
        a."codMunic",
        a."Ocorrencia",
        a."SomaQtde"
    FROM "tbOcorrencias" a
    WHERE a."dtCarga" = (
                        SELECT MAX("dtCarga")
                        FROM "tbOcorrencias"
                        )

'''

#popular o dataframe
fOcorrencias = pd.read_sql(queryfOcorrencias,conexao_stage)

conexao_stage.commit()

fOcorrencias



Unnamed: 0,codDP,codBPM,ano,mes,Regiao,codMunic,Ocorrencia,SomaQtde
0,1,5,2018,1,1,3304557,ameaca,7
1,1,5,2018,1,1,3304557,apreensao_drogas,3
2,1,5,2018,1,1,3304557,estelionato,81
3,1,5,2018,1,1,3304557,estupro,1
4,1,5,2018,1,1,3304557,extorsao,3
...,...,...,...,...,...,...,...,...
124000,168,33,2018,12,3,3304409,posse_drogas,8
124001,168,33,2018,12,3,3304409,recuperacao_veiculos,1
124002,168,33,2018,12,3,3304409,roubo_rua,1
124003,168,33,2018,12,3,3304409,roubo_transeunte,1


In [50]:
conexao_stage.close()

#### Abrir conexão com o Data Warehouse

In [52]:
conexao_DW = psycopg2.connect(**db_config_dw)
cursor_DW = conexao_DW.cursor()
# Se a conexão for bem-sucedida, imprima uma mensagem de sucesso
print("Conexão com o Data Warehouse bem-sucedida.")

Conexão com o Data Warehouse bem-sucedida.


### Tabela Fato Ocorrências no Data Warehouse

Neste etapa irei criar primeiro a tabela fato Ocorências e depois criar um tabela temporaria para usar a consulta que salvei no dataframe para dar a carga na tabela fato. 

In [56]:
# Nome do esquema e tabela no PostgreSQL para Data Warehouse
esquema_DW = "public"
tabela_DW = "fOcorrencias"

# ROLLBACK para encerrar a transação atual
conexao_DW.rollback()

# Agora você pode executar os comandos SQL novamente
# Criar tabela se não existir e criar índices
cursor_DW.execute(f"""
    CREATE TABLE IF NOT EXISTS "{esquema_DW}"."{tabela_DW}" (
        "idDP" INTEGER REFERENCES "dDP"("idDP") ON UPDATE NO ACTION ON DELETE NO ACTION,
        "idBPM" INTEGER REFERENCES "dBPM"("idBPM") ON UPDATE NO ACTION ON DELETE NO ACTION,
        "idPeriodo" INTEGER REFERENCES "dPeriodo"("idPeriodo") ON UPDATE NO ACTION ON DELETE NO ACTION,
        "regiao" INTEGER,
        "idMunic" INTEGER REFERENCES "dMunicipio"("idMunic") ON UPDATE NO ACTION ON DELETE NO ACTION,
        "ocorrencias" VARCHAR(150),  -- Ajuste o tamanho conforme necessário
        "qtde" INTEGER
    );
    
    --Crie índices nas colunas idDP, idBPM, idPeriodo e idMunic
    CREATE INDEX IF NOT EXISTS idx_idDP ON "{esquema_DW}"."{tabela_DW}" ("idDP");
    CREATE INDEX IF NOT EXISTS idx_idBPM ON "{esquema_DW}"."{tabela_DW}" ("idBPM");
    CREATE INDEX IF NOT EXISTS idx_idPeriodo ON "{esquema_DW}"."{tabela_DW}" ("idPeriodo");
    CREATE INDEX IF NOT EXISTS idx_idMunic ON "{esquema_DW}"."{tabela_DW}" ("idMunic");
""")

# Comitar a transação no Data Warehouse
conexao_DW.commit()

print('Tabela fOcorrencias criada no Banco de Dados - Carga Data Warehouse!')

Tabela fOcorrencias criada no Banco de Dados - Carga Data Warehouse!


#### Abrir conexão com o banco Stage

## Tabela Temporária tempOcorrencias para armazenar dados

In [54]:
# Substitua 'tempOcorrencias' pelo nome desejado para a tabela temporária
tabela_temporaria = 'tempOcorrencias'

# ROLLBACK para encerrar a transação atual
conexao_DW.rollback()

# Criar um mecanismo SQLAlchemy a partir das informações de conexão
engine = create_engine(f'postgresql://{db_config_dw["user"]}:{db_config_dw["password"]}@{db_config_dw["host"]}/{db_config_dw["database"]}')

# Substitua 'fOcorrencias' pelo seu DataFrame e 'tempOcorrencias' pelo nome da tabela temporária desejada
fOcorrencias.to_sql(tabela_temporaria, engine, if_exists="replace")

# Verificar se a tabela temporária foi criada com sucesso
inspector = inspect(engine)
if tabela_temporaria in inspector.get_table_names():
    print(f"A tabela temporária {tabela_temporaria} foi criada com sucesso.")
else:
    print(f"A tabela temporária {tabela_temporaria} não foi encontrada.")
    
# Comitar a transação no Data Warehouse
conexao_DW.commit()

# Fechar a conexão do mecanismo SQLAlchemy
engine.dispose()

A tabela temporária tempOcorrencias foi criada com sucesso.


### Inserir dados na tabela Fato Ocorrências (fOcorrencias) - CARGA INCREMENTAL

In [55]:
# nesta etapa irei dar a carga incremental na tabela fato Ocorrencia. 
#Será necessario fazer um join com a tabela temporaria para fazer a junção dos dados.
esquema_DW = "public"
tabela_DW = "fOcorrencias"

# ROLLBACK para encerrar a transação atual
conexao_DW.rollback()

# Nesta etapa iremos juntar informações de mais uma tabela para criarmos uma tabela fato fOcorrencias completa.
query = '''
 SELECT
        b."idDP",
        c."idBPM",
        d."idPeriodo",
        a."Regiao" as "regiao",
        e."idMunic",
        a."Ocorrencia" as "ocorrencias",
        a."SomaQtde" as "qtde"
    FROM "public"."tempOcorrencias" a 
    JOIN "public"."dDP" b ON a."codDP" = b."codDP"
    JOIN "public"."dBPM" c ON a."codBPM" = c."codBPM"
    JOIN "public"."dPeriodo" d ON (a."ano" = d."ano") AND (a."mes" = d."mes")
    JOIN "public"."dMunicipio" e ON a."codMunic" = e."codMunic"
    LEFT JOIN "public"."fOcorrencias" g ON b."idDP" = g."idDP"
    AND c."idBPM" = g."idBPM"
    AND d."idPeriodo" = g."idPeriodo"
    AND e."idMunic" = g."idMunic"  
    
'''

# Executar a consulta SQL no banco de dados do Stage
cursor_DW.execute(query)

# Recuperar os resultados da consulta em um DataFrame
df4 = pd.DataFrame(cursor_DW.fetchall(), columns=["idDP", "idBPM", "idPeriodo", "regiao", "idMunic", "ocorrencias", "qtde"])

# Inserir os dados na tabela fato fOcorrencias no Data Warehouse
for _, row in df4.iterrows():
    cursor_DW.execute(f"""
        INSERT INTO "{esquema_DW}"."{tabela_DW}" ("idDP", "idBPM", "idPeriodo", "regiao", "idMunic", "ocorrencias", "qtde")
        VALUES (%s, %s, %s, %s, %s, %s, %s);
    """, (row['idDP'], row['idBPM'], row['idPeriodo'], row['regiao'], row['idMunic'], row['ocorrencias'], row['qtde']))

# Comitar a transação no Data Warehouse
conexao_DW.commit()

print('Carga da tabela fato fOcorrencias efetuada com sucesso no Data Warehouse!')

Carga da tabela fato fOcorrencias efetuada com sucesso no Data Warehouse!


## Etapa 4 - Atualização Retroativa na Fato Ocorrências (fOcorrencias) 

In [59]:
# Certifique-se de que qualquer transação anterior seja encerrada no Data Warehouse
cursor_DW.execute("ROLLBACK;")

# Definindo a query de verificação da existência de atualizações retroativas
query_atualizacao = '''
    SELECT
        b."idDP",
        c."idBPM",
        d."idPeriodo",
        a."Regiao" as "regiao",
        e."idMunic",
        a."Ocorrencia" as "ocorrencias",
        a."SomaQtde" as "qtde"
    FROM "public"."tempOcorrencias" a 
    JOIN "public"."dDP" b ON a."codDP" = b."codDP"
    JOIN "public"."dBPM" c ON a."codBPM" = c."codBPM"
    JOIN "public"."dPeriodo" d ON (a."ano" = d."ano") AND (a."mes" = d."mes")
    JOIN "public"."dMunicipio" e ON a."codMunic" = e."codMunic"
    LEFT JOIN "public"."fOcorrencias" g ON b."idDP" = g."idDP"
    AND c."idBPM" = g."idBPM"
    AND d."idPeriodo" = g."idPeriodo"
    AND e."idMunic" = g."idMunic"  
    WHERE a."SomaQtde" <> g."qtde"
'''

# Executar a consulta SQL no banco de dados do Stage
cursor_DW.execute(query_atualizacao)

# Criar um DataFrame de atualização
atualiza_fOcorrencias = pd.read_sql(query_atualizacao, conexao_DW)

# Atualizar os dados na tabela fOcorrencias
print("Iniciando atualização na fato Ocorrências!")

# Definir a query de atualização
qry_atualiza_fOcorrencias = '''
    UPDATE fOcorrencias
    SET "qtde" = %s
    WHERE "idDP" = %s
    AND "idBPM" = %s
    AND "idPeriodo" = %s
    AND "regiao" = %s
    AND "idMunic" = %s
    AND "ocorrencias" = %s
'''

# Atualizar dados na tabela fOcorrencias
for _, row in atualiza_fOcorrencias.iterrows():
    cursor_DW.execute(qry_atualiza_fOcorrencias, (
        row['qtde'], row['idDP'], row['idBPM'], row['idPeriodo'], 
        row['regiao'], row['idMunic'], row['ocorrencias']
    ))

# Confirmar a transação
conexao_DW.commit()

# Fechar a conexão
conexao_DW.close()

print("Fim da atualização de dados na fato Ocorrências!", len(atualiza_fOcorrencias), "registros atualizados!")

Iniciando atualização na fato Ocorrências!
Fim da atualização de dados na fato Ocorrências! 0 registros atualizados!


