In [3]:
import os
import cv2
import tensorflow as tf
import numpy as np
from typing import List
from matplotlib import pyplot as plt
import imageio

In [4]:
# List the GPUs TensorFlow can see (the recorded output below is an empty list).
tf.config.list_physical_devices('GPU')

[]

In [5]:
# Ask TensorFlow to grow GPU memory on demand instead of reserving it all up front.
physical_devices = tf.config.list_physical_devices('GPU')
if physical_devices:
    try:
        tf.config.experimental.set_memory_growth(physical_devices[0], True)
    except (RuntimeError, ValueError) as err:
        # Best effort only: report the problem and continue with the default
        # allocation strategy rather than hiding every possible error.
        print(f"Could not enable GPU memory growth: {err}")

In [6]:
#import gdown
#url = 'https://drive.google.com/uc?id=1YlvpDLix3S-U8fd-gqRwPcWXAXm8JwjL'
#output = 'data.zip'
#gdown.download(url, output, quiet=False)
#gdown.extractall('data.zip')


# 1: DATA LOADING FUNCTIONS


In [7]:
def load_video(path:str) -> "tf.Tensor":
    """
    Load a video, convert each frame to grayscale, crop the lip region
    and standardize the stacked frames.

    input:
    path: location of the video file as a string.

    Returns:
    float32 tensor of the cropped grayscale frames (rows 190:236,
    columns 80:220), shifted by their mean and divided by their
    standard deviation.

    Raises:
    ValueError if no frame could be read from `path`.
    """

    cap = cv2.VideoCapture(path)
    frames = []
    try:
        for _ in range(int(cap.get(cv2.CAP_PROP_FRAME_COUNT))):
            ret, frame = cap.read()
            if not ret:
                # cap.read() reports failure through `ret`; stop here instead of
                # handing a missing frame to TensorFlow.
                break
            # converting the rgb image to grayscale to reduce
            # computation power
            frame = tf.image.rgb_to_grayscale(frame)

            # isolating the lip section
            frames.append(frame[190:236,80:220,:])
    finally:
        # always free the capture handle, even if a frame fails to convert
        cap.release()

    if not frames:
        raise ValueError(f"No frames could be read from video: {path}")

    # NOTE(review): `frames` still holds the integer-typed grayscale frames here,
    # so the mean and the subtraction below are evaluated in that integer dtype
    # before the cast to float32. The sample output recorded in this notebook
    # (values between 0 and ~9.46) is consistent with unsigned wrap-around for
    # pixels darker than the mean. Left unchanged because the weights loaded later
    # in the notebook were presumably trained with this exact preprocessing --
    # confirm before switching to float arithmetic.
    mean = tf.math.reduce_mean(frames)
    std = tf.math.reduce_std(tf.cast(frames, tf.float32))
    # standardizing the data
    return tf.cast((frames - mean), tf.float32) / std

In [5]:
# Every character the model may emit, one list entry per character.
vocab = list("abcdefghijklmnopqrstuvwxyz'?!123456789 ")

In [6]:
# Forward lookup: character -> integer id. The empty string is the
# out-of-vocabulary token and shows up first in the printed vocabulary.
char_to_num = tf.keras.layers.StringLookup(vocabulary=vocab, oov_token="")

# Reverse lookup built from the forward layer's vocabulary: integer id -> character.
num_to_char = tf.keras.layers.StringLookup(
    vocabulary=char_to_num.get_vocabulary(),
    oov_token="",
    invert=True,
)

print(f"The vocabulary is: {char_to_num.get_vocabulary()} (size ={char_to_num.vocabulary_size()})")

The vocabulary is: ['', 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z', "'", '?', '!', '1', '2', '3', '4', '5', '6', '7', '8', '9', ' '] (size =40)


In [7]:
def load_alignments(path:str) -> "tf.Tensor":
    """
    Read an alignment file and encode its words as character ids.

    Every line is split on whitespace and its third field is taken as the
    word; 'sil' (silence) entries are dropped. The remaining words are
    separated by single spaces, split into characters and mapped to ids
    through `char_to_num`.

    input:
    path: location of the alignment file as a string.

    Returns:
    1-D tensor of character ids, without the leading separator space.
    """
    with open(path, 'r') as f:
        lines = f.readlines()

    tokens = []
    for line in lines:
        fields = line.split()
        # skip blank or malformed lines that have no word column
        if len(fields) < 3:
            continue
        # sil stands for silence
        if fields[2] != 'sil':
            tokens.extend([' ', fields[2]])
    # the [1:] drops the separator that precedes the first word
    return char_to_num(tf.reshape(tf.strings.unicode_split(tokens, input_encoding='UTF-8'), (-1)))[1:]

In [8]:
def load_data(path: str):
    """
    Load the video frames and the matching alignment for one dataset entry.

    input:
    path: object whose `.numpy()` returns the video path as bytes
    (this is what `tf.py_function` passes in).

    Returns:
    Tuple (frames, alignments) as produced by `load_video` and
    `load_alignments` for the file stem found in `path`.
    """
    path = bytes.decode(path.numpy())
    # Take the file stem regardless of whether the path uses '/' or '\\'
    # separators, so the lookup works on every operating system.
    file_name = os.path.basename(path.replace('\\', '/')).split('.')[0]
    video_path = os.path.join('data','s1',f'{file_name}.mpg')
    alignment_path = os.path.join('data','alignments','s1',f'{file_name}.align')
    frames = load_video(video_path)
    alignments = load_alignments(alignment_path)

    return frames, alignments

In [9]:
def mappable_function(path:str) ->List[str]:
    """
    Run `load_data` through `tf.py_function` so it can be applied with
    `Dataset.map`.

    input:
    path: dataset element holding a video path.

    Returns:
    The py_function outputs, declared as (tf.float32, tf.int64).
    """
    return tf.py_function(load_data, [path], (tf.float32, tf.int64))

# 2: DATA PIPELINE

In [10]:
# List the videos in a fixed order. list_files shuffles by default and draws a
# new order on every pass, which would move clips between `train` and `test`
# from one epoch to the next.
data = tf.data.Dataset.list_files('./data/s1/*.mpg', shuffle=False)
# Shuffle once; the same order is then reused on every iteration.
data = data.shuffle(500, reshuffle_each_iteration=False)
data = data.map(mappable_function)
# Batches of 2 videos: frames padded to 75 time steps, labels padded to 40 ids.
data = data.padded_batch(2, padded_shapes=([75,None,None,None],[40]))
data = data.prefetch(tf.data.AUTOTUNE)
# Split by batch: the first 450 batches train the model, the rest are held out.
train = data.take(450)
test = data.skip(450)

In [11]:
# Pull one batch from the pipeline as NumPy arrays.
frames, alignments = next(data.as_numpy_iterator())

In [12]:
# Show the label batch: one row of character ids per video, zero-padded to length 40.
alignments

array([[ 2,  9, 14, 39,  2, 12, 21,  5, 39,  1, 20, 39, 19, 39, 20,  8,
        18,  5,  5, 39,  1,  7,  1,  9, 14,  0,  0,  0,  0,  0,  0,  0,
         0,  0,  0,  0,  0,  0,  0,  0],
       [12,  1, 25, 39, 18,  5,  4, 39,  1, 20, 39,  5, 39, 20,  8, 18,
         5,  5, 39, 19, 15, 15, 14,  0,  0,  0,  0,  0,  0,  0,  0,  0,
         0,  0,  0,  0,  0,  0,  0,  0]], dtype=int64)

In [13]:
# Keep an iterator over the batched dataset so batches can be pulled one at a time.
sample = data.as_numpy_iterator()

In [14]:
# Fetch the next (frames, alignments) batch and display its frame tensor.
val = next(sample)
val[0]

array([[[[[1.4096433 ],
          [1.4096433 ],
          [1.3725474 ],
          ...,
          [9.0513935 ],
          [9.125586  ],
          [9.162682  ]],

         [[1.4096433 ],
          [1.4096433 ],
          [1.3725474 ],
          ...,
          [8.383668  ],
          [9.273969  ],
          [9.311065  ]],

         [[1.3354515 ],
          [1.3354515 ],
          [1.3354515 ],
          ...,
          [9.199778  ],
          [0.03709587],
          [0.03709587]],

         ...,

         [[0.9644928 ],
          [0.9644928 ],
          [0.9644928 ],
          ...,
          [0.03709587],
          [0.03709587],
          [0.03709587]],

         [[0.9644928 ],
          [0.9644928 ],
          [0.9644928 ],
          ...,
          [9.459449  ],
          [0.03709587],
          [0.03709587]],

         [[0.9644928 ],
          [0.9644928 ],
          [0.9644928 ],
          ...,
          [9.459449  ],
          [0.03709587],
          [0.        ]]],


        [[[1.3725

# 3: DESIGN DEEP NEURAL NETWORK

In [15]:
from tensorflow.keras.models import Sequential 
from tensorflow.keras.layers import Conv3D, LSTM, Dense, Dropout, Bidirectional, MaxPool3D, Activation, Reshape, SpatialDropout3D, BatchNormalization, TimeDistributed, Flatten
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import ModelCheckpoint, LearningRateScheduler

In [16]:
# Shape of the first video in a batch; it is used as the model's input_shape below.
data.as_numpy_iterator().next()[0][0].shape

(75, 46, 140, 1)

In [17]:
# Conv3D feature extractor -> per-time-step flatten -> two BiLSTM layers ->
# per-time-step softmax over the character classes.
model = Sequential([
    # Three Conv3D + ReLU stages. Pooling with (1,2,2) shrinks height and width
    # only, so all 75 time steps are kept.
    Conv3D(128, 3, input_shape=(75,46,140,1), padding='same'),
    Activation('relu'),
    MaxPool3D((1,2,2)),

    Conv3D(256, 3, padding='same'),
    Activation('relu'),
    MaxPool3D((1,2,2)),

    Conv3D(75, 3, padding='same'),
    Activation('relu'),
    MaxPool3D((1,2,2)),

    # Collapse the spatial feature maps of every time step into one vector.
    TimeDistributed(Flatten()),

    # Two bidirectional recurrent layers, each followed by dropout.
    Bidirectional(LSTM(128, kernel_initializer='Orthogonal', return_sequences=True)),
    Dropout(.5),

    Bidirectional(LSTM(128, kernel_initializer='Orthogonal', return_sequences=True)),
    Dropout(.5),

    # One class per vocabulary entry plus one extra class
    # (presumably the CTC blank -- confirm).
    Dense(char_to_num.vocabulary_size()+1, kernel_initializer='he_normal', activation='softmax'),
])

In [18]:
# Print the layer-by-layer summary (output shapes and parameter counts).
model.summary()

Model: "sequential"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 conv3d (Conv3D)             (None, 75, 46, 140, 128   3584      
                             )                                   
                                                                 
 activation (Activation)     (None, 75, 46, 140, 128   0         
                             )                                   
                                                                 
 max_pooling3d (MaxPooling3  (None, 75, 23, 70, 128)   0         
 D)                                                              
                                                                 
 conv3d_1 (Conv3D)           (None, 75, 23, 70, 256)   884992    
                                                                 
 activation_1 (Activation)   (None, 75, 23, 70, 256)   0         
                                                        

In [19]:
# Forward pass on the sample batch; at this point in the notebook the model has not been fit yet.
yhat = model.predict(val[0])



In [20]:
# Greedy readout of the first prediction: arg-max class per time step, mapped to characters and joined.
tf.strings.reduce_join([num_to_char(x) for x in tf.argmax(yhat[0],axis=1)])

<tf.Tensor: shape=(), dtype=string, numpy=b'zznnnnnnnnnnnnnnnnnnnnz8888z888888888888888888888nnnnnnnnnnnnnnnnnnnnnnnnnn'>

In [21]:
# Greedy readout of the first prediction, taking the arg-max of each time step inside the comprehension.
tf.strings.reduce_join([num_to_char(tf.argmax(x)) for x in yhat[0]])

<tf.Tensor: shape=(), dtype=string, numpy=b'zznnnnnnnnnnnnnnnnnnnnz8888z888888888888888888888nnnnnnnnnnnnnnnnnnnnnnnnnn'>

In [22]:
# Input shape the model expects (leading None is the batch dimension).
model.input_shape

(None, 75, 46, 140, 1)

In [23]:
# Output shape: one class distribution per time step (leading None is the batch dimension).
model.output_shape

(None, 75, 41)

# 4: SETUP TRAINING OPTIONS AND TRAIN

In [25]:
def scheduler(epoch, lr):
    '''Learning-rate schedule for the LearningRateScheduler callback.

    The incoming rate is returned unchanged while the epoch index is below 30;
    from epoch index 30 onward it is multiplied by exp(-0.1) on every call.

    epoch: zero-based epoch index.
    lr: current learning rate.

    Returns the learning rate to use; the decayed value is a plain Python
    float rather than a tensor, so the callback can use it directly.
    '''
    if epoch < 30:
        return lr
    return float(lr * np.exp(-0.1))

In [26]:
def CTCLoss(y_true, y_pred):
    """
    CTC loss for a batch of padded labels and per-time-step predictions.

    Every sample is given the same lengths: the size of axis 1 of `y_pred`
    as its input length and the size of axis 1 of `y_true` as its label length.

    Returns:
    The result of `tf.keras.backend.ctc_batch_cost`.
    """
    n_samples = tf.cast(tf.shape(y_true)[0], dtype="int64")
    time_steps = tf.cast(tf.shape(y_pred)[1], dtype="int64")
    label_steps = tf.cast(tf.shape(y_true)[1], dtype="int64")

    # A (batch, 1) column of ones, scaled to repeat each length for every sample.
    ones_column = tf.ones(shape=(n_samples, 1), dtype="int64")
    input_length = time_steps * ones_column
    label_length = label_steps * ones_column

    return tf.keras.backend.ctc_batch_cost(y_true, y_pred, input_length, label_length)

In [28]:
# Use a small Adam step size (1e-4). Rates near 0.1 are far above Adam's 1e-3
# default and tend to make recurrent CTC training diverge. The scheduler
# callback decays this value further from epoch 30 onward.
model.compile(optimizer=Adam(learning_rate=0.0001), loss=CTCLoss)

In [29]:
# Callback that writes the model weights (weights only, not the full model)
# to models/checkpoint while training runs.
checkpoint_callback = ModelCheckpoint(
    filepath=os.path.join('models','checkpoint'),
    monitor='loss',
    save_weights_only=True,
)

In [30]:
# Callback that sets the learning rate during training from the value
# returned by `scheduler(epoch, lr)`.
schedule_callback = LearningRateScheduler(scheduler)

In [2]:
# Train for 100 epochs on `train`, validating on `test`, with checkpointing and the learning-rate schedule.
model.fit(train, validation_data=test, epochs=100, callbacks=[checkpoint_callback, schedule_callback])

# 5: MAKE PREDICTIONS

In [None]:
# gdown is only needed for this download step and is not imported anywhere
# else in the notebook, so import it here to keep the cell runnable on a
# fresh kernel.
import gdown

# Download the archive of trained weights and unpack it into the models folder.
url = 'https://drive.google.com/uc?id=1vWscXs4Vt0a_1IH1-ct2TCgXAZT-N3_Y'
output = 'checkpoints.zip'
gdown.download(url, output, quiet=False)
gdown.extractall('checkpoints.zip', 'models')

In [None]:
# Restore the weights stored under models/checkpoint into the model.
model.load_weights('models/checkpoint')

In [None]:
# Iterator over the held-out batches, yielding NumPy arrays.
test_data = test.as_numpy_iterator()

In [None]:
# Take one (frames, alignments) batch from the held-out data.
sample = next(test_data)

In [None]:
# Predict per-time-step class probabilities for the frames of the held-out batch.
yhat = model.predict(sample[0])

In [None]:
# Decode the ground-truth label ids of each sentence in the batch back to text.
print('~'*100, 'REAL TEXT')
[tf.strings.reduce_join([num_to_char(word) for word in sentence]) for sentence in sample[1]]

In [None]:
# Greedy CTC decoding. The per-sample input lengths are derived from the
# prediction array (batch size and number of time steps) instead of being
# hard-coded, so a batch of any size decodes correctly.
decoded = tf.keras.backend.ctc_decode(
    yhat,
    input_length=np.full(yhat.shape[0], yhat.shape[1]),
    greedy=True,
)[0][0].numpy()

In [None]:
# Map the decoded ids of each sentence back to characters and join them into text.
print('~'*100, 'PREDICTIONS')
[tf.strings.reduce_join([num_to_char(word) for word in sentence]) for sentence in decoded]