In [None]:
import os
import numpy as np
import numpy as _np
if not hasattr(_np, 'complex'):
    _np.complex = complex

import librosa
from glob import glob
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelBinarizer

import tensorflow as tf
from tensorflow.keras.layers import (
    Input, Conv1D, ZeroPadding1D,
    Activation, Multiply, Add,
    Softmax, Layer
)
from tensorflow.keras.models import Model


2025-04-21 08:17:08.777050: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:477] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
E0000 00:00:1745223431.051406   39507 cuda_dnn.cc:8310] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
E0000 00:00:1745223431.694105   39507 cuda_blas.cc:1418] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
2025-04-21 08:17:17.142675: I tensorflow/core/platform/cpu_feature_guard.cc:210] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


In [2]:
# 1. Global settings
SAMPLE_RATE = 16_000                            # Hz; passed to librosa.load as the target rate
MAX_DURATION = 3.0                              # seconds of audio kept per clip
MAX_SAMPLES = int(MAX_DURATION * SAMPLE_RATE)   # fixed clip length, in samples
BATCH_SIZE = 8                                  # NOTE: reassigned in the training cell further down
EPOCHS = 50                                     # NOTE: reassigned in the training cell further down


# Load data 

In [None]:
# Emotion labels kept by the dataset loaders; clips with any other label are skipped.
TARGET_EMOTIONS = [
    'neutral',
    'sad',
    'happy',
    'angry',
]

In [None]:
# RAVDESS emotion code (third dash-separated field of the filename) -> emotion label.
EMO_MAP = dict((
    ("01", "neutral"),
    ("02", "calm"),
    ("03", "happy"),
    ("04", "sad"),
    ("05", "angry"),
    ("06", "fearful"),
    ("07", "disgust"),
    ("08", "surprised"),
))

def load_ravdess(path):
    """Load RAVDESS clips of the target emotions as fixed-length waveforms.

    Searches ``<path>/Actor_*/*.wav``, reads the emotion code from the third
    dash-separated field of each filename and keeps only clips whose mapped
    label is in ``TARGET_EMOTIONS``. Each kept clip is loaded at
    ``SAMPLE_RATE``, passed through ``librosa.effects.trim(top_db=20)``, then
    zero-padded at the end or cropped to exactly ``MAX_SAMPLES`` samples.

    Files are visited in sorted path order so that the returned sample order
    (and therefore any seeded shuffle/split applied afterwards) is
    reproducible across machines. Files whose name has fewer than three
    dash-separated fields are skipped instead of raising ``IndexError``.

    Args:
        path: Root directory that contains the ``Actor_*`` sub-folders.

    Returns:
        Tuple ``(X, y)`` of NumPy arrays holding the waveforms and their
        string labels. Both are empty when no clip qualifies.
    """
    X, y = [], []
    # glob() returns entries in arbitrary filesystem order; sort for determinism.
    for fp in sorted(glob(os.path.join(path, "Actor_*", "*.wav"))):
        fields = os.path.basename(fp).split('-')
        if len(fields) < 3:
            # Name does not follow the dash-separated scheme; no emotion code to read.
            continue
        label = EMO_MAP.get(fields[2])
        if label not in TARGET_EMOTIONS:
            continue
        sig, _ = librosa.load(fp, sr=SAMPLE_RATE)
        sig, _ = librosa.effects.trim(sig, top_db=20)
        if len(sig) < MAX_SAMPLES:
            # Right-pad short clips with zeros up to the fixed length.
            sig = np.pad(sig, (0, MAX_SAMPLES - len(sig)), 'constant')
        else:
            sig = sig[:MAX_SAMPLES]
        X.append(sig)
        y.append(label)
    return np.array(X), np.array(y)


Loading & trimming RAVDESS...
Loaded 1440 samples


In [None]:
# CREMA-D emotion code (third underscore-separated field of the filename) -> emotion label.
CREMA_EMO_MAP = dict(zip(
    ("ANG", "DIS", "FEA", "HAP", "NEU", "SAD"),
    ("angry", "disgust", "fearful", "happy", "neutral", "sad"),
))


def load_crema(path, limit=3000):
    """Load CREMA-D clips of the target emotions as fixed-length waveforms.

    Searches ``<path>/*.wav``, reads the emotion code from the third
    underscore-separated field of each filename and keeps only clips whose
    mapped label is in ``TARGET_EMOTIONS``. Each kept clip is loaded at
    ``SAMPLE_RATE``, passed through ``librosa.effects.trim(top_db=20)``, then
    zero-padded at the end or cropped to exactly ``MAX_SAMPLES`` samples.

    Files are visited in sorted path order, so with a ``limit`` the kept
    subset is always the first ``limit`` qualifying files by name rather than
    whatever order the filesystem happens to return. Files whose name has
    fewer than three underscore-separated fields are skipped instead of
    raising ``IndexError``.

    Args:
        path: Directory that directly contains the ``.wav`` files.
        limit: Maximum number of clips to keep. ``None`` disables the cap.

    Returns:
        Tuple ``(X, y)`` of NumPy arrays holding the waveforms and their
        string labels. Both are empty when no clip qualifies.
    """
    X, y = [], []
    # glob() returns entries in arbitrary filesystem order; sort for determinism.
    for fp in sorted(glob(os.path.join(path, "*.wav"))):
        if limit is not None and len(X) >= limit:
            break
        parts = os.path.basename(fp).split('_')
        if len(parts) < 3:
            # Name does not follow the underscore-separated scheme; no emotion code to read.
            continue
        label = CREMA_EMO_MAP.get(parts[2])
        if label not in TARGET_EMOTIONS:
            continue
        sig, _ = librosa.load(fp, sr=SAMPLE_RATE)
        sig, _ = librosa.effects.trim(sig, top_db=20)
        if len(sig) < MAX_SAMPLES:
            # Right-pad short clips with zeros up to the fixed length.
            sig = np.pad(sig, (0, MAX_SAMPLES - len(sig)), 'constant')
        else:
            sig = sig[:MAX_SAMPLES]
        X.append(sig)
        y.append(label)
    return np.array(X), np.array(y)



In [None]:
# Dataset locations, relative to the notebook's working directory.
RAVDESS_PATH = '../data/RAVDESS_Data'
CREMA_PATH = '../data/CREMA_Data'

# RAVDESS: load, trim and pad/crop every clip of a target emotion.
print("Loading & trimming RAVDESS...")
X_ravdess, y_ravdess = load_ravdess(path=RAVDESS_PATH)
print(f"Loaded {X_ravdess.shape[0]} samples from RAVDESS")

# CREMA-D: same preprocessing, capped by load_crema's default `limit`.
print("Loading & trimming CREMA-D...")
X_crema, y_crema = load_crema(path=CREMA_PATH)
print(f"Loaded {X_crema.shape[0]} samples from CREMA-D")

In [None]:

# `shuffle` is called below but is missing from the notebook's import cell;
# import it here so this cell runs on a fresh kernel.
from sklearn.utils import shuffle

# Merge both corpora into one pool of waveforms and string labels.
X_total = np.concatenate([X_ravdess, X_crema])
y_total = np.concatenate([y_ravdess, y_crema])

print(f"Tổng số samples sau khi gộp: {X_total.shape[0]}")

# Seeded shuffle so the two corpora are interleaved reproducibly.
X_total, y_total = shuffle(X_total, y_total, random_state=42)

# One-hot encode the string labels; `lb.classes_` keeps the column order.
lb = LabelBinarizer()
y_onehot = lb.fit_transform(y_total)


# 80/20 split, stratified on the string labels so both sets keep the class mix.
X_tr, X_va, y_tr, y_va = train_test_split(
    X_total, y_onehot, test_size=0.2, stratify=y_total, random_state=42
)

# Append a trailing channel axis so each clip is (time, 1), as Conv1D expects.
X_tr = X_tr[..., np.newaxis]
X_va = X_va[..., np.newaxis]

print(f"Train set: {X_tr.shape}, Validation set: {X_va.shape}")
print(f"Số lượng class: {len(lb.classes_)} - {lb.classes_}")

# Model

In [None]:
import tensorflow as tf

class WaveNetSER(tf.keras.Model):
    """WaveNet-style network that maps a waveform to per-emotion logits.

    Architecture, in call order:
      1. one causal ``Conv1D`` lifting the input to ``residual_channels``;
      2. one gated residual block per entry in ``dilations`` (tanh filter
         branch multiplied by a sigmoid gate branch, then batch norm and
         dropout, feeding both a 1x1 skip projection and a 1x1 residual
         projection);
      3. the summed skip outputs go through ReLU -> 1x1 conv -> ReLU -> 1x1
         conv with ``num_emotions`` filters;
      4. the result is averaged over the time axis.

    The output is raw logits (no softmax is applied).
    """

    def __init__(self,
                 dilations=(1, 2, 4, 8, 16),
                 filter_width=2,
                 residual_channels=32,
                 dilation_channels=64,
                 skip_channels=32,
                 num_emotions=4,
                 use_biases=True):
        """Create the layers.

        Args:
            dilations: Dilation rate of each residual block, in order.
            filter_width: Kernel size of the causal and dilated convolutions.
            residual_channels: Channels carried along the residual path.
            dilation_channels: Channels of the filter/gate convolutions.
            skip_channels: Channels of each skip projection and of ``post1``.
            num_emotions: Number of output classes.
            use_biases: Whether the convolution layers use a bias term.
        """
        super().__init__()

        # Copy into a fresh list so instances never share (or mutate) the
        # default value or a caller-owned sequence.
        self.dilations = list(dilations)
        self.num_emotions = num_emotions
        self.use_biases = use_biases

        # Causal convolution layer
        self.causal_conv = tf.keras.layers.Conv1D(
            filters=residual_channels,
            kernel_size=filter_width,
            padding='causal',
            use_bias=use_biases)

        # Dilated residual blocks
        self.residual_blocks = []
        for dilation in self.dilations:
            block = {
                'conv_filter': tf.keras.layers.Conv1D(
                    filters=dilation_channels,
                    kernel_size=filter_width,
                    dilation_rate=dilation,
                    padding='causal',
                    use_bias=use_biases),
                'conv_gate': tf.keras.layers.Conv1D(
                    filters=dilation_channels,
                    kernel_size=filter_width,
                    dilation_rate=dilation,
                    padding='causal',
                    use_bias=use_biases),
                'dense': tf.keras.layers.Conv1D(
                    filters=residual_channels,
                    kernel_size=1,
                    use_bias=use_biases),
                'skip': tf.keras.layers.Conv1D(
                    filters=skip_channels,
                    kernel_size=1,
                    use_bias=use_biases),
                'batch_norm': tf.keras.layers.BatchNormalization(),
                'dropout': tf.keras.layers.Dropout(0.3)
            }
            self.residual_blocks.append(block)

        # Post-processing layers
        self.post1 = tf.keras.layers.Conv1D(
            filters=skip_channels, kernel_size=1, use_bias=use_biases)
        self.post2 = tf.keras.layers.Conv1D(
            filters=num_emotions, kernel_size=1, use_bias=use_biases)

    def call(self, inputs, training=False):
        """Run the forward pass.

        Args:
            inputs: Tensor accepted by ``Conv1D`` with its default data
                format, i.e. ``(batch, time, channels)``.
            training: Forwarded to the batch-norm and dropout layers.

        Returns:
            Logits averaged over axis 1 (time), one value per emotion.
        """
        x = self.causal_conv(inputs)

        skip_connections = []
        for block in self.residual_blocks:
            # Gated activation unit: tanh "filter" modulated by sigmoid "gate".
            tanh_out = tf.nn.tanh(block['conv_filter'](x))
            sigm_out = tf.nn.sigmoid(block['conv_gate'](x))
            z = tanh_out * sigm_out

            z = block['batch_norm'](z, training=training)
            z = block['dropout'](z, training=training)

            skip = block['skip'](z)
            skip_connections.append(skip)

            # Residual connection feeding the next block.
            x = x + block['dense'](z)

        total = tf.add_n(skip_connections)
        total = tf.nn.relu(total)
        total = self.post1(total)
        total = tf.nn.relu(total)
        logits = self.post2(total)

        # Global average pooling to aggregate over time
        logits = tf.reduce_mean(logits, axis=1)
        return logits


In [None]:


# Hyperparameters (BATCH_SIZE and EPOCHS override the values set in the config cell)
BATCH_SIZE = 16
EPOCHS = 30
NUM_EMOTIONS = y_tr.shape[1]   # width of the one-hot labels produced by the LabelBinarizer
LEARNING_RATE = 0.001

# Build the input pipelines from the real train/validation split prepared
# above. The previous version called `load_data`, which is only defined in a
# later cell (NameError on Restart & Run All) and yields random noise with
# the same dataset used for both training and validation.
train_ds = (
    tf.data.Dataset.from_tensor_slices(
        (np.asarray(X_tr, dtype=np.float32), np.asarray(y_tr, dtype=np.float32)))
    .shuffle(len(X_tr), seed=42)   # reshuffled every epoch
    .batch(BATCH_SIZE)
    .prefetch(tf.data.AUTOTUNE)
)
val_ds = (
    tf.data.Dataset.from_tensor_slices(
        (np.asarray(X_va, dtype=np.float32), np.asarray(y_va, dtype=np.float32)))
    .batch(BATCH_SIZE)
    .prefetch(tf.data.AUTOTUNE)
)

# Instantiate the model
model = WaveNetSER(num_emotions=NUM_EMOTIONS)

# Compile; the model returns raw logits, hence from_logits=True
model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=LEARNING_RATE),
              loss=tf.keras.losses.CategoricalCrossentropy(from_logits=True),
              metrics=['accuracy'])

# Train
model.fit(train_ds, validation_data=val_ds, epochs=EPOCHS)

# Save model
# NOTE(review): WaveNetSER is a subclassed model; HDF5 saving of subclassed
# models is unsupported or legacy depending on the Keras version - confirm
# this works in the target environment (consider the `.keras` format).
model.save("wavenet_ser_model.h5")


In [None]:


def load_data(batch_size=16, num_samples=1000, time_steps=16000, num_emotions=4):
    """Build a synthetic (random) dataset for smoke-testing the training loop.

    Every element is a pair ``(waveform, label)`` where ``waveform`` is
    standard-normal noise of shape ``(time_steps, 1)`` and ``label`` is a
    one-hot float vector of length ``num_emotions`` for a uniformly drawn
    class. This is placeholder data, not a real corpus.

    Args:
        batch_size: Number of elements per batch.
        num_samples: Number of elements the generator yields per pass.
        time_steps: Length of each synthetic waveform, in samples.
        num_emotions: Number of classes in the one-hot label.

    Returns:
        Tuple ``(train_ds, val_ds)``. Both entries are the same batched
        ``tf.data.Dataset`` object (temporary: train and val are shared).
    """

    def generator():
        for _ in range(num_samples):
            waveform = tf.random.normal([time_steps, 1])
            class_id = tf.random.uniform([], 0, num_emotions, dtype=tf.int32)
            yield waveform, tf.one_hot(class_id, num_emotions)

    # `output_signature` replaces the deprecated output_types/output_shapes pair.
    dataset = tf.data.Dataset.from_generator(
        generator,
        output_signature=(
            tf.TensorSpec(shape=(time_steps, 1), dtype=tf.float32),
            tf.TensorSpec(shape=(num_emotions,), dtype=tf.float32),
        ),
    )
    dataset = dataset.batch(batch_size)
    return dataset, dataset  # temporarily share one dataset for train and val
