### **1. Loading the data:**

 Link for the required dataset : https://www.kaggle.com/datasets/rishisrdy/lipreading

 Download the dataset and extract it into the `data` folder. The code below expects the videos under `data/s1` and the alignment files under `data/alignments/s1`.

In [1]:
path = 'data'

### **2. Data Preparation**

In [2]:
import os
import cv2
import tensorflow as tf
import numpy as np
from typing import List
from matplotlib import pyplot as plt
import imageio

In [3]:
def load_video(path:str) -> "tf.Tensor":
  """Read a video and return its standardized grayscale crops.

  Every frame that can be decoded is converted to grayscale and cropped to
  rows 190:236 and columns 80:220 (presumably the mouth region of the
  speaker -- confirm against the dataset). The clip as a whole is then
  normalized to zero mean and unit standard deviation.

  Args:
    path: Location of the video file, opened with cv2.VideoCapture.

  Returns:
    A float32 tensor of shape (num_frames, 46, 140, 1), assuming the source
    frames are at least 236 pixels high and 220 pixels wide.

  Raises:
    ValueError: If no frame could be decoded from `path`.
  """
  cap = cv2.VideoCapture(path)
  frames = []
  try:
    for _ in range(int(cap.get(cv2.CAP_PROP_FRAME_COUNT))):
      ret, frame = cap.read()
      # The reported frame count can exceed what is actually decodable;
      # stop at the first failed read instead of handing None to TensorFlow.
      if not ret:
        break
      frame = tf.image.rgb_to_grayscale(frame)
      frames.append(frame[190:236, 80:220,:])
  finally:
    # Release the capture even if decoding or conversion raised.
    cap.release()

  if not frames:
    raise ValueError(f'No frames could be read from video: {path}')

  # Cast before computing statistics: on the raw uint8 frames the mean is
  # truncated to an integer and the subtraction wraps around for every pixel
  # darker than the mean.
  frames = tf.cast(tf.stack(frames), tf.float32)
  mean = tf.math.reduce_mean(frames)
  std = tf.math.reduce_std(frames)
  return (frames - mean) / std

In [4]:
# Characters the model can read and emit: the lowercase letters plus the
# space that load_alignments inserts between words. Without the space in the
# vocabulary every word separator is mapped to the out-of-vocabulary id 0,
# which is also the value used to pad the label sequences.
vocab = list("abcdefghijklmnopqrstuvwxyz ")
char_to_num = tf.keras.layers.StringLookup(vocabulary=vocab, oov_token="")

In [5]:
char_to_num

<keras.layers.preprocessing.string_lookup.StringLookup at 0x15d1d173370>

In [6]:
# Inverse of char_to_num: turns integer ids back into their characters.
num_to_char = tf.keras.layers.StringLookup(
    invert=True,
    oov_token="",
    vocabulary=vocab,
)

In [7]:
num_to_char

<keras.layers.preprocessing.string_lookup.StringLookup at 0x15d44407760>

In [8]:
def load_alignments(path:str) -> "tf.Tensor":
  """Encode the words of an alignment file as a sequence of character ids.

  The third whitespace-separated field of each line is taken as the word.
  Lines whose word is 'sil' are dropped, as are blank or truncated lines
  that have fewer than three fields.

  Args:
    path: Location of the alignment text file.

  Returns:
    The result of applying char_to_num to the individual characters of the
    kept words, with one ' ' token between consecutive words.
  """
  tokens = []
  with open(path, 'r') as f:
    for line in f:
      fields = line.split()
      # Guard the fields[2] access so a trailing blank line cannot raise.
      if len(fields) < 3 or fields[2] == 'sil':
        continue
      tokens.extend((' ', fields[2]))
  # Every word is preceded by a ' ' token, so the sequence starts with a
  # separator; the trailing [1:] removes that leading element.
  return char_to_num(tf.reshape(tf.strings.unicode_split(tokens, input_encoding='UTF-8'),(-1)))[1:]

In [9]:
def load_data(path:str):
  """Load the frames and the encoded transcript that belong to one clip.

  Only the file name (without directory and extension) of `path` is used;
  the video is read from data/s1/<name>.mpg and the alignment from
  data/alignments/s1/<name>.align.

  Args:
    path: A scalar string tensor holding the path of a clip. It must expose
      `.numpy()`, as it does when called through tf.py_function.

  Returns:
    A `(frames, alignments)` tuple as produced by load_video and
    load_alignments.
  """
  path = bytes.decode(path.numpy())
  # Accept both Windows ('\\') and POSIX ('/') separators so the same
  # notebook works regardless of how the path was produced.
  file_name = path.replace('\\', '/').split('/')[-1].split('.')[0]
  video_path = os.path.join('data', 's1',f'{file_name}.mpg')
  alignment_path = os.path.join('data','alignments','s1',f'{file_name}.align')
  frames = load_video(video_path)
  alignments = load_alignments(alignment_path)

  return frames, alignments

In [10]:
# Sanity check: extract the clip id (file name without directory and
# extension) from a sample path written with Windows-style separators.
test_path = 'data\\s1\\bbal6n.mpg'
tf.convert_to_tensor(test_path).numpy().decode('utf-8').split('\\')[-1].split('.')[0]

'bbal6n'

In [11]:
frames, alignments = load_data(tf.convert_to_tensor(test_path))

FileNotFoundError: [Errno 2] No such file or directory: 'data\\alignments\\s1\\bbal6n.align'

In [None]:
frames

: 

In [None]:
alignments

: 

In [None]:
plt.imshow(frames[15])

: 

In [None]:
plt.imshow(frames[20])

: 

In [None]:
plt.imshow(frames[50])

: 

In [None]:
frames.shape

: 

In [None]:
alignments.shape

: 

In [None]:
len(frames)

: 

### 3. Video Pre-processing

In [None]:
import sys

: 

Importing necessary libraries

In [None]:
import cv2
import numpy as np
import math
import matplotlib.pyplot as plt
%matplotlib inline

: 

Defining the variables

In [None]:
# Colour constants as 3-tuples of floats in the 0-255 range.
# The channels are in blue-green-red order (SCALAR_RED only has its last
# channel set), which is the channel order OpenCV uses for colour images.
SCALAR_BLACK = (0.0,0.0,0.0)
SCALAR_WHITE = (255.0,255.0,255.0)
SCALAR_YELLOW = (0.0,255.0,255.0)
SCALAR_GREEN = (0.0,255.0,0.0)
SCALAR_RED = (0.0,0.0,255.0)
SCALAR_CYAN = (255.0,255.0,0.0)

: 

Function to draw the image

In [None]:
# function to plot n images using subplots
def plot_image(images, captions=None, cmap=None ):
    """Show several images side by side in a single row of subplots.

    Args:
        images: Non-empty sequence of images accepted by `Axes.imshow`.
        captions: Optional sequence of titles, one per image. When omitted
            the subplots are left untitled.
        cmap: Optional colormap forwarded to `imshow`.

    Raises:
        ValueError: If `images` is empty.
    """
    if len(images) == 0:
        raise ValueError("plot_image needs at least one image")
    if captions is None:
        # zip() cannot iterate over None; fall back to "no title" per image.
        captions = [None] * len(images)
    # squeeze=False keeps `axes` two-dimensional even for a single image,
    # where plt.subplots would otherwise return one bare Axes object.
    f, axes = plt.subplots(1, len(images), sharey=True, squeeze=False)
    f.set_figwidth(15)
    for ax,image,caption in zip(axes[0], images, captions):
        ax.imshow(image, cmap)
        if caption is not None:
            ax.set_title(caption)

: 

Capturing movement in the video frame by frame

In [None]:
SHOW_DEBUG_STEPS  = True

# Reading video
cap = cv2.VideoCapture('data/s1/bbaf3s.mpg')

try:
    # If the video cannot be opened, stop here: every later step needs frames.
    if not(cap.isOpened()):
        raise IOError("Error reading file")

    # Check that a frame can be read at all
    ret, fFrame  = cap.read()
    if not ret:
        raise IOError("Could not read the first frame of the video")

    # Capture the next 3 consecutive frames
    grabbed = [cap.read() for _ in range(3)]
    if not all(ok for ok, _ in grabbed):
        raise IOError("Could not read three consecutive frames from the video")
    fFrame1, fFrame2, fFrame3 = (frame for _, frame in grabbed)
    ret = grabbed[-1][0]
finally:
    # The capture is no longer needed once the frames are in memory.
    cap.release()

# Work on copies so the captured frames stay untouched.
img1 = fFrame1.copy()
img2 = fFrame2.copy()
img3 = fFrame3.copy()

if(SHOW_DEBUG_STEPS):
    print ('img1 height = ' + str(img1.shape[0]))
    print ('img1 width = ' + str(img1.shape[1]))
    print ('img2 height = ' + str(img2.shape[0]))
    print ('img2 width = ' + str(img2.shape[1]))
    print ('img3 height = ' + str(img3.shape[0]))
    print ('img3 width = ' + str(img3.shape[1]))

# Convert the colour images to greyscale in order to enable fast processing.
# All three frames use the same conversion so they are displayed consistently
# with the 'gray' colormap below.
img1 = cv2.cvtColor(img1, cv2.COLOR_BGR2GRAY)
img2 = cv2.cvtColor(img2, cv2.COLOR_BGR2GRAY)
img3 = cv2.cvtColor(img3, cv2.COLOR_BGR2GRAY)

#plotting
plot_image([img1, img2, img3], cmap='gray', captions=["First frame", "Second frame", "Third frame"])


: 

Adding Gaussian blur for smoothing

In [None]:
# Smooth each of the three frames with a 5x5 Gaussian kernel
img1, img2, img3 = (
    cv2.GaussianBlur(frame, (5,5), 0) for frame in (img1, img2, img3)
)

# Show the blurred frames next to each other
blur_captions = ["GaussianBlur first frame", "GaussianBlur second frame", "GaussianBlur third frame"]
plot_image([img1, img2, img3], captions=blur_captions, cmap='gray')

: 

Finding movement in the video

In [None]:
# Absolute per-pixel difference between the first two frames; large values
# indicate movement between them.
imgDiff = cv2.absdiff(img1, img2)

# Binarize the difference image: pixels above 30 become 255 (white),
# all others become 0 (black).
ret, imgThresh = cv2.threshold(imgDiff, 30.0, 255.0, cv2.THRESH_BINARY)

# Height and width of the thresholded image
ht, wd = np.size(imgThresh, 0), np.size(imgThresh, 1)

plot_image(
    [imgDiff, imgThresh],
    captions=["Difference between 2 frames", "Difference between 2 frames after threshold"],
    cmap='gray',
)

: 

### **4. Splitting of data**

Splitting the data into train and test sets (the test set is also used as validation data during training):

Creating a mappable function:

In [None]:
def mappable_function(path:str) -> List[str]:
  """Run load_data through tf.py_function so it can be used in Dataset.map.

  Returns the py_function outputs, declared as a float32 tensor (frames)
  and an int64 tensor (alignments).
  """
  return tf.py_function(func=load_data, inp=[path], Tout=(tf.float32, tf.int64))

: 

Splitting:

In [None]:
# List the clips in a fixed order. By default list_files shuffles the file
# names and draws a new order on every pass over the dataset, which would let
# clips move between the train and test splits below from epoch to epoch.
data = tf.data.Dataset.list_files('data\\s1\\*.mpg', shuffle=False)
# Shuffle once over the whole file list and keep that order for every epoch.
data = data.shuffle(data.cardinality(), reshuffle_each_iteration=False)
data = data.map(mappable_function)
# Batches of 2 clips, padded to 75 frames and 40 label ids per clip.
data = data.padded_batch(2, padded_shapes=([75, None, None, None],[40]))
data = data.prefetch(tf.data.AUTOTUNE)

##Added for split
# take/skip count batches here (the dataset is already batched):
# the first 450 batches are used for training, the remainder for testing.
train = data.take(450)
test = data.skip(450)

: 

In [None]:
train

: 

In [None]:
test

: 

In [None]:
print(len(train))

print(len(test))

: 

Printing the preprocessed data:

In [None]:
frames, alignments = data.as_numpy_iterator().next()

: 

In [None]:
frames

: 

In [None]:
sample = data.as_numpy_iterator()

: 

In [None]:
sample

: 

In [None]:
val = sample.next(); val[0]

: 

4. Model Building

Importing Tensorflow libraries

In [None]:
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv3D, LSTM, Dense, Dropout, Bidirectional, MaxPool3D, Activation, Reshape, SpatialDropout3D, BatchNormalization, TimeDistributed, Flatten
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import ModelCheckpoint, LearningRateScheduler

: 

Callbacks:

In [None]:
def scheduler(epoch, lr):
    """Learning-rate schedule: constant for 30 epochs, then exponential decay.

    Args:
        epoch: Zero-based index of the epoch about to start.
        lr: Learning rate currently in use.

    Returns:
        `lr` unchanged while `epoch < 30`; afterwards `lr * exp(-0.1)` as a
        plain Python float, which is the type Keras' LearningRateScheduler
        expects from a schedule function.
    """
    import math

    if epoch < 30:
        return lr
    # Use math.exp rather than tf.math.exp so the result is a float and not
    # a tensor.
    return float(lr) * math.exp(-0.1)

: 

In [None]:
class ProduceExample(tf.keras.callbacks.Callback):
    """Print ground truth and decoded predictions for one batch after each epoch."""

    def __init__(self, dataset) -> None:
        """Store `dataset` and open a numpy iterator over it.

        Args:
            dataset: A batched tf.data.Dataset yielding (frames, labels).
        """
        super().__init__()
        # Keep the dataset itself so the iterator can be reopened once it
        # has been consumed.
        self._source = dataset
        self.dataset = dataset.as_numpy_iterator()
    
    def on_epoch_end(self, epoch, logs=None) -> None:
        """Predict on the next batch and print original vs. decoded text."""
        try:
            data = next(self.dataset)
        except StopIteration:
            # More epochs than batches: start over instead of aborting training.
            self.dataset = self._source.as_numpy_iterator()
            data = next(self.dataset)
        yhat = self.model.predict(data[0])
        # One sequence length per sample, taken from the prediction itself so
        # that batches of any size and any number of time steps decode.
        input_length = [yhat.shape[1]] * len(yhat)
        decoded = tf.keras.backend.ctc_decode(yhat, input_length, greedy=False)[0][0].numpy()
        for x in range(len(yhat)):
            print('Original: ', tf.strings.reduce_join(num_to_char(data[1][x])).numpy().decode('utf-8'))
            print('Prediction: ', tf.strings.reduce_join(num_to_char(decoded[x])).numpy().decode('utf-8'))
            print('~'*100)

: 

Loss Function:

In [None]:
def CTCLoss(y_true, y_pred):
    """CTC loss for a batch, using the full tensor widths as sequence lengths.

    Args:
        y_true: Label ids, indexed as (batch, label position).
        y_pred: Model output, indexed as (batch, time step, class).

    Returns:
        The value of tf.keras.backend.ctc_batch_cost, where every sample is
        given `y_pred.shape[1]` as input length and `y_true.shape[1]` as
        label length.
    """
    batch_size = tf.cast(tf.shape(y_true)[0], dtype="int64")
    time_steps = tf.cast(tf.shape(y_pred)[1], dtype="int64")
    label_width = tf.cast(tf.shape(y_true)[1], dtype="int64")

    # Broadcast the scalar lengths to one (batch, 1) column each.
    per_sample_input = time_steps * tf.ones(shape=(batch_size, 1), dtype="int64")
    per_sample_label = label_width * tf.ones(shape=(batch_size, 1), dtype="int64")

    return tf.keras.backend.ctc_batch_cost(y_true, y_pred, per_sample_input, per_sample_label)

: 

Model Building:

In [None]:
# Three Conv3D/ReLU/MaxPool3D stages, flattened per time step, followed by two
# bidirectional LSTM layers and a per-time-step softmax over the characters.
model = Sequential([
    # Convolutional front end; pooling is applied to the last two spatial
    # axes only, so the 75 time steps are preserved.
    Conv3D(128, 3, input_shape=(75, 46, 140, 1), padding='same'),
    Activation('relu'),
    MaxPool3D((1,2,2)),

    Conv3D(256, 3, padding='same'),
    Activation('relu'),
    MaxPool3D((1,2,2)),

    Conv3D(75, 3, padding='same'),
    Activation('relu'),
    MaxPool3D((1,2,2)),

    # One feature vector per time step
    TimeDistributed(Flatten()),

    # Recurrent back end
    Bidirectional(LSTM(128, kernel_initializer='Orthogonal', return_sequences=True)),
    Dropout(.5),

    Bidirectional(LSTM(128, kernel_initializer='Orthogonal', return_sequences=True)),
    Dropout(.5),

    # One output unit per lookup id plus one extra class
    Dense(char_to_num.vocabulary_size()+1, kernel_initializer='he_normal', activation='softmax'),
])

print('Bidirectional LSTM model is built successfully')

: 

5. Train the models

In [None]:
# Save the weights under models/checkpoint, tracking the training loss
checkpoint_callback = ModelCheckpoint(
    filepath=os.path.join('models','checkpoint'),
    monitor='loss',
    save_weights_only=True,
)
# Apply the learning-rate schedule defined in `scheduler`
schedule_callback = LearningRateScheduler(schedule=scheduler)
# Print sample predictions from the test split after every epoch
example_callback = ProduceExample(dataset=test)

: 

In [None]:
tf.__version__

: 

In [None]:
# Adam optimizer with the CTC loss defined above
model.compile(
    loss=CTCLoss,
    optimizer=Adam(learning_rate=0.002),
)

: 

In [None]:
# Train for one epoch, validating on the test split
training_callbacks = [checkpoint_callback, schedule_callback, example_callback]
model.fit(
    train,
    validation_data=test,
    epochs=1,
    callbacks=training_callbacks,
)

: 

: 