In [None]:
# -*- coding: utf-8 -*-
"""
Tasks para dataset do CENIPA (Centro de Investigação e Prevenção de Acidentes Aeronáuticos).
# https://www.gov.br/cenipa/pt-br
# API: https://dados.gov.br/swagger-ui/index.html
"""

import os
import json
import logging
import requests
import pandas as pd
from pathlib import Path

# from prefect import task

# Constants
INPUT_DIR_PATH = os.path.join(Path(os.getcwd()).parent,"input")

DATASET_ID = "623d13d9-3465-4be0-82e7-c13b78b08282"
API_URL = "https://dados.gov.br/dados/api/publico"
API_KEY = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJqdGkiOiJkNDRMMXkzcGxRTlFIX1pvU2VTb28yU0t3TjFzUkNBTS13LTRYZ2Ywc0d3dFVveW1ZNzJMQXNUakZtNlhFbWZhMm8taWozYWJicGlxMnN3eiIsImlhdCI6MTc0OTkzNzUwOH0.__f5sSiipPmb_gwhGIA06UgCmYbR7UWZ2Li8LH-KI_E"


def download_table_to_csv(table_name, table_url, table_path=INPUT_DIR_PATH):
    """
        Downloads a table from the CENIPA dataset and saves it as a CSV file. 
    """
    try:
        response = requests.get(table_url)
        response.raise_for_status()

        if os.path.exists(table_path) is False:
            os.makedirs(table_path)

        file_path = os.path.join(table_path, f"{table_name}.csv")
        with open(file_path, "wb") as f:
            f.write(response.content)
        logging.info(f"Downloaded {table_name} to {file_path}")
    except Exception as e:
        raise
    
def correct_csv_encoding():
    """
        Corrects the encoding of CSV files in the input directory from 'latin1' to 'utf-8'.
    """
    logging.info("Correcting CSV file encodings from 'latin1' to 'utf-8'...")
    for  file_name in os.listdir(INPUT_DIR_PATH):
        if file_name.endswith(".csv"):
            file_path = os.path.join(INPUT_DIR_PATH, file_name)
            pd.read_csv(file_path, sep=";", encoding="latin1")\
            .to_csv(file_path, sep=";", encoding="utf-8", index=False)


def show_uniques(df, columns):
    """
        Displays unique values for specified columns in the dataset.
    """
    print('\n')
    logging.info("Showing unique values for specified columns...")
    for col in columns:
        if not col.startswith('id_'):
            unique_values = df[col].unique()
            print(f"Unique values in {col}: {unique_values}")
    print('\n')

In [3]:
session = requests.Session()

HEADERS = {
    "accept":"application/json",
    "chave-api-dados-abertos":f"{API_KEY}"
}

response = requests.get(
    headers = HEADERS,
    url = f"{API_URL}/conjuntos-dados/{DATASET_ID}")

with open(os.path.join(INPUT_DIR_PATH,"cenipa_metadata.json"), "w") as f:
    json.dump(response.json(), f, indent=4)

In [4]:
metadata = response.json()
            
for resource in metadata["recursos"]:
    table_id = resource["id"]
    table_title = resource["titulo"]
    table_url = resource["link"]
    table_name = table_url.split("/")[-1].replace(".csv", "")
    table_format = resource["formato"]
    
    if table_format == "CSV":
        try:
            logging.info(f"Downloading table: {table_title} (ID: {table_id})")
            download_table_to_csv(table_name, table_url)
        except Exception as e:
            logging.error(f"Failed to download {table_title}: {e}")
correct_csv_encoding()

In [5]:
RENAME_MAPPING = {
        'codigo_ocorrencia': 'id_ocorrencia',
        'ocorrencia_classificacao':'tipo_ocorrencia',
        'ocorrencia_latitude':'latitude_ocorrencia',
        'ocorrencia_longitude': 'longitude_ocorrencia', 
        'ocorrencia_cidade':'nome_municipio',
        'ocorrencia_uf':'sigla_uf',
        'ocorrencia_pais':'nome_pais',
        'ocorrencia_aerodromo':'sigla_aerodromo',
        'ocorrencia_dia':'data_ocorrencia', 
        'ocorrencia_hora':'hora_ocorrencia',
        'investigacao_aeronave_liberada':'indicador_investigacao_liberada',
        'investigacao_status':'satus_investigacao',
        'divulgacao_relatorio_numero':'id_relatorio',
        'divulgacao_relatorio_publicado':'indicador_relatorio_publicado', 
        'divulgacao_dia_publicacao':'data_publicacao_relatorio',
        'total_recomendacoes':'quantidade_recomendacoes', 
        'total_aeronaves_envolvidas':'quantidade_aeronaves_envolvidas',
        'ocorrencia_saida_pista':'indicador_saida_pista'
    }

STRING_COLUMNS = [ 
    'tipo_ocorrencia', 
    'nome_municipio', 
    'sigla_uf', 
    'nome_pais',
    'sigla_aerodromo', 
    'satus_investigacao', 
    'id_relatorio'
]

DATE_COLUMNS = [
    'data_ocorrencia', 
    'data_publicacao_relatorio']

TIMESTAMP_COLUMNS = [
    'hora_ocorrencia'
]

BOOL_COLUMNS = [ 
    'indicador_investigacao_liberada', 
    'indicador_relatorio_publicado',
    'indicador_saida_pista'
]

FLOAT_COLUMNS = [
    'latitude_ocorrencia',
    'longitude_ocorrencia'
]

INT_COLUMNS = [
    'id_ocorrencia',
    'quantidade_recomendacoes',
    'quantidade_aeronaves_envolvidas'
]

In [6]:
df_ocorrencias = pd.read_csv(
    os.path.join(INPUT_DIR_PATH, "ocorrencia.csv"),
    sep=";",
    encoding="utf-8"
)

logging.info(f"Checking code columns for inconsistencies...")
columns_code = [
        'codigo_ocorrencia', 
        'codigo_ocorrencia1',
        'codigo_ocorrencia2',
        'codigo_ocorrencia3',
        'codigo_ocorrencia4']

logging.info(f"Any row with one or more nulls: {df_ocorrencias[df_ocorrencias[columns_code].isnull().any(axis=1)]}")
logging.info(f"Any null row: {df_ocorrencias[df_ocorrencias[columns_code].isnull().all(axis=1)]}")

df_ocorrencias[(df_ocorrencias['codigo_ocorrencia'] == df_ocorrencias['codigo_ocorrencia1'])&\
               (df_ocorrencias['codigo_ocorrencia'] == df_ocorrencias['codigo_ocorrencia2'])&\
               (df_ocorrencias['codigo_ocorrencia'] == df_ocorrencias['codigo_ocorrencia3'])&\
               (df_ocorrencias['codigo_ocorrencia'] == df_ocorrencias['codigo_ocorrencia4'])]
columns_code.remove('codigo_ocorrencia')

# Remove columns with codes that are not unique
df_ocorrencias_modif = df_ocorrencias.drop(columns=columns_code)\
    .rename(columns=RENAME_MAPPING).copy()

In [None]:
## Formatting float columns
for col in FLOAT_COLUMNS:
    
    df_ocorrencias_modif[col] = df_ocorrencias_modif[col].astype(str)
    df_ocorrencias_modif[col] = df_ocorrencias_modif[col].str.strip()   
    df_ocorrencias_modif[col] = df_ocorrencias_modif[col].str.replace(r'\s+|°', '', regex=True) 
    df_ocorrencias_modif[col] = df_ocorrencias_modif[col].str.replace(r'\*|nan', '0', regex=True)
    
    df_ocorrencias_modif[col] = df_ocorrencias_modif[col].str.replace(',','.')
    df_ocorrencias_modif[col] = df_ocorrencias_modif[col].astype(float)

In [11]:
## Inconsistencies
# Check for missing values in the columns
for col in df_ocorrencias_modif.columns:
    if df_ocorrencias_modif[col].isnull().any():
        logging.warning(f"Column '{col}' has missing values.")
# Check for unique values in the 'id_ocorrencia' column
assert df_ocorrencias_modif['id_ocorrencia'].is_unique, "The 'id_ocorrencia' column should have unique values."
# Check for unique values in the 'id_relatorio' column
if df_ocorrencias_modif['id_relatorio'].is_unique:
    pass
else:
    logging.warning("The 'id_relatorio' has duplicate values.")

# Check for duplicate rows
if df_ocorrencias_modif.duplicated().any():
    logging.warning("There are duplicate rows in the DataFrame.")
    print(df_ocorrencias_modif[df_ocorrencias_modif.duplicated()])

## Formatting String Columns
# Unique values for each column
show_uniques(df_ocorrencias_modif, STRING_COLUMNS)
        
# Convert string columns to lowercase with first letter of each word capitalized (except connectors) 
for col in STRING_COLUMNS:
    df_ocorrencias_modif[col] = df_ocorrencias_modif[col].astype(str)
    df_ocorrencias_modif[col] = df_ocorrencias_modif[col].str.strip()
    df_ocorrencias_modif[col] = df_ocorrencias_modif[col].str.replace(r'\*|nan', '', regex=True)
    df_ocorrencias_modif[col] = df_ocorrencias_modif[col].str.replace(r'\s+', ' ', regex=True)
    
    if not col.startswith('id'):
        df_ocorrencias_modif[col] = df_ocorrencias_modif[col].str.lower()
    
    if col.startswith('nome'):
        df_ocorrencias_modif[col] = df_ocorrencias_modif[col].str.title()
        df_ocorrencias_modif[col] = df_ocorrencias_modif[col].str.replace(
            r'\b(De|Da|Do|Das|Dos|E)\b', 
            lambda x: x.group(0).lower(), 
            regex=True)
    
    if col.startswith('sigla'):
        df_ocorrencias_modif[col] = df_ocorrencias_modif[col].str.upper()

    df_ocorrencias_modif[col] = df_ocorrencias_modif[col].fillna('')

show_uniques(df_ocorrencias_modif, STRING_COLUMNS)

## Formatting Date Columns
# Convert date columns to datetime format
for col in DATE_COLUMNS:
    df_ocorrencias_modif[col] = df_ocorrencias_modif[col].astype(str)
    df_ocorrencias_modif[col] = df_ocorrencias_modif[col].str.strip()
    df_ocorrencias_modif[col] = df_ocorrencias_modif[col].fillna('')

    df_ocorrencias_modif[col] = pd.to_datetime(df_ocorrencias_modif[col], errors='coerce')
    df_ocorrencias_modif[col] = df_ocorrencias_modif[col].dt.strftime('%Y-%m-%d')

## Formatting Timestamp Columns
for col in TIMESTAMP_COLUMNS:
    df_ocorrencias_modif[col] = df_ocorrencias_modif[col].astype(str)
    df_ocorrencias_modif[col] = df_ocorrencias_modif[col].str.strip()
    df_ocorrencias_modif[col] = df_ocorrencias_modif[col].fillna('')

    df_ocorrencias_modif[col] = pd.to_datetime(df_ocorrencias_modif[col], errors='coerce')
    df_ocorrencias_modif[col] = df_ocorrencias_modif[col].dt.strftime('%H:%M:%S')

## Formatting Boolean Columns
show_uniques(df_ocorrencias_modif, BOOL_COLUMNS)
# Convert boolean columns to boolean type
for col in BOOL_COLUMNS:
    df_ocorrencias_modif[col] = df_ocorrencias_modif[col].astype(str)
    df_ocorrencias_modif[col] = df_ocorrencias_modif[col].str.strip()
    df_ocorrencias_modif[col] = df_ocorrencias_modif[col].fillna('')

    df_ocorrencias_modif[col] = df_ocorrencias_modif[col].str.lower()
    df_ocorrencias_modif[col] = df_ocorrencias_modif[col].replace({'sim': True, 'não': False, '': None})

show_uniques(df_ocorrencias_modif, BOOL_COLUMNS)





Unique values in tipo_ocorrencia: ['INCIDENTE' 'ACIDENTE' 'INCIDENTE GRAVE']
Unique values in nome_municipio: ['JUAZEIRO DO NORTE' 'UBERLÂNDIA' 'GUARULHOS' ... 'FERROS' 'ARCOVERDE'
 'BOCAIÚVA']
Unique values in sigla_uf: ['CE' 'MG' 'SP' 'RS' 'DF' 'AL' 'GO' 'RJ' 'PA' 'MA' 'MT' 'AM' 'PB' 'RR'
 'BA' 'PR' 'PE' 'RN' 'TO' 'SE' 'MS' 'RO' '***' 'SC' 'ES' 'AP' 'AC' 'PI']
Unique values in nome_pais: ['BRASIL']
Unique values in sigla_aerodromo: ['FAER' 'SBUL' 'SBGR' 'SBSM' 'SBSR' 'SBBR' 'SBMO' 'SBGO' 'SBJR' 'NCAD'
 'SBSL' 'SBGL' 'SBEG' 'SBRP' 'SBSV' 'SBRJ' 'SBME' 'SWPD' 'SBPJ' 'SBFZ'
 'SNAO' 'SBAR' 'SBSP' 'SBNV' 'SIRQ' 'SIFM' 'SJRT' 'ZZZZ' 'SI8Y' 'SBPV'
 'SBCT' 'SBRF' 'SBKP' 'SD7S' 'SBIP' 'SBMI' 'SW2A' 'SNKB' 'SBPA' 'SBPS'
 'SBFL' 'SBJV' 'SBRD' 'SBCB' 'SBNF' 'SBVT' 'SDBK' 'SIPJ' 'SBCH' 'SIVU'
 'SBCY' 'SBTF' 'SBFI' 'SD6X' 'SDVG' 'SWGN' 'SD2D' 'SBCF' 'SBPG' 'SWPF'
 'SBNM' 'SBMT' 'SBML' 'SBSI' 'SBBE' 'SBSG' 'SBPF' 'SWFN' 'SNJL' 'SBFS'
 'SJGU' 'SBBI' 'SIVQ' 'SBMQ' 'SNPA' 'SNRU' 'SBCP' 'SWUQ' 'SDYT'

  df_ocorrencias_modif[col] = pd.to_datetime(df_ocorrencias_modif[col], errors='coerce')
  df_ocorrencias_modif[col] = df_ocorrencias_modif[col].replace({'sim': True, 'não': False, '': None})
  df_ocorrencias_modif[col] = df_ocorrencias_modif[col].replace({'sim': True, 'não': False, '': None})
  df_ocorrencias_modif[col] = df_ocorrencias_modif[col].replace({'sim': True, 'não': False, '': None})


In [None]:
import basedosdados as bd
PROJECT_ID = "fiery-cairn-399314"

In [6]:
datasets = [
    "br_geobr_mapas",
    "br_bd_diretorios_brasil"
]

PROJECT = "basedosdados"

for dataset in datasets:
    print(f"Dataset: {dataset}")
    df = bd.read_sql(
        f"""SELECT * 
        FROM `{PROJECT}.{dataset}.INFORMATION_SCHEMA.TABLES`""", 
                billing_project_id=PROJECT_ID)
    print(df['table_name'].unique())

Dataset: br_geobr_mapas
Downloading: 100%|[32m██████████[0m|
['regiao_intermediaria' 'setor_censitario_2010' 'pais' 'terra_indigena'
 'concentracao_urbana' 'mesorregiao' 'estabelecimentos_saude'
 'amazonia_legal' 'area_risco_desastre' 'semiarido' 'sede_municipal'
 'pegada_urbana' 'area_minima_comparavel_2010' 'escola' 'saude'
 'regiao_imediata' 'bioma' 'municipio' 'microrregiao' 'regiao'
 'regiao_metropolitana_2017' 'unidade_conservacao' 'uf'
 'limite_vizinhanca' 'arranjo_populacional']
Dataset: br_bd_diretorios_brasil
Downloading: 100%|[32m██████████[0m|
['setor_censitario_2010' 'subatividade_ibge' 'cid_10' 'cbo_2002'
 'cbo_1994' 'cnae_2' 'escola' 'cnae_1' 'distrito_2000' 'etnia_indigena'
 'instituicao_ensino_superior' 'municipio' 'curso_superior' 'regiao'
 'cid_9' 'distrito_1991' 'area_conhecimento' 'natureza_juridica' 'empresa'
 'uf' 'distrito_2010' 'cep' 'setor_censitario_2022']


In [64]:
dataset = "br_bd_diretorios_brasil"
table = "uf"
# df = bd.read_sql(
#         f"""SELECT * 
#         FROM `{PROJECT}.{dataset}.INFORMATION_SCHEMA.COLUMNS`
#         WHERE table_name = '{table}'""", 
#         billing_project_id=PROJECT_ID)
df = bd.read_sql(
        f"""SELECT * 
        FROM `{PROJECT}.{dataset}.{table}`
        LIMIT 100""", 
        billing_project_id=PROJECT_ID)
df

Downloading: 100%|[32m██████████[0m|


Unnamed: 0,id_uf,sigla,nome,regiao
0,42,SC,Santa Catarina,Sul
1,41,PR,Paraná,Sul
2,43,RS,Rio Grande do Sul,Sul
3,11,RO,Rondônia,Norte
4,13,AM,Amazonas,Norte
5,17,TO,Tocantins,Norte
6,14,RR,Roraima,Norte
7,16,AP,Amapá,Norte
8,12,AC,Acre,Norte
9,15,PA,Pará,Norte
