#### **Lip Reading Application**

##### Install/Import dependencies

In [None]:
%%capture
#Install dependencies and confirm installation with !pip list
!pip install opencv-python matplotlib imageio gdown tensorflow-macos silence-tensorflow
!pip list

In [None]:
#Import dependencies
import os
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'
import tensorflow as tf
import cv2

import numpy as np
from typing import List
from matplotlib import pyplot as plt
import imageio
import logging
logging.getLogger("tensorflow").setLevel(logging.WARNING)
#import tensorflow as tf
#tf.compat.v1.logging.set_verbosity(tf.compat.v1.logging.ERROR)
import gdown
from tensorflow import keras
import warnings
warnings.filterwarnings('ignore')
#Neural Network dependencies
from tensorflow.keras.models import Sequential 
from tensorflow.keras.layers import Conv3D, LSTM, Dense, Dropout, Bidirectional, MaxPool3D, Activation, Reshape, SpatialDropout3D, BatchNormalization, TimeDistributed, Flatten
from tensorflow.keras.optimizers.legacy import Adam
from tensorflow.keras.callbacks import ModelCheckpoint, LearningRateScheduler

In [None]:
#Suppress irrelevant TensorFlow warnings --> I believe these come from the new version of tf and it tells you to ignore them
from silence_tensorflow import silence_tensorflow
silence_tensorflow()

##### Load Data

In [None]:
# %%capture
# #Downloading dataset that was made for creating lip-read models 
# UNCOMMENT FOR FIRST RUN
# url = 'https://drive.google.com/uc?id=1YlvpDLix3S-U8fd-gqRwPcWXAXm8JwjL'
# output = 'data.zip'
# gdown.download(url, output, quiet = False)
# gdown.extractall('data.zip')

In [None]:
def load_video(path) -> "tf.Tensor":
    '''
    Reads a video, crops every frame to the mouth region and standardises the result.
        Args: 
            path: str --> path to video that will be passed into model
        Returns:
            float32 tensor with one grayscale, cropped frame per decoded video frame
            (frames x 46 x 140 x 1 when the source frames are at least 236x220),
            shifted by the mean and divided by the standard deviation of the whole clip
        Raises:
            ValueError: if not a single frame could be decoded from ``path``
    '''
    cap = cv2.VideoCapture(path)
    frames = []
    try:
        for _ in range(int(cap.get(cv2.CAP_PROP_FRAME_COUNT))):
            success, frame = cap.read()
            if not success:
                # the reported frame count can exceed what is actually decodable;
                # stop instead of handing a None frame to TensorFlow
                break
            frame = tf.image.rgb_to_grayscale(frame)
            #isolates the mouth --> we can also use a lip detector to isolate the mouth
            frames.append(frame[190:236,80:220,:])
    finally:
        # always free the capture handle, even if decoding/conversion raised
        cap.release()

    if not frames:
        raise ValueError(f'Could not read any frames from video: {path}')

    # NOTE(review): the mean and the subtraction are computed on the frames' original
    # dtype (presumably uint8 from OpenCV) before the cast to float32, and OpenCV
    # typically decodes BGR while rgb_to_grayscale assumes RGB. Both are left exactly
    # as they were so the output matches what the pre-trained weights were fitted on.
    mean = tf.math.reduce_mean(frames)
    std = tf.math.reduce_std(tf.cast(frames, tf.float32))
    return tf.cast((frames-mean), tf.float32)/std
    

In [None]:
# Characters the model works with: lowercase letters, apostrophe, '?', '!', digits 1-9 and the space
vocab = list("abcdefghijklmnopqrstuvwxyz'?!123456789 ")

In [None]:
# Forward lookup (character -> integer id); the empty string is the out-of-vocabulary token
char_num = tf.keras.layers.StringLookup(vocabulary=vocab, oov_token="")
# Inverse lookup (integer id -> character) built from the very same vocabulary
num_char = tf.keras.layers.StringLookup(
    vocabulary=char_num.get_vocabulary(),
    oov_token="",
    invert=True,
)
print('The vocab is: {}(size={})'.format(char_num.get_vocabulary(), char_num.vocabulary_size()))

The vocab is: ['', 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z', "'", '?', '!', '1', '2', '3', '4', '5', '6', '7', '8', '9', ' '](size=40)


In [None]:
def load_alignments(path) -> "tf.Tensor":
    '''
    Reads an alignment file and encodes the spoken tokens as character ids.
        Args:
            path: str --> path to the alignment file; the third whitespace-separated
                  field of every line is taken as the token
        Returns:
            1-D tensor of character ids (via char_num) for all non-'sil' tokens,
            joined by single spaces
    '''
    tokens = []
    with open(path, 'r') as f:
        for line in f:
            fields = line.split()
            # skip blank/short lines (they used to raise IndexError) and silence markers
            if len(fields) < 3 or fields[2] == 'sil':
                continue
            # every token is preceded by a space; the leading one is dropped below
            tokens.extend([' ', fields[2]])
    return char_num(tf.reshape(tf.strings.unicode_split(tokens, input_encoding='UTF-8'), (-1)))[1:]

In [None]:
def load_data(path):
    '''
    Loads the frames and the alignments that belong to one video.
        Args:
            path: string tensor (as passed by tf.py_function), bytes or str --> path of a
                  video; only its base name is used, the files themselves are always read
                  from data/s1 and data/alignments/s1
        Returns:
            (frames, alignments) as produced by load_video and load_alignments
    '''
    raw_path = path.numpy() if hasattr(path, 'numpy') else path
    if isinstance(raw_path, bytes):
        raw_path = raw_path.decode()
    # os.path.basename honours the platform's separator, so paths listed on Windows
    # ('data\\s1\\x.mpg') resolve as well as POSIX ones
    file_name = os.path.basename(raw_path).split('.')[0]
    video_path = os.path.join('data','s1',f'{file_name}.mpg')
    alignment_path = os.path.join('data','alignments', 's1',f'{file_name}.align')
    frames = load_video(video_path)
    alignments = load_alignments(alignment_path)
    return frames, alignments

In [None]:
def mappable_function(path) -> "List[tf.Tensor]":
    '''
    Wraps load_data in tf.py_function so it can be used inside Dataset.map.
        Args:
            path: string tensor --> path of a video file
        Returns:
            the float32 frames tensor and the int64 alignments tensor computed by load_data
    '''
    return tf.py_function(func=load_data, inp=[path], Tout=(tf.float32, tf.int64))

##### Tensorflow Data Pipeline

In [None]:
# shuffle=False: by default list_files hands out the files in a new random order on every
# iteration, which would move videos between the train and test splits from epoch to epoch
data = tf.data.Dataset.list_files('./data/s1/*.mpg', shuffle=False)
# a single, seeded shuffle over ALL files (buffer = number of files) that is not repeated
# per iteration, so the take/skip split below is uniform, stable and reproducible
data = data.shuffle(len(data), seed=42, reshuffle_each_iteration=False)
data = data.map(mappable_function)
#batching into groups of 2; frames are padded to 75 per video, alignments to 40 tokens
data = data.padded_batch(2, padded_shapes=([75,None,None,None],[40]))
data = data.prefetch(tf.data.AUTOTUNE)
#first 450 batches are used for training, everything after that for testing
train=data.take(450)
test=data.skip(450)

In [None]:
#peek at one batch: it holds 2 padded alignments and 2 stacks of frames
frames, alignments = next(data.as_numpy_iterator())
alignments, len(frames)

(array([[16, 12,  1,  3,  5, 39,  2, 12, 21,  5, 39,  9, 14, 39,  9, 39,
         26,  5, 18, 15, 39, 14, 15, 23,  0,  0,  0,  0,  0,  0,  0,  0,
          0,  0,  0,  0,  0,  0,  0,  0],
        [ 2,  9, 14, 39,  7, 18,  5,  5, 14, 39, 23,  9, 20,  8, 39, 21,
         39, 19,  9, 24, 39, 14, 15, 23,  0,  0,  0,  0,  0,  0,  0,  0,
          0,  0,  0,  0,  0,  0,  0,  0]]),
 2)

In [None]:
#pull a single batch out of the pipeline to try the model on
sample = data.as_numpy_iterator()
val = next(sample)

##### Tensorflow Neural Network

In [None]:

# 3 x (Conv3D + spatial max-pooling) -> per-frame flatten -> 2 x bidirectional LSTM -> per-frame softmax
model = Sequential([
    # input: 75 frames of 46x140 single-channel crops
    Conv3D(128, 3, input_shape=(75,46,140,1), padding='same', activation='relu'),
    MaxPool3D((1,2,2)),

    Conv3D(256, 3, padding='same', activation='relu'),
    MaxPool3D((1,2,2)),

    Conv3D(75, 3, padding='same', activation='relu'),
    MaxPool3D((1,2,2)),

    # collapse the spatial dimensions of every frame into one feature vector
    TimeDistributed(Flatten()),
    Bidirectional(LSTM(128, kernel_initializer='Orthogonal', return_sequences=True)),
    Dropout(.5),
    Bidirectional(LSTM(128, kernel_initializer='Orthogonal', return_sequences=True)),
    Dropout(.5),

    # one output per vocabulary entry plus one extra class (presumably the CTC blank)
    Dense(char_num.vocabulary_size()+1, kernel_initializer='he_normal', activation='softmax'),
])


In [None]:
# Print every layer with its output shape and parameter count
model.summary()

Model: "sequential"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 conv3d (Conv3D)             (None, 75, 46, 140, 128)  3584      
                                                                 
 max_pooling3d (MaxPooling3D  (None, 75, 23, 70, 128)  0         
 )                                                               
                                                                 
 conv3d_1 (Conv3D)           (None, 75, 23, 70, 256)   884992    
                                                                 
 max_pooling3d_1 (MaxPooling  (None, 75, 11, 35, 256)  0         
 3D)                                                             
                                                                 
 conv3d_2 (Conv3D)           (None, 75, 11, 35, 75)    518475    
                                                                 
 max_pooling3d_2 (MaxPooling  (None, 75, 5, 17, 75)    0

In [None]:
%%capture
#check what our model returns: run the sample batch (val[0] holds the frames) through it
#%%capture hides the output this cell would otherwise print
yhat = model.predict(val[0])

In [None]:
#Prediction by the model before training: most likely class per frame of the first sample, joined into one string
tf.strings.reduce_join(num_char(tf.argmax(yhat[0], axis=1)))

<tf.Tensor: shape=(), dtype=string, numpy=b'qqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqjjqqqqqqqqqqqqqqqqqqqqqqqqqtttttttvvvvvv'>

##### Training the Model

In [None]:
def learning_rate(epoch, lr):
    '''
    Learning-rate schedule for LearningRateScheduler.
        Args:
            epoch: int --> index of the current epoch
            lr: float --> current learning rate
        Returns:
            lr unchanged for epochs below 30, afterwards lr * exp(-0.1) as a plain
            Python float (the scheduler expects a float, not a tensor)
    '''
    if epoch < 30:
        return lr
    return float(lr * np.exp(-0.1))

In [None]:
#Loss function from keras Automatic Speech Recognition using CTC
def CTCLoss(y_true, y_pred):
    '''
    CTC loss for a batch.
    Every sample is given the full time dimension of y_pred as its input length and the
    full (padded) width of y_true as its label length.
    '''
    n_samples = tf.cast(tf.shape(y_true)[0], dtype='int64')
    per_sample = tf.ones(shape=(n_samples, 1), dtype="int64")

    # (batch, 1) columns holding the same length for every sample
    input_length = tf.cast(tf.shape(y_pred)[1], dtype="int64") * per_sample
    label_length = tf.cast(tf.shape(y_true)[1], dtype="int64") * per_sample

    return tf.keras.backend.ctc_batch_cost(y_true, y_pred, input_length, label_length)

In [None]:
class ProduceExample(tf.keras.callbacks.Callback):
    '''
    Callback that prints the original and the predicted text for one batch
    of the given dataset at the end of every epoch.
    '''
    def __init__(self, dataset) -> None:
        # let the Keras base class set up its own attributes first
        super().__init__()
        self.dataset = dataset.as_numpy_iterator()

    def on_epoch_end(self, epoch, logs=None) -> None:
        batch = next(self.dataset)
        yhat = self.model.predict(batch[0])
        # one entry per sample, each the number of time steps; derived from the
        # prediction so any batch size works (was hard-coded to [75,75])
        input_length = [yhat.shape[1]] * len(yhat)
        decoded = tf.keras.backend.ctc_decode(yhat, input_length, greedy=False)[0][0].numpy()
        for x in range(len(yhat)):
            print('Original:', tf.strings.reduce_join(num_char(batch[1][x])).numpy().decode('utf-8'))
            print('Prediction:', tf.strings.reduce_join(num_char(decoded[x])).numpy().decode('utf-8'))
            print('~'*100)

In [None]:
# legacy Adam optimiser with a learning rate of 1e-4, trained against the CTC loss
opt = Adam(learning_rate=1e-4)
model.compile(loss=CTCLoss, optimizer=opt)

In [None]:
# weights-only checkpoints written to models/checkpoint
checkpoint_callback = ModelCheckpoint(
    filepath=os.path.join('models', 'checkpoint'),
    monitor='loss',
    save_weights_only=True,
)
# applies the learning_rate schedule
schedule_callback = LearningRateScheduler(schedule=learning_rate)
# prints original vs. predicted text for one batch of `data`
example_callback = ProduceExample(dataset=data)

In [None]:
#Training the model on this machine would take many days, so pre-trained weights (same architecture and parameters, trained for 96 epochs) are loaded instead for ease of use

#model.fit(train, validation_data=test, epochs=96, callbacks=[checkpoint_callback, schedule_callback, example_callback])

##### Making Predictions With Our Model

In [None]:
# url = 'https://drive.google.com/uc?id=1vWscXs4Vt0a_1IH1-ct2TCgXAZT-N3_Y'
# checkpoints = 'checkpoints.zip'
# gdown.download(url, checkpoints, quiet=False)
# gdown.extractall('checkpoints.zip', 'models')

In [None]:
# Restore the pre-trained weights; the path is built the same way as for the ModelCheckpoint callback
model.load_weights(os.path.join('models', 'checkpoint'))

<tensorflow.python.checkpoint.checkpoint.CheckpointLoadStatus>

In [None]:
# Iterator over the batches of the test split, yielding NumPy values
test_data = test.as_numpy_iterator()

In [None]:
#take one batch from the test iterator so we can make predictions on it
sample = next(test_data)

In [None]:
# Run the model (with the loaded weights) on the frames of the sample batch
yhat = model.predict(sample[0])



In [None]:
#Real text of the videos in the batch: map every label id back to its character and join per sentence

real_text_outputs = [tf.strings.reduce_join(num_char(sentence)) for sentence in sample[1]]
for output in real_text_outputs:
    print(output.numpy().decode())


lay white sp by l eight please
set white at i two please


In [None]:
#decode predictions and print out what the model predicts
#input_length needs one entry per sample, each the number of time steps; it is derived
#from yhat so the cell works for any batch size (was hard-coded to [75,75])
decoded_text = tf.keras.backend.ctc_decode(
    yhat, input_length=[yhat.shape[1]] * len(yhat), greedy=True
)[0][0].numpy()
predicted_text_output = [tf.strings.reduce_join(num_char(sentence)) for sentence in decoded_text]
for output in predicted_text_output:
    print(output.numpy().decode())

lay white sp by l eight please
set white at i two please


##### Test on Video from Data Folder

In [None]:
# Load the (frames, alignments) pair of a single video from the data folder
custom_sample = load_data(tf.convert_to_tensor('data/s1/lbad9a.mpg'))

In [None]:
# add a leading batch dimension to the frames before handing them to the model
custom_batch = tf.expand_dims(custom_sample[0], axis=0)
yhat = model.predict(custom_batch)



In [None]:
print('REAL TEXT:')
# map the label ids back to characters and join them into one string
print(tf.strings.reduce_join(num_char(custom_sample[1])).numpy().decode())

REAL TEXT:
lay blue at d nine again


In [None]:
print('PREDICTION:')
# greedy CTC decoding of the single-sample batch (75 time steps)
decoded_sequences = tf.keras.backend.ctc_decode(yhat, input_length=[75], greedy=True)[0]
decoded_text = decoded_sequences[0].numpy()
predicted_text_output = [tf.strings.reduce_join(num_char(sentence)) for sentence in decoded_text]
print(predicted_text_output[0].numpy().decode())

PREDICTION:
lay blue at d nine again
