In [1]:
import pandas as pd
import os
from pathlib import Path
from unidecode import unidecode
import fiona
import geopandas as gpd
from shapely.geometry import Point
import numpy as np
import toolkitETL
import importlib
importlib.reload(toolkitETL)


<module 'toolkitETL' from 'c:\\Users\\pedro\\OneDrive\\Área de Trabalho\\Pedro\\Portifolio\\ETL-ML-Dengue\\src\\etl\\toolkitETL.py'>

In [2]:
## Estabelecendo diretórios base
diretorio_src_etl = os.getcwd()
diretorio_src = Path(diretorio_src_etl).parent.absolute()
diretorio_base = Path(diretorio_src).parent.absolute()
diretorio_base = "D:\\Mestrado_pedro"
diretorio_data = os.path.join(diretorio_base, 'data')

# Bronze
diretorio_bronze = os.path.join(diretorio_data, 'bronze')
diretorio_inmet = os.path.join(diretorio_bronze, 'inmet')
diretorio_inmet_csv = os.path.join(diretorio_inmet, 'csv')

# Silver
diretorio_silver = os.path.join(diretorio_data, 'silver')
diretorio_inmet_silver = os.path.join(diretorio_silver, 'inmet')
diretorio_inmet_tsv_silver = os.path.join(diretorio_inmet_silver, 'tsv')


## Criação de pastas para armazenar os arquivos
os.makedirs(os.path.join(diretorio_data), exist_ok=True)

# Bronze
os.makedirs(os.path.join(diretorio_bronze), exist_ok=True)
os.makedirs(os.path.join(diretorio_inmet), exist_ok=True)
os.makedirs(os.path.join(diretorio_inmet_csv), exist_ok=True)

# Silver
os.makedirs(os.path.join(diretorio_silver), exist_ok=True)
os.makedirs(os.path.join(diretorio_inmet_silver), exist_ok=True)
os.makedirs(os.path.join(diretorio_inmet_tsv_silver), exist_ok=True)



In [9]:
# Carregue o GeoJSON
geojson = fiona.open(os.path.join(diretorio_data, 'geojs-100-mun.json'))
municipios = gpd.GeoDataFrame(geojson)


In [10]:
arquivos_csv = [arquivo for arquivo in os.listdir(diretorio_inmet_csv) if arquivo.endswith("csv")]
# arquivos_csv = arquivos_csv[-40:]
arquivos_tsv = [arquivo.rstrip("csv") + "tsv" for arquivo in arquivos_csv]
len(arquivos_tsv)

6256

In [11]:
def converter_valor(valor):
    try:
        return float(valor.replace(',', '.'))
    except:
        return valor

def find_municipality(latitude, longitude):

    # Crie um Point a partir da latitude e longitude
    ponto = Point(longitude, latitude)

    # Encontre o município que contém o ponto
    municipio_contido = municipios[municipios.geometry.contains(ponto)].iloc[0]

    # Obtenha o código IBGE
    codigo_ibge = municipio_contido['properties']['id']

    # Extraindo o nome do município
    return codigo_ibge



column_map = {
    "Data": "Data",
    "PRECIPITAÇÃO TOTAL, HORÁRIO (mm)": "Precipitacao_Total_mm",
    "PRESSAO ATMOSFERICA AO NIVEL DA ESTACAO, HORARIA (mB)": "Pressao_Atm_Nivel_Estacao_mB",
    "PRESSÃO ATMOSFERICA MAX.NA HORA ANT. (AUT) (mB)": "Pressao_Atm_Max_mB",
    "PRESSÃO ATMOSFERICA MIN. NA HORA ANT. (AUT) (mB)": "Pressao_Atm_Min_mB",
    "RADIACAO GLOBAL (Kj/m²)": "Radiacao_Global_Kj_m2",
    "TEMPERATURA DO AR - BULBO SECO, HORARIA (°C)": "Temp_Bulbo_Seco_C",
    "TEMPERATURA DO PONTO DE ORVALHO (°C)": "Temp_Ponto_Orvalho_C",
    "TEMPERATURA MÁXIMA NA HORA ANT. (AUT) (°C)": "Temp_Max_C",
    "TEMPERATURA MÍNIMA NA HORA ANT. (AUT) (°C)": "Temp_Min_C",
    "TEMPERATURA ORVALHO MAX. NA HORA ANT. (AUT) (°C)": "Orvalho_Max_C",
    "TEMPERATURA ORVALHO MIN. NA HORA ANT. (AUT) (°C)": "Orvalho_Min_C",
    "UMIDADE REL. MAX. NA HORA ANT. (AUT) (%)": "Umidade_Rel_Max_Perc",
    "UMIDADE REL. MIN. NA HORA ANT. (AUT) (%)": "Umidade_Rel_Min_Perc",
    "UMIDADE RELATIVA DO AR, HORARIA (%)": "Umidade_Rel_Horaria_Perc",
    "VENTO, DIREÇÃO HORARIA (gr) (° (gr))": "Vento_Direcao_gr",
    "VENTO, RAJADA MAXIMA (m/s)": "Vento_Rajada_Max_ms",
    "VENTO, VELOCIDADE HORARIA (m/s)": "Vento_Velocidade_Horaria_ms"
}

column_map = {chave.lower(): valor.lower() for chave, valor in column_map.items()}

colunas_corrigir = {coluna: converter_valor for coluna in column_map.keys()}

columns_to_convert = ["PRECIPITAÇÃO TOTAL, HORÁRIO (mm)", "PRESSAO ATMOSFERICA AO NIVEL DA ESTACAO, HORARIA (mB)",
                      "PRESSÃO ATMOSFERICA MAX.NA HORA ANT. (AUT) (mB)", "PRESSÃO ATMOSFERICA MIN. NA HORA ANT. (AUT) (mB)", 
                      "RADIACAO GLOBAL (Kj/m²)", "TEMPERATURA DO AR - BULBO SECO, HORARIA (°C)", 
                      "TEMPERATURA DO PONTO DE ORVALHO (°C)", "TEMPERATURA MÁXIMA NA HORA ANT. (AUT) (°C)", 
                      "TEMPERATURA MÍNIMA NA HORA ANT. (AUT) (°C)", "TEMPERATURA ORVALHO MAX. NA HORA ANT. (AUT) (°C)", 
                      "TEMPERATURA ORVALHO MIN. NA HORA ANT. (AUT) (°C)", "UMIDADE REL. MAX. NA HORA ANT. (AUT) (%)", 
                      "UMIDADE REL. MIN. NA HORA ANT. (AUT) (%)", "UMIDADE RELATIVA DO AR, HORARIA (%)", 
                      "VENTO, DIREÇÃO HORARIA (gr) (° (gr))", "VENTO, RAJADA MAXIMA (m/s)", "VENTO, VELOCIDADE HORARIA (m/s)"
]

columns_to_convert = [coluna.lower() for coluna in columns_to_convert]

dict_cod_sigla_uf = {
    11: "RO", 12: "AC", 13: "AM", 14: "RR",
    15: "PA", 16: "AP", 17: "TO", 21: "MA",
    22: "PI", 23: "CE", 24: "RN", 25: "PB",
    26: "PE", 27: "AL", 28: "SE", 29: "BA",
    31: "MG", 32: "ES", 33: "RJ", 35: "SP",
    41: "PR", 42: "SC", 43: "RS", 50: "MS",
    51: "MT", 52: "GO", 53: "DF" }

In [13]:
import os
import pandas as pd
import numpy as np

# Supondo que as variáveis 'arquivos_csv', 'diretorio_inmet_csv', 'colunas_corrigir', 'column_map', 'columns_to_convert', 'diretorio_inmet_tsv_silver' e a função 'find_municipality' já estejam definidas.

lista_dfs = []

for arquivo_csv in arquivos_csv:

    diretorio_arquivo_csv = os.path.join(diretorio_inmet_csv, arquivo_csv)

    # Ler as 8 primeiras linhas e transformá-las em um dicionário
    with open(diretorio_arquivo_csv, 'r', encoding='latin1') as f:
        linhas = [next(f).strip() for _ in range(8)]

    # Verificar se as linhas contêm informações válidas
    info_dict = {}
    for linha in linhas:
        partes = linha.split(":;")
        if len(partes) == 2:
            info_dict[partes[0].strip()] = partes[1].strip()
    
    # Inicializar as variáveis de latitude e longitude
    latitude = None
    longitude = None

    for chave, valor in info_dict.items():
        if chave.startswith("LAT"):
            latitude = valor.replace(",", ".")
        if chave.startswith("LON"):
            longitude = valor.replace(",", ".")

    # Verificar se latitude e longitude foram encontradas
    if latitude is not None and longitude is not None:
        try:
            municipio = find_municipality(latitude, longitude)
        except IndexError:
            print(f"Municipality not found for coordinates: {latitude}, {longitude}")
            continue
    else:
        print(f"Latitude or longitude not found in the file: {arquivo_csv}")
        continue

    # Ler o restante do arquivo a partir da linha 9 com conversão dos valores
    df = pd.read_csv(diretorio_arquivo_csv, skiprows=8, sep=';', 
                     encoding='latin1', converters=colunas_corrigir)
    
    if "DATA (YYYY-MM-DD)" in df.columns:
        df['Data'] = df["DATA (YYYY-MM-DD)"]
    
    df.columns = df.columns.str.lower()

    colunas_importantes = [coluna.lower() for coluna in column_map.keys()]

    df = df[colunas_importantes]
    for column in columns_to_convert:
        df[column] = pd.to_numeric(df[column], errors='coerce').astype('Float64')
    
    df['data'] = pd.to_datetime(df['data'])
    df['nu_mes'] = df['data'].dt.month
    df['nu_ano'] = df['data'].dt.year

    df = df.rename(columns=column_map)

    df.replace(-9999, np.nan, inplace=True)
    
    df['uf'] = info_dict.get('UF', 'Unknown')
    df['id_municip'] = int(municipio)

    tmp_df = df.drop(columns=['data'], axis=1)

    tmp_df = tmp_df.groupby(['id_municip', 'uf', 'nu_mes', 'nu_ano'], as_index=False).mean()

    df_reset = tmp_df.reset_index(drop=True)

    lista_dfs.append(df_reset)

df_final = pd.concat(lista_dfs, ignore_index=True)
df_final.to_csv(os.path.join(diretorio_inmet_tsv_silver, 'inmet_agrupado.tsv'), sep='\t', index=False)


Municipality not found for coordinates: -17.45472222, -52.60111111
Municipality not found for coordinates: -17.45472222, -52.60111111
Municipality not found for coordinates: -17.45472222, -52.60111111
Municipality not found for coordinates: -17.45472222, -52.60111111
Municipality not found for coordinates: -17.45472222, -52.60111111
Municipality not found for coordinates: -17.45472222, -52.60111111
Municipality not found for coordinates: -17.45472222, -52.60111111
Municipality not found for coordinates: -17.45472222, -52.60111111
Municipality not found for coordinates: -17.45472222, -52.60111111
Municipality not found for coordinates: -17.45472222, -52.60111111
Municipality not found for coordinates: -17.45472222, -52.60111111
Municipality not found for coordinates: -17.45472222, -52.60111111
Municipality not found for coordinates: -13.01666666, -38.51666666
Municipality not found for coordinates: -13.01666666, -38.51666666
Municipality not found for coordinates: -13.01666666, -38.5166

In [15]:
df_final.shape

(71351, 21)

In [16]:
df_ibge_municipios = pd.read_csv(os.path.join(diretorio_data, "RELATORIO_DTB_BRASIL_MUNICIPIO.tsv"), sep='\t', encoding='latin1')
df_ibge_municipios_filtrado = df_ibge_municipios[['UF','Nome_UF', 'Código Município Completo', 'Nome_Município']].copy()
df_ibge_municipios_filtrado['municipio'] = df_ibge_municipios_filtrado['Nome_Município'].apply(lambda x: unidecode(x.lower()))
df_ibge_municipios_filtrado['uf'] = df_ibge_municipios_filtrado['UF'].apply(lambda x: dict_cod_sigla_uf[x])
df_ibge_municipios_filtrado['id_municip'] = df_ibge_municipios_filtrado['Código Município Completo'].apply(lambda x: str(x)[:-1])

# Criação de uma sequência de datas mensais de janeiro de 2010 a dezembro de 2021
datas = pd.date_range(start='2010-01-01', end='2021-12-31', freq='M')

# Criar um dataframe com todas as combinações de município e data
df_combinado = pd.DataFrame([(codigo, uf, data) for codigo, uf in zip(df_ibge_municipios_filtrado['Código Município Completo'], df_ibge_municipios_filtrado['uf'])
                              for data in datas], columns=['id_municip', 'uf', 'data'])
df_combinado['nu_mes'] = df_combinado['data'].dt.month
df_combinado['nu_ano'] = df_combinado['data'].dt.year
df_combinado = df_combinado.drop("data", axis=1)

# df_combinado.to_csv(os.path.join(diretorio_data, "municipio_mes_ano.tsv"), sep='\t', index=False)


In [43]:
df_combinado.dtypes

id_municip     int64
uf            object
nu_mes         int32
nu_ano         int32
dtype: object

In [17]:
df_merged = pd.merge(df_final, df_combinado, on=['id_municip','uf', 'nu_mes', 'nu_ano'], how='right')

df_merged.columns = df_merged.columns.str.lower()

df_merged['id_municip'] = df_merged['id_municip'].apply(lambda x: str(x)[:-1])

df_merged.to_csv(os.path.join(diretorio_inmet_tsv_silver, "municipio_mes_ano_precipitacao.tsv"), sep='\t', index=False)

In [4]:
df_merged = pd.read_csv(os.path.join(diretorio_inmet_tsv_silver, "municipio_mes_ano_precipitacao.tsv"), sep='\t')
df_merged.head()

Unnamed: 0,id_municip,uf,nu_mes,nu_ano,precipitacao_total_mm,pressao_atm_nivel_estacao_mb,pressao_atm_max_mb,pressao_atm_min_mb,radiacao_global_kj_m2,temp_bulbo_seco_c,...,temp_max_c,temp_min_c,orvalho_max_c,orvalho_min_c,umidade_rel_max_perc,umidade_rel_min_perc,umidade_rel_horaria_perc,vento_direcao_gr,vento_rajada_max_ms,vento_velocidade_horaria_ms
0,110001,RO,1,2010,,,,,,,...,,,,,,,,,,
1,110001,RO,2,2010,,,,,,,...,,,,,,,,,,
2,110001,RO,3,2010,,,,,,,...,,,,,,,,,,
3,110001,RO,4,2010,,,,,,,...,,,,,,,,,,
4,110001,RO,5,2010,,,,,,,...,,,,,,,,,,


In [None]:
def fillna_with_group_median(df, group_cols):
    # Agrupar pelas colunas especificadas
    grouped = df.groupby(group_cols)
    
    # Aplicar a mediana aos grupos e preencher os NaN
    df_filled = df.copy()
    for col in df.columns:
        if (col not in group_cols) and (col != "id_municip"):
            df_filled[col] = grouped[col].transform(lambda x: x.fillna(x.median()))
    
    return df_filled

# Colunas para o agrupamento
group_cols = ['uf', 'nu_mes', 'nu_ano']

# Preenchendo os NaN com a mediana do agrupamento
df_filled = fillna_with_group_median(df_merged, group_cols)

df_filled["inmet_id"] = df_filled.index + 1

# print(df_filled)
df_filled.to_csv(os.path.join(diretorio_inmet_tsv_silver, "municipio_mes_ano_precipitacao_fillna.tsv"), sep='\t', index=False)

In [12]:
diretorio_dotenv = os.path.join(diretorio_base, ".env")

cursor, conn = toolkitETL.conectar_banco(diretorio_dotenv)

caminho_arquivo = os.path.join(diretorio_inmet_tsv_silver, "municipio_mes_ano_precipitacao_fillna.tsv")
toolkitETL.inserir_no_banco(cursor, conn, caminho_arquivo, 'inmet', delimiter='\t',
                                    diretorio_dotenv=diretorio_dotenv)
