In [None]:
# ============================================
# 0) IMPORTS
# ============================================
import os
import glob
import pandas as pd
import numpy as np
import shapely.wkt
import shapely.geometry
import geopandas as gpd
from affine import Affine
import rasterio.features
from tqdm import tqdm  # Progress bar
import gc  # Garbage Collector for memory management

import ee

# ============================================
# 1) CONFIGURATION
# ============================================

# --- INPUT: The consolidated PKL from step 02a ---
INPUT_PKL = r"D:\Development\RESEARCH\urban_flood_database\chronicle\hydromerit_pluvial_outputs\chronicle_df_with_pfdi_FULL.pkl"

# --- OUTPUT: Where rain data will be saved ---
OUT_DIR = r"D:\Development\RESEARCH\urban_flood_database\chronicle\imerg_rain_outputs"
OUT_FINAL_PKL = os.path.join(OUT_DIR, "chronicle_urban_df_with_IMERG_FULL.pkl")

# IMERG constants
# IMERG data starts in June 2000; events starting earlier are filtered out.
IMERG_START_DATE = pd.Timestamp("2000-06-01")
SCALE = 0.1  # IMERG grid resolution, in degrees
CRS = 'EPSG:4326'

# Batch settings
# Events per output pickle. Each event stores a float32 (time, height, width)
# rain array, so RAM per batch grows with this value; lower it if memory
# becomes a problem.
BATCH_SIZE = 1000
# Maximum batches per run: at most N_BATCHES_TO_RUN * BATCH_SIZE events
# (350 * 1000 = 350,000 with the values above).
N_BATCHES_TO_RUN = 350

# ============================================
# 2) HELPERS
# ============================================

def ensure_out_dir(path):
    """Make sure the directory ``path`` exists, creating parents as needed.

    If anything already exists at ``path`` it is left alone and no error is
    raised.
    """
    if os.path.exists(path):
        return
    os.makedirs(path, exist_ok=True)

def initialize_ee():
    """Open an Earth Engine session.

    A plain ``ee.Initialize()`` is tried first; if it raises, the interactive
    ``ee.Authenticate()`` flow runs and initialization is attempted once more.
    """
    try:
        ee.Initialize()
    except Exception:
        # Typically missing/expired credentials: authenticate, then retry once.
        print("Authenticating Earth Engine...")
        ee.Authenticate()
        ee.Initialize()
        print("Earth Engine initialized after auth.")
    else:
        print("Earth Engine initialized.")

def get_next_batch_index(out_dir):
    """
    Return the number to use for the next 'imerg_batch_XXXX.pkl' file.

    The result is one more than the largest integer suffix among the batch
    files already in ``out_dir``; files whose suffix does not parse as an
    integer are ignored. A missing directory, or one without parseable batch
    files, yields 0.
    """
    if not os.path.exists(out_dir):
        return 0

    def _suffix_number(path):
        # 'imerg_batch_0001.pkl' -> '0001.pkl' -> '0001' -> 1
        suffix = os.path.basename(path).split('_')[-1].split('.')[0]
        try:
            return int(suffix)
        except ValueError:
            return None

    batch_files = glob.glob(os.path.join(out_dir, "imerg_batch_*.pkl"))
    parsed = (_suffix_number(p) for p in batch_files)
    # -1 seeds the maximum so an empty/unparseable directory gives 0.
    highest = max([-1, *(n for n in parsed if n is not None)])
    return highest + 1

def extract_rain_data(event_row, hours_before=72, hours_after=24, max_retries=2):
    """
    Extract the IMERG rain stack, polygon mask and grid metadata for one event.

    Time window: [start_time - hours_before, end_time + hours_after), passed to
    Earth Engine's ``filterDate`` (end exclusive). The defaults give the
    72 h-before / 24 h-after window used throughout this pipeline.

    Parameters
    ----------
    event_row : pandas.Series (or mapping)
        Must provide 'geometry_wkt' (WKT string or a shapely geometry),
        'start_time' and 'end_time' (pandas Timestamps). 'event_id', if
        present, is only used in failure messages.
    hours_before, hours_after : int or float
        Padding of the time window around the event, in hours.
    max_retries : int
        Extra attempts for the Earth Engine download when it raises
        ``ee.EEException`` or ``OSError`` (e.g. timeouts, quota or network
        errors), with an increasing pause between attempts.

    Returns
    -------
    (rain_matrix, rain_mask, rain_meta)
        rain_matrix : float32 ndarray of shape (time, height, width)
        rain_mask   : uint8 ndarray of shape (height, width); 1 where the
                      polygon touches a cell (``all_touched=True``), else 0
        rain_meta   : dict with 'origin_top_left' (lat, lon), 'scale',
                      'shape' and 'timestamps' (band keys, one per layer)
    (None, None, None) when the window holds no images or any step fails.
    Failures are reported with ``tqdm.write`` so they stay visible next to
    the progress bar instead of being swallowed silently.
    """
    import time  # local import: only needed for the retry pause

    event_id = event_row.get('event_id', '?')

    # 1. Geometry and time window (local work: errors here are not retried)
    try:
        geom_value = event_row['geometry_wkt']
        # Normally WKT text; fall back to an already-built geometry object
        if isinstance(geom_value, str):
            poly_geom = shapely.wkt.loads(geom_value)
        else:
            poly_geom = geom_value
        bounds = poly_geom.bounds  # (minx, miny, maxx, maxy)
        roi = ee.Geometry.BBox(*bounds)
        start_t = event_row['start_time'] - pd.Timedelta(hours=hours_before)
        end_t = event_row['end_time'] + pd.Timedelta(hours=hours_after)
    except Exception as exc:
        tqdm.write(f"[IMERG] event {event_id}: invalid geometry/time window: {exc!r}")
        return None, None, None

    # 2. GEE request, retried on transient server/network failures so that a
    # single timeout does not permanently turn an event into a None row.
    # NOTE(review): Earth Engine reports IMERG_V06 as deprecated and superseded
    # by NASA/GPM_L3/IMERG_V07. V07 uses different band names, and switching
    # mid-run would mix product versions across batch files — migrate deliberately.
    attempts = max(0, int(max_retries)) + 1
    properties = None
    for attempt in range(attempts):
        try:
            imerg_coll = (
                ee.ImageCollection("NASA/GPM_L3/IMERG_V06")
                .select('precipitationCal')
                .filterBounds(roi)
                .filterDate(start_t, end_t)
            )
            # No images in the window: nothing to download
            if imerg_coll.size().getInfo() == 0:
                return None, None, None
            # toBands(): one band per time step; sampleRectangle(): raw pixel
            # values inside the bbox, one nested list per band.
            pixel_dict = imerg_coll.toBands().sampleRectangle(region=roi).getInfo()
            properties = pixel_dict['properties']
            break
        except (ee.EEException, OSError) as exc:
            if attempt + 1 == attempts:
                tqdm.write(
                    f"[IMERG] event {event_id}: GEE request failed after "
                    f"{attempts} attempt(s): {exc}"
                )
                return None, None, None
            time.sleep(2 ** attempt)  # 1 s, 2 s, 4 s, ...
        except Exception as exc:
            tqdm.write(f"[IMERG] event {event_id}: unexpected GEE error: {exc!r}")
            return None, None, None

    # 3. Parse arrays, build metadata and the polygon mask
    try:
        # Keys are '<image id>_precipitationCal' (toBands naming); sorting
        # orders layers by time assuming fixed-width timestamp image ids.
        band_keys = sorted(properties)
        # float32 halves memory versus the default float64
        rain_matrix = np.stack(
            [np.array(properties[key], dtype=np.float32) for key in band_keys]
        )  # (time, height, width)

        height, width = rain_matrix.shape[1], rain_matrix.shape[2]
        min_lon, min_lat, max_lon, max_lat = bounds

        # NOTE(review): the transform is anchored at the bbox's top-left
        # corner, while sampleRectangle returns cells of the IMERG grid, whose
        # edges need not coincide with the bbox. The mask may therefore be
        # offset by up to one cell — confirm if exact alignment matters.
        transform = Affine(SCALE, 0, min_lon, 0, -SCALE, max_lat)

        meta = {
            'origin_top_left': (max_lat, min_lon),  # (Lat, Lon)
            'scale': SCALE,
            'shape': (height, width),
            'timestamps': band_keys,  # maps matrix layers to time steps
        }

        # Binary mask of the polygon on the same grid: 1 = inside/touching
        mask = rasterio.features.rasterize(
            [(poly_geom, 1)],
            out_shape=(height, width),
            transform=transform,
            fill=0,
            all_touched=True,
            dtype=np.uint8,
        )
    except Exception as exc:
        tqdm.write(f"[IMERG] event {event_id}: could not build rain arrays: {exc!r}")
        return None, None, None

    return rain_matrix, mask, meta

# ============================================
# 3) INITIALIZATION
# ============================================
initialize_ee()
ensure_out_dir(OUT_DIR)

# Load the consolidated input written by step 02a; fail fast if it is missing.
print(f"Loading full dataset: {INPUT_PKL}")
if not os.path.exists(INPUT_PKL):
    raise FileNotFoundError(f"Input file not found: {INPUT_PKL}. Please run 02a first.")

# Basic cleaning: +/-inf -> NaN, then epoch seconds -> datetimes.
df = pd.read_pickle(INPUT_PKL).replace([np.inf, -np.inf], np.nan)
df = df.assign(
    start_time=pd.to_datetime(df['start_time'], unit='s'),
    end_time=pd.to_datetime(df['end_time'], unit='s'),
)

# Keep only events that start inside the IMERG era (from IMERG_START_DATE on).
df_valid = df.loc[df['start_time'] >= IMERG_START_DATE].copy()

print(f"Total events in input: {len(df)}")
print(f"Events valid for IMERG (post-2000): {len(df_valid)}")

# ============================================
# 4) SMART BATCH PROCESSING (ID-BASED)
# ============================================

print("--- PREPARING WORK PLAN ---")

# 1. Identify what is already done.
# Collect event IDs from existing RAIN batch files so they are not re-processed.
# pd.read_pickle always loads the whole file (a pickle cannot be read column by
# column), so this step also reads every stored rain array.
processed_ids = set()
pkl_pattern = os.path.join(OUT_DIR, "imerg_batch_*.pkl")
existing_files = sorted(glob.glob(pkl_pattern))

if existing_files:
    print(f"Found {len(existing_files)} existing batch files. Scanning for processed IDs...")
    for f in tqdm(existing_files, desc="Indexing existing data"):
        try:
            df_temp = pd.read_pickle(f)
            if 'event_id' in df_temp.columns:
                processed_ids.update(df_temp['event_id'].tolist())
            del df_temp
        except Exception as e:
            # Unreadable file (e.g. truncated by an interrupted save): its
            # events are not marked as processed, so they are re-extracted below.
            print(f"Skipping corrupted file {f}: {e}")

print(f"Total events already processed: {len(processed_ids)}")

# 2. Keep only rows whose ID is NOT in the processed set
df_todo = df_valid[~df_valid['event_id'].isin(processed_ids)].copy()

print(f"Events remaining to process: {len(df_todo)}")

if len(df_todo) == 0:
    print("All events are already processed! Nothing to do.")
else:
    # 3. Process the remaining rows in new batches, numbered after the
    #    highest existing batch file.
    start_batch_num = get_next_batch_index(OUT_DIR)
    n_remaining = len(df_todo)

    # Cap this run at N_BATCHES_TO_RUN batches
    max_rows_limit = N_BATCHES_TO_RUN * BATCH_SIZE
    stop_at_row = min(n_remaining, max_rows_limit)
    batch_starts = range(0, stop_at_row, BATCH_SIZE)

    # len(range) is exactly the number of loop iterations (ceil division),
    # so the plan is correct even when stop_at_row is a multiple of BATCH_SIZE.
    print(f"Plan: Processing {len(batch_starts)} batches.")

    for batch_i in batch_starts:
        # File number continues after existing batches
        current_file_num = start_batch_num + (batch_i // BATCH_SIZE)

        batch_df = df_todo.iloc[batch_i : batch_i + BATCH_SIZE].copy()

        print(f"\nProcessing Batch {current_file_num} ({len(batch_df)} events)...")

        matrices, masks, metas = [], [], []
        for _, row in tqdm(batch_df.iterrows(), total=len(batch_df), desc=f"Batch {current_file_num}"):
            mat, msk, mt = extract_rain_data(row)
            matrices.append(mat)
            masks.append(msk)
            metas.append(mt)

        batch_df['imerg_matrix'] = matrices
        batch_df['imerg_mask'] = masks
        batch_df['imerg_meta'] = metas

        # Make failed/empty extractions visible instead of silently stored
        n_missing = sum(m is None for m in matrices)
        if n_missing:
            print(f"  {n_missing} of {len(batch_df)} events returned no rain data (stored as None).")

        # Save atomically: write a temp file, then rename it over the final
        # name. An interrupted write leaves only the '.tmp' file (not matched
        # by the '*.pkl' glob) instead of a truncated batch pickle.
        out_path = os.path.join(OUT_DIR, f"imerg_batch_{current_file_num:04d}.pkl")
        tmp_path = out_path + ".tmp"
        batch_df.to_pickle(tmp_path)
        os.replace(tmp_path, out_path)
        print(f"Saved: {out_path}")

        # === MEMORY CLEANUP ===
        # Drop large per-batch objects before the next iteration
        del batch_df, matrices, masks, metas
        gc.collect()

    if stop_at_row < n_remaining:
        print("\n--- BATCH LIMIT REACHED ---")
        print(
            f"Stopped execution after {N_BATCHES_TO_RUN} batches as requested "
            f"({n_remaining - stop_at_row} events left for the next run)."
        )
    else:
        print("\n--- ALL REMAINING EVENTS PROCESSED ---")

Earth Engine initialized.
Loading full dataset: D:\Development\RESEARCH\urban_flood_database\chronicle\hydromerit_pluvial_outputs\chronicle_df_with_pfdi_FULL.pkl
Total events in input: 882957
Events valid for IMERG (post-2000): 882661
--- PREPARING WORK PLAN ---
Found 676 existing batch files. Scanning for processed IDs...


Indexing existing data:  67%|█████████████████████████████████████▌                  | 454/676 [00:15<00:09, 23.85it/s]

Skipping corrupted file D:\Development\RESEARCH\urban_flood_database\chronicle\imerg_rain_outputs\imerg_batch_0451.pkl: Ran out of input


Indexing existing data: 100%|████████████████████████████████████████████████████████| 676/676 [00:24<00:00, 27.77it/s]


Total events already processed: 634500
Events remaining to process: 248161
Plan: Processing 249 batches.

Processing Batch 676 (1000 events)...



Attention required for NASA/GPM_L3/IMERG_V06! You are using a deprecated asset.
To make sure your code keeps working, please update it.
This dataset has been superseded by NASA/GPM_L3/IMERG_V07

Learn more: https://developers.google.com/earth-engine/datasets/catalog/NASA_GPM_L3_IMERG_V06

Batch 676: 100%|███████████████████████████████████████████████████████████████████| 1000/1000 [10:45<00:00,  1.55it/s]


Saved: D:\Development\RESEARCH\urban_flood_database\chronicle\imerg_rain_outputs\imerg_batch_0676.pkl

Processing Batch 677 (1000 events)...


Batch 677: 100%|███████████████████████████████████████████████████████████████████| 1000/1000 [10:06<00:00,  1.65it/s]


Saved: D:\Development\RESEARCH\urban_flood_database\chronicle\imerg_rain_outputs\imerg_batch_0677.pkl

Processing Batch 678 (1000 events)...


Batch 678: 100%|███████████████████████████████████████████████████████████████████| 1000/1000 [10:04<00:00,  1.66it/s]


Saved: D:\Development\RESEARCH\urban_flood_database\chronicle\imerg_rain_outputs\imerg_batch_0678.pkl

Processing Batch 679 (1000 events)...


Batch 679: 100%|███████████████████████████████████████████████████████████████████| 1000/1000 [09:55<00:00,  1.68it/s]


Saved: D:\Development\RESEARCH\urban_flood_database\chronicle\imerg_rain_outputs\imerg_batch_0679.pkl

Processing Batch 680 (1000 events)...


Batch 680: 100%|███████████████████████████████████████████████████████████████████| 1000/1000 [10:02<00:00,  1.66it/s]


Saved: D:\Development\RESEARCH\urban_flood_database\chronicle\imerg_rain_outputs\imerg_batch_0680.pkl

Processing Batch 681 (1000 events)...


Batch 681: 100%|███████████████████████████████████████████████████████████████████| 1000/1000 [10:02<00:00,  1.66it/s]


Saved: D:\Development\RESEARCH\urban_flood_database\chronicle\imerg_rain_outputs\imerg_batch_0681.pkl

Processing Batch 682 (1000 events)...


Batch 682: 100%|███████████████████████████████████████████████████████████████████| 1000/1000 [10:10<00:00,  1.64it/s]


Saved: D:\Development\RESEARCH\urban_flood_database\chronicle\imerg_rain_outputs\imerg_batch_0682.pkl

Processing Batch 683 (1000 events)...


Batch 683: 100%|███████████████████████████████████████████████████████████████████| 1000/1000 [10:07<00:00,  1.65it/s]


Saved: D:\Development\RESEARCH\urban_flood_database\chronicle\imerg_rain_outputs\imerg_batch_0683.pkl

Processing Batch 684 (1000 events)...


Batch 684: 100%|███████████████████████████████████████████████████████████████████| 1000/1000 [10:15<00:00,  1.63it/s]


Saved: D:\Development\RESEARCH\urban_flood_database\chronicle\imerg_rain_outputs\imerg_batch_0684.pkl

Processing Batch 685 (1000 events)...


Batch 685: 100%|███████████████████████████████████████████████████████████████████| 1000/1000 [10:28<00:00,  1.59it/s]


Saved: D:\Development\RESEARCH\urban_flood_database\chronicle\imerg_rain_outputs\imerg_batch_0685.pkl

Processing Batch 686 (1000 events)...


Batch 686: 100%|███████████████████████████████████████████████████████████████████| 1000/1000 [10:26<00:00,  1.60it/s]


Saved: D:\Development\RESEARCH\urban_flood_database\chronicle\imerg_rain_outputs\imerg_batch_0686.pkl

Processing Batch 687 (1000 events)...


Batch 687: 100%|███████████████████████████████████████████████████████████████████| 1000/1000 [10:58<00:00,  1.52it/s]


Saved: D:\Development\RESEARCH\urban_flood_database\chronicle\imerg_rain_outputs\imerg_batch_0687.pkl

Processing Batch 688 (1000 events)...


Batch 688: 100%|███████████████████████████████████████████████████████████████████| 1000/1000 [10:10<00:00,  1.64it/s]


Saved: D:\Development\RESEARCH\urban_flood_database\chronicle\imerg_rain_outputs\imerg_batch_0688.pkl

Processing Batch 689 (1000 events)...


Batch 689: 100%|███████████████████████████████████████████████████████████████████| 1000/1000 [09:50<00:00,  1.69it/s]


Saved: D:\Development\RESEARCH\urban_flood_database\chronicle\imerg_rain_outputs\imerg_batch_0689.pkl

Processing Batch 690 (1000 events)...


Batch 690: 100%|███████████████████████████████████████████████████████████████████| 1000/1000 [09:56<00:00,  1.68it/s]


Saved: D:\Development\RESEARCH\urban_flood_database\chronicle\imerg_rain_outputs\imerg_batch_0690.pkl

Processing Batch 691 (1000 events)...


Batch 691: 100%|███████████████████████████████████████████████████████████████████| 1000/1000 [10:08<00:00,  1.64it/s]


Saved: D:\Development\RESEARCH\urban_flood_database\chronicle\imerg_rain_outputs\imerg_batch_0691.pkl

Processing Batch 692 (1000 events)...


Batch 692: 100%|███████████████████████████████████████████████████████████████████| 1000/1000 [10:14<00:00,  1.63it/s]


Saved: D:\Development\RESEARCH\urban_flood_database\chronicle\imerg_rain_outputs\imerg_batch_0692.pkl

Processing Batch 693 (1000 events)...


Batch 693: 100%|███████████████████████████████████████████████████████████████████| 1000/1000 [10:45<00:00,  1.55it/s]


Saved: D:\Development\RESEARCH\urban_flood_database\chronicle\imerg_rain_outputs\imerg_batch_0693.pkl

Processing Batch 694 (1000 events)...


Batch 694: 100%|███████████████████████████████████████████████████████████████████| 1000/1000 [10:02<00:00,  1.66it/s]


Saved: D:\Development\RESEARCH\urban_flood_database\chronicle\imerg_rain_outputs\imerg_batch_0694.pkl

Processing Batch 695 (1000 events)...


Batch 695: 100%|███████████████████████████████████████████████████████████████████| 1000/1000 [09:51<00:00,  1.69it/s]


Saved: D:\Development\RESEARCH\urban_flood_database\chronicle\imerg_rain_outputs\imerg_batch_0695.pkl

Processing Batch 696 (1000 events)...


Batch 696: 100%|███████████████████████████████████████████████████████████████████| 1000/1000 [10:12<00:00,  1.63it/s]


Saved: D:\Development\RESEARCH\urban_flood_database\chronicle\imerg_rain_outputs\imerg_batch_0696.pkl

Processing Batch 697 (1000 events)...


Batch 697: 100%|███████████████████████████████████████████████████████████████████| 1000/1000 [10:03<00:00,  1.66it/s]


Saved: D:\Development\RESEARCH\urban_flood_database\chronicle\imerg_rain_outputs\imerg_batch_0697.pkl

Processing Batch 698 (1000 events)...


Batch 698: 100%|███████████████████████████████████████████████████████████████████| 1000/1000 [09:59<00:00,  1.67it/s]


Saved: D:\Development\RESEARCH\urban_flood_database\chronicle\imerg_rain_outputs\imerg_batch_0698.pkl

Processing Batch 699 (1000 events)...


Batch 699: 100%|███████████████████████████████████████████████████████████████████| 1000/1000 [10:45<00:00,  1.55it/s]


Saved: D:\Development\RESEARCH\urban_flood_database\chronicle\imerg_rain_outputs\imerg_batch_0699.pkl

Processing Batch 700 (1000 events)...


Batch 700: 100%|███████████████████████████████████████████████████████████████████| 1000/1000 [09:59<00:00,  1.67it/s]


Saved: D:\Development\RESEARCH\urban_flood_database\chronicle\imerg_rain_outputs\imerg_batch_0700.pkl

Processing Batch 701 (1000 events)...


Batch 701: 100%|███████████████████████████████████████████████████████████████████| 1000/1000 [09:56<00:00,  1.68it/s]


Saved: D:\Development\RESEARCH\urban_flood_database\chronicle\imerg_rain_outputs\imerg_batch_0701.pkl

Processing Batch 702 (1000 events)...


Batch 702: 100%|███████████████████████████████████████████████████████████████████| 1000/1000 [09:54<00:00,  1.68it/s]


Saved: D:\Development\RESEARCH\urban_flood_database\chronicle\imerg_rain_outputs\imerg_batch_0702.pkl

Processing Batch 703 (1000 events)...


Batch 703:  20%|█████████████▊                                                      | 203/1000 [02:00<07:36,  1.75it/s]

In [None]:
# ============================================
# 5) FINAL MERGE & CLEANUP
# ============================================
print("\n--- FINALIZING ---")
print("Merging all batch files...")

# Columns produced by the extraction step. All other columns are re-taken from
# INPUT_PKL below, so only these are kept from each batch to save memory.
result_cols = ['event_id', 'imerg_matrix', 'imerg_mask', 'imerg_meta']

pkl_pattern = os.path.join(OUT_DIR, "imerg_batch_*.pkl")
# Sorted so batches are concatenated in file-number order (glob order is
# arbitrary); the 4-digit zero-padded names sort numerically up to 9999.
all_pkl_files = sorted(glob.glob(pkl_pattern))

if not all_pkl_files:
    print("No output files found.")
else:
    df_list = []
    for f in tqdm(all_pkl_files, desc="Loading Batches"):
        try:
            df_list.append(pd.read_pickle(f)[result_cols])
        except Exception as e:
            # Unreadable/truncated pickle, or a file missing the result columns
            print(f"Error loading {f}: {e}")

    if df_list:
        # This DataFrame contains ONLY the processed events with rain data
        df_results = pd.concat(df_list, ignore_index=True)

        print(f"Loaded {len(df_results)} processed rain events (raw count).")

        # An event_id present in several batches would multiply rows in the
        # merge; keep the copy from the last (highest-numbered) batch.
        dup_mask = df_results['event_id'].duplicated(keep='last')
        if dup_mask.any():
            print(f"[WARNING] Dropping {int(dup_mask.sum())} duplicate event_id rows (keeping the latest batch).")
            df_results = df_results[~dup_mask]

        print("Merging results back to main dataset structure...")

        # Reload the original input to get the clean base columns
        df_base = pd.read_pickle(INPUT_PKL)

        # 'inner' keeps only event_ids present in BOTH DataFrames
        df_final = df_base.merge(
            df_results[result_cols],
            on='event_id',
            how='inner'
        )

        # --- FILTER OUT NULL RAIN DATA ---

        # 1. Raw input stores start_time as epoch seconds; convert for printing
        if 'start_time' in df_final.columns:
            df_final['start_time'] = pd.to_datetime(df_final['start_time'], unit='s')

        # 2. None matrices mark failed/empty extractions (all-zero rain is not null and stays)
        missing_rain_mask = df_final['imerg_matrix'].isnull()
        missing_events = df_final[missing_rain_mask]

        # 3. Log (a capped number of) dropped events, then drop them
        if not missing_events.empty:
            print(f"\n[WARNING] Found {len(missing_events)} events with NULL rain data. Removing them...")
            print("--- Dropped Events Log ---")
            # Capped to keep notebook output manageable; the full set remains
            # available in `missing_events`.
            max_listed = 100
            for _, row in missing_events.head(max_listed).iterrows():
                try:
                    d_str = row['start_time'].strftime('%d-%m-%Y %H:%M')
                except (AttributeError, ValueError):
                    # Not a Timestamp, or NaT (strftime unsupported)
                    d_str = str(row['start_time'])
                print(f"Removing ID: {row['event_id']} | Date: {d_str}")
            if len(missing_events) > max_listed:
                print(f"... and {len(missing_events) - max_listed} more (see `missing_events`).")

            # 4. Perform the drop
            df_final = df_final[~missing_rain_mask].copy()
            print(f"--- Cleaned. Remaining events: {len(df_final)} ---")
        else:
            print("No NULL rain events found. All processed events are valid.")

        # Save the final PKL atomically (temp file + rename) so an interrupted
        # write cannot leave a truncated result file.
        tmp_final_pkl = OUT_FINAL_PKL + ".tmp"
        df_final.to_pickle(tmp_final_pkl)
        os.replace(tmp_final_pkl, OUT_FINAL_PKL)

        print(f"SUCCESS! Final dataset saved to: {OUT_FINAL_PKL}")

        # Final verification
        count = df_final['imerg_matrix'].notnull().sum()
        print(f"Verified valid Rain Data events: {count}")
    else:
        print("Failed to load any batch files.")

In [None]:
# Display the merged dataset. df_final is only defined when the merge cell
# above found and loaded batch files; otherwise this raises NameError.
df_final