This script will make a train/val set up to 100:1 ratio negative to positive, and test set of 10% all data with true distrubtion (4000:1) of negative to positive

In [2]:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import os
import uuid
import numpy as np
import pandas as pd

import pyarrow as pa
import pyarrow.dataset as ds
import pyarrow.parquet as pq

# ================== CONFIG ==================
SRC_PARQUET_DIR = "/explore/nobackup/people/spotter5/clelland_fire_ml/parquet_cems_with_fraction_dataset"

OUT_ROOT    = "/explore/nobackup/people/spotter5/clelland_fire_ml"
TRAIN100_DIR = os.path.join(OUT_ROOT, "parquet_cems_trainval_100x")
TEST_DIR     = os.path.join(OUT_ROOT, "parquet_cems_test_true_10pct")

os.makedirs(TRAIN100_DIR, exist_ok=True)
os.makedirs(TEST_DIR, exist_ok=True)

# True global neg:pos ratio you measured:
GLOBAL_NEG_POS_RATIO = 4169.10   # from your previous result

TEST_FRACTION   = 0.10           # 10% test set (true ratio)
TARGET_NEG_POS  = 100.0          # target ~100:1 neg:pos in Train/Val

# Approximate probability of keeping a negative in TrainVal
NEG_KEEP_PROB = min(1.0, TARGET_NEG_POS / GLOBAL_NEG_POS_RATIO)

BATCH_SIZE   = 500_000
RANDOM_STATE = 42

np.random.seed(RANDOM_STATE)

# ================== HELPERS ==================
def write_parquet_chunk(df: pd.DataFrame, root: str, prefix: str):
    """Write a chunk as a standalone Parquet file with a unique name."""
    if df.empty:
        return
    table = pa.Table.from_pandas(df, preserve_index=False)
    fname = f"{prefix}_part_{uuid.uuid4().hex}.parquet"
    fpath = os.path.join(root, fname)
    pq.write_table(table, fpath)

# ================== MAIN ==================
print(f"Scanning source Parquet dataset: {SRC_PARQUET_DIR}")
dataset = ds.dataset(SRC_PARQUET_DIR, format="parquet")

total_rows = 0
test_pos = test_neg = 0
train_pos = train_neg = 0

print(f"\nUsing TEST_FRACTION    = {TEST_FRACTION:.2f}")
print(f"Target TrainVal neg:pos ≈ {TARGET_NEG_POS:.1f}:1")
print(f"Using NEG_KEEP_PROB   ≈ {NEG_KEEP_PROB:.6f} for negatives in TrainVal\n")

for batch_idx, batch in enumerate(dataset.to_batches(batch_size=BATCH_SIZE), start=1):
    if batch.num_rows == 0:
        continue

    df = batch.to_pandas()

    if "fraction" not in df.columns:
        raise ValueError("Column 'fraction' missing in dataset!")

    frac = df["fraction"].to_numpy(dtype="float32")
    burned = frac > 0.5  # True for positives

    n_rows = len(df)
    total_rows += n_rows

    # ----------------- TEST ASSIGNMENT (TRUE RATIO) -----------------
    # Randomly assign ~10% of all rows to test, regardless of class
    u_test = np.random.rand(n_rows)
    is_test = (u_test < TEST_FRACTION)
    not_test = ~is_test

    # TEST set (keeps true ~4169:1 distribution)
    df_test = df[is_test].copy()
    if not df_test.empty:
        df_test["burned"] = burned[is_test].astype("uint8")
        write_parquet_chunk(df_test, TEST_DIR, prefix="test")

        test_pos += burned[is_test].sum()
        test_neg += (~burned[is_test]).sum()

    # ----------------- TRAIN/VAL (TARGET ~100:1) -----------------
    # all positives in TrainVal (from the 90% non-test pool)
    pos_train_mask = not_test & burned

    # negatives in TrainVal, subsampled with NEG_KEEP_PROB
    neg_train_cand_mask = not_test & (~burned)
    u_neg = np.random.rand(n_rows)
    neg_train_mask = neg_train_cand_mask & (u_neg < NEG_KEEP_PROB)

    train_mask = pos_train_mask | neg_train_mask

    df_train = df[train_mask].copy()
    if not df_train.empty:
        df_train["burned"] = burned[train_mask].astype("uint8")
        write_parquet_chunk(df_train, TRAIN100_DIR, prefix="train")

        train_pos += burned[train_mask].sum()
        train_neg += (~burned[train_mask]).sum()

    if batch_idx % 50 == 0:
        print(f"Processed {batch_idx} batches, total_rows so far: {total_rows:,}")

# ================== SUMMARY ==================
print("\n================= SAMPLING SUMMARY =================")
print(f"Total rows scanned              : {total_rows:,}")

print("\n--- TEST SET (10% true distribution) ---")
print(f"Test positives (burned=1)       : {int(test_pos):,}")
print(f"Test negatives (burned=0)       : {int(test_neg):,}")
if test_pos > 0:
    test_neg_pos_ratio = test_neg / test_pos
    print(f"Neg:Pos in TEST                 : {test_neg_pos_ratio:.2f}:1")
    print(f"Percent positives in TEST       : {100 * test_pos / (test_pos + test_neg):.6f} %")

print("\n--- TRAIN/VAL SET (~100:1 target) ---")
print(f"TrainVal positives (burned=1)   : {int(train_pos):,}")
print(f"TrainVal negatives (burned=0)   : {int(train_neg):,}")
if train_pos > 0:
    train_neg_pos_ratio = train_neg / train_pos
    print(f"Neg:Pos in TRAIN/VAL            : {train_neg_pos_ratio:.2f}:1")
    print(f"Percent positives in TRAIN/VAL  : {100 * train_pos / (train_pos + train_neg):.6f} %")

print("\nTrainVal dataset written to:")
print(f"  {TRAIN100_DIR}")
print("Test dataset written to:")
print(f"  {TEST_DIR}")
print("====================================================\n")


Scanning source Parquet dataset: /explore/nobackup/people/spotter5/clelland_fire_ml/parquet_cems_with_fraction_dataset

Using TEST_FRACTION    = 0.10
Target TrainVal neg:pos ≈ 100.0:1
Using NEG_KEEP_PROB   ≈ 0.023986 for negatives in TrainVal

Processed 50 batches, total_rows so far: 1,204,422
Processed 100 batches, total_rows so far: 2,517,301
Processed 150 batches, total_rows so far: 3,880,345
Processed 200 batches, total_rows so far: 5,464,567
Processed 250 batches, total_rows so far: 6,900,785
Processed 300 batches, total_rows so far: 8,280,925
Processed 350 batches, total_rows so far: 9,686,325
Processed 400 batches, total_rows so far: 10,988,053
Processed 450 batches, total_rows so far: 12,426,585
Processed 500 batches, total_rows so far: 13,864,504
Processed 550 batches, total_rows so far: 15,102,539
Processed 600 batches, total_rows so far: 16,392,297
Processed 650 batches, total_rows so far: 17,709,041
Processed 700 batches, total_rows so far: 19,077,236
Processed 750 batches,

Using rules to remove some pixels

In [4]:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import os
import numpy as np
import pandas as pd
import pyarrow.dataset as ds

# ================== CONFIG ==================
SRC_PARQUET_DIR = "/explore/nobackup/people/spotter5/clelland_fire_ml/parquet_cems_with_fraction_dataset"

BATCH_SIZE   = 500_000

# ================== MAIN ==================
print(f"Scanning source Parquet dataset: {SRC_PARQUET_DIR}")
dataset = ds.dataset(SRC_PARQUET_DIR, format="parquet")

total_rows_scanned = 0
total_rows_kept = 0
dropped_rows_total = 0

total_pos = 0
total_neg = 0

print("APPLYING FILTERS: RH>80 OR Temp<270 OR FFMC<50 OR ISI<2 -> DROP\n")

# --- OPTIMIZATION: Only read the columns we actually need ---
# This prevents loading unused columns (like lat/lon, year, etc.) into memory,
# making the scan significantly faster.
potential_cols = {
    'fraction', 
    'relative_humidity', 
    'temperature_2m', 
    'fine_fuel_moisture_code', 
    'initial_spread_index'
    'build_up_index'
}
# Only request columns that actually exist in the dataset
available_cols = set(dataset.schema.names)
read_cols = list(potential_cols.intersection(available_cols))

print(f"Optimization: Reading only {len(read_cols)} columns: {read_cols}")

if 'fraction' not in read_cols:
    raise ValueError("Critical error: 'fraction' column not found in dataset.")

for batch_idx, batch in enumerate(dataset.to_batches(batch_size=BATCH_SIZE, columns=read_cols), start=1):
    if batch.num_rows == 0:
        continue

    df = batch.to_pandas()
    n_rows_batch = len(df)
    total_rows_scanned += n_rows_batch

    # ----------------- APPLY FILTERS -----------------
    # Remove pixels with relative_humidity >80 OR temperature_2m <270 
    # OR fine_fuel_moisture_code < 50 OR initial_spread_index <2
    
    mask_drop = pd.Series(False, index=df.index)
    
    # Check columns existence (safe because we intersected above, but good practice)
    if 'relative_humidity' in df.columns:
        mask_drop |= (df['relative_humidity'] > 80)
    if 'temperature_2m' in df.columns:
        mask_drop |= (df['temperature_2m'] < 270)
    if 'fine_fuel_moisture_code' in df.columns:
        mask_drop |= (df['fine_fuel_moisture_code'] < 50)
    if 'initial_spread_index' in df.columns:
        mask_drop |= (df['initial_spread_index'] < 2)
    if 'build_up_index' in df.columns:
        mask_drop |= (df['initial_spread_index'] < 20)
        
    df_filtered = df[~mask_drop].copy()
    
    n_kept = len(df_filtered)
    n_dropped = n_rows_batch - n_kept
    dropped_rows_total += n_dropped
    total_rows_kept += n_kept
    
    if df_filtered.empty:
        continue
        
    df = df_filtered

    frac = df["fraction"].to_numpy(dtype="float32")
    burned = frac > 0.5  # True for positives

    # Count classes in filtered batch
    n_pos_batch = burned.sum()
    n_neg_batch = (~burned).sum()

    total_pos += n_pos_batch
    total_neg += n_neg_batch

    if batch_idx % 50 == 0:
        print(f"Processed {batch_idx} batches. Scanned: {total_rows_scanned:,} | Kept: {total_rows_kept:,}")

# ================== SUMMARY ==================
print("\n================= FILTERED DATASET SUMMARY =================")
print(f"Total rows scanned              : {total_rows_scanned:,}")
print(f"Total rows dropped (filters)    : {dropped_rows_total:,} ({dropped_rows_total/max(1, total_rows_scanned)*100:.2f}%)")
print(f"Total rows kept                 : {total_rows_kept:,}")

print("\n--- CLASS DISTRIBUTION (After Filters) ---")
print(f"Total Positives (burned=1)      : {int(total_pos):,}")
print(f"Total Negatives (burned=0)      : {int(total_neg):,}")

if total_pos > 0:
    ratio = total_neg / total_pos
    print(f"Neg:Pos Ratio                   : {ratio:.4f}:1")
    print(f"Percent Positives               : {100 * total_pos / (total_pos + total_neg):.6f} %")
else:
    print("Neg:Pos Ratio                   : N/A (0 positives)")

print("============================================================\n")

Scanning source Parquet dataset: /explore/nobackup/people/spotter5/clelland_fire_ml/parquet_cems_with_fraction_dataset
APPLYING FILTERS: RH>80 OR Temp<270 OR FFMC<50 OR ISI<2 -> DROP

Optimization: Reading only 4 columns: ['temperature_2m', 'fraction', 'relative_humidity', 'fine_fuel_moisture_code']
Processed 50 batches. Scanned: 1,204,422 | Kept: 677,327
Processed 100 batches. Scanned: 2,517,301 | Kept: 1,302,559
Processed 150 batches. Scanned: 3,880,345 | Kept: 1,856,931
Processed 250 batches. Scanned: 6,900,785 | Kept: 2,622,706
Processed 300 batches. Scanned: 8,280,925 | Kept: 3,417,887
Processed 350 batches. Scanned: 9,686,325 | Kept: 4,343,765
Processed 400 batches. Scanned: 10,988,053 | Kept: 5,031,609
Processed 450 batches. Scanned: 12,426,585 | Kept: 5,692,619
Processed 500 batches. Scanned: 13,864,504 | Kept: 6,196,319
Processed 550 batches. Scanned: 15,102,539 | Kept: 6,724,356
Processed 600 batches. Scanned: 16,392,297 | Kept: 7,441,233
Processed 650 batches. Scanned: 17,70