In [None]:
!pip install opencv-python matplotlib gdown tensorflow
!pip install imageio==2.23

In [None]:
import os
import cv2
import tensorflow as tf
import numpy as np
from typing import List
import imageio
import matplotlib.pyplot as plt
import gdown

In [None]:
# Let TensorFlow allocate GPU memory on demand instead of reserving it all at start-up.
# With no GPU present the list is empty and nothing happens.
physical_device = tf.config.list_physical_devices('GPU')
for gpu in physical_device:
  try:
    tf.config.experimental.set_memory_growth(gpu, True)
  except (RuntimeError, ValueError) as err:
    # Memory growth must be configured before the device is initialised.
    # Report the failure instead of hiding it, but keep the notebook running.
    print(f"Could not enable memory growth for {gpu}: {err}")

# **Data Loading**

In [None]:
url = 'https://drive.google.com/uc?id=1YlvpDLix3S-U8fd-gqRwPcWXAXm8JwjL'
output = 'data.zip'
# The archive is large (hundreds of MB); skip the download when it is already
# on disk so that re-running the notebook stays cheap.
if not os.path.exists(output):
    gdown.download(url, output, quiet=False)
gdown.extractall(output)

Downloading...
From (original): https://drive.google.com/uc?id=1YlvpDLix3S-U8fd-gqRwPcWXAXm8JwjL
From (redirected): https://drive.google.com/uc?id=1YlvpDLix3S-U8fd-gqRwPcWXAXm8JwjL&confirm=t&uuid=4adb3894-98f9-4841-a411-3a13393c2255
To: /content/data.zip
100%|██████████| 423M/423M [00:03<00:00, 109MB/s]


['data/',
 'data/alignments/',
 'data/alignments/s1/',
 'data/alignments/s1/bbaf2n.align',
 'data/alignments/s1/bbaf3s.align',
 'data/alignments/s1/bbaf4p.align',
 'data/alignments/s1/bbaf5a.align',
 'data/alignments/s1/bbal6n.align',
 'data/alignments/s1/bbal7s.align',
 'data/alignments/s1/bbal8p.align',
 'data/alignments/s1/bbal9a.align',
 'data/alignments/s1/bbas1s.align',
 'data/alignments/s1/bbas2p.align',
 'data/alignments/s1/bbas3a.align',
 'data/alignments/s1/bbaszn.align',
 'data/alignments/s1/bbaz4n.align',
 'data/alignments/s1/bbaz5s.align',
 'data/alignments/s1/bbaz6p.align',
 'data/alignments/s1/bbaz7a.align',
 'data/alignments/s1/bbbf6n.align',
 'data/alignments/s1/bbbf7s.align',
 'data/alignments/s1/bbbf8p.align',
 'data/alignments/s1/bbbf9a.align',
 'data/alignments/s1/bbbm1s.align',
 'data/alignments/s1/bbbm2p.align',
 'data/alignments/s1/bbbm3a.align',
 'data/alignments/s1/bbbmzn.align',
 'data/alignments/s1/bbbs4n.align',
 'data/alignments/s1/bbbs5s.align',
 'data/al

In [None]:
def load_video(path:str) -> "tf.Tensor":
    """Read a video, crop each frame to the mouth region and standardise the clip.

    Args:
        path: Location of a video file that OpenCV can open.

    Returns:
        A float32 tensor with one entry per decoded frame. Each frame is
        converted to a single grayscale channel and cropped to rows 190:236
        and columns 80:220, then the clip is shifted by its mean and divided
        by its standard deviation.

    Raises:
        ValueError: If no frame could be decoded from ``path``.
    """
    cap = cv2.VideoCapture(path)
    frames = []
    try:
        for _ in range(int(cap.get(cv2.CAP_PROP_FRAME_COUNT))):
            ret, frame = cap.read()
            if not ret:
                # The reported frame count can exceed what is actually decodable;
                # stop here rather than handing ``None`` to TensorFlow.
                break
            frame = tf.image.rgb_to_grayscale(frame)
            # Fixed crop that isolates the lips (a landmark detector such as dlib could replace it).
            frames.append(frame[190:236,80:220,:])
    finally:
        # Always release the capture handle, even if decoding raised.
        cap.release()

    if not frames:
        raise ValueError(f"No frames could be read from video: {path}")

    # NOTE(review): the mean and the subtraction below are evaluated in the frames'
    # own dtype (presumably uint8) and only the result is cast to float32. This is
    # kept unchanged so inputs stay compatible with weights trained on this
    # preprocessing -- confirm whether float arithmetic was intended.
    mean = tf.math.reduce_mean(frames)
    std = tf.math.reduce_std(tf.cast(frames, tf.float32))
    return tf.cast((frames - mean), tf.float32) / std

In [None]:
# Every character the model may emit: lowercase letters, a few punctuation marks,
# the digits 1-9 and the space, one list entry per character.
vocab = list("abcdefghijklmnopqrstuvwxyz'?!123456789 ")

In [None]:
# Character -> integer id lookup; the empty string is the out-of-vocabulary token.
char_to_num = tf.keras.layers.StringLookup(
    vocabulary=vocab,
    oov_token="",
)
# Inverse lookup (integer id -> character) built from the same vocabulary.
num_to_char = tf.keras.layers.StringLookup(
    vocabulary=char_to_num.get_vocabulary(),
    oov_token="",
    invert=True,
)

print(f"The vocabulary is: {char_to_num.get_vocabulary()} (size ={char_to_num.vocabulary_size()})")

The vocabulary is: ['', 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z', "'", '?', '!', '1', '2', '3', '4', '5', '6', '7', '8', '9', ' '] (size =40)


In [None]:
# char_to_num(['r','i','s','h','a','b','h'])
# num_to_char([18,  9, 19,  8,  1,  2,  8])

In [None]:
def load_alignments(path:str) -> "tf.Tensor":
    """Convert an alignment file into a sequence of character ids.

    The third whitespace-separated field of every line is taken as a word.
    Entries equal to ``'sil'`` are dropped, the remaining words are joined
    with single spaces, and each character is mapped through ``char_to_num``.

    Args:
        path: Location of the ``.align`` text file.

    Returns:
        A 1-D integer tensor with one id per character of the sentence
        (empty when the file contains no usable word).
    """
    words = []
    with open(path, 'r') as f:
        for line in f:
            fields = line.split()
            # Skip blank or truncated lines instead of failing on the missing third field.
            if len(fields) < 3 or fields[2] == 'sil':
                continue
            words.append(fields[2])
    # Joining with spaces gives the same characters as interleaving ' ' tokens
    # before each word and then dropping the leading one.
    chars = tf.strings.unicode_split(' '.join(words), input_encoding='UTF-8')
    return char_to_num(chars)

In [None]:
def load_data(path: str):
    """Load the frames and the label sequence that belong to one video clip.

    Only the base name of ``path`` (the text before its first dot) is used; the
    video is looked up under ``data/s1`` and the transcript under
    ``data/alignments/s1``.

    Args:
        path: Path of the clip, given as an eager string tensor (as supplied by
            ``tf.py_function``), a ``bytes`` object or a plain ``str``.

    Returns:
        A ``(frames, alignments)`` tuple produced by ``load_video`` and
        ``load_alignments``.

    Raises:
        FileNotFoundError: If the video or its alignment file does not exist.
    """
    # Unwrap eager tensors, then decode bytes, so all three input kinds are accepted.
    if hasattr(path, 'numpy'):
        path = path.numpy()
    if isinstance(path, bytes):
        path = path.decode()
    # os.path.basename understands the platform's separators, so no OS-specific splitting is needed.
    file_name = os.path.basename(path).split('.')[0]
    video_path = os.path.join('data','s1',f'{file_name}.mpg')
    alignment_path = os.path.join('data','alignments','s1',f'{file_name}.align')
    if not os.path.exists(video_path):
        raise FileNotFoundError(f"Video not found: {video_path}")
    if not os.path.exists(alignment_path):
        raise FileNotFoundError(f"Alignment not found: {alignment_path}")
    frames = load_video(video_path)
    alignments = load_alignments(alignment_path)

    return frames, alignments

In [None]:
# test_path = '.\\data\\s1\\bbal6n.mpg'

In [None]:
# tf.convert_to_tensor(test_path).numpy().decode('utf-8').split("\\")[-1].split(".")[0]

In [None]:
# frames, alignments = load_data(tf.convert_to_tensor(test_path))

In [None]:
# plt.imshow(frames[40])

In [None]:
# tf.strings.reduce_join([bytes.decode(x) for x in num_to_char(alignments.numpy()).numpy()])

In [None]:
def mappable_function(path:str) ->List[str]:
    """Adapter that lets ``load_data`` be used inside ``Dataset.map``.

    ``load_data`` does plain-Python string processing on the path, so it is
    wrapped in ``tf.py_function`` and declared to yield a float32 tensor and
    an int64 tensor.
    """
    return tf.py_function(load_data, [path], (tf.float32, tf.int64))

# **Data Pipeline**

In [None]:
# list_files shuffles the file names by default and does so again on every pass.
# Turn that off so the single fixed shuffle below defines one stable order;
# otherwise take()/skip() would hand out different clips each epoch and the
# train and test splits would overlap.
data = tf.data.Dataset.list_files('./data/s1/*.mpg', shuffle=False)
data = data.shuffle(500, reshuffle_each_iteration=False)
data = data.map(mappable_function)
# Batches of 2 clips: frames are padded along the time axis, labels to 40 ids.
data = data.padded_batch(2, padded_shapes=([-1, 46, 140, 1],[40]))
data = data.prefetch(tf.data.AUTOTUNE)
# Split counted in batches: the first 450 batches train, the remainder is held out.
train = data.take(450)
test = data.skip(450)

In [None]:
# data.as_numpy_iterator().next()

In [None]:
# Iterator over the batched dataset that yields its elements as NumPy arrays.
sample = data.as_numpy_iterator()

In [None]:
# Pull one batch from the iterator and display its frames (element 0 of the batch).
val = next(sample)
val[0]

array([[[[[1.3990705 ],
          [1.5066913 ],
          [1.5066913 ],
          ...,
          [0.32286242],
          [0.35873604],
          [0.35873604]],

         [[1.3990705 ],
          [1.3631968 ],
          [1.5066913 ],
          ...,
          [0.32286242],
          [0.35873604],
          [0.35873604]],

         [[1.4708177 ],
          [1.4708177 ],
          [1.4708177 ],
          ...,
          [0.28698882],
          [0.21524161],
          [0.21524161]],

         ...,

         [[1.0403345 ],
          [1.0403345 ],
          [1.0403345 ],
          ...,
          [0.07174721],
          [0.0358736 ],
          [0.0358736 ]],

         [[1.0044608 ],
          [1.0044608 ],
          [1.0044608 ],
          ...,
          [0.0358736 ],
          [0.        ],
          [0.        ]],

         [[1.0044608 ],
          [1.0044608 ],
          [1.0044608 ],
          ...,
          [0.0358736 ],
          [0.        ],
          [0.        ]]],


        [[[1.4708

In [None]:
# Write the second clip of the batch (val[0][1]) to an animated GIF at 10 frames per second.
imageio.mimsave('./animation.gif', val[0][1], fps=10)

In [None]:
# tf.strings.reduce_join([num_to_char(word) for word in val[1][1]])

# **Deep Neural Network**

In [None]:
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv3D, LSTM, Dense, Dropout, Bidirectional, GlobalMaxPooling3D, MaxPool3D, Activation, Reshape, SpatialDropout3D, BatchNormalization, TimeDistributed, Flatten
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import ModelCheckpoint, LearningRateScheduler

In [None]:
data.as_numpy_iterator().next()[0][0].shape

(75, 46, 140, 1)

In [None]:
# Network layout: three Conv3D -> ReLU -> MaxPool3D stages that halve the two
# spatial axes while keeping all 75 time steps, a per-frame flatten, two
# bidirectional LSTMs with dropout, and a per-frame softmax.
# The print after each stage reports the output shape reached so far.
model = Sequential()
model.add(Conv3D(128, 3, input_shape=(75,46,140,1), padding='same'))
model.add(Activation('relu'))
model.add(MaxPool3D((1,2,2)))
print("Shape after conv block 1:", model.output_shape)

model.add(Conv3D(256, 3, padding='same'))
model.add(Activation('relu'))
model.add(MaxPool3D((1,2,2)))
print("Shape after conv block 2:", model.output_shape)

model.add(Conv3D(75, 3, padding='same'))
model.add(Activation('relu'))
model.add(MaxPool3D((1,2,2)))
print("Shape after conv block 3:", model.output_shape)

# Flatten each time step separately so the sequence axis survives for the LSTMs.
model.add(TimeDistributed(Flatten()))
print("Shape after TimeDistributed(Flatten):", model.output_shape)

model.add(Bidirectional(LSTM(128, kernel_initializer='Orthogonal', return_sequences=True)))
model.add(Dropout(.5))
print("Shape after BiLSTM 1:", model.output_shape)

model.add(Bidirectional(LSTM(128, kernel_initializer='Orthogonal', return_sequences=True)))
model.add(Dropout(.5))
print("Shape after BiLSTM 2:", model.output_shape)

# One output unit per vocabulary entry plus one extra (presumably the CTC blank -- confirm).
model.add(Dense(char_to_num.vocabulary_size()+1, kernel_initializer='he_normal', activation='softmax'))
print("Shape after Dense softmax:", model.output_shape)

  super().__init__(activity_regularizer=activity_regularizer, **kwargs)


Shape after GlobalAveragePooling3D: (None, 75, 23, 70, 128)
Shape after GlobalAveragePooling3D: (None, 75, 11, 35, 256)
Shape after GlobalAveragePooling3D: (None, 75, 5, 17, 75)
Shape after GlobalAveragePooling3D: (None, 75, 6375)
Shape after GlobalAveragePooling3D: (None, 75, 256)
Shape after GlobalAveragePooling3D: (None, 75, 256)
Shape after GlobalAveragePooling3D: (None, 75, 41)


In [None]:
model.summary()

In [None]:
# Smoke test: run the untrained model on the frames of the sample batch.
yhat = model.predict(val[0])

[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 894ms/step


# **Setup Training Options and Train**

In [None]:
def scheduler(epoch, lr):
    """Learning-rate schedule: constant for 30 epochs, then exponential decay.

    Args:
        epoch: Zero-based index of the epoch about to start.
        lr: Learning rate currently in use.

    Returns:
        The learning rate for this epoch as a plain Python ``float``: ``lr``
        unchanged while ``epoch < 30``, otherwise ``lr * exp(-0.1)``.
    """
    import math  # local import: only this function needs it

    # Keras requires a float from the schedule function, so the decay uses
    # math.exp rather than a TensorFlow op (which would return a tensor).
    if epoch < 30:
        return float(lr)
    return float(lr) * math.exp(-0.1)

In [None]:
def CTCLoss(y_true, y_pred):
    """CTC loss wrapper with the ``(y_true, y_pred)`` signature Keras expects.

    Every sample in the batch is given the same input length (axis 1 of
    ``y_pred``) and the same label length (axis 1 of ``y_true``, i.e. the
    padded label width).

    Returns:
        The per-sample loss from ``tf.keras.backend.ctc_batch_cost``.
    """
    n_samples = tf.cast(tf.shape(y_true)[0], dtype="int64")
    # Column of ones, one row per sample, used to broadcast the scalar lengths.
    per_sample = tf.ones(shape=(n_samples, 1), dtype="int64")

    input_length = tf.cast(tf.shape(y_pred)[1], dtype="int64") * per_sample
    label_length = tf.cast(tf.shape(y_true)[1], dtype="int64") * per_sample

    return tf.keras.backend.ctc_batch_cost(y_true, y_pred, input_length, label_length)

In [None]:
class ProduceExample(tf.keras.callbacks.Callback):
    """Callback that prints reference and decoded text for one batch after each epoch."""

    def __init__(self, dataset) -> None:
        """Keep ``dataset`` and open a NumPy iterator over it.

        Args:
            dataset: Batched dataset whose elements are ``(frames, labels)``.
        """
        super().__init__()
        # Kept so the iterator can be reopened once it runs out of batches.
        self._source = dataset
        self.dataset = dataset.as_numpy_iterator()

    def on_epoch_end(self, epoch, logs=None) -> None:
        """Predict on the next batch and print original vs. predicted sentences."""
        try:
            data = next(self.dataset)
        except StopIteration:
            # More epochs than batches: start over instead of aborting training.
            self.dataset = self._source.as_numpy_iterator()
            data = next(self.dataset)
        yhat = self.model.predict(data[0])
        # One length per sample, taken from the prediction itself so any batch
        # size or number of time steps works.
        input_length = [yhat.shape[1]] * yhat.shape[0]
        decoded = tf.keras.backend.ctc_decode(yhat, input_length, greedy=False)[0][0].numpy()
        for x in range(len(yhat)):
            print('Original:', tf.strings.reduce_join(num_to_char(data[1][x])).numpy().decode('utf-8'))
            print('Prediction:', tf.strings.reduce_join(num_to_char(decoded[x])).numpy().decode('utf-8'))
            print('~'*100)

In [None]:
# Train with Adam at a starting learning rate of 1e-4, using the CTC loss defined in this notebook.
model.compile(optimizer=Adam(learning_rate=0.0001), loss=CTCLoss)

In [None]:
# Save only the model weights to models/checkpoint.weights.h5, monitoring the training loss.
checkpoint_callback = ModelCheckpoint(os.path.join('models','checkpoint.weights.h5'), monitor='loss', save_weights_only=True)

In [None]:
# Apply the `scheduler` function to the learning rate at the start of every epoch.
schedule_callback = LearningRateScheduler(scheduler)

In [None]:
# Print example predictions after every epoch; the batches are drawn from the full dataset `data`.
example_callback = ProduceExample(data)

In [None]:
# Fit on the training split only and report the loss on the held-out batches.
# Fitting on `data` would also train on the batches that are evaluated as `test`.
model.fit(train, validation_data=test, epochs=100, callbacks=[checkpoint_callback, schedule_callback, example_callback])

Epoch 1/5
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 1s/step
Original: bin green with u seven soon
Prediction: le e e e o
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Original: lay green in f three again
Prediction: le e e e o
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
[1m500/500[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m549s[0m 1s/step - loss: 98.9281 - learning_rate: 1.0000e-04
Epoch 2/5
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 230ms/step
Original: lay red in k four please
Prediction: la e e e e on
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Original: bin red in z eight please
Prediction: la e e e e on
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
[1m500/500[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m503s[0m 1

In [None]:
train.as_numpy_iterator().next()[0][0].shape

(75, 46, 140, 1)

# **Prediction**

In [None]:
# Download the checkpoint archive and unpack it into the models/ directory.
url = 'https://drive.google.com/uc?id=1vWscXs4Vt0a_1IH1-ct2TCgXAZT-N3_Y'
output = 'checkpoints.zip'
gdown.download(url, output, quiet=False)
gdown.extractall(output, 'models')

In [None]:
# Replace the model's current weights with the ones stored in the checkpoint file.
model.load_weights('models/checkpoint.weights.h5')

In [None]:
# Take the first batch of the test split and run the model on its frames.
test_data = test.as_numpy_iterator()
sample = next(test_data)
yhat = model.predict(sample[0])

In [None]:
print('~'*100, 'REAL TEXT')
# Map each label sequence back to characters in one lookup and join them into a sentence.
[tf.strings.reduce_join(num_to_char(sentence)) for sentence in sample[1]]

In [None]:
# Greedy CTC decoding. The per-sample input length comes from the prediction's own
# shape, so it stays correct for any batch size or number of time steps.
decoded = tf.keras.backend.ctc_decode(yhat, input_length=[yhat.shape[1]] * yhat.shape[0], greedy=True)[0][0].numpy()

In [None]:
print('~'*100, 'PREDICTIONS')
# Map each decoded id sequence back to characters in one lookup and join them into a sentence.
[tf.strings.reduce_join(num_to_char(sentence)) for sentence in decoded]

# **Test on a Video**

In [None]:
# Load a single clip; load_data receives the path as a tensor and returns (frames, alignments).
sample = load_data(tf.convert_to_tensor('./data/s1/bras9a.mpg'))

In [None]:
print('~'*100, 'REAL TEXT')
# Wrap the single label sequence in a list so the output has the same form as the batched case.
[tf.strings.reduce_join(num_to_char(sentence)) for sentence in [sample[1]]]

In [None]:
# Add a leading batch axis so the single clip matches the model's batched input.
yhat = model.predict(tf.expand_dims(sample[0], axis=0))

In [None]:
# Greedy CTC decoding. The input length is read from the prediction's shape
# rather than hard-coded to this clip's frame count.
decoded = tf.keras.backend.ctc_decode(yhat, input_length=[yhat.shape[1]] * yhat.shape[0], greedy=True)[0][0].numpy()

In [None]:
print('~'*100, 'PREDICTIONS')
# Map each decoded id sequence back to characters in one lookup and join them into a sentence.
[tf.strings.reduce_join(num_to_char(sentence)) for sentence in decoded]