In [None]:
import tensorflow as tf
from tensorflow.keras import layers, models, initializers
import math


# === Activation Functions === #
class Swish(layers.Layer):
    """Swish activation: multiplies the input element-wise by its own sigmoid."""

    def call(self, inputs):
        gate = tf.nn.sigmoid(inputs)
        return inputs * gate


class GLU(layers.Layer):
    """Gated linear unit.

    Splits the input into two equal halves along ``axis`` and returns the
    first half multiplied by the sigmoid of the second half, so the output
    is half the size of the input along ``axis``.
    """

    def __init__(self, axis=-1):
        super().__init__()
        self.axis = axis

    def call(self, inputs):
        # First half carries the values, second half provides the gates.
        values, gates = tf.split(inputs, num_or_size_splits=2, axis=self.axis)
        gated = values * tf.nn.sigmoid(gates)
        return gated


# === Utility Layers === #
class Linear(layers.Layer):
    """Thin wrapper around ``layers.Dense`` with Glorot-uniform weights and zero bias.

    Args:
        in_dim: Input feature size. Not used by the implementation (the
            wrapped ``Dense`` layer is only given ``out_dim``); kept so call
            sites can spell out both dimensions.
        out_dim: Number of output units.
        use_bias: Whether the wrapped ``Dense`` layer has a bias term.
    """

    def __init__(self, in_dim, out_dim, use_bias=True):
        super().__init__()
        kernel_init = initializers.GlorotUniform()
        self.linear = layers.Dense(
            out_dim,
            use_bias=use_bias,
            kernel_initializer=kernel_init,
            bias_initializer='zeros',
        )

    def call(self, x):
        projected = self.linear(x)
        return projected


class ResidualConnectionModule(tf.keras.layers.Layer):
    """Wraps a module and adds a scaled skip connection around it.

    For a single (non-tuple) input the result is
    ``module(inputs) * module_factor + inputs * input_factor``.
    When ``inputs`` is a tuple, the wrapped module is called on the tuple and
    its output is returned as-is, with no skip connection.

    Args:
        module: The callable layer to wrap.
        module_factor: Scale applied to the module output.
        input_factor: Scale applied to the skip (input) branch.
    """

    def __init__(self, module, module_factor=1.0, input_factor=1.0):
        super().__init__()
        self.module = module
        self.module_factor = module_factor
        self.input_factor = input_factor

    def call(self, inputs):
        # Multi-input modules receive a tuple; there is no single tensor to
        # add back, so the residual is skipped for them.
        if isinstance(inputs, tuple):
            return self.module(inputs)

        branch = self.module(inputs)
        return (branch * self.module_factor) + (inputs * self.input_factor)


# === Positional Encoding === #
class RelPositionalEncoding(layers.Layer):
    """Sinusoidal positional-encoding table, cropped to the input's length.

    Each row ``p`` of the table is laid out as
    ``[sin(p * w_0), ..., sin(p * w_k), cos(p * w_0), ..., cos(p * w_k)]``
    (all sines first, then all cosines), with
    ``w_i = exp(-(2 * i) * ln(10000) / d_model)``.

    Args:
        d_model: Width of each encoding vector.
        max_len: Maximum number of positions that can be returned.
    """

    def __init__(self, d_model, max_len=5000):
        super().__init__()
        self.d_model = d_model
        self.max_len = max_len

    def call(self, x):
        """Return encodings of shape ``(1, min(tf.shape(x)[1], max_len), d_model)``.

        Only axis 1 of ``x`` is read; the values of ``x`` are not used.
        """
        # Build only the rows that are actually returned. The previous
        # implementation materialised the whole (max_len, d_model) table via a
        # Python list of max_len * d_model scatter indices on every call; that
        # scatter was equivalent to the plain concatenation below.
        length = tf.minimum(tf.shape(x)[1], self.max_len)
        position = tf.cast(tf.range(length), tf.float32)[:, tf.newaxis]
        div_term = tf.exp(
            tf.range(0, self.d_model, 2, dtype=tf.float32)
            * -(math.log(10000.0) / self.d_model)
        )
        angles = position * div_term
        pe = tf.concat([tf.sin(angles), tf.cos(angles)], axis=-1)
        # For odd d_model the concat is one column too wide; trim it so the
        # encoding width always equals d_model (a no-op for even d_model).
        pe = pe[:, :self.d_model]
        return pe[tf.newaxis, :, :]


# === Multi-head Attention with Relative Position === #
class RelativeMultiHeadAttention(layers.Layer):
    """Multi-head attention with an additive positional score term.

    Attention logits are ``(q @ k^T + q @ pos^T) / sqrt(depth)``, where
    ``pos`` is a bias-free linear projection of the positional encoding
    (a simplified form of relative-position attention).

    Args:
        d_model: Model width; must be divisible by ``num_heads``.
        num_heads: Number of attention heads.
        dropout_rate: Dropout rate applied to the attention weights.
    """

    def __init__(self, d_model, num_heads, dropout_rate):
        super().__init__()
        assert d_model % num_heads == 0
        self.d_model = d_model
        self.num_heads = num_heads
        self.depth = d_model // num_heads

        self.query_dense = Linear(d_model, d_model)
        self.key_dense = Linear(d_model, d_model)
        self.value_dense = Linear(d_model, d_model)
        self.pos_dense = Linear(d_model, d_model, use_bias=False)
        self.dropout = layers.Dropout(dropout_rate)
        self.out_dense = Linear(d_model, d_model)

    def split_heads(self, x):
        """Reshape ``x`` to (batch, -1, num_heads, depth) and move heads to axis 1."""
        batch = tf.shape(x)[0]
        per_head = tf.reshape(x, (batch, -1, self.num_heads, self.depth))
        return tf.transpose(per_head, perm=[0, 2, 1, 3])

    def call(self, inputs, training=None):
        """Run attention on a ``(q, k, v, pos_enc, mask)`` tuple.

        ``mask`` may be ``None``; otherwise ``mask * -1e9`` is added to the
        logits, so non-zero mask entries suppress the matching positions.
        """
        query, key, value, pos_enc, mask = inputs

        # Project and split in the same order as the layers were declared.
        query = self.split_heads(self.query_dense(query))
        key = self.split_heads(self.key_dense(key))
        value = self.split_heads(self.value_dense(value))
        pos = self.split_heads(self.pos_dense(pos_enc))

        content_scores = tf.matmul(query, key, transpose_b=True)
        position_scores = tf.matmul(query, pos, transpose_b=True)  # simplified version
        scale = tf.math.sqrt(tf.cast(self.depth, tf.float32))
        scores = (content_scores + position_scores) / scale

        if mask is not None:
            scores = scores + (mask * -1e9)

        weights = self.dropout(tf.nn.softmax(scores, axis=-1))
        attended = tf.matmul(weights, value)

        # Merge heads back: swap the head axis with axis 2, then flatten the
        # last two axes into d_model.
        attended = tf.transpose(attended, perm=[0, 2, 1, 3])
        merged = tf.reshape(attended, (tf.shape(attended)[0], -1, self.d_model))
        return self.out_dense(merged)


# === Feedforward Module === #
class FeedForwardModule(layers.Layer):
    """Feed-forward sub-block: LayerNorm -> Linear -> Swish -> Dropout -> Linear -> Dropout.

    Args:
        d_model: Output width of the second linear layer.
        expansion_factor: The first linear layer has
            ``d_model * expansion_factor`` units.
        dropout_rate: Rate used by both dropout layers.
    """

    def __init__(self, d_model, expansion_factor, dropout_rate):
        super().__init__()
        hidden_dim = d_model * expansion_factor
        stack = [
            layers.LayerNormalization(),
            Linear(d_model, hidden_dim),
            Swish(),
            layers.Dropout(dropout_rate),
            Linear(hidden_dim, d_model),
            layers.Dropout(dropout_rate),
        ]
        self.ffn = models.Sequential(stack)

    def call(self, x):
        out = self.ffn(x)
        return out


# === Conformer Conv Module === #
class ConformerConvModule(tf.keras.layers.Layer):
    """Conformer convolution module.

    Pipeline: LayerNorm -> pointwise Conv1D (2 * d_model filters) -> GLU ->
    depthwise convolution along time -> BatchNorm -> Swish ->
    pointwise Conv1D (d_model filters) -> Dropout.

    Args:
        d_model: Number of filters of the final pointwise convolution; the
            first pointwise convolution uses ``2 * d_model`` so that the GLU
            halves it back.
        kernel_size: Length of the depthwise kernel along the time axis.
        dropout_rate: Rate of the final dropout layer.
    """

    def __init__(self, d_model, kernel_size=31, dropout_rate=0.1):
        super().__init__()
        self.layer_norm = tf.keras.layers.LayerNormalization()
        self.pointwise_conv1 = tf.keras.layers.Conv1D(
            filters=2 * d_model, kernel_size=1, padding='same'
        )
        self.glu = GLU(axis=-1)
        # The kernel spans `kernel_size` steps along the first spatial axis
        # (time, see `call`) and 1 along the dummy second axis. The data
        # format is pinned so the layout used in `call` does not depend on
        # the global Keras image_data_format setting.
        self.depthwise_conv = tf.keras.layers.DepthwiseConv2D(
            kernel_size=(kernel_size, 1),
            padding='same',
            data_format='channels_last'
        )
        self.batch_norm = tf.keras.layers.BatchNormalization()
        self.swish = Swish()
        self.pointwise_conv2 = tf.keras.layers.Conv1D(
            filters=d_model, kernel_size=1, padding='same'
        )
        self.dropout = tf.keras.layers.Dropout(dropout_rate)

    def call(self, x, training=False):
        """Apply the module to ``x``, expected as (batch, time, dim).

        ``training`` is forwarded to batch normalization and dropout.
        """
        x = self.layer_norm(x)

        x = self.pointwise_conv1(x)  # (batch, time, 2*dim)
        x = self.glu(x)              # (batch, time, dim)

        # DepthwiseConv2D (channels_last) convolves over axes 1 and 2 and
        # treats the last axis as channels. Insert a dummy width axis so that
        # time is the spatial axis and the features are the channels; this
        # filters every feature channel independently along time. The earlier
        # (batch, dim, time, 1) layout instead slid the kernel across the
        # feature axis with a single channel.
        x = tf.expand_dims(x, axis=2)        # (batch, time, 1, dim)
        x = self.depthwise_conv(x)           # (batch, time, 1, dim)
        x = tf.squeeze(x, axis=2)            # (batch, time, dim)

        x = self.batch_norm(x, training=training)
        x = self.swish(x)
        x = self.pointwise_conv2(x)
        return self.dropout(x, training=training)


# === Conformer Block === #
class ConformerBlock(layers.Layer):
    """One Conformer block: half-step FFN -> self-attention -> conv -> half-step FFN -> LayerNorm.

    Args:
        d_model: Model width shared by all sub-modules.
        num_heads: Number of attention heads.
        ff_expansion: Expansion factor of both feed-forward modules.
        conv_kernel_size: Kernel size of the convolution module.
        dropout_rate: Dropout rate passed to every sub-module.
    """

    def __init__(self, d_model, num_heads, ff_expansion, conv_kernel_size, dropout_rate):
        super().__init__()
        # The feed-forward modules contribute with weight 0.5 (half-step residuals).
        self.ff1 = ResidualConnectionModule(FeedForwardModule(d_model, ff_expansion, dropout_rate), 0.5)
        self.mha = ResidualConnectionModule(RelativeMultiHeadAttention(d_model, num_heads, dropout_rate))
        self.conv = ResidualConnectionModule(ConformerConvModule(d_model, conv_kernel_size, dropout_rate))
        self.ff2 = ResidualConnectionModule(FeedForwardModule(d_model, ff_expansion, dropout_rate), 0.5)
        self.norm = layers.LayerNormalization()

    def call(self, x, pos_enc):
        """Apply the block to ``x`` using positional encodings ``pos_enc``."""
        x = self.ff1(x)
        # ResidualConnectionModule returns the bare module output for tuple
        # inputs, so without the explicit addition below the attention output
        # would replace the residual stream instead of being added to it.
        # NOTE: if the wrapper is ever changed to add the residual for tuple
        # inputs itself, drop the `x +` here to avoid adding it twice.
        x = x + self.mha((x, x, x, pos_enc, None))
        x = self.conv(x)
        x = self.ff2(x)
        return self.norm(x)


# === Conformer Encoder === #
class ConformerEncoder(layers.Layer):
    """Input projection followed by a stack of ``num_layers`` Conformer blocks.

    Args:
        input_dim: Input feature size. Not used by the implementation (the
            projection layer is only given ``d_model``).
        d_model: Width of the projection and of every block.
        num_layers: Number of Conformer blocks.
        num_heads: Attention heads per block.
        ff_expansion: Feed-forward expansion factor per block.
        conv_kernel_size: Convolution kernel size per block.
        dropout_rate: Dropout rate per block.
    """

    def __init__(self, input_dim, d_model, num_layers, num_heads, ff_expansion, conv_kernel_size, dropout_rate):
        super().__init__()
        self.input_proj = layers.Dense(d_model)
        self.pos_enc = RelPositionalEncoding(d_model)

        stack = []
        for _ in range(num_layers):
            stack.append(
                ConformerBlock(d_model, num_heads, ff_expansion, conv_kernel_size, dropout_rate)
            )
        self.blocks = stack

    def call(self, x):
        hidden = self.input_proj(x)
        # The positional table is computed once and shared by all blocks.
        positions = self.pos_enc(hidden)
        for layer in self.blocks:
            hidden = layer(hidden, positions)
        return hidden


# === Top-level Conformer Model === #
class ConformerModel(tf.keras.Model):
    """Conformer encoder with a dense classification head.

    The output is the log-softmax of the head's activations averaged over
    axis 1, i.e. averaged log-probabilities rather than probabilities.

    Args:
        num_classes: Number of units of the classification head.
        input_dim: Forwarded to the encoder.
        d_model: Encoder width.
        num_layers: Number of Conformer blocks.
        num_heads: Attention heads per block.
        ff_expansion: Feed-forward expansion factor.
        conv_kernel_size: Convolution kernel size.
        dropout_rate: Dropout rate used throughout the encoder.
    """

    def __init__(self, num_classes, input_dim=80, d_model=256, num_layers=4, num_heads=4,
                 ff_expansion=4, conv_kernel_size=15, dropout_rate=0.1):
        super().__init__()
        self.encoder = ConformerEncoder(
            input_dim, d_model, num_layers, num_heads,
            ff_expansion, conv_kernel_size, dropout_rate,
        )
        self.fc = layers.Dense(num_classes)

    def call(self, x):
        encoded = self.encoder(x)
        scores = self.fc(encoded)
        log_probs = tf.nn.log_softmax(scores, axis=-1)
        # Pool by averaging the per-step log-probabilities over axis 1.
        return tf.reduce_mean(log_probs, axis=1)

In [None]:
import os
import tensorflow as tf
import numpy as np
import librosa
from sklearn.preprocessing import LabelEncoder
from tensorflow.keras.utils import to_categorical
from sklearn.utils import shuffle

# =====================
# Preprocessing + Feature Extraction
# =====================
def extract_features(file_path, sr=16000, n_mels=80, duration=3, max_frames=300):
    """Load an audio clip and return its log-mel spectrogram, frames first.

    The waveform is forced to exactly ``int(round(sr * duration))`` samples
    (zero-padded at the end or truncated) so that every clip produces the
    same number of frames and the results can be stacked.

    Args:
        file_path: Path of the audio file to load.
        sr: Sample rate to load at.
        n_mels: Number of mel bands.
        duration: Clip length in seconds; may be fractional.
        max_frames: Upper bound on the number of frames returned. Longer
            spectrograms are cropped; shorter ones are returned unpadded.

    Returns:
        Array with one row per frame and ``n_mels`` columns, holding
        power-to-dB values relative to the clip's maximum.
    """
    # Integer sample count: a float pad width would make np.pad fail when
    # `duration` is fractional.
    target_len = int(round(sr * duration))
    y, _ = librosa.load(file_path, sr=sr, duration=duration)
    if len(y) < target_len:
        y = np.pad(y, (0, target_len - len(y)))
    else:
        # Guard against the loader returning a few extra samples, which
        # would change the frame count for this clip only.
        y = y[:target_len]
    mel = librosa.feature.melspectrogram(y=y, sr=sr, n_mels=n_mels)
    log_mel = librosa.power_to_db(mel, ref=np.max)
    return log_mel.T[:max_frames]  # crop only; no padding is applied here

def load_dataset(data_dir, sr=16000, n_mels=80, duration=3):
    """Build a feature/label dataset from a directory of per-class sub-folders.

    Every sub-directory of ``data_dir`` is treated as one class, named after
    the directory; every ``.wav`` file inside it (extension matched
    case-insensitively) becomes one example.

    Args:
        data_dir: Root directory containing one sub-directory per class.
        sr: Sample rate forwarded to ``extract_features``.
        n_mels: Number of mel bands forwarded to ``extract_features``.
        duration: Clip length in seconds forwarded to ``extract_features``.

    Returns:
        Tuple ``(X, y, classes)``: stacked features, one-hot labels, and the
        class names in the order used by the label encoder.

    Raises:
        ValueError: If no ``.wav`` file is found in any class directory.
    """
    features, labels = [], []
    # os.listdir order is arbitrary; sorting makes the example order (and
    # therefore any seeded shuffle/split done by the caller) reproducible.
    for label in sorted(os.listdir(data_dir)):
        class_dir = os.path.join(data_dir, label)
        if not os.path.isdir(class_dir):
            continue
        for fname in sorted(os.listdir(class_dir)):
            if not fname.lower().endswith('.wav'):
                continue
            fpath = os.path.join(class_dir, fname)
            features.append(extract_features(fpath, sr, n_mels, duration))
            labels.append(label)

    if not features:
        # np.stack would also raise ValueError here, but with a message that
        # does not point at the actual problem.
        raise ValueError(
            f"No .wav files found in class sub-directories of {data_dir!r}"
        )

    X = np.stack(features)
    le = LabelEncoder()
    y = to_categorical(le.fit_transform(labels))
    return X, y, le.classes_

# =====================
# Model Architecture (from before)
# =====================
# The model classes (Swish, GLU, Linear, ResidualConnectionModule, ..., ConformerModel)
# are defined in the previous cell and are used here unchanged; run that cell first.

# =====================
# Training Loop
# =====================
def train_model(model, train_data, val_data, epochs=20, batch_size=32):
    """Compile ``model`` for one-hot classification and fit it.

    Args:
        model: Keras model whose outputs are unnormalised class scores or
            log-probabilities (not softmax probabilities). ``ConformerModel``
            in this file returns averaged log-probabilities.
        train_data: ``(features, one_hot_labels)`` tuple used for training.
        val_data: ``(features, one_hot_labels)`` tuple used for validation.
        epochs: Number of training epochs.
        batch_size: Mini-batch size.

    Returns:
        The ``History`` object returned by ``model.fit``.
    """
    model.compile(
        optimizer=tf.keras.optimizers.Adam(1e-3),
        # from_logits=True: the model does not emit probabilities, so the loss
        # must apply the softmax itself. With the default (from_logits=False)
        # the negative log-probability outputs would be treated as
        # probabilities and the loss value would be meaningless.
        loss=tf.keras.losses.CategoricalCrossentropy(
            from_logits=True, label_smoothing=0.1
        ),
        metrics=['accuracy']
    )
    history = model.fit(
        x=train_data[0], y=train_data[1],
        validation_data=val_data,
        batch_size=batch_size,
        epochs=epochs
    )
    return history

# =====================
# Entry Point
# =====================
# Script entry point: load the dataset, make an 80/20 train/validation split,
# build the Conformer classifier and train it.
if __name__ == "__main__":
    # Root folder expected to contain one sub-directory per class.
    DATA_DIR = "./data/all-samples"

    X, y, class_names = load_dataset(DATA_DIR)
    # Shuffle features and labels together with a fixed seed before splitting.
    X, y = shuffle(X, y, random_state=42)
    # First 80% of the shuffled examples train the model; the rest validate it.
    split = int(0.8 * len(X))
    train_data = (X[:split], y[:split])
    val_data = (X[split:], y[split:])

    # One output unit per class; input_dim matches the default n_mels=80
    # used by load_dataset.
    model = ConformerModel(num_classes=len(class_names), input_dim=80, d_model=256, num_layers=4)
    train_model(model, train_data, val_data)

NameError: name 'load_dataset' is not defined