<a href="https://colab.research.google.com/github/rcharan05/UGP/blob/main/One_Shot_Sign_Language_Recognition_Improved_Feature_Pooling_and_Fusion_Techniques(PCA%2C_GeM%2C_Vel_Attn).ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Mounting the drive, importing all necessary libraries, and loading all the data in the required format

In [None]:
# Mount Google Drive at /content/drive (DATA_DIR further down points inside it).
from google.colab import drive
drive.mount('/content/drive')
# IPython shell escape (not plain Python): quietly installs the pose-format
# package, which provides the `pose_format` module imported later on.
!pip install -q pose-format

Mounted at /content/drive
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m97.7/97.7 kB[0m [31m2.0 MB/s[0m eta [36m0:00:00[0m
[?25h

In [None]:
import os, numpy as np, pandas as pd
from pose_format import Pose
from sklearn.decomposition import PCA
from sklearn.preprocessing import normalize

In [None]:
# Root folder of the project data on the mounted Google Drive.
DATA_DIR       = "/content/drive/MyDrive/UGP"
# Folder holding one `<uid>.pose` file per sample (read by the pose feature functions).
VIDEO_POSE_DIR = os.path.join(DATA_DIR, "CISLR_v1.5-a_videos_poses")
# Pickled table with the columns "id" and "I3D_features".
I3D_PKL        = os.path.join(DATA_DIR, "I3D_features.pkl")
# CSVs listing the prototype samples and the test samples; both are read for
# their "gloss" and "uid" columns.
PROTO_CSV      = os.path.join(DATA_DIR, "prototype.csv")
TEST_CSV       = os.path.join(DATA_DIR, "test.csv")

In [None]:
# Read the prototype and test tables. The gloss column is cast to str so that
# label comparisons during evaluation never mix types.
proto_df = pd.read_csv(PROTO_CSV).assign(gloss=lambda df: df["gloss"].astype(str))
test_df = pd.read_csv(TEST_CSV).assign(gloss=lambda df: df["gloss"].astype(str))

# Label lists in table order: prototypes act as the gallery, test rows as queries.
y_tr = proto_df["gloss"].tolist()
y_te = test_df["gloss"].tolist()

In [None]:
# Load the pickled I3D feature table.
# NOTE: unpickling can execute arbitrary code, so only load this file from a
# trusted source.
i3d_df = pd.read_pickle(I3D_PKL)
# Lookup table: sample id -> its I3D feature array, converted to float32.
i3d_dict = {row["id"]: np.array(row["I3D_features"], dtype=np.float32)
            for _, row in i3d_df.iterrows()}

Evaluation helper: computes the top-k accuracy of a model from its test-vs-prototype similarity matrix.

In [None]:
def topk_from_S(S, y_tr, y_te, k):
    """Return the top-k retrieval accuracy, in percent, for a similarity matrix.

    Row ``i`` of ``S`` holds the similarity of query ``i`` to every gallery
    item. A query counts as a hit when its label ``y_te[i]`` appears among the
    labels ``y_tr[j]`` of its ``k`` most similar gallery items.
    """
    # Negating S makes argsort return gallery indices by descending similarity.
    order = np.argsort(-S, axis=1)
    hits = []
    for query_idx, true_label in enumerate(y_te):
        top_labels = [y_tr[gallery_idx] for gallery_idx in order[query_idx, :k]]
        hits.append(true_label in top_labels)
    return np.mean(hits) * 100

# OPTION 1: Improved I3D Feature Extraction via GeM Pooling

In [None]:
def gem_pooling(arr, p=2.0):
    """Generalized-mean (GeM) pool a 2-D array along axis 1.

    Each row is reduced to ``(mean((row + 1e-6) ** p) + 1e-12) ** (1 / p)``.
    The caller passes an array expected to be shaped (1024, S), giving one
    value per row.
    """
    shifted = arr + 1e-6                      # small offset applied before the power
    mean_of_powers = np.mean(np.power(shifted, p), axis=1)
    return (mean_of_powers + 1e-12) ** (1.0 / p)

In [None]:
def improved_i3d_feat_GeM(uid, p=2.0):
    """Return the GeM-pooled, signed-sqrt, unit-length I3D descriptor for ``uid``.

    ``p`` is the GeM exponent forwarded to ``gem_pooling``.
    """
    # Drop the singleton axes 0, 3 and 4 of the stored tensor; the result is
    # expected to be (1024, S) -- TODO confirm against the pickle contents.
    feature_map = i3d_dict[uid].squeeze((0, 3, 4))
    pooled = gem_pooling(feature_map, p)
    # Signed square root compresses large activations before normalisation.
    rooted = np.sign(pooled) * np.sqrt(np.abs(pooled) + 1e-8)
    return rooted / np.linalg.norm(rooted)

In [None]:
# Option 1 descriptors for the prototype set and the test set, one row per sample.
X_imp_GeM = np.stack(list(map(improved_i3d_feat_GeM, proto_df.uid)))
X_imp_te_GeM = np.stack(list(map(improved_i3d_feat_GeM, test_df.uid)))

# Row-wise L2-normalised copies (each descriptor is already divided by its own
# norm inside improved_i3d_feat_GeM).
X_imp_GeM_n = normalize(X_imp_GeM, axis=1)
X_imp_te_GeM_n = normalize(X_imp_te_GeM, axis=1)

In [None]:
# PCA-whiten the GeM features with 512 components. The projection is fitted on
# the prototype set only and then applied to both splits.
pca = PCA(n_components=512, whiten=True)
pca.fit(X_imp_GeM)
X_pw = pca.transform(X_imp_GeM)
X_pw_te = pca.transform(X_imp_te_GeM)

# Cosine similarity of every test sample (rows) to every prototype (columns).
S_geM = normalize(X_pw_te, axis=1) @ normalize(X_pw, axis=1).T

In [None]:
# Report top-1/5/10 accuracy of the GeM + PCA-whitened I3D similarity.
print("=== Option 1: GeM Pooling on I3D Features ===")
for k in (1, 5, 10):
    accuracy = topk_from_S(S_geM, y_tr, y_te, k)
    print(f"Top-{k}: {accuracy:.2f}%")

=== Option 1: GeM Pooling on I3D Features ===
Top-1: 19.21%
Top-5: 24.60%
Top-10: 27.92%


After experimenting with different hyperparameters, the best setting found (PCA whitening with 1024 components) is used below. This is the best model overall in terms of top-1 accuracy.

In [None]:
# Same pipeline as the 512-component run, but keeping 1024 PCA components.
# The projection is fitted on the prototype set only and applied to both splits.
pca = PCA(n_components=1024, whiten=True)
pca.fit(X_imp_GeM)
X_pw = pca.transform(X_imp_GeM)
X_pw_te = pca.transform(X_imp_te_GeM)

# Cosine similarity of every test sample (rows) to every prototype (columns).
S_geM = normalize(X_pw_te, axis=1) @ normalize(X_pw, axis=1).T

print("=== Option 1: GeM Pooling on I3D Features ===")
for k in (1, 5, 10):
    accuracy = topk_from_S(S_geM, y_tr, y_te, k)
    print(f"Top-{k}: {accuracy:.2f}%")

=== Option 1: GeM Pooling on I3D Features ===
Top-1: 19.69%
Top-5: 24.64%
Top-10: 27.22%


# OPTION 2: Attention-based Pooling for Pose Velocity Features

In [None]:
import torch
import torch.nn as nn

Attention-based pooling module

In [None]:
class AttentionPooling(nn.Module):
    """Collapse a (T, D) sequence of frame features into a single D-vector.

    A two-layer scorer (Linear -> Tanh -> Linear) assigns one scalar to each
    frame; a softmax over the frame axis turns the scores into weights, and the
    output is the weighted sum of the frames.
    """

    def __init__(self, in_dim, hidden_dim=128):
        super().__init__()
        self.fc1 = nn.Linear(in_dim, hidden_dim)
        self.tanh = nn.Tanh()
        # Projects the hidden representation to one attention score per frame.
        self.fc2 = nn.Linear(hidden_dim, 1)

    def forward(self, x):
        """Return the attention-weighted sum over dim 0 of ``x`` ((T, D) -> (D,))."""
        hidden = self.tanh(self.fc1(x))
        scores = self.fc2(hidden)            # (T, 1)
        weights = scores.softmax(dim=0)      # (T, 1), sums to 1 over frames
        return (weights * x).sum(dim=0)      # (D,)

In [None]:
Attention-based Pose Velocity Feature Extraction

In [None]:
import threading

# 468 face + 21 left-hand + 21 right-hand landmarks.
_N_FACE_HAND_POINTS = 468 + 21 + 21
_ATTN_POOL_CACHE = {}
_ATTN_POOL_LOCK = threading.Lock()


def _shared_attention_pool(in_dim, seed=0):
    """Return the one AttentionPooling module shared by all samples of width ``in_dim``."""
    with _ATTN_POOL_LOCK:
        pool = _ATTN_POOL_CACHE.get(in_dim)
        if pool is None:
            # Seed inside fork_rng so the fixed initialisation does not disturb
            # the caller's global RNG stream.
            with torch.random.fork_rng(devices=[]):
                torch.manual_seed(seed)
                pool = AttentionPooling(in_dim=in_dim).eval()
            _ATTN_POOL_CACHE[in_dim] = pool
    return pool


def pose_velocity_feat_attention(uid):
    """
    Compute velocity features on face+hands with attention-based pooling.

    - Extract face (indices 33:33+468) and hands (indices 501:501+21 and 522:522+21).
    - Compute framewise differences (per-landmark speed) and pool them over time
      with an AttentionPooling module.
    - Apply signed sqrt and L2 normalisation.

    Returns a vector with one entry per landmark (510). Sequences with fewer
    than two frames yield a zero vector of that same length, so every sample
    can be stacked into one matrix.

    The pooling weights are not trained. A single module, initialised once with
    a fixed seed, is shared by all samples so that every sample is pooled by
    the same function and the resulting features are comparable.
    """
    # Read the whole file, then close it before parsing.
    with open(os.path.join(VIDEO_POSE_DIR, f"{uid}.pose"), "rb") as fh:
        buf = fh.read()
    p = Pose.read(buf)
    coords = p.body.data.squeeze(1)[..., :2]  # expected (T,576,2)
    if coords.shape[0] < 2:
        # No frame-to-frame difference exists; fall back to an all-zero feature
        # whose length matches the normal output (float32, like the torch path).
        return np.zeros(_N_FACE_HAND_POINTS, dtype=np.float32)
    # face: indices 33:33+468, left-hand: 501:501+21, right-hand: 522:522+21
    face = coords[:, 33:33+468]
    lh   = coords[:, 501:501+21]
    rh   = coords[:, 522:522+21]
    pts  = np.concatenate([face, lh, rh], axis=1)  # (T,510,2)
    # Frame-wise displacement magnitude (velocity) of every landmark.
    diffs = np.linalg.norm(pts[1:] - pts[:-1], axis=2)  # (T-1, 510)
    frame_feats = torch.from_numpy(diffs).float()  # (T-1, 510)
    # Reuse the shared module instead of creating new random weights per sample.
    attn_pool = _shared_attention_pool(frame_feats.shape[1])
    with torch.no_grad():
        pooled = attn_pool(frame_feats)  # (510,)
    pooled_np = pooled.numpy()
    pooled_np = np.sign(pooled_np) * np.sqrt(np.abs(pooled_np) + 1e-8)
    return pooled_np / (np.linalg.norm(pooled_np) + 1e-6)

Compute Attention-based Features in Parallel

In [None]:
import concurrent.futures

def compute_att_feat(uid):
    """Return the attention-pooled velocity feature for ``uid``.

    Thin wrapper around ``pose_velocity_feat_attention``; it is the callable
    handed to ``executor.map`` when the features are computed on a thread pool.
    """
    return pose_velocity_feat_attention(uid)

def _map_in_threads(fn, items):
    """Apply ``fn`` to every item on a thread pool, returning results in input order."""
    with concurrent.futures.ThreadPoolExecutor() as pool:
        return list(pool.map(fn, items))


# Attention-pooled velocity features for the prototype (training) samples.
X_vel_att_list = _map_in_threads(compute_att_feat, proto_df.uid)
X_vel_att = np.stack(X_vel_att_list)

# Attention-pooled velocity features for the test samples.
X_vel_att_te_list = _map_in_threads(compute_att_feat, test_df.uid)
X_vel_att_te = np.stack(X_vel_att_te_list)

# Row-wise L2 normalisation of both feature matrices.
from sklearn.preprocessing import normalize
X_vel_att_n = normalize(X_vel_att, axis=1)
X_vel_att_te_n = normalize(X_vel_att_te, axis=1)

Score Computation and Fusion (Attention-based)

In [None]:
# Cosine similarity from the PCA-whitened I3D features (X_pw, X_pw_te) computed earlier.
S_att = normalize(X_pw_te, axis=1) @ normalize(X_pw, axis=1).T
# Cosine similarity from the attention-pooled velocity features.
S_att_vel = normalize(X_vel_att_te_n, axis=1) @ normalize(X_vel_att_n, axis=1).T
# Linear late fusion with fixed demonstration weights: 0.7 on I3D, 0.3 on velocity.
S_fuse_att = S_att * 0.7 + S_att_vel * 0.3

In [None]:
# Report top-1/5/10 accuracy of the I3D + attention-velocity fusion.
print("\n=== Option 2: Attention-based Pooling for Velocity Features ===")
for k in (1, 5, 10):
    accuracy = topk_from_S(S_fuse_att, y_tr, y_te, k)
    print(f"Top-{k}: {accuracy:.2f}%")


=== Option 2: Attention-based Pooling for Velocity Features ===
Top-1: 18.86%
Top-5: 24.29%
Top-10: 27.22%


Baseline Pose Velocity Feature Extraction (Mean + Max)

In [None]:
def pose_velocity_feat(uid):
    """
    Build the baseline pose-velocity descriptor (mean + max speed) for one sample.

    Loads the pose file for ``uid`` (expected shape: (T,1,576,3)), squeezes it to
    (T,576,3), keeps the x/y coordinates of the face and hand landmarks, computes
    the per-frame velocity (L2 norm of consecutive differences), and aggregates
    it over time with the mean and the maximum. Finally applies signed sqrt and
    L2 normalisation.

    - Face landmarks: indices 33 to 33+468 (468 points)
    - Left hand: indices 501 to 501+21 (21 points)
    - Right hand: indices 522 to 522+21 (21 points)
    Combined, this gives 510 points, so the feature dimension is 2*510 = 1020.

    Sequences with fewer than two frames return an all-zero vector of length 1020.
    """
    # Read the pose file; the context manager closes the handle before parsing.
    with open(os.path.join(VIDEO_POSE_DIR, f"{uid}.pose"), "rb") as fh:
        buf = fh.read()
    p = Pose.read(buf)
    # Drop the singleton axis 1 and keep only the first two coordinates (x, y).
    coords = p.body.data.squeeze(1)[..., :2]  # expected (T,576,2)

    # A velocity needs at least two frames; otherwise fall back to zeros.
    if coords.shape[0] < 2:
        return np.zeros(510 * 2)

    # Extract face and hands (indices as per MediaPipe ordering, per the notes above).
    face = coords[:, 33:33+468]      # (T,468,2)
    lh   = coords[:, 501:501+21]     # (T,21,2)
    rh   = coords[:, 522:522+21]     # (T,21,2)
    pts  = np.concatenate([face, lh, rh], axis=1)  # (T,510,2)

    # Frame-wise displacement magnitude (velocity) of every landmark.
    diffs = np.linalg.norm(pts[1:] - pts[:-1], axis=2)  # (T-1,510)
    mean_sp = diffs.mean(axis=0)  # (510,)
    max_sp  = diffs.max(axis=0)   # (510,)
    feat = np.concatenate([mean_sp, max_sp])  # (1020,)

    # Signed sqrt, then L2 normalisation (skipped when the norm is zero).
    feat = np.sign(feat) * np.sqrt(np.abs(feat) + 1e-8)
    norm = np.linalg.norm(feat)
    return feat / norm if norm > 0 else feat


In [None]:
# Baseline (mean + max) velocity descriptors for the prototype and test sets.
X_vel = np.stack(list(map(pose_velocity_feat, proto_df.uid)))
X_vel_te = np.stack(list(map(pose_velocity_feat, test_df.uid)))

# Row-wise L2 normalisation of both feature matrices.
X_vel_n = normalize(X_vel, axis=1)
X_vel_te_n = normalize(X_vel_te, axis=1)

# OPTION 3: Nonlinear Fusion via a Small MLP (Late Fusion)


Nonlinear MLP Fusion of Similarity Scores

In [None]:
import torch
import torch.nn as nn

class NonlinearFusionMLP(nn.Module):
    """Small MLP (2 -> 16 -> 1) that fuses two similarity scores into one."""

    def __init__(self, sim_dim):
        """
        sim_dim: dimension of the similarity vector (could be the number of
        training samples). It is accepted but not used: the network always
        takes exactly two scores per query-gallery pair.
        """
        super().__init__()
        self.fc1 = nn.Linear(2, 16)
        self.relu = nn.ReLU()
        self.fc2 = nn.Linear(16, 1)

    def forward(self, s1, s2):
        """Fuse per-pair scores ``s1`` and ``s2`` (each (N, 1)) into (N, 1) outputs."""
        # Place the two modality scores side by side: one row per pair.
        paired = torch.cat([s1, s2], dim=1)
        hidden = self.relu(self.fc1(paired))
        return self.fc2(hidden)

In [None]:
# Per-modality cosine similarity: rows are test samples, columns are prototypes.
# I3D branch uses the PCA-whitened features.
S_i3d = normalize(X_pw_te, axis=1) @ normalize(X_pw, axis=1).T
# Velocity branch uses the baseline pose_velocity_feat descriptors.
S_vel = normalize(X_vel_te_n, axis=1) @ normalize(X_vel_n, axis=1).T

In [None]:
# Use the GPU when one is available; otherwise stay on the CPU so this cell
# also runs on a runtime without CUDA.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# NOTE(review): no training step for this MLP appears in the notebook, so it
# scores pairs with randomly initialised weights; the recorded Top-1 of 0.00%
# for this option is consistent with that. Train it before relying on the
# fused scores.
fusion_model = NonlinearFusionMLP(sim_dim=2).to(device)

# Prepare the scores as float32 torch tensors on the same device as the model.
S_i3d_t = torch.from_numpy(S_i3d).float().to(device)
S_vel_t = torch.from_numpy(S_vel).float().to(device)

In [None]:
# Score every (test, prototype) pair with the fusion MLP; no gradients needed.
with torch.no_grad():
    # Pair up the two scores of each matrix cell and flatten to one row per pair.
    fuse_input = torch.stack((S_i3d_t, S_vel_t), dim=2).reshape(-1, 2)
    s1, s2 = fuse_input[:, :1], fuse_input[:, 1:]   # each (N, 1)
    fused_scores = fusion_model(s1, s2)
    # Restore the similarity-matrix layout expected by the top-k evaluation.
    S_fused_mlp = fused_scores.view(S_i3d.shape)

# Back to a NumPy array on the host.
S_fused_mlp = S_fused_mlp.cpu().numpy()

In [None]:
# Report top-1/5/10 accuracy of the MLP-fused similarity.
print("\n=== Option 3: Nonlinear Fusion with MLP ===")
for k in (1, 5, 10):
    accuracy = topk_from_S(S_fused_mlp, y_tr, y_te, k)
    print(f"Top-{k}: {accuracy:.2f}%")


=== Option 3: Nonlinear Fusion with MLP ===
Top-1: 0.00%
Top-5: 0.09%
Top-10: 0.09%


Hyperparameter Search for GeM Pooling (for general use)

In [None]:
# Hyperparameter search for the GeM pooling exponent p.
# NOTE(review): accuracy is measured on the test split, so the chosen p is
# tuned on test data; use a held-out validation split for an unbiased pick.
p_values = [1.0, 2.0, 3.0, 4.0, 5.0]
best_p = None
# Start below any reachable accuracy so the first trial always registers.
best_acc = float("-inf")

for p in p_values:
    # Search-local names only: X_imp_GeM, X_imp_te_GeM, pca, X_pw and X_pw_te
    # are left untouched, so re-running the fusion cells afterwards still uses
    # the features chosen earlier instead of the last trial's (p=5.0, 512 dims).
    X_tr_p = np.stack([improved_i3d_feat_GeM(u, p=p) for u in proto_df.uid])
    X_te_p = np.stack([improved_i3d_feat_GeM(u, p=p) for u in test_df.uid])
    # PCA-whitening fitted on the prototype features of this trial.
    pca_p = PCA(whiten=True, n_components=512).fit(X_tr_p)
    X_pw_p = pca_p.transform(X_tr_p)
    X_pw_te_p = pca_p.transform(X_te_p)
    S_temp = normalize(X_pw_te_p, axis=1).dot(normalize(X_pw_p, axis=1).T)
    acc1 = topk_from_S(S_temp, y_tr, y_te, 1)
    print(f"p={p} Top-1: {acc1:.2f}%")
    if acc1 > best_acc:
        best_acc, best_p = acc1, p

print(f"Best GeM pooling p: {best_p} with Top-1: {best_acc:.2f}%")

p=1.0 Top-1: 19.26%
p=2.0 Top-1: 19.30%
p=3.0 Top-1: 18.91%
p=4.0 Top-1: 18.86%
p=5.0 Top-1: 18.86%
Best GeM pooling p: 2.0 with Top-1: 19.30%
