In [2]:
import os

import numpy as np
import pandas as pd

# Seed NumPy's global RNG so the np.random.normal draws made by
# generate_flash_samples are repeatable from a fresh kernel.
np.random.seed(42)

def generate_flash_samples(label, n, center):
    """Draw one Gaussian cluster of synthetic flash-flood samples.

    Each feature is sampled independently from a normal distribution using
    NumPy's global RNG; the mean comes from ``center`` and the standard
    deviation is fixed per feature.

    Args:
        label: Value stored in the ``flash_binary`` column of every row.
        n: Number of rows to draw.
        center: Mapping with keys ``"rain"``, ``"slope"``, ``"drainage"``,
            ``"saturation"`` and ``"convergence"`` giving the feature means.

    Returns:
        A DataFrame with the five feature columns followed by ``flash_binary``.
    """
    # (output column, key into `center`, standard deviation).
    # Order matters: it fixes both the column order and the RNG draw order.
    feature_specs = (
        ("rainfall_intensity", "rain", 12),
        ("slope", "slope", 3.5),
        ("drainage_density", "drainage", 0.6),
        ("soil_saturation", "saturation", 0.1),
        ("convergence_index", "convergence", 0.1),
    )
    columns = {
        column: np.random.normal(loc=center[key], scale=spread, size=n)
        for column, key, spread in feature_specs
    }
    columns["flash_binary"] = label  # scalar, broadcast to every row
    return pd.DataFrame(columns)

# --- Core Archetypes ---
# Each archetype is one Gaussian cluster. The calls stay in this order because
# every call consumes draws from NumPy's global RNG.

# Flood clusters (label 1)
flood_mountain_torrential = generate_flash_samples(
    label=1,
    n=1000,
    center=dict(rain=100, slope=20, drainage=1.5, saturation=0.9, convergence=0.8),
)

flood_valley_choke = generate_flash_samples(
    label=1,
    n=600,
    center=dict(rain=80, slope=10, drainage=2.0, saturation=0.7, convergence=0.95),
)

# No-flood clusters (label 0)
no_flood_flat_absorbent = generate_flash_samples(
    label=0,
    n=800,
    center=dict(rain=50, slope=5, drainage=4.5, saturation=0.4, convergence=0.4),
)

no_flood_urban_controlled = generate_flash_samples(
    label=0,
    n=700,
    center=dict(rain=65, slope=8, drainage=4.0, saturation=0.5, convergence=0.5),
)

# Additional flood cluster (label 1) whose means lie between the clusters above
contrast_mix = generate_flash_samples(
    label=1,
    n=300,
    center=dict(rain=90, slope=15, drainage=2.8, saturation=0.8, convergence=0.65),
)

# --- Final Assembly ---
# Stack the five clusters: flood clusters first, then the no-flood ones.
data = pd.concat([
    flood_mountain_torrential, flood_valley_choke, contrast_mix,
    no_flood_flat_absorbent, no_flood_urban_controlled
])

# Gaussian tails can go negative; floor every column at 0.
# NOTE(review): no upper bound is applied, so soil_saturation and
# convergence_index can exceed 1.0 — confirm that is intended.
data = data.clip(lower=0)

# Shuffle rows deterministically and discard the per-cluster indices.
data = data.sample(frac=1, random_state=42).reset_index(drop=True)

data["rainfall_intensity"] *= 1.5  # accentuate the rainfall signal

# DataFrame.to_csv does not create missing parent directories, so make sure
# the output folder exists before writing (no-op when it is already there).
os.makedirs("dataset", exist_ok=True)
data.to_csv("dataset/flash_flood_data.csv", index=False)
print("✅ FlashFlood dataset rebuilt:", data.shape)

✅ FlashFlood dataset rebuilt: (3400, 6)


In [None]:
import tensorflow as tf
from tensorflow.keras import layers, models, callbacks
from tensorflow.keras.saving import register_keras_serializable
import pandas as pd
from sklearn.model_selection import train_test_split

In [4]:

# Feature columns in the exact order the network expects. The Keras helper
# functions index the input by position (columns 0, 1, 2 and 4), so the order
# is pinned here instead of being inherited from the CSV header.
FEATURE_COLUMNS = [
    "rainfall_intensity",  # position 0
    "slope",               # position 1
    "drainage_density",    # position 2
    "soil_saturation",     # position 3
    "convergence_index",   # position 4
]

data = pd.read_csv("dataset/flash_flood_data.csv")
print("✅ Dataset size:", len(data),
      "| Flood:", data["flash_binary"].sum(),
      "| No Flood:", (data["flash_binary"] == 0).sum())

# Selecting by name raises KeyError on a missing column and ignores any extra
# ones, rather than silently shifting the positional features.
X = data[FEATURE_COLUMNS].astype("float32")
y = data["flash_binary"].astype("float32")

# 70/30 split, stratified on the label so both parts keep the class ratio.
X_train, X_test, y_train, y_test = train_test_split(
    X, y, stratify=y, test_size=0.3, random_state=42)

@register_keras_serializable()
def intensity_slope_amplifier(inputs):
    """Return a per-sample factor driven jointly by columns 0 and 1.

    Column 0 (rainfall intensity) passes through a sigmoid centred at 75 and
    column 1 (slope) through a sigmoid centred at 10. Their product, scaled by
    0.35, is added to 1.0, so the factor lies between 1.0 and 1.35.

    Args:
        inputs: 2-D tensor indexed as ``inputs[:, k]`` (batch on axis 0).

    Returns:
        Tensor with a trailing axis of size 1 added to the per-sample factor.
    """
    rain_gate = tf.sigmoid((inputs[:, 0] - 75) * 0.08)
    slope_gate = tf.sigmoid((inputs[:, 1] - 10) * 0.05)
    factor = 1.0 + 0.35 * rain_gate * slope_gate
    return factor[:, None]

@register_keras_serializable()
def drainage_penalty(inputs):
    """Return a per-sample factor that shrinks as column 2 (drainage) grows.

    A sigmoid centred at 3.5 (steepness 2) is scaled by 0.4 and subtracted
    from 1.0, so the factor lies between 0.6 and 1.0.

    Args:
        inputs: 2-D tensor indexed as ``inputs[:, k]`` (batch on axis 0).

    Returns:
        Tensor with a trailing axis of size 1 added to the per-sample factor.
    """
    gate = tf.sigmoid((inputs[:, 2] - 3.5) * 2)
    factor = 1.0 - 0.4 * gate
    return factor[:, None]

@register_keras_serializable()
def convergence_suppressor(inputs):
    """Return a per-sample factor that grows with column 4 (convergence).

    A sigmoid centred at 0.5 (steepness 8) is scaled by 0.3 and added to 1.0.
    Despite the function's name, the factor is therefore always above 1.0
    (between 1.0 and 1.3) and never reduces the value it is combined with.

    Args:
        inputs: 2-D tensor indexed as ``inputs[:, k]`` (batch on axis 0).

    Returns:
        Tensor with a trailing axis of size 1 added to the per-sample factor.
    """
    gate = tf.sigmoid((inputs[:, 4] - 0.5) * 8)
    factor = 1.0 + 0.3 * gate
    return factor[:, None]

@register_keras_serializable()
def clip_modulation(x):
    """Clamp every element of ``x`` to the closed interval [0.7, 1.3]."""
    lower_bound, upper_bound = 0.7, 1.3
    return tf.clip_by_value(
        x, clip_value_min=lower_bound, clip_value_max=upper_bound
    )

# Seed the Python, NumPy and TensorFlow RNGs before any layer is built so that
# weight initialisation and the shuffling done by fit() repeat across runs.
# Without this, only the data split is reproducible.
tf.keras.utils.set_random_seed(42)

# --- Inputs ---
# Five features per sample; the registered helper functions read columns
# 0, 1, 2 and 4 of this tensor by position.
input_layer = layers.Input(shape=(5,))

# Dedicated branch that sees only column 0 (the 0:1 slice keeps a feature axis).
# NOTE(review): unlike the registered helpers, this anonymous lambda has no
# registered name — confirm the saved model reloads in the target environment.
rain_input = layers.Lambda(lambda x: x[:, 0:1])(input_layer)
rain_branch = layers.Dense(8, activation="relu")(rain_input)

# --- Learned trunk ---
# Batch-normalise the raw features, then 128 -> 64 -> 64 dense layers with a
# skip connection that adds the first 64-unit output to the second.
x = layers.BatchNormalization()(input_layer)
x1 = layers.Dense(128, activation="relu")(x)
x2 = layers.Dense(64, activation="relu")(x1)
x3 = layers.Dense(64, activation="relu")(x2)
residual = layers.Add()([x3, x2])
combined = layers.Concatenate()([residual, rain_branch])
logits = layers.Dense(1)(combined)

# --- Hand-written modulation ---
# Three fixed (non-trainable) factors computed straight from the raw input,
# multiplied together and clamped to [0.7, 1.3].
amplifier   = layers.Lambda(intensity_slope_amplifier)(input_layer)
penalty     = layers.Lambda(drainage_penalty)(input_layer)
suppression = layers.Lambda(convergence_suppressor)(input_layer)
mod_strength = layers.Multiply()([amplifier, penalty, suppression])
mod_strength = layers.Lambda(clip_modulation)(mod_strength)

# The clamped factor is added to the learned logit before the sigmoid.
# NOTE(review): the factors are built as multipliers around 1.0 but are
# combined with Add, which shifts the logit by roughly +0.7..+1.3 instead of
# scaling it — confirm an additive offset is what was intended.
modulated_logits = layers.Add()([logits, mod_strength])
adjusted_output  = layers.Activation("sigmoid")(modulated_logits)

model = models.Model(inputs=input_layer, outputs=adjusted_output)
model.compile(
    optimizer="adam",
    # from_logits=False because the model already ends in a sigmoid.
    loss=tf.keras.losses.BinaryCrossentropy(from_logits=False, label_smoothing=0.05),
    metrics=["accuracy"]
)

# Stop once val_loss has not improved for 5 epochs and roll back to the best
# weights seen; 20% of the training rows are held out for validation.
early_stop = callbacks.EarlyStopping(monitor="val_loss", patience=5, restore_best_weights=True)
model.fit(X_train, y_train, validation_split=0.2, epochs=10, batch_size=8, callbacks=[early_stop])

# Final check on the 30% hold-out split, then persist the trained model.
loss, acc = model.evaluate(X_test, y_test)
print(f"⚡ FlashFloodNet Accuracy: {acc:.4f}")
model.save("models/FlashFloodNet.h5")

✅ Dataset size: 3400 | Flood: 1900 | No Flood: 1500

Epoch 1/10
238/238 ━━━━━━━━━━━━━━━━━━━━ 9s 13ms/step - accuracy: 0.8224 - loss: 0.9121 - val_accuracy: 0.9769 - val_loss: 0.1747
Epoch 2/10
238/238 ━━━━━━━━━━━━━━━━━━━━ 2s 6ms/step - accuracy: 0.9718 - loss: 0.1885 - val_accuracy: 1.0000 - val_loss: 0.1408
Epoch 3/10
238/238 ━━━━━━━━━━━━━━━━━━━━ 1s 6ms/step - accuracy: 0.9766 - loss: 0.1815 - val_accuracy: 0.9979 - val_loss: 0.1506
Epoch 4/10
238/238 ━━━━━━━━━━━━━━━━━━━━ 2s 7ms/step - accuracy: 0.9545 - loss: 0.2116 - val_accuracy: 1.0000 - val_loss: 0.1354
Epoch 5/10
238/238 ━━━━━━━━━━━━━━━━━━━━ 4s 13ms/step - accuracy: 0.9641 - loss: 0.1762 - val_accuracy: 1.0000 - val_loss: 0.1344
Epoch 6/10
238/238 ━━━━━━━━━━━━━━━━━━━━ 3s 10ms/step - accuracy: 0.9802 - loss: 0.1699 - val_accuracy … (output truncated)



⚡ FlashFloodNet Accuracy: 0.9971
