In [None]:
import os
import cv2
import tensorflow as tf
import numpy as np
from typing import List
import imageio
import matplotlib.pyplot as plt


In [None]:
# Ask TensorFlow to grow GPU memory on demand rather than reserving it up front.
physical_devices = tf.config.list_physical_devices('GPU')
# Apply the setting to every visible GPU; with no GPU the loop simply does nothing.
for gpu in physical_devices:
    try:
        tf.config.experimental.set_memory_growth(gpu, True)
    except (ValueError, RuntimeError) as err:
        # Best effort: report why the setting could not be applied instead of
        # silently hiding the failure, and keep going.
        print(f"Could not enable memory growth for {gpu}: {err}")

Data Collection

In [None]:
import gdown

# Google Drive location of the dataset archive and the local file it is saved to.
url = 'https://drive.google.com/uc?id=1YlvpDLix3S-U8fd-gqRwPcWXAXm8JwjL'
output = 'data.zip'

# Skip the download when the archive is already on disk so that re-running the
# notebook does not fetch it again.
if not os.path.exists(output):
    gdown.download(url, output, quiet=False)
gdown.extractall(output)

In [None]:
def load_video(path:str) -> tf.Tensor:
    """Read a video, crop a fixed region from each frame and standardize the clip.

    Args:
        path: Location of the video file, handed to ``cv2.VideoCapture``.

    Returns:
        A float32 tensor stacking the grayscale crop ``[190:236, 80:220]`` of
        every frame (46 x 140 x 1 per frame when the source frame is large
        enough), with the clip mean subtracted and divided by the clip
        standard deviation.

    Raises:
        ValueError: If no frame could be read from ``path``.
    """
    cap = cv2.VideoCapture(path)
    frames = []
    try:
        for _ in range(int(cap.get(cv2.CAP_PROP_FRAME_COUNT))):
            ret, frame = cap.read()
            if not ret:
                # The reported frame count can exceed what is actually
                # decodable; stop instead of passing None downstream.
                break
            # NOTE(review): OpenCV normally returns BGR while rgb_to_grayscale
            # assumes RGB channel order - confirm whether a channel flip is wanted.
            frame = tf.image.rgb_to_grayscale(frame)
            frames.append(frame[190:236,80:220,:])
    finally:
        # Release the capture handle even if decoding raised.
        cap.release()

    if not frames:
        raise ValueError(f"No frames could be read from video: {path}")

    # Cast to float32 BEFORE any arithmetic. On integer frames the mean is
    # computed with integer semantics and the subtraction wraps around for
    # unsigned values below the mean.
    frames = tf.cast(tf.stack(frames), tf.float32)
    mean = tf.math.reduce_mean(frames)
    std = tf.math.reduce_std(frames)
    return (frames - mean) / std

In [None]:
# Every character the model can emit, as a list of single-character strings.
vocab = list("abcdefghijklmnopqrstuvwxyz'?!123456789 ")

In [None]:
# Forward lookup: character -> integer id, with "" as the out-of-vocabulary token.
char_to_num = tf.keras.layers.StringLookup(oov_token="", vocabulary=vocab)

# Inverse lookup: integer id -> character, built from the forward layer's own
# vocabulary so both directions share the same ids.
inverse_vocabulary = char_to_num.get_vocabulary()
num_to_char = tf.keras.layers.StringLookup(
    invert=True, oov_token="", vocabulary=inverse_vocabulary
)

vocabulary_summary = (
    f"The vocabulary is: {char_to_num.get_vocabulary()} "
    f"(size ={char_to_num.vocabulary_size()})"
)
print(vocabulary_summary)

In [None]:
def load_alignments(path:str) -> tf.Tensor:
    """Read an alignment file and encode its words as character ids.

    The third whitespace-separated field of each line is taken as the token;
    tokens equal to ``'sil'`` are dropped and the remaining ones are joined
    with single spaces before being split into characters and mapped through
    ``char_to_num``.

    Args:
        path: Location of the ``.align`` text file.

    Returns:
        The ids produced by ``char_to_num`` for every character of the joined
        transcript.
    """
    with open(path, 'r') as f:
        lines = f.readlines()
    words = []
    for line in lines:
        fields = line.split()
        # Blank or truncated lines have no third field; skip them rather than
        # failing with an IndexError.
        if len(fields) < 3:
            continue
        if fields[2] != 'sil':
            words.append(fields[2])
    tokens = " ".join(words)
    return char_to_num(tf.strings.unicode_split(tokens, input_encoding='UTF-8'))


In [None]:
# Data layout: data/speaker<N>/alignments/*.align and data/speaker<N>/s<N>/*.mpg (e.g. data/speaker1/s1, data/speaker2/s2, and so on)
# def load_data(path: str): 
#     path = bytes.decode(path.numpy())
#     #file_name = path.split('/')[-1].split('.')[0]
#     # File name splitting for windows
#     file_name = path.split('\\')[-1].split('.')[0]
#     video_path = os.path.join('data','s1',f'{file_name}.mpg')
#     alignment_path = os.path.join('data','alignments','s1',f'{file_name}.align')
#     frames = load_video(video_path) 
#     alignments = load_alignments(alignment_path)
    
#     return frames, alignments

def load_data(path: str):
    """Load the frames and the encoded transcript that belong to one video.

    The last three components of ``path`` are read as
    ``<speaker>/<video folder>/<file>``; both files are then looked up
    relative to the local ``data`` directory:
    ``data/<speaker>/<video folder>/<file>.mpg`` and
    ``data/<speaker>/alignments/<file>.align``.

    Args:
        path: Scalar string tensor holding the video path (``.numpy()`` is
            called on it).

    Returns:
        Tuple ``(frames, alignments)`` as returned by ``load_video`` and
        ``load_alignments``.

    Raises:
        ValueError: If ``path`` has fewer than three components.
    """
    path = bytes.decode(path.numpy())
    # Accept both Windows ('\\') and POSIX ('/') separators.
    parts = path.replace('\\', '/').split('/')
    if len(parts) < 3:
        raise ValueError(f"Expected <speaker>/<video folder>/<file> in path, got: {path}")
    # Reuse the video folder found in the path. Deriving it as f's{speaker}'
    # produced names such as 'sspeaker1' that do not exist on disk.
    speaker, video_dir = parts[-3], parts[-2]
    # splitext keeps dots that are part of the base name.
    file_name = os.path.splitext(parts[-1])[0]
    video_path = os.path.join('data', speaker, video_dir, f'{file_name}.mpg')
    alignment_path = os.path.join('data', speaker, 'alignments', f'{file_name}.align')
    frames = load_video(video_path)
    alignments = load_alignments(alignment_path)

    return frames, alignments



In [None]:
# Relative path instead of a machine-specific absolute one. load_data only reads
# the last three components (speaker / video folder / file name).
test_path = '.\\data\\speaker1\\s1\\bbaf2n.mpg'

In [None]:
# Smoke-test the loader on one clip. The path is wrapped in a tensor because
# load_data calls `.numpy()` on its argument.
load_data(tf.convert_to_tensor(test_path))

In [None]:
# def mappable_function(path:str) ->List[str]:
#     result = tf.py_function(load_data, [path], (tf.float32, tf.int64))
#     return result

def mappable_function(path:str) -> List[str]:
    """Run ``load_data`` through ``tf.py_function`` for use in a dataset ``map``.

    ``load_data`` calls ``.numpy()`` on its argument, so it is wrapped in
    ``tf.py_function`` with declared output dtypes float32 (frames) and
    int64 (alignments).

    NOTE(review): the annotations look inaccurate - the argument appears to be
    a string tensor and the result a pair of tensors; confirm before relying
    on them.
    """
    return tf.py_function(load_data, [path], (tf.float32, tf.int64))


In [None]:
# data = tf.data.Dataset.list_files('./data/s1/*.mpg')
# data = data.shuffle(500)
# data = data.map(mappable_function)

# # Get the maximum shape of each tensor in the dataset
# max_shapes = {"frames": [None, None, None, None], "alignments": [None]}
# for frames, alignments in data:
#     max_shapes["frames"] = [max(frames.shape[i], max_shapes["frames"][i] or 0) for i in range(len(frames.shape))]
#     max_shapes["alignments"] = [max(alignments.shape[i], max_shapes["alignments"][i] or 0) for i in range(len(alignments.shape))]

# # Use the maximum shape to pad the tensors in the dataset
# data = data.padded_batch(2, padded_shapes=(tf.TensorShape(max_shapes["frames"]), tf.TensorShape(max_shapes["alignments"])))
# data = data.prefetch(tf.data.AUTOTUNE)

# shuffle=False: list_files shuffles by default and produces a new file order
# on every pass, which would make the take/skip split used later unstable.
data = tf.data.Dataset.list_files('./data/speaker*/s*/*.mpg', shuffle=False)
# Shuffle once, with a buffer at least as large as the file list, and keep that
# order for every later iteration.
data = data.shuffle(tf.maximum(data.cardinality(), 500), reshuffle_each_iteration=False)
data = data.map(mappable_function)

# Walk the dataset once to find the largest size seen along each axis.
# This loads every clip, so it is slow on a large dataset.
max_shapes = {"frames": None, "alignments": None}
for frames, alignments in data:
    for key, tensor in (("frames", frames), ("alignments", alignments)):
        dims = tuple(tensor.shape)
        if max_shapes[key] is not None:
            dims = tuple(max(new, old) for new, old in zip(dims, max_shapes[key]))
        max_shapes[key] = dims

# Pad every element up to the largest shape seen and batch two clips together.
data = data.padded_batch(2, padded_shapes=(tf.TensorShape(max_shapes["frames"]), tf.TensorShape(max_shapes["alignments"])))
data = data.prefetch(tf.data.AUTOTUNE)



In [None]:
# shuffle=False: list_files shuffles by default and produces a new file order
# on every pass, which would make the take/skip split used later unstable.
data = tf.data.Dataset.list_files('./data/speaker*/s*/*.mpg', shuffle=False)
# Shuffle once, with a buffer at least as large as the file list, and keep that
# order for every later iteration.
data = data.shuffle(tf.maximum(data.cardinality(), 500), reshuffle_each_iteration=False)
data = data.map(mappable_function)

# Compute maximum shapes (this loads every clip once, so it is slow on a large dataset)
max_shapes = {"frames": [0, 0, 0, 0], "alignments": [0]}
for frames, alignments in data:
    max_shapes["frames"] = [max(frames.shape[i], max_shapes["frames"][i]) for i in range(len(frames.shape))]
    max_shapes["alignments"] = [max(alignments.shape[i], max_shapes["alignments"][i]) for i in range(len(alignments.shape))]

# Pad the data up to the largest shape seen and batch two clips together
data = data.padded_batch(2, padded_shapes=(tf.TensorShape(max_shapes["frames"]), tf.TensorShape(max_shapes["alignments"])))

# Prefetch the data
data = data.prefetch(tf.data.AUTOTUNE)

In [None]:
# Iterator over the padded batches of `data`, yielding NumPy arrays.
test = data.as_numpy_iterator()

In [None]:
# Pull one batch from the iterator and display its frames.
val = next(test)
val[0]

In [None]:
# Write the second clip of the batch to an animated GIF for visual inspection.
second_clip = val[0][1]
imageio.mimsave('test.gif', second_clip, fps=30)

Model Building:

In [None]:
import tensorflow as tf

In [None]:
from tensorflow.keras.models import Sequential 
from tensorflow.keras.layers import Conv3D, LSTM, Dense, Dropout, Bidirectional, MaxPool3D, Activation, Reshape, SpatialDropout3D, BatchNormalization, TimeDistributed, Flatten
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import ModelCheckpoint, LearningRateScheduler

In [None]:
# Shape of the first clip in the first batch.
first_batch = next(data.as_numpy_iterator())
first_batch[0][0].shape

In [None]:
# Layers are listed in the same order they were previously added one by one.
model = Sequential([
    # Three Conv3D -> ReLU -> MaxPool3D stages; the (1,2,2) pooling leaves the
    # first (75-long) axis untouched and halves the two image axes.
    Conv3D(128, 3, input_shape=(75,46,140,1), padding='same'),
    Activation('relu'),
    MaxPool3D((1,2,2)),

    Conv3D(256, 3, padding='same'),
    Activation('relu'),
    MaxPool3D((1,2,2)),

    Conv3D(75, 3, padding='same'),
    Activation('relu'),
    MaxPool3D((1,2,2)),

    # Flatten each step of the first axis independently so a sequence remains.
    TimeDistributed(Flatten()),

    # Two bidirectional LSTM layers that return the full sequence, each followed by dropout.
    Bidirectional(LSTM(128, kernel_initializer='Orthogonal', return_sequences=True)),
    Dropout(.5),

    Bidirectional(LSTM(128, kernel_initializer='Orthogonal', return_sequences=True)),
    Dropout(.5),

    # One output per vocabulary entry plus one extra class
    # (presumably the CTC blank - confirm).
    Dense(char_to_num.vocabulary_size()+1, kernel_initializer='he_normal', activation='softmax'),
])

In [None]:
# Print the layer-by-layer summary of the model built above.
model.summary()

In [None]:
# Run the model on the frames (val[0]) of the batch fetched earlier.
yhats = model.predict(val[0])

In [None]:
# For the second sample, take the highest-scoring class at every step, map the
# ids back to characters and join them into one string.
best_per_step = [tf.argmax(step) for step in yhats[1]]
tf.strings.reduce_join([num_to_char(index) for index in best_per_step], separator='')

Training and setup

In [None]:
def scheduler(epoch, lr):
    """Learning-rate schedule: hold for 30 epochs, then decay exponentially.

    Args:
        epoch: Index of the epoch about to start.
        lr: Current learning rate.

    Returns:
        ``lr`` unchanged while ``epoch < 30``; afterwards ``lr * exp(-0.1)``
        as a plain Python float.
    """
    import math

    if epoch < 30:
        return lr
    # Return a Python float rather than a TensorFlow tensor, since the
    # LearningRateScheduler callback expects a float from the schedule.
    return float(lr * math.exp(-0.1))

In [None]:
def CTCLoss(y_true, y_pred):
    """CTC loss via ``tf.keras.backend.ctc_batch_cost``.

    Every sample in the batch is given the same input length (second axis of
    ``y_pred``) and the same label length (second axis of ``y_true``).

    Args:
        y_true: Batch of label ids.
        y_pred: Batch of model outputs.

    Returns:
        The value returned by ``ctc_batch_cost``.
    """
    batch_len = tf.cast(tf.shape(y_true)[0], dtype="int64")
    # Column of ones, one row per sample, used to repeat the scalar lengths.
    per_sample = tf.ones(shape=(batch_len, 1), dtype="int64")

    input_length = tf.cast(tf.shape(y_pred)[1], dtype="int64") * per_sample
    label_length = tf.cast(tf.shape(y_true)[1], dtype="int64") * per_sample

    return tf.keras.backend.ctc_batch_cost(y_true, y_pred, input_length, label_length)

In [None]:
class ProduceExample(tf.keras.callbacks.Callback):
    """Callback that prints reference and decoded text for one batch per epoch."""

    def __init__(self, dataset) -> None:
        """Store the dataset and open a NumPy iterator over it.

        Args:
            dataset: Dataset yielding ``(frames, labels)`` batches.
        """
        # Let the base Callback set up its own state.
        super().__init__()
        # Keep the dataset itself so the iterator can be recreated once exhausted.
        self._source = dataset
        self.dataset = dataset.as_numpy_iterator()

    def on_epoch_end(self, epoch, logs=None) -> None:
        """Predict on the next batch and print original vs. decoded sentences."""
        try:
            data = next(self.dataset)
        except StopIteration:
            # More epochs than batches: start over from the beginning.
            self.dataset = self._source.as_numpy_iterator()
            data = next(self.dataset)
        yhat = self.model.predict(data[0])
        # One input length per sample, taken from the prediction itself so that
        # batches of any size (not only 2 samples of 75 steps) can be decoded.
        input_length = [yhat.shape[1]] * len(yhat)
        decoded = tf.keras.backend.ctc_decode(yhat, input_length, greedy=False)[0][0].numpy()
        for x in range(len(yhat)):
            print('Original:', tf.strings.reduce_join(num_to_char(data[1][x])).numpy().decode('utf-8'))
            print('Prediction:', tf.strings.reduce_join(num_to_char(decoded[x])).numpy().decode('utf-8'))
            print('~'*100)

In [None]:
# Adam with a fixed starting learning rate, trained against the CTC loss defined above.
optimizer = Adam(learning_rate=0.0001)
model.compile(loss=CTCLoss, optimizer=optimizer)

In [None]:
# Save only the model weights to models/checkpoint during training.
# NOTE(review): save_best_only is not set, so `monitor='loss'` presumably has
# no effect on which checkpoint is kept - confirm.
checkpoint_callback = ModelCheckpoint(os.path.join('models','checkpoint'), monitor='loss', save_weights_only=True) 

In [None]:
# Wrap `scheduler` in a Keras LearningRateScheduler callback (verbose=1).
schedule_callback = LearningRateScheduler(scheduler, verbose=1)

In [None]:
# Callback that prints original vs. predicted text for one batch after each epoch.
# It is given the full `data` pipeline, not only a held-out part.
example_callback = ProduceExample(data)

In [None]:
# The first 450 batches are used for training, everything after them for evaluation.
TRAIN_BATCHES = 450
train, test = data.take(TRAIN_BATCHES), data.skip(TRAIN_BATCHES)

In [None]:
# Train for 100 epochs, validating on `test` and running all three callbacks.
training_callbacks = [checkpoint_callback, schedule_callback, example_callback]
model.fit(
    train,
    validation_data=test,
    epochs=100,
    callbacks=training_callbacks,
)

In [None]:
# Restore the weights written by the checkpoint callback.
checkpoint_path = os.path.join('models','checkpoint')
model.load_weights(checkpoint_path)

In [None]:
# NumPy iterator over the batches of `test`.
test_data = test.as_numpy_iterator()

In [None]:
# Fetch (and display) the next batch from the iterator.
next(test_data)

In [None]:
# Keep the following batch for the prediction cells below.
sample = next(test_data)

In [None]:
# Predict on the frames (sample[0]) of the sampled batch.
yhat = model.predict(sample[0])

In [None]:
# Greedy CTC decoding. The input lengths are taken from the prediction itself
# (one entry per sample) instead of a hard-coded [75, 75], so a batch that does
# not hold exactly two samples is decoded as well.
decoded = tf.keras.backend.ctc_decode(yhat, [yhat.shape[1]] * len(yhat), greedy=True)[0][0].numpy()

In [None]:
print('~'*100, 'PREDICTIONS')
# Map every decoded id back to its character and join each sentence into one string.
predicted_sentences = []
for sentence in decoded:
    characters = [num_to_char(word) for word in sentence]
    predicted_sentences.append(tf.strings.reduce_join(characters))
predicted_sentences

In [None]:
print('~'*100, 'ACTUAL')
# Same conversion for the reference labels (sample[1]) of the batch.
actual_sentences = []
for sentence in sample[1]:
    characters = [num_to_char(word) for word in sentence]
    actual_sentences.append(tf.strings.reduce_join(characters))
actual_sentences

From webcam:

In [None]:
#make the same predictions with webcam:
import cv2
import numpy as np
import tensorflow as tf
import imageio

In [None]:
#cap = cv2.VideoCapture(0)


# while True:
#     ret, frame = cap.read()
#     frame = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
#     frame = cv2.resize(frame, (140, 46))
#     frame = frame.reshape(1, 46, 140, 1)
#     yhat = model.predict(frame)
#     decoded = tf.keras.backend.ctc_decode(yhat, [75,75], greedy=True)[0][0].numpy()
#     cv2.imshow('frame', frame)
#     print(tf.strings.reduce_join([num_to_char(word) for word in decoded[0]]).numpy().decode('utf-8'))
#     if cv2.waitKey(1) & 0xFF == ord('q'):
#         break

#try 2:
cap = cv2.VideoCapture(0)

# The model input is (75, 46, 140, 1): keep the most recent 75 frames.
SEQUENCE_LENGTH = 75
frame_buffer = []

try:
    while True:
        ret, frame = cap.read()
        if not ret:
            # No frame delivered (camera missing or disconnected): stop instead
            # of passing None to cvtColor.
            print('Could not read a frame from the webcam; stopping.')
            break
        frame = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        frame = cv2.resize(frame, (140, 46))

        # Rolling window over time: append the newest frame, drop the oldest.
        frame_buffer.append(frame.astype(np.float32))
        if len(frame_buffer) > SEQUENCE_LENGTH:
            del frame_buffer[0]

        # Standardize the clip (zero mean, unit variance), mirroring the
        # per-clip standardization done in load_video. Guard against a
        # constant image, whose standard deviation is zero.
        clip = np.stack(frame_buffer)
        std = clip.std()
        clip = (clip - clip.mean()) / (std if std > 0 else 1.0)

        # Pad with zeros along the time axis until enough frames are collected.
        padded_clip = np.zeros((SEQUENCE_LENGTH, 46, 140), dtype=np.float32)
        padded_clip[:len(frame_buffer)] = clip

        # Reshape the padded clip to match the expected input shape of the model
        reshaped_clip = np.reshape(padded_clip, (1, SEQUENCE_LENGTH, 46, 140, 1))

        # Make the prediction
        yhat = model.predict(reshaped_clip)

        # Decode the prediction
        decoded = tf.keras.backend.ctc_decode(yhat, [SEQUENCE_LENGTH], greedy=True)[0][0].numpy()

        # Print the prediction
        print(tf.strings.reduce_join([num_to_char(word) for word in decoded[0]]).numpy().decode('utf-8'))

        # Show the frame
        cv2.imshow('Lip Reading', frame)

        # If q is pressed, break the loop
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
finally:
    # Always free the camera and close the preview window, even on error.
    cap.release()
    cv2.destroyAllWindows()


In [None]:
# NOTE(review): load_data reads the third-from-last path component as the
# speaker folder, which for this path is 'data' - confirm the path matches the
# data/<speaker>/<video folder>/<file> layout.
sample = load_data(tf.convert_to_tensor('.\\data\\s1\\id2_vcd_swwp2s.mpg'))
print('~'*100, 'REAL TEXT')
# Single-element list holding the reference sentence rebuilt from its character ids.
[tf.strings.reduce_join([num_to_char(word) for word in sample[1]])]

In [None]:
# Add a leading batch axis so the single clip can be fed to the model.
batch = tf.expand_dims(sample[0], axis=0)
yhat = model.predict(batch)
decoded = tf.keras.backend.ctc_decode(yhat, input_length=[75], greedy=True)[0][0].numpy()
print('~'*100, 'PREDICTIONS')
# Map every decoded id back to its character and join each sentence into one string.
predicted_sentences = []
for sentence in decoded:
    characters = [num_to_char(word) for word in sentence]
    predicted_sentences.append(tf.strings.reduce_join(characters))
predicted_sentences