In [1]:
# TODO : CTC Loss
from typing import List

import cv2
import gdown
import matplotlib.pyplot as plt
import numpy as np
import os
import pandas as pd
from tqdm import tqdm

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import DataLoader
from torch.nn.utils.rnn import pad_sequence
from torch.utils.data import Dataset, DataLoader
from torchvision.transforms import Compose, Lambda

#import tensorflow as tf

In [1]:
import os
import cv2
import dlib
import numpy as np
from tqdm import tqdm

In [46]:
# np.save('data.npy', data)
# np.save('labels.npy', labels)

In [2]:
# Read the three landmark CSV exports (first column is the saved index) and
# stack them into a single frame with a fresh 0..n-1 index.
csv_paths = (
    "face_landmarks_data3.csv",
    "face_landmarks_data4.csv",
    "face_landmarks_data5.csv",
)
df3, df4, df5 = (pd.read_csv(csv_path, index_col=0) for csv_path in csv_paths)
df = pd.concat([df3, df4, df5], ignore_index=True)

In [4]:
# Keep only the videos that contributed exactly 3000 landmark rows.
video_path_counts = df['video_path'].value_counts()
video_paths_to_keep = video_path_counts[video_path_counts == 3000].index
df = df[df['video_path'].isin(video_paths_to_keep)]

df_filtered = df.copy()
num_frames_per_chunk = 3000
chunks = []
# groupby yields the videos in sorted `video_path` order.
for video_path, video_rows in df_filtered.groupby('video_path'):
    coords = video_rows[['x', 'y', 'z']].values
    last_start = len(video_rows) - num_frames_per_chunk
    for start in range(0, last_start + 1, num_frames_per_chunk):
        window = coords[start:start + num_frames_per_chunk]
        # 3000 rows x 3 coordinates = 9000 values = 75 x 120, i.e. one
        # (75, 120) sequence per window.
        # NOTE(review): presumably 75 frames of 40 landmarks each - confirm
        # against the code that produced the CSVs.
        chunks.append(window.reshape(-1, 75, 40*3))

# Stack every window along the sample axis: (num_samples, 75, 120).
input_data = np.concatenate(chunks, axis=0)

In [6]:
input_data.shape

(2977, 75, 120)

In [7]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset, random_split

In [8]:
def create_vocab():
    """Return the character vocabulary as one string.

    The string holds the lowercase letters a-z, the digits 1-9 (no 0) and a
    single trailing space, 36 characters in total.
    """
    letters = "abcdefghijklmnopqrstuvwxyz"
    digits = "123456789"
    return letters + digits + " "


def char_to_int(char):
    """Map one vocabulary character to its 1-based integer label.

    Labels start at 1 so that index 0 stays free; elsewhere in this notebook
    0 is used both as the label padding value and as the CTC blank.

    Args:
        char: A single-character string.

    Returns:
        The 1-based position of ``char`` in the vocabulary, or -1 when
        ``char`` is not exactly one vocabulary character.
    """
    vocab = "abcdefghijklmnopqrstuvwxyz123456789 "
    # `char in vocab` is a substring test for strings, so '' and runs such as
    # 'abc' would otherwise be encoded as 1 ('a'). Only accept one character.
    if len(char) != 1:
        return -1
    position = vocab.find(char)
    return position + 1 if position >= 0 else -1

def int_to_char(index):
    """Map a 1-based integer label back to its vocabulary character.

    Returns the empty string for any index outside ``1..len(vocab)``, which
    includes index 0.
    """
    vocab = "abcdefghijklmnopqrstuvwxyz123456789 "
    if not (1 <= index <= len(vocab)):
        return ''
    # Labels are shifted by one relative to string positions.
    return vocab[index - 1]

def load_alignments(path:str) -> List[int]:
    """Read an alignment file and encode its words as integer labels.

    The third whitespace-separated field of each line is taken as the word.
    Words equal to ``'sil'`` are skipped. Every other word is split into
    characters and followed by a single space, so the encoded sequence ends
    with the label for a space.

    Args:
        path: Path of the alignment file to read.

    Returns:
        One integer label per character, as produced by ``char_to_int``.

    Raises:
        OSError: If the file cannot be opened.
    """
    tokens = []
    with open(path, 'r') as f:
        for line in f:
            fields = line.split()
            # Blank or truncated lines have no word field; indexing them
            # would raise IndexError, so skip them.
            if len(fields) < 3:
                continue
            word = fields[2]
            if word != 'sil':
                tokens.extend(word)
                tokens.append(' ')
    return [char_to_int(token) for token in tokens]

# Build one encoded transcript per video.
#
# The input sequences are built by iterating `groupby('video_path')`, which
# yields videos in sorted key order, whereas `unique()` yields them in order of
# first appearance. Iterating in groupby order here keeps label i paired with
# input sample i even when the CSV rows are not already sorted by path.
all_alignments = []
for video_path in df.groupby('video_path').size().index:
    # NOTE(review): the splitting below assumes paths shaped like
    # '<root>/<speaker>\<clip>.<ext>' - confirm against the CSV contents.
    datapath = video_path.split('/')[0]
    vid_path = video_path.split('/')[-1].split('\\')[-1].split('.')[0]

    # Alignment files are looked up as '<root>/align/<clip>.align'.
    alignment_path = os.path.join(datapath, 'align', f'{vid_path}.align')
    all_alignments.append(load_alignments(alignment_path))


In [14]:
# Right-pad every encoded transcript with zeros up to the longest one so the
# labels can be stacked into one rectangular array.
max_len = max(map(len, all_alignments))
padded_labels = []
for label in all_alignments:
    missing = max_len - len(label)
    padded_labels.append(np.pad(label, (0, missing), 'constant', constant_values=0))

# Stack the padded rows into a 2-D array of shape (num_labels, max_len).
label_data = np.array(padded_labels)

In [15]:
label_data

array([[ 2,  9, 14, ...,  0,  0,  0],
       [ 2,  9, 14, ...,  0,  0,  0],
       [ 2,  9, 14, ...,  0,  0,  0],
       ...,
       [19,  5, 20, ...,  0,  0,  0],
       [19,  5, 20, ...,  0,  0,  0],
       [19,  5, 20, ...,  0,  0,  0]])

In [18]:
# Copy the landmark sequences into a float32 tensor for the model, then show
# its shape as the cell output.
input_data_tensor = torch.tensor(input_data, dtype=torch.float32)
input_data_tensor.shape

torch.Size([2977, 75, 120])

In [21]:
# Copy the zero-padded integer labels into an int64 (torch.long) tensor.
label_data_tensor = torch.tensor(label_data, dtype=torch.long)

In [23]:
# Pair every input sequence with its padded label row.
dataset = TensorDataset(input_data_tensor, label_data_tensor)

# 80 % of the samples for training, the remainder held out.
train_size = int(0.8 * len(dataset))
test_size = len(dataset) - train_size

# Seed the split so the same samples are held out on every run. Without a
# generator, random_split draws from the global RNG and the held-out set
# changes on each kernel restart, so losses are not comparable between runs.
split_generator = torch.Generator().manual_seed(42)
train_dataset, test_dataset = random_split(
    dataset, [train_size, test_size], generator=split_generator
)

train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

In [31]:
# Character vocabulary (36 characters); the model's class count is derived
# from its length further down.
vocab = create_vocab()


36

In [42]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.optim.lr_scheduler import StepLR

class LipReadingRNN(nn.Module):
    """Per-time-step MLP, stacked bidirectional LSTM and linear classifier.

    The forward pass maps a batch of feature sequences of shape
    ``(batch, seq_len, input_size)`` to unnormalised class scores of shape
    ``(batch, seq_len, num_classes)``.

    Args:
        input_size: Number of features per time step.
        hidden_size: Width of the MLP layers and of each LSTM direction.
        num_classes: Number of output classes per time step.
        num_layers: Number of stacked LSTM layers. Defaults to 3.
        dropout: Dropout probability applied between LSTM layers and before
            the output layer. Defaults to 0.3.
    """

    def __init__(self, input_size, hidden_size, num_classes, num_layers=3, dropout=0.3):
        super().__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers

        # Three Linear -> ReLU -> LayerNorm stages applied to each time step.
        self.input_layer = nn.Sequential(
            nn.Linear(input_size, hidden_size),
            nn.ReLU(),
            nn.LayerNorm(hidden_size),
            nn.Linear(hidden_size, hidden_size),
            nn.ReLU(),
            nn.LayerNorm(hidden_size),
            nn.Linear(hidden_size, hidden_size),
            nn.ReLU(),
            nn.LayerNorm(hidden_size)
        )

        self.bilstm = nn.LSTM(
            hidden_size,
            hidden_size,
            num_layers=num_layers,
            batch_first=True,
            bidirectional=True,
            # Inter-layer dropout only exists when there is more than one
            # layer; nn.LSTM warns if it is requested for a single layer.
            dropout=dropout if num_layers > 1 else 0.0,
        )
        self.dropout = nn.Dropout(dropout)
        # Forward and backward LSTM outputs are concatenated, hence 2x width.
        self.fc = nn.Linear(hidden_size * 2, num_classes)

        self._initialize_weights()

    def _initialize_weights(self):
        """Xavier-uniform initialise all weight matrices and zero all biases."""
        for layer in self.input_layer:
            if isinstance(layer, nn.Linear):
                nn.init.xavier_uniform_(layer.weight)
                nn.init.zeros_(layer.bias)

        for name, param in self.bilstm.named_parameters():
            if 'weight' in name:
                nn.init.xavier_uniform_(param)
            elif 'bias' in name:
                nn.init.zeros_(param)

        nn.init.xavier_uniform_(self.fc.weight)
        nn.init.zeros_(self.fc.bias)

    def forward(self, x):
        """Return per-time-step class scores for a batch of sequences.

        Args:
            x: Tensor of shape ``(batch, seq_len, input_size)``.

        Returns:
            Tensor of shape ``(batch, seq_len, num_classes)`` holding raw
            (pre-softmax) scores.
        """
        x = self.input_layer(x)

        # nn.LSTM starts from all-zero hidden and cell states when none are
        # passed, created with the input's dtype and device. Relying on that
        # avoids allocating float32 zeros on the CPU and copying them over on
        # every call, and keeps the module usable after .double()/.half().
        out, _ = self.bilstm(x)
        out = self.dropout(out)
        return self.fc(out)

# Train on the GPU when one is available, otherwise on the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Features per time step; matches the last dimension of the input sequences.
input_size = 120
hidden_size = 256
# One class per vocabulary character plus index 0, which the loss below
# treats as the CTC blank (character labels start at 1).
num_classes = len(vocab) + 1
model = LipReadingRNN(input_size, hidden_size, num_classes).to(device)
# zero_infinity=True zeroes out infinite losses and their gradients.
criterion = nn.CTCLoss(blank=0, reduction='mean', zero_infinity=True)
optimizer = optim.AdamW(model.parameters(), lr=1e-4)
# NOTE(review): the scheduler is stepped once per epoch and the run below
# lasts 100 epochs, so with step_size=200 the learning rate never decays.
scheduler = StepLR(optimizer, step_size=200, gamma=0.1)


def _ctc_batch_loss(net, loss_fn, sequences, labels, target_device):
    """Run ``net`` on one batch and return its CTC loss.

    Args:
        net: Model returning scores of shape (batch, seq_len, num_classes).
        loss_fn: CTC criterion with blank index 0.
        sequences: Input batch of shape (batch, seq_len, features).
        labels: Zero-padded integer targets of shape (batch, max_label_len).
        target_device: Device on which the forward pass runs.

    Returns:
        Scalar loss tensor for the batch.
    """
    sequences = sequences.to(target_device)
    labels = labels.to(target_device)

    # nn.CTCLoss expects log-probabilities laid out as (time, batch, classes).
    log_probs = net(sequences).permute(1, 0, 2).log_softmax(2)

    # Every sequence in the batch uses its full length.
    input_lengths = torch.full((sequences.size(0),), sequences.size(1), dtype=torch.long)
    # Labels are padded with 0, which is never a character label, so the
    # number of non-zero entries in a row is that row's target length.
    target_lengths = (labels != 0).sum(dim=1).cpu()

    return loss_fn(log_probs, labels, input_lengths, target_lengths)


num_epochs = 100
for epoch in range(num_epochs):
    # ---- training pass ----
    model.train()
    total_loss = 0
    for sequences, labels in train_loader:
        loss = _ctc_batch_loss(model, criterion, sequences, labels, device)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        total_loss += loss.item()

    # The learning-rate schedule advances once per epoch.
    scheduler.step()

    # ---- validation pass (no gradients, dropout disabled) ----
    model.eval()
    val_loss = 0
    with torch.no_grad():
        for sequences, labels in test_loader:
            val_loss += _ctc_batch_loss(model, criterion, sequences, labels, device).item()

    # Both figures are averages over the number of batches.
    print(f'Epoch [{epoch+1}/{num_epochs}], Training Loss: {total_loss/len(train_loader):.4f}, Validation Loss: {val_loss/len(test_loader):.4f}')


Epoch [1/100], Training Loss: 3.4547, Validation Loss: 2.8069
Epoch [2/100], Training Loss: 2.7178, Validation Loss: 2.6307
Epoch [3/100], Training Loss: 2.5199, Validation Loss: 2.4286
Epoch [4/100], Training Loss: 2.3810, Validation Loss: 2.2826
Epoch [5/100], Training Loss: 2.2572, Validation Loss: 2.1659
Epoch [6/100], Training Loss: 2.1302, Validation Loss: 2.0338
Epoch [7/100], Training Loss: 2.0233, Validation Loss: 1.9185
Epoch [8/100], Training Loss: 1.9403, Validation Loss: 1.8252
Epoch [9/100], Training Loss: 1.8753, Validation Loss: 1.7709
Epoch [10/100], Training Loss: 1.8223, Validation Loss: 1.7115


KeyboardInterrupt: 

In [43]:
import jiwer

def calculate_wer(reference, hypothesis):
    """
    Calculate the Word Error Rate (WER).

    Thin wrapper that forwards both arguments unchanged to ``jiwer.wer``.

    - reference: The ground truth string.
    - hypothesis: The predicted string.
    Returns the WER as a float.
    """
    return jiwer.wer(reference, hypothesis)

def calculate_cer(reference, hypothesis):
    """
    Calculate the Character Error Rate (CER).

    Thin wrapper that forwards both arguments unchanged to ``jiwer.cer``.

    - reference: The ground truth string.
    - hypothesis: The predicted string.
    Returns the CER as a float.
    """
    return jiwer.cer(reference, hypothesis)

def ctc_greedy_decoder(output, int_to_char, blank_label):
    """
    Collapse per-time-step class indices into strings (greedy CTC decoding).

    Within each sequence, consecutive repeats of the same index are merged
    into one and blank indices are dropped; a blank between two identical
    indices keeps both of them.

    - output: Per-sequence class indices, shape (batch, seq_len). May be a
      tensor (or anything else exposing ``tolist()``) or a nested list of ints.
    - int_to_char: A callable mapping a class index to its character.
    - blank_label: The index of the blank label.
    Returns a list with one decoded string per sequence.
    """
    # One bulk conversion to Python ints instead of an .item() call per
    # element; nested lists are already in the right form.
    if hasattr(output, 'tolist'):
        output = output.tolist()

    decoded_words = []
    for indices in output:
        chars = []
        previous = None
        for idx in indices:
            if idx != blank_label and idx != previous:
                chars.append(int_to_char(idx))
            # Track blanks as well, so a repeat after a blank is emitted again.
            previous = idx
        decoded_words.append(''.join(chars))
    return decoded_words

# Ensure the blank label is mapped to a space character if needed
# int2char[blank_label] = ' '
# Qualitative check: decode the first held-out batch and print each reference
# transcript next to the model's greedy prediction.
model.eval()
with torch.no_grad():
    total_val_loss = 0
    all_golden_words = []
    all_predicted_words = []
    for sequences, labels in test_loader:
        # Collapse everything after the time axis into a single feature axis.
        sequences = sequences.view(sequences.size(0), sequences.size(1), -1)
        print(f'Validation Batch, sequences shape: {sequences.shape}')

        sequences, labels = sequences.to(device), labels.to(device)

        # Most likely class at each time step, then greedy CTC collapsing
        # with index 0 as the blank.
        outputs = model(sequences)
        max_indices = torch.argmax(outputs, dim=2)
        predicted_words = ctc_greedy_decoder(max_indices, int_to_char, 0)

        # Reference transcripts: drop the zero padding and map labels back
        # to characters.
        golden_words = [
            ''.join(int_to_char(idx.item()) for idx in label if idx.item() != 0)
            for label in labels
        ]

        all_golden_words += golden_words
        all_predicted_words += predicted_words

        for golden, predicted in zip(golden_words, predicted_words):
            print(f"Golden: {golden}")
            print(f"Predicted: {predicted}")

        # Only the first batch is inspected.
        break


Validation Batch, sequences shape: torch.Size([32, 75, 120])
Golden: lay white in e zero now 
Predicted: le re it  n 
Golden: place white with k three please 
Predicted: le re it  n 
Golden: place white with q seven again 
Predicted: le re it  n 
Golden: lay white by l five please 
Predicted: le re i  n 
Golden: set red at b one again 
Predicted: le re it  n 
Golden: set blue by a nine again 
Predicted: le re it  n 
Golden: lay blue by k two soon 
Predicted: le re i  n 
Golden: place blue by p one again 
Predicted: le re it  n 
Golden: bin white by a two please 
Predicted: le re it  n 
Golden: bin blue in e five now 
Predicted: le re i  n 
Golden: lay red in q six please 
Predicted: le re it  n 
Golden: bin green by u two again 
Predicted: le re i  n 
Golden: bin green in g six please 
Predicted: le re it  n 
Golden: set green with j one soon 
Predicted: le re it  n 
Golden: bin red by m six again 
Predicted: le re it  n 
Golden: set red at n four now 
Predicted: le re it  n 
Golden: p