In [126]:
import sys
import tensorflow.keras
import tensorflow as tf
import numpy as np
import os
import cv2
from typing import List
from matplotlib import pyplot as plt
import imageio
from tensorflow.keras.callbacks import TensorBoard


In [127]:
# Ask TensorFlow to grow GPU memory on demand for every visible GPU.
# With no GPU present the list is empty and the loop body never runs.
gpus = tf.config.experimental.list_physical_devices('GPU')
try:
    for gpu in gpus:
        tf.config.experimental.set_memory_growth(gpu, True)
except RuntimeError as err:
    # Report the configuration error and carry on with default settings.
    print(err)


In [128]:
def load_video(path: str) -> "tf.Tensor":
    """Load a video, crop each frame and return the standardized frame stack.

    Every decoded frame is converted BGR -> RGB -> single-channel grayscale
    and cropped to rows 160:260 and columns 50:250 (a 100 x 200 window;
    presumably the mouth region for this dataset -- confirm against the
    footage). The stacked frames are then standardized with the mean and
    standard deviation taken over the whole clip.

    Args:
        path: Path of the video file handed to ``cv2.VideoCapture``.

    Returns:
        A float32 tensor of the cropped frames, ``(frames - mean) / std``.

    Raises:
        ValueError: If no frame could be decoded from ``path``.
    """
    cap = cv2.VideoCapture(path)
    frames = []
    try:
        for _ in range(int(cap.get(cv2.CAP_PROP_FRAME_COUNT))):
            ret, frame = cap.read()
            if not ret:
                # The reported frame count can exceed what is decodable;
                # stop instead of passing ``None`` to cvtColor.
                break
            frame = tf.image.rgb_to_grayscale(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
            frames.append(frame[160:260, 50:250])
    finally:
        # Release the capture handle even if decoding raised.
        cap.release()

    if not frames:
        raise ValueError(f"No frames could be read from video: {path}")

    # Cast before reducing so the mean is computed in float32, matching the
    # standard deviation (reducing the integer stack yields an integer mean).
    frames = tf.cast(frames, tf.float32)
    mean = tf.math.reduce_mean(frames)
    std = tf.math.reduce_std(frames)

    return (frames - mean) / std


In [129]:
# Character vocabulary for the transcripts: one list entry per character.
vocab = list("abcdefghijklmnopqrstuvwxyz'?!123456789 ")

In [130]:
# Forward lookup: character -> integer id, with "" as the out-of-vocabulary token.
char_to_num = tf.keras.layers.StringLookup(
    vocabulary=vocab,
    oov_token="",
)

# Inverse lookup built from the forward layer's vocabulary: integer id -> character.
num_to_char = tf.keras.layers.StringLookup(
    vocabulary=char_to_num.get_vocabulary(),
    oov_token="",
    invert=True,
)

In [131]:
def load_alignments(path:str) -> "tf.Tensor":
    """Read an alignment file and encode its words as character ids.

    The third whitespace-separated field of each line is taken as the word.
    Lines whose word is ``'sil'`` are skipped, as are blank or short lines
    (fewer than three fields). The remaining words are joined with single
    spaces, split into characters and mapped through ``char_to_num``.

    Args:
        path: Path of the alignment text file.

    Returns:
        The tensor produced by ``char_to_num`` for the character sequence,
        with the leading separator space removed.
    """
    with open(path, 'r') as f:
        lines = f.readlines()
    tokens = []
    for line in lines:
        fields = line.split()
        # A trailing blank line or truncated row has no word field.
        if len(fields) < 3 or fields[2] == 'sil':
            continue
        # A space goes before every word; the first one is dropped by the [1:] below.
        tokens.extend((' ', fields[2]))
    return char_to_num(tf.reshape(tf.strings.unicode_split(tokens, input_encoding='UTF-8'), (-1)))[1:]

In [132]:
def load_data(path: str):
    """Load the frames and the encoded transcript for one video path.

    Args:
        path: String tensor holding the video path (``.numpy()`` is called on
            it, so it is the tensor handed over by ``tf.py_function``, not a
            Python ``str``).

    Returns:
        A ``(frames, alignments)`` tuple from ``load_video`` and
        ``load_alignments`` for the file's base name.
    """
    path = bytes.decode(path.numpy())
    # Accept both '\\' and '/' separators so the lookup does not depend on the
    # platform that produced the path, and strip only the final extension.
    base_name = path.replace("\\", "/").rsplit("/", 1)[-1]
    file_name = os.path.splitext(base_name)[0]
    video_path = os.path.join('data','s1',f'{file_name}.mpg')
    alignment_path = os.path.join('data','alignments','s1',f'{file_name}.align')
    frames = load_video(video_path)
    alignments = load_alignments(alignment_path)

    return frames, alignments

In [133]:
def mappable_function(path:str) -> List[str]:
    """Wrap ``load_data`` in ``tf.py_function`` so it can be used in ``Dataset.map``.

    The wrapped call is declared to produce a float32 tensor and an int64 tensor.
    """
    return tf.py_function(func=load_data, inp=[path], Tout=(tf.float32, tf.int64))

In [134]:
# List the videos in a fixed order. list_files shuffles by default and would
# reshuffle on every pass over the dataset, so the take/skip split below would
# select different files each epoch and test clips would leak into training.
data = tf.data.Dataset.list_files(os.path.join('data', 's1', '*.mpg'), shuffle=False)
# Shuffle once; the order is then kept for every later iteration.
data = data.shuffle(500, reshuffle_each_iteration=False)
data = data.map(mappable_function)
# Batches of 2 clips: frames padded to 75 along the first axis, labels padded to 40.
data = data.padded_batch(2, padded_shapes=([75,None,None,None],[40]))
data = data.prefetch(tf.data.AUTOTUNE)

# First 450 batches for training, everything after that for evaluation.
train = data.take(450)
test = data.skip(450)

In [135]:
# Pull one batch out of the pipeline as NumPy arrays for inspection.
frames, alignments = next(data.as_numpy_iterator())

In [136]:
# Size of the leading axis of the fetched batch (the pipeline batches 2 clips at a time).
len(frames)

2

In [140]:
from tensorflow.keras import layers, models, optimizers

In [141]:
# Reference: Pingchuan Ma, Yujiang Wang, Stavros Petridis, Jie Shen, Maja Pantic, "Training Strategies for Improved Lip-Reading" (Imperial College London; Meta AI, UK).

In [142]:
# Define the MS-TCN block
def mstcn_block(x, filters, kernel_sizes):
    """Apply one Conv3D branch per kernel size to ``x`` and concatenate on axis 4.

    Each branch uses a ``(1, k, k)`` kernel, 'same' padding and ReLU, so the
    kernel has extent 1 along the first non-batch axis.

    Args:
        x: Input tensor (assumed 5-D, channels-last -- the branches are
            concatenated on axis 4).
        filters: Number of output filters for every branch.
        kernel_sizes: Iterable of kernel sizes, one branch each.

    Returns:
        The concatenation of all branch outputs along axis 4.
    """
    branches = [
        layers.Conv3D(filters, (1, k, k), padding='same', activation='relu')(x)
        for k in kernel_sizes
    ]
    return layers.concatenate(branches, axis=4)

# Define the DC-TCN block
def dctcn_block(x, filters, kernel_sizes, dilation_rates):
    """Apply parallel dilated Conv3D branches to ``x`` and concatenate on axis 4.

    Branch ``i`` uses kernel ``(1, k_i, k_i)`` with dilation ``(1, d_i, d_i)``,
    'same' padding and ReLU. Kernel extent and dilation are both 1 along the
    first non-batch axis.

    Args:
        x: Input tensor (assumed 5-D, channels-last -- the branches are
            concatenated on axis 4).
        filters: Number of output filters for every branch.
        kernel_sizes: Kernel size of each branch.
        dilation_rates: Dilation rate of each branch; must pair one-to-one
            with ``kernel_sizes``.

    Returns:
        The concatenation of all branch outputs along axis 4.

    Raises:
        ValueError: If ``kernel_sizes`` and ``dilation_rates`` differ in length.
    """
    kernel_sizes = list(kernel_sizes)
    dilation_rates = list(dilation_rates)
    # zip() would silently drop the unpaired entries and build fewer branches.
    if len(kernel_sizes) != len(dilation_rates):
        raise ValueError(
            "kernel_sizes and dilation_rates must have the same length, "
            f"got {len(kernel_sizes)} and {len(dilation_rates)}"
        )

    branches = []
    for kernel_size, dilation_rate in zip(kernel_sizes, dilation_rates):
        branch = layers.Conv3D(filters,
                               (1, kernel_size, kernel_size),
                               padding='same',
                               dilation_rate=(1, dilation_rate, dilation_rate),
                               activation='relu')(x)
        branches.append(branch)
    return layers.concatenate(branches, axis=4)
        

In [143]:
import tensorflow_addons as tfa

In [144]:
from tensorflow.keras import layers, models

# Classifier built from stacked DC-TCN-style blocks
def build_model(input_shape, num_classes):
    """Build and compile the classifier.

    Two ``dctcn_block`` stages (three branches of 128 filters each) are
    followed by global average pooling and a softmax ``Dense`` layer.

    Args:
        input_shape: Full input shape including the leading batch entry;
            ``input_shape[1:]`` is passed to ``layers.Input``.
        num_classes: Number of units in the softmax output layer.

    Returns:
        A compiled ``Model`` (AdamW, categorical cross-entropy, accuracy).

    NOTE(review): the output is a single ``num_classes`` vector per input. The
    dataset in this notebook pads its labels to integer sequences of length 40
    and ``ProduceExample`` runs CTC decoding on the predictions, which does not
    look compatible with this head and loss -- confirm the intended objective.
    """
    inputs = layers.Input(shape=input_shape[1:])
    x = inputs

    # Temporal Models
    for _ in range(2):
        x = dctcn_block(x, filters=128, kernel_sizes=[3, 5, 7], dilation_rates=[1, 2, 5])

    # Pool straight over the block output. Each block concatenates three
    # 128-filter branches (384 channels), so reshaping to a hard-coded 128
    # channels first would fold channels into the leading axis before pooling.
    x = layers.GlobalAveragePooling3D()(x)

    # Output layer with one unit per class
    outputs = layers.Dense(num_classes, activation='softmax')(x)

    model = models.Model(inputs, outputs)

    # AdamW optimizer
    optimizer = tfa.optimizers.AdamW(learning_rate=3e-4, weight_decay=1e-4)
    model.compile(optimizer, loss='categorical_crossentropy', metrics=['accuracy'])

    return model

In [145]:
# Build the network for 100 x 200 single-channel frames with a free batch size
# and a free leading (frame) axis, and a 40-way output, then print its layers.
model = build_model(
    input_shape=(None, None, 100, 200, 1),
    num_classes=40,
)
model.summary()

Model: "model_4"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input_5 (InputLayer)           [(None, None, 100,   0           []                               
                                200, 1)]                                                          
                                                                                                  
 conv3d_24 (Conv3D)             (None, None, 100, 2  1280        ['input_5[0][0]']                
                                00, 128)                                                          
                                                                                                  
 conv3d_25 (Conv3D)             (None, None, 100, 2  3328        ['input_5[0][0]']                
                                00, 128)                                                    

In [146]:
# Inspect the input shape the built model expects.
model.input_shape

(None, None, 100, 200, 1)

In [147]:
# Inspect the output shape the built model produces.
model.output_shape

(None, 40)

In [148]:
class ProduceExample(tf.keras.callbacks.Callback):
    """Print reference and CTC-decoded predicted text for one batch after every epoch.

    NOTE(review): ``ctc_decode`` needs predictions shaped (batch, time, classes);
    confirm the model attached to this callback produces such an output.
    """

    def __init__(self, dataset) -> None:
        """Keep the dataset and open a NumPy iterator over it.

        Args:
            dataset: Dataset yielding ``(inputs, labels)`` batches.
        """
        # Let the Keras base class initialise its own state.
        super().__init__()
        # Keep the dataset itself so the iterator can be reopened once exhausted.
        self._source = dataset
        self.dataset = dataset.as_numpy_iterator()

    def on_epoch_end(self, epoch, logs=None) -> None:
        """Predict on the next batch and print original versus decoded text."""
        data = next(self.dataset, None)
        if data is None:
            # The iterator ran dry (more epochs than batches): start over
            # instead of aborting training with StopIteration.
            self.dataset = self._source.as_numpy_iterator()
            data = next(self.dataset, None)
            if data is None:
                # The dataset has no batches at all; nothing to show.
                return
        yhat = self.model.predict(data[0])
        # One sequence length per sample, taken from the prediction itself so a
        # smaller final batch or a different frame count is handled too.
        input_length = [yhat.shape[1]] * len(yhat)
        decoded = tf.keras.backend.ctc_decode(yhat, input_length, greedy=False)[0][0].numpy()
        for x in range(len(yhat)):
            print('Original:', tf.strings.reduce_join(num_to_char(data[1][x])).numpy().decode('utf-8'))
            print('Prediction:', tf.strings.reduce_join(num_to_char(decoded[x])).numpy().decode('utf-8'))
            print('~'*100)

In [149]:
from tensorflow.keras.callbacks import ModelCheckpoint, LearningRateScheduler

In [150]:
def scheduler(epoch, lr):
    """Learning-rate schedule: constant for 30 epochs, then exponential decay.

    Args:
        epoch: Zero-based epoch index supplied by the scheduler callback.
        lr: Current learning rate.

    Returns:
        A Python float: ``lr`` unchanged while ``epoch < 30``, otherwise
        ``lr * exp(-0.1)``.
    """
    if epoch < 30:
        return float(lr)
    # Compute with NumPy and convert so the result is a plain float rather
    # than a TensorFlow tensor.
    return float(lr * np.exp(-0.1))

In [151]:
# Save weights only under models/checkpoint, tracking the training loss.
checkpoint_callback = ModelCheckpoint(
    filepath=os.path.join('models', 'checkpoint'),
    monitor='loss',
    save_weights_only=True,
)
# Apply the epoch-based learning-rate schedule defined by `scheduler`.
schedule_callback = LearningRateScheduler(schedule=scheduler)

In [152]:
# Print sample predictions drawn from the held-out split after each epoch.
example_callback = ProduceExample(dataset=test)

In [153]:
# Number of elements in the training split (the split takes 450 batches from the pipeline).
len(train)

450

In [154]:
# Train with checkpointing, LR scheduling, sample predictions and TensorBoard logging.
try:
    tensorboard_callback = TensorBoard(log_dir='./logs', histogram_freq=1, profile_batch='500,520')
    # `train` is already batched by the input pipeline, so no batch_size is
    # passed here; Keras expects it to be left unset for dataset inputs.
    model.fit(train, validation_data=test, epochs=100, callbacks=[checkpoint_callback, schedule_callback, example_callback, tensorboard_callback])

    tf.debugging.experimental.disable_dump_debug_info()
except Exception as e:
    print("An error occurred during training:")
    print(str(e))
    # Re-raise so the failure is visible with its full traceback and a
    # Run All does not continue past a failed training.
    raise

Epoch 1/100


An error occurred during training:
Graph execution error:

Detected at node 'model_4/conv3d_29/Conv3D/SpaceToBatchND' defined at (most recent call last):
    File "c:\Users\Admin\miniconda3\envs\tf\lib\runpy.py", line 196, in _run_module_as_main
      return _run_code(code, main_globals, None,
    File "c:\Users\Admin\miniconda3\envs\tf\lib\runpy.py", line 86, in _run_code
      exec(code, run_globals)
    File "c:\Users\Admin\miniconda3\envs\tf\lib\site-packages\ipykernel_launcher.py", line 17, in <module>
      app.launch_new_instance()
    File "c:\Users\Admin\miniconda3\envs\tf\lib\site-packages\traitlets\config\application.py", line 1053, in launch_instance
      app.start()
    File "c:\Users\Admin\miniconda3\envs\tf\lib\site-packages\ipykernel\kernelapp.py", line 737, in start
      self.io_loop.start()
    File "c:\Users\Admin\miniconda3\envs\tf\lib\site-packages\tornado\platform\asyncio.py", line 215, in start
      self.asyncio_loop.run_forever()
    File "c:\Users\Admin\mini