In [6]:
from pathlib import Path
import pandas as pd
import traceback
import pyarrow.dataset as ds
import numpy as np

# ajuste conforme seu caminho
ROOT = Path(r"C:\Users\Vitoria Oliveira\Desktop\bigdata_dp\aps\emissoes_gee\Dados por município 12.0")
OUT_DIR = Path(r"C:\Users\Vitoria Oliveira\Desktop\bigdata_dp\aps\emissoes_gee")
PARTS_OUT = OUT_DIR / "parts_parquet"
FINAL_PARQUET = OUT_DIR / "emissoes_unidas.parquet"
AGG_DECADA_PARQUET = OUT_DIR / "emissoes_por_decada.parquet"

PARTS_OUT.mkdir(parents=True, exist_ok=True)
OUT_DIR.mkdir(parents=True, exist_ok=True)

print("ROOT:", ROOT)
print("PARTS_OUT:", PARTS_OUT)


ROOT: C:\Users\Vitoria Oliveira\Desktop\bigdata_dp\aps\emissoes_gee\Dados por município 12.0
PARTS_OUT: C:\Users\Vitoria Oliveira\Desktop\bigdata_dp\aps\emissoes_gee\parts_parquet


In [7]:
def is_lfs_pointer(path: Path, nlines: int = 3) -> bool:
    try:
        with path.open("r", encoding="utf-8", errors="ignore") as f:
            head = "".join([next(f) for _ in range(nlines)])
        return "git-lfs.github.com/spec/v1" in head or head.strip().startswith("version https://git-lfs.github.com/spec/v1")
    except Exception:
        return False

def try_read_csv(path: Path, encodings=None, seps=None, nrows_sample=5):
    """Tenta ler o CSV com encodings e separators comuns. Retorna DataFrame ou lança erro."""
    if encodings is None:
        encodings = ["utf-8", "latin1", "iso-8859-1"]
    if seps is None:
        seps = [",", ";", "\t"]
    last_err = None
    for enc in encodings:
        for sep in seps:
            try:
                # lê pequena amostra para validar
                pd.read_csv(path, encoding=enc, sep=sep, nrows=nrows_sample)
                # se ok, faz leitura completa (se muito grande, deixar padrão; se precisar usar chunks, adaptar)
                return pd.read_csv(path, encoding=enc, sep=sep)
            except Exception as e:
                last_err = e
                continue
    raise ValueError(f"Não consegui ler {path} com encodings/separadores testados. Último erro: {last_err}")


In [8]:
all_csvs = sorted(ROOT.rglob("*.csv"))
print("Total CSVs encontrados:", len(all_csvs))

lfs_pointers = [p for p in all_csvs if is_lfs_pointer(p)]
print("Ponteiros LFS detectados:", len(lfs_pointers))
if lfs_pointers:
    print("Mostrando até 10 ponteiros LFS:")
    for p in lfs_pointers[:10]:
        print(" -", p.relative_to(ROOT))
print("Se houver ponteiros LFS, rode `git lfs pull` na pasta do repositório e execute novamente.")


Total CSVs encontrados: 44
Ponteiros LFS detectados: 0
Se houver ponteiros LFS, rode `git lfs pull` na pasta do repositório e execute novamente.


In [9]:
# Leitura file-by-file e gravação em partes parquet (evita estourar disco/ram)
parts = []
errors = []
i = 0

for p in all_csvs:
    try:
        if is_lfs_pointer(p):
            print("Ponteiro LFS (pular):", p.relative_to(ROOT))
            continue
        print("Lendo:", p.relative_to(ROOT))
        df = try_read_csv(p)
        # garantir nomes limpos
        df.columns = [c.strip() for c in df.columns]
        # adicionar coluna de origem para rastreabilidade
        df["_origem_arquivo"] = str(p.relative_to(ROOT))
        part_path = PARTS_OUT / f"part_{i:04d}.parquet"
        df.to_parquet(part_path, index=False, compression="snappy")
        parts.append(part_path)
        i += 1
    except Exception as e:
        errors.append((p, str(e)))
        print("Erro lendo", p.relative_to(ROOT), ":", e)

print("Parts gravadas:", len(parts))
print("Erros:", len(errors))


Lendo: AC\gases.csv
Lendo: AM\ar4.csv
Lendo: AM\ar6.csv
Lendo: AP\ar2.csv
Lendo: AP\ar4.csv
Lendo: AP\ar5.csv
Lendo: AP\ar6.csv
Lendo: AP\gases.csv
Lendo: CE\ar5.csv
Lendo: DF\ar2.csv
Lendo: DF\ar4.csv
Lendo: DF\ar5.csv
Lendo: DF\gases.csv
Lendo: ES\ar2.csv
Lendo: ES\ar6.csv
Lendo: GO\ar2.csv
Lendo: GO\ar4.csv
Lendo: GO\ar5.csv
Lendo: MT\ar2.csv
Lendo: MT\ar5.csv
Lendo: MT\ar6.csv
Lendo: MT\gases.csv


  return pd.read_csv(path, encoding=enc, sep=sep)


Lendo: NA\ar5.csv
Lendo: NA\ar6.csv
Lendo: PA\ar5.csv
Lendo: PA\ar6.csv
Lendo: PA\gases.csv
Lendo: PE\ar4.csv
Lendo: PE\ar5.csv
Lendo: PI\ar5.csv
Lendo: PR\ar5.csv
Lendo: RO\ar5.csv
Lendo: RR\ar2.csv
Lendo: RR\ar4.csv
Lendo: RR\ar5.csv
Lendo: RR\gases.csv
Lendo: RS\ar5.csv


  return pd.read_csv(path, encoding=enc, sep=sep)


Lendo: RS\ar6.csv


  return pd.read_csv(path, encoding=enc, sep=sep)


Lendo: SP\ar2.csv


  return pd.read_csv(path, encoding=enc, sep=sep)


Lendo: SP\gases.csv


  return pd.read_csv(path, encoding=enc, sep=sep)


Lendo: TO\ar4.csv
Lendo: TO\ar5.csv
Lendo: TO\ar6.csv
Lendo: TO\gases.csv
Parts gravadas: 44
Erros: 0


In [10]:
from pathlib import Path
import pandas as pd
import numpy as np
from tqdm import tqdm

PARTS_OUT = Path(r"C:\Users\Vitoria Oliveira\Desktop\bigdata_dp\aps\emissoes_gee\parts_parquet")
AGG_PARTS = PARTS_OUT / "agg_parts"
AGG_PARTS.mkdir(parents=True, exist_ok=True)

def detect_year_cols(cols):
    return [c for c in cols if c.isdigit() and 1800 <= int(c) <= 2100]

# ajuste o nome do setor caso necessário
sector_candidates = ["Setor de emissão","Setor de Emissão","Setor","Categoria emissora","Categoria Emissora"]
def find_sector_col(cols):
    for cand in sector_candidates:
        if cand in cols:
            return cand
    # fallback: tenta encontrar coluna que contenha 'setor' ignorando case
    for c in cols:
        if 'setor' in c.lower() or 'categoria' in c.lower():
            return c
    raise KeyError("Coluna de setor não encontrada. Colunas: " + ", ".join(cols))

def map_sector(text):
    t = str(text).lower()
    if "agro" in t or "pecuár" in t or "agric" in t:
        return "Agropecuária"
    if "energia" in t or "combust" in t or "bunker" in t or "queima" in t:
        return "Energia e Combustíveis"
    if "process" in t or "industrial" in t or "processos" in t:
        return "Processos Industriais"
    if "resíduo" in t or "residuos" in t or "lixo" in t or "saneamento" in t:
        return "Resíduos"
    if "uso do solo" in t or "mudança" in t or "desmat" in t or "flore" in t:
        return "Mudança de Uso do Solo"
    return str(text).strip()

parts = sorted(PARTS_OUT.glob("part_*.parquet"))
print("Parts encontradas:", len(parts))

agg_part_files = []
for idx, part in enumerate(tqdm(parts, desc="Processando parts")):
    try:
        df = pd.read_parquet(part)   # parte por parte
        df.columns = [c.strip() for c in df.columns]
        year_cols = detect_year_cols(df.columns)
        if not year_cols:
            # se não tiver anos, pula
            continue

        id_vars = [c for c in df.columns if c not in year_cols]
        # melt para long (apenas as colunas desta part)
        dlong = df.melt(id_vars=id_vars, value_vars=year_cols, var_name="ano", value_name="valor")
        # limpeza e tipos
        dlong = dlong.dropna(subset=["valor"])
        dlong["ano"] = dlong["ano"].astype(int)
        dlong["valor"] = pd.to_numeric(dlong["valor"].astype(str).str.replace(",",".",regex=False), errors="coerce")
        dlong = dlong.dropna(subset=["valor"])

        # mapear setor (coluna detectada dinamicamente)
        sector_col = find_sector_col(dlong.columns)
        dlong["Setor_Agrupado"] = dlong[sector_col].apply(map_sector)

        # criar decada
        dlong["decada"] = (dlong["ano"] // 10 * 10).astype(int).astype(str) + "s"

        # escolher colunas para agregação (só as que existem)
        agg_by = ["decada", "Setor_Agrupado"]
        if "Cidade" in dlong.columns:
            agg_by.append("Cidade")
        if "Gas" in dlong.columns:
            agg_by.append("Gas")
        else:
            # tentar detectar coluna de gás por heurística
            gas_cand = [c for c in dlong.columns if "gás" in c.lower() or "gas"==c.lower()]
            if gas_cand:
                agg_by.append(gas_cand[0])
        if "Bioma" in dlong.columns:
            agg_by.append("Bioma")

        # agregação por esta part
        df_agg = dlong.groupby(agg_by, dropna=False)["valor"].sum().reset_index().rename(columns={"valor":"emissao_t"})

        # salvar agregação desta part
        outp = AGG_PARTS / f"agg_part_{idx:04d}.parquet"
        df_agg.to_parquet(outp, index=False, compression="snappy")
        agg_part_files.append(outp)
    except Exception as e:
        print("Erro em", part.name, ":", e)

print("Agg parts geradas:", len(agg_part_files))


Parts encontradas: 44


Processando parts: 100%|██████████| 44/44 [13:12<00:00, 18.02s/it]

Agg parts geradas: 44





In [11]:
# Ler todos os agg_parts (muito menores) e somar para obter o agregado final
agg_files = sorted((PARTS_OUT / "agg_parts").glob("agg_part_*.parquet"))
print("Arquivos de agregado a combinar:", len(agg_files))

dfs_small = []
for f in agg_files:
    dfs_small.append(pd.read_parquet(f))

if not dfs_small:
    raise RuntimeError("Nenhum agg_part encontrado — verifique a célula anterior.")

df_agg_all = pd.concat(dfs_small, ignore_index=True)
# agrega novamente por garantir soma correta entre partes
group_cols = [c for c in df_agg_all.columns if c != "emissao_t"]
df_decada_final = df_agg_all.groupby(group_cols, dropna=False)["emissao_t"].sum().reset_index()

print("df_decada_final shape:", df_decada_final.shape)
display(df_decada_final.head())

# salvar resultado final
out_final = PARTS_OUT.parent / "emissoes_por_decada_streaming.parquet"
df_decada_final.to_parquet(out_final, index=False, compression="snappy")
print("Salvo em:", out_final)


Arquivos de agregado a combinar: 44
df_decada_final shape: (486495, 6)


Unnamed: 0,decada,Setor_Agrupado,Cidade,Gás,Bioma,emissao_t
0,1970s,Agropecuária,Abadia de Goiás (GO),CO2e (t) GTP-AR2,Cerrado,7093.951239
1,1970s,Agropecuária,Abadia de Goiás (GO),CO2e (t) GTP-AR4,Cerrado,7093.951239
2,1970s,Agropecuária,Abadia de Goiás (GO),CO2e (t) GTP-AR5,Cerrado,6148.091074
3,1970s,Agropecuária,Abadia de Goiás (GO),CO2e (t) GWP-AR2,Cerrado,8144.906978
4,1970s,Agropecuária,Abadia de Goiás (GO),CO2e (t) GWP-AR4,Cerrado,7829.620257


Salvo em: C:\Users\Vitoria Oliveira\Desktop\bigdata_dp\aps\emissoes_gee\emissoes_por_decada_streaming.parquet
