In [None]:
import pandas as pd
import numpy as np
from sklearn.cluster import DBSCAN
from google.cloud import bigquery
import warnings
import matplotlib.pyplot as plt
from datetime import datetime, timedelta
warnings.filterwarnings('ignore')

class GeographicFireEventClustering:
    def __init__(self, project_id, dataset_id):
        """Remember the project/dataset names and open a BigQuery client.

        Args:
            project_id: Project passed to ``bigquery.Client`` and used when
                building fully qualified table names.
            dataset_id: Dataset that holds the ``emission_<year>`` tables.
        """
        self.project_id = project_id
        self.dataset_id = dataset_id
        self.client = bigquery.Client(project=project_id)

    def haversine_distance(self, lat1, lon1, lat2, lon2):
        """Return the great-circle distance in kilometers between two points.

        Coordinates are given in degrees; scalars and array-likes both work
        because only NumPy ufuncs are applied. A spherical Earth of radius
        6371 km is used.
        """
        earth_radius_km = 6371
        phi1, lam1, phi2, lam2 = (np.radians(v) for v in (lat1, lon1, lat2, lon2))
        delta_phi = phi2 - phi1
        delta_lam = lam2 - lam1
        # haversine of the central angle
        hav = np.sin(delta_phi/2)**2 + np.cos(phi1) * np.cos(phi2) * np.sin(delta_lam/2)**2
        central_angle = 2 * np.arcsin(np.sqrt(hav))
        return earth_radius_km * central_angle

    def km_to_radians(self, km):
        """Express a surface distance in km as an angle in radians (Earth radius 6371 km).

        This is the unit DBSCAN's haversine metric expects for ``eps``.
        """
        earth_radius_km = 6371
        return km / earth_radius_km

    def define_geographic_regions(self):
        """Return the bounding boxes used to chunk the data geographically.

        The result maps region name -> dict with ``lat_min``, ``lat_max``,
        ``lon_min`` and ``lon_max`` (degrees). Boxes only touch along shared
        edges; the region query treats the minimum as inclusive and the
        maximum as exclusive, so no point falls into two regions.
        """
        # (name, lat_min, lat_max, lon_min, lon_max)
        boxes = [
            ('Pacific_West', 32.0, 49.0, -125.0, -115.0),
            ('Mountain_West', 32.0, 49.0, -115.0, -105.0),
            ('Great_Plains', 32.0, 49.0, -105.0, -95.0),
            ('South_Texas', 25.0, 32.0, -105.0, -95.0),
            ('South_Central', 25.0, 37.0, -95.0, -85.1),
            ('Midwest', 37.0, 49.0, -95.0, -85.0),
            ('Southeast', 25.0, 37.0, -85.1, -75.0),
            ('Northeast', 37.0, 49.0, -85.0, -67.0),
        ]
        return {
            name: {'lat_min': south, 'lat_max': north, 'lon_min': west, 'lon_max': east}
            for name, south, north, west, east in boxes
        }

    def get_region_parameters(self, region_name):
        """Return the clustering parameters configured for ``region_name``.

        The returned dict has ``spatial_eps_km``, ``temporal_days`` and
        ``min_samples``. Unknown region names get the fallback
        (0.5 km, 3 days, 3 samples).
        """
        # (spatial_eps_km, temporal_days); min_samples is 3 for every region
        tuning = {
            'Pacific_West': (0.5, 3),
            'Mountain_West': (0.5, 3),
            'Great_Plains': (0.3, 2),
            'South_Texas': (0.3, 2),
            'South_Central': (0.2, 1),
            'Midwest': (0.3, 2),
            'Southeast': (0.2, 1),
            'Northeast': (0.5, 3),
        }
        eps_km, window_days = tuning.get(region_name, (0.5, 3))
        return {'spatial_eps_km': eps_km, 'temporal_days': window_days, 'min_samples': 3}

    def process_geographic_region(self, year, region_name, region_bounds):
        """Load one region's detections for ``year`` and cluster them into fire events.

        Args:
            year: Selects the ``emission_<year>`` table.
            region_name: Used for parameter lookup, logging and the ``region`` column.
            region_bounds: Dict with ``lat_min``/``lat_max``/``lon_min``/``lon_max``;
                minimums are inclusive and maximums exclusive in the query.

        Returns:
            The clustered rows with ``fire_event_id`` and ``region`` columns,
            or an empty DataFrame when the region has no rows.
        """
        # Per-region tuning; eps must be in radians for the haversine metric
        tuning = self.get_region_parameters(region_name)
        eps_km = tuning['spatial_eps_km']
        window_days = tuning['temporal_days']
        min_pts = tuning['min_samples']
        eps_rad = self.km_to_radians(eps_km)

        source_table = f"emission_{year}"

        print(f"\nProcessing region: {region_name}")
        print(f"   Bounds: {region_bounds}")
        print(f"   Parameters: eps={eps_km}km ({eps_rad:.6f} rad), temporal={window_days}d, min_samples={min_pts}")

        # Half-open bounds keep a point on a shared edge in exactly one region
        query = f"""
        SELECT
            id, year, doy, longitude, latitude, fire_date,
            grid10k, covertype, fuelcode, area_burned,
            consumed_fuel, ECO2, burn_source, burnday_source
        FROM `{self.project_id}.{self.dataset_id}.{source_table}`
        WHERE longitude IS NOT NULL
        AND latitude IS NOT NULL
        AND fire_date IS NOT NULL
        AND latitude >= {region_bounds['lat_min']}
        AND latitude < {region_bounds['lat_max']}
        AND longitude >= {region_bounds['lon_min']}
        AND longitude < {region_bounds['lon_max']}
        ORDER BY fire_date, longitude, latitude
        """

        records = self.client.query(query).to_dataframe()

        if not len(records):
            print(f"   No data found for {region_name}")
            return pd.DataFrame()

        print(f"   Found {len(records):,} records")

        # Date arithmetic in the clustering step needs real datetimes
        records['fire_date'] = pd.to_datetime(records['fire_date'])

        clustered = self._cluster_region_data(records, eps_rad, window_days, min_pts)

        if len(clustered) == 0:
            print(f"   No fire events found in {region_name}")
            return pd.DataFrame()

        # Tag every row with the region it was processed in
        clustered['region'] = region_name
        n_events = len(clustered['fire_event_id'].unique())
        print(f"   Found {n_events} fire events in {region_name}")
        return clustered

    def _cluster_region_data(self, df, spatial_eps_rad, temporal_days, min_samples):
        """Apply DBSCAN clustering to data from a single geographic region"""

        # apply temporal pre-filtering to break up data into smaller temporal chunks
        # prevents huge spatial clusters from forming across long time periods
        df_sorted = df.sort_values('fire_date').copy()

        # temporal groups (break if gap > temporal_days)
        df_sorted['temporal_group'] = 0
        current_group = 0

        for i in range(1, len(df_sorted)):
            prev_date = df_sorted.iloc[i-1]['fire_date']
            curr_date = df_sorted.iloc[i]['fire_date']

            if (curr_date - prev_date).days > temporal_days:
                current_group += 1

            df_sorted.iloc[i, df_sorted.columns.get_loc('temporal_group')] = current_group

        # apply spatial clustering within each temporal group
        fire_events = []
        isolated_points = []
        event_id = 0

        for temp_group in df_sorted['temporal_group'].unique():
            temp_group_data = df_sorted[df_sorted['temporal_group'] == temp_group].copy()

            # handle small temporal groups differently
            if len(temp_group_data) < min_samples:
                # treat each point as individual fire event (small groups)
                for idx, row in temp_group_data.iterrows():
                    single_event = pd.DataFrame([row])
                    single_event['fire_event_id'] = event_id
                    fire_events.append(single_event)
                    event_id += 1
                continue

            # spatial clustering
            coords = temp_group_data[['longitude', 'latitude']].values
            coords_rad = np.radians(coords)

            dbscan = DBSCAN(eps=spatial_eps_rad, min_samples=min_samples, metric='haversine')
            spatial_labels = dbscan.fit_predict(coords_rad)
            temp_group_data['spatial_cluster'] = spatial_labels

            # spatial cluster within this temporal group becomes a fire event
            for spatial_cluster in temp_group_data['spatial_cluster'].unique():
                if spatial_cluster == -1:  # Handle noise points
                    # convert noise points to individual events
                    noise_data = temp_group_data[temp_group_data['spatial_cluster'] == spatial_cluster]
                    for idx, row in noise_data.iterrows():
                        single_event = pd.DataFrame([row]).drop(['spatial_cluster'], axis=1)
                        single_event['fire_event_id'] = event_id
                        fire_events.append(single_event)
                        event_id += 1
                    continue

                cluster_data = temp_group_data[temp_group_data['spatial_cluster'] == spatial_cluster].copy()
                cluster_data['fire_event_id'] = event_id
                fire_events.append(cluster_data)
                event_id += 1

        if fire_events:
            result_df = pd.concat(fire_events, ignore_index=True)
            # clean up
            columns_to_drop = ['temporal_group', 'spatial_cluster']
            columns_to_drop = [col for col in columns_to_drop if col in result_df.columns]
            if columns_to_drop:
                result_df = result_df.drop(columns_to_drop, axis=1)
            return result_df
        else:
            return pd.DataFrame()

    def process_year_geographic(self, year):
        """Cluster a whole year region by region and merge the results.

        Each region from ``define_geographic_regions`` is processed with its
        own parameters. Regional event ids are shifted so they are unique
        across regions, rows with a repeated ``id`` are dropped (first kept),
        and a coverage summary against the year's total row count is printed.

        Returns:
            The combined clustered rows, or an empty DataFrame when no region
            produced any events.
        """
        print(f"GEOGRAPHIC FIRE CLUSTERING for {year}")
        print("Using region-specific optimized parameters (spatial eps in km, converted to radians):")

        regions = self.define_geographic_regions()

        # Echo the tuning that will be used for every region
        for name in regions:
            tuning = self.get_region_parameters(name)
            eps_rad = self.km_to_radians(tuning['spatial_eps_km'])
            print(f"  {name}: eps={tuning['spatial_eps_km']}km ({eps_rad:.6f} rad), temporal={tuning['temporal_days']}d")

        print("=" * 80)

        # Row count of the unfiltered year, the denominator for coverage
        total_query = f"""
        SELECT COUNT(*) as total_count
        FROM `{self.project_id}.{self.dataset_id}.emission_{year}`
        WHERE longitude IS NOT NULL
        AND latitude IS NOT NULL
        AND fire_date IS NOT NULL
        """

        original_count = self.client.query(total_query).to_dataframe()['total_count'].iloc[0]

        all_regional_events = []
        global_event_id = 0

        for name, bounds in regions.items():
            regional = self.process_geographic_region(year, name, bounds)
            if len(regional) == 0:
                continue

            # Shift this region's ids past everything handed out so far
            id_span = regional['fire_event_id'].max() + 1
            regional['fire_event_id'] += global_event_id
            global_event_id += id_span

            all_regional_events.append(regional)

        if not all_regional_events:
            return pd.DataFrame()

        combined = pd.concat(all_regional_events, ignore_index=True)

        # A source row must appear only once in the combined result
        rows_before = len(combined)
        combined = combined.drop_duplicates(subset=['id'], keep='first')
        rows_after = len(combined)

        if rows_before != rows_after:
            print(f"WARNING: Removed {rows_before - rows_after} duplicate records")

        print(f"\nFINAL RESULTS:")
        print(f"Total fire events: {len(combined['fire_event_id'].unique())}")
        print(f"Total data points: {rows_after:,}")
        print(f"Original data points: {original_count:,}")
        print(f"Coverage: {rows_after/original_count*100:.1f}% of original data")

        if rows_after > original_count:
            print(f"WARNING: More data points than original - investigating...")

            # Per-region totals help locate where the surplus comes from
            print("Regional data point totals:")
            for regional in all_regional_events:
                print(f"  {regional['region'].iloc[0]}: {len(regional):,} points")

        return combined

    def analyze_fire_events(self, fire_events_df):
        """Analyze the identified fire events"""
        if len(fire_events_df) == 0:
            return pd.DataFrame()

        # calc fire event stats
        event_stats = fire_events_df.groupby('fire_event_id').agg({
            'id': 'count',
            'fire_date': ['min', 'max'],
            'longitude': ['min', 'max'],
            'latitude': ['min', 'max'],
            'area_burned': 'sum',
            'consumed_fuel': 'sum',
            'ECO2': 'sum',
            'region': 'first'
        }).reset_index()

        # flatten column names
        event_stats.columns = ['fire_event_id', 'num_points', 'start_date', 'end_date',
                              'min_lon', 'max_lon', 'min_lat', 'max_lat',
                              'total_area_burned', 'total_consumed_fuel', 'total_ECO2', 'region']

        # calc duration and spatial extent
        event_stats['duration_days'] = (event_stats['end_date'] - event_stats['start_date']).dt.days + 1
        event_stats['spatial_extent_km'] = event_stats.apply(
            lambda row: self.haversine_distance(row['min_lat'], row['min_lon'],
                                              row['max_lat'], row['max_lon']), axis=1
        )

        return event_stats

    def apply_realistic_fire_limits(self, fire_events_df):
        """Applying better limits based on actual wildfire behavior"""
        if len(fire_events_df) == 0:
            return fire_events_df

        print(f"\nApplying realistic fire event limits:")
        print(f"  Spatial limit: 80km (auto-split), 120km (hard fail)")
        print(f"  Temporal limit: 10 days (split in time)")

        # calc current event statistics
        event_stats = self.analyze_fire_events(fire_events_df)

        # problematic events
        spatial_moderate = event_stats[event_stats['spatial_extent_km'] > 80]['fire_event_id'].tolist()
        spatial_extreme = event_stats[event_stats['spatial_extent_km'] > 120]['fire_event_id'].tolist()
        temporal_long = event_stats[event_stats['duration_days'] > 10]['fire_event_id'].tolist()

        print(f"  Events >80km (auto-split): {len(spatial_moderate)}")
        print(f"  Events >120km (hard fail): {len(spatial_extreme)}")
        print(f"  Events >10 days (time-split): {len(temporal_long)}")

        # hard fail- Drop events >120km (cannot be single fires, basing this off historical data)
        if spatial_extreme:
            print(f"\nHARD FAIL: Dropping {len(spatial_extreme)} events >120km (not realistic single fires)")
            fire_events_df = fire_events_df[~fire_events_df['fire_event_id'].isin(spatial_extreme)]

        # problematic events
        remaining_events = fire_events_df['fire_event_id'].unique()
        spatial_moderate = [e for e in spatial_moderate if e in remaining_events and e not in spatial_extreme]
        temporal_long = [e for e in temporal_long if e in remaining_events and e not in spatial_extreme]

        # process events that need fixing
        events_to_fix = list(set(spatial_moderate + temporal_long))

        if not events_to_fix:
            print(f"No events require fixing")
            return fire_events_df

        print(f"\nFixing {len(events_to_fix)} events...")

        # separate good events from events needing fixes
        good_events = fire_events_df[~fire_events_df['fire_event_id'].isin(events_to_fix)].copy()
        problem_events = fire_events_df[fire_events_df['fire_event_id'].isin(events_to_fix)].copy()

        # new event ID starting point
        max_event_id = fire_events_df['fire_event_id'].max() + 1

        # fix problematic event
        fixed_events = []

        for i, event_id in enumerate(events_to_fix):
            event_data = problem_events[problem_events['fire_event_id'] == event_id].copy()
            region = event_data['region'].iloc[0]

            # event stats
            event_stat = event_stats[event_stats['fire_event_id'] == event_id].iloc[0]
            spatial_extent = event_stat['spatial_extent_km']
            duration = event_stat['duration_days']

            # progress indicator (every 100 events - didnt want to flood logs)
            if i % 100 == 0:
                print(f"  Progress: {i}/{len(events_to_fix)} events processed...")

            # fix strategy
            if spatial_extent > 80:
                # use tight spatial clustering
                tight_spatial_eps = self.km_to_radians(0.1)  # 100 meters
                tight_temporal_days = 1
                tight_min_samples = 2

            elif duration > 10:
                # use tighter temporal windows
                tight_spatial_eps = self.km_to_radians(0.5)
                tight_temporal_days = 2
                tight_min_samples = 2

            else:
                # shouldnt happen, but handle
                tight_spatial_eps = self.km_to_radians(0.2)
                tight_temporal_days = 1
                tight_min_samples = 2

            # recluster
            fixed_data = self._cluster_region_data(event_data, tight_spatial_eps, tight_temporal_days, tight_min_samples)

            if len(fixed_data) > 0:
                # reassign id
                unique_events = fixed_data['fire_event_id'].unique()
                for j, old_id in enumerate(unique_events):
                    fixed_data.loc[fixed_data['fire_event_id'] == old_id, 'fire_event_id'] = max_event_id + j
                max_event_id += len(unique_events)

                fixed_events.append(fixed_data)
            else:
                # convert to individual events
                for idx, row in event_data.iterrows():
                    single_event = pd.DataFrame([row])
                    single_event['fire_event_id'] = max_event_id
                    fixed_events.append(single_event)
                    max_event_id += 1

        # combine
        if fixed_events:
            all_fixed = pd.concat(fixed_events, ignore_index=True)
            final_events = pd.concat([good_events, all_fixed], ignore_index=True)
        else:
            final_events = good_events

        print(f"Realistic limits applied: {len(fire_events_df['fire_event_id'].unique())} → {len(final_events['fire_event_id'].unique())} events")

        return final_events

    def save_results_to_csv(self, fire_events_df, filename):
        """Save results to CSV file"""
        fire_events_df.to_csv(filename, index=False)
        print(f"Results saved to {filename}")

def main():
    """Run the 2006 clustering pipeline end to end.

    Clusters the year, applies the size/duration limits, prints a quality
    report and writes the clustered rows and per-event statistics to
    ``fire_events_2006_final.csv`` and ``fire_event_stats_2006_final.csv``.
    """
    clustering = GeographicFireEventClustering(
        project_id="code-for-planet",
        dataset_id="emission_db"
    )

    year = 2006
    fire_events = clustering.process_year_geographic(year=year)

    if len(fire_events) == 0:
        print("No fire events found")
        return

    # Enforce the limits, then recompute statistics on the cleaned events
    fire_events = clustering.apply_realistic_fire_limits(fire_events)
    stats = clustering.analyze_fire_events(fire_events)

    extent = stats['spatial_extent_km']
    print(f"\nFINAL CLUSTERING RESULTS:")
    print(f"Fire events found: {len(stats)}")
    print(f"Average duration: {stats['duration_days'].mean():.1f} days")
    print(f"Average spatial extent: {extent.mean():.1f} km")
    print(f"Max spatial extent: {extent.max():.1f} km")

    # Anything still above the thresholds after the fix-up pass
    large_events = stats[extent > 80]
    very_large_events = stats[extent > 120]
    long_events = stats[stats['duration_days'] > 10]

    print(f"\nQUALITY CHECK (Scientifically-grounded limits):")
    print(f"Events >80km: {len(large_events)} (auto-split threshold)")
    print(f"Events >120km: {len(very_large_events)} (hard fail - should be 0)")
    print(f"Events >10 days: {len(long_events)} (temporal split threshold)")

    if len(large_events) > 0:
        print(f"\nDETAILED ANALYSIS OF LARGE EVENTS:")
        for _, event in large_events.iterrows():
            print(f"\nEvent {event['fire_event_id']} in {event['region']}:")
            print(f"  Spatial extent: {event['spatial_extent_km']:.1f} km")
            print(f"  Duration: {event['duration_days']} days")
            print(f"  Points: {event['num_points']}")
            print(f"  Date range: {event['start_date']} to {event['end_date']}")
            print(f"  Lat range: {event['min_lat']:.3f} to {event['max_lat']:.3f}")
            print(f"  Lon range: {event['min_lon']:.3f} to {event['max_lon']:.3f}")

            # Flat-earth approximation: ~111 km per degree, longitude scaled
            # by the cosine of the mid-latitude
            mid_lat_rad = np.radians((event['max_lat'] + event['min_lat'])/2)
            lat_dist = abs(event['max_lat'] - event['min_lat']) * 111
            lon_dist = abs(event['max_lon'] - event['min_lon']) * 111 * np.cos(mid_lat_rad)
            print(f"  Approximate N-S span: {lat_dist:.1f} km")
            print(f"  Approximate E-W span: {lon_dist:.1f} km")

            # Same thresholds as apply_realistic_fire_limits
            if event['spatial_extent_km'] > 120:
                print(f"  STATUS: HARD FAIL - Drop this event (>120km)")
            elif event['spatial_extent_km'] > 80:
                print(f"  STATUS: AUTO-SPLIT - Re-cluster with tight spatial limits")
            elif event['duration_days'] > 10:
                print(f"  STATUS: TIME-SPLIT - Re-cluster with tight temporal limits")

        large_by_region = large_events.groupby('region')['fire_event_id'].count()
        print(f"\nLarge events by region:")
        print(large_by_region)

    events_csv = f"fire_events_{year}_final.csv"
    stats_csv = f"fire_event_stats_{year}_final.csv"
    clustering.save_results_to_csv(fire_events, events_csv)
    clustering.save_results_to_csv(stats, stats_csv)

    print(f"\nResults saved:")
    print(f"- {events_csv}")
    print(f"- {stats_csv}")

    # Per-region event count with mean extent and duration
    print(f"\nResults by region:")
    region_summary = stats.groupby('region').agg({
        'fire_event_id': 'count',
        'spatial_extent_km': 'mean',
        'duration_days': 'mean'
    }).round(1)
    print(region_summary)

# Run the clustering pipeline (main) when this cell/file is executed as a script.
if __name__ == "__main__":
    main()

GEOGRAPHIC FIRE CLUSTERING
Key improvements made over a1 and other previous attempts:
1. DBSCAN eps parameter correctly converted from km to radians
2. Temporal filtering applied BEFORE spatial clustering to prevent chaining
3. Much tighter spatial clustering distances (0.2-0.5 km)
4. Reduced min_samples from 5 to 3 for better coverage
5. Convert noise points and small groups to individual events
6. Researched what realistic limits could be used to break up events:
   - >80km: Auto-split (re-cluster with tight spatial limits)
   - >120km: Hard fail (drop - cannot be single fire)
   - >10 days: Time-split (re-cluster with tight temporal limits)

GEOGRAPHIC FIRE CLUSTERING for 2006
Using region-specific optimized parameters (spatial eps in km, converted to radians):
  Pacific_West: eps=0.5km (0.000078 rad), temporal=3d
  Mountain_West: eps=0.5km (0.000078 rad), temporal=3d
  Great_Plains: eps=0.3km (0.000047 rad), temporal=2d
  South_Texas: eps=0.3km (0.000047 rad), temp

In [None]:
# QC for clustering

def _print_test_banner(title):
    """Print a section title framed by 50-character '=' rules, after a blank line."""
    print("\n" + "="*50)
    print(title)
    print("="*50)


def test_fire_clustering_quality(events_path="fire_events_2006_final.csv",
                                 stats_path="fire_event_stats_2006_final.csv"):
    """Run quality checks on saved clustering results.

    Args:
        events_path: CSV of clustered detections (needs ``fire_event_id``,
            ``fire_date``, ``latitude``, ``longitude``).
        stats_path: CSV of per-event statistics (needs ``fire_event_id``,
            ``num_points``, ``spatial_extent_km``, ``duration_days``,
            ``region``).

    Returns:
        ``True`` when all hard checks pass, ``False`` at the first hard check
        that fails, and ``None`` when either CSV file is missing.
    """

    print("FIRE CLUSTERING QUALITY TESTS")
    print("=" * 50)

    try:
        fire_events = pd.read_csv(events_path)
        stats = pd.read_csv(stats_path)
        print(f"Loaded results: {len(fire_events):,} data points, {len(stats):,} events")
    except FileNotFoundError:
        print("Results files not found. Run main clustering first.")
        return

    _print_test_banner("TEST 1: SPATIAL REALISM")

    spatial_bins = [0, 1, 5, 10, 20, 50, 80, 120, 1000]
    spatial_counts = pd.cut(stats['spatial_extent_km'], bins=spatial_bins, right=False).value_counts().sort_index()

    print("Spatial extent distribution:")
    for interval, count in spatial_counts.items():
        pct = count / len(stats) * 100
        print(f"  {interval}: {count:,} events ({pct:.1f}%)")

    mega_fires = stats[stats['spatial_extent_km'] > 80]
    impossible_fires = stats[stats['spatial_extent_km'] > 120]

    print(f"\nCRITICAL SPATIAL TESTS:")
    print(f"  Events >80km: {len(mega_fires)} (should be 0)")
    print(f"  Events >120km: {len(impossible_fires)} (MUST be 0)")
    print(f"  Max extent: {stats['spatial_extent_km'].max():.1f} km")
    print(f"  95th percentile: {stats['spatial_extent_km'].quantile(0.95):.1f} km")

    # Only the 120 km limit is a hard failure; >80 km is reported above
    if len(impossible_fires) > 0:
        print("FAILED: Found impossible fires >120km!")
        return False
    else:
        print("PASSED: No impossible mega-fires")

    _print_test_banner("TEST 2: TEMPORAL REALISM")

    duration_bins = [0, 1, 2, 3, 5, 7, 10, 15, 30, 365]
    duration_counts = pd.cut(stats['duration_days'], bins=duration_bins, right=False).value_counts().sort_index()

    print("Duration distribution:")
    for interval, count in duration_counts.items():
        pct = count / len(stats) * 100
        print(f"  {interval}: {count:,} events ({pct:.1f}%)")

    very_long = stats[stats['duration_days'] > 30]
    impossible_long = stats[stats['duration_days'] > 365]

    print(f"\nCRITICAL TEMPORAL TESTS:")
    print(f"  Events >30 days: {len(very_long)} (should be rare)")
    print(f"  Events >365 days: {len(impossible_long)} (MUST be 0)")
    print(f"  Max duration: {stats['duration_days'].max()} days")
    print(f"  95th percentile: {stats['duration_days'].quantile(0.95):.1f} days")

    if len(impossible_long) > 0:
        print("FAILED: Found impossible year-long fires!")
        return False
    else:
        print("PASSED: No impossible year-long fires")

    _print_test_banner("TEST 3: DATA INTEGRITY")

    fire_events['fire_date'] = pd.to_datetime(fire_events['fire_date'])

    actual_event_sizes = fire_events.groupby('fire_event_id').size()
    reported_sizes = stats.set_index('fire_event_id')['num_points']

    # Comparing two Series with different labels raises, so align both on the
    # union of event ids first. An id present on only one side becomes NaN
    # there and is counted as a mismatch.
    all_event_ids = actual_event_sizes.index.union(reported_sizes.index)
    size_mismatch = int(
        (actual_event_sizes.reindex(all_event_ids) != reported_sizes.reindex(all_event_ids)).sum()
    )
    print(f"Event size mismatches: {size_mismatch} (should be 0)")

    events_in_data = set(fire_events['fire_event_id'].unique())
    events_in_stats = set(stats['fire_event_id'].unique())
    orphaned = len(events_in_stats - events_in_data)
    missing = len(events_in_data - events_in_stats)

    print(f"Orphaned events in stats: {orphaned} (should be 0)")
    print(f"Missing events from stats: {missing} (should be 0)")

    null_coords = fire_events[['latitude', 'longitude']].isnull().any(axis=1).sum()
    null_dates = fire_events['fire_date'].isnull().sum()

    print(f"Null coordinates: {null_coords} (should be 0)")
    print(f"Null dates: {null_dates} (should be 0)")

    if size_mismatch == 0 and orphaned == 0 and missing == 0 and null_coords == 0 and null_dates == 0:
        print("PASSED: Data integrity checks")
    else:
        print("FAILED: Data integrity issues found")
        return False

    _print_test_banner("TEST 4: GEOGRAPHIC COVERAGE")

    region_coverage = stats.groupby('region').agg({
        'fire_event_id': 'count',
        'spatial_extent_km': ['mean', 'max'],
        'duration_days': ['mean', 'max'],
        'num_points': 'sum'
    }).round(2)

    print("Regional summary:")
    print(region_coverage)

    expected_regions = {
        'Pacific_West', 'Mountain_West', 'Great_Plains', 'South_Texas',
        'South_Central', 'Midwest', 'Southeast', 'Northeast'
    }
    actual_regions = set(stats['region'].unique())
    missing_regions = expected_regions - actual_regions

    if missing_regions:
        print(f"Missing regions: {missing_regions}")
        return False
    else:
        print("PASSED: All regions represented")

    _print_test_banner("TEST 5: EDGE CASE VALIDATION")

    single_point_events = stats[stats['num_points'] == 1]
    print(f"Single-point events: {len(single_point_events):,} ({len(single_point_events)/len(stats)*100:.1f}%)")

    # duration_days counts inclusively, so a same-day event has duration 1
    zero_duration = stats[stats['duration_days'] == 1]
    print(f"Same-day events: {len(zero_duration):,} ({len(zero_duration)/len(stats)*100:.1f}%)")

    zero_spatial = stats[stats['spatial_extent_km'] < 0.001]
    print(f"Point fires (<1m): {len(zero_spatial):,} ({len(zero_spatial)/len(stats)*100:.1f}%)")

    _print_test_banner("TEST 6: CLUSTERING EFFECTIVENESS")

    total_points = len(fire_events)
    total_events = len(stats)
    avg_points_per_event = total_points / total_events

    print(f"Clustering efficiency:")
    print(f"  Total points: {total_points:,}")
    print(f"  Total events: {total_events:,}")
    print(f"  Average points per event: {avg_points_per_event:.1f}")
    print(f"  Reduction factor: {avg_points_per_event:.1f}x")

    event_sizes = stats['num_points'].value_counts().sort_index()
    print(f"\nEvent size distribution:")
    for size in [1, 2, 3, 4, 5]:
        if size in event_sizes.index:
            count = event_sizes[size]
            pct = count / len(stats) * 100
            print(f"  {size} points: {count:,} events ({pct:.1f}%)")

    large_events = stats[stats['num_points'] > 100]
    print(f"  >100 points: {len(large_events):,} events ({len(large_events)/len(stats)*100:.1f}%)")

    # Informational only: a low ratio warns but does not fail the run
    if avg_points_per_event > 1.5:
        print("PASSED: Effective clustering achieved")
    else:
        print("WARNING: Very little clustering - mostly individual points")

    _print_test_banner("OVERALL TEST RESULTS")

    print(f"  • {len(stats):,} realistic fire events")
    print(f"  • {stats['spatial_extent_km'].mean():.1f} km average spatial extent")
    print(f"  • {stats['duration_days'].mean():.1f} days average duration")
    print(f"  • {stats['spatial_extent_km'].max():.1f} km maximum extent (realistic)")

    return True

def test_different_year():
    """Smoke-test the pipeline on one 2005 region (a Southeast sub-box).

    Returns ``True`` when the region processes cleanly (or has no rows) and
    no event exceeds 80 km or 365 days; ``False`` on a limit violation or
    when processing raises.
    """

    print("\n" + "="*50)
    print("ROBUSTNESS TEST: Different Year")
    print("="*50)

    print("Testing system on year 2005 (subset) to ensure robustness...")

    clustering = GeographicFireEventClustering(
        project_id="code-for-planet",
        dataset_id="emission_db"
    )

    region_name = "Southeast"
    region_bounds = {'lat_min': 25.0, 'lat_max': 32.0, 'lon_min': -85.1, 'lon_max': -80.0}

    try:
        regional_events = clustering.process_geographic_region(
            2005, region_name, region_bounds
        )

        # An empty region is not treated as a failure
        if len(regional_events) == 0:
            print("No events found for 2005 test region")
            return True

        cleaned_events = clustering.apply_realistic_fire_limits(regional_events)
        stats = clustering.analyze_fire_events(cleaned_events)

        max_extent = stats['spatial_extent_km'].max()
        max_duration = stats['duration_days'].max()

        print(f"2005 {region_name} test successful:")
        print(f"  Events found: {len(stats)}")
        print(f"  Max spatial extent: {max_extent:.1f} km")
        print(f"  Max duration: {max_duration} days")
        print(f"  Average extent: {stats['spatial_extent_km'].mean():.1f} km")

        if max_extent > 80 or max_duration > 365:
            print("FAILED: 2005 data has unrealistic events")
            return False

        print("PASSED: 2005 data processed correctly")
        return True

    except Exception as e:
        # Any processing error counts as a failed robustness run
        print(f"FAILED: 2005 test error: {str(e)}")
        return False

if __name__ == "__main__":
    print("Starting comprehensive fire clustering tests...\n")

    # The robustness run only happens once the CSV-based quality checks pass
    main_tests_passed = test_fire_clustering_quality()

    if not main_tests_passed:
        print("\nMain tests failed. System needs fixes before production.")
    else:
        robustness_passed = test_different_year()
        print(
            "\nALL TESTS PASSED"
            if robustness_passed
            else "\nMain tests passed but robustness test failed."
        )

Starting comprehensive fire clustering tests...

FIRE CLUSTERING QUALITY TESTS
Loaded results: 808,558 data points, 64,212 events

TEST 1: SPATIAL REALISM
Spatial extent distribution:
  [0, 1): 50,600 events (78.8%)
  [1, 5): 10,670 events (16.6%)
  [5, 10): 1,748 events (2.7%)
  [10, 20): 949 events (1.5%)
  [20, 50): 228 events (0.4%)
  [50, 80): 17 events (0.0%)
  [80, 120): 0 events (0.0%)
  [120, 1000): 0 events (0.0%)

CRITICAL SPATIAL TESTS:
  Events >80km: 0 (should be 0)
  Events >120km: 0 (MUST be 0)
  Max extent: 70.1 km
  95th percentile: 4.7 km
PASSED: No impossible mega-fires

TEST 2: TEMPORAL REALISM
Duration distribution:
  [0, 1): 0 events (0.0%)
  [1, 2): 50,856 events (79.2%)
  [2, 3): 2,803 events (4.4%)
  [3, 5): 4,418 events (6.9%)
  [5, 7): 2,644 events (4.1%)
  [7, 10): 2,315 events (3.6%)
  [10, 15): 751 events (1.2%)
  [15, 30): 332 events (0.5%)
  [30, 365): 93 events (0.1%)

CRITICAL TEMPORAL TESTS:
  Events >30 days: 93 (should be rare)
  Events >365 days: 

In [None]:
def create_bigquery_tables_from_csv(project_id="code-for-planet", dataset_id="emission_db", year=2006):
    """
    Create BigQuery tables directly from the saved clustering CSV results.

    Reads ``fire_events_{year}_final.csv`` from the current working directory,
    uploads its ``id`` / ``fire_event_id`` / ``region`` columns as a temporary
    mapping table, and uses that mapping to build two tables in
    ``{project_id}.{dataset_id}``:

    * ``emission_{year}_with_fire_events`` - every row of ``emission_{year}``
      plus its fire event id (-1 when the row is not in the CSV), region,
      clustering status and derived per-event / per-point metrics.
    * ``fire_events_{year}_summary`` - one row per (fire event, region) pair.

    Progress, verification counts and sample rows are printed along the way.
    The temporary mapping table is dropped even when an intermediate query
    fails.

    Args:
        project_id: Google Cloud project that owns the dataset.
        dataset_id: BigQuery dataset containing ``emission_{year}``.
        year: Year used to select both the CSV file and the emission table.

    Returns:
        ``(enhanced_table_id, summary_table_id)`` as fully qualified table ids,
        or ``None`` when the CSV file does not exist.
    """

    # load from csv first: a missing file is reported before a BigQuery client
    # (and therefore credentials) is needed
    print("Loading clustering results from CSV...")

    try:
        fire_events = pd.read_csv(f'fire_events_{year}_final.csv')
    except FileNotFoundError:
        print(f"Error: fire_events_{year}_final.csv not found!")
        print("Please run the clustering algorithm first to generate this file.")
        return None

    print(f"Loaded {len(fire_events):,} clustered fire event records")
    print(f"Covering {len(fire_events['fire_event_id'].unique())} unique fire events")

    # sample
    print(f"Columns in CSV: {list(fire_events.columns)}")
    print(f"Sample data:")
    print(fire_events.head(3))

    client = bigquery.Client(project=project_id)

    # fully qualified names of every table touched below
    mapping_table_id = f"{project_id}.{dataset_id}.temp_fire_event_mapping_{year}"
    enhanced_table_name = f"emission_{year}_with_fire_events"
    enhanced_table_id = f"{project_id}.{dataset_id}.{enhanced_table_name}"
    summary_table_name = f"fire_events_{year}_summary"
    summary_table_id = f"{project_id}.{dataset_id}.{summary_table_name}"

    try:
        # create mapping table and join with original data
        print("\n Creating enhanced emission table with fire_event_id...")

        # mapping from clustered results
        id_mapping = fire_events[['id', 'fire_event_id', 'region']].copy()

        # mapping - BigQuery as a temp table
        job_config = bigquery.LoadJobConfig(write_disposition="WRITE_TRUNCATE")
        job = client.load_table_from_dataframe(id_mapping, mapping_table_id, job_config=job_config)
        job.result()
        print(f"Uploaded mapping table with {len(id_mapping):,} records")

        # create table with fire_event_id; rows absent from the mapping get -1
        create_enhanced_table_query = f"""
        CREATE OR REPLACE TABLE `{enhanced_table_id}` AS
        SELECT
            e.*,
            COALESCE(m.fire_event_id, -1) as fire_event_id,
            m.region as fire_region,
            CASE
                WHEN m.fire_event_id IS NOT NULL THEN 'clustered'
                ELSE 'isolated'
            END as clustering_status
        FROM `{project_id}.{dataset_id}.emission_{year}` e
        LEFT JOIN `{mapping_table_id}` m
        ON e.id = m.id
        """

        job = client.query(create_enhanced_table_query)
        job.result()
        print(f"Created {enhanced_table_name} table")

        # add derived clustering metrics
        print("\nAdding derived clustering metrics...")

        add_metrics_query = f"""
        CREATE OR REPLACE TABLE `{enhanced_table_id}` AS
        WITH fire_event_metrics AS (
            SELECT
                fire_event_id,
                COUNT(*) as event_size_points,
                DATE_DIFF(MAX(fire_date), MIN(fire_date), DAY) + 1 as event_duration_days,
                MIN(fire_date) as event_start_date,
                MAX(fire_date) as event_end_date,
                -- Spatial extent = length of the bounding-box diagonal (ST_DISTANCE returns meters)
                ST_DISTANCE(
                    ST_GEOGPOINT(MIN(longitude), MIN(latitude)),
                    ST_GEOGPOINT(MAX(longitude), MAX(latitude))
                ) / 1000 as event_spatial_extent_km,
                -- Calculate centroid
                AVG(longitude) as event_centroid_lon,
                AVG(latitude) as event_centroid_lat,
                -- Calculate total emissions per event
                SUM(COALESCE(ECO2, 0)) as event_total_ECO2,
                SUM(COALESCE(area_burned, 0)) as event_total_area_burned,
                -- Calculate event spread rate (0 for single-day events)
                CASE
                    WHEN DATE_DIFF(MAX(fire_date), MIN(fire_date), DAY) + 1 > 1 THEN
                        ST_DISTANCE(
                            ST_GEOGPOINT(MIN(longitude), MIN(latitude)),
                            ST_GEOGPOINT(MAX(longitude), MAX(latitude))
                        ) / 1000 / (DATE_DIFF(MAX(fire_date), MIN(fire_date), DAY) + 1)
                    ELSE 0
                END as event_spread_rate_km_per_day
            FROM `{enhanced_table_id}`
            WHERE fire_event_id != -1
            GROUP BY fire_event_id
        ),
        point_level_metrics AS (
            SELECT
                e.*,
                -- Distance from each point to event centroid
                CASE
                    WHEN e.fire_event_id != -1 THEN
                        ST_DISTANCE(
                            ST_GEOGPOINT(e.longitude, e.latitude),
                            ST_GEOGPOINT(m.event_centroid_lon, m.event_centroid_lat)
                        ) / 1000
                    ELSE NULL
                END as distance_to_event_centroid_km,
                -- Days from event start
                CASE
                    WHEN e.fire_event_id != -1 THEN
                        DATE_DIFF(e.fire_date, m.event_start_date, DAY)
                    ELSE NULL
                END as days_from_event_start,
                -- Add event-level metrics
                m.event_size_points,
                m.event_duration_days,
                m.event_start_date,
                m.event_end_date,
                m.event_spatial_extent_km,
                m.event_centroid_lon,
                m.event_centroid_lat,
                m.event_total_ECO2,
                m.event_total_area_burned,
                m.event_spread_rate_km_per_day,
                -- Quality flags (first matching rule wins)
                CASE
                    WHEN e.fire_event_id = -1 THEN 'isolated'
                    WHEN m.event_spatial_extent_km > 80 THEN 'large_extent'
                    WHEN m.event_duration_days > 30 THEN 'long_duration'
                    WHEN m.event_spread_rate_km_per_day > 10 THEN 'fast_spread'
                    ELSE 'normal'
                END as fire_event_quality_flag
            FROM `{enhanced_table_id}` e
            LEFT JOIN fire_event_metrics m ON e.fire_event_id = m.fire_event_id
        )
        SELECT * FROM point_level_metrics
        """

        job = client.query(add_metrics_query)
        job.result()
        print("Added derived clustering metrics")

        # create summary statistics table
        print("\nCreating fire events summary table...")

        create_summary_query = f"""
        CREATE OR REPLACE TABLE `{summary_table_id}` AS
        SELECT
            fire_event_id,
            fire_region as region,
            COUNT(*) as num_points,
            MIN(fire_date) as start_date,
            MAX(fire_date) as end_date,
            DATE_DIFF(MAX(fire_date), MIN(fire_date), DAY) + 1 as duration_days,
            MIN(longitude) as min_lon,
            MAX(longitude) as max_lon,
            MIN(latitude) as min_lat,
            MAX(latitude) as max_lat,
            SUM(COALESCE(area_burned, 0)) as total_area_burned,
            SUM(COALESCE(consumed_fuel, 0)) as total_consumed_fuel,
            SUM(COALESCE(ECO2, 0)) as total_ECO2,
            SUM(COALESCE(ECO, 0)) as total_ECO,
            SUM(COALESCE(ECH4, 0)) as total_ECH4,
            SUM(COALESCE(EPM2_5, 0)) as total_EPM2_5,
            -- Use the derived metrics from the enhanced table
            AVG(event_spatial_extent_km) as spatial_extent_km,
            AVG(event_spread_rate_km_per_day) as spread_rate_km_per_day,
            AVG(event_centroid_lon) as centroid_lon,
            AVG(event_centroid_lat) as centroid_lat,
            -- Quality flags summary
            COUNT(CASE WHEN fire_event_quality_flag = 'large_extent' THEN 1 END) as large_extent_points,
            COUNT(CASE WHEN fire_event_quality_flag = 'long_duration' THEN 1 END) as long_duration_points,
            COUNT(CASE WHEN fire_event_quality_flag = 'fast_spread' THEN 1 END) as fast_spread_points,
            CURRENT_TIMESTAMP() as created_timestamp
        FROM `{enhanced_table_id}`
        WHERE fire_event_id != -1
        GROUP BY fire_event_id, fire_region
        ORDER BY fire_event_id
        """

        job = client.query(create_summary_query)
        job.result()

        # looking at coverage difference
        print("\nStep 5: Verifying results and investigating coverage...")

        # original emission table count
        original_count_query = f"""
        SELECT COUNT(*) as total_original
        FROM `{project_id}.{dataset_id}.emission_{year}`
        WHERE longitude IS NOT NULL
        AND latitude IS NOT NULL
        AND fire_date IS NOT NULL
        """

        original_result = client.query(original_count_query).to_dataframe()
        original_count = int(original_result['total_original'].iloc[0])

        # csv counts
        csv_count = len(fire_events)

        # table results; the -1 sentinel is excluded explicitly so the event
        # count is also right when no row is unassigned
        verification_query = f"""
        SELECT
            COUNT(*) as total_records,
            COUNT(CASE WHEN fire_event_id != -1 THEN 1 END) as records_with_fire_event,
            COUNT(DISTINCT IF(fire_event_id != -1, fire_event_id, NULL)) as unique_fire_events,
            COUNT(CASE WHEN fire_event_id = -1 THEN 1 END) as unassigned_records
        FROM `{enhanced_table_id}`
        """

        verification_row = client.query(verification_query).to_dataframe().iloc[0]
        total_enhanced = int(verification_row['total_records'])
        assigned = int(verification_row['records_with_fire_event'])
        unique_events = int(verification_row['unique_fire_events'])
        unassigned = int(verification_row['unassigned_records'])

        print("VERIFICATION RESULTS:")
        print(f"Original emission table (filtered): {original_count:,}")
        print(f"CSV clustering results: {csv_count:,}")
        print(f"Enhanced table total: {total_enhanced:,}")
        print(f"Records assigned to fire events: {assigned:,}")
        print(f"Unique fire events: {unique_events:,}")
        print(f"Unassigned records (noise): {unassigned:,}")

        # coverage %s (an empty table yields 0% instead of inf/nan)
        coverage_pct = (assigned / total_enhanced) * 100 if total_enhanced else 0.0
        csv_coverage_pct = (csv_count / original_count) * 100 if original_count else 0.0

        print(f"\nCOVERAGE ANALYSIS:")
        print(f"CSV coverage vs original: {csv_coverage_pct:.1f}%")
        print(f"Enhanced table coverage: {coverage_pct:.1f}%")
        print(f"Difference: {csv_coverage_pct - coverage_pct:.1f} percentage points")

        # potential issues (QC)
        missing_records = original_count - total_enhanced
        if missing_records > 0:
            print(f"\nPOTENTIAL ISSUE: {missing_records:,} records missing from enhanced table")
            print("This could be due to:")
            print("- Different filtering criteria between clustering and BigQuery")
            print("- Data type conversion issues during join")
            print("- ID mismatches between CSV and original table")

            # ID mismatches
            id_check_query = f"""
            SELECT
                COUNT(DISTINCT e.id) as original_ids,
                COUNT(DISTINCT m.id) as mapping_ids,
                COUNT(DISTINCT e.id) - COUNT(DISTINCT m.id) as missing_from_mapping
            FROM `{project_id}.{dataset_id}.emission_{year}` e
            LEFT JOIN `{mapping_table_id}` m ON e.id = m.id
            WHERE e.longitude IS NOT NULL
            AND e.latitude IS NOT NULL
            AND e.fire_date IS NOT NULL
            """

            id_check_result = client.query(id_check_query).to_dataframe()
            missing_ids = int(id_check_result['missing_from_mapping'].iloc[0])

            if missing_ids > 0:
                print(f"- {missing_ids:,} IDs from original table not found in CSV mapping")
        else:
            print("\nTable counts match - coverage difference is within the enhanced table")

        # check if the difference is noise/isolated points
        if abs(csv_coverage_pct - coverage_pct) < 1.0:
            print("Coverage difference is small (<1%) - likely due to minor filtering differences")
        else:
            print("Coverage difference is significant - investigation needed")

        # summary table count
        count_query = f"SELECT COUNT(*) as num_events FROM `{summary_table_id}`"
        count_result = client.query(count_query).to_dataframe()
    finally:
        # clean up temp table, also when a step above failed, so an aborted
        # run does not leave the mapping table behind
        print("\nCleaning up temporary table...")
        cleanup_query = f"DROP TABLE IF EXISTS `{mapping_table_id}`"
        client.query(cleanup_query).result()
        print("Temporary mapping table removed")

    # sample results
    print("\nSample of data...")
    sample_query = f"""
    SELECT
        id, fire_event_id, fire_region, clustering_status,
        event_size_points, event_duration_days, event_spatial_extent_km,
        fire_event_quality_flag, area_burned, ECO2
    FROM `{enhanced_table_id}`
    WHERE fire_event_id != -1
    ORDER BY fire_event_id, fire_date
    LIMIT 5
    """

    sample_result = client.query(sample_query).to_dataframe()
    print(sample_result)

    # summary by region
    print("\nFire events by region:")
    region_query = f"""
    SELECT
        region,
        COUNT(*) as num_events,
        AVG(spatial_extent_km) as avg_extent_km,
        AVG(duration_days) as avg_duration_days
    FROM `{summary_table_id}`
    GROUP BY region
    ORDER BY num_events DESC
    """

    region_result = client.query(region_query).to_dataframe()
    print(region_result)

    print(f"\nSUCCESS! Created BigQuery tables:")
    print(f"Main table: {enhanced_table_name} ({total_enhanced:,} records)")
    print(f"Summary table: {summary_table_name} ({count_result['num_events'].iloc[0]:,} fire events)")

    return enhanced_table_id, summary_table_id

# main that uses csv
def main():
    """
    Build the 2006 BigQuery tables from the existing clustering CSV.

    Calls ``create_bigquery_tables_from_csv`` for project ``code-for-planet``
    and dataset ``emission_db`` and prints the outcome. Errors are reported on
    stdout instead of being raised, so a notebook cell running this does not
    abort with a traceback.
    """
    year = 2006

    print("CREATING BIGQUERY TABLES FROM EXISTING CSV RESULTS")
    print("=" * 70)

    try:
        result = create_bigquery_tables_from_csv(
            project_id="code-for-planet",
            dataset_id="emission_db",
            year=year
        )
    except Exception as e:
        # Top-level boundary: anything raised here is a load/query failure,
        # not a missing CSV (that case is signalled by a None return below).
        print(f"Error: {str(e)}")
        return

    if result is None:
        # The callee returns None (rather than a table-id tuple) when the CSV
        # is missing; unpacking it would only produce a confusing TypeError.
        print(f"Make sure fire_events_{year}_final.csv exists in your current directory")
        return

    print("\nDone - Table Created.")

if __name__ == "__main__":
    main()

CREATING BIGQUERY TABLES FROM EXISTING CSV RESULTS
Loading clustering results from CSV...
Loaded 808,558 clustered fire event records
Covering 64212 unique fire events
Columns in CSV: ['id', 'year', 'doy', 'longitude', 'latitude', 'fire_date', 'grid10k', 'covertype', 'fuelcode', 'area_burned', 'consumed_fuel', 'ECO2', 'burn_source', 'burnday_source', 'fire_event_id', 'region']
Sample data:
       id  year  doy  longitude  latitude   fire_date  grid10k  covertype  \
0  648899  2006    1  -121.4373   44.7098  2006-01-01   111137          3   
1  648887  2006    1  -121.4295   44.7067  2006-01-01   111137          3   
2  648892  2006    2  -121.4334   44.7082  2006-01-02   111137          3   

   fuelcode  area_burned  consumed_fuel         ECO2  burn_source  \
0      1180      62500.0    1657.645804  2575.981579            3   
1      1180      62500.0    1335.911902  2076.007096            3   
2      1180      62500.0    1453.302180  2258.431588            3   

   burnday_source  fi

In [None]:
from google.cloud import bigquery

client = bigquery.Client(project="code-for-planet")

# Query 1: a handful of clustered points with their event-level and
# point-level metric columns, to eyeball whether they are populated.
query = """
SELECT
  fire_event_id, fire_region, clustering_status,
  event_size_points, event_duration_days, event_spatial_extent_km,
  distance_to_event_centroid_km, days_from_event_start,
  fire_event_quality_flag
FROM `code-for-planet.emission_db.emission_2006_with_fire_events`
WHERE fire_event_id != -1 AND fire_event_id IS NOT NULL
LIMIT 10
"""

# Query 2: COUNT(column) skips NULLs, so comparing each count with
# total_rows shows how many clustered rows are missing a metric.
count_query = """
SELECT
  COUNT(*) as total_rows,
  COUNT(event_size_points) as rows_with_size,
  COUNT(event_duration_days) as rows_with_duration,
  COUNT(distance_to_event_centroid_km) as rows_with_distance
FROM `code-for-planet.emission_db.emission_2006_with_fire_events`
WHERE fire_event_id != -1
"""

result = client.query(query).to_dataframe()
print("Sample of clustered points:", result, sep="\n")

count_result = client.query(count_query).to_dataframe()
print("\nMetrics population check:", count_result, sep="\n")

Sample of clustered points:
   fire_event_id   fire_region clustering_status  event_size_points  \
0              1  Pacific_West         clustered                  1   
1              2  Pacific_West         clustered                  1   
2              3  Pacific_West         clustered                  1   
3              5  Pacific_West         clustered                  1   
4              8  Pacific_West         clustered                  1   
5              9  Pacific_West         clustered                  1   
6             10  Pacific_West         clustered                  1   
7             11  Pacific_West         clustered                  1   
8             12  Pacific_West         clustered                  1   
9             17  Pacific_West         clustered                  8   

   event_duration_days  event_spatial_extent_km  \
0                    1                  0.00000   
1                    1                  0.00000   
2                    1               

In [None]:
from google.cloud import bigquery

client = bigquery.Client(project="code-for-planet")

print("Converting isolated points to unique events...")

# Give every isolated point (fire_event_id = -1) its own event id and flag it.
# New ids start right after the highest id already present in the table. A
# hard-coded offset is not safe here: existing ids are not contiguous, so the
# highest id can exceed the number of events, and a too-small offset makes
# isolated points silently share an id with a clustered event.
update_query = """
CREATE OR REPLACE TABLE `code-for-planet.emission_db.emission_2006_with_fire_events` AS
WITH numbered AS (
  SELECT
    *,
    -- Highest id in use; isolated rows carry -1 and therefore never win the MAX
    MAX(fire_event_id) OVER () as max_existing_event_id,
    -- Number isolated and clustered rows separately so the new ids are consecutive
    ROW_NUMBER() OVER (PARTITION BY fire_event_id = -1 ORDER BY id) as row_in_group
  FROM `code-for-planet.emission_db.emission_2006_with_fire_events`
)
SELECT
  -- All original columns except fire_event_id, clustering_status and the helper columns
  * EXCEPT(fire_event_id, clustering_status, max_existing_event_id, row_in_group),

  -- Updated fire_event_id (convert -1 to unique IDs above every existing ID)
  CASE
    WHEN fire_event_id = -1 THEN max_existing_event_id + row_in_group
    ELSE fire_event_id
  END as fire_event_id,

  -- Add isolated event flag
  CASE WHEN fire_event_id = -1 THEN TRUE ELSE FALSE END as is_isolated_event,

  -- Updated clustering status
  CASE
    WHEN fire_event_id = -1 THEN 'isolated_event'
    ELSE clustering_status
  END as clustering_status

FROM numbered
"""

job = client.query(update_query)
job.result()
print("Successfully converted isolated points to unique events!")

# recalculate all the derived metrics for the new events
print("\nRecalculating derived metrics for all events...")

recalc_query = """
CREATE OR REPLACE TABLE `code-for-planet.emission_db.emission_2006_with_fire_events` AS
WITH fire_event_metrics AS (
  SELECT
    fire_event_id,
    COUNT(*) as event_size_points,
    DATE_DIFF(MAX(fire_date), MIN(fire_date), DAY) + 1 as event_duration_days,
    MIN(fire_date) as event_start_date,
    MAX(fire_date) as event_end_date,
    ST_DISTANCE(
      ST_GEOGPOINT(MIN(longitude), MIN(latitude)),
      ST_GEOGPOINT(MAX(longitude), MAX(latitude))
    ) / 1000 as event_spatial_extent_km,
    AVG(longitude) as event_centroid_lon,
    AVG(latitude) as event_centroid_lat,
    SUM(COALESCE(ECO2, 0)) as event_total_ECO2,
    SUM(COALESCE(area_burned, 0)) as event_total_area_burned,
    CASE
      WHEN DATE_DIFF(MAX(fire_date), MIN(fire_date), DAY) + 1 > 1 THEN
        ST_DISTANCE(
          ST_GEOGPOINT(MIN(longitude), MIN(latitude)),
          ST_GEOGPOINT(MAX(longitude), MAX(latitude))
        ) / 1000 / (DATE_DIFF(MAX(fire_date), MIN(fire_date), DAY) + 1)
      ELSE 0
    END as event_spread_rate_km_per_day
  FROM `code-for-planet.emission_db.emission_2006_with_fire_events`
  GROUP BY fire_event_id
)
SELECT
  -- Original emission data and basic clustering results
  e.* EXCEPT(distance_to_event_centroid_km, days_from_event_start,
             event_size_points, event_duration_days, event_start_date, event_end_date,
             event_spatial_extent_km, event_centroid_lon, event_centroid_lat,
             event_total_ECO2, event_total_area_burned, event_spread_rate_km_per_day,
             fire_event_quality_flag),

  -- Recalculated point-level metrics
  CASE
    WHEN e.is_isolated_event THEN 0.0
    ELSE ST_DISTANCE(
      ST_GEOGPOINT(e.longitude, e.latitude),
      ST_GEOGPOINT(m.event_centroid_lon, m.event_centroid_lat)
    ) / 1000
  END as distance_to_event_centroid_km,

  CASE
    WHEN e.is_isolated_event THEN 0
    ELSE DATE_DIFF(e.fire_date, m.event_start_date, DAY)
  END as days_from_event_start,

  -- Event-level metrics
  m.event_size_points,
  m.event_duration_days,
  m.event_start_date,
  m.event_end_date,
  m.event_spatial_extent_km,
  m.event_centroid_lon,
  m.event_centroid_lat,
  m.event_total_ECO2,
  m.event_total_area_burned,
  m.event_spread_rate_km_per_day,

  -- Quality flags
  CASE
    WHEN e.is_isolated_event THEN 'isolated_fire'
    WHEN m.event_spatial_extent_km > 80 THEN 'large_extent'
    WHEN m.event_duration_days > 30 THEN 'long_duration'
    WHEN m.event_spread_rate_km_per_day > 10 THEN 'fast_spread'
    ELSE 'normal'
  END as fire_event_quality_flag

FROM `code-for-planet.emission_db.emission_2006_with_fire_events` e
LEFT JOIN fire_event_metrics m ON e.fire_event_id = m.fire_event_id
"""

# recalc
job = client.query(recalc_query)
job.result()
print("Recalculated all derived metrics!")

# verify results
print("\nVerifying final results...")
verify_query = """
SELECT
  is_isolated_event,
  clustering_status,
  COUNT(*) as count,
  COUNT(DISTINCT fire_event_id) as unique_events,
  MIN(fire_event_id) as min_id,
  MAX(fire_event_id) as max_id
FROM `code-for-planet.emission_db.emission_2006_with_fire_events`
GROUP BY is_isolated_event, clustering_status
ORDER BY is_isolated_event
"""

result = client.query(verify_query).to_dataframe()
print("Final event summary:")
print(result)

# check - should be 0 isolated points (-1 values)
final_check = """
SELECT
  COUNT(*) as total_records,
  COUNT(DISTINCT fire_event_id) as total_unique_events,
  COUNT(CASE WHEN fire_event_id = -1 THEN 1 END) as remaining_minus_one_values
FROM `code-for-planet.emission_db.emission_2006_with_fire_events`
"""

final_result = client.query(final_check).to_dataframe()
print("\nFinal summary:")
print(final_result)

# check - no event id may be shared between an isolated point and a clustered event
collision_check = """
SELECT COUNT(*) as colliding_event_ids
FROM (
  SELECT fire_event_id
  FROM `code-for-planet.emission_db.emission_2006_with_fire_events`
  GROUP BY fire_event_id
  HAVING COUNT(DISTINCT is_isolated_event) > 1
)
"""

collision_result = client.query(collision_check).to_dataframe()
colliding_ids = int(collision_result['colliding_event_ids'].iloc[0])
if colliding_ids > 0:
    print(f"\nWARNING: {colliding_ids:,} fire_event_ids are shared by isolated and clustered points")
else:
    print("\nNo fire_event_id collisions between isolated and clustered points")

Converting isolated points to unique events...
Successfully converted isolated points to unique events!

Recalculating derived metrics for all events...
Recalculated all derived metrics!

Verifying final results...
Final event summary:
   is_isolated_event clustering_status   count  unique_events  min_id  max_id
0              False         clustered  808558          64212       0   69050
1               True    isolated_event    4698           4698   55897  816755

Final summary:
   total_records  total_unique_events  remaining_minus_one_values
0         813256                68781                           0


In [None]:
client = bigquery.Client(project="code-for-planet")

print("Fixing NULL fire_regions for isolated events using original region definitions...")

# DML statement: assign a region (by lat/lon bounding box, lower bound
# inclusive / upper bound exclusive) to isolated events that have none;
# points outside every box are labelled 'Outside_Regions'.
fix_regions_query = """
UPDATE `code-for-planet.emission_db.emission_2006_with_fire_events`
SET fire_region = CASE
  WHEN latitude >= 32.0 AND latitude < 49.0 AND longitude >= -125.0 AND longitude < -115.0 THEN 'Pacific_West'
  WHEN latitude >= 32.0 AND latitude < 49.0 AND longitude >= -115.0 AND longitude < -105.0 THEN 'Mountain_West'
  WHEN latitude >= 32.0 AND latitude < 49.0 AND longitude >= -105.0 AND longitude < -95.0 THEN 'Great_Plains'
  WHEN latitude >= 25.0 AND latitude < 32.0 AND longitude >= -105.0 AND longitude < -95.0 THEN 'South_Texas'
  WHEN latitude >= 25.0 AND latitude < 37.0 AND longitude >= -95.0 AND longitude < -85.1 THEN 'South_Central'
  WHEN latitude >= 37.0 AND latitude < 49.0 AND longitude >= -95.0 AND longitude < -85.0 THEN 'Midwest'
  WHEN latitude >= 25.0 AND latitude < 37.0 AND longitude >= -85.1 AND longitude < -75.0 THEN 'Southeast'
  WHEN latitude >= 37.0 AND latitude < 49.0 AND longitude >= -85.0 AND longitude < -67.0 THEN 'Northeast'
  ELSE 'Outside_Regions'
END
WHERE is_isolated_event = TRUE AND fire_region IS NULL
"""

# Read-only follow-ups: per-region breakdown of isolated events ...
verify_query = """
SELECT
  fire_region,
  COUNT(*) as isolated_events
FROM `code-for-planet.emission_db.emission_2006_with_fire_events`
WHERE is_isolated_event = TRUE
GROUP BY fire_region
ORDER BY isolated_events DESC
"""

# ... and a NULL check (COUNT(fire_region) ignores NULLs, so both counts
# are equal once every isolated event has a region).
null_check_query = """
SELECT
  COUNT(*) as total_isolated,
  COUNT(fire_region) as non_null_regions
FROM `code-for-planet.emission_db.emission_2006_with_fire_events`
WHERE is_isolated_event = TRUE
"""

# apply the update and wait for it to finish before verifying
client.query(fix_regions_query).result()

result = client.query(verify_query).to_dataframe()
print("Fixed! Isolated events by region:", result, sep="\n")

null_check = client.query(null_check_query).to_dataframe()
print("\nFinal NULL check:", null_check, sep="\n")

Fixing NULL fire_regions for isolated events using original region definitions...
Fixed! Isolated events by region:
       fire_region  isolated_events
0  Outside_Regions             4230
1    South_Central              468

Final NULL check:
   total_isolated  non_null_regions
0            4698              4698


In [None]:
client = bigquery.Client(project="code-for-planet")

# NULL check over every column of the enhanced table.
# The sample query is capped with LIMIT, so its row count cannot be used as
# the number of affected rows; the total and the per-column NULL counts are
# therefore fetched with separate aggregate queries.
null_check_table = "code-for-planet.emission_db.emission_2006_with_fire_events"
null_check_sample_limit = 5
null_check_columns = [
    "id", "year", "doy", "longitude", "latitude",
    "grid10k", "covertype", "fuelcode", "area_burned",
    "prefire_fuel", "consumed_fuel", "ECO2", "ECO",
    "ECH4", "EPM2_5", "cwd_frac", "duff_frac",
    "fuel_moisture_class", "burn_source", "burnday_source",
    "BSEV", "BSEV_flag", "fire_date", "bi_value",
    "fm100_value", "pet_value", "fm1000_value", "pr_value",
    "rmax_value", "rmin_value", "sph_value", "srad_value",
    "tmmn_value", "th_value", "tmmx_value", "vpd_value",
    "vs_value", "fire_event_id", "fire_region",
    "clustering_status", "is_isolated_event",
    "distance_to_event_centroid_km", "days_from_event_start",
    "event_size_points", "event_duration_days", "event_start_date",
    "event_end_date", "event_spatial_extent_km", "event_centroid_lon",
    "event_centroid_lat", "event_total_ECO2", "event_total_area_burned",
    "event_spread_rate_km_per_day", "fire_event_quality_flag",
]

# one shared predicate: true for a row when any listed column is NULL
null_predicate = " OR ".join(f"{column} IS NULL" for column in null_check_columns)

# total number of rows with at least one NULL
null_count_query = f"""
SELECT COUNT(*) as rows_with_nulls
FROM `{null_check_table}`
WHERE
  {null_predicate}
"""
null_row_count = int(client.query(null_count_query).to_dataframe()['rows_with_nulls'].iloc[0])
print(f"Found {null_row_count:,} rows with NULL values")

if null_row_count > 0:
    # which columns are responsible
    per_column_selects = ", ".join(
        f"COUNTIF({column} IS NULL) as {column}" for column in null_check_columns
    )
    per_column_query = f"SELECT {per_column_selects} FROM `{null_check_table}`"
    per_column_counts = client.query(per_column_query).to_dataframe().iloc[0]
    print("\nNULL count per column (columns without NULLs omitted):")
    print(per_column_counts[per_column_counts > 0])

    # a few example rows
    null_check_query = f"""
    SELECT *
    FROM `{null_check_table}`
    WHERE
      {null_predicate}
    LIMIT {null_check_sample_limit}
    """
    null_result = client.query(null_check_query).to_dataframe()
    print("\nFirst few rows with NULLs:")
    print(null_result)
else:
    print("No NULL values found in any columns!")

Found 5 rows with NULL values

First few rows with NULLs:
       id  year  doy  longitude  latitude  grid10k  covertype  fuelcode  \
0  595555  2006  255  -120.4958   49.0878   131902          0         0   
1  595556  2006  255  -120.4925   49.0884   131902          0         0   
2  595558  2006  253  -120.5097   49.0877   131902          0         0   
3  595559  2006  256  -120.5065   49.0883   131902          0         0   
4  595560  2006  256  -120.5032   49.0889   131902          0         0   

   area_burned  prefire_fuel  ...  event_duration_days  event_start_date  \
0          0.0           0.0  ...                    1        2006-09-12   
1          0.0           0.0  ...                    1        2006-09-12   
2      62500.0           0.0  ...                    1        2006-09-10   
3      62500.0           0.0  ...                    1        2006-09-13   
4      62500.0           0.0  ...                    1        2006-09-13   

   event_end_date  event_spatial_e

In [None]:
client = bigquery.Client(project="code-for-planet")

# Three read-only diagnostics on fire_event_id, defined up front and then
# executed in order: overall range, gaps in the sequence, lowest values.

# actual fire_event_id distribution
id_distribution_query = """
SELECT
  MIN(fire_event_id) as min_id,
  MAX(fire_event_id) as max_id,
  COUNT(DISTINCT fire_event_id) as unique_events,
  COUNT(*) as total_records
FROM `code-for-planet.emission_db.emission_2006_with_fire_events`
"""

# gaps: each distinct id is compared with the previous one via LAG; rows
# where the step is larger than 1 mark a gap in the sequence
gap_check_query = """
WITH consecutive_ids AS (
  SELECT DISTINCT fire_event_id
  FROM `code-for-planet.emission_db.emission_2006_with_fire_events`
  ORDER BY fire_event_id
),
gaps AS (
  SELECT
    fire_event_id,
    LAG(fire_event_id) OVER (ORDER BY fire_event_id) as prev_id,
    fire_event_id - LAG(fire_event_id) OVER (ORDER BY fire_event_id) as gap
  FROM consecutive_ids
)
SELECT fire_event_id, prev_id, gap
FROM gaps
WHERE gap > 1
ORDER BY fire_event_id
LIMIT 10
"""

# smallest ids in use
lowest_ids_query = """
SELECT DISTINCT fire_event_id
FROM `code-for-planet.emission_db.emission_2006_with_fire_events`
ORDER BY fire_event_id
LIMIT 20
"""

distribution = client.query(id_distribution_query).to_dataframe()
print("Fire Event ID Distribution:", distribution, sep="\n")

gaps = client.query(gap_check_query).to_dataframe()
print("\nFirst 10 gaps in fire_event_id sequence:", gaps, sep="\n")

lowest = client.query(lowest_ids_query).to_dataframe()
lowest_id_values = lowest['fire_event_id'].tolist()
print("\nLowest 20 fire_event_ids:")
print(lowest_id_values)

Fire Event ID Distribution:
   min_id  max_id  unique_events  total_records
0       0  816755          68781         813256

First 10 gaps in fire_event_id sequence:
   fire_event_id  prev_id  gap
0             16       13    3
1             21       19    2
2             49       47    2
3            699      696    3
4            701      699    2
5            708      706    2
6            711      708    3
7            727      725    2
8            730      728    2
9            733      731    2

Lowest 20 fire_event_ids:
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 16, 17, 18, 19, 21, 22]
