In [1]:
import numpy as np
import pandas as pd
import tensorflow as tf
from deap import base, creator, tools, algorithms
import random
import os
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'

2025-05-04 09:57:08.216549: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:477] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
E0000 00:00:1746352628.487683      31 cuda_dnn.cc:8310] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
E0000 00:00:1746352628.564394      31 cuda_blas.cc:1418] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered


In [2]:
class RandomSensorDropout(tf.keras.layers.Layer):
    """
    Layer that randomly zeros individual sensor channels with a given rate during training.
    Supports proper serialization.
    """
    def __init__(self, rate=0.3, **kwargs):
        super().__init__(**kwargs)
        self.rate = rate

    def call(self, inputs, training=False):
        if training and self.rate > 0.0:
            mask = tf.cast(tf.random.uniform(tf.shape(inputs)) > self.rate, inputs.dtype)
            return inputs * mask
        return inputs

    def get_config(self):
        config = super().get_config()
        config.update({"rate": self.rate})
        return config
    

# --------------- Monte Carlo Dropout Layer ---------------------    
class MCDropout(tf.keras.layers.Dropout):
    """
    Dropout that is active both at train *and* inference time,
    so we can sample N stochastic forward passes.
    """
    def call(self, inputs, training=None):
        # Force dropout even in inference
        return super().call(inputs, training=True)


# --------------- Sensor Model ---------------------

def build_sensor_model(
    input_dim=7,
    sensor_dropout_rate=0.3,
    layer_dropout = 0.5,
    hidden_units=(64, 32),
    output_units=4,
    lr=1e-4
):
    """
    Sensor-only MLP with RandomSensorDropout option for ablation.
    """
    inp = tf.keras.Input(shape=(input_dim,), name="sensor_input")
    x = RandomSensorDropout(sensor_dropout_rate, name="sensor_dropout")(inp)
    x = tf.keras.layers.Dense(hidden_units[0], activation="relu")(x)
    x = MCDropout(layer_dropout)(x)
    x = tf.keras.layers.Dense(hidden_units[1], activation="relu")(x)
    x = MCDropout(layer_dropout)(x)
    out = tf.keras.layers.Dense(output_units, activation="softmax", name="output")(x)

    model = tf.keras.Model(inp, out, name="sensor_only")
    model.compile(
        optimizer=tf.keras.optimizers.Adam(lr),
        loss="sparse_categorical_crossentropy",
        metrics=["accuracy"]
    )
    return model



In [14]:
# Load data as numpy arrays
import os, glob
import pandas as pd
import tensorflow as tf
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
import numpy as np
from tqdm import trange


def load_and_split_sensor_data(
    csv_path: str,
    test_size: float = 0.1,
    val_size: float = 0.2,
    random_state: int = 42
):
    """
    Load sensor CSV, preprocess (drop columns, encode labels, scale), and split into train/val/test.

    Args:
        csv_path: Path to the Gas Sensors Measurements CSV file.
        test_size: Fraction of data reserved for final test set.
        val_size: Fraction of remaining data reserved for validation.
        random_state: Random seed for reproducibility.

    Returns:
        train_ds: tf.data.Dataset for training (features, labels).
        val_ds: tf.data.Dataset for validation.
        test_ds: tf.data.Dataset for testing.
    """
    # 1) Load and clean
    df = pd.read_csv(csv_path)
    # drop unused columns
    df = df.drop(columns=["Serial Number", "Corresponding Image Name"], errors='ignore')
    # encode labels
    df['Gas'] = df['Gas'].astype('category').cat.codes

    # 2) Extract features and labels
    feature_cols = [c for c in df.columns if c != 'Gas']
    X = df[feature_cols].values.astype('float32')
    y = df['Gas'].values.astype('int32')

    # 3) Scale features
    scaler = StandardScaler()
    X_scaled = scaler.fit_transform(X)

    # 4) Split into train+val and test
    X_temp, X_test, y_temp, y_test = train_test_split(
        X_scaled, y, test_size=test_size, stratify=y, random_state=random_state
    )
    # Split train_temp into train and val
    val_fraction = val_size / (1 - test_size)
    X_train, X_val, y_train, y_val = train_test_split(
        X_temp, y_temp, test_size=val_fraction,
        stratify=y_temp, random_state=random_state
    )

    # 5) Build tf.data datasets
    def make_ds(features, labels, batch_size=32, shuffle=False):
        ds = tf.data.Dataset.from_tensor_slices((features, labels))
        if shuffle:
            ds = ds.shuffle(buffer_size=len(features), seed=random_state)
        return ds.batch(batch_size).prefetch(tf.data.AUTOTUNE)
    num = X_train.shape[1]
    batch_size = 32
    train_ds = make_ds(X_train, y_train, batch_size, shuffle=True)
    val_ds   = make_ds(X_val,   y_val,   batch_size, shuffle=False)
    test_ds  = make_ds(X_test,  y_test,  batch_size, shuffle=False)

    return train_ds, val_ds, test_ds,num
csv_path = "/kaggle/input/gas-dataset/zkwgkjkjn9-2/Gas Sensors Measurements/Gas_Sensors_Measurements.csv"
train_ds, val_ds,test_ds, num = load_and_split_sensor_data(csv_path, 0.1, 0.2, 123)

output_units = len(np.unique([label.numpy() for _, label in train_ds.unbatch()]))

In [22]:
import random, numpy as np, tensorflow as tf
from deap import base, creator, tools
from tqdm import trange


# ------------------------------------------------------------------
# 1. Load tf.data datasets + meta info
csv_path = "/kaggle/input/gas-dataset/zkwgkjkjn9-2/Gas Sensors Measurements/Gas_Sensors_Measurements.csv"
train_ds, val_ds, test_ds,num = load_and_split_sensor_data(csv_path, 0.1, 0.2, 123)
input_dim     = train_ds.element_spec[0].shape[-1]
output_units  = len(np.unique([y.numpy() for _, y in train_ds.unbatch()]))

# ------------------------------------------------------------------
# 2. Fitness: expect exactly three flat genes
def eval_individual(ind, train_ds, val_ds, test_ds, input_dim, output_units):
    layer_do, h1, h2 = float(ind[0]), int(ind[1]), int(ind[2])

    # quick validity check
    if not (0. <= layer_do <= 1. and h1 > 0 and h2 > 0):
        return (0.0,)

    model = build_sensor_model(
        input_dim     = input_dim,
        sensor_dropout_rate = 0.3,
        layer_dropout = layer_do,
        hidden_units  = (h1, h2),
        output_units  = output_units,
        lr            = 1e-4
    )
    model.fit(train_ds, validation_data=val_ds, epochs=5, verbose=0)
    _, acc = model.evaluate(val_ds, verbose=0)
    tf.keras.backend.clear_session()
    return (acc,)

# ------------------------------------------------------------------
# 3. DEAP setup (delete old classes if cell re‑run)
for cls in ("FitnessMax", "Individual"):
    if cls in creator.__dict__:
        del creator.__dict__[cls]

creator.create("FitnessMax", base.Fitness, weights=(1.0,))
creator.create("Individual", list, fitness=creator.FitnessMax)

toolbox = base.Toolbox()
toolbox.register("layer_do", random.uniform, 0.0, 0.8)
toolbox.register("h1",       random.randrange, 16, 257, 16)
toolbox.register("h2",       random.randrange, 16, 257, 16)

# ---- IMPORTANT: create a *flat* 3‑gene list ----------------------
toolbox.register(
    "individual",
    tools.initIterate,
    creator.Individual,
    lambda: [toolbox.layer_do(), toolbox.h1(), toolbox.h2()]
)
# -----------------------------------------------------------------

toolbox.register("population", tools.initRepeat, list, toolbox.individual)
toolbox.register("evaluate",   eval_individual,
                 train_ds=train_ds, val_ds=val_ds, test_ds=test_ds,
                 input_dim=input_dim, output_units=output_units)
toolbox.register("mate", tools.cxTwoPoint)

# custom mutation – return the list itself (not tuple!)
def mutate_ind(ind, indpb):
    if random.random() < indpb:
        ind[0] = random.uniform(0.0, 0.8)
    if random.random() < indpb:
        ind[1] = random.randrange(16, 257, 16)
    if random.random() < indpb:
        ind[2] = random.randrange(16, 257, 16)
    return ind,                     # <- DEAP expects a tuple *of* individuals

toolbox.register("mutate", mutate_ind, indpb=0.2)
toolbox.register("select", tools.selTournament, tournsize=3)

# ------------------------------------------------------------------
# 4. Evolve
POP, NGEN, CX_PB, MUT_PB = 20, 30, 0.5, 0.2
pop = toolbox.population(n=POP)
hof = tools.HallOfFame(1)

for _ in trange(NGEN, desc="GA generations"):
    offspring = list(map(toolbox.clone, toolbox.select(pop, len(pop))))
    for c1, c2 in zip(offspring[::2], offspring[1::2]):
        if random.random() < CX_PB:
            toolbox.mate(c1, c2)
            del c1.fitness.values, c2.fitness.values
    for m in offspring:
        if random.random() < MUT_PB:
            toolbox.mutate(m)
            del m.fitness.values
    invalid = [ind for ind in offspring if not ind.fitness.valid]
    for ind, fit in zip(invalid, map(toolbox.evaluate, invalid)):
        ind.fitness.values = fit
    pop[:] = offspring
    hof.update(pop)

best = hof[0]
print(f"\nBest → dropout={best[0]:.3f}, hidden={best[1]},{best[2]}, "
      f"val_acc={best.fitness.values[0]:.4f}")


GA generations: 100%|██████████| 30/30 [41:43<00:00, 83.44s/it] 


Best → dropout=0.046, hidden=256,224, val_acc=0.9094





# with learning rate

In [None]:
import random, numpy as np, tensorflow as tf
from deap import base, creator, tools
from tqdm import trange

# ------------------------------------------------------------------
# 1. Load data
csv_path = "../dataset/Gas Sensors Measurements/Gas_Sensors_Measurements.csv"
train_ds, val_ds, test_ds, num = load_and_split_sensor_data(csv_path, 0.1, 0.2, 123)
input_dim     = train_ds.element_spec[0].shape[-1]
output_units  = len(np.unique([y.numpy() for _, y in train_ds.unbatch()]))

# ------------------------------------------------------------------
# 2. Fitness: genome = [dropout, h1, h2, log10_lr]
def eval_individual(ind, train_ds, val_ds, input_dim, output_units):
    layer_do, h1, h2, log_lr = float(ind[0]), int(ind[1]), int(ind[2]), float(ind[3])
    if not (0. <= layer_do <= 1. and h1 > 0 and h2 > 0 and -7 <= log_lr <= -2):
        return (0.0,)

    lr = 10 ** log_lr
    model = build_sensor_model(
        input_dim            = input_dim,
        sensor_dropout_rate  = 0.3,
        layer_dropout        = layer_do,
        hidden_units         = (h1, h2),
        output_units         = output_units,
        lr                   = lr
    )
    callback = tf.keras.callbacks.EarlyStopping(patience=3, restore_best_weights=True)
    model.fit(train_ds, validation_data=val_ds, epochs=25, callbacks=[callback], verbose=0)
    _, acc = model.evaluate(val_ds, verbose=0)
    tf.keras.backend.clear_session()
    return (acc,)

# ------------------------------------------------------------------
# 3. DEAP setup
for cls in ("FitnessMax", "Individual"):
    if cls in creator.__dict__: del creator.__dict__[cls]

creator.create("FitnessMax", base.Fitness, weights=(1.0,))
creator.create("Individual", list, fitness=creator.FitnessMax)

toolbox = base.Toolbox()
toolbox.register("layer_do", random.uniform, 0.0, 0.8)
toolbox.register("h1",       random.randrange, 16, 257, 16)
toolbox.register("h2",       random.randrange, 16, 257, 16)
toolbox.register("log_lr",   random.uniform, -5, -3)        # log₁₀ learning rate

toolbox.register(
    "individual",
    tools.initIterate,
    creator.Individual,
    lambda: [toolbox.layer_do(), toolbox.h1(), toolbox.h2(), toolbox.log_lr()]
)

toolbox.register("population", tools.initRepeat, list, toolbox.individual)
toolbox.register("evaluate",   eval_individual,
                 train_ds=train_ds, val_ds=val_ds,
                 input_dim=input_dim, output_units=output_units)
toolbox.register("mate", tools.cxTwoPoint)

# Gaussian‑style jitter mutation
def mutate_ind(ind, indpb=0.2):
    if random.random() < indpb:
        ind[0] = np.clip(ind[0] + np.random.normal(0, 0.05), 0.0, 0.8)
    if random.random() < indpb:
        ind[1] = max(16, min(256, ind[1] + random.choice([-16, 16])))
    if random.random() < indpb:
        ind[2] = max(16, min(256, ind[2] + random.choice([-16, 16])))
    if random.random() < indpb:
        ind[3] = np.clip(ind[3] + np.random.normal(0, 0.25), -5, -3)
    return ind,

toolbox.register("mutate", mutate_ind)
toolbox.register("select", tools.selTournament, tournsize=3)

# ------------------------------------------------------------------
# 4. Evolve
POP, NGEN, CX_PB, MUT_PB = 40, 50, 0.5, 0.2
pop = toolbox.population(n=POP)
hof = tools.HallOfFame(1)

for _ in trange(NGEN, desc="GA generations"):
    offspring = list(map(toolbox.clone, toolbox.select(pop, len(pop))))
    for c1, c2 in zip(offspring[::2], offspring[1::2]):
        if random.random() < CX_PB:
            toolbox.mate(c1, c2)
            del c1.fitness.values, c2.fitness.values
    for m in offspring:
        if random.random() < MUT_PB:
            toolbox.mutate(m)
            del m.fitness.values
    invalid = [ind for ind in offspring if not ind.fitness.valid]
    for ind, fit in zip(invalid, map(toolbox.evaluate, invalid)):
        ind.fitness.values = fit
    elite = tools.selBest(pop, 2)          # elitism
    pop[:] = elite + offspring[:-2]
    hof.update(pop)

best = hof[0]
print(f"\nBest → dropout={best[0]:.3f}, hidden={best[1]},{best[2]}, "
      f"lr={10**best[3]:.2e}, val_acc={best.fitness.values[0]:.4f}")


GA generations: 100%|██████████| 50/50 [5:36:33<00:00, 403.86s/it]  


Best → dropout=0.036, hidden=224,240, lr=7.07e-04, val_acc=0.9531



