In [None]:
# Install Gradio, which provides the demo UI built in the last cell.
# The captured output of this cell shows gradio-4.12.0 being resolved.
!pip install gradio

Collecting gradio
  Downloading gradio-4.12.0-py3-none-any.whl (16.6 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m16.6/16.6 MB[0m [31m20.5 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting aiofiles<24.0,>=22.0 (from gradio)
  Downloading aiofiles-23.2.1-py3-none-any.whl (15 kB)
Collecting fastapi (from gradio)
  Downloading fastapi-0.108.0-py3-none-any.whl (92 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m92.0/92.0 kB[0m [31m4.0 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting ffmpy (from gradio)
  Downloading ffmpy-0.3.1.tar.gz (5.5 kB)
  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting gradio-client==0.8.0 (from gradio)
  Downloading gradio_client-0.8.0-py3-none-any.whl (305 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m305.1/305.1 kB[0m [31m15.4 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting httpx (from gradio)
  Downloading httpx-0.26.0-py3-none-any.whl (75 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━

In [None]:
# Install the remaining third-party packages: OpenCV (cv2), matplotlib,
# imageio, gdown and TensorFlow.
!pip install opencv-python matplotlib imageio gdown tensorflow



In [None]:
# Mount Google Drive at /content/drive; the video archive and the model
# checkpoint used below are read from paths under /content/drive/MyDrive.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
# Installs the `unrar` package from PyPI.
# NOTE(review): the extraction cell below runs `!unrar x ...`, whose banner
# ("UNRAR 6.11 beta 1 freeware") looks like the command-line binary rather
# than this Python package — confirm whether this install is needed.
!pip install unrar

Collecting unrar
  Downloading unrar-0.4-py3-none-any.whl (25 kB)
Installing collected packages: unrar
Successfully installed unrar-0.4


In [None]:

!unrar x '/content/drive/MyDrive/videos.rar'


UNRAR 6.11 beta 1 freeware      Copyright (c) 1993-2022 Alexander Roshal


Extracting from /content/drive/MyDrive/videos.rar

Creating    videos                                                    OK
Creating    videos/s10_alignments                                     OK
Extracting  videos/s10_alignments/bbab8n.align                             0%  OK 
Extracting  videos/s10_alignments/bbab9s.align                             0%  OK 
Extracting  videos/s10_alignments/bbac1a.align                             0%  OK 
Extracting  videos/s10_alignments/bbaczp.align                             0%  OK 
Extracting  videos/s10_alignments/bbai2n.align                             0%  OK 
Extracting  videos/s10_alignments/bbai3s.align                             0%  OK 
Extracting  videos/s10_alignments/bbai4p.align                             0%  OK 
Extracting  videos/s10_alignments/bbai5a.align                             0%

In [None]:
import os
import cv2
import tensorflow as tf
import numpy as np
from typing import List
from matplotlib import pyplot as plt
import imageio

In [None]:
def load_video(path: str) -> "tf.Tensor | None":
    """Read a video file and return its normalised, cropped grayscale frames.

    Each readable frame is converted with ``tf.image.rgb_to_grayscale`` and
    cropped to rows 190:236 and columns 80:220 (a 46x140 window, matching the
    model's ``input_shape=(75, 46, 140, 1)``). The stacked frames are then
    standardised by subtracting their mean and dividing by their standard
    deviation.

    Args:
        path: Path of the video file to open with OpenCV.

    Returns:
        A float32 tensor of normalised frames, or ``None`` when the video
        could not be read or contained no decodable frames. ``load_data``
        checks for ``None``, so failures are reported here and signalled
        through the return value instead of being silently swallowed.
    """
    cap = None
    try:
        cap = cv2.VideoCapture(path)
        frames = []
        for _ in range(int(cap.get(cv2.CAP_PROP_FRAME_COUNT))):
            ret, frame = cap.read()
            if frame is None:
                # Unreadable frame: skip it rather than abort the whole clip.
                continue
            frame = tf.image.rgb_to_grayscale(frame)
            frames.append(frame[190:236, 80:220, :])

        if not frames:
            print(f"[load_video] no frames could be read from {path!r}")
            return None

        # NOTE(review): the mean is taken before the float cast (presumably on
        # uint8 pixels, so an integer mean). Kept exactly as-is so inputs stay
        # consistent with the weights in the saved checkpoint — confirm before
        # changing.
        mean = tf.math.reduce_mean(frames)
        std = tf.math.reduce_std(tf.cast(frames, tf.float32))
        return tf.cast((frames - mean), tf.float32) / std
    except Exception as e:
        # Best-effort loader: report the problem and let the caller decide.
        print(f"[load_video] failed to load {path!r}: {e}")
        return None
    finally:
        # Always free the capture handle, including on errors.
        if cap is not None:
            cap.release()

In [None]:
# Character vocabulary and the two lookup layers built on it:
# char_to_num maps characters to integer ids, num_to_char is the inverse.
# Both use the empty string as the out-of-vocabulary token.
vocab = list("abcdefghijklmnopqrstuvwxyz'?!123456789 ")
char_to_num = tf.keras.layers.StringLookup(oov_token="", vocabulary=vocab)
num_to_char = tf.keras.layers.StringLookup(
    invert=True,
    oov_token="",
    vocabulary=char_to_num.get_vocabulary(),
)

print(
    f"The vocabulary is: {char_to_num.get_vocabulary()}",
    f"(size ={char_to_num.vocabulary_size()})",
)

The vocabulary is: ['', 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z', "'", '?', '!', '1', '2', '3', '4', '5', '6', '7', '8', '9', ' '] (size =40)


In [None]:
def load_alignments(path: str) -> "tf.Tensor":
    """Read an ``.align`` file and encode its words as character ids.

    The third whitespace-separated field of each line is taken as the token.
    Tokens equal to ``'sil'`` are dropped; every remaining token is preceded
    by a single space. The resulting text is split into characters and mapped
    through ``char_to_num``; the leading space is removed by the final
    ``[1:]`` slice.

    Args:
        path: Path of the alignment file.

    Returns:
        The output of ``char_to_num`` for the character sequence, without the
        leading space.
    """
    tokens = []
    with open(path, 'r') as f:
        for line in f:
            fields = line.split()
            # Blank or truncated lines have no token field; skip them instead
            # of raising IndexError.
            if len(fields) < 3:
                continue
            if fields[2] != 'sil':
                tokens.extend([' ', fields[2]])
    return char_to_num(tf.reshape(tf.strings.unicode_split(tokens, input_encoding='UTF-8'), (-1)))[1:]

In [None]:
def load_data(path: str):
    """Load the frames and encoded alignment belonging to one video path.

    From ``<root>/<speaker>_<suffix>/<name>.<ext>`` the function derives
    ``<root>/<speaker>_video/<name>.mpg`` and
    ``<root>/<speaker>_alignments/<name>.align`` and loads both.

    Args:
        path: The video path, either as a string tensor (as passed by
            ``tf.py_function``) or as a plain ``str``/``bytes``.

    Returns:
        A ``(frames, alignments)`` tuple as produced by ``load_video`` and
        ``load_alignments``.

    Raises:
        ValueError: If ``load_video`` could not produce any frames. Previously
            this case failed with an UnboundLocalError on ``alignments``.
    """
    raw = path.numpy() if hasattr(path, "numpy") else path
    path = raw.decode() if isinstance(raw, bytes) else str(raw)

    parts = path.split('/')
    file_name = parts[-1].split('.')[0]
    speaker_name = parts[-2].split('_')[0]
    video_name = "/".join(parts[:-2])
    alignment_path = os.path.join(video_name, speaker_name + "_alignments", file_name + ".align")
    video_path = os.path.join(video_name, speaker_name + "_video", file_name + ".mpg")

    frames = load_video(video_path)
    if frames is None:
        raise ValueError(f"Could not load video frames from {video_path!r}")
    alignments = load_alignments(alignment_path)

    return frames, alignments

In [None]:
def mappable_function(path:str) ->List[str]:
    """Run ``load_data`` through ``tf.py_function`` so it can be used in ``Dataset.map``.

    The wrapped call is declared to return a float32 tensor (frames) and an
    int64 tensor (alignment ids).
    """
    output_dtypes = (tf.float32, tf.int64)
    return tf.py_function(load_data, [path], output_dtypes)

In [None]:
# Build the input pipeline: file names -> (frames, alignment) pairs -> padded
# batches of 2.
#
# list_files() shuffles by default and re-draws that order on every iteration,
# which would make the later take(400)/skip(400) train/test split differ from
# epoch to epoch (test clips leaking into training). Listing is therefore done
# with shuffle=False and the only shuffle is the explicit one below, which is
# fixed after the first pass (reshuffle_each_iteration=False). The buffer covers
# every file so the single shuffle is a full one.
data2 = tf.data.Dataset.list_files('./videos/*/*.mpg', shuffle=False)
print(data2)
data2 = data2.shuffle(max(len(data2), 500), reshuffle_each_iteration=False)
data2 = data2.map(mappable_function)
# Frames are padded to 75 per clip, alignments to 40 ids per clip.
data2 = data2.padded_batch(2, padded_shapes=([75,None,None,None],[40]))
data2 = data2.prefetch(tf.data.AUTOTUNE)
print(len(data2))

<_ShuffleDataset element_spec=TensorSpec(shape=(), dtype=tf.string, name=None)>
634


In [None]:
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv3D, GRU,LSTM, Dense, Dropout, Bidirectional, MaxPool3D, Activation, Reshape, SpatialDropout3D, BatchNormalization, TimeDistributed, Flatten
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import ModelCheckpoint, LearningRateScheduler

In [None]:
# Network: three Conv3D + ReLU + spatial max-pool stages, a reshape to one
# feature vector per frame, two bidirectional GRU layers with dropout, and a
# per-frame softmax over the vocabulary plus one extra class.
model = Sequential()
model.add(Conv3D(128, 3, input_shape=(75,46,140,1), padding='same'))
model.add(Activation('relu'))
model.add(MaxPool3D((1,2,2)))  # pool height/width only; frame axis untouched


model.add(Conv3D(256, 3, padding='same'))
model.add(Activation('relu'))
model.add(MaxPool3D((1,2,2)))


model.add(Conv3D(75, 3, padding='same'))
model.add(Activation('relu'))
model.add(MaxPool3D((1,2,2)))


# Output shape here is (batch, frames, height, width, channels). Keep the frame
# axis and flatten height*width*channels into one feature vector per frame.
# The previous indexing (shape[-1], shape[1]*shape[2]*shape[3]) gave the same
# numbers only because the last Conv3D has 75 filters == 75 frames; indexing
# the right axes keeps the layer correct if either value changes.
shape = model.layers[-1].output_shape
model.add(Reshape((shape[1], shape[2]*shape[3]*shape[4])))

model.add(Bidirectional(GRU(128,return_sequences=True)))
model.add(Dropout(.5))

model.add(Bidirectional(GRU(128,return_sequences=True)))
model.add(Dropout(.5))

# model.add(Bidirectional(LSTM(128, kernel_initializer='Orthogonal', return_sequences=True)))
# model.add(Dropout(.5))

# model.add(Bidirectional(LSTM(128, kernel_initializer='Orthogonal', return_sequences=True)))
# model.add(Dropout(.5))

model.add(Dense(char_to_num.vocabulary_size()+1, kernel_initializer='he_normal', activation='softmax'))


In [None]:
def scheduler(epoch, lr):
    """Learning-rate schedule: constant for 30 epochs, then exponential decay.

    Args:
        epoch: Zero-based epoch index supplied by the callback.
        lr: Current learning rate.

    Returns:
        ``lr`` unchanged while ``epoch < 30``; afterwards ``lr * exp(-0.1)``
        as a Python float (not a tensor), which ``LearningRateScheduler``
        accepts across Keras versions.
    """
    import math

    if epoch < 30:
        return lr
    return float(lr * math.exp(-0.1))

In [None]:
def CTCLoss(y_true, y_pred):
    """CTC loss for a batch, using full-width lengths for every sample.

    Every sample is given an input length equal to axis 1 of ``y_pred`` and a
    label length equal to axis 1 of ``y_true``; both are passed to
    ``tf.keras.backend.ctc_batch_cost`` as ``(batch, 1)`` int64 tensors.
    """
    batch_size = tf.cast(tf.shape(y_true)[0], dtype="int64")
    time_steps = tf.cast(tf.shape(y_pred)[1], dtype="int64")
    label_width = tf.cast(tf.shape(y_true)[1], dtype="int64")

    # One (batch, 1) column of ones, scaled to each length.
    input_length = time_steps * tf.ones(shape=(batch_size, 1), dtype="int64")
    label_length = label_width * tf.ones(shape=(batch_size, 1), dtype="int64")

    return tf.keras.backend.ctc_batch_cost(y_true, y_pred, input_length, label_length)

In [None]:
class ProduceExample(tf.keras.callbacks.Callback):
    """Callback that prints ground truth vs. prediction for one batch per epoch."""

    def __init__(self, dataset) -> None:
        """Store ``dataset`` and open a NumPy iterator over it.

        Args:
            dataset: A ``tf.data.Dataset`` yielding ``(frames, labels)`` batches.
        """
        super().__init__()
        # Keep the dataset so the iterator can be reopened once exhausted.
        self._source = dataset
        self.dataset = dataset.as_numpy_iterator()

    def on_epoch_end(self, epoch, logs=None) -> None:
        """Predict on the next batch, beam-search decode it and print the texts."""
        try:
            data = next(self.dataset)
        except StopIteration:
            # More epochs than batches: start over from the first batch.
            self.dataset = self._source.as_numpy_iterator()
            data = next(self.dataset)

        yhat = self.model.predict(data[0])
        # One input length per sample, taken from the prediction itself, so the
        # callback does not assume a batch size of 2 or 75 time steps.
        input_length = [yhat.shape[1]] * len(yhat)
        decoded = tf.keras.backend.ctc_decode(yhat, input_length, greedy=False)[0][0].numpy()
        for x in range(len(yhat)):
            print('Original:', tf.strings.reduce_join(num_to_char(data[1][x])).numpy().decode('utf-8'))
            print('Prediction:', tf.strings.reduce_join(num_to_char(decoded[x])).numpy().decode('utf-8'))
            print('~'*100)

In [None]:
# Compile with Adam (learning rate 1e-4) and the CTC loss defined above.
model.compile(
    loss=CTCLoss,
    optimizer=Adam(learning_rate=0.0001),
)

In [None]:
# Save the model weights (weights only) to the Drive checkpoint path.
checkpoint_callback = ModelCheckpoint(
    os.path.join('/content/drive/MyDrive/models', 'checkpoint'),
    monitor='loss',
    save_weights_only=True,
)

In [None]:
# Apply the `scheduler` function as the per-epoch learning-rate schedule.
schedule_callback = LearningRateScheduler(schedule=scheduler)

In [None]:
# First 400 batches are used for training, the remainder for testing; the
# example callback draws its batches from the test portion.
train, test = data2.take(400), data2.skip(400)
example_callback = ProduceExample(dataset=test)

In [None]:
# Restore previously saved weights from the checkpoint on Drive (the same
# path the ModelCheckpoint callback above writes to).
model.load_weights('/content/drive/MyDrive/models/checkpoint')

<tensorflow.python.checkpoint.checkpoint.CheckpointLoadStatus at 0x7eb3200f27a0>

In [None]:
# Load one clip as a (frames, alignments) pair for a manual prediction check.
sample = load_data(tf.convert_to_tensor('/content/videos/s10_video/bbaczp.mpg'))

In [None]:
# Decode the sample's label ids back to characters and join them into text.
print('~' * 100, 'REAL TEXT')
[tf.strings.reduce_join([num_to_char(token) for token in sample[1]])]

~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ REAL TEXT


[<tf.Tensor: shape=(), dtype=string, numpy=b'bin blue at c zero please'>]

In [None]:
# Add a leading batch axis to the sample's frames and run the model on it.
yhat = model.predict(tf.expand_dims(sample[0], 0))



In [None]:
# Greedy CTC decoding of the single prediction (75 time steps); [0][0] picks
# the first decoded sequence tensor, converted to a NumPy array.
decoded = tf.keras.backend.ctc_decode(
    yhat, input_length=[75], greedy=True
)[0][0].numpy()

In [None]:
# Map every decoded id sequence back to characters and join into text.
print('~' * 100, 'PREDICTIONS')
[tf.strings.reduce_join([num_to_char(token) for token in sequence]) for sequence in decoded]

~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ PREDICTIONS


[<tf.Tensor: shape=(), dtype=string, numpy=b'bin blue at c zero please'>]

In [None]:
import gradio as gr
import glob
import cv2


# Discover the available clips and map each file name to its full path; the
# file names are what the dropdown offers.
video_files = glob.glob("/content/videos/*/*.mpg")
video_choices = dict((path.rsplit("/", 1)[-1], path) for path in video_files)
video_choice_input = gr.Dropdown(label="Select a video", choices=list(video_choices.keys()))

# Output widgets: ground-truth text, predicted text and the clip itself.
output_text = gr.Textbox(label="Real")
output_text2 = gr.Textbox(label="Predictions")
vid = gr.Video(label="Video")
#lip=gr.Image(label="Lip movement")


def predict_on_video(file):
    """Run the model on one video file and decode both prediction and label.

    Args:
        file: Path of the video; converted to a tensor for ``load_data``.

    Returns:
        A pair ``(predictions, real)``: a list with one decoded string per
        greedy-CTC-decoded sequence, and the ground-truth text as a string.
    """
    frames, alignment = load_data(tf.convert_to_tensor(file))
    yhat = model.predict(tf.expand_dims(frames, axis=0))
    decoded = tf.keras.backend.ctc_decode(yhat, input_length=[75], greedy=True)[0][0].numpy()

    predicted_texts = []
    for sentence in decoded:
        joined = tf.strings.reduce_join([num_to_char(word) for word in sentence])
        predicted_texts.append(joined.numpy().decode())

    real_text = tf.strings.reduce_join([num_to_char(word) for word in alignment]).numpy().decode()
    return predicted_texts, real_text


def video_prediction(video_choice):
    """Gradio handler: predict the text spoken in the selected clip.

    Args:
        video_choice: A key of ``video_choices`` (the clip's file name).

    Returns:
        A tuple ``(real, prediction, video)``: the ground-truth text, the
        first predicted string, and a ``gr.Video`` pointing at the clip.
    """
    video_path = video_choices[video_choice]
    # predict_on_video already loads the clip; no second load_data call needed.
    predictions, real = predict_on_video(video_path)
    return real, predictions[0], gr.Video(video_path)

# Wire the dropdown to the handler and launch the demo; with debug=True the
# cell keeps running so errors and logs stay visible.
gr.Interface(
    fn=video_prediction,
    inputs=[video_choice_input],
    outputs=[output_text, output_text2, vid],
    title="Lip Net",
).launch(debug=True)

Setting queue=True in a Colab notebook requires sharing enabled. Setting `share=True` (you can turn this off by setting `share=False` in `launch()` explicitly).

Colab notebook detected. This cell will run indefinitely so that you can see errors and logs. To turn off, set debug=False in launch().
Running on public URL: https://78c9132e205a77398b.gradio.live

This share link expires in 72 hours. For free permanent hosting and GPU upgrades, run `gradio deploy` from Terminal to deploy to Spaces (https://huggingface.co/spaces)
























