# Setting up the environment:

In [None]:
import cv2
import os
import numpy as np
import imageio
import dlib
import tensorflow as tf
import pathlib
import requests
from typing import List
from matplotlib import pyplot as plt

In [None]:
from tensorflow.keras.models import Sequential 
from tensorflow.keras.layers import Conv3D, LSTM, Dense, Dropout, Bidirectional, MaxPool3D, Activation, Reshape, SpatialDropout3D, BatchNormalization, TimeDistributed, Flatten
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import ModelCheckpoint, LearningRateScheduler
from tensorflow.keras.models import load_model
from concurrent.futures import ThreadPoolExecutor

In [None]:
# Root directory of the dataset. Kept as a pathlib.Path so that sub-paths can
# be composed with the `/` operator further down in the notebook.
data_path = pathlib.Path('data')  # Use pathlib for cross-platform compatibility

In [None]:
# Ask TensorFlow to grow GPU memory on demand. The setting is applied to every
# visible GPU rather than only the first one; when no GPU is present the loop
# body simply never runs, so no IndexError handling is needed.
physical_devices = tf.config.list_physical_devices('GPU')
for gpu in physical_devices:
    try:
        tf.config.experimental.set_memory_growth(gpu, True)
    except (RuntimeError, ValueError) as err:
        # Best-effort configuration: report the problem instead of silently
        # swallowing every possible exception with a bare `except`.
        print(f"Could not enable memory growth for {gpu}: {err}")

The following snippet was run only once to download the data from Google Drive and the landmark model from GitHub (it additionally needs `import gdown`). The data is already in the repository.
```python
url = 'https://drive.google.com/uc?id=1YlvpDLix3S-U8fd-gqRwPcWXAXm8JwjL'
output = 'data.zip'
gdown.download(url, output, quiet=False)
gdown.extractall('data.zip')

url = "https://github.com/JeffTrain/selfie/raw/master/shape_predictor_68_face_landmarks.dat"
file_name = "shape_predictor_68_face_landmarks.dat"
response = requests.get(url)
with open(file_name, 'wb') as f:
    f.write(response.content)
```

*** 

# Preprocessing
I am using the following excerpt from the paper as a reference to preprocess the data:

All videos are 3 seconds long with a frame rate of 25fps. The videos were processed with the DLib face detector, and the iBug face landmark predictor (Sagonas et al., 2013) with 68 landmarks coupled with an online Kalman Filter. Using these landmarks, we apply an affine transformation to extract a mouth-centred crop of size 100 × 50 pixels per frame. We standardise the RGB channels over the whole training set to have zero mean and unit variance.

In [None]:
# dlib's frontal face detector; called on grayscale frames in preprocess_video_frame.
hog_face_detector = dlib.get_frontal_face_detector()
# Landmark predictor loaded from a model file given as a relative path, so the
# .dat file must be present in the current working directory.
dlib_facelandmark = dlib.shape_predictor("shape_predictor_68_face_landmarks.dat")

def preprocess_video_frame(frame):
    """Extract a 100x50 grayscale mouth crop from a single BGR video frame.

    The first face returned by the dlib detector is used. Landmarks 48-67 of
    the predictor output are taken as the mouth outline; their bounding box is
    cropped from the grayscale frame and resized.

    Args:
        frame: BGR image, e.g. as returned by ``cv2.VideoCapture.read``.

    Returns:
        A grayscale array of shape ``(50, 100)`` (height, width), or ``None``
        when no face is detected or the mouth box has no area inside the frame.
    """
    frame_gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    faces = hog_face_detector(frame_gray)
    if len(faces) == 0:
        return None

    landmarks = dlib_facelandmark(frame_gray, faces[0])
    if not landmarks:
        return None

    mouth_points = np.array(
        [[p.x, p.y] for p in landmarks.parts()[48:68]], dtype=np.int32
    )
    x, y, w, h = cv2.boundingRect(mouth_points)

    # Predicted landmarks can lie partly outside the image (face at the border).
    # A negative start index would make the slice wrap around or come out empty
    # and cv2.resize would fail, so clamp the box to the frame first.
    frame_h, frame_w = frame_gray.shape[:2]
    left, top = max(x, 0), max(y, 0)
    right, bottom = min(x + w, frame_w), min(y + h, frame_h)
    if right <= left or bottom <= top:
        return None

    mouth = frame_gray[top:bottom, left:right]
    # cv2.resize takes (width, height), so the result has shape (50, 100).
    return cv2.resize(mouth, (100, 50))

def preprocess_video(path: str) -> List[List[List[float]]]:
    """Read a video and return its standardised mouth crops.

    Frames are read sequentially from the file and handed to a thread pool
    that runs ``preprocess_video_frame`` on each of them. Frames for which no
    crop could be produced are dropped; the order of the remaining frames is
    preserved. Every pixel position is then standardised along the time axis
    (mean 0, standard deviation 1).

    Args:
        path: Location of the video file.

    Returns:
        A list with one entry per kept frame, each entry being a nested list
        of floats (rows x columns). An empty list is returned when no frame
        yielded a mouth crop.
    """
    cap = cv2.VideoCapture(path)
    try:
        with ThreadPoolExecutor() as executor:
            futures = []
            while True:
                ret, frame = cap.read()
                if not ret:
                    break
                futures.append(executor.submit(preprocess_video_frame, frame))
            crops = [future.result() for future in futures]
    finally:
        # Release the capture even if reading or a worker raised.
        cap.release()

    frames = [crop for crop in crops if crop is not None]
    if not frames:
        return []

    frames_np = np.stack(frames)
    mean = np.mean(frames_np, axis=0)
    std = np.std(frames_np, axis=0)
    # A pixel that never changes has std == 0; dividing by it would produce
    # NaN/inf. Using 1 instead maps such pixels to exactly 0.
    std[std == 0] = 1.0
    frames_standardized = (frames_np - mean) / std
    return frames_standardized.tolist()

## <span style='font-family:sans-serif'>Creating a vocabulary</span>
Tokenizing the transcript characters gives us integer values that can be passed to a loss function. This method is inspired by this example: https://keras.io/examples/audio/ctc_asr/.

In [None]:
# Every character that may appear in a transcript: lowercase letters,
# apostrophe, '!' and '?', digits, and the space used as word separator.
vocab = list("abcdefghijklmnopqrstuvwxyz'!?1234567890 ")

In [None]:
# Forward lookup: character -> integer id, with the empty string as OOV token.
char_to_num = tf.keras.layers.StringLookup(vocabulary=list(vocab), oov_token='')

# Reverse lookup built from the forward layer's vocabulary: integer id -> character.
num_to_char = tf.keras.layers.StringLookup(
    vocabulary=char_to_num.get_vocabulary(), oov_token='', invert=True
)

# Show the final vocabulary (including the OOV entry) and its size.
print(
    f"The vocabulary contains: {char_to_num.get_vocabulary()}, \n"
    f"And the size of the vocabulary is {char_to_num.vocabulary_size()}"
)

*** 

# Load in data:

In [None]:
# Path of one sample video, used below to sanity-check the loading helpers.
test_path = 'data/s1/bbaf2n.mpg'

We are able to find the name of the file by splitting as follows:

In [None]:
# Round-trip the path through a tensor (load_data receives its path as a
# tensor too), then strip the directory and the extension to get the bare
# file name. Left as a bare expression so the notebook displays the result.
tf.convert_to_tensor(test_path).numpy().decode('utf-8').split('/')[-1].split('.')[0]

This can be applied on a larger scale to get the names of all the files in the dataset.

### Setting up the loading functions:

In [None]:
def load_alignments(path: str) -> "tf.Tensor":
    """Read an alignment file and encode its transcript as character ids.

    Each line is split on whitespace and its third field is taken as the
    word. Words equal to ``'sil'`` (presumably silence markers) are dropped;
    the remaining words are separated by single spaces, split into characters
    and mapped through ``char_to_num``.

    Args:
        path: Location of the alignment file.

    Returns:
        A 1-D tensor of character ids for the transcript, without the leading
        separator space. An empty int64 tensor when the file has no words.
    """
    tokens = []
    with open(path, 'r') as f:
        for line in f:
            fields = line.split()
            # Blank or truncated lines carry no word; skipping them avoids an
            # IndexError on fields[2].
            if len(fields) < 3 or fields[2] == 'sil':
                continue
            tokens.extend((' ', fields[2]))

    if not tokens:
        # Nothing to split: an empty Python list cannot be converted to a
        # string tensor, so return the empty result directly.
        return tf.constant([], dtype=tf.int64)

    chars = tf.strings.unicode_split(tokens, input_encoding='UTF-8')
    # [1:] drops the separator space that precedes the first word.
    return char_to_num(tf.reshape(chars, (-1)))[1:]

In [None]:
def load_data(path: "tf.Tensor"):
    """Load the preprocessed frames and the encoded transcript for one video.

    Only the file name (without directory and extension) of ``path`` is used;
    the video and alignment locations are rebuilt from ``data_path``.

    Args:
        path: Scalar string tensor holding the path of a video file
            (``.numpy()`` is called on it, so a plain ``str`` will not work).

    Returns:
        A ``(frames, alignments)`` tuple: the output of ``preprocess_video``
        for the video and of ``load_alignments`` for the matching ``.align``
        file.
    """
    path = bytes.decode(path.numpy())
    # pathlib copes with '/'-separated paths on Windows as well, which
    # `path.split(os.sep)` did not, and `.stem` removes only the final
    # extension instead of cutting at the first dot.
    file_name = pathlib.PurePath(path).stem
    video_path = os.path.join(data_path, 's1', f'{file_name}.mpg')
    alignment_path = os.path.join(data_path, 'alignments', 's1', f'{file_name}.align')
    frames = preprocess_video(video_path)
    alignments = load_alignments(alignment_path)

    return frames, alignments

Checking the results of the loading functions:

# Load the sample video and look at one mouth crop.
frames, alignments = load_data(tf.convert_to_tensor(test_path))
plt.imshow(frames[20])
plt.show()
# Map the whole id sequence back to characters in one call and join them into
# a single string.
print(tf.strings.reduce_join(num_to_char(alignments)))

In [None]:
def mappable_function(path:str) ->List[str]:
    """Adapter that lets ``load_data`` be used with ``Dataset.map``.

    ``load_data`` runs plain Python code, so it is wrapped in
    ``tf.py_function``; the two outputs are declared as float32 (frames) and
    int64 (alignment ids).
    """
    return tf.py_function(
        func=load_data,
        inp=[path],
        Tout=(tf.float32, tf.int64),
    )

*** 

# Creating the data pipeline:

In [None]:
# shuffle=False: by default list_files shuffles the file names anew on every
# pass over the dataset. Combined with the take/skip split further down, that
# would move files between the training and validation parts from epoch to
# epoch. The explicit shuffle below provides a single, fixed random order.
data = tf.data.Dataset.list_files(str(data_path / 's1' / '*.mpg'), shuffle=False)
data = data.shuffle(500, reshuffle_each_iteration=False)
data = data.map(mappable_function)
# 75 frames, don't change the size of frames. 40 tokens in the alignments.
data = data.padded_batch(2, padded_shapes=([75, None, None], [40]))
# tf.data.AUTOTUNE is the non-deprecated spelling of tf.data.experimental.AUTOTUNE.
data = data.prefetch(tf.data.AUTOTUNE)

In [None]:
# Split the data into training and validation parts. `data` has already been
# batched (2 videos per batch) at this point, so take/skip count batches,
# not individual videos.
train_data = data.take(450)
val_data = data.skip(450)

Verifying the batching and the shape of the data works as expected:

In [None]:
# Pull a single batch and reuse it for both names. Creating two iterators
# would run the whole video preprocessing twice just for this check.
# `val` is kept because a later cell feeds val[0] to the model.
val = next(data.as_numpy_iterator())
frames, alignments = val
print(frames.shape, alignments.shape)

In [None]:
# Rebuilds the same pipeline as above from `data_path` (instead of a
# hard-coded string). train_data / val_data defined earlier keep pointing at
# the previous dataset object.
# shuffle=False: list_files would otherwise reshuffle the files on every pass;
# the explicit shuffle below yields one fixed random order instead.
data = tf.data.Dataset.list_files(str(data_path / 's1' / '*.mpg'), shuffle=False)
data = data.shuffle(500, reshuffle_each_iteration=False)
data = data.map(mappable_function)
# 75 frames, don't change size of frames. 40 tokens in the alignments.
data = data.padded_batch(2, padded_shapes=([75, None, None], [40]))
data = data.prefetch(tf.data.AUTOTUNE)

Making a gif out of a sample video:

In [None]:
sample = str(data_path / 's1' / 'bbaf2n.mpg')
frames = preprocess_video(sample)  # standardised mouth crops; empty if none were found

frames_array = np.array(frames)
if frames_array.size == 0:
    # min()/max() on an empty array fail with an unhelpful message.
    raise ValueError(f"No mouth frames could be extracted from {sample}")

min_val = frames_array.min()
max_val = frames_array.max()
# Map [min_val, max_val] onto [0, 255]. The factor is computed once, and a
# constant clip (zero range) is rendered as black instead of dividing by zero.
value_range = max_val - min_val
scale = 255 / value_range if value_range > 0 else 0.0

with imageio.get_writer("./mouth_movement.gif", mode='I') as writer:
    for frame in frames_array:
        normalized_frame = ((frame - min_val) * scale).astype('uint8')
        writer.append_data(normalized_frame)

# Setting up the initial Neural Network:
The model is trained with CTC loss. It is a simple 3D CNN followed by bidirectional LSTM layers. CTC loss is used because I expect the data received later to not be as clean as the data we are training on. In other words, the model should be constructed for non-aligned data although we are working with aligned training data.
I will go back and set up a more complex model later, but for now, I will use this simple model to have something to work with.

In [None]:
def create_model(input_shape, output_size):
    """Assemble the Conv3D + bidirectional-LSTM network.

    Args:
        input_shape: Shape of one sample, passed to the first Conv3D layer.
        output_size: Number of units of the final softmax layer.

    Returns:
        An uncompiled ``Sequential`` model producing one softmax distribution
        per time step.
    """
    # Three Conv3D/MaxPool3D stages; pooling only halves the two spatial axes,
    # the time axis keeps its length.
    feature_extractor = [
        Conv3D(128, (3, 3, 3), activation='relu', padding='same', input_shape=input_shape),
        MaxPool3D((1, 2, 2)),
        Conv3D(256, (3, 3, 3), activation='relu', padding='same'),
        MaxPool3D((1, 2, 2)),
        Conv3D(64, (3, 3, 3), activation='relu', padding='same'),
        MaxPool3D((1, 2, 2)),
        # Flatten every time step separately so the LSTMs receive a sequence.
        TimeDistributed(Flatten()),
    ]
    sequence_model = [
        Bidirectional(LSTM(256, return_sequences=True, kernel_initializer='Orthogonal')),
        Dropout(0.2),
        Bidirectional(LSTM(256, return_sequences=True, kernel_initializer='Orthogonal')),
        Dropout(0.2),
    ]
    classifier = [
        Dense(output_size, activation='softmax', kernel_initializer='he_normal'),
    ]

    model = Sequential()
    for layer in feature_extractor + sequence_model + classifier:
        model.add(layer)
    return model

# Input: 75 frames of 50x100 single-channel crops. The output layer gets one
# unit more than the vocabulary size (presumably for the CTC blank - confirm).
model = create_model(input_shape=(75, 50, 100, 1), output_size=char_to_num.vocabulary_size() + 1)

In [None]:
# Print the layer-by-layer overview of the model built above.
model.summary()

Checking if the model will return an output:

In [None]:
# Run the model on the frames of the batch fetched earlier.
yhat = model.predict(val[0])
# Most likely class per time step for the first sample, mapped back to
# characters in one call and joined into a single string.
tf.strings.reduce_join(num_to_char(tf.argmax(yhat, axis=-1)[0]))

# Training the model:


In [None]:
def scheduler(epoch, lr):
    """Learning-rate schedule: constant for 32 epochs, then exponential decay.

    Args:
        epoch: Index of the current epoch.
        lr: Current learning rate.

    Returns:
        The learning rate to use, as a plain Python ``float``. From epoch 32
        onwards the rate is multiplied by ``exp(-0.1)`` each epoch.
    """
    if epoch < 32:
        return float(lr)
    # Return a float, not a tf.Tensor: Keras 3's LearningRateScheduler rejects
    # schedule outputs that are not floats.
    return float(lr * np.exp(-0.1))

Defining the CTC loss function from https://keras.io/examples/audio/ctc_asr/.

In [None]:
def CTCLoss(y_true, y_pred):
    """Compute the CTC loss for a batch.

    Every sample is given the same lengths: the size of axis 1 of ``y_pred``
    as input length and the size of axis 1 of ``y_true`` as label length.
    Both are expanded to int64 tensors of shape ``(batch, 1)`` before being
    passed to ``ctc_batch_cost``.

    Args:
        y_true: Batch of label id sequences.
        y_pred: Batch of per-time-step class probabilities from the model.

    Returns:
        The loss tensor returned by ``tf.keras.backend.ctc_batch_cost``.
    """
    n_samples = tf.cast(tf.shape(y_true)[0], dtype="int64")
    # Column of ones used to repeat a scalar length once per sample.
    per_sample = tf.ones(shape=(n_samples, 1), dtype="int64")

    input_length = tf.cast(tf.shape(y_pred)[1], dtype="int64") * per_sample
    label_length = tf.cast(tf.shape(y_true)[1], dtype="int64") * per_sample

    return tf.keras.backend.ctc_batch_cost(y_true, y_pred, input_length, label_length)

In [None]:
class ProduceExample(tf.keras.callbacks.Callback):
    """Print actual and predicted transcripts for one batch after each epoch.

    Args:
        dataset: Batched ``tf.data.Dataset`` yielding ``(frames, labels)``.
    """

    def __init__(self, dataset):
        # Let the Keras base class set up its own state.
        super().__init__()
        # Keep the dataset itself so the iterator can be restarted.
        self._source = dataset
        self.dataset = dataset.as_numpy_iterator()

    def _next_batch(self):
        """Return the next batch, restarting the iterator once it is exhausted."""
        try:
            return next(self.dataset)
        except StopIteration:
            # The dataset is finite, so with more epochs than batches the
            # iterator runs dry and would abort training; start over instead.
            self.dataset = self._source.as_numpy_iterator()
            return next(self.dataset)

    def on_epoch_end(self, epoch, logs=None):
        """Decode the predictions for one batch and print them next to the labels."""
        frames, labels = self._next_batch()
        # Use the model this callback is attached to, not a notebook global.
        yhat = self.model.predict(frames)
        # Every sample uses the full number of output time steps.
        input_length = np.ones(yhat.shape[0]) * yhat.shape[1]
        decoded = tf.keras.backend.ctc_decode(yhat, input_length=input_length, greedy=False)[0][0].numpy()
        for actual, predicted in zip(labels, decoded):
            print('Actual:', tf.strings.reduce_join([num_to_char(word) for word in actual]).numpy().decode('utf-8'))
            print('Predicted:', tf.strings.reduce_join([num_to_char(word) for word in predicted]).numpy().decode('utf-8'))
            print('*'*100)

In [None]:
# Resume from the last saved weights when they exist. On a first run there is
# no checkpoint yet, so fall back to the freshly initialised model instead of
# failing on a missing file.
latest_checkpoint = 'models/checkpoints.weights.h5'
if os.path.exists(latest_checkpoint):
    model.load_weights(latest_checkpoint)
else:
    print(f"No checkpoint found at {latest_checkpoint}; starting from freshly initialised weights.")

In [None]:
# Compile with Adam and the CTC loss defined above.
model.compile(loss=CTCLoss, optimizer=Adam(learning_rate=1e-3))

# Callback that writes the model weights to models/checkpoints.weights.h5.
checkpoint = ModelCheckpoint(
    os.path.join('models', 'checkpoints.weights.h5'),
    monitor='loss',
    mode='min',
    save_weights_only=True,
    verbose=1,
)
# Callback applying `scheduler` to the learning rate.
schedule = LearningRateScheduler(scheduler, verbose=1)
# Callback printing example predictions taken from the validation data.
produce_example = ProduceExample(val_data)

In [None]:
# Train for 128 epochs on train_data, validating on val_data, with the
# checkpoint, learning-rate-schedule and example-printing callbacks attached.
model.fit(train_data, validation_data=val_data, epochs=128, callbacks=[checkpoint, schedule, produce_example])

In [None]:
# Save the trained model to a single .h5 file in the working directory.
# NOTE(review): the model was compiled with the custom CTCLoss function, so
# reloading it probably needs custom_objects={'CTCLoss': CTCLoss} - confirm.
model.save('optimized_model.h5')