## NATURAL DISASTER PREDICTION USING DECISION TREE

In [None]:
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.tree import DecisionTreeClassifier
from sklearn.metrics import classification_report
from snowflake.snowpark.context import get_active_session
from snowflake.snowpark.functions import col, udf, count, sum as sf_sum, avg, max as sf_max
from snowflake.snowpark.functions import concat, lpad, lit
from snowflake.snowpark.types import FloatType, StringType



# Bind to the notebook's active Snowpark session, then point it at the
# database and schema that hold the NOAA storm events data.
session = get_active_session()
for context_statement in (
    "USE DATABASE NOAA_STORM_EVENTS_2025",
    "USE SCHEMA PUBLIC",
):
    session.sql(context_statement).collect()

In [None]:
# Load the raw NOAA storm events table
df = session.table("STORM_EVENTS_DETAILS")

# Rows missing any of these fields are dropped so every remaining event has
# a state, an event type and both halves of its FIPS code
required_columns = ['STATE', 'EVENT_TYPE', 'STATE_FIPS', 'CZ_FIPS']
df = df.dropna(subset=required_columns)

# Keep the FIPS identifiers plus the columns used for modeling/aggregation
selected_columns = [
    'STATE_FIPS',
    'CZ_FIPS',
    'STATE',
    'YEAR',
    'MONTH_NAME',
    'BEGIN_LAT',
    'BEGIN_LON',
    'EVENT_TYPE',
    'MAGNITUDE',
    'INJURIES_DIRECT',
    'DEATHS_DIRECT',
    'DAMAGE_PROPERTY',
    'DAMAGE_CROPS',
]
df = df.select(selected_columns)

# ===== CREATE FULL 5-DIGIT COUNTY FIPS CODE =====
# FIPS_CODE = STATE_FIPS left-padded with zeros to 2 characters, followed by
# CZ_FIPS left-padded with zeros to 3 characters
state_part = lpad(col("STATE_FIPS").cast("STRING"), lit(2), lit("0"))
county_part = lpad(col("CZ_FIPS").cast("STRING"), lit(3), lit("0"))
df = df.with_column("FIPS_CODE", concat(state_part, county_part))

print(f" Created 5-digit FIPS codes for {df.count()} storm events")

# damage parsing UDF
# Converts damage strings like "50K" or "2.5M" into actual numbers
# "50K" → 50,000
# "2.5M" → 2,500,000

def parse_damage(val):
    """Convert a damage string such as "50K" or "2.5M" into a dollar amount.

    Matching is case-insensitive. Any "$" characters, thousands separators
    (",") and surrounding whitespace are ignored. The markers K, M and B
    scale the number by 1e3, 1e6 and 1e9 respectively; a value with no
    marker is read as a plain number.

    Args:
        val: Raw damage value; converted with ``str()`` before parsing.
            ``None`` is treated as "no damage".

    Returns:
        float: The parsed amount, or 0.0 when ``val`` is ``None``, cannot be
        parsed as a number, or parses to a non-finite value (NaN/infinity).
    """
    # Local import keeps the function self-contained when it is registered
    # as a UDF.
    import math

    if val is None:
        return 0.0

    # Commas are stripped so "1,500" parses as 1500 instead of falling into
    # the ValueError branch and silently becoming 0.0.
    text = str(val).upper().replace('$', '').replace(',', '').strip()

    # Markers are checked in this order; the first one present decides the
    # scale and is removed before the numeric conversion.
    multiplier = 1
    for marker, scale in (('K', 1_000), ('M', 1_000_000), ('B', 1_000_000_000)):
        if marker in text:
            text = text.replace(marker, '')
            multiplier = scale
            break

    try:
        amount = float(text) * multiplier
    except ValueError:
        return 0.0

    # float() accepts spellings like "NAN" and "INF"; those are not damage
    # amounts and would poison later sums and model features.
    return amount if math.isfinite(amount) else 0.0

# Register the parser as a Snowpark UDF (string in, float out), then replace
# each damage column with its parsed numeric value.
parse_damage_udf = udf(func=parse_damage, return_type=FloatType(), input_types=[StringType()])
for damage_column in ("DAMAGE_PROPERTY", "DAMAGE_CROPS"):
    df = df.with_column(damage_column, parse_damage_udf(col(damage_column)))

# now DAMAGE_PROPERTY and DAMAGE_CROPS are numbers we can sum/average

# event categorization - Groups 48+ different event types into 13 broader categories
# Broad event group -> the title-cased event types that belong to it.
_EVENT_GROUP_MEMBERS = {
    'Flood': ('Flood', 'Flash Flood', 'Coastal Flood'),
    'Tornado': ('Tornado',),
    'Wind': ('Thunderstorm Wind', 'Strong Wind', 'High Wind',
             'Marine Thunderstorm Wind', 'Marine High Wind'),
    'Hail': ('Hail',),
    'Lightning': ('Lightning',),
    'Snow/Ice': ('Blizzard', 'Winter Storm', 'Winter Weather', 'Heavy Snow',
                 'Sleet', 'Ice Storm', 'Lake-Effect Snow'),
    'Heat/Drought': ('Drought', 'Heat', 'Excessive Heat'),
    'Heavy Rain': ('Heavy Rain', 'Debris Flow'),
    'Cold': ('Cold/Wind Chill', 'Extreme Cold/Wind Chill', 'Frost/Freeze'),
    'Wildfire': ('Wildfire',),
    'Dust Storm': ('Dust Storm',),
    'Coastal/Marine': ('High Surf', 'Tsunami', 'Hurricane', 'Tropical Storm'),
}

# Flattened reverse lookup: event type -> broad group.
_EVENT_TYPE_TO_GROUP = {
    event_type: group
    for group, members in _EVENT_GROUP_MEMBERS.items()
    for event_type in members
}


def categorize_event(event):
    """Map a raw event type onto one of the broad event groups.

    The input is converted with ``str()``, title-cased and stripped of
    surrounding whitespace before the lookup, so matching does not depend on
    the original letter case.

    Args:
        event: Raw event type value; ``None`` is allowed.

    Returns:
        str: The group name, or 'Other' when ``event`` is ``None`` or is not
        one of the listed event types.
    """
    if event is None:
        return 'Other'
    normalized = str(event).title().strip()
    return _EVENT_TYPE_TO_GROUP.get(normalized, 'Other')

# Register the categorizer as a Snowpark UDF (string in, string out) and use
# it to derive the simplified `EVENT_GROUP` column from `EVENT_TYPE`
categorize_event_udf = udf(
    func=categorize_event,
    return_type=StringType(),
    input_types=[StringType()],
)
event_group_column = categorize_event_udf(col("EVENT_TYPE"))
df = df.with_column("EVENT_GROUP", event_group_column)

In [None]:
# Pull the prepared Snowpark DataFrame into pandas for local modeling
df_pandas = df.to_pandas()

# ===== STORE FIPS BEFORE ENCODING =====
# Independent copy of the county codes, taken before any encoding step
fips_codes = df_pandas['FIPS_CODE'].copy(deep=True)

In [None]:
# One-hot encode the categorical columns (first level of each dropped)
df_model = pd.get_dummies(df_pandas, columns=['STATE', 'MONTH_NAME'], drop_first=True)

# Target is the broad event group; labels and FIPS identifiers are excluded
# from the feature matrix
non_feature_columns = ['EVENT_TYPE', 'EVENT_GROUP', 'STATE_FIPS', 'CZ_FIPS', 'FIPS_CODE']
y = df_model['EVENT_GROUP']
X = df_model.drop(columns=non_feature_columns)

# 80/20 split, stratified on the target so class proportions are kept
split_options = {'test_size': 0.2, 'random_state': 42, 'stratify': y}
X_train, X_test, y_train, y_test = train_test_split(X, y, **split_options)

# Depth-limited tree with class weighting; fit() returns the estimator itself
tree_options = {'max_depth': 6, 'class_weight': 'balanced', 'random_state': 42}
model = DecisionTreeClassifier(**tree_options).fit(X_train, y_train)

# Report per-class metrics on the held-out 20%
y_pred = model.predict(X_test)
print("Classification Report:")
print(classification_report(y_test, y_pred))

In [None]:
# ===== CREATE COUNTY-LEVEL RISK FEATURES =====

# Score every row of X with the fitted tree: per-class probabilities and the
# predicted event group
pred_proba = model.predict_proba(X)
df_pandas['PREDICTED_EVENT'] = model.predict(X)

# Severity score = the largest probability the model assigns to any of the
# high-risk groups for that event, so it lies between 0 and 1.
# Example: probabilities of 70% Wind, 20% Hail, 10% Other give 0.70.
high_risk_events = ['Tornado', 'Flood', 'Wind', 'Snow/Ice']
high_risk_lookup = set(high_risk_events)
high_risk_indices = [
    position
    for position, label in enumerate(model.classes_)
    if label in high_risk_lookup
]

# If none of the high-risk groups is among the model's classes, fall back to
# the top probability over all classes
severity_source = pred_proba[:, high_risk_indices] if high_risk_indices else pred_proba
df_pandas['PREDICTED_SEVERITY'] = severity_source.max(axis=1)

# Re-attach the FIPS codes that were saved before encoding
df_pandas['FIPS_CODE'] = fips_codes

In [None]:
# ===== DISPLAY INDIVIDUAL EVENT PREDICTIONS =====
# Prints diagnostic views of the per-event predictions: a sample, the
# severity distribution, the extremes, an actual-vs-predicted comparison and
# the overall hit rate. Nothing here modifies df_pandas.
print("\n" + "="*80)
print("INDIVIDUAL STORM EVENT PREDICTIONS")
print("="*80)

# Work on an explicit copy so the rounding below never touches df_pandas
display_df = df_pandas[[
    'FIPS_CODE', 
    'STATE',
    'EVENT_TYPE', 
    'PREDICTED_EVENT',
    'PREDICTED_SEVERITY',
    'DAMAGE_PROPERTY',
    'INJURIES_DIRECT',
    'DEATHS_DIRECT'
]].copy()

# Round severity for readability
display_df['PREDICTED_SEVERITY'] = display_df['PREDICTED_SEVERITY'].round(3)

print("\n Sample of 20 Storm Events with Predictions:")
print(display_df.head(20).to_string(index=False))

print("\n Distribution of Predicted Severity Scores:")
print(display_df['PREDICTED_SEVERITY'].describe())

# Show highest severity events
print("\n TOP 10 MOST SEVERE EVENTS (Highest Predicted Severity):")
top_severe = display_df.nlargest(10, 'PREDICTED_SEVERITY')
print(top_severe.to_string(index=False))

# Show lowest severity events
print("\n TOP 10 LEAST SEVERE EVENTS (Lowest Predicted Severity):")
low_severe = display_df.nsmallest(10, 'PREDICTED_SEVERITY')
print(low_severe.to_string(index=False))

# Comparison: Actual vs Predicted Event Type
# .copy() makes the 15-row sample an independent frame; assigning into the
# bare .head() slice raises pandas' SettingWithCopyWarning.
print("\n EVENT TYPE COMPARISON (Sample):")
comparison_df = df_pandas[[
    'EVENT_TYPE',
    'EVENT_GROUP', 
    'PREDICTED_EVENT',
    'PREDICTED_SEVERITY',
    'DAMAGE_PROPERTY'
]].head(15).copy()
comparison_df['PREDICTED_SEVERITY'] = comparison_df['PREDICTED_SEVERITY'].round(3)
print(comparison_df.to_string(index=False))

# Share of events whose predicted group equals the assigned group, computed
# over every row of df_pandas
correct_predictions = (df_pandas['EVENT_GROUP'] == df_pandas['PREDICTED_EVENT']).sum()
total_predictions = len(df_pandas)
accuracy = correct_predictions / total_predictions
print(f"\n Model Accuracy on All Events: {accuracy:.2%} ({correct_predictions}/{total_predictions})")

print("="*80 + "\n")

In [None]:
# ===== AGGREGATE BY COUNTY (FIPS) =====
# One row per county, with 8 storm features named directly in the
# aggregation (output column = (source column, statistic)):
# - STORM_SEVERITY_MEAN / MAX / STD: mean, max and standard deviation of
#   PREDICTED_SEVERITY across the county's events
# - STORM_INJURIES_TOTAL / STORM_DEATHS_TOTAL: summed direct injuries/deaths
# - STORM_DAMAGE_PROPERTY_TOTAL / STORM_DAMAGE_CROPS_TOTAL: summed damage
# - STORM_EVENT_COUNT: number of events with a non-null EVENT_TYPE

county_risk = (
    df_pandas
    .groupby('FIPS_CODE')
    .agg(
        STORM_SEVERITY_MEAN=('PREDICTED_SEVERITY', 'mean'),
        STORM_SEVERITY_MAX=('PREDICTED_SEVERITY', 'max'),
        STORM_SEVERITY_STD=('PREDICTED_SEVERITY', 'std'),
        STORM_INJURIES_TOTAL=('INJURIES_DIRECT', 'sum'),
        STORM_DEATHS_TOTAL=('DEATHS_DIRECT', 'sum'),
        STORM_DAMAGE_PROPERTY_TOTAL=('DAMAGE_PROPERTY', 'sum'),
        STORM_DAMAGE_CROPS_TOTAL=('DAMAGE_CROPS', 'sum'),
        STORM_EVENT_COUNT=('EVENT_TYPE', 'count'),
    )
    .reset_index()  # turn the FIPS_CODE group key back into the first column
)

In [None]:
# A county with a single event has no sample standard deviation (NaN);
# store that as 0
county_risk = county_risk.fillna({'STORM_SEVERITY_STD': 0})

# Force FIPS codes to zero-padded 5-character strings
county_risk['FIPS_CODE'] = county_risk['FIPS_CODE'].astype(str).str.zfill(5)

# ===== SAVE TO SNOWFLAKE =====
# Convert the pandas result back to a Snowpark DataFrame
county_risk_sp = session.create_dataframe(county_risk)

# Write it out, replacing the table if it already exists
table_writer = county_risk_sp.write.mode("overwrite")
table_writer.save_as_table("STORM_COUNTY_RISK_FEATURES")

print("✅ County-level storm risk features saved to STORM_COUNTY_RISK_FEATURES")
print(f"✅ Created {len(county_risk)} county-level risk profiles")
print("\nSample of features created:")
print(county_risk.head())