In [1]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [1]:
import shutil
import os

src_dir = '/content/drive/MyDrive/facial_fuse/label-distribution-learning-fer-tf'
dst_dir = '/content/facial_fuse/label-distribution-learning-fer-tf'

os.makedirs(os.path.dirname(dst_dir), exist_ok=True)

shutil.copytree(src_dir, dst_dir, dirs_exist_ok=True)

print(" Folder copied to /content/facial_fuse/label-distribution-learning-fer-tf")

FileNotFoundError: [Errno 2] No such file or directory: '/content/drive/MyDrive/facial_fuse/label-distribution-learning-fer-tf'

In [2]:
import pickle
import numpy as np
from PIL import Image

with open('/content/drive/MyDrive/facial_fuse/train_data.pkl', 'rb') as f:
    train_data = pickle.load(f)

with open('/content/drive/MyDrive/facial_fuse/test_data.pkl', 'rb') as f:
    test_data = pickle.load(f)

print(f" Loaded {len(train_data)} training samples.")
print(f"Loaded {len(test_data)} testing samples.")
print("First train sample keys:", train_data[0].keys())

sample = train_data[0]
print("\nSample Structure:")
for k, v in sample.items():
    print(f"{k}: {v}")

# === Optional: Check Landmark Shape ===
import numpy as np
landmark = np.load(sample['npy'])
print(f"\nLandmark shape: {landmark.shape}")

# === Check Image Shape ===
img = Image.open(sample['img']).convert('RGB')
img_array = np.array(img)
print(f"Image shape: {img_array.shape}")  # Expecting (H, W, 3)

FileNotFoundError: [Errno 2] No such file or directory: '/content/drive/MyDrive/facial_fuse/train_data.pkl'

In [3]:
import matplotlib.pyplot as plt
import numpy as np
from PIL import Image

def preview_samples(data_list, n=4):
    plt.figure(figsize=(n * 3, 3))
    for i in range(min(n, len(data_list))):
        sample = data_list[i]
        img = Image.open(sample['img']).convert('RGB')
        img_array = np.array(img)

        lmk = np.load(sample['npy'])

        plt.subplot(1, n, i + 1)
        plt.imshow(img_array)
        if lmk.shape[1] >= 2:
            plt.scatter(lmk[:, 0], lmk[:, 1], s=1, c='red')
        plt.title(f"Label: {sample['label']}")
        plt.axis('off')
    plt.tight_layout()
    plt.show()

preview_samples(train_data, n=4)


NameError: name 'train_data' is not defined

In [4]:
import tensorflow as tf
import numpy as np
from PIL import Image
from tqdm import tqdm

mlp_backbone = tf.keras.models.load_model('/content/facial_fuse/label-distribution-learning-fer-tf/pretrained/mlp_landmark_backbone.h5')
resnet_backbone = tf.keras.models.load_model('/content/facial_fuse/label-distribution-learning-fer-tf/pretrained/resnet50.h5')

resnet_backbone.trainable = True
mlp_backbone.trainable = True

class ConfidenceLateFusionModel(tf.keras.Model):
    def __init__(self, image_backbone, lmk_backbone, num_classes=7):
        super().__init__()
        self.image_backbone = image_backbone
        self.lmk_backbone = lmk_backbone
        self.shared_fc = tf.keras.Sequential([
            tf.keras.layers.Dense(256, activation='relu'),
            tf.keras.layers.Dropout(0.3),
            tf.keras.layers.Dense(128, activation='relu'),
            tf.keras.layers.Dropout(0.3)
        ])
        self.class_head = tf.keras.layers.Dense(num_classes, activation='softmax')
        self.conf_head = tf.keras.layers.Dense(1, activation='sigmoid')

    def call(self, inputs, training=False):
        image_input, landmark_input = inputs
        img_feat = self.image_backbone(image_input, training=training)
        lmk_feat = self.lmk_backbone(landmark_input, training=training)
        fused = tf.concat([img_feat, lmk_feat], axis=-1)
        x = self.shared_fc(fused, training=training)
        p = self.class_head(x, training=training)
        c = self.conf_head(x, training=training)
        return p, tf.squeeze(c, axis=-1)

model = ConfidenceLateFusionModel(resnet_backbone, mlp_backbone)

NUM_CLASSES = 7

def preprocess_confidence_sample(sample):
    img = Image.open(sample['img']).convert('RGB').resize((112, 112))
    img_arr = np.array(img).astype(np.float32)
    img_arr = (img_arr - 127.5) / 127.5
    img_arr = np.transpose(img_arr, (2, 0, 1))

    lmk = np.load(sample['npy']).astype(np.float32).flatten()

    label = np.zeros(NUM_CLASSES, dtype=np.float32)
    label[sample['label']] = 1.0

    return img_arr, lmk, label

def dual_input_generator(data_list):
    for sample in data_list:
        img, lmk, label = preprocess_confidence_sample(sample)
        yield (img, lmk), label

train_dataset = tf.data.Dataset.from_generator(
    lambda: dual_input_generator(train_data),
    output_signature=(
        (tf.TensorSpec(shape=(3, 112, 112), dtype=tf.float32),
         tf.TensorSpec(shape=(1434,), dtype=tf.float32)),
        tf.TensorSpec(shape=(NUM_CLASSES,), dtype=tf.float32)
    )
)

BATCH_SIZE = 32
train_dataset = train_dataset.shuffle(1000).batch(BATCH_SIZE).prefetch(tf.data.AUTOTUNE)

optimizer = tf.keras.optimizers.Adam(learning_rate=1e-4)

def full_confidence_loss(y_true, y_pred, confidence, epoch, reg_weight=0.01, max_conf_weight=1.0):
    kl = tf.keras.losses.KLDivergence(reduction=tf.keras.losses.Reduction.NONE)
    per_sample_kl = kl(y_true, y_pred)
    warmup_factor = tf.minimum(tf.exp(epoch / 2.0) / 10.0, 1.0)
    weighted_kl = tf.reduce_mean((warmup_factor * tf.stop_gradient(confidence) + (1 - warmup_factor)) * per_sample_kl)
    reg_term = tf.reduce_mean((1.0 - confidence) ** 2)
    return weighted_kl + reg_weight * reg_term

train_loss = tf.keras.metrics.Mean()
train_acc = tf.keras.metrics.CategoricalAccuracy()

EPOCHS = 10

for epoch in range(EPOCHS):
    train_loss.reset_state()
    train_acc.reset_state()
    print(f"\n Epoch {epoch+1}/{EPOCHS}")

    for batch in tqdm(train_dataset):
        (img_batch, lmk_batch), label_batch = batch

        with tf.GradientTape() as tape:
            pred_dist, conf = model((img_batch, lmk_batch), training=True)
            loss = full_confidence_loss(label_batch, pred_dist, conf, epoch)

        grads = tape.gradient(loss, model.trainable_variables)
        optimizer.apply_gradients(zip(grads, model.trainable_variables))

        train_loss.update_state(loss)
        train_acc.update_state(label_batch, pred_dist)

    print(f" Loss: {train_loss.result():.4f} |  Accuracy: {train_acc.result():.4f}")


FileNotFoundError: [Errno 2] Unable to synchronously open file (unable to open file: name = '/content/facial_fuse/label-distribution-learning-fer-tf/pretrained/mlp_landmark_backbone.h5', errno = 2, error message = 'No such file or directory', flags = 0, o_flags = 0)

In [5]:
from sklearn.metrics import accuracy_score, confusion_matrix, classification_report
from sklearn.manifold import TSNE
import matplotlib.pyplot as plt
import seaborn as sns
import numpy as np
import tensorflow as tf
from PIL import Image
def preprocess_eval_sample(sample):
    img = Image.open(sample['img']).convert('RGB').resize((112, 112))
    img_arr = np.array(img).astype(np.float32)
    img_arr = (img_arr - 127.5) / 127.5
    img_arr = np.transpose(img_arr, (2, 0, 1))

    lmk = np.load(sample['npy']).astype(np.float32).flatten()
    return img_arr, lmk, sample['label']

def eval_generator(data_list):
    for sample in data_list:
        img, lmk, label = preprocess_eval_sample(sample)
        yield (img, lmk), label

eval_dataset = tf.data.Dataset.from_generator(
    lambda: eval_generator(test_data),
    output_signature=(
        (tf.TensorSpec(shape=(3, 112, 112), dtype=tf.float32),
         tf.TensorSpec(shape=(1434,), dtype=tf.float32)),
        tf.TensorSpec(shape=(), dtype=tf.int32)
    )
).batch(32).prefetch(tf.data.AUTOTUNE)

all_preds, all_labels, all_confs, all_features = [], [], [], []

for (img_batch, lmk_batch), labels in eval_dataset:
    pred_dist, conf = model((img_batch, lmk_batch), training=False)
    preds = tf.argmax(pred_dist, axis=1)

    all_preds.extend(preds.numpy())
    all_labels.extend(labels.numpy())
    all_confs.extend(conf.numpy())

    img_feat = model.image_backbone(img_batch, training=False)
    lmk_feat = model.lmk_backbone(lmk_batch, training=False)
    fused_feat = tf.concat([img_feat, lmk_feat], axis=-1)
    all_features.extend(fused_feat.numpy())

class_names = ['Neutral', 'Happy', 'Sad', 'Surprise', 'Fear', 'Disgust', 'Anger']
all_preds_np = np.array(all_preds)
all_labels_np = np.array(all_labels)

acc = accuracy_score(all_labels_np, all_preds_np)
conf_matrix = confusion_matrix(all_labels_np, all_preds_np)
per_class_acc = conf_matrix.diagonal() / np.maximum(conf_matrix.sum(axis=1), 1)

plt.figure(figsize=(8, 6))
sns.heatmap(conf_matrix, annot=True, fmt='d', cmap='Blues',
            xticklabels=class_names, yticklabels=class_names)
plt.title('Confusion Matrix')
plt.xlabel('Predicted Emotion')
plt.ylabel('True Emotion')
plt.gca().invert_yaxis()
plt.show()

tsne = TSNE(n_components=2, random_state=42)
tsne_proj = tsne.fit_transform(np.array(all_features))

plt.figure(figsize=(8, 6))
scatter = plt.scatter(tsne_proj[:, 0], tsne_proj[:, 1], c=all_labels_np, cmap='tab10', alpha=0.6)
handles, _ = scatter.legend_elements()
plt.legend(handles, class_names, title='Emotions')
plt.title('t-SNE of Fused Features')
plt.show()

print(f" Accuracy: {acc:.4f}")
print(" Per-class Accuracy:")
for i, a in enumerate(per_class_acc):
    print(f"  {class_names[i]}: {a:.4f}")

print("\n Classification Report:")
print(classification_report(all_labels_np, all_preds_np, target_names=class_names))

print("\n Confidence stats:")
print(f"Min: {np.min(all_confs):.4f}, Max: {np.max(all_confs):.4f}, Mean: {np.mean(all_confs):.4f}")


UnknownError: {{function_node __wrapped__IteratorGetNext_output_types_3_device_/job:localhost/replica:0/task:0/device:CPU:0}} NameError: name 'test_data' is not defined
Traceback (most recent call last):

  File "/usr/local/lib/python3.11/dist-packages/tensorflow/python/data/ops/dataset_ops.py", line 865, in get_iterator
    return self._iterators[iterator_id]
           ~~~~~~~~~~~~~~~^^^^^^^^^^^^^

KeyError: np.int64(0)


During handling of the above exception, another exception occurred:


Traceback (most recent call last):

  File "/usr/local/lib/python3.11/dist-packages/tensorflow/python/ops/script_ops.py", line 269, in __call__
    ret = func(*args)
          ^^^^^^^^^^^

  File "/usr/local/lib/python3.11/dist-packages/tensorflow/python/autograph/impl/api.py", line 643, in wrapper
    return func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^

  File "/usr/local/lib/python3.11/dist-packages/tensorflow/python/data/ops/from_generator_op.py", line 198, in generator_py_func
    values = next(generator_state.get_iterator(iterator_id))
                  ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

  File "/usr/local/lib/python3.11/dist-packages/tensorflow/python/data/ops/dataset_ops.py", line 867, in get_iterator
    iterator = iter(self._generator(*self._args.pop(iterator_id)))
                    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

  File "<ipython-input-5-2535687053>", line 23, in <lambda>
    lambda: eval_generator(test_data),
                           ^^^^^^^^^

NameError: name 'test_data' is not defined


	 [[{{node PyFunc}}]] [Op:IteratorGetNext] name: 

In [6]:

model.save_weights('/content/confidence_latefusion.weights.h5')
print("Model weights saved to 'confidence_latefusion.weights.h5'")


restored_model = ConfidenceLateFusionModel(resnet_backbone, mlp_backbone)

dummy_img = tf.random.normal((1, 3, 112, 112))
dummy_lmk = tf.random.normal((1, 1434))
restored_model((dummy_img, dummy_lmk), training=False)

restored_model.load_weights('/content/confidence_latefusion.weights.h5')
print("Weights successfully reloaded into restored_model")

NameError: name 'model' is not defined

In [7]:

# model.save("conf_head.h5")

# from google.colab import files
# files.download("conf_head.h5")

In [None]:
%pip install tensorflow-addons


In [8]:
import tensorflow as tf
import numpy as np
from PIL import Image
from tqdm import tqdm

def pairwise_distance(embeddings, squared=False):
    dot = tf.matmul(embeddings, tf.transpose(embeddings))
    square_norm = tf.linalg.diag_part(dot)
    distances = tf.expand_dims(square_norm, 0) - 2.0 * dot + tf.expand_dims(square_norm, 1)
    distances = tf.maximum(distances, 0.0)
    if not squared:
        mask = tf.cast(tf.equal(distances, 0.0), tf.float32)
        distances = distances + mask * 1e-16
        distances = tf.sqrt(distances)
        distances = distances * (1.0 - mask)
    return distances

def custom_triplet_semihard_loss(labels, embeddings, margin=0.2):
    labels = tf.reshape(labels, [-1, 1])
    pdist_matrix = pairwise_distance(embeddings)

    adjacency = tf.equal(labels, tf.transpose(labels))
    adjacency_not = tf.logical_not(adjacency)

    def compute_loss(i):
        dist_i = pdist_matrix[i]
        label_i = labels[i]

        pos_mask = adjacency[i]
        neg_mask = adjacency_not[i]

        pos_dists = tf.boolean_mask(dist_i, pos_mask)
        neg_dists = tf.boolean_mask(dist_i, neg_mask)

        def per_pos_loss(pos_dist):
            semi_hard_negatives = tf.boolean_mask(neg_dists, neg_dists > pos_dist)
            semi_hard_negatives = tf.boolean_mask(semi_hard_negatives, semi_hard_negatives < pos_dist + margin)
            if tf.size(semi_hard_negatives) > 0:
                neg_dist = tf.reduce_min(semi_hard_negatives)
                return tf.maximum(pos_dist - neg_dist + margin, 0.0)
            else:
                return 0.0

        losses = tf.map_fn(per_pos_loss, pos_dists, dtype=tf.float32)
        return tf.reduce_mean(losses) if tf.size(losses) > 0 else 0.0

    losses = tf.map_fn(compute_loss, tf.range(tf.shape(embeddings)[0]), dtype=tf.float32)
    return tf.reduce_mean(losses)

mlp_backbone = tf.keras.models.load_model('/content/facial_fuse/label-distribution-learning-fer-tf/pretrained/mlp_landmark_backbone.h5')
resnet_backbone = tf.keras.models.load_model('/content/facial_fuse/label-distribution-learning-fer-tf/pretrained/resnet50.h5')

resnet_backbone.trainable = True
mlp_backbone.trainable = True

class FullModel(tf.keras.Model):
    def __init__(self, image_backbone, lmk_backbone, num_classes=7):
        super().__init__()
        self.image_backbone = image_backbone
        self.lmk_backbone = lmk_backbone
        self.shared_fc = tf.keras.Sequential([
            tf.keras.layers.Dense(256, activation='relu'),
            tf.keras.layers.Dropout(0.3),
            tf.keras.layers.Dense(128, activation='relu'),
            tf.keras.layers.Dropout(0.3)
        ])
        self.class_head = tf.keras.layers.Dense(num_classes, activation='softmax')
        self.conf_head = tf.keras.layers.Dense(1, activation='sigmoid')

    def call(self, inputs, training=False, return_embedding=False):
        image_input, landmark_input = inputs
        img_feat = self.image_backbone(image_input, training=training)
        lmk_feat = self.lmk_backbone(landmark_input, training=training)
        fused = tf.concat([img_feat, lmk_feat], axis=-1)
        x = self.shared_fc(fused, training=training)
        if return_embedding:
            return tf.math.l2_normalize(x, axis=-1)
        p = self.class_head(x, training=training)
        c = self.conf_head(x, training=training)
        return p, tf.squeeze(c, axis=-1)

model = FullModel(resnet_backbone, mlp_backbone)

NUM_CLASSES = 7

def preprocess_sample(sample):
    img = Image.open(sample['img']).convert('RGB').resize((112, 112))
    img = (np.array(img).astype(np.float32) - 127.5) / 127.5
    img = np.transpose(img, (2, 0, 1))
    lmk = np.load(sample['npy']).astype(np.float32).flatten()
    label = np.zeros(NUM_CLASSES, dtype=np.float32)
    label[sample['label']] = 1.0
    return img, lmk, label

def dual_input_generator(data_list):
    for sample in data_list:
        img, lmk, label = preprocess_sample(sample)
        yield (img, lmk), label

train_dataset = tf.data.Dataset.from_generator(
    lambda: dual_input_generator(train_data),
    output_signature=(
        (tf.TensorSpec(shape=(3, 112, 112), dtype=tf.float32),
         tf.TensorSpec(shape=(1434,), dtype=tf.float32)),
        tf.TensorSpec(shape=(NUM_CLASSES,), dtype=tf.float32)
    )
).shuffle(1000).batch(32).prefetch(tf.data.AUTOTUNE)

kl_loss_fn = tf.keras.losses.KLDivergence(reduction=tf.keras.losses.Reduction.NONE)

def confidence_weighted_kl(y_true, y_pred, confidence, epoch, reg_weight=0.01):
    warmup = tf.minimum(tf.exp(epoch / 2.0) / 10.0, 1.0)
    per_sample_kl = kl_loss_fn(y_true, y_pred)
    weighted = (warmup * tf.stop_gradient(confidence) + (1 - warmup)) * per_sample_kl
    kl_term = tf.reduce_mean(weighted)
    reg_term = tf.reduce_mean((1.0 - confidence) ** 2)
    return kl_term + reg_weight * reg_term, kl_term

def train_model(dataset, epochs=10, base_triplet_lambda=0.1):
    optimizer = tf.keras.optimizers.Adam(learning_rate=1e-4)
    acc_metric = tf.keras.metrics.CategoricalAccuracy()

    print("Batch sample structure verified.")

    for epoch in range(epochs):
        acc_metric.reset_state()
        print(f"\n Epoch {epoch+1}/{epochs}")
        total_loss = 0
        step = 0

        triplet_lambda = min(epoch / 5.0, 1.0) * base_triplet_lambda

        for batch in tqdm(dataset):
            (img_batch, lmk_batch), label_batch = batch

            with tf.GradientTape() as tape:
                pred_dist, conf = model((img_batch, lmk_batch), training=True)
                embeddings = model((img_batch, lmk_batch), training=True, return_embedding=True)

                hard_labels = tf.argmax(label_batch, axis=1)
                triplet_loss = custom_triplet_semihard_loss(hard_labels, embeddings)

                total_kl, kl_only = confidence_weighted_kl(label_batch, pred_dist, conf, epoch)
                loss = total_kl + triplet_lambda * triplet_loss

            grads = tape.gradient(loss, model.trainable_variables)
            optimizer.apply_gradients(zip(grads, model.trainable_variables))

            total_loss += loss.numpy()
            acc_metric.update_state(label_batch, pred_dist)

            if step % 20 == 0:
                print(f" Step {step} | KL: {kl_only.numpy():.4f} | Triplet: {triplet_loss.numpy():.4f} | Conf µ: {tf.reduce_mean(conf).numpy():.3f}")
            step += 1

        print(f"📉 Loss: {total_loss/step:.4f} | 🎯 Accuracy: {acc_metric.result().numpy():.4f}")

train_model(train_dataset, epochs=10, base_triplet_lambda=0.1)


FileNotFoundError: [Errno 2] Unable to synchronously open file (unable to open file: name = '/content/facial_fuse/label-distribution-learning-fer-tf/pretrained/mlp_landmark_backbone.h5', errno = 2, error message = 'No such file or directory', flags = 0, o_flags = 0)

In [9]:
import tensorflow as tf
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import confusion_matrix, classification_report
from sklearn.manifold import TSNE
from tqdm import tqdm

class_names = ['Neutral', 'Happy', 'Sad', 'Surprise', 'Fear', 'Disgust', 'Anger']

test_dataset = tf.data.Dataset.from_generator(
    lambda: dual_input_generator(test_data),
    output_signature=(
        (tf.TensorSpec(shape=(3, 112, 112), dtype=tf.float32),
         tf.TensorSpec(shape=(1434,), dtype=tf.float32)),
        tf.TensorSpec(shape=(NUM_CLASSES,), dtype=tf.float32)
    )
).batch(32).prefetch(tf.data.AUTOTUNE)

all_preds = []
all_labels = []
all_embeddings = []
acc_metric = tf.keras.metrics.CategoricalAccuracy()


print(" Running Evaluation...")
for (img_batch, lmk_batch), label_batch in tqdm(test_dataset):
    pred_dist, _ = model((img_batch, lmk_batch), training=False)
    embeddings = model((img_batch, lmk_batch), training=False, return_embedding=True)

    all_preds.extend(np.argmax(pred_dist.numpy(), axis=1))
    all_labels.extend(np.argmax(label_batch.numpy(), axis=1))
    all_embeddings.append(embeddings.numpy())
    acc_metric.update_state(label_batch, pred_dist)

all_preds = np.array(all_preds)
all_labels = np.array(all_labels)
all_embeddings = np.vstack(all_embeddings)

print(" Classification Report:\n")
print(classification_report(all_labels, all_preds, target_names=class_names, digits=4))

conf_mat = confusion_matrix(all_labels, all_preds)
per_class_acc = conf_mat.diagonal() / conf_mat.sum(axis=1)
print("\n Per-Class Accuracy (Recall):")
for i, acc in enumerate(per_class_acc):
    print(f"{class_names[i]:<10}: {acc:.4f}")

plt.figure(figsize=(8, 6))
sns.heatmap(conf_mat, annot=True, fmt='d', cmap='Blues',
            xticklabels=class_names, yticklabels=class_names)
plt.xlabel('Predicted Label')
plt.ylabel('True Label')
plt.title('Confusion Matrix Heatmap')
plt.tight_layout()
plt.show()

print("🔍 Computing t-SNE Embedding...")
tsne = TSNE(n_components=2, random_state=42)
tsne_proj = tsne.fit_transform(all_embeddings)

plt.figure(figsize=(8, 6))
scatter = plt.scatter(tsne_proj[:, 0], tsne_proj[:, 1], c=all_labels, cmap='tab10', alpha=0.6)
handles, _ = scatter.legend_elements()
plt.legend(handles, class_names, title='Emotions', bbox_to_anchor=(1.05, 1), loc='upper left')
plt.title('t-SNE of Fused Embeddings')
plt.xlabel('t-SNE Dimension 1')
plt.ylabel('t-SNE Dimension 2')
plt.tight_layout()
plt.show()

print(f"\n Final Test Accuracy: {acc_metric.result().numpy():.4f}")


NameError: name 'NUM_CLASSES' is not defined

In [None]:
# train_model(train_dataset, epochs=10, lambda_triplet=0.1)


In [10]:

model.save_weights("full_model.weights.h5")
print("Model saved as 'full_model_weights.h5'")

from google.colab import files
files.download("full_model.weights.h5")

NameError: name 'model' is not defined

In [11]:
import tensorflow as tf
import numpy as np
from PIL import Image
from tqdm import tqdm

def pairwise_distance(embeddings, squared=False):
    dot = tf.matmul(embeddings, tf.transpose(embeddings))
    square_norm = tf.linalg.diag_part(dot)
    distances = tf.expand_dims(square_norm, 0) - 2.0 * dot + tf.expand_dims(square_norm, 1)
    distances = tf.maximum(distances, 0.0)
    if not squared:
        mask = tf.cast(tf.equal(distances, 0.0), tf.float32)
        distances = distances + mask * 1e-16
        distances = tf.sqrt(distances)
        distances = distances * (1.0 - mask)
    return distances

def custom_triplet_semihard_loss(labels, embeddings, margin=0.2):
    labels = tf.reshape(labels, [-1, 1])
    pdist_matrix = pairwise_distance(embeddings)

    adjacency = tf.equal(labels, tf.transpose(labels))
    adjacency_not = tf.logical_not(adjacency)

    def compute_loss(i):
        dist_i = pdist_matrix[i]
        label_i = labels[i]

        pos_mask = adjacency[i]
        neg_mask = adjacency_not[i]

        pos_dists = tf.boolean_mask(dist_i, pos_mask)
        neg_dists = tf.boolean_mask(dist_i, neg_mask)

        def per_pos_loss(pos_dist):
            semi_hard_negatives = tf.boolean_mask(neg_dists, neg_dists > pos_dist)
            semi_hard_negatives = tf.boolean_mask(semi_hard_negatives, semi_hard_negatives < pos_dist + margin)
            if tf.size(semi_hard_negatives) > 0:
                neg_dist = tf.reduce_min(semi_hard_negatives)
                return tf.maximum(pos_dist - neg_dist + margin, 0.0)
            else:
                return 0.0

        losses = tf.map_fn(per_pos_loss, pos_dists, dtype=tf.float32)
        return tf.reduce_mean(losses) if tf.size(losses) > 0 else 0.0

    losses = tf.map_fn(compute_loss, tf.range(tf.shape(embeddings)[0]), dtype=tf.float32)
    return tf.reduce_mean(losses)


mlp_backbone = tf.keras.models.load_model('/content/facial_fuse/label-distribution-learning-fer-tf/pretrained/mlp_landmark_backbone.h5')
resnet_backbone = tf.keras.models.load_model('/content/facial_fuse/label-distribution-learning-fer-tf/pretrained/resnet50.h5')

resnet_backbone.trainable = True
mlp_backbone.trainable = True

class ContrastiveLossModel(tf.keras.Model):
    def __init__(self, image_backbone, lmk_backbone, num_classes=7):
        super().__init__()
        self.image_backbone = image_backbone
        self.lmk_backbone = lmk_backbone
        self.shared_fc = tf.keras.Sequential([
            tf.keras.layers.Dense(256, activation='relu'),
            tf.keras.layers.Dropout(0.3),
            tf.keras.layers.Dense(128, activation='relu'),
            tf.keras.layers.Dropout(0.3)
        ])
        self.class_head = tf.keras.layers.Dense(num_classes, activation='softmax')

    def call(self, inputs, training=False, return_embedding=False):
        image_input, landmark_input = inputs
        img_feat = self.image_backbone(image_input, training=training)
        lmk_feat = self.lmk_backbone(landmark_input, training=training)
        fused = tf.concat([img_feat, lmk_feat], axis=-1)
        x = self.shared_fc(fused, training=training)
        if return_embedding:
            return tf.math.l2_normalize(x, axis=-1)
        return self.class_head(x, training=training)

model = ContrastiveLossModel(resnet_backbone, mlp_backbone)

NUM_CLASSES = 7

def preprocess_sample(sample):
    img = Image.open(sample['img']).convert('RGB').resize((112, 112))
    img = (np.array(img).astype(np.float32) - 127.5) / 127.5
    img = np.transpose(img, (2, 0, 1))
    lmk = np.load(sample['npy']).astype(np.float32).flatten()
    label = np.zeros(NUM_CLASSES, dtype=np.float32)
    label[sample['label']] = 1.0
    return img, lmk, label

def dual_input_generator(data_list):
    for sample in data_list:
        img, lmk, label = preprocess_sample(sample)
        yield (img, lmk), label

train_dataset = tf.data.Dataset.from_generator(
    lambda: dual_input_generator(train_data),
    output_signature=(
        (tf.TensorSpec(shape=(3, 112, 112), dtype=tf.float32),
         tf.TensorSpec(shape=(1434,), dtype=tf.float32)),
        tf.TensorSpec(shape=(NUM_CLASSES,), dtype=tf.float32)
    )
).shuffle(1000).batch(32).prefetch(tf.data.AUTOTUNE)

kl_loss_fn = tf.keras.losses.KLDivergence(reduction=tf.keras.losses.Reduction.NONE)

def weighted_kl(y_true, y_pred, epoch, reg_weight=0.01):
    warmup = tf.minimum(tf.exp(epoch / 2.0) / 10.0, 1.0)
    per_sample_kl = kl_loss_fn(y_true, y_pred)
    weighted = warmup * per_sample_kl
    kl_term = tf.reduce_mean(weighted)
    return kl_term, kl_term

def train_model(dataset, epochs=10, base_triplet_lambda=0.1):
    optimizer = tf.keras.optimizers.Adam(learning_rate=1e-4)
    acc_metric = tf.keras.metrics.CategoricalAccuracy()

    print(" Batch sample structure verified.")

    for epoch in range(epochs):
        acc_metric.reset_state()
        print(f"\n Epoch {epoch+1}/{epochs}")
        total_loss = 0
        step = 0

        triplet_lambda = min(epoch / 5.0, 1.0) * base_triplet_lambda

        for batch in tqdm(dataset):
            (img_batch, lmk_batch), label_batch = batch

            with tf.GradientTape() as tape:
                pred_dist = model((img_batch, lmk_batch), training=True)
                embeddings = model((img_batch, lmk_batch), training=True, return_embedding=True)

                hard_labels = tf.argmax(label_batch, axis=1)
                triplet_loss = custom_triplet_semihard_loss(hard_labels, embeddings)

                total_kl, kl_only = weighted_kl(label_batch, pred_dist, epoch)
                loss = total_kl + triplet_lambda * triplet_loss

            grads = tape.gradient(loss, model.trainable_variables)
            optimizer.apply_gradients(zip(grads, model.trainable_variables))

            total_loss += loss.numpy()
            acc_metric.update_state(label_batch, pred_dist)

            if step % 20 == 0:
                print(f" Step {step} | KL: {kl_only.numpy():.4f} | Triplet: {triplet_loss.numpy():.4f}")
            step += 1

        print(f"📉 Loss: {total_loss/step:.4f} | 🎯 Accuracy: {acc_metric.result().numpy():.4f}")

train_model(train_dataset, epochs=10, base_triplet_lambda=0.1)


FileNotFoundError: [Errno 2] Unable to synchronously open file (unable to open file: name = '/content/facial_fuse/label-distribution-learning-fer-tf/pretrained/mlp_landmark_backbone.h5', errno = 2, error message = 'No such file or directory', flags = 0, o_flags = 0)

In [12]:
import tensorflow as tf
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import confusion_matrix, classification_report
from sklearn.manifold import TSNE
from tqdm import tqdm


class_names = ['Neutral', 'Happy', 'Sad', 'Surprise', 'Fear', 'Disgust', 'Anger']
NUM_CLASSES = len(class_names)


test_dataset = tf.data.Dataset.from_generator(
    lambda: dual_input_generator(test_data),
    output_signature=(
        (tf.TensorSpec(shape=(3, 112, 112), dtype=tf.float32),
         tf.TensorSpec(shape=(1434,), dtype=tf.float32)),
        tf.TensorSpec(shape=(NUM_CLASSES,), dtype=tf.float32)
    )
).batch(32).prefetch(tf.data.AUTOTUNE)

all_preds = []
all_labels = []
all_embeddings = []
acc_metric = tf.keras.metrics.CategoricalAccuracy()

print("Running Evaluation...")
for (img_batch, lmk_batch), label_batch in tqdm(test_dataset):
    pred_dist = model((img_batch, lmk_batch), training=False)
    embeddings = model((img_batch, lmk_batch), training=False, return_embedding=True)

    all_preds.extend(np.argmax(pred_dist.numpy(), axis=1))
    all_labels.extend(np.argmax(label_batch.numpy(), axis=1))
    all_embeddings.append(embeddings.numpy())
    acc_metric.update_state(label_batch, pred_dist)

all_preds = np.array(all_preds)
all_labels = np.array(all_labels)
all_embeddings = np.vstack(all_embeddings)

print(" Classification Report:\n")
print(classification_report(all_labels, all_preds, target_names=class_names, digits=4))

conf_mat = confusion_matrix(all_labels, all_preds)
per_class_acc = conf_mat.diagonal() / np.maximum(conf_mat.sum(axis=1), 1)
print("\n Per-Class Accuracy (Recall):")
for i, acc in enumerate(per_class_acc):
    print(f"{class_names[i]:<10}: {acc:.4f}")

plt.figure(figsize=(8, 6))
sns.heatmap(conf_mat, annot=True, fmt='d', cmap='Blues',
            xticklabels=class_names, yticklabels=class_names)
plt.xlabel('Predicted Label')
plt.ylabel('True Label')
plt.title('Confusion Matrix Heatmap')
plt.tight_layout()
plt.show()


print(" Computing t-SNE Embedding...")
tsne = TSNE(n_components=2, random_state=42)
tsne_proj = tsne.fit_transform(all_embeddings)

plt.figure(figsize=(8, 6))
scatter = plt.scatter(tsne_proj[:, 0], tsne_proj[:, 1], c=all_labels, cmap='tab10', alpha=0.6)
handles, _ = scatter.legend_elements()
plt.legend(handles, class_names, title='Emotions', bbox_to_anchor=(1.05, 1), loc='upper left')
plt.title('t-SNE of Fused Embeddings')
plt.xlabel('t-SNE Dimension 1')
plt.ylabel('t-SNE Dimension 2')
plt.tight_layout()
plt.show()

print(f"\n Final Test Accuracy (Top-1): {acc_metric.result().numpy():.4f}")


Running Evaluation...


0it [00:00, ?it/s]


UnknownError: {{function_node __wrapped__IteratorGetNext_output_types_3_device_/job:localhost/replica:0/task:0/device:CPU:0}} NameError: name 'dual_input_generator' is not defined
Traceback (most recent call last):

  File "/usr/local/lib/python3.11/dist-packages/tensorflow/python/data/ops/dataset_ops.py", line 865, in get_iterator
    return self._iterators[iterator_id]
           ~~~~~~~~~~~~~~~^^^^^^^^^^^^^

KeyError: np.int64(0)


During handling of the above exception, another exception occurred:


Traceback (most recent call last):

  File "/usr/local/lib/python3.11/dist-packages/tensorflow/python/ops/script_ops.py", line 269, in __call__
    ret = func(*args)
          ^^^^^^^^^^^

  File "/usr/local/lib/python3.11/dist-packages/tensorflow/python/autograph/impl/api.py", line 643, in wrapper
    return func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^

  File "/usr/local/lib/python3.11/dist-packages/tensorflow/python/data/ops/from_generator_op.py", line 198, in generator_py_func
    values = next(generator_state.get_iterator(iterator_id))
                  ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

  File "/usr/local/lib/python3.11/dist-packages/tensorflow/python/data/ops/dataset_ops.py", line 867, in get_iterator
    iterator = iter(self._generator(*self._args.pop(iterator_id)))
                    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

  File "<ipython-input-12-384913227>", line 15, in <lambda>
    lambda: dual_input_generator(test_data),
           ^^^^^^^^^^^^^^^^^^^^

NameError: name 'dual_input_generator' is not defined


	 [[{{node PyFunc}}]] [Op:IteratorGetNext] name: 

In [13]:

model.save_weights("contra_loss_only.weights.h5")
print(" Model saved as 'contra_loss_only.weights.h5'")

from google.colab import files
files.download("contra_loss_only.weights.h5")

NameError: name 'model' is not defined