In [33]:
import gdown
import json
import os
import cv2
import torch
import torch.nn as nn
from torch.nn import init
import torch.optim as optim
from torch.utils.data import DataLoader
from tqdm import tqdm
import glob
from pathlib import Path

In [2]:
def save_json(data, filename):
    """Write ``data`` to ``filename`` as JSON.

    Args:
        data: Any object that ``json.dump`` can serialize.
        filename: Destination path. It is opened in text write mode, so an
            existing file at that path is overwritten.
    """
    with open(filename, 'w') as handle:
        json.dump(data, fp=handle)

# Dataset

In [4]:
# Fetch the dataset archive from Google Drive and unpack it next to the notebook.
output = 'data.zip'
url = 'https://drive.google.com/uc?id=1YlvpDLix3S-U8fd-gqRwPcWXAXm8JwjL'

if not os.path.isfile(output):
    print("Initializing data ingestion...")
    gdown.download(url, output, quiet=False)
    gdown.extractall(output)
elif not os.path.isdir('data'):
    # The archive is already here but its contents are not (for example the
    # extraction was interrupted or data/ was deleted): unpack it again
    # instead of silently reporting success.
    # NOTE(review): assumes the archive unpacks into ./data, as the message
    # below and the later cells that read ./data/... suggest -- confirm.
    gdown.extractall(output)

print("Data Installed...\n Saved in data/")

Data Installed...
 Saved in data/


# Model

In [5]:
class Conv3DLSTMModel(nn.Module):
    """3D-convolutional front end followed by two bidirectional LSTMs.

    Three Conv3d -> ReLU -> MaxPool3d stages reduce the spatial size of every
    frame (the pooling kernel is ``(1, 2, 2)``, so the time axis is kept), the
    per-frame features are flattened and fed through two bidirectional LSTM
    layers, and a linear layer produces one score vector per time step.

    Args:
        vocab_size: Number of output classes of the final linear layer.
        hidden_size: Hidden size of each LSTM direction. The second LSTM and
            the linear layer take ``2 * hidden_size`` inputs because the LSTMs
            are bidirectional.
    """

    def __init__(self, vocab_size, hidden_size):
        super(Conv3DLSTMModel, self).__init__()

        self.conv1 = nn.Conv3d(1, 128, kernel_size=3, padding=1)
        self.relu1 = nn.ReLU()
        self.pool1 = nn.MaxPool3d(kernel_size=(1, 2, 2))

        self.conv2 = nn.Conv3d(128, 256, kernel_size=3, padding=1)
        self.relu2 = nn.ReLU()
        self.pool2 = nn.MaxPool3d(kernel_size=(1, 2, 2))

        self.conv3 = nn.Conv3d(256, 75, kernel_size=3, padding=1)
        self.relu3 = nn.ReLU()
        self.pool3 = nn.MaxPool3d(kernel_size=(1, 2, 2))

        # 75 channels x 5 x 17 spatial positions per frame: this is what a
        # 46x140 frame becomes after three (1, 2, 2) poolings.
        self.lstm1 = nn.LSTM(input_size=75 * 5 * 17, hidden_size=hidden_size,
                             batch_first=True, bidirectional=True)
        self.dropout1 = nn.Dropout(0.5)

        # A bidirectional LSTM emits 2 * hidden_size features per step, so the
        # layers that consume it must be sized from hidden_size rather than a
        # fixed 128 * 2 (which only worked for hidden_size == 128).
        self.lstm2 = nn.LSTM(input_size=hidden_size * 2, hidden_size=hidden_size,
                             batch_first=True, bidirectional=True)
        self.dropout2 = nn.Dropout(0.5)

        self.dense = nn.Linear(hidden_size * 2, vocab_size)
        self.__initweights__()

    def __initweights__(self):
        """Kaiming-normal initialise all weights and zero all biases."""
        for m in self.modules():
            if isinstance(m, nn.Conv3d):
                nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu')
                if m.bias is not None:
                    nn.init.constant_(m.bias, 0)
            elif isinstance(m, nn.LSTM):
                for name, param in m.named_parameters():
                    if 'weight_ih' in name:
                        nn.init.kaiming_normal_(param.data, mode='fan_out', nonlinearity='relu')
                    elif 'weight_hh' in name:
                        nn.init.kaiming_normal_(param.data, mode='fan_out', nonlinearity='relu')
                    elif 'bias' in name:
                        param.data.fill_(0)
            elif isinstance(m, nn.Linear):
                nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu')
                nn.init.constant_(m.bias, 0)

    def forward(self, x):
        """Compute per-time-step class scores.

        Args:
            x: 5-D tensor laid out as (batch, channel=1, time, height, width).

        Returns:
            Tensor of shape (batch, time, vocab_size) holding raw
            (un-normalised) scores.
        """
        # Convolutional feature extractor.
        x = self.pool1(self.relu1(self.conv1(x)))
        x = self.pool2(self.relu2(self.conv2(x)))
        x = self.pool3(self.relu3(self.conv3(x)))

        # (B, C, T, H, W) -> (B, T, C, H, W) -> (B, T, C*H*W)
        batch_size, _, D, H, W = x.size()
        x = x.permute(0, 2, 1, 3, 4)
        x = x.reshape(batch_size, D, -1)

        # Bidirectional LSTM layers
        x, _ = self.lstm1(x)
        x = self.dropout1(x)

        x, _ = self.lstm2(x)
        x = self.dropout2(x)

        # The linear layer is applied to every time step, not only the last one.
        x = self.dense(x)

        return x
    
class Conv3DLSTMModelMini(nn.Module):
    """Smaller variant: two Conv3d stages, one bidirectional LSTM, one linear layer.

    Args:
        vocab_size: Number of output classes of the final linear layer.
        hidden_size: Hidden size of each LSTM direction. The linear layer
            takes ``2 * hidden_size`` inputs because the LSTM is bidirectional.
    """

    def __init__(self, vocab_size, hidden_size):
        super(Conv3DLSTMModelMini, self).__init__()

        self.conv1 = nn.Conv3d(1, 64, kernel_size=3, padding=1)
        self.pool1 = nn.MaxPool3d(kernel_size=(1, 2, 2))

        self.conv2 = nn.Conv3d(64, 128, kernel_size=3, padding=1)
        self.pool2 = nn.MaxPool3d(kernel_size=(1, 2, 2))

        # 128 channels x 11 x 35 spatial positions per frame: this is what a
        # 46x140 frame becomes after two (1, 2, 2) poolings.
        self.lstm1 = nn.LSTM(input_size=128 * 11 * 35, hidden_size=hidden_size,
                             batch_first=True, bidirectional=True)
        self.dropout1 = nn.Dropout(0.3)

        # Size the classifier from hidden_size; a fixed 64 * 2 only matched
        # the LSTM output when hidden_size == 64.
        self.dense = nn.Linear(hidden_size * 2, vocab_size)

    def forward(self, x):
        """Compute per-time-step class scores.

        Args:
            x: 5-D tensor laid out as (batch, channel=1, time, height, width).

        Returns:
            Tensor of shape (batch, time, vocab_size) holding raw scores.
        """
        x = self.pool1(torch.relu(self.conv1(x)))
        x = self.pool2(torch.relu(self.conv2(x)))

        # (B, C, T, H, W) -> (B, T, C, H, W) -> (B, T, C*H*W)
        batch_size, _, D, H, W = x.size()
        x = x.permute(0, 2, 1, 3, 4)
        x = x.reshape(batch_size, D, -1)

        # Bidirectional LSTM layer
        x, _ = self.lstm1(x)
        x = self.dropout1(x)

        x = self.dense(x)

        return x

class LipNet(nn.Module):
    """Three strided/pooled Conv3d stages, two bidirectional LSTMs and a linear layer.

    Args:
        vocab_size: Number of output classes of the final linear layer.
        hidden_size: Hidden size of each LSTM direction. The second LSTM and
            the linear layer take ``2 * hidden_size`` inputs (512 with the
            default of 256) because the LSTMs are bidirectional.
    """

    def __init__(self, vocab_size=40, hidden_size=256):
        super(LipNet, self).__init__()
        # Positional Conv3d arguments: in_channels, out_channels, kernel_size, stride, padding.
        self.conv1 = nn.Conv3d(1, 32, (3, 5, 5), (1, 2, 2), (1, 2, 2))
        self.pool1 = nn.MaxPool3d((1, 2, 2), (1, 2, 2))

        self.conv2 = nn.Conv3d(32, 64, (3, 5, 5), (1, 1, 1), (1, 2, 2))
        self.pool2 = nn.MaxPool3d((1, 2, 2), (1, 2, 2))

        self.conv3 = nn.Conv3d(64, 96, (3, 3, 3), (1, 1, 1), (1, 1, 1))
        self.pool3 = nn.MaxPool3d((1, 2, 2), (1, 2, 2))

        # 96 channels x 2 x 8 spatial positions per frame: this is what a
        # 46x140 frame becomes after the stride-2 conv1 and three poolings.
        self.lstm1 = nn.LSTM(96*2*8, hidden_size, 1, batch_first=True, bidirectional=True)
        # Sized from hidden_size (was a fixed 512, i.e. only valid for hidden_size == 256).
        self.lstm2 = nn.LSTM(hidden_size * 2, hidden_size, 1, batch_first=True, bidirectional=True)

        self.dense = nn.Linear(hidden_size * 2, vocab_size)
        self.dropout_p = 0.5

        self.relu = nn.ReLU(inplace=True)
        self.dropout = nn.Dropout(self.dropout_p)
        self.dropout3d = nn.Dropout3d(self.dropout_p)
        self._init_weights_()

    def _init_weights_(self):
        """Initialise conv/linear weights (Kaiming), LSTM weights (Xavier / orthogonal) and zero all biases."""
        init.kaiming_normal_(self.conv1.weight, nonlinearity='relu')
        init.constant_(self.conv1.bias, 0)

        init.kaiming_normal_(self.conv2.weight, nonlinearity='relu')
        init.constant_(self.conv2.bias, 0)

        init.kaiming_normal_(self.conv3.weight, nonlinearity='relu')
        init.constant_(self.conv3.bias, 0)

        init.kaiming_normal_(self.dense.weight, nonlinearity='sigmoid')
        init.constant_(self.dense.bias, 0)

        # Simple LSTM initialisation: Xavier-uniform input weights,
        # orthogonal recurrent weights, zero biases.
        for lstm in (self.lstm1, self.lstm2):
            for name, param in lstm.named_parameters():
                if 'weight_ih' in name:
                    torch.nn.init.xavier_uniform_(param.data)
                elif 'weight_hh' in name:
                    torch.nn.init.orthogonal_(param.data)
                elif 'bias' in name:
                    param.data.fill_(0)

    def forward(self, x):
        """Compute per-time-step class scores.

        Args:
            x: 5-D tensor laid out as (batch, channel=1, time, height, width).

        Returns:
            Tensor of shape (batch, time, vocab_size) holding raw scores.
        """
        x = self.conv1(x)
        x = self.relu(x)
        x = self.dropout3d(x)
        x = self.pool1(x)

        x = self.conv2(x)
        x = self.relu(x)
        x = self.dropout3d(x)
        x = self.pool2(x)

        x = self.conv3(x)
        x = self.relu(x)
        x = self.dropout3d(x)
        x = self.pool3(x)

        x = x.permute(0, 2, 1, 3, 4)  # B, T, C, H, W
        x = x.contiguous().view(x.size(0), x.size(1), -1)  # Flatten channel and spatial dimensions per frame

        x, _ = self.lstm1(x)
        x = self.dropout(x)
        x, _ = self.lstm2(x)
        x = self.dropout(x)

        x = self.dense(x)

        return x
    


# Utility Functions

In [22]:
import numpy as np 
import itertools

def get_stoi(file, max_len=35, pad_value=38):
    """Encode the transcript of an alignment file as vocabulary indices.

    Every line of the file is split on single spaces and its third field is
    taken as a word. Words equal to ``"sil"`` are dropped; every kept word is
    followed by one space (so the encoded sentence ends with a space). Each
    character found in the vocabulary is replaced by its index; characters
    outside the vocabulary are ignored.

    Args:
        file (str): Path of the alignment file.
        max_len (int): Sequences shorter than this are right-padded up to this
            length. Longer sequences are returned as they are (not truncated).
        pad_value (int): Index used for padding. Note that 38 is also the
            index of the apostrophe in the vocabulary.

    Returns:
        np.ndarray: 1-D integer array of vocabulary indices.
    """
    vocabulary = "abcdefghijklmnopqrstuvwxyz1234567890!?' "
    tokens = []
    # The context manager closes the file even if parsing raises.
    with open(file, "r") as f:
        for line in f:
            fields = line.strip().split(" ")
            if len(fields) < 3:
                # Blank or malformed line: there is no word field to read.
                continue
            txt = fields[2]
            if txt != "sil":
                tokens.append(txt)
                tokens.append(" ")

    vocab = [vocabulary.index(ch) for token in tokens for ch in token if ch in vocabulary]

    if len(vocab) < max_len:
        vocab.extend([pad_value] * (max_len - len(vocab)))
    return np.array(vocab)

def itos(vec):
    """Turn a sequence of vocabulary indices back into text.

    Args:
        vec: Iterable of integer indices into the vocabulary string.

    Returns:
        str: The characters the indices point at, concatenated in order.
    """
    vocabulary = "abcdefghijklmnopqrstuvwxyz1234567890!?' "
    return "".join(vocabulary[index] for index in vec)

def ctc_decode(input_string):
    """Collapse repeated characters inside every whitespace-separated word.

    This is a simplified stand-in for a real CTC decoder: the input is split
    on whitespace, each run of identical consecutive characters within a word
    is reduced to a single character, and the words are joined back together
    with single spaces.

    Args:
        input_string (str): Frame-level prediction string.

    Returns:
        str: The collapsed string.
    """
    def squeeze(token):
        # groupby yields one key per run of equal consecutive characters.
        return ''.join(key for key, _run in itertools.groupby(token))

    return ' '.join(squeeze(token) for token in input_string.split())

# Custom Dataset Class

In [20]:
from torch.utils.data import Dataset

class CustomDataset(Dataset):
    """Pairs each alignment file with the cropped grayscale frames of its video.

    Args:
        files: Sequence of alignment-file paths. The video for an alignment
            file is looked up by the file's base name (up to the first dot)
            with an ``.mpg`` extension.
        video_dir: Directory holding the ``.mpg`` files. Defaults to
            ``<current working directory>/data/s1``.
        num_frames: Number of frames every sample is padded or truncated to.
    """

    def __init__(self, files, video_dir=None, num_frames=75):
        self.files = files
        self.video_dir = video_dir
        self.num_frames = num_frames

    def __len__(self):
        return len(self.files)

    def __getitem__(self, idx):
        """Return ``(frames, vocab)`` for sample ``idx``.

        ``frames`` is a uint8 tensor of shape (1, num_frames, 46, 140) and
        ``vocab`` is the index array produced by ``get_stoi`` for the
        alignment file.
        """
        path = self.files[idx]
        vocab = get_stoi(path)

        video_dir = self.video_dir
        if video_dir is None:
            video_dir = os.path.join(os.getcwd(), "data", "s1")
        # Derive the clip name from the file name itself; indexing a fixed
        # component of the split path only works for one particular depth of
        # the working directory.
        clip_id = os.path.basename(path).split(".")[0]
        mpgpath = os.path.join(video_dir, clip_id + ".mpg")

        frames = []
        cap = cv2.VideoCapture(mpgpath)
        try:
            # Stop at num_frames so longer clips cannot yield longer samples.
            while len(frames) < self.num_frames:
                ret, img = cap.read()  # img is (H, W, C) when ret is True
                if not ret:
                    break
                img = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
                # Fixed 46x140 crop, with a trailing channel axis.
                frames.append(img[190:236, 80:220, np.newaxis])
        finally:
            # Always free the decoder handle.
            cap.release()

        # Pad by the number of frames actually decoded (the reported frame
        # count can differ) and keep the dtype equal to the real frames so
        # every sample has the same dtype.
        while len(frames) < self.num_frames:
            frames.append(np.zeros((46, 140, 1), dtype=np.uint8))

        mpg = np.stack(frames, axis=0)          # (T, H, W, 1)
        frames = torch.from_numpy(mpg)
        frames = torch.permute(frames, (3, 0, 1, 2))  # (1, T, H, W)
        return frames, vocab

In [21]:
def train_one_epoch(optimizer,
                    train_loader,
                    ctc_loss,
                    device,
                    model):
    """Run one optimisation pass over ``train_loader``.

    Args:
        optimizer: Optimizer updating ``model``'s parameters.
        train_loader: Iterable of ``(frame, align)`` batches. ``align`` holds
            one row of label indices per sample, right-padded with 38.
        ctc_loss: CTC criterion called as
            ``ctc_loss(log_probs, targets, input_lengths, target_lengths)``.
        device: Device the batch is moved to.
        model: Network mapping a frame batch to (batch, time, classes) scores.

    Returns:
        tuple: ``(mean_loss, first_prediction)`` where ``mean_loss`` is the
        summed batch loss divided by ``len(train_loader)`` and
        ``first_prediction`` is the per-time-step argmax (1-D integer array)
        for the first sample of the last batch.

    Raises:
        ValueError: If ``train_loader`` yields no batches.
    """
    model.train()
    total_loss = 0
    pred = None
    y = None
    for frame, align in tqdm(train_loader):
        frame = frame.float().to(device)
        y = torch.from_numpy(np.asarray(align)).to(device)

        pred = model(frame)  # (B, T, C) raw scores

        # CTC loss expects log-probabilities laid out as (T, B, C); the models
        # return un-normalised scores, so normalise them here.
        log_probs = pred.log_softmax(dim=2).permute(1, 0, 2)

        target_lengths = []
        y_true = []

        for seq in y:
            # 38 is the padding value; everything before the padding is the label.
            length = int((seq != 38).sum())
            y_true.extend(seq[:length].tolist())
            target_lengths.append(length)

        target_lengths = torch.tensor(target_lengths, dtype=torch.long).to(device)
        targets = torch.tensor(y_true, dtype=torch.long).to(device)

        # Every sample uses the full output length of the model.
        input_lengths = torch.full((frame.size(0),), log_probs.size(0), dtype=torch.long).to(device)

        loss = ctc_loss(log_probs, targets, input_lengths, target_lengths)
        total_loss += loss.item()
        # backward
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

    if pred is None:
        raise ValueError("train_loader yielded no batches")

    # Greedy per-time-step prediction, computed once for the last batch only.
    words = pred.detach().argmax(dim=2).cpu().numpy()

    # Print the first sentence of the last batch.
    print(f"Predicted Sentence: {itos(words[0])}")
    print("CTC Decoded Sentence:", ctc_decode(itos(words[0])))
    print("Original Sentence:", itos(y[0]) )

    total_loss /= len(train_loader)
    return total_loss, words[0]

In [23]:
def valid_one_epoch(valid_loader,
                    ctc_loss,
                    device,
                    model):
    """Compute the mean CTC loss over ``valid_loader`` without updating the model.

    The model is switched to evaluation mode (dropout off) and gradients are
    disabled for the duration of the pass; the previous training/eval mode is
    restored before returning.

    Args:
        valid_loader: Iterable of ``(frame, align)`` batches. ``align`` holds
            one row of label indices per sample, right-padded with 38.
        ctc_loss: CTC criterion called as
            ``ctc_loss(log_probs, targets, input_lengths, target_lengths)``.
        device: Device the batch is moved to.
        model: Network mapping a frame batch to (batch, time, classes) scores.

    Returns:
        float: Summed batch loss divided by ``len(valid_loader)``.

    Raises:
        ValueError: If ``valid_loader`` yields no batches.
    """
    was_training = model.training
    model.eval()
    total_loss = 0
    num_batches = 0
    try:
        with torch.no_grad():
            for frame, align in tqdm(valid_loader):
                frame = frame.float().to(device)
                y = torch.from_numpy(np.asarray(align)).to(device)

                # CTC loss expects log-probabilities laid out as (T, B, C).
                log_probs = model(frame).log_softmax(dim=2).permute(1, 0, 2)

                target_lengths = []
                y_true = []

                for seq in y:
                    # 38 is the padding value; everything before it is the label.
                    length = int((seq != 38).sum())
                    y_true.extend(seq[:length].tolist())
                    target_lengths.append(length)

                target_lengths = torch.tensor(target_lengths, dtype=torch.long).to(device)
                targets = torch.tensor(y_true, dtype=torch.long).to(device)

                # Every sample uses the full output length of the model.
                input_lengths = torch.full((frame.size(0),), log_probs.size(0), dtype=torch.long).to(device)

                loss = ctc_loss(log_probs, targets, input_lengths, target_lengths)
                total_loss += loss.item()
                num_batches += 1
    finally:
        # Leave the model in the mode the caller had it in.
        model.train(was_training)

    if num_batches == 0:
        raise ValueError("valid_loader yielded no batches")

    total_loss /= len(valid_loader)
    return total_loss

In [25]:
def train_lipnet(EPOCHS=300,
                 lr=0.001,
                 hidden_size=256,
                 model='lipnet-lstm',
                 batch_size=8,
                 num_workers=4,
                 device=None):
    """Train one of the lip-reading models and return the per-epoch losses.

    The defaults are literal values (not module-level constants) so the
    function can be defined before the configuration cell has run.

    Args:
        EPOCHS (int): Number of epochs.
        lr (float): Base learning rate for Adam.
        hidden_size (int): LSTM hidden size handed to the model.
        model (str): ``'conv3dlstm'``, ``'conv3dlstmmini'``, or anything else
            for ``LipNet``. Also used as the checkpoint file name
            (``models/<model>.pth``).
        batch_size (int): Batch size of both data loaders.
        num_workers (int): Worker processes of both data loaders.
        device: Device to train on; ``None`` selects ``'cuda'`` when
            available and ``'cpu'`` otherwise.

    Returns:
        dict: ``{"train_loss": [...], "valid_loss": [...]}`` with one entry
        per epoch.

    Raises:
        FileNotFoundError: If no alignment files are found.
        ValueError: If there are too few files to leave a validation split.
    """
    if device is None:
        device = 'cuda' if torch.cuda.is_available() else 'cpu'

    vocab_size = 40
    train_size = 900  # first 900 files train, the remainder validate
    MODEL_PATH = Path('models')
    MODEL_PATH.mkdir(parents=True, exist_ok=True)
    MODEL_NAME = model + ".pth"
    MODEL_SAVE_PATH = MODEL_PATH/MODEL_NAME

    dataset_path = os.getcwd() + "/data/alignments/s1/*.align"
    # glob returns files in arbitrary order; sort so the train/valid split is
    # the same on every run (important when resuming from a checkpoint).
    files = sorted(glob.glob(dataset_path))
    if not files:
        raise FileNotFoundError(f"No alignment files match {dataset_path}")
    if len(files) <= train_size:
        raise ValueError(
            f"Found {len(files)} alignment files; more than {train_size} are needed "
            "to leave a validation split")

    # NOTE(review): index 39 is also the space character of the vocabulary
    # used by get_stoi/itos, so the targets contain the blank label. Confirm
    # this is intended; changing it requires a coordinated vocabulary change.
    criterion = nn.CTCLoss(blank=39)

    if model == 'conv3dlstm':
        net = Conv3DLSTMModel(vocab_size, hidden_size).to(device)
    elif model == 'conv3dlstmmini':
        net = Conv3DLSTMModelMini(vocab_size, hidden_size).to(device)
    else:
        net = LipNet(vocab_size, hidden_size).to(device)

    if os.path.isfile(MODEL_SAVE_PATH):
        print("Loading model..")
        # map_location lets a checkpoint saved on GPU be resumed on CPU.
        # NOTE(review): torch.load unpickles the file; only load checkpoints
        # you trust (or pass weights_only=True on torch versions that support it).
        net.load_state_dict(torch.load(f=MODEL_SAVE_PATH, map_location=device))

    optimizer = optim.Adam(net.parameters(), lr)

    train_data = CustomDataset(files[:train_size])
    valid_data = CustomDataset(files[train_size:])

    train_loader = DataLoader(train_data, batch_size=batch_size, shuffle=True, num_workers=num_workers)
    # Validation only averages a loss, so shuffling is unnecessary.
    valid_loader = DataLoader(valid_data, batch_size=batch_size, shuffle=False, num_workers=num_workers)

    summary_loss = {"train_loss" : [],
                    "valid_loss" : []}
    for epoch in range(EPOCHS):
        print(f"EPOCH {epoch}")
        print(optimizer.param_groups[0]["lr"])

        train_loss, _ = train_one_epoch(optimizer=optimizer,
                                        train_loader=train_loader,
                                        ctc_loss=criterion,
                                        device=device,
                                        model=net)

        valid_loss = valid_one_epoch(valid_loader=valid_loader,
                                     ctc_loss=criterion,
                                     device=device,
                                     model=net)

        print(f"Train Loss: {train_loss} Valid Loss: {valid_loss}")

        # Cyclic schedule with a 100-epoch period: the base rate is used for
        # the first 60 epochs of each cycle, then it decays by exp(-0.1) per
        # epoch until the next cycle resets it.
        if epoch % 100 >= 60:
            optimizer.param_groups[0]["lr"] *= np.exp(-0.1)
        else:
            optimizer.param_groups[0]["lr"] = lr

        # Save after every epoch. The previous code only saved when a
        # checkpoint already existed, so a first run never wrote one.
        print("Saving model..")
        torch.save(obj=net.state_dict(), f=MODEL_SAVE_PATH)

        summary_loss["train_loss"].append(train_loss)
        summary_loss["valid_loss"].append(valid_loss)

    return summary_loss

In [34]:
# Hyper-parameters and runtime settings for this training run.
EPOCHS = 300
LR = 0.001
HIDDEN_SIZE = 256
MODEL = 'lipnet-lstm'
BATCH_SIZE = 8
WORKERS = 4
# Prefer the GPU when one is visible to PyTorch.
DEVICE = 'cpu'
if torch.cuda.is_available():
    DEVICE = 'cuda'

# Keyword arguments handed to train_lipnet, gathered in one place.
_run_kwargs = {
    "EPOCHS": EPOCHS,
    "lr": LR,
    "hidden_size": HIDDEN_SIZE,
    "model": MODEL,
    "batch_size": BATCH_SIZE,
    "num_workers": WORKERS,
    "device": DEVICE,
}
summary = train_lipnet(**_run_kwargs)

  model.load_state_dict(torch.load(f=MODEL_SAVE_PATH))


Loading model..
EPOCH 0
0.001


  7%|▋         | 8/113 [00:49<10:49,  6.19s/it]


KeyboardInterrupt: 