<a href="https://colab.research.google.com/github/tasnimislamraisa/Python_Learning/blob/deep-Learning/SilentNetUpdate.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [2]:
import os
import cv2
import gdown
import imageio
import numpy as np
import tensorflow as tf
from typing import List
from matplotlib import pyplot as plt
from tensorflow.keras.callbacks import ModelCheckpoint, LearningRateScheduler


In [3]:
url = 'https://drive.google.com/uc?id=1YlvpDLix3S-U8fd-gqRwPcWXAXm8JwjL'
output = 'data.zip'
gdown.download(url, output, quiet=False)
gdown.extractall('data.zip')


Downloading...
From (original): https://drive.google.com/uc?id=1YlvpDLix3S-U8fd-gqRwPcWXAXm8JwjL
From (redirected): https://drive.google.com/uc?id=1YlvpDLix3S-U8fd-gqRwPcWXAXm8JwjL&confirm=t&uuid=3d4b042b-aa8c-4f62-b4d8-e80d35d7f9f6
To: /content/data.zip
100%|██████████| 423M/423M [00:03<00:00, 122MB/s]


['data/',
 'data/alignments/',
 'data/alignments/s1/',
 'data/alignments/s1/bbaf2n.align',
 'data/alignments/s1/bbaf3s.align',
 'data/alignments/s1/bbaf4p.align',
 'data/alignments/s1/bbaf5a.align',
 'data/alignments/s1/bbal6n.align',
 'data/alignments/s1/bbal7s.align',
 'data/alignments/s1/bbal8p.align',
 'data/alignments/s1/bbal9a.align',
 'data/alignments/s1/bbas1s.align',
 'data/alignments/s1/bbas2p.align',
 'data/alignments/s1/bbas3a.align',
 'data/alignments/s1/bbaszn.align',
 'data/alignments/s1/bbaz4n.align',
 'data/alignments/s1/bbaz5s.align',
 'data/alignments/s1/bbaz6p.align',
 'data/alignments/s1/bbaz7a.align',
 'data/alignments/s1/bbbf6n.align',
 'data/alignments/s1/bbbf7s.align',
 'data/alignments/s1/bbbf8p.align',
 'data/alignments/s1/bbbf9a.align',
 'data/alignments/s1/bbbm1s.align',
 'data/alignments/s1/bbbm2p.align',
 'data/alignments/s1/bbbm3a.align',
 'data/alignments/s1/bbbmzn.align',
 'data/alignments/s1/bbbs4n.align',
 'data/alignments/s1/bbbs5s.align',
 'data/al

In [4]:
vocab = [x for x in "abcdefghijklmnopqrstuvwxyz'?!123456789 "]
char_to_num = tf.keras.layers.StringLookup(vocabulary=vocab, oov_token="")
num_to_char = tf.keras.layers.StringLookup(
    vocabulary=char_to_num.get_vocabulary(), oov_token="", invert=True
)


In [5]:
def load_video(path: str) -> List[float]:
    cap = cv2.VideoCapture(path)
    frames = []
    for _ in range(int(cap.get(cv2.CAP_PROP_FRAME_COUNT))):
        ret, frame = cap.read()
        frame = tf.image.rgb_to_grayscale(frame)
        frames.append(frame[190:236, 80:220, :])
    cap.release()
    mean = tf.math.reduce_mean(frames)
    std = tf.math.reduce_std(tf.cast(frames, tf.float32))
    return tf.cast((frames - mean), tf.float32) / std

def load_alignments(path: str) -> List[str]:
    with open(path, 'r') as f:
        lines = f.readlines()
    tokens = []
    for line in lines:
        line = line.split()
        if line[2] != 'sil':
            tokens = [*tokens, ' ', line[2]]
    return char_to_num(tf.reshape(tf.strings.unicode_split(tokens, input_encoding='UTF-8'), (-1)))[1:]

def load_data(path: tf.Tensor):
    path_str = path.numpy().decode()
    file_name = path_str.split('/')[-1].split('.')[0]
    video_path = os.path.join('data', 's1', f'{file_name}.mpg')
    alignment_path = os.path.join('data', 'alignments', 's1', f'{file_name}.align')
    frames = load_video(video_path)
    alignments = load_alignments(alignment_path)
    return frames, alignments


In [6]:
def mappable_function(path: tf.Tensor):
    video, alignment = tf.py_function(load_data, [path], (tf.float32, tf.int64))
    video.set_shape([None, 46, 140, 1])
    alignment.set_shape([None])
    return video, alignment

data = tf.data.Dataset.list_files('./data/s1/*.mpg')
data = data.shuffle(500)
data = data.map(mappable_function, num_parallel_calls=tf.data.AUTOTUNE)

train = data.padded_batch(2, padded_shapes=([None, 46, 140, 1], [None]))
test = data.padded_batch(2, padded_shapes=([None, 46, 140, 1], [None]))

train = train.prefetch(tf.data.AUTOTUNE)
test = test.prefetch(tf.data.AUTOTUNE)


In [7]:
def CTCLoss(y_true, y_pred):
    batch_len = tf.cast(tf.shape(y_true)[0], dtype="int64")
    input_length = tf.cast(tf.shape(y_pred)[1], dtype="int64")
    label_length = tf.cast(tf.shape(y_true)[1], dtype="int64")
    input_length = input_length * tf.ones(shape=(batch_len, 1), dtype="int64")
    label_length = label_length * tf.ones(shape=(batch_len, 1), dtype="int64")
    return tf.keras.backend.ctc_batch_cost(y_true, y_pred, input_length, label_length)


In [8]:
from tensorflow.keras import layers, models

inputs = layers.Input(shape=(None, 46, 140, 1))

x = layers.Conv3D(128, 3, padding='same', activation='relu')(inputs)
x = layers.MaxPool3D((1, 2, 2))(x)

x = layers.Conv3D(256, 3, padding='same', activation='relu')(x)
x = layers.MaxPool3D((1, 2, 2))(x)

x = layers.Conv3D(75, 3, padding='same', activation='relu')(x)
x = layers.MaxPool3D((1, 2, 2))(x)

x = layers.TimeDistributed(layers.Flatten())(x)
x = layers.Bidirectional(layers.LSTM(128, return_sequences=True, kernel_initializer='Orthogonal'))(x)
x = layers.Dropout(0.5)(x)
x = layers.Bidirectional(layers.LSTM(128, return_sequences=True, kernel_initializer='Orthogonal'))(x)
x = layers.Dropout(0.5)(x)

outputs = layers.Dense(char_to_num.vocabulary_size() + 1, activation='softmax')(x)

model = models.Model(inputs, outputs)
model.compile(optimizer='adam', loss=CTCLoss)
model.summary()


In [9]:
os.makedirs("models", exist_ok=True)

checkpoint_callback = ModelCheckpoint(
    filepath=os.path.join('models', 'checkpoint.weights.h5'),
    monitor='loss',
    save_weights_only=True,
    save_best_only=False
)

def scheduler(epoch, lr):
    return lr if epoch < 30 else lr * tf.math.exp(-0.1)

schedule_callback = LearningRateScheduler(scheduler)


In [10]:
class ProduceExample(tf.keras.callbacks.Callback):
    def __init__(self, dataset, num_to_char):
        super().__init__()
        self.dataset = dataset.as_numpy_iterator()
        self.num_to_char = num_to_char

    def on_epoch_end(self, epoch, logs=None):
        data = self.dataset.next()
        frames = data[0]
        y_true = data[1]

        y_pred = self.model.predict(frames)
        decoded = tf.argmax(y_pred[0], axis=-1)
        predicted_text = tf.strings.reduce_join(self.num_to_char(decoded)).numpy().decode()
        actual_text = tf.strings.reduce_join(self.num_to_char(y_true[0])).numpy().decode()

        print(f'\n[Epoch {epoch + 1}] 🔮 Prediction: {predicted_text}')
        print(f'[Epoch {epoch + 1}] ✅ Ground Truth: {actual_text}')

        plt.imshow(frames[0][35, :, :, 0], cmap='gray')
        plt.title("Sample Frame from Video")
        plt.axis('off')
        plt.show()

example_callback = ProduceExample(test, num_to_char)


In [None]:
model.fit(
    train,
    validation_data=test,
    epochs=10,
    callbacks=[checkpoint_callback, schedule_callback, example_callback]
)


Epoch 1/10
[1m 17/500[0m [37m━━━━━━━━━━━━━━━━━━━━[0m [1m7:30:00[0m 56s/step - loss: 110.9709

In [None]:
import gdown
import zipfile

url = 'https://drive.google.com/uc?id=1vWscXs4Vt0a_1IH1-ct2TCgXAZT-N3_Y'
output = 'checkpoints.zip'
gdown.download(url, output, quiet=False)

with zipfile.ZipFile(output, 'r') as zip_ref:
    zip_ref.extractall('models')


In [None]:
model.load_weights('models/checkpoint.weights.h5')

In [None]:
test_data = test.as_numpy_iterator()
sample = test_data.next()
yhat = model.predict(sample[0])  # shape: (batch_size, time, vocab_size + 1)

In [None]:
print('~' * 100, 'REAL TEXT')

ground_truths = [
    tf.strings.reduce_join(num_to_char(tf.cast(sentence, tf.int32))).numpy().decode()
    for sentence in sample[1]
]

for text in ground_truths:
    print("✅", text)


In [None]:
print('~' * 100, 'PREDICTIONS')

input_lengths = [yhat.shape[1]] * yhat.shape[0]  # [time_steps, time_steps, ...] for each sample

decoded = tf.keras.backend.ctc_decode(yhat, input_length=input_lengths, greedy=True)[0][0]

pred_texts = [
    tf.strings.reduce_join(num_to_char(tf.cast(sentence, tf.int32))).numpy().decode()
    for sentence in decoded
]

for text in pred_texts:
    print("🔮", text)


# **Test on A Video**

In [None]:
sample = load_data(tf.convert_to_tensor('./data/s1/bras9a.mpg'))  # or use Path() if preferred

In [None]:
print('~' * 100, 'REAL TEXT')
ground_truth = tf.strings.reduce_join(num_to_char(tf.cast(sample[1], tf.int32))).numpy().decode()
print("✅", ground_truth)

In [None]:
# Predict on single sample (add batch dimension)
yhat = model.predict(tf.expand_dims(sample[0], axis=0))  # shape: (1, time, vocab+1)

# Decode prediction using CTC greedy decoder
input_len = [yhat.shape[1]]
decoded = tf.keras.backend.ctc_decode(yhat, input_length=input_len, greedy=True)[0][0]


In [None]:
print('~' * 100, 'PREDICTIONS')
predicted_text = tf.strings.reduce_join(num_to_char(decoded[0])).numpy().decode()
print("🔮", predicted_text)
