# 📊 Análise Exploratória de Dados de da Rede Sonda 🌦️

## 📌 Introdução
Este notebook realiza uma análise exploratória dos dados meteorológicos coletados de diversas estações. O objetivo é entender a estrutura dos dados, avaliar sua qualidade e identificar padrões relevantes.

## 📂 Fonte dos Dados
- Arquivos CSV formatados armazenados no ftp
- Contêm medições de variáveis meteorológicas, solarimétricas e cameras.

## 🔍 Objetivos da Análise
1. **Carregar e explorar os dados**: verificar onde os dados estão armazenados, seu formato e estrutura.
2. **Dimensionamento e variáveis disponíveis**: entender o tamanho dos arquivos, número de registros e colunas.
3. **Análise temporal dos dados disponíveis**: identificar o período coberto e eventuais lacunas temporais.
4. **Visualização da distribuição espacial das estações**: verificar a abrangência geográfica das medições.
5. **Exploração inicial de distribuições**: histogramas e estatísticas básicas das variáveis.
6. **Análise de qualidade dos dados** *(última etapa)*: identificar valores ausentes, inconsistências e flags de qualidade.

### 1. Carregar e Explorar os Dados
Vamos começar listando o tamanho da base de dados que estão no diretório do ftp.

In [1]:
# Diretório onde os arquivos estão localizados
DIRETORIO = '../sonda/dados_formatados/'

In [2]:
# Exibe o tamanho de cada arquivo no diretório ordenado por tamanho de forma decrescente
!du -h --max-depth=1 {DIRETORIO} | sort -rh

11G	../sonda/dados_formatados/
1,2G	../sonda/dados_formatados/BRB
1011M	../sonda/dados_formatados/PTR
989M	../sonda/dados_formatados/FLN
959M	../sonda/dados_formatados/PMA
759M	../sonda/dados_formatados/JOI
722M	../sonda/dados_formatados/CPA
714M	../sonda/dados_formatados/SMS
686M	../sonda/dados_formatados/SLZ
613M	../sonda/dados_formatados/NAT
536M	../sonda/dados_formatados/CGR
464M	../sonda/dados_formatados/SBR
413M	../sonda/dados_formatados/TMA
365M	../sonda/dados_formatados/MCL
349M	../sonda/dados_formatados/ORN
300M	../sonda/dados_formatados/UBE
284M	../sonda/dados_formatados/BJL
175M	../sonda/dados_formatados/TLG
174M	../sonda/dados_formatados/CAI
171M	../sonda/dados_formatados/CTB
55M	../sonda/dados_formatados/CBA
196K	../sonda/dados_formatados/TRI
196K	../sonda/dados_formatados/SPK
196K	../sonda/dados_formatados/SCR
196K	../sonda/dados_formatados/RLM
196K	../sonda/dados_formatados/OPO
196K	../sonda/dados_formatados/MDS
196K	../sonda/dados_formatados/LEB
196K	../sonda/dados_form

Existem 3 tipos de dados:
- Dados Meteorológicos
- Dados Solarimétricos
- Dados Anemometricos

In [3]:
import glob

# listar todos os dados Meteorológicos usando o glob só para o tipo de arquivo .csv
dados_metereologicos = glob.glob(DIRETORIO + "*/Meteorologicos/**/*.csv", recursive=True)
# Remove arquivos que contenham 'YYYY_MM_MD_DQC'
dados_metereologicos = [arquivo for arquivo in dados_metereologicos if 'YYYY_MM' not in arquivo]

# listar todos os dados de Solarimétricos usando o glob só para o tipo de arquivo .csv
dados_solarimetricos = glob.glob(DIRETORIO + "*/Solarimetricos/**/*.csv", recursive=True)
# Remove arquivos que contenham 'YYYY_MM_MD_DQC'
dados_solarimetricos = [arquivo for arquivo in dados_solarimetricos if 'YYYY_MM' not in arquivo]

# listar todos os dados de Anemometricos usando o glob só para o tipo de arquivo .csv
dados_anemometricos = glob.glob(DIRETORIO + "*/Anemometricos/**/*.csv", recursive=True)
# Remove arquivos que contenham 'YYYY_MM_MD_DQC'
dados_anemometricos = [arquivo for arquivo in dados_anemometricos if 'YYYY_MM' not in arquivo]

In [4]:
# Listar a quantidade de arquivos em cada categoria
print(f"Quantidade de arquivos Meteorologicos: {len(dados_metereologicos)}")
print(f"Quantidade de arquivos Solarimetricos: {len(dados_solarimetricos)}")
print(f"Quantidade de arquivos Anemometricos: {len(dados_anemometricos)}")

Quantidade de arquivos Meteorologicos: 1036
Quantidade de arquivos Solarimetricos: 1022
Quantidade de arquivos Anemometricos: 0


In [5]:
# Apontar o caminho das bases de dados
BASE_METEOROLOGICA = '../sonda/dados_meteorologicos.parquet'
BASE_SOLARIMETRICA = '../sonda/dados_solarimetricos.parquet'

In [6]:
# Importar a biblioteca DuckDB para manipulação de dados
import duckdb

# Conectar ao banco de dados DuckDB
con = duckdb.connect()

In [7]:
import os
import pandas as pd

# Verifica se o arquivo existe, caso exista leia o arquivo usando duckdb
if  os.path.exists(BASE_METEOROLOGICA):
    # Ler o arquivo parquet parquet usando duckdb
    con.execute(f"CREATE TABLE meteorological AS SELECT * FROM parquet_scan('{BASE_METEOROLOGICA}')")
    print("Tabela meteorological criada com sucesso")
else:
    # Criar a tabela temporária vazia com base no primeiro arquivo CSV
    query = f"""
    CREATE TABLE meteorological AS 
    SELECT * FROM read_csv_auto('{dados_metereologicos[0]}', 
                                skip=2, 
                                union_by_name=True, 
                                all_varchar=True) 
    WHERE 1=0
    """
    con.execute(query)  # Cria a tabela vazia com o esquema correto
    # Inserir os dados em lote sem carregar tudo na memória
    for arquivo in sorted(dados_metereologicos):
        try:
            query = f"""
            INSERT INTO meteorological 
            SELECT * FROM read_csv_auto('{arquivo}', skip=2, union_by_name=True, all_varchar=True)
            """
            con.execute(query)
        except Exception as e:
            print(f"⚠️ Erro ao processar o arquivo: {arquivo}")
            print(f"   ➡️ Motivo: {e}")

    # Ler a primeira linha do arquivo CSV para obter os nomes das colunas
    header = pd.read_csv(dados_metereologicos[0], nrows=1)
    column_names = header.columns.tolist()  # Nomes das colunas

    # Renomear as colunas com base nos nomes do arquivo CSV
    for i, col in enumerate(column_names):
        # Formata o nome da coluna conforme a nomenclatura do DuckDB (column01, column02, ...)
        column_name = f"column{str(i).zfill(2)}"  # Preenche com zero à esquerda para 2 dígitos
        # Renomeia a coluna pelo nome correto
        con.execute(f"ALTER TABLE meteorological RENAME COLUMN {column_name} TO {col}")

    # Salvar os dados em Parquet na BASE_METEOROLOGICA
    con.execute(f"COPY meteorological TO '{BASE_METEOROLOGICA}' (FORMAT 'parquet')")

⚠️ Erro ao processar o arquivo: ../sonda/dados_formatados/PMA/Meteorologicos/2008/PMA_2008_06_MD_formatado.csv
   ➡️ Motivo: Binder Error: table meteorological has 13 columns but 1 values were supplied
⚠️ Erro ao processar o arquivo: ../sonda/dados_formatados/SLZ/Meteorologicos/2015/SLZ_2015_01_MD_formatado.csv
   ➡️ Motivo: Binder Error: table meteorological has 13 columns but 1 values were supplied


In [8]:
# Exibir as primeiras linhas da tabela meteorological
con.execute("SELECT * FROM meteorological LIMIT 5").fetch_df()

Unnamed: 0,acronym,timestamp,year,day,min,tp_sfc,humid_sfc,press,rain,ws10_avg,ws10_std,wd10_avg,wd10_std
0,BJL,2014-06-01 00:00:00,2014,152,0,3333.0,3333.0,3333.0,-5555,-5555,-5555,-5555,-5555
1,BJL,2014-06-01 00:10:00,2014,152,10,3333.0,3333.0,3333.0,-5555,-5555,-5555,-5555,-5555
2,BJL,2014-06-01 00:20:00,2014,152,20,3333.0,3333.0,3333.0,-5555,-5555,-5555,-5555,-5555
3,BJL,2014-06-01 00:30:00,2014,152,30,3333.0,3333.0,3333.0,-5555,-5555,-5555,-5555,-5555
4,BJL,2014-06-01 00:40:00,2014,152,40,3333.0,3333.0,3333.0,-5555,-5555,-5555,-5555,-5555


In [None]:
# Verifica se o arquivo '../sonda/dados_meteorologicos.parquet' existe, caso exista leia usando duckdb
import os
import duckdb

# Verifica se o arquivo Parquet já existe
if os.path.exists('../sonda/dados_meteorologicos.parquet'):
    conn = duckdb.connect(database=':memory:', read_only=True)
    meteorologico_parquet = conn.table('dados_meteorologicos.parquet')
    schema_parquet = meteorologico_parquet.schema()
    print(f"Schema do arquivo parquet: {schema_parquet}")

else:
    # Conectar ao banco de dados DuckDB
    con = duckdb.connect()

    # Definir schema manualmente (exemplo)
    schema_definido = """
    acronym TEXT, timestamp TEXT, year INTEGER, day INTEGER, min INTEGER, 
    tp_sfc FLOAT, humid_sfc FLOAT, press FLOAT, rain FLOAT, 
    ws10_avg FLOAT, ws10_std FLOAT, wd10_avg FLOAT, wd10_std FLOAT
    """

    # Criar tabela com schema fixo
    con.execute(f"CREATE TABLE meteorological ({schema_definido})")

    # Inserir os dados de forma incremental
    for arquivo in sorted(dados_metereologicos):
        query = f"""
        INSERT INTO meteorological 
        SELECT 
            acronym::TEXT, timestamp::TEXT, year::INTEGER, day::INTEGER, min::INTEGER, 
            tp_sfc::FLOAT, humid_sfc::FLOAT, press::FLOAT, rain::FLOAT, 
            ws10_avg::FLOAT, ws10_std::FLOAT, wd10_avg::FLOAT, wd10_std::FLOAT
        FROM read_csv_auto('{arquivo}', skip=2, union_by_name=True, auto_type=False)
        """
        con.execute(query)

    # Salvar como Parquet
    con.execute("COPY meteorological TO '../sonda/dados_meteorologicos.parquet' (FORMAT 'parquet')")

    # Opcional: Verificar schema do Parquet
    meteorologico_parquet = con.table('meteorological')
    print(meteorologico_parquet.limit(5).df())  # Mostra 5 primeiras linhas para checar

    con.close()

In [None]:
meteorologico_parquet

In [None]:
# Local onde salvar o arquivo Parquet
arquivo_parquet = '../sonda/dados_meteorologicos.parquet'

# Conectar ao banco de dados DuckDB
con = duckdb.connect()

# Criar uma tabela temporária para armazenar os dados
con.execute("CREATE TABLE meteorological AS SELECT * FROM read_csv_auto('" + dados_metereologicos[0] + "', skip=2, union_by_name=True) WHERE 1=0")

# Inserir os dados de forma incremental, sem carregar tudo na memória
for arquivo in sorted(dados_metereologicos):  # Pode usar toda a lista, removi o slice [0:10] para mais arquivos
    query = f"""
    INSERT INTO meteorological 
    SELECT * FROM read_csv_auto('{arquivo}', skip=2, union_by_name=True)
    """
    con.execute(query)

# Pega o header do primeiro arquivo
header = pd.read_csv(dados_metereologicos[0], nrows=1)

# Salvar o resultado final em um arquivo Parquet sem carregar tudo na memória
con.execute("COPY (SELECT * FROM meteorological) TO 'dados_meteorologicos.parquet' (FORMAT 'parquet')")

# Fecha a conexão com DuckDB
con.close()

In [None]:
df_meteorologico

In [None]:
# Função para leitura rápida de arquivos
def ler_csv_rapido(file, schema):
    return pl.scan_csv(
        file, 
        skip_rows_after_header=1,  # Pula a linha logo após o cabeçalho
        schema_overrides=schema,  # Utiliza o schema do primeiro arquivo
    )

In [None]:
import os

# Verifica se arquivo já existe em ../sonda/ caso sim, leia o arquivo, caso não, leia o arqui
if os.path.exists('../sonda/meteorologicos.parquet'):
    df_meteorologico = pl.read_parquet('../sonda/meteorologicos.parquet')
else:
    # Lê todos os arquivos de dados meteorológicos
    df_meteorologico = pl.concat([ler_csv_rapido(file, schema_meteorologico) for file in dados_metereologicos])
    # Materializar os dados na memória
    df_meteorologico = df_meteorologico.collect()
    # Salva os dados em um arquivo parquet
    df_meteorologico.write_parquet('../sonda/meteorologicos.parquet', use_pyarrow=True)

In [None]:
# Para os dados solarimetricos como são maiores, divida em partes
NUM_PARTES = 30

# Verifica se arquivo já existe em ../sonda/ caso sim, leia o arquivo, caso não, leia o arqui
if os.path.exists('../sonda/solarimetricos.parquet'):
    df_solarimetrico = pl.read_parquet('../sonda/solarimetricos.parquet')
else:
    # Função para unir os arquivos Parquet em partes, processando de forma incremental
    def unir_parquets_incremental(arquivos_parquet, arquivo_saida, batch_size=5):
        # Inicializa uma lista para os frames lidos, começando com o primeiro arquivo
        df_parcial = None
        
        # Processa os arquivos em lotes
        for i in range(0, len(arquivos_parquet), batch_size):
            # Lê os arquivos do lote atual
            batch_files = arquivos_parquet[i:i+batch_size]
            lazy_frames = [pl.scan_parquet(file) for file in batch_files]
            
            # Concatena os arquivos no lote
            df_batch = pl.concat(lazy_frames)
            
            # Materializa o DataFrame antes de adicionar ao DataFrame parcial
            df_batch = df_batch.collect()

            # Se for o primeiro lote, inicializa o DataFrame final
            if df_parcial is None:
                df_parcial = df_batch
            else:
                # Caso contrário, concatena o lote no DataFrame final
                df_parcial = df_parcial.extend(df_batch)
            
            # Materializa o DataFrame final antes de escrever
            df_parcial.write_parquet(arquivo_saida, use_pyarrow=True)

    # # Gerar os arquivos Parquet em partes
    # for i in range(NUM_PARTES):
    #     # Lê todos os arquivos de dados solarimetricos para a parte i
    #     df_solar = pl.concat([ler_csv_rapido(file, schema_solarimetrico) for file in dados_solarimetricos[i::NUM_PARTES]])
    #     # Materializar os dados na memória
    #     df_solar = df_solar.collect()
    #     # Salva os dados em um arquivo parquet
    #     df_solar.write_parquet(f'../sonda/solarimetricos_{i}.parquet', use_pyarrow=True)

    # Agora, unir todos os arquivos Parquet gerados
    arquivos_parquet = [f'../sonda/solarimetricos_{i}.parquet' for i in range(NUM_PARTES)]

    # Define o caminho do arquivo final
    arquivo_saida = '../sonda/solarimetricos.parquet'

    # Unir os arquivos em partes, processando de forma incremental
    unir_parquets_incremental(arquivos_parquet, arquivo_saida, batch_size=5)
    
    # Remove os arquivos temporários
    for arquivo in arquivos_parquet:
        os.remove(arquivo)

    # Lê o arquivo final
    df_solarimetrico = pl.read_parquet(arquivo_saida)

In [None]:
df_solarimetrico