<a href="https://colab.research.google.com/github/leeds1219/DL_and_AI_Notes_and_Projects/blob/main/ColabPro_Project_Practice.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import gdown

# Download the zipped dataset from Google Drive, then unpack the archive
# that was just written.
url = 'https://drive.google.com/u/0/uc?id=1UtkewgHB_tjxEweDsxb4dtfRC4uIByHg&export=download'
output = 'data.zip'
gdown.download(url=url, output=output, quiet=False)
gdown.extractall(output)

In [None]:
import os
import cv2
import numpy as np
from typing import List
from matplotlib import pyplot as plt

In [None]:
import tensorflow as tf
# Character-level vocabulary for the transcripts: lowercase a-z, the marks
# "!" and "?", the digits 0-9 and a blank (space), following the dataset's
# instructions. Capital letters are left out because case carries no meaning.
vocab = list("abcdefghijklmnopqrstuvwxyz!?0123456789 ")

# Reference signature of the lookup layer used below:
# tf.keras.layers.StringLookup(
#    max_tokens=None,
#    num_oov_indices=1,
#    mask_token=None,
#    oov_token='[UNK]',
#    vocabulary=None,
#    idf_weights=None,
#    encoding='utf-8',
#    invert=False,
#    output_mode='int',
#    sparse=False,
#    pad_to_max_tokens=False,
#    **kwargs
#)

# Forward lookup (character -> integer id); the empty string is the
# out-of-vocabulary token.
char_to_num = tf.keras.layers.StringLookup(
    vocabulary=vocab,
    oov_token="",
)

# Reverse lookup (integer id -> character), built from the forward layer's
# vocabulary so both directions agree.
num_to_char = tf.keras.layers.StringLookup(
    vocabulary=char_to_num.get_vocabulary(),
    oov_token="",
    invert=True,
)

In [None]:
def load_s1(dir:str) -> List[float]:
    """Read a video, crop the lower (mouth) region and standardize the frames.

    Args:
        dir: Path of the video file to open with OpenCV. (The name shadows
            the builtin but is kept so existing callers keep working.)

    Returns:
        A float32 tensor holding every frame that could be read, converted
        to a single grayscale channel and cropped to rows 190-235 and
        columns 80-219, then shifted by the overall mean and divided by the
        overall standard deviation.

    Raises:
        ValueError: If not a single frame could be read from the file.
    """
    capture = cv2.VideoCapture(dir)
    frames = []
    try:
        # The container reports a frame count; read at most that many frames.
        for _ in range(int(capture.get(cv2.CAP_PROP_FRAME_COUNT))):
            # ret is False when the read failed; frame is then unusable.
            ret, frame = capture.read()
            if not ret:
                # The reported frame count can exceed what is decodable;
                # stop instead of passing None to TensorFlow.
                break

            # Grayscale keeps a single channel, which saves resources.
            frame = tf.image.rgb_to_grayscale(frame)

            # The original LipNet code used extract_mouth_batch.py; to save
            # resources only a fixed lower window of the frame is kept.
            frames.append(frame[190:236,80:220,:])
    finally:
        # Release the capture even if decoding raised.
        capture.release()

    if not frames:
        raise ValueError(f'no frames could be read from {dir!r}')

    # Convert to float32 BEFORE computing statistics: on uint8 input
    # reduce_mean truncates to an integer and the subtraction below would be
    # unsigned, wrapping around for every pixel darker than the mean.
    frames = tf.cast(frames, tf.float32)
    mean = tf.math.reduce_mean(frames)
    std = tf.math.reduce_std(frames)
    return (frames - mean) / std

In [None]:
def load_alignments(dir:str) -> List[str]:
    """Read an alignment file and encode its words as character ids.

    The third whitespace-separated field of each line is taken as the word.
    Lines whose word is 'sil' (silence) are dropped, and lines with fewer
    than three fields (e.g. blank lines) are skipped.

    Args:
        dir: Path of the alignment file.

    Returns:
        The result of applying ``char_to_num`` to the individual characters
        of the words, each word preceded by a single space; the very first
        element (the leading space) is removed.
    """
    tokens = []
    with open(dir, 'r') as f:
        for line in f:
            fields = line.split()

            # Blank or malformed lines carry no word; skip them instead of
            # failing with an IndexError.
            if len(fields) < 3:
                continue

            # 'sil' marks silence and has no meaning for the transcript.
            if fields[2] != 'sil':
                # Separate consecutive words with a single space.
                tokens.extend((' ', fields[2]))

    # char_to_num works on single characters, so the tokens are split into
    # characters and flattened; [1:] drops the space in front of the first word.
    return char_to_num(tf.reshape(tf.strings.unicode_split(tokens, input_encoding='UTF-8'), (-1)))[1:]

In [None]:
def load_data(dir: str):
    """Load the frames and the encoded alignment that belong to one video.

    Args:
        dir: String tensor holding the path of a video file; only the file's
            base name (the text before the first '.') is used.

    Returns:
        A ``(frames, alignments)`` pair produced by ``load_s1`` and
        ``load_alignments`` for the same base name, so the two always match.
    """
    # The tensor wraps raw bytes; decode them to use the str split helpers.
    path = dir.numpy().decode()

    # Last path component, cut at its first dot.
    base_name, _, _ = path.rsplit('/', 1)[-1].partition('.')

    # Videos live in data/s1/*.mpg, alignments in data/alignments/s1/*.align.
    video_path = os.path.join('data', 's1', f'{base_name}.mpg')
    align_path = os.path.join('data', 'alignments', 's1', f'{base_name}.align')

    # Loading both here avoids having to re-match them on every epoch.
    return load_s1(video_path), load_alignments(align_path)

In [None]:
# A sample video path used to test that the loader functions work
sample_dir = '/content/data/s1/bbaf2n.mpg'

In [None]:
# Load the sample to check the type and shape of the tensors that will be
# fed to the model.
# load_data calls .numpy() on its argument, so the path is wrapped in a tensor.
frames, alignments = load_data(tf.convert_to_tensor(sample_dir))

In [None]:
# NOTE(review): torch is only referenced by commented-out code in the cells
# visible here - confirm whether this import is still needed.
import torch
# Shape of the frame tensor loaded for the sample video
frames.shape

In [None]:
# Shape of the encoded alignment tensor loaded for the sample video
alignments.shape

In [None]:
# load_data needs the path as a Python string, so in a TensorFlow pipeline it
# has to be run through tf.py_function.
def mappable_function(dir:str) ->List[str]:
    """Wrap ``load_data`` so it can be applied with ``Dataset.map``.

    Args:
        dir: Path element handed over by the dataset.

    Returns:
        The outputs of ``load_data`` as a (float32, int64) pair of tensors.
    """
    return tf.py_function(load_data, [dir], (tf.float32, tf.int64))

In [None]:
# Input pipeline, written as one chain of tf.data transformations.
data = (
    # list_files turns every matching file name into an independent element
    # and shuffles the names by default.
    tf.data.Dataset.list_files('./data/s1/*.mpg')
    # An explicit shuffle with a 500-element buffer; reshuffling on every
    # epoch is turned off to save resources.
    .shuffle(500, reshuffle_each_iteration=False)
    # map(f) builds a new dataset by applying f to each element; here every
    # path becomes a (frames, alignments) pair. Data augmentation such as
    # random rotations or resizing could be added at this step.
    # Quick check: frames, alignments = data.as_numpy_iterator().next()
    .map(mappable_function)
    # padded_batch(batch_size, padded_shapes=None, padding_values=None,
    #              drop_remainder=False)
    # Batches of 2: frames are padded to 75 along the first axis (a sample
    # has shape [75, 46, 140, 1]) and alignments are zero-padded to 40 tokens.
    .padded_batch(2, padded_shapes=([75,None,None,None],[40]))
    # Prefetch upcoming batches while the current one is being consumed to
    # hide loading latency.
    .prefetch(tf.data.AUTOTUNE)
)

In [None]:
# Check the input shape (keep this cell!): takes the first batch, then its
# first component (the frames), then the first sample of that component.
data.as_numpy_iterator().next()[0][0].shape

In [None]:
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Conv3D, Activation, MaxPool3D, TimeDistributed, Flatten, Bidirectional, LSTM, Dropout, Dense

# Attention layer: still need to check the model dimension and the number of attention heads
#class AttentionLayer(tf.keras.layers.Layer):
#  def __init__(self, **kwargs):
#    super(AttentionLayer, self).__init__(**kwargs)

#  def build(self, input_shape):
#    self.W_q = self.add_weight(name="W_q", shape=(input_shape[-1], input_shape[-1]), initializer='glorot_uniform')
#    self.W_k = self.add_weight(name="W_k", shape=(input_shape[-1], input_shape[-1]), initializer='glorot_uniform')
#    self.W_v = self.add_weight(name="W_v", shape=(input_shape[-1], input_shape[-1]), initializer='glorot_uniform')

#  def call(self, x):
#    q = tf.tensordot(x, self.W_q, axes=[-1, 0]) # Query
#    k = tf.tensordot(x, self.W_k, axes=[-1, 0]) # Key
#    v = tf.tensordot(x, self.W_v, axes=[-1, 0]) # Value

#    attn_logits = tf.matmul(q, k, transpose_b=True)
#    attn_logits = tf.nn.softmax(attn_logits, axis=-1)
#    output = tf.matmul(attn_logits, v)
#    return output

# The original LipNet paper used 3 spatial pooling layers, 2 Bi-GRU layers, 1 linear layer and the CTC loss function
class MyLipReadModel(Model):
  """Conv3D + bidirectional-LSTM lip-reading model.

  Three Conv3D/ReLU/MaxPool3D stages process the video, the spatial
  dimensions are flattened per time step, two bidirectional LSTM layers
  (each followed by dropout) model the sequence, and a softmax Dense layer
  emits one distribution per time step.

  Args:
    input_shape: Per-sample input shape, e.g. (75, 46, 140, 1). It is passed
      to the first Conv3D layer and reused by ``summary``.
    vocabulary_size: Size of the character vocabulary; the output layer has
      ``vocabulary_size + 1`` units (the extra unit is presumably the CTC
      blank - confirm against the loss that is used).
    **kwargs: Forwarded to ``tf.keras.Model``.
  """

  def __init__(self, input_shape, vocabulary_size, **kwargs):
    super().__init__(**kwargs)

    # Remember the shape so summary() describes the model that was actually
    # requested. A private name is used because Keras layers already expose
    # an `input_shape` property.
    self._video_input_shape = tuple(input_shape)

    # Conv3D stages to process the video input. Pooling uses (1, 2, 2) so
    # only the two spatial axes shrink and the time axis keeps its length.
    self.conv1 = Conv3D(128, 3, padding='same', input_shape=input_shape)
    self.relu1 = Activation('relu')
    self.maxpool1 = MaxPool3D((1, 2, 2))

    self.conv2 = Conv3D(256, 3, padding='same')
    self.relu2 = Activation('relu')
    self.maxpool2 = MaxPool3D((1, 2, 2))

    self.conv3 = Conv3D(75, 3, padding='same')
    self.relu3 = Activation('relu')
    self.maxpool3 = MaxPool3D((1, 2, 2))

    # Author's note: the original paper used two highway layers. Here the
    # frames axis is simply kept and everything else is flattened per step.
    self.time_dist = TimeDistributed(Flatten())

    # Author's note: bidirectional recurrent layers with an Orthogonal
    # initializer, as in the paper; the initial dropout rate is 0.5.
    self.lstm1 = Bidirectional(LSTM(128, kernel_initializer='Orthogonal', return_sequences=True))
    self.dropout1 = Dropout(0.5)

    self.lstm2 = Bidirectional(LSTM(128, kernel_initializer='Orthogonal', return_sequences=True))
    self.dropout2 = Dropout(0.5)

    # Author's note: the best performing paper added an attention layer here
    # before the CTC (atten1 75x28). Disabled for now:
    #    self.attention = AttentionLayer()

    self.dense = Dense(vocabulary_size + 1, kernel_initializer='he_normal', activation='softmax')

  def call(self, inputs):
    """Run the forward pass and return the per-time-step softmax output."""
    x = self.conv1(inputs)
    x = self.relu1(x)
    x = self.maxpool1(x)

    x = self.conv2(x)
    x = self.relu2(x)
    x = self.maxpool2(x)

    x = self.conv3(x)
    x = self.relu3(x)
    x = self.maxpool3(x)

    x = self.time_dist(x)

    x = self.lstm1(x)
    x = self.dropout1(x)

    x = self.lstm2(x)
    x = self.dropout2(x)

    # Disabled attention step (see __init__):
    #    x=self.attention(x)

    output = self.dense(x)
    return output

  def summary(self, *args, **kwargs):
    """Print a layer-by-layer summary with concrete output shapes.

    A functional model is wrapped around ``call`` using the input shape
    given to the constructor, so the shapes are known without first running
    data through the model. Any arguments are forwarded to the wrapped
    model's ``summary``, and its return value is passed back.
    """
    x = tf.keras.layers.Input(shape=self._video_input_shape)
    model = Model(inputs=x, outputs=self.call(x))
    return model.summary(*args, **kwargs)

# Per-sample input shape handed to the model constructor
input_shape = (75, 46, 140, 1)
# Number of entries in the char_to_num lookup vocabulary
vocabulary_size = char_to_num.vocabulary_size()

# Build the model and print its layer summary
mymodel = MyLipReadModel(input_shape, vocabulary_size)
mymodel.summary()

In [None]:
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import ModelCheckpoint, LearningRateScheduler

# Learning rate scheduler
# Step Decay
#def step_decay(epoch):
#    start = 0.1
#    drop = 0.5
#    epochs_drop = 5.0
#    lr = start * (drop ** np.floor((epoch)/epochs_drop))
#    return lr
#model = Sequential([Dense(10)])
#model.compile(optimizer=SGD(), loss='mse')
#lr_scheduler = LearningRateScheduler(step_decay, verbose=1)
#history = model.fit(np.arange(10).reshape(10, -1), np.zeros(10),
#                    epochs=10, callbacks=[lr_scheduler], verbose=0)
# Cosine Decay
#cos_decay = tf.keras.experimental.CosineDecay(initial_learning_rate=0.001, decay_steps=50, alpha=0.001)
#model = Sequential([Dense(10)])
#model.compile(optimizer=SGD(cos_decay), loss='mse')
#lr_scheduler = LearningRateScheduler(cos_decay, verbose=1)
#history = model.fit(np.arange(10).reshape(10, -1), np.zeros(10),
#                    epochs=10, verbose=0)
# Cosine Decay Annealing
#cos_decay_ann = tf.keras.experimental.CosineDecayRestarts(initial_learning_rate=0.1, first_decay_steps=10, t_mul=1, m_mul=0.9, alpha=0)
#model = Sequential([Dense(10)])
#model.compile(optimizer=SGD(learning_rate=cos_decay_ann), loss='mse')
#history = model.fit(np.arange(10).reshape(10, -1), np.zeros(10),
#                    epochs=10, verbose=0)
# PyTorch LRs
# ExponentialLR
#optimizer = torch.optim.SGD(model.parameters(), lr=0.0001)
#scheduler = optim.lr_scheduler.ExponentialLR(optimizer, gamma=0.5)
#import torch
#import torch.nn as nn
#import torch.optim as optim
#from data import AudioDataset, AudioDataLoader
#from matplotlib import pyplot as plt

#class Model(nn.Module):
#    def __init__(self):
#        super(Model, self).__init__()
#        self.linear = nn.Linear(10, 10)
#        self.activation = nn.ReLU()
#    def forward(self, x):
#        return self.activation(self.linear1(x))

# data
#tr_dataset = AudioDatset('tr')
#data_loader = AudioDataLoader(tr_dataset, batch_size=3, shuffle=1)
# model
#model = Model()
# loss
#loss = nn.MSELoss()
# optimizer
#optimizer = optim.Adam(model.parameters(), lr=1e-3)
#scheduler
#scheduler = optim.lr_scheduler.LambdaLR(optimizer=optimizer,
#                                        lr_lambda=lambda epoch: 0.95 ** epoch,
#                                        last_epoch=-1,
#                                        verbose=False)

#epochs=100
#for epoch in range(epochs):
#    for i, (data) in enumerate(data_loader):
#        x_data, y_data = data
#        optimizer.zero_grad()

#        estimated_y = model(x_data)
#        loss = loss(y_data, estimated_y)
#        loss.backward()
#        optimizer.step()
#    scheduler.step() # you can set it like this!
# Cosine annealing
#optimizer = torch.optim.SGD(model.parameters(), lr=0.001)
#scheduler = optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=50, eta_min=0)
# Start with exponential decay and change it if that doesn't work
def scheduler(epoch, lr):
    """Hold the learning rate for 30 epochs, then decay it exponentially.

    Args:
        epoch: Index of the current epoch.
        lr: Current learning rate.

    Returns:
        ``lr`` unchanged while ``epoch < 30``; afterwards ``lr * exp(-0.1)``
        as a plain Python float.
    """
    # Local import keeps this cell runnable on its own.
    import math

    if epoch < 30:
        return lr
    # math.exp yields a Python float rather than a tensor; Keras'
    # LearningRateScheduler documents a float return value for its schedule.
    return lr * math.exp(-0.1)