# Projeto Estágio supervisionado - ETL

O objetivo desde notbook é Desenvolver o processo de Inserção dos dados nas tabelas já existentes no Data Warehouse, conforme o diagrama de Entidade-Relacionamento (DER) e o Script schema.sql

In [6]:
# Imports

import pandas as pd
from sqlalchemy import create_engine
from datetime import datetime
import urllib.parse # Resolver problema com a senha do banco CARACTER ESPECIAL
import os

### Conexão com o Banco de Dados

Nesta etapa, vamos criar uma conexão com o banco de dados MySQL utilizando o SQLAlchemy. 


In [7]:
PASTA_DADOS = 'Data'

# Dados de conexão

db_user = 'root'
db_password = '01611478Marlin@'
db_host = 'localhost'
db_port = '3306'
db_name = 'projetopetroleo'

senha_codificada = urllib.parse.quote_plus(db_password)

# Criando uma conexão com o banco de dados
try: 
    con_str = f"mysql+mysqlconnector://{db_user}:{senha_codificada}@{db_host}:{db_port}/{db_name}" 
    engine = create_engine(con_str)
    print('Conectado ao banco de dados com sucesso!')
except Exception as e:
    print(f'Erro ao conectar ao banco de dados: {e}')

Conectado ao banco de dados com sucesso!


## `dim_calendario`
### Geração dos Dados do Calendário

Nesta etapa, será criado um DataFrame do Pandas contendo todos os dias de 1990 a 2030. Em seguida, será extraído e criada todas as colunas de enriquecimento que definimos no nosso modelo (ano, mês, trimestre, etc.), incluindo a nossa chave primária no formato `AAAAMMDD`.

In [8]:
# Definindo o período de datas
data_inicio = '1990-01-01'
data_fim = '2030-12-31'

datas = pd.date_range(start=data_inicio, end=data_fim)
df_calendario = pd.DataFrame(datas, columns=['data_completa'])

# Criando a PK (AAAAMMDD)
df_calendario['id_calendario'] = df_calendario['data_completa'].dt.strftime('%Y%m%d').astype(int)

# Extraindo e enriquecendo os atributos da data
df_calendario['ano'] = df_calendario['data_completa'].dt.year
df_calendario['mes_numero'] = df_calendario['data_completa'].dt.month
df_calendario['dia_numero'] = df_calendario['data_completa'].dt.day
df_calendario['trimestre_numero'] = df_calendario['data_completa'].dt.quarter
df_calendario['semestre_numero'] = (df_calendario['data_completa'].dt.quarter + 1) // 2

# Mapeando nomes em português
mapa_meses = {1: 'Janeiro', 2: 'Fevereiro', 3: 'Março', 4: 'Abril', 5: 'Maio', 6: 'Junho', 7: 'Julho', 8: 'Agosto', 9: 'Setembro', 10: 'Outubro', 11: 'Novembro', 12: 'Dezembro'}
mapa_dias_semana = {0: 'Segunda-feira', 1: 'Terça-feira', 2: 'Quarta-feira', 3: 'Quinta-feira', 4: 'Sexta-feira', 5: 'Sábado', 6: 'Domingo'}

df_calendario['mes_nome'] = df_calendario['mes_numero'].map(mapa_meses)
df_calendario['trimestre_nome'] = 'T' + df_calendario['trimestre_numero'].astype(str)
df_calendario['semestre_nome'] = 'S' + df_calendario['semestre_numero'].astype(str)
df_calendario['dia_da_semana'] = df_calendario['data_completa'].dt.dayofweek.map(mapa_dias_semana)

# Organizando as colunas na ordem correta da tabela
ordem_colunas = [
    'id_calendario', 'data_completa', 'ano', 'mes_numero', 'mes_nome',
    'dia_numero', 'trimestre_numero', 'trimestre_nome', 'semestre_numero',
    'semestre_nome', 'dia_da_semana'
]
df_calendario = df_calendario[ordem_colunas]

print("DataFrame da dim_calendario gerado com sucesso.")

DataFrame da dim_calendario gerado com sucesso.


### Caregando os Dados para o Banco de Dados

Com o DataFrame pronto e formatado exatamente como a nossa tabela `dim_calendario` no MySQL, agora podemos fazer a carga dos dados. Usaremos o método `.to_sql()` do Pandas, que é otimizado para inserção em massa.

In [9]:
# Carregando o DataFrame para a tabela SQL
try:
    df_calendario.to_sql('dim_calendario', con=engine, if_exists='append', index=False)
    print(f"{len(df_calendario)} registros carregados com sucesso na tabela 'dim_calendario'!")
except Exception as e:
    print(f"Ocorreu um erro durante a carga: {e}")

14975 registros carregados com sucesso na tabela 'dim_calendario'!


### Verificação
Vamos visualizar o início e o fim do DataFrame final que foi carregado

In [10]:
# Visualizando o resultado
display(df_calendario.head())
display(df_calendario.tail())
display(df_calendario.info())

Unnamed: 0,id_calendario,data_completa,ano,mes_numero,mes_nome,dia_numero,trimestre_numero,trimestre_nome,semestre_numero,semestre_nome,dia_da_semana
0,19900101,1990-01-01,1990,1,Janeiro,1,1,T1,1,S1,Segunda-feira
1,19900102,1990-01-02,1990,1,Janeiro,2,1,T1,1,S1,Terça-feira
2,19900103,1990-01-03,1990,1,Janeiro,3,1,T1,1,S1,Quarta-feira
3,19900104,1990-01-04,1990,1,Janeiro,4,1,T1,1,S1,Quinta-feira
4,19900105,1990-01-05,1990,1,Janeiro,5,1,T1,1,S1,Sexta-feira


Unnamed: 0,id_calendario,data_completa,ano,mes_numero,mes_nome,dia_numero,trimestre_numero,trimestre_nome,semestre_numero,semestre_nome,dia_da_semana
14970,20301227,2030-12-27,2030,12,Dezembro,27,4,T4,2,S2,Sexta-feira
14971,20301228,2030-12-28,2030,12,Dezembro,28,4,T4,2,S2,Sábado
14972,20301229,2030-12-29,2030,12,Dezembro,29,4,T4,2,S2,Domingo
14973,20301230,2030-12-30,2030,12,Dezembro,30,4,T4,2,S2,Segunda-feira
14974,20301231,2030-12-31,2030,12,Dezembro,31,4,T4,2,S2,Terça-feira


<class 'pandas.core.frame.DataFrame'>
RangeIndex: 14975 entries, 0 to 14974
Data columns (total 11 columns):
 #   Column            Non-Null Count  Dtype         
---  ------            --------------  -----         
 0   id_calendario     14975 non-null  int64         
 1   data_completa     14975 non-null  datetime64[ns]
 2   ano               14975 non-null  int32         
 3   mes_numero        14975 non-null  int32         
 4   mes_nome          14975 non-null  object        
 5   dia_numero        14975 non-null  int32         
 6   trimestre_numero  14975 non-null  int32         
 7   trimestre_nome    14975 non-null  object        
 8   semestre_numero   14975 non-null  int32         
 9   semestre_nome     14975 non-null  object        
 10  dia_da_semana     14975 non-null  object        
dtypes: datetime64[ns](1), int32(5), int64(1), object(4)
memory usage: 994.6+ KB


None

## `dim_localizacao`
Neste etapa, vamos popular a tabela `dim_localizacao` extraindo, limpando e unificando dados de localização de múltiplos arquivos de origem.

In [11]:

# Lista de TODOS os arquivos que vamos processar para ter uma dimensão completa
arquivos_com_local = [
    'producao-petroleo-m3-1997-2025.csv',
    'processamento-petroleo-m3-1990-2025.csv',
    'ca-2004-01.csv',
    'ca-2014-01.csv',
    'ca-2019-02.csv',
    'ca-2020-02.csv',
    'ca-2022-02.csv',
    'ca-2025-01.csv'
]

lista_de_dfs_locais = []

# Dicionário de mapeamento de colunas
mapa_colunas = {
    'GRANDE REGIÃO': 'regiao', 'UNIDADE DA FEDERAÇÃO': 'estado',
    'Estado - Sigla': 'uf_sigla', 'Regiao - Sigla': 'regiao_sigla', # Será descartada depois, mas ajuda a unificar
    'Municipio': 'municipio', 'Bairro': 'bairro', 'Cep': 'cep',
    'Nome da Rua': 'nome_rua', 'Numero Rua': 'numero_rua'
}

# Loop principal pelos arquivos
for nome_arquivo in arquivos_com_local:
    caminho_completo = os.path.join(PASTA_DADOS, nome_arquivo)
    print(f"Processando arquivo: {nome_arquivo}...")
    
    # Lendo apenas o cabeçalho para ver quais colunas de localização existem no arquivo
    colunas_originais_no_arquivo = pd.read_csv(caminho_completo, sep=';', encoding='utf-8-sig', nrows=0).columns
    colunas_para_ler = [col for col in colunas_originais_no_arquivo if col in mapa_colunas.keys()]
    
    if not colunas_para_ler:
        print("Nenhuma coluna de localização encontrada. Pulando.")
        continue
        
    # Lendo arquivo em pedaços (chunks) usando apenas as colunas necessárias
    chunk_iterator = pd.read_csv(
        caminho_completo, 
        sep=';', 
        encoding='utf-8-sig',
        usecols=colunas_para_ler,      # OTIMIZAÇÃO 1
        chunksize=50000,               # OTIMIZAÇÃO 2
        low_memory=False
    )
    
    for chunk in chunk_iterator:
        chunk.rename(columns=mapa_colunas, inplace=True)
        lista_de_dfs_locais.append(chunk)

# Unificando todos os pedaços
df_locais_completo = pd.concat(lista_de_dfs_locais, ignore_index=True)
print("\nTodos os arquivos foram lidos e unificados.")

# Renomeia todas as colunas
df_locais_completo.rename(columns=mapa_colunas, inplace=True)


print("Iniciando enriquecimento de dados de localização...")

# Criando os dicionários de mapeamento COMPLETOS
uf_para_estado = {
    'AC': 'ACRE', 'AL': 'ALAGOAS', 'AP': 'AMAPÁ', 'AM': 'AMAZONAS', 'BA': 'BAHIA', 
    'CE': 'CEARÁ', 'DF': 'DISTRITO FEDERAL', 'ES': 'ESPÍRITO SANTO', 'GO': 'GOIÁS', 
    'MA': 'MARANHÃO', 'MT': 'MATO GROSSO', 'MS': 'MATO GROSSO DO SUL', 'MG': 'MINAS GERAIS', 
    'PA': 'PARÁ', 'PB': 'PARAÍBA', 'PR': 'PARANÁ', 'PE': 'PERNAMBUCO', 'PI': 'PIAUÍ', 
    'RJ': 'RIO DE JANEIRO', 'RN': 'RIO GRANDE DO NORTE', 'RS': 'RIO GRANDE DO SUL', 
    'RO': 'RONDÔNIA', 'RR': 'RORAIMA', 'SC': 'SANTA CATARINA', 'SP': 'SÃO PAULO', 
    'SE': 'SERGIPE', 'TO': 'TOCANTINS'
}
estado_para_uf = {v: k for k, v in uf_para_estado.items()} # Mapa reverso automático

uf_para_regiao = {
    'AC': 'REGIÃO NORTE', 'AP': 'REGIÃO NORTE', 'AM': 'REGIÃO NORTE', 'PA': 'REGIÃO NORTE', 'RO': 'REGIÃO NORTE', 'RR': 'REGIÃO NORTE', 'TO': 'REGIÃO NORTE',
    'AL': 'REGIÃO NORDESTE', 'BA': 'REGIÃO NORDESTE', 'CE': 'REGIÃO NORDESTE', 'MA': 'REGIÃO NORDESTE', 'PB': 'REGIÃO NORDESTE', 'PE': 'REGIÃO NORDESTE', 'PI': 'REGIÃO NORDESTE', 'RN': 'REGIÃO NORDESTE', 'SE': 'REGIÃO NORDESTE',
    'DF': 'REGIÃO CENTRO-OESTE', 'GO': 'REGIÃO CENTRO-OESTE', 'MT': 'REGIÃO CENTRO-OESTE', 'MS': 'REGIÃO CENTRO-OESTE',
    'ES': 'REGIÃO SUDESTE', 'MG': 'REGIÃO SUDESTE', 'RJ': 'REGIÃO SUDESTE', 'SP': 'REGIÃO SUDESTE',
    'PR': 'REGIÃO SUL', 'RS': 'REGIÃO SUL', 'SC': 'REGIÃO SUL'
}

# Preenchendo os valores nulos em uma ordem lógica
# garantindo que as colunas 'estado' e 'uf_sigla' preencham uma à outra.
df_locais_completo['uf_sigla'] = df_locais_completo['uf_sigla'].fillna(df_locais_completo['estado'].str.upper().map(estado_para_uf))
df_locais_completo['estado'] = df_locais_completo['estado'].fillna(df_locais_completo['uf_sigla'].str.upper().map(uf_para_estado))
df_locais_completo['regiao'] = df_locais_completo['regiao'].fillna(df_locais_completo['uf_sigla'].str.upper().map(uf_para_regiao))
print("Enriquecimento concluído.")

# Agrupando por estado e município para criar uma linha única para cada localidade

print("Consolidando localidades únicas...")

# Usando .first() para pegar o primeiro valor não-nulo de cada coluna para aquele grupo
# Isso efetivamente "mescla" as informações das diferentes fontes
df_localizacao_final = df_locais_completo.groupby(['estado', 'uf_sigla', 'municipio']).first().reset_index()

# Garantindo que todas as colunas da dimensão final existam
colunas_dimensao = [
    'municipio', 'estado', 'uf_sigla', 'regiao', 
    'bairro', 'cep', 'nome_rua', 'numero_rua'
]
for col in colunas_dimensao:
    if col not in df_localizacao_final.columns:
        df_localizacao_final[col] = None
        
df_localizacao_final = df_localizacao_final[colunas_dimensao]
print("Limpeza e consolidação final concluídas.")

Processando arquivo: producao-petroleo-m3-1997-2025.csv...
Processando arquivo: processamento-petroleo-m3-1990-2025.csv...
Processando arquivo: ca-2004-01.csv...
Processando arquivo: ca-2014-01.csv...
Processando arquivo: ca-2019-02.csv...
Processando arquivo: ca-2020-02.csv...
Processando arquivo: ca-2022-02.csv...
Processando arquivo: ca-2025-01.csv...

Todos os arquivos foram lidos e unificados.
Iniciando enriquecimento de dados de localização...
Enriquecimento concluído.
Consolidando localidades únicas...
Limpeza e consolidação final concluídas.


### Caregando os Dados para o Banco de Dados
Com nosso DataFrame `df_localizacao_final` limpo e pronto, o próximo passo é carregá-lo para a tabela `dim_localizacao` que criamos no MySQL.

In [12]:
# Carregando os dados para a tabela dim_localizacao
print("Iniciando a carga para a tabela 'dim_localizacao'...")
try:
    df_localizacao_final.to_sql('dim_localizacao', con=engine, if_exists='append', index=False)
    print(f"{len(df_localizacao_final)} registros únicos de localização carregados com sucesso!")
except Exception as e:
    print(f"Ocorreu um erro durante a carga: {e}")

Iniciando a carga para a tabela 'dim_localizacao'...
647 registros únicos de localização carregados com sucesso!


### Verificação
Vamos visualizar o início e o fim do DataFrame final que foi carregado

In [13]:
print(f"Total de {len(df_localizacao_final)} localidades únicas foram encontradas e carregadas.")
display(df_localizacao_final.head())
display(df_localizacao_final.tail())
display(df_localizacao_final.info())

Total de 647 localidades únicas foram encontradas e carregadas.


Unnamed: 0,municipio,estado,uf_sigla,regiao,bairro,cep,nome_rua,numero_rua
0,ACRELANDIA,ACRE,AC,REGIÃO NORTE,CENTRO,69945-000,AVENIDA ADENILSON ROGERIO DE OLIVEIRA,72
1,CRUZEIRO DO SUL,ACRE,AC,REGIÃO NORTE,CENTRO,69980-000,MARGEM ESQUERDA DO RIO JURUA,S/N
2,RIO BRANCO,ACRE,AC,REGIÃO NORTE,AVIARIO,69909-720,AVENIDA NACOES UNIDAS,23
3,SENA MADUREIRA,ACRE,AC,REGIÃO NORTE,TRIANGULO,69940-000,AVENIDA BRASIL,2154
4,SENADOR GUIOMARD,ACRE,AC,REGIÃO NORTE,CENTRO,69925-000,AVENIDA CASTELO BRANCO,2709


Unnamed: 0,municipio,estado,uf_sigla,regiao,bairro,cep,nome_rua,numero_rua
642,PALMAS,TOCANTINS,TO,REGIÃO NORTE,PLANO DIRETOR SUL,77020-126,"QUADRA 110 SUL (ARSE 14), AV. NS 08, H.M.",110
643,PARAISO DO TOCANTINS,TOCANTINS,TO,REGIÃO NORTE,CENTRO,77600-000,AVENIDA TRANSBRASILIANA,961
644,PONTE ALTA DO TOCANTINS,TOCANTINS,TO,REGIÃO NORTE,TAQUARUCU,77590-000,AVENIDA TIRADENTES QUADRA V,SN
645,PORTO NACIONAL,TOCANTINS,TO,REGIÃO NORTE,JARDIM BRASÍLIA,77500-000,AVENIDA ENGENHEIRO LUIZ CRUZ,S/N
646,XAMBIOA,TOCANTINS,TO,REGIÃO NORTE,CENTRO,77880-000,RUA ANTONIO MARANHAO,878


<class 'pandas.core.frame.DataFrame'>
RangeIndex: 647 entries, 0 to 646
Data columns (total 8 columns):
 #   Column      Non-Null Count  Dtype 
---  ------      --------------  ----- 
 0   municipio   647 non-null    object
 1   estado      647 non-null    object
 2   uf_sigla    647 non-null    object
 3   regiao      647 non-null    object
 4   bairro      647 non-null    object
 5   cep         647 non-null    object
 6   nome_rua    647 non-null    object
 7   numero_rua  647 non-null    object
dtypes: object(8)
memory usage: 40.6+ KB


None

## `dim_revendedor`
Neste etapa, vamos popular a tabela `dim_revendedor` extraindo, limpando e unificando dados de localização de múltiplos arquivos de origem.

In [14]:

arquivos_de_precos = [
    'ca-2004-01.csv', 
    'ca-2014-01.csv', 
    'ca-2019-02.csv',
    'ca-2020-02.csv', 
    'ca-2022-02.csv', 
    'ca-2025-01.csv'
]

lista_dfs_revendedores = []


mapa_colunas_revendedor = {
    'CNPJ da Revenda': 'cnpj',
    'Revenda': 'revenda_nome',
    'Bandeira': 'bandeira'
}

for nome_arquivo in arquivos_de_precos:
    caminho_completo = os.path.join(PASTA_DADOS, nome_arquivo)
    print(f"Processando arquivo: {nome_arquivo}...")
    
    
    colunas_originais_no_arquivo = pd.read_csv(
        caminho_completo, 
        sep=';', 
        encoding='utf-8-sig', 
        nrows=0
        ).columns
    
    colunas_para_ler = [col for col in colunas_originais_no_arquivo if col in mapa_colunas_revendedor.keys()]
    
    if not colunas_para_ler:
        continue
    
    
    chunk_iterator = pd.read_csv(
        caminho_completo, sep=';', encoding='utf-8-sig', 
        usecols=colunas_para_ler, chunksize=50000, low_memory=False
    )
    
    for chunk in chunk_iterator:
        # Renomeando usando o mapa completo e adiciona à lista
        chunk.rename(columns=mapa_colunas_revendedor, inplace=True)
        lista_dfs_revendedores.append(chunk)


df_revendedores_bruto = pd.concat(lista_dfs_revendedores, ignore_index=True)
df_revendedores_bruto.dropna(subset=['cnpj'], inplace=True)
df_revendedor_final = df_revendedores_bruto.drop_duplicates(subset=['cnpj']).copy()

colunas_finais_revendedor = ['cnpj', 'revenda_nome', 'bandeira']

# Reordenamos o DataFrame para garantir a consistência
df_revendedor_final = df_revendedor_final[colunas_finais_revendedor]

print("\nExtração e limpeza final dos revendedores concluídas.")

Processando arquivo: ca-2004-01.csv...
Processando arquivo: ca-2014-01.csv...
Processando arquivo: ca-2019-02.csv...
Processando arquivo: ca-2020-02.csv...
Processando arquivo: ca-2022-02.csv...
Processando arquivo: ca-2025-01.csv...

Extração e limpeza final dos revendedores concluídas.


### Caregando os Dados para o Banco de Dados
Com nosso DataFrame `df_revendedor_final` limpo e pronto, o próximo passo é carregá-lo para a tabela `dim_revendedor` que criamos no MySQL.

In [15]:
# Carregando os dados para a tabela dim_revendedor
try:
    df_revendedor_final.to_sql('dim_revendedor', con=engine, if_exists='append', index=False)
    print(f"{len(df_revendedor_final)} registros únicos de revendedores carregados com sucesso!")
except Exception as e:
    print(f"Ocorreu um erro durante a carga: {e}")

30767 registros únicos de revendedores carregados com sucesso!


### Verificação
Vamos visualizar o início e o fim do DataFrame final que foi carregado

In [16]:
print(f"Total de {len(df_revendedor_final)} revendedores únicos foram encontrados e carregados.")
display(df_revendedor_final.head())
display(df_revendedor_final.tail())
display(df_revendedor_final.info())

Total de 30767 revendedores únicos foram encontrados e carregados.


Unnamed: 0,cnpj,revenda_nome,bandeira
0,49.051.667/0001-02,AUTO POSTO SAKAMOTO LTDA,PETROBRAS DISTRIBUIDORA S.A.
3,00.003.188/0001-21,COMPETRO COMERCIO E DISTRIBUICAO DE DERIVADOS ...,BRANCA
6,00.603.738/0001-43,GASOL COMBUSTÍVEIS AUTOMOTIVOS LTDA.,PETROBRAS DISTRIBUIDORA S.A.
9,34.274.233/0015-08,PETROBRAS DISTRIBUIDORA S.A.,PETROBRAS DISTRIBUIDORA S.A.
12,34.274.233/0033-81,PETROBRAS DISTRIBUIDORA S.A.,PETROBRAS DISTRIBUIDORA S.A.


Unnamed: 0,cnpj,revenda_nome,bandeira
2272812,22.915.775/0004-72,POSTO MAR DAS PEDRAS LTDA,RAIZEN MIME
2282995,33.211.167/0001-50,ROMEIROS 1 COMERCIO DE PETROLEO LTDA,VIBRA
2283127,33.210.833/0001-36,AUTO POSTO ROMEIROS 5 LTDA,VIBRA
2283481,34.481.410/0001-13,W & L COMERCIO DE COMBUSTIVEL LTDA,IPIRANGA
2283492,45.855.959/0001-47,AUTO POSTO BRAZ LTDA,IPIRANGA


<class 'pandas.core.frame.DataFrame'>
Index: 30767 entries, 0 to 2283492
Data columns (total 3 columns):
 #   Column        Non-Null Count  Dtype 
---  ------        --------------  ----- 
 0   cnpj          30767 non-null  object
 1   revenda_nome  30767 non-null  object
 2   bandeira      30767 non-null  object
dtypes: object(3)
memory usage: 961.5+ KB


None

## `dim_produto`
Neste etapa, vamos popular a tabela `dim_produto` extraindo, limpando e unificando dados de localização de múltiplos arquivos de origem.