In [29]:
import os
import zipfile
import requests
from datetime import datetime
import re
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
# Display every column and up to 500 rows when a DataFrame is rendered.
pd.options.display.max_columns = None
pd.options.display.max_rows = 500

In [None]:
# Root URL under which each emission type has one zip package per sector
# (used as "<BASE_URL>/<emission slug>/<sector>.zip").
BASE_URL = "https://downloads.climatetrace.org/v5.1.0/sector_packages"

# Sector packages to fetch.
SECTORS = ["manufacturing", "mineral_extraction", "power"]

# Emission display name -> slug used in the download URL.
# Every slug equals its display name except "pm2.5", whose slug has no dot.
_EMISSION_NAMES = (
    "co2e_100yr",
    "co2e_20yr",
    "co2",
    "ch4",
    "n2o",
    "pm2.5",
    "vocs",
    "co",
    "nh3",
    "nox",
    "so2",
    "bc",
    "oc",
)
EMISSION_TYPES = {name: name for name in _EMISSION_NAMES}
EMISSION_TYPES["pm2.5"] = "pm2_5"

# Working directories, relative to the notebook's current directory.
DOWNLOAD_DIR, EXTRACT_DIR = "downloads", "extracted"

for _work_dir in (DOWNLOAD_DIR, EXTRACT_DIR):
    os.makedirs(_work_dir, exist_ok=True)

def log(message):
    """Print ``message`` prefixed with the current UTC time.

    The stamp is ISO-8601 at second resolution without a UTC offset,
    e.g. ``[2025-12-15T04:52:13] message``.
    """
    # Local import: the file only imports `datetime` from the datetime module.
    from datetime import timezone

    # datetime.utcnow() is deprecated since Python 3.12. Take an aware UTC time
    # and drop tzinfo so the printed stamp keeps its offset-free format.
    now_utc = datetime.now(timezone.utc).replace(tzinfo=None)
    ts = now_utc.isoformat(timespec="seconds")
    print(f"[{ts}] {message}")

def download_zip(url, zip_path):
    """Stream the file at ``url`` to ``zip_path``.

    Returns:
        "downloaded" on HTTP 200 (the file has been written to ``zip_path``),
        "missing" on HTTP 404,
        "http_<code>" for any other status.

    Network and file errors are not caught here; they propagate to the caller.
    """
    # The context manager releases the streamed connection on every path,
    # including the early returns for non-200 responses.
    with requests.get(url, stream=True, timeout=60) as r:
        if r.status_code == 404:
            return "missing"
        if r.status_code != 200:
            return f"http_{r.status_code}"

        # Write to a sibling temp file and move it into place only when the
        # body has been fully received, so an interrupted transfer never leaves
        # a truncated archive at zip_path.
        part_path = f"{zip_path}.part"
        try:
            with open(part_path, "wb") as f:
                for chunk in r.iter_content(chunk_size=8192):
                    f.write(chunk)
            os.replace(part_path, zip_path)
        finally:
            # Only present here if the transfer or the move failed.
            if os.path.exists(part_path):
                os.remove(part_path)

    return "downloaded"

def extract_zip(zip_path, extract_to):
    """Unpack every member of the archive at ``zip_path`` into ``extract_to``.

    The destination directory (including parents) is created first if needed.
    """
    os.makedirs(extract_to, exist_ok=True)
    with zipfile.ZipFile(zip_path, mode="r") as archive:
        archive.extractall(path=extract_to)

# Fetch and unpack one package per (sector, emission type). Any exception for
# one combination is logged and the loop moves on to the next.
for sector in SECTORS:
    for emission_name, emission_slug in EMISSION_TYPES.items():

        # The URL uses the slug; local file and folder names use the display name.
        zip_filename = f"{sector}__{emission_name}.zip"
        url = f"{BASE_URL}/{emission_slug}/{sector}.zip"
        zip_path = os.path.join(DOWNLOAD_DIR, zip_filename)
        extract_path = os.path.join(EXTRACT_DIR, sector, emission_name)

        # Fixed-width columns shared by every log line for this combination.
        columns = f"| {sector:<18} | {emission_name:<10} |"

        try:
            status = download_zip(url, zip_path)

            if status == "missing":
                log(f"MISSING{columns} not available")
            elif status != "downloaded":
                log(f"ERROR  {columns} {status}")
            else:
                extract_zip(zip_path, extract_path)
                log(f"OK     {columns} extracted → {extract_path}")

        except Exception as e:
            log(f"EXCEPT {columns} {e}")


  ts = datetime.utcnow().isoformat(timespec="seconds")


[2025-12-15T04:52:13] OK     | manufacturing      | co2e_100yr | extracted → extracted/manufacturing/co2e_100yr
[2025-12-15T04:52:15] OK     | manufacturing      | co2e_20yr  | extracted → extracted/manufacturing/co2e_20yr
[2025-12-15T04:52:20] OK     | manufacturing      | co2        | extracted → extracted/manufacturing/co2
[2025-12-15T04:52:23] OK     | manufacturing      | ch4        | extracted → extracted/manufacturing/ch4
[2025-12-15T04:52:28] OK     | manufacturing      | n2o        | extracted → extracted/manufacturing/n2o
[2025-12-15T04:52:34] OK     | manufacturing      | pm2.5      | extracted → extracted/manufacturing/pm2.5
[2025-12-15T04:52:34] OK     | manufacturing      | vocs       | extracted → extracted/manufacturing/vocs
[2025-12-15T04:52:39] OK     | manufacturing      | co         | extracted → extracted/manufacturing/co
[2025-12-15T04:52:44] OK     | manufacturing      | nh3        | extracted → extracted/manufacturing/nh3
[2025-12-15T04:52:48] OK     | manufactu

In [8]:
import os
import sys

def print_tree(root_path, prefix=""):
    """Print the contents of ``root_path`` as an indented tree, recursively.

    Entries are listed in sorted order. ``prefix`` is the indentation carried
    down from parent levels. A directory that cannot be listed because of a
    PermissionError is shown as a single "[Permission Denied]" leaf.
    """
    try:
        entries = os.listdir(root_path)
    except PermissionError:
        print(prefix + "└── [Permission Denied]")
        return

    entries.sort()
    last_position = len(entries) - 1

    for position, name in enumerate(entries):
        # The last entry closes the branch; earlier ones keep the vertical rule.
        if position == last_position:
            branch, padding = "└── ", "    "
        else:
            branch, padding = "├── ", "│   "

        print(prefix + branch + name)

        child = os.path.join(root_path, name)
        if os.path.isdir(child):
            print_tree(child, prefix + padding)

# List the extracted manufacturing package. The path is relative to the
# notebook's working directory (the same EXTRACT_DIR the download cell writes
# to) instead of a machine-specific absolute path.
print_tree(os.path.join(EXTRACT_DIR, "manufacturing"))

├── .DS_Store
├── bc
│   ├── .DS_Store
│   ├── ABOUT_THE_DATA
│   │   ├── about_the_data_v5_1_0.pdf
│   │   └── detailed_data_schema_v5_1_0.csv
│   └── DATA
│       ├── aluminum_emissions_sources_confidence_v5_1_0.csv
│       ├── aluminum_emissions_sources_ownership_v5_1_0.csv
│       ├── aluminum_emissions_sources_v5_1_0.csv
│       ├── cement_emissions_sources_confidence_v5_1_0.csv
│       ├── cement_emissions_sources_ownership_v5_1_0.csv
│       ├── cement_emissions_sources_v5_1_0.csv
│       ├── chemicals_emissions_sources_confidence_v5_1_0.csv
│       ├── chemicals_emissions_sources_ownership_v5_1_0.csv
│       ├── chemicals_emissions_sources_v5_1_0.csv
│       ├── food-beverage-tobacco_emissions_sources_confidence_v5_1_0.csv
│       ├── food-beverage-tobacco_emissions_sources_v5_1_0.csv
│       ├── glass_emissions_sources_confidence_v5_1_0.csv
│       ├── glass_emissions_sources_v5_1_0.csv
│       ├── iron-and-steel_emissions_sources_confidence_v5_1_0.csv
│       ├── iron-and-ste

In [15]:
# Matches only "<subsector>_emissions_sources_v5_1_0.csv" and captures the
# subsector. The "_sources" part is required: without it the pattern skips the
# sources files and instead picks up "<subsector>_country_emissions_..." files.
# The "..._sources_confidence_..." and "..._sources_ownership_..." companions
# are deliberately not matched.
FILE_RE = re.compile(r"(.+)_emissions_sources_v5_1_0\.csv$")

def normalize_columns(df):
    """Rewrite ``df``'s column labels as lowercase snake-style names.

    Each label is stripped of surrounding whitespace, lowercased, has spaces
    replaced by underscores, and then loses every character that is not a word
    character. The labels are replaced on ``df`` itself, and that same object
    is returned.
    """
    labels = df.columns.str.strip().str.lower()
    labels = labels.str.replace(" ", "_")
    labels = labels.str.replace(r"[^\w_]", "", regex=True)
    df.columns = labels
    return df

def process_csv(path, sector, subsector, emission_type, version):
    """Load one CSV and tag every row with where it came from.

    The file at ``path`` is read with pandas, its column labels are passed
    through ``normalize_columns``, and four constant columns are set:
    ``sector``, ``subsector``, ``emission_type`` and ``source_version``
    (taken from ``version``). Returns the resulting DataFrame.
    """
    raw = pd.read_csv(path, low_memory=False)

    # Explicit metadata columns, in the same order they were originally added.
    return normalize_columns(raw).assign(
        sector=sector,
        subsector=subsector,
        emission_type=emission_type,
        source_version=version,
    )

def write_parquet(df, sector, subsector, emission_type, out_base):
    """Write ``df`` as a single zstd-compressed Parquet file.

    The file is ``part-000.parquet`` inside
    ``<out_base>/sector=<sector>/emission=<emission_type>/subsector=<subsector>/``,
    which is created if missing. The DataFrame index is not written.
    A one-line summary with the row and column counts is printed afterwards.
    """
    partition_dirs = (
        f"sector={sector}",
        f"emission={emission_type}",
        f"subsector={subsector}",
    )
    out_dir = os.path.join(out_base, *partition_dirs)
    os.makedirs(out_dir, exist_ok=True)
    target = os.path.join(out_dir, "part-000.parquet")

    pq.write_table(
        pa.Table.from_pandas(df, preserve_index=False),
        target,
        compression="zstd",
        use_dictionary=True,
    )

    n_rows, n_cols = df.shape
    print(f"✔ {subsector}: {n_rows:,} rows, {n_cols} columns")

# for file in sorted(os.listdir(RAW_BASE)):
#     match = FILE_RE.match(file)
#     if not match:
#         print(f"{file} not matched.")
#         continue

#     subsector = match.group(1)
#     csv_path = os.path.join(RAW_BASE, file)

#     print(f"Processing {file}")
#     df = process_csv(csv_path, subsector)
#     write_parquet(df, subsector)


aluminum_emissions_sources_confidence_v5_1_0.csv not matched.
aluminum_emissions_sources_ownership_v5_1_0.csv not matched.
Processing aluminum_emissions_sources_v5_1_0.csv
✔ aluminum: 17,727 rows, 49 columns
cement_emissions_sources_confidence_v5_1_0.csv not matched.
cement_emissions_sources_ownership_v5_1_0.csv not matched.
Processing cement_emissions_sources_v5_1_0.csv
✔ cement: 123,120 rows, 49 columns
chemicals_emissions_sources_confidence_v5_1_0.csv not matched.
chemicals_emissions_sources_ownership_v5_1_0.csv not matched.
Processing chemicals_emissions_sources_v5_1_0.csv
✔ chemicals: 24,795 rows, 49 columns
food-beverage-tobacco_emissions_sources_confidence_v5_1_0.csv not matched.
Processing food-beverage-tobacco_emissions_sources_v5_1_0.csv
✔ food-beverage-tobacco: 857,679 rows, 49 columns
glass_emissions_sources_confidence_v5_1_0.csv not matched.
Processing glass_emissions_sources_v5_1_0.csv
✔ glass: 27,474 rows, 49 columns
iron-and-steel_emissions_sources_confidence_v5_1_0.csv

In [18]:
# Convert every extracted sources CSV to Parquet, one file per
# (sector, emission type, subsector).
for sector in SECTORS:
    for emission_type in EMISSION_TYPES.keys():
        print(f"{sector} - {emission_type}")
        RAW_BASE = f"extracted/{sector}/{emission_type}/DATA"
        OUT_BASE = "parquet/emissions"

        # Kept as module-level names for any cell that still reads them.
        SECTOR = f"{sector}"
        EMISSION_TYPE = emission_type
        VERSION = "v5.1.0"

        # A package the download step reported as missing or failed has no
        # DATA folder; skip it instead of aborting the whole conversion.
        if not os.path.isdir(RAW_BASE):
            print(f"{RAW_BASE} not found, skipping.")
            continue

        for file in sorted(os.listdir(RAW_BASE)):
            match = FILE_RE.match(file)
            if not match:
                print(f"{file} not matched.")
                continue

            subsector = match.group(1)
            csv_path = os.path.join(RAW_BASE, file)

            print(f"Processing {file}")
            # Pass the full argument lists that process_csv and write_parquet
            # declare; the two-argument calls raise TypeError.
            df = process_csv(csv_path, sector, subsector, emission_type, VERSION)
            write_parquet(df, sector, subsector, emission_type, OUT_BASE)


manufacturing - co2e_100yr
aluminum_country_emissions_v5_1_0.csv not matched.
aluminum_emissions_sources_confidence_v5_1_0.csv not matched.
aluminum_emissions_sources_ownership_v5_1_0.csv not matched.
Processing aluminum_emissions_sources_v5_1_0.csv
✔ aluminum: 17,727 rows, 49 columns
cement_country_emissions_v5_1_0.csv not matched.
cement_emissions_sources_confidence_v5_1_0.csv not matched.
cement_emissions_sources_ownership_v5_1_0.csv not matched.
Processing cement_emissions_sources_v5_1_0.csv
✔ cement: 123,120 rows, 49 columns
chemicals_country_emissions_v5_1_0.csv not matched.
chemicals_emissions_sources_confidence_v5_1_0.csv not matched.
chemicals_emissions_sources_ownership_v5_1_0.csv not matched.
Processing chemicals_emissions_sources_v5_1_0.csv
✔ chemicals: 24,795 rows, 49 columns
food-beverage-tobacco_country_emissions_v5_1_0.csv not matched.
food-beverage-tobacco_emissions_sources_confidence_v5_1_0.csv not matched.
Processing food-beverage-tobacco_emissions_sources_v5_1_0.csv

In [None]:
# Example: combine all per-subsector Parquet files into one master table with DuckDB.
import duckdb
import os

# Path to your local Parquet data
PARQUET_BASE = "parquet/emissions"

# Output master file
MASTER_PARQUET = "facility_master.parquet"

# Initialize DuckDB in-memory connection
con = duckdb.connect()


def _sql_string(value):
    """Render ``value`` as a single-quoted SQL string literal (quotes doubled).

    ``None`` becomes a typed SQL NULL rather than the text 'None'.
    """
    if value is None:
        return "CAST(NULL AS VARCHAR)"
    return "'" + str(value).replace("'", "''") + "'"


def _partition_value(path, key):
    """Return the value of the ``<key>=<value>`` directory in ``path``, or None."""
    marker = f"{key}="
    for part in path.split(os.sep):
        if part.startswith(marker):
            return part[len(marker):]
    return None


# Step 1: Find all Parquet files recursively.
# Sorted so the generated query does not depend on filesystem listing order.
parquet_files = sorted(
    os.path.join(root, file)
    for root, dirs, files in os.walk(PARQUET_BASE)
    for file in files
    if file.endswith(".parquet")
)

print(f"Found {len(parquet_files)} Parquet files.")

# With no inputs the UNION query would be empty and CREATE TABLE would fail
# with an unhelpful SQL syntax error, so stop with a clear message instead.
if not parquet_files:
    raise FileNotFoundError(f"No Parquet files found under {PARQUET_BASE!r}")

# Step 2: Build SQL UNION query over all files
# NOTE(review): files produced by write_parquet in this notebook already carry
# `sector` and `emission_type` columns, so the two columns appended here look
# redundant for them. Confirm whether they are still needed.
queries = []
for f in parquet_files:
    # Extract sector and emission_type from the "key=value" folder names
    sector = _partition_value(f, "sector")
    emission_type = _partition_value(f, "emission")

    q = f"""
    SELECT *, {_sql_string(sector)} AS sector, {_sql_string(emission_type)} AS emission_type
    FROM {_sql_string(f)}
    """
    queries.append(q)

full_query = " UNION ALL ".join(queries)

# Step 3: Aggregate if needed (optional: here we just combine all rows)
print("Combining all files into master Parquet...")
con.execute(f"""
CREATE TABLE facility_master AS
{full_query}
""")

# Step 4: Export to Parquet
print(f"Writing master Parquet to {MASTER_PARQUET} ...")
con.execute(f"COPY facility_master TO {_sql_string(MASTER_PARQUET)} (FORMAT PARQUET);")

print("Done! Master Parquet ready for local analysis or S3 upload.")


Found 204 Parquet files.
Combining all files into master Parquet...
Writing master Parquet to facility_master.parquet ...
Done! Master Parquet ready for local analysis or S3 upload.


In [None]:
# Pull the combined table into pandas and preview the first rows.
master_query = """
SELECT *
FROM facility_master
"""
res = con.execute(master_query).df()
res.head()

In [32]:
# Distinct values of the source_name column in the loaded result.
res["source_name"].unique()

array(['BALASHI', 'TAKAKHIL'], dtype=object)