<a href="https://colab.research.google.com/github/nathaliacastelobranco/infnet-infraestrutura-hadoop/blob/main/ELT_CAGED_Hadoop.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## ETL CAGED

In [1]:
!pip install py7zr pyarrow google-cloud-storage -q

[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/69.7 kB[0m [31m?[0m eta [36m-:--:--[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m69.7/69.7 kB[0m [31m3.1 MB/s[0m eta [36m0:00:00[0m
[?25h[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/97.0 kB[0m [31m?[0m eta [36m-:--:--[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m97.0/97.0 kB[0m [31m6.7 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m51.7/51.7 kB[0m [31m3.6 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m142.7/142.7 kB[0m [31m10.2 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m413.6/413.6 kB[0m [31m19.7 MB/s[0m eta [36m0:00:00[0m
[?25h

In [2]:
# ========================================
# 1. Dependências e autenticação
# ========================================
import os
import shutil
import py7zr
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
from ftplib import FTP
from google.cloud import storage

from google.colab import auth
auth.authenticate_user()

In [5]:
# ========================================
# 2. Configurações
# ========================================
FTP_HOST='ftp.mtps.gov.br'
FTP_BASE_PATH = "pdet/microdados/NOVO CAGED"
BASE_PATH = "/content/caged_data"
BUCKET_NAME = "bucket_caged"

anos = [2020, 2021, 2022, 2023, 2024, 2025]
tipos = ["MOV", "EXC", "FOR"]

os.makedirs(BASE_PATH, exist_ok=True)

In [6]:
# ========================================
# 3. Funções auxiliares
# ========================================
def baixar_arquivos(ano, mes):
    """
    Baixa todos os .7z da pasta do mês (retorna lista de paths locais).
    """
    baixados = []
    path = f"{FTP_BASE_PATH}/{ano}/{ano}{mes:02d}"
    try:
        with FTP(FTP_HOST, user="anonymous", passwd="aaaaa", encoding="latin-1") as ftp:
            try:
                ftp.cwd(path)
            except Exception as e:
                print(f"[WARN] Pasta não encontrada: {path}")
                return []
            files = ftp.nlst()
            for f in files:
                if f.lower().endswith(".7z"):
                    local_file = os.path.join(BASE_PATH, f)
                    if not os.path.exists(local_file):
                        print(f"Baixando {f} ...")
                        with open(local_file, "wb") as lf:
                            ftp.retrbinary(f"RETR {f}", lf.write)
                    else:
                        print(f"Já existe localmente: {f}")
                    baixados.append(local_file)
    except Exception as e:
        print(f"[ERRO FTP] {e}")
    return baixados

def extrair_7z_para_pasta(arquivo_7z, out_dir):
    """
    Extrai o .7z para uma pasta própria (evita conflito de nomes).
    Retorna lista de paths dos arquivos extraídos.
    """
    os.makedirs(out_dir, exist_ok=True)
    try:
        with py7zr.SevenZipFile(arquivo_7z, mode="r") as z:
            z.extractall(path=out_dir)
    except Exception as e:
        print(f"[ERRO extração] {arquivo_7z}: {e}")
        return []
    arquivos = []
    for root, _, files in os.walk(out_dir):
        for f in files:
            arquivos.append(os.path.join(root, f))
    return arquivos



def processar_tipo_para_parquet_streaming(ano, tipo, base_path, output_parquet_local, sep=";", encoding="latin-1"):
    """
    Procura por arquivos extraídos que correspondam a CAGED{TIPO}{ANO}*
    e escreve em streaming para um único arquivo Parquet.
    """
    targets = []
    for root, _, files in os.walk(base_path):
        for fname in files:
            fname_up = fname.upper()
            if fname_up.startswith(f"CAGED{tipo}{ano}") and fname.lower().endswith((".txt", ".csv")):
                targets.append(os.path.join(root, fname))

    if not targets:
        print(f"[INFO] Nenhum arquivo encontrado para {ano} {tipo}")
        return False

    if os.path.exists(output_parquet_local):
        os.remove(output_parquet_local)

    writer = None
    schema = None

    for arq in sorted(targets):
        print(f" Lendo em chunks: {arq}")
        try:
            for chunk in pd.read_csv(arq, sep=sep, encoding=encoding, chunksize=200_000, low_memory=False):
                chunk.columns = [c.strip() for c in chunk.columns]

                # converte chunk para pyarrow table
                if writer is None:
                    table = pa.Table.from_pandas(chunk, preserve_index=False)
                    schema = table.schema
                    writer = pq.ParquetWriter(output_parquet_local, schema, compression="snappy")
                    writer.write_table(table)
                else:
                    # tenta converter para o mesmo schema; se faltar coluna, adiciona com NA
                    try:
                        table = pa.Table.from_pandas(chunk, schema=schema, preserve_index=False)
                    except Exception:
                        cols_target = [f.name for f in schema]
                        df2 = chunk.copy()
                        for c in cols_target:
                            if c not in df2.columns:
                                df2[c] = pd.NA
                        df2 = df2[cols_target]
                        table = pa.Table.from_pandas(df2, schema=schema, preserve_index=False)
                    writer.write_table(table)
        except Exception as e:
            print(f"[ERRO leitura {arq}] {e}")

    if writer is not None:
        writer.close()
        print(f"[OK] Parquet criado: {output_parquet_local}")
        return True
    else:
        print(f"[WARN] Nenhum chunk escrito para {ano} {tipo}")
        return False


def upload_to_gcs(local_file, bucket_name, remote_path):
    client = storage.Client()
    bucket = client.bucket(bucket_name)
    blob = bucket.blob(remote_path)
    blob.upload_from_filename(local_file)
    print(f"✅ Upload: gs://{bucket_name}/{remote_path}")

In [7]:
# ========================================
# 4. Pipeline
# ========================================
for ano in anos:
    print(f"\n==== PROCESSANDO ANO {ano} ====")
    for mes in range(1, 13):
        print(f"--> Mês {mes:02d}")
        arquivos_7z = baixar_arquivos(ano, mes)
        for arq in arquivos_7z:
            out_dir = os.path.join(BASE_PATH, os.path.basename(arq).replace(".7z", ""))
            if not os.path.exists(out_dir) or not os.listdir(out_dir):
                print(f"  Extraindo {os.path.basename(arq)} para {out_dir}")
                extrair_7z_para_pasta(arq, out_dir)
            else:
                print(f"  Já extraído: {out_dir}")

    for tipo in tipos:
        print(f"\n--- Juntando {ano} {tipo} em Parquet ---")
        local_parquet = os.path.join(BASE_PATH, f"{ano}{tipo}.parquet")
        ok = processar_tipo_para_parquet_streaming(ano, tipo, BASE_PATH, local_parquet)
        if ok:
            remote = f"caged/{ano}/{tipo}/{ano}{tipo}.parquet"
            upload_to_gcs(local_parquet, BUCKET_NAME, remote)
        else:
            print(f"[INFO] Nada para subir para {ano}/{tipo}")


==== PROCESSANDO ANO 2020 ====
--> Mês 01
Baixando CAGEDMOV202001.7z ...
  Extraindo CAGEDMOV202001.7z para /content/caged_data/CAGEDMOV202001
--> Mês 02
Baixando CAGEDFOR202002.7z ...
Baixando CAGEDMOV202002.7z ...
  Extraindo CAGEDFOR202002.7z para /content/caged_data/CAGEDFOR202002
  Extraindo CAGEDMOV202002.7z para /content/caged_data/CAGEDMOV202002
--> Mês 03
Baixando CAGEDFOR202003.7z ...
Baixando CAGEDMOV202003.7z ...
  Extraindo CAGEDFOR202003.7z para /content/caged_data/CAGEDFOR202003
[ERRO extração] /content/caged_data/CAGEDFOR202003.7z: Corrupt input data
  Extraindo CAGEDMOV202003.7z para /content/caged_data/CAGEDMOV202003
--> Mês 04
Baixando CAGEDEXC202004.7z ...
Baixando CAGEDFOR202004.7z ...
Baixando CAGEDMOV202004.7z ...
  Extraindo CAGEDEXC202004.7z para /content/caged_data/CAGEDEXC202004
  Extraindo CAGEDFOR202004.7z para /content/caged_data/CAGEDFOR202004
  Extraindo CAGEDMOV202004.7z para /content/caged_data/CAGEDMOV202004
--> Mês 05
Baixando CAGEDEXC202005.7z ...
