In [25]:
!pip install face-alignment



In [0]:
import torch
import torch.nn as nn
import torch.nn.init as init
import torch.nn.functional as F
from torch.utils.data import DataLoader
import math
import os
import sys
#from dataset import MyDataset
import numpy as np
import time
#from model import LipNet
import torch.optim as optim
import re
import json
import tempfile
import shutil
import cv2
import face_alignment

In [27]:
# Mount Google Drive inside the Colab filesystem at /content/drive.
from google.colab import drive
drive.mount('/content/drive')

# Switch the working directory to the project's Evaluation folder so that the
# relative paths used later (the evaluation_vids/ folder and the checkpoint
# file) resolve against it.
%cd /content/drive/My\ Drive/Comp4471\ Project/Evaluation

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
/content/drive/My Drive/Comp4471 Project/Evaluation


In [0]:
# Input video selection: place the clip inside the `evaluation_vids` folder and
# set `file_name` to that clip's file name. `file_path` is the path (relative
# to the current working directory) that is handed to the video loader.

file_name = '2.mp4'
file_path = 'evaluation_vids/{}'.format(file_name)

In [0]:
class LipNet(torch.nn.Module):
    """LipNet-style lip-reading network, meant to be filled with pretrained weights.

    Layout: three (Conv3d -> ReLU -> Dropout3d -> MaxPool3d) stages, two
    single-layer bidirectional GRUs, and a linear layer producing 27 + 1 scores
    per time step.

    Attribute names are part of the checkpoint format (state-dict keys such as
    ``conv1.weight`` or ``FC.bias``) and must not be renamed.

    Args:
        drop_prob: dropout probability shared by the ``Dropout`` and
            ``Dropout3d`` layers (default 0.5).
    """

    def __init__(self, drop_prob =0.5):
        super().__init__()
        self.dropout_probability = drop_prob

        # Stage 1: strided convolution followed by spatial-only pooling
        # (kernel/stride of 1 along the first pooled dimension).
        self.conv1 = nn.Conv3d(in_channels=3, out_channels=32,
                               kernel_size=(3, 5, 5), stride=(1, 2, 2), padding=(1, 2, 2))
        self.pool1 = nn.MaxPool3d(kernel_size=(1, 2, 2), stride=(1, 2, 2))

        # Stage 2: uses stride (1, 1, 1); the original author's note says the
        # paper specifies (32, 64, (3, 5, 5), (1, 2, 2), (1, 2, 2)) here.
        self.conv2 = nn.Conv3d(in_channels=32, out_channels=64,
                               kernel_size=(3, 5, 5), stride=(1, 1, 1), padding=(1, 2, 2))
        self.pool2 = nn.MaxPool3d(kernel_size=(1, 2, 2), stride=(1, 2, 2))

        # Stage 3.
        self.conv3 = nn.Conv3d(in_channels=64, out_channels=96,
                               kernel_size=(3, 3, 3), stride=(1, 1, 1), padding=(1, 1, 1))
        self.pool3 = nn.MaxPool3d(kernel_size=(1, 2, 2), stride=(1, 2, 2))

        # The first GRU consumes the flattened conv features: 96 channels on a
        # 4 x 8 grid, which is what a 64 x 128 input frame is reduced to by the
        # three stages above. Each bidirectional GRU emits 2 * 256 = 512 features.
        self.gru1 = nn.GRU(input_size=96 * 4 * 8, hidden_size=256, num_layers=1, bidirectional=True)
        self.gru2 = nn.GRU(input_size=512, hidden_size=256, num_layers=1, bidirectional=True)

        # Per-step scores over 27 + 1 classes (presumably 27 characters plus a
        # CTC blank -- confirm against the training code).
        self.FC = nn.Linear(in_features=512, out_features=27 + 1)

        self.relu = nn.ReLU(inplace=True)
        self.dropout = nn.Dropout(self.dropout_probability)
        self.dropout3d = nn.Dropout3d(self.dropout_probability)

    def forward(self, x):
        """Compute per-step class scores for a batch of clips.

        Args:
            x: 5-D tensor with 3 channels in dimension 1; dimension 2 is the
                one used as the sequence axis by the GRUs. The last two
                dimensions must shrink to 4 x 8 after the conv stages
                (e.g. 64 x 128 inputs).

        Returns:
            Tensor of shape (batch, sequence, 28) holding the raw linear-layer
            outputs (no softmax applied).
        """
        conv_stages = (
            (self.conv1, self.pool1),
            (self.conv2, self.pool2),
            (self.conv3, self.pool3),
        )
        for conv, pool in conv_stages:
            x = pool(self.dropout3d(self.relu(conv(x))))

        # Move dimension 2 to the front (sequence-first layout expected by the
        # GRUs) and flatten everything after the batch dimension.
        x = x.permute(2, 0, 1, 3, 4).contiguous()
        steps, batch = x.size(0), x.size(1)
        x = x.view(steps, batch, -1)

        for gru in (self.gru1, self.gru2):
            gru.flatten_parameters()

        x, _ = self.gru1(x)
        x = self.dropout(x)
        x, _ = self.gru2(x)
        x = self.dropout(x)

        # Score every step, then return to a batch-first layout.
        scores = self.FC(x)
        return scores.permute(1, 0, 2).contiguous()

In [0]:
def load_video(video_file):
    """Read a video and return its per-frame lip crops as a LipNet input tensor.

    Every frame is decoded with ``cv2.VideoCapture``; facial landmarks are
    detected with ``face_alignment`` and a rectangular region bounded by
    landmarks 5, 13 and 9 is cropped and resized to 128 x 64 (width x height).
    Frames with no detected face, or whose crop is empty after clamping to the
    image bounds, are skipped, so the result may hold fewer steps than the
    video has frames.

    Args:
        video_file: path of the video file to read.

    Returns:
        ``torch.FloatTensor`` of shape (channels, frames, 64, 128) with values
        scaled to [0, 1]. Channel order is whatever ``cv2`` delivers.

    Raises:
        ValueError: if no frame could be read from ``video_file`` or no lip
            region could be extracted from any frame.
    """
    # Read all frames; always release the capture handle, even on failure.
    cam = cv2.VideoCapture(video_file)
    array = []
    try:
        while True:
            ret, frame = cam.read()
            if not ret:
                break
            if frame is not None:
                array.append(frame)
    finally:
        cam.release()

    if not array:
        raise ValueError("No frames could be read from video file: {}".format(video_file))

    # Newer face-alignment releases expose the 2D landmark type as `TWO_D`
    # instead of `_2D`; accept either spelling.
    landmarks_type = getattr(face_alignment.LandmarksType, '_2D', None)
    if landmarks_type is None:
        landmarks_type = face_alignment.LandmarksType.TWO_D

    # Prefer the GPU, but fall back to the CPU when CUDA is unavailable.
    device = 'cuda' if torch.cuda.is_available() else 'cpu'
    face_landmarks = face_alignment.FaceAlignment(landmarks_type, flip_input=False, device=device)

    final_output = []
    for face in array:
        ld = face_landmarks.get_landmarks(face)
        if ld is None or len(ld) == 0:
            # No face found in this frame.
            continue

        # Use the first detected face only.
        shape = np.array(ld[0])
        height, width = face.shape[:2]

        # Crop box: horizontally from landmark 5 to landmark 13, vertically
        # from the higher of those two down to landmark 9 (presumably jawline
        # points of the 68-point layout -- confirm). Clamp to the image so a
        # negative coordinate cannot wrap around in the slice below.
        start_hor = max(int(shape[5][0]), 0)
        end_hor = min(int(shape[13][0]), width)
        start_ver = max(int(np.minimum(shape[5][1], shape[13][1])), 0)
        end_ver = min(int(shape[9][1]), height)
        if end_hor <= start_hor or end_ver <= start_ver:
            # Degenerate box: cv2.resize would fail on an empty image.
            continue

        img = face[start_ver:end_ver, start_hor:end_hor]
        img = cv2.resize(img, (128, 64), interpolation=cv2.INTER_LANCZOS4)
        final_output.append(img)

    if not final_output:
        raise ValueError("No lip region could be extracted from video file: {}".format(video_file))

    # (frames, H, W, C) -> (C, frames, H, W), scaled from [0, 255] to [0, 1].
    final_output = np.stack(final_output, axis=0).astype(np.float32)
    final_output = torch.FloatTensor(final_output.transpose(3, 0, 1, 2)) / 255.0

    return final_output


def output_to_text(output, start_index):
    """Turn a sequence of predicted class indices into text.

    Consecutive duplicates are collapsed, indices below ``start_index`` are
    dropped (they still separate repeated symbols), and the remaining indices
    are mapped to ``' '`` / ``'A'``..``'Z'`` after subtracting ``start_index``.
    Runs of spaces are reduced to one and the result is stripped.

    Args:
        output: iterable of integer class indices, one per time step.
        start_index: index that corresponds to the first symbol (the space).

    Returns:
        The decoded string.
    """
    alphabet = list(' ABCDEFGHIJKLMNOPQRSTUVWXYZ')
    decoded = []
    previous = -1
    for label in output:
        changed = previous != label
        previous = label
        if not (changed and label >= start_index):
            continue
        symbol = alphabet[label - start_index]
        # Never emit two spaces in a row.
        if symbol == ' ' and len(decoded) > 0 and decoded[-1] == ' ':
            continue
        decoded.append(symbol)
    return ''.join(decoded).strip()

def longest_common_substring(s1, s2):
    """Return the longest contiguous run shared by ``s1`` and ``s2``.

    Dynamic programming over matching-suffix lengths, keeping only the previous
    and current table rows. When several runs share the maximum length, the one
    ending earliest in ``s1`` is returned. Used by ``preprocess_text``.

    Args:
        s1: first sequence; the result is a slice of this argument.
        s2: second sequence.

    Returns:
        The slice of ``s1`` forming the longest common substring (empty if the
        inputs share nothing).
    """
    best_length = 0
    best_end = 0
    width = len(s2) + 1
    previous_row = [0] * width
    for row, left in enumerate(s1, start=1):
        current_row = [0] * width
        for col, right in enumerate(s2, start=1):
            if left == right:
                # Extend the match that ended one position earlier in both inputs.
                run = previous_row[col - 1] + 1
                current_row[col] = run
                if run > best_length:
                    best_length, best_end = run, row
        previous_row = current_row
    return s1[best_end - best_length: best_end]

def preprocess_text(output_text):
    """Snap decoded words onto the GRID vocabulary, position by position.

    The text is split on single spaces. Words at positions 0, 1, 2, 4 and 5 are
    replaced by the closest entry of that position's word list (see
    ``_closest_vocab_word``); position 3 and any position past 5 are left
    untouched, as are empty words.

    Args:
        output_text: space-separated decoded text.

    Returns:
        List of words after vocabulary correction.
    """
    # One candidate list per corrected sentence position.
    word_1_list = ['BIN', 'LAY', 'PLACE', 'SET']
    word_2_list = ['BLUE', 'GREEN', 'RED', 'WHITE']
    word_3_list = ['AT', 'BY', 'IN', 'WITH']
    word_5_list = ['ONE', 'TWO', 'THREE', 'FOUR', 'FIVE', 'SIX', 'SEVEN', 'EIGHT', 'NINE', 'ZERO']
    word_6_list = ['AGAIN', 'NOW', 'PLEASE', 'SOON']
    vocabulary_by_position = {
        0: word_1_list,
        1: word_2_list,
        2: word_3_list,
        4: word_5_list,
        5: word_6_list,
    }

    list_words = output_text.split(" ")
    for i, word in enumerate(list_words):
        vocabulary = vocabulary_by_position.get(i)
        # Skip uncorrected positions and empty tokens (e.g. from empty input),
        # which have no first letter to compare.
        if vocabulary is None or not word:
            continue
        list_words[i] = _closest_vocab_word(word, vocabulary)

    return list_words


def _closest_vocab_word(word, vocabulary):
    """Pick the entry of ``vocabulary`` closest to the non-empty ``word``.

    Closeness is the length of the longest common substring. Among the
    top-scoring entries, the first one sharing ``word``'s initial letter wins;
    if none does, the last top-scoring entry of this same vocabulary is used.
    """
    lengths = [len(longest_common_substring(word, candidate)) for candidate in vocabulary]
    highest_score = max(lengths)
    best = [candidate for candidate, score in zip(vocabulary, lengths) if score == highest_score]
    for candidate in best:
        if candidate[0] == word[0]:
            return candidate
    # Fall back within the position's own vocabulary (not another position's).
    return best[-1]

In [31]:
# Evaluation script: load the pretrained LipNet weights, run the model on the
# selected video and print the vocabulary-corrected transcription.

# Prefer the GPU, but keep the cell runnable on a CPU-only runtime.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

model = LipNet()
model = model.to(device)

# NOTE: torch.load unpickles the file -- only load checkpoints from a trusted source.
pretrained_dict = torch.load('LipNet_unseen_loss_0.44562849402427673_wer_0.1332580699113564_cer_0.06796452465503355.pt', map_location=device)
model_dict = model.state_dict()

# Keep only checkpoint tensors whose name and shape match the model; anything
# the checkpoint does not provide stays at its random initialisation, so report it.
pretrained_dict = {k: v for k, v in pretrained_dict.items() if k in model_dict.keys() and v.size() == model_dict[k].size()}
missed_params = [k for k, v in model_dict.items() if not k in pretrained_dict.keys()]
if missed_params:
    print("Warning: parameters not loaded from checkpoint:", missed_params)
model_dict.update(pretrained_dict)
model.load_state_dict(model_dict)

# Inference mode: without eval() the dropout layers stay active and randomly
# zero activations, which makes predictions noisy and non-reproducible.
model.eval()

video = load_video(file_path)

# No gradients are needed for inference; this also lets the output be
# converted to NumPy without an explicit detach().
with torch.no_grad():
    y = model(video[None,...].to(device))

# Drop only the batch axis so a single-frame clip keeps its time axis.
y = y.cpu().numpy().squeeze(axis=0)
y_args = np.argmax(y, axis = 1).astype('int8')

# Class indices start at 1 for the first character (index 0 is skipped).
txt = output_to_text(y_args, 1)

processed_words = preprocess_text(txt)

print("Ground Truth: LAY BLUE WITH B EIGHT PLEASE")
print("Result of Lipnet Evaluation:", processed_words)

Ground Truth: LAY BLUE WITH B EIGHT PLEASE
Result of Lipnet Evaluation: ['LAY', 'BLUE', 'WITH', 'V', 'TWO', 'PLEASE']
