In [None]:
# Optional Google Colab setup: mount Google Drive so the data files are reachable.
# The import is guarded so that "Restart & Run All" also works outside Colab,
# where the `google.colab` package is not installed.
try:
    from google.colab import drive
except ImportError:
    drive = None
    print("google.colab is not available; skipping Google Drive mount.")

if drive is not None:
    drive.mount('/content/drive')

# Placeholder: replace with the directory that holds the FIRED data.
FIRED_DATAPATH = '<YOUR_DIRECTORY_PATH>'

Run the cell above only if you are working in Google Colab; it mounts Google Drive at `/content/drive`.

In [None]:
import pandas as pd
import numpy as np
import geopandas as gpd
import rasterio
from rasterio.sample import sample_gen
from shapely.wkt import loads
from shapely.geometry import Point, MultiPolygon
from sklearn.ensemble import RandomForestRegressor
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error, r2_score
import matplotlib.pyplot as plt
from tqdm import tqdm
import os

# Accumulator for the per-fire topography records built by the extraction cell.
ignition_data = list()

# Read the FIRED event table from disk (placeholder path: replace before running).
print("Loading FIRED CONUS dataset...")
fired_conus_ak_df = pd.read_csv('<>YOUR_FILEPATH')

# Work on a copy so the frame as read from disk stays untouched.
df = fired_conus_ak_df.copy()

# Distinct fire identifiers, in order of first appearance in the table.
fire_ids = df.loc[:, 'id'].unique()
print(f"Found {len(fire_ids)} unique fire events")


# Placeholder locations of the elevation products (replace before running).
# ELEV_DATA_DIR is not referenced by the visible notebook code; the three
# raster paths below are what the loader actually uses.
ELEV_DATA_DIR = 'YOUR_ELEVATION_DATA_DIRECTORY'

dem_path = "YOUR_FILEPATH"
slope_deg_path = "YOUR_FILEPATH"
slope_pct_path = "YOUR_FILEPATH"


def _open_raster(path, label):
    """Open the raster at ``path`` with rasterio and report progress under ``label``.

    Parameters
    ----------
    path : str
        Filesystem path of the raster.
    label : str
        Human-readable name used in the printed status messages.

    Returns
    -------
    The open rasterio dataset, or ``None`` when the file does not exist or
    opening it raises. The dataset is intentionally left open because the
    extraction cell samples from it later.
    """
    if not os.path.exists(path):
        print(f"{label} file not found at {path}")
        return None
    try:
        dataset = rasterio.open(path)
        print(f"{label} loaded from file: {path}")
        print(f"{label} CRS: {dataset.crs}")
        return dataset
    except Exception as e:
        print(f"Error loading {label}: {e}")
        return None


# One shared loader guarantees that every raster variable is always defined
# and that a failure of one raster only resets that raster. The previous
# copy-pasted stanzas reset `dem` instead of `slope_deg` / `slope_pct`, which
# discarded a valid DEM and left `slope_pct` undefined when its file was missing.
dem = _open_raster(dem_path, "DEM")
slope_deg = _open_raster(slope_deg_path, "Slope Degrees")
slope_pct = _open_raster(slope_pct_path, "Slope Pct")

# CRS of each raster, or None when the raster could not be loaded.
dem_crs = dem.crs if dem is not None else None
slope_deg_crs = slope_deg.crs if slope_deg is not None else None
slope_pct_crs = slope_pct.crs if slope_pct is not None else None

# Extract data

In [None]:
# Start from an empty list so that re-running this cell does not append
# duplicate records to the results of a previous run.
ignition_data = []

# NOTE(review): the FIRED geometries / ig_utm_* coordinates are assumed to be
# in EPSG:3857, as in the original code -- confirm against the dataset metadata.
SOURCE_CRS = "EPSG:3857"
BUFFER_RADIUS_M = 1000        # radius of the neighbourhood around the ignition
METERS_PER_DEGREE = 111000    # rough metres-per-degree factor for geographic CRSs
GRID_POINTS_PER_AXIS = 10     # the buffer is sampled on a 10 x 10 grid


def _sample_first_band(dataset, coords):
    """Sample the first band of ``dataset`` at every ``(x, y)`` in ``coords``.

    ``sample_gen`` yields numpy arrays, never ``None``: cells without data come
    back as the dataset's nodata value, and points outside the raster come back
    as a fill value. Both cases are mapped to ``np.nan`` here so callers can
    detect them.

    Parameters
    ----------
    dataset : open rasterio dataset
    coords : list of (x, y) tuples expressed in the dataset's CRS

    Returns
    -------
    list of float, one entry per coordinate (``np.nan`` where no data exists).
    """
    nodata = dataset.nodata
    left, bottom, right, top = dataset.bounds
    values = []
    for (px, py), sample in zip(coords, sample_gen(dataset, coords)):
        value = float(sample[0])
        outside = not (left <= px <= right and bottom <= py <= top)
        is_nodata = nodata is not None and (
            value == nodata or (np.isnan(nodata) and np.isnan(value))
        )
        values.append(np.nan if (outside or is_nodata) else value)
    return values


if dem is not None and slope_deg is not None and slope_pct is not None:
    # Positional row indices per fire id, computed once instead of scanning the
    # whole table with a boolean mask for every fire.
    rows_by_fire = df.groupby('id', sort=False).indices

    # The degree conversion only makes sense for a geographic (lon/lat) DEM.
    # NOTE(review): for a projected DEM the linear unit is assumed to be metres.
    if dem.crs is not None and dem.crs.is_geographic:
        buffer_radius = BUFFER_RADIUS_M / METERS_PER_DEGREE
    else:
        buffer_radius = BUFFER_RADIUS_M

    for fire_id in tqdm(fire_ids):
        row_positions = rows_by_fire.get(fire_id)
        if row_positions is None or len(row_positions) == 0:
            continue

        # The row with the smallest event duration is used as the ignition record.
        ignition = df.iloc[row_positions].sort_values('event_dur').iloc[0]

        try:
            # Geometry parsing and reprojection live inside the try block so
            # that one malformed record is reported and skipped instead of
            # aborting the whole loop.
            if 'geometry' in ignition and pd.notna(ignition['geometry']):
                raw_geom = ignition['geometry']
                geom = loads(raw_geom) if isinstance(raw_geom, str) else raw_geom
            elif 'ig_utm_x' in ignition and 'ig_utm_y' in ignition:
                geom = Point(ignition['ig_utm_x'], ignition['ig_utm_y'])
            else:
                continue

            point_gdf = gpd.GeoDataFrame(geometry=[geom], crs=SOURCE_CRS)
            ignition_geom = point_gdf.to_crs(dem.crs).geometry.iloc[0]

            # Pick a single sampling location for the ignition geometry.
            if isinstance(ignition_geom, Point):
                x, y = ignition_geom.x, ignition_geom.y
            elif isinstance(ignition_geom, MultiPolygon):
                centroid = ignition_geom.centroid
                x, y = centroid.x, centroid.y
            else:
                rep_point = ignition_geom.representative_point()
                x, y = rep_point.x, rep_point.y

            # Sample elevation; fires without a valid elevation are skipped.
            ignition_elev = _sample_first_band(dem, [(x, y)])[0]
            if np.isnan(ignition_elev):
                continue

            # Sample slope (deg) and slope (%). Missing values stay NaN.
            # NOTE(review): the slope rasters are sampled with coordinates in
            # the DEM CRS, which assumes all three rasters share one CRS.
            ignition_slope_deg = _sample_first_band(slope_deg, [(x, y)])[0]
            ignition_slope_pct = _sample_first_band(slope_pct, [(x, y)])[0]

            # Clip outliers (optional).
            # NOTE(review): 60 degrees is roughly 173 %, so the two caps are not
            # equivalent; the percent cap of 100 corresponds to 45 degrees.
            ignition_slope_deg = np.clip(ignition_slope_deg, 0, 60)
            ignition_slope_pct = np.clip(ignition_slope_pct, 0, 100)

            # Elevation statistics on a regular grid inside a circular buffer
            # around the ignition centroid.
            buffer_geom = ignition_geom.centroid.buffer(buffer_radius)
            minx, miny, maxx, maxy = buffer_geom.bounds

            grid_coords = [
                (x_grid, y_grid)
                for x_grid in np.linspace(minx, maxx, GRID_POINTS_PER_AXIS)
                for y_grid in np.linspace(miny, maxy, GRID_POINTS_PER_AXIS)
                if buffer_geom.contains(Point(x_grid, y_grid))
            ]

            # One batched sampling call; cells without data are dropped.
            sampled = _sample_first_band(dem, grid_coords) if grid_coords else []
            buffer_elevations = [value for value in sampled if not np.isnan(value)]

            if buffer_elevations:
                mean_elev = np.mean(buffer_elevations)
                min_elev = np.min(buffer_elevations)
                max_elev = np.max(buffer_elevations)
                std_elev = np.std(buffer_elevations)
                range_elev = max_elev - min_elev

                ignition_data.append({
                    'fire_id': fire_id,
                    'ig_date': ignition.get('ig_date'),
                    'fsr_km2_dy': ignition.get('fsr_km2_dy'),
                    'tot_ar_km2': ignition.get('tot_ar_km2'),
                    'event_dur': ignition.get('event_dur'),
                    'ignition_elev': ignition_elev,
                    'mean_elev': mean_elev,
                    'min_elev': min_elev,
                    'max_elev': max_elev,
                    'std_elev': std_elev,
                    'range_elev': range_elev,
                    'land_cover': ignition.get('lc_name'),
                    'ecoregion': ignition.get('eco_name'),
                    'ignition_slope_deg': ignition_slope_deg,
                    'ignition_slope_pct': ignition_slope_pct
                })

        except Exception as e:
            print(f"Error processing fire ID {fire_id}: {e}")
            continue
else:
    # All three rasters are required, not only the DEM.
    print("Cannot proceed without valid DEM and slope rasters.")

In [None]:
# Fit a random-forest model of fire spread rate on the ignition topography
# records, report its performance, draw a 2 x 3 diagnostic figure, and save the
# assembled dataset to CSV. Nothing is done when no records were extracted.
if ignition_data:
    ignition_df = pd.DataFrame(ignition_data)
    print(f"Successfully processed {len(ignition_df)} ignition locations with elevation and slope data")

    # Rows without a target value cannot be used for modelling or plotting.
    ignition_df = ignition_df.dropna(subset=['fsr_km2_dy'])

    print("Building model to predict fire spread rate based on ignition topography...")

    features = ['ignition_elev', 'mean_elev', 'min_elev', 'max_elev', 'std_elev', 'range_elev',
                'ignition_slope_deg', 'ignition_slope_pct']
    features = [f for f in features if f in ignition_df.columns]

    # One-hot encode land cover.
    if 'land_cover' in ignition_df.columns:
        lc_dummies = pd.get_dummies(ignition_df['land_cover'], prefix='lc')
        ignition_df = pd.concat([ignition_df, lc_dummies], axis=1)
        features.extend(lc_dummies.columns.tolist())

    if 'ecoregion' in ignition_df.columns:
        # Only include if the number of unique ecoregions is manageable
        if ignition_df['ecoregion'].nunique() < 20:  # Arbitrary threshold
            eco_dummies = pd.get_dummies(ignition_df['ecoregion'], prefix='eco')
            ignition_df = pd.concat([ignition_df, eco_dummies], axis=1)
            features.extend(eco_dummies.columns.tolist())

    # Slope values may be NaN; drop those rows for modelling only, so the
    # plots and the saved CSV still contain every record.
    model_df = ignition_df.dropna(subset=features)
    X = model_df[features]
    y = model_df['fsr_km2_dy']

    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, random_state=42)

    model = RandomForestRegressor(n_estimators=100, random_state=42)
    model.fit(X_train, y_train)

    y_pred = model.predict(X_test)
    mse = mean_squared_error(y_test, y_pred)
    r2 = r2_score(y_test, y_pred)

    print("Model performance metrics:")
    print(f"  Mean Squared Error: {mse:.4f}")
    print(f"  R² Score: {r2:.4f}")

    importance = pd.DataFrame({
        'Feature': features,
        'Importance': model.feature_importances_
    }).sort_values('Importance', ascending=False)

    print("\nFeature importance:")
    print(importance.head(10))

    # Explicit axes are used so that every panel, including the pandas bar
    # chart, lands in the same figure. DataFrame.plot() without `ax=` opens a
    # new figure, which left panel 4 empty and moved the later panels.
    fig, axes = plt.subplots(2, 3, figsize=(15, 10))

    # Plot 1: Spread rate vs ignition elevation
    ax = axes[0, 0]
    ax.scatter(ignition_df['ignition_elev'], ignition_df['fsr_km2_dy'],
               alpha=0.4, s=10)
    ax.set_xlabel('Ignition Point Elevation (m)')
    ax.set_ylabel('Fire Spread Rate (km²/day)')
    ax.set_title('Fire Spread Rate vs. Elevation')

    # Plot 2: Spread rate vs elevation range
    ax = axes[0, 1]
    ax.scatter(ignition_df['range_elev'], ignition_df['fsr_km2_dy'],
               alpha=0.4, s=10)
    ax.set_xlabel('Elevation Range (m)')
    ax.set_ylabel('Fire Spread Rate (km²/day)')
    ax.set_title('Fire Spread Rate vs. Elevation Range')

    # Plot 3: Spread rate vs slope (degrees)
    ax = axes[0, 2]
    ax.scatter(ignition_df['ignition_slope_deg'], ignition_df['fsr_km2_dy'],
               alpha=0.4, s=10, color='orange')
    ax.set_xlabel('Ignition Point Slope (degrees)')
    ax.set_ylabel('Fire Spread Rate (km²/day)')
    ax.set_title('Fire Spread Rate vs. Slope')

    # Plot 4: Feature importance (drawn into the grid via `ax=`)
    ax = axes[1, 0]
    importance.head(10).sort_values('Importance').plot(
        kind='barh', x='Feature', y='Importance', legend=False, ax=ax)
    ax.set_xlabel('Importance')
    ax.set_title('Top Features for Predicting Fire Spread Rate')

    # Plot 5: Actual vs Predicted, with the identity line as reference
    ax = axes[1, 1]
    ax.scatter(y_test, y_pred, alpha=0.5)
    ax.plot([y_test.min(), y_test.max()], [y_test.min(), y_test.max()], 'k--')
    ax.set_xlabel('Actual Spread Rate (km²/day)')
    ax.set_ylabel('Predicted Spread Rate (km²/day)')
    ax.set_title(f'Model Predictions (R² = {r2:.3f})')

    # Plot 6: Slope vs Elevation with spread rate as color
    ax = axes[1, 2]
    scatter = ax.scatter(ignition_df['ignition_elev'],
                         ignition_df['ignition_slope_deg'],
                         c=ignition_df['fsr_km2_dy'],
                         alpha=0.5,
                         cmap='viridis')
    fig.colorbar(scatter, ax=ax, label='Fire Spread Rate (km²/day)')
    ax.set_xlabel('Elevation (m)')
    ax.set_ylabel('Slope (degrees)')
    ax.set_title('Elevation vs. Slope Colored by Fire Spread Rate')

    fig.tight_layout()
    fig.savefig('fire_topography_analysis.png')
    plt.show()

    print("\nCorrelations with fire spread rate:")
    topo_features = ['ignition_elev', 'mean_elev', 'range_elev',
                     'ignition_slope_deg', 'ignition_slope_pct']
    corr = ignition_df[topo_features + ['fsr_km2_dy']].corr()['fsr_km2_dy'].drop('fsr_km2_dy')
    print(corr.sort_values(ascending=False))

    ignition_df.to_csv('ignition_topography_data.csv', index=False)
    print("Saved ignition topography dataset to 'ignition_topography_data.csv'")
else:
    print("No valid ignition topography data could be extracted.")