<a href="https://colab.research.google.com/github/vjardimb/lipnet/blob/main/LipNet.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# LipNet Implementation

The goal of this notebook is to implement the lip-reading model proposed by [Assael et al.](https://arxiv.org/abs/1611.01599). The model transcribes what a person is saying without access to an audio signal: the only input is a video recording of the person's face while they say the sentence.

The implementation uses the same dataset as the original paper.

In [None]:
import os
import numpy as np
import tensorflow as tf
import matplotlib.pyplot as plt
import gdown
import cv2
import dlib

In [None]:
# check GPU availability
physical_devices = tf.config.list_physical_devices('GPU')
print(physical_devices)

# Ask TensorFlow to allocate GPU memory on demand instead of reserving it all
# up front. The setting is applied to every visible GPU, and a machine without
# a GPU simply skips the loop.
for gpu_device in physical_devices:
    try:
        tf.config.experimental.set_memory_growth(gpu_device, True)
    except (RuntimeError, ValueError) as err:
        # Best effort: report the problem instead of silently hiding it.
        print(f'Could not enable memory growth on {gpu_device.name}: {err}')

[PhysicalDevice(name='/physical_device:GPU:0', device_type='GPU')]


# Get Data

In [None]:
# source_url = 'https://drive.google.com/uc?id=1YlvpDLix3S-U8fd-gqRwPcWXAXm8JwjL'
# output = r'data.zip'
# gdown.download(source_url)
# gdown.extractall(output)

# Connect to Google Drive

In [None]:
# from google.colab import drive

# drive.mount('/content/gdrive')

In [None]:
# Locations of the dataset and of the Google Drive project folder.
# All paths are relative to the notebook's working directory.
folder_data = 'data'
folder_videos = f'{folder_data}/s1'
folder_align = f'{folder_data}/alignments/s1'
folder_gdrive = 'gdrive/MyDrive'
folder_gdrive_project = f'{folder_gdrive}/ML Projects/Lipnet'

# Load Data Functions


In [None]:
# Lazily-created dlib models shared by every call to load_video, so the
# landmark model file is read from disk once instead of once per video.
_face_models = {}


def _get_face_models():
    """Return the cached (face detector, landmark predictor) pair, creating it on first use."""
    if not _face_models:
        _face_models['detector'] = dlib.get_frontal_face_detector()
        _face_models['landmarker'] = dlib.shape_predictor(
            folder_gdrive_project + "/shape_predictor_68_face_landmarks.dat"
        )
    return _face_models['detector'], _face_models['landmarker']


def load_video(video_path):
    """Load a video, crop every frame around the mouth and standardize the result.

    For each frame the first detected face is passed to the landmark
    predictor; the midpoint between landmarks 48 and 54 (the lip corners)
    is used as the centre of a 46x140 crop of the grayscale frame.

    Args:
        video_path: scalar string tensor holding the path of the video file
            (it is read with ``.numpy().decode("utf-8")``).

    Returns:
        A float32 tensor with one 46x140x1 crop per decoded frame,
        standardized with the mean and standard deviation of the whole clip.
        Frames decoded before any face has been found are skipped.

    Raises:
        ValueError: if no frame with a usable mouth position could be read.
    """
    hog_face_detector, face_landmarker = _get_face_models()

    cap = cv2.VideoCapture(video_path.numpy().decode("utf-8"))
    frames = []
    mouth_center = None  # last known (x, y) of the mouth, reused when detection fails

    try:
        for _ in range(int(cap.get(cv2.CAP_PROP_FRAME_COUNT))):
            ret, frame = cap.read()
            if not ret:
                # The decoder delivered fewer frames than the container reported.
                break

            gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)

            # get faces on the frame and find lip corners
            faces = hog_face_detector(gray)
            if len(faces) > 0:
                face_landmarks = face_landmarker(gray, faces[0])

                # x and y lip corners coordinates
                left_corner = face_landmarks.part(48)
                right_corner = face_landmarks.part(54)
                mouth_center = (
                    int((left_corner.x + right_corner.x) * 0.5),
                    int((left_corner.y + right_corner.y) * 0.5),
                )

            if mouth_center is None:
                # No face seen so far: there is nothing to crop around.
                continue

            # Keep the crop window inside the frame so every crop has the
            # same 46x140 size (negative slice starts would wrap around).
            height, width = gray.shape
            center_x = min(max(mouth_center[0], 70), width - 70)
            center_y = min(max(mouth_center[1], 23), height - 23)

            # OpenCV decodes frames as BGR; reorder the channels so that
            # rgb_to_grayscale applies its weights to the right colours.
            rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            tf_frame = tf.image.rgb_to_grayscale(tf.cast(rgb, tf.float32))

            frames.append(
                tf_frame[center_y-23:center_y+23, center_x-70:center_x+70, :]
            )
    finally:
        # Release the capture even if detection or cropping raised.
        cap.release()

    if not frames:
        raise ValueError("no usable frames could be read from the video")

    # Standardize in float32; casting the mean to uint8 would truncate it.
    video = tf.stack(frames)
    mean = tf.math.reduce_mean(video)
    std = tf.math.reduce_std(video)

    # divide_no_nan returns 0 instead of NaN/inf for a constant clip (std == 0).
    return tf.math.divide_no_nan(video - mean, std)

In [None]:
# Character-level vocabulary: lowercase letters, a few punctuation marks,
# the digits 1-9 and the space character.
characters = "abcdefghijklmnopqrstuvwxyz'?!123456789 "
vocab = list(characters)

# encoder maps characters to integer ids; decoder is the inverse lookup
# built from the same vocabulary.
encoder = tf.keras.layers.StringLookup(vocabulary=vocab)
decoder = tf.keras.layers.StringLookup(
    vocabulary=vocab,
    invert=True,
    output_mode="int",
)

In [None]:
# List the video files of the speaker folder and derive their base names.
# The folder comes from `folder_videos` so it cannot drift from the path
# configuration, and splitext removes the extension whatever its length.
video_names = os.listdir(folder_videos)
file_names = [os.path.splitext(name)[0] for name in video_names]

In [None]:
# test encoder / decoder: a round trip should give back the input characters.
# The sample gets its own name so it is not confused with the `data`
# dataset that is built further down.
sample_chars = tf.constant([["a", "c", "d"], ["d", "z", "b"]])
sample_ids = encoder(sample_chars)

sample_ids.numpy(), decoder(sample_ids)

(array([[ 1,  3,  4],
        [ 4, 26,  2]]),
 <tf.Tensor: shape=(2, 3), dtype=string, numpy=
 array([[b'a', b'c', b'd'],
        [b'd', b'z', b'b']], dtype=object)>)

In [None]:
def load_align(align_path):
    """Read an alignment file and return its sentence as encoded characters.

    The last whitespace-separated field of every line is taken as the token.
    Silence markers are dropped by value rather than by line position, so a
    marker in the middle of the file is not turned into text and a file
    without a trailing newline does not lose its last character.

    Args:
        align_path: path of the ``.align`` file.

    Returns:
        The output of ``encoder`` applied to the list of characters of the
        space-joined sentence.
    """
    # NOTE(review): assumes 'sil' and 'sp' are the only non-word markers
    # used by the alignment files - confirm against the dataset.
    silence_tokens = ('sil', 'sp')

    words = []
    with open(align_path) as file:
        for line in file:
            fields = line.split()
            # Skip blank lines and silence markers.
            if fields and fields[-1] not in silence_tokens:
                words.append(fields[-1])

    sentence = ' '.join(words)
    return encoder([*sentence])

In [None]:
def load_data(video_path):
    """Load the frames of a video together with its encoded alignment.

    Args:
        video_path: scalar string tensor with the path of the video file.

    Returns:
        A ``(frames, labels)`` tuple: the result of ``load_video`` for the
        video and of ``load_align`` for the ``.align`` file of the same
        base name inside ``folder_align``.
    """
    path_str = video_path.numpy().decode("utf-8")

    # Last path component without its 4-character extension (e.g. ".mpg").
    clip_id = path_str.rsplit('/', 1)[-1][:-4]
    align_path = f'{folder_align}/{clip_id}.align'

    frames = load_video(video_path)
    labels = load_align(align_path)
    return frames, labels

# Create Data Pipeline

In [None]:
# File names of the clips that are excluded from the dataset.
corrupted_videos = [
    stem + '.mpg'
    for stem in (
        'lgbf8n',
        'pbwx1s',
        'bbizzn',
        'bwwuzn',
        'prii9a',
        'brwg8p',
        'brwa4p',
        'sran9s',
        'lrarzn',
        'pbio7a',
        'sbbh4p',
    )
]

def filter_corrupted_files(file_path):
    """Dataset filter predicate: keep a file unless it is listed in corrupted_videos.

    Args:
        file_path: scalar string tensor with a '/'-separated file path.

    Returns:
        Scalar boolean tensor, True when the last path component differs
        from every entry of ``corrupted_videos``.
    """
    path_parts = tf.strings.split(file_path, '/')
    base_name = path_parts[-1]
    differs_from_each = tf.not_equal(base_name, corrupted_videos)
    return tf.reduce_all(differs_from_each)

In [None]:
# Build the input pipeline.
#
# The file order must be identical on every pass over the dataset, because
# the train/test split is taken later with take()/skip(). If the order
# changed between epochs, clips would move between the two splits. So:
#   * list_files does not shuffle (its default shuffles on each iteration);
#   * one seeded shuffle is applied with reshuffle_each_iteration=False.
# NOTE(review): the buffer of 1000 is meant to cover the whole file list so
# the shuffle is uniform - confirm the folder holds no more clips than that.
data = tf.data.Dataset.list_files('./data/s1/*.mpg', shuffle=False)
data = data.filter(filter_corrupted_files)
data = data.shuffle(1000, seed=42, reshuffle_each_iteration=False)
# load_data runs eager Python code (OpenCV, dlib), hence the py_function wrapper
data = data.map(lambda x: tf.py_function(load_data, [x], (tf.float32, tf.int64)))
# Batches of 2 clips; videos are padded to 75 frames and labels to 40 characters
data = data.padded_batch(2, padded_shapes=([75, None, None, None], [40]))
data = data.prefetch(tf.data.AUTOTUNE)

# Model Definition

In [None]:
# (frames, height, width, channels) of one preprocessed clip
input_shape = (75, 46, 140, 1)

# Input layer
inputs = tf.keras.layers.Input(shape=input_shape)

# Convolutional stack: three Conv3D + MaxPooling3D stages. Pooling only acts
# on the two spatial axes (pool size 1 along time), so all 75 time steps are
# kept. Only the first pooling layer uses 'same' padding.
x = inputs
for n_filters, pool_options in (
    (128, {'padding': 'same'}),
    (256, {}),
    (75, {}),
):
    x = tf.keras.layers.Conv3D(
        n_filters, kernel_size=(3, 3, 3), activation='relu', padding='same'
    )(x)
    x = tf.keras.layers.MaxPooling3D(pool_size=(1, 2, 2), **pool_options)(x)

# Flatten the feature maps of every time step into one vector per frame
x = tf.keras.layers.TimeDistributed(tf.keras.layers.Flatten())(x)

# Two bidirectional LSTM layers, each followed by dropout
for _ in range(2):
    x = tf.keras.layers.Bidirectional(
        tf.keras.layers.LSTM(128, kernel_initializer='orthogonal', return_sequences=True)
    )(x)
    x = tf.keras.layers.Dropout(0.5)(x)

# Output layer: one softmax per time step over the vocabulary plus one extra class
n_classes = encoder.vocabulary_size() + 1
outputs = tf.keras.layers.Dense(n_classes, activation='softmax')(x)

# Create model
model = tf.keras.Model(inputs=inputs, outputs=outputs)

In [None]:
# Print the layer-by-layer output shapes and parameter counts of the model
model.summary()

Model: "model"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 input_1 (InputLayer)        [(None, 75, 46, 140, 1)   0         
                             ]                                   
                                                                 
 conv3d (Conv3D)             (None, 75, 46, 140, 128   3584      
                             )                                   
                                                                 
 max_pooling3d (MaxPooling3  (None, 75, 23, 70, 128)   0         
 D)                                                              
                                                                 
 conv3d_1 (Conv3D)           (None, 75, 23, 70, 256)   884992    
                                                                 
 max_pooling3d_1 (MaxPoolin  (None, 75, 11, 35, 256)   0         
 g3D)                                                        

## Callbacks and Loss Functions

In [None]:
# This function keeps the initial learning rate for the first ten epochs
# and decreases it exponentially after that.
def scheduler(epoch, lr):
    """Learning-rate schedule used by the LearningRateScheduler callback.

    Args:
        epoch: index of the epoch that is about to start (0-based).
        lr: current learning rate.

    Returns:
        The new learning rate as a plain Python float: ``lr`` unchanged
        while ``epoch < 10``, otherwise ``lr * exp(-0.1)``.
    """
    if epoch < 10:
        return float(lr)
    # Return a Python float rather than a tf.Tensor: the callback expects a
    # float and newer Keras versions reject anything else.
    return float(lr * np.exp(-0.1))

In [None]:
from tensorflow.keras.callbacks import ModelCheckpoint, LearningRateScheduler

# Checkpoints go to the project folder; only the weights are stored.
checkpoint_path = os.path.join(folder_gdrive_project, 'model_checkpoints')
checkpoint_callback = ModelCheckpoint(
    filepath=checkpoint_path,
    save_weights_only=True,
    monitor='loss',
)

# Applies the `scheduler` function to the optimizer's learning rate.
scheduler_callback = LearningRateScheduler(schedule=scheduler)

In [None]:
def CTC_Loss(y_true, y_pred):
    """CTC loss for a batch of padded label sequences.

    Args:
        y_true: integer labels of shape (batch, max_label_length), padded
            with 0 on the right.
        y_pred: per-time-step class probabilities of shape
            (batch, time_steps, num_classes).

    Returns:
        The per-sample loss returned by ``tf.keras.backend.ctc_batch_cost``.
    """
    batch_len = tf.cast(tf.shape(y_true)[0], dtype="int64")

    # Every prediction uses all of its time steps.
    input_length = tf.cast(tf.shape(y_pred)[1], dtype="int64")
    input_length = input_length * tf.ones(shape=(batch_len, 1), dtype="int64")

    # True length of each label sequence = number of non-padding entries.
    # The padded width must not be used here, otherwise the zero padding is
    # scored as if it were part of the sentence. This relies on id 0 never
    # being produced for a real character (it is the lookup's
    # out-of-vocabulary slot; vocabulary characters start at 1).
    label_length = tf.math.count_nonzero(
        y_true, axis=1, keepdims=True, dtype=tf.int64
    )

    loss = tf.keras.backend.ctc_batch_cost(y_true, y_pred, input_length, label_length)
    return loss

In [None]:
# Compile with the Adam optimizer (default settings) and the custom CTC loss
model.compile(tf.keras.optimizers.Adam(), loss=CTC_Loss)

In [None]:
# `data` is already batched, so the split is counted in batches:
# the first 450 batches are used for training, the remainder for testing.
n_train_batches = 450
train, test = data.take(n_train_batches), data.skip(n_train_batches)

In [None]:
# Train on the `train` split and validate on the held-out `test` split.
# (The previous `minidataset` name is not defined anywhere in the notebook,
# so the cell failed on a fresh kernel.)
# Note: with epochs=10 the scheduler never reaches its decay branch, which
# only starts at epoch index 10.
history = model.fit(train, validation_data=test, epochs=10, callbacks=[checkpoint_callback, scheduler_callback])

Hardware limitations are preventing the proper training of the neural network.