In [1]:
# Install the third-party packages this notebook imports.
# NOTE(review): no versions are pinned, so a fresh install may pull different releases.
%pip install opencv-python matplotlib imageio gdown tensorflow

Defaulting to user installation because normal site-packages is not writeable

[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m23.2[0m[39;49m -> [0m[32;49m24.0[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49m/Library/Developer/CommandLineTools/usr/bin/python3 -m pip install --upgrade pip[0m
Note: you may need to restart the kernel to use updated packages.


In [2]:
import os
import cv2
import tensorflow as tf
import numpy as np
from typing import List
from matplotlib import pyplot as plt
import imageio

In [3]:
# Ask TensorFlow for the GPUs it can see; the list is kept for the memory-growth cell.
physical_devices = tf.config.list_physical_devices('GPU')

# Report how many GPUs were found
print("Num GPUs Available: ", len(physical_devices))

Num GPUs Available:  0


In [4]:
# Ask TensorFlow to grow GPU memory on demand. This stays best-effort: nothing
# is configured when no GPU was found, and the two errors TensorFlow documents
# for this call (invalid device / runtime already initialized) are reported
# instead of being swallowed by a bare `except`.
if physical_devices:
    try:
        tf.config.experimental.set_memory_growth(physical_devices[0], True)
    except (ValueError, RuntimeError) as err:
        print(f"Could not enable GPU memory growth: {err}")

In [None]:
import os
# Quieten TensorFlow's native logging. The variable normally has to be set
# before TensorFlow is first imported, so it is assigned ahead of the import.
# NOTE(review): an earlier cell already imports tensorflow; move this line
# there if the quieter logging is actually wanted.
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'

import tensorflow as tf
from keras.layers import SimpleRNN
from keras.models import Sequential

# Minimal recurrent model: one SimpleRNN layer with 20 units and input shape (10, 5).
model_rnn = Sequential()
model_rnn.add(SimpleRNN(units=20, activation='relu', input_shape=(10,5)))

# summary() prints the table itself and returns None, so wrapping it in
# print() only appended a stray "None" to the output.
model_rnn.summary()

In [5]:
import gdown


In [None]:
url = 'https://drive.google.com/uc?id=1YlvpDLix3S-U8fd-gqRwPcWXAXm8JwjL'
output = 'data.zip'

# Fetch the archive only when it is not already on disk, so re-running the
# notebook does not download it again.
if not os.path.exists(output):
    gdown.download(url, output, quiet=False)

# Unpack the same file that was (or had previously been) downloaded.
gdown.extractall(output)

In [None]:
def load_video(path:str) -> tf.Tensor:
    """Read a video, crop a fixed region from each frame and standardize the pixels.

    Every decoded frame is reduced to one channel with
    ``tf.image.rgb_to_grayscale`` and cropped to rows 150-235 and columns
    80-219. The stacked crops are then shifted by their overall mean and
    divided by their overall standard deviation.

    Args:
        path: Location of the video file, passed to ``cv2.VideoCapture``.

    Returns:
        A float32 tensor holding the standardized, cropped frames.

    Raises:
        ValueError: If no frame could be decoded from ``path``.
    """
    cap = cv2.VideoCapture(path)
    frames = []
    try:
        for _ in range(int(cap.get(cv2.CAP_PROP_FRAME_COUNT))):
            ret, frame = cap.read()
            if not ret:
                # The reported frame count is not a guarantee that every read
                # succeeds; stop instead of handing a missing frame to TensorFlow.
                break
            # NOTE(review): OpenCV decodes frames in BGR order while
            # rgb_to_grayscale weights the channels as RGB — confirm whether a
            # channel swap is wanted here.
            frame = tf.image.rgb_to_grayscale(frame)
            frames.append(frame[150:236,80:220,:])
    finally:
        # Release the capture even if decoding or conversion raised.
        cap.release()

    if not frames:
        raise ValueError(f"No frames could be read from {path!r}")

    # Cast before taking statistics. On integer frames (OpenCV normally yields
    # uint8) reduce_mean returns an integer mean, and the subtraction would
    # presumably wrap around for pixels darker than the mean.
    frames = tf.cast(frames, tf.float32)
    mean = tf.math.reduce_mean(frames)
    std = tf.math.reduce_std(frames)
    return (frames - mean) / std

In [None]:
# One entry per character the labels may contain: letters, apostrophe,
# '?', '!', digits 1-9 and the space.
vocab = list("abcdefghijklmnopqrstuvwxyz'?!123456789 ")


In [None]:
# Smoke-test load_video on the sample clip.
# NOTE(review): in the visible notebook `test_path` is only assigned in a later
# cell, so on a fresh kernel this cell depends on out-of-order execution.
load_video(test_path)

In [None]:
# Forward lookup: character -> integer id, with "" as the out-of-vocabulary token.
char_to_num = tf.keras.layers.StringLookup(vocabulary=vocab, oov_token="")

# Reverse lookup over exactly the vocabulary the forward layer ended up with.
num_to_char = tf.keras.layers.StringLookup(
    invert=True,
    oov_token="",
    vocabulary=char_to_num.get_vocabulary(),
)

print(f"The vocabulary is: {char_to_num.get_vocabulary()} (size ={char_to_num.vocabulary_size()})")

In [None]:
def load_alignments(path:str) -> tf.Tensor:
    """Read an alignment file and encode its tokens as character ids.

    The third whitespace-separated field of every line is taken as a token.
    Lines whose token is ``'sil'`` are ignored, and so are lines with fewer
    than three fields (for example a blank trailing line). The remaining
    tokens are joined with single spaces, split into characters and mapped
    through ``char_to_num``.

    Args:
        path: Location of the alignment file.

    Returns:
        The ids produced by ``char_to_num`` for the characters of the
        space-separated tokens, without the leading space.
    """
    tokens = []
    with open(path, 'r') as f:
        for line in f:
            fields = line.split()
            # Short lines carry no token in the third column; 'sil' lines are skipped on purpose.
            if len(fields) < 3 or fields[2] == 'sil':
                continue
            # Extend in place rather than rebuilding the whole list per line.
            tokens.extend([' ', fields[2]])
    # [1:] drops the separator that precedes the first token.
    return char_to_num(tf.reshape(tf.strings.unicode_split(tokens, input_encoding='UTF-8'), (-1)))[1:]

In [None]:
def load_data(path: tf.Tensor):
    """Load the frames and the encoded alignment that belong to one video.

    Only the file stem of ``path`` is used; the video is read from
    ``data/s1/<stem>.mpg`` and the alignment from
    ``data/alignments/s1/<stem>.align``.

    Args:
        path: String tensor holding the path of a video file (``.numpy()`` is
            called on it, so a plain ``str`` is not accepted).

    Returns:
        A ``(frames, alignments)`` pair as produced by ``load_video`` and
        ``load_alignments``.
    """
    path = path.numpy().decode('utf-8')
    # os.path.basename follows the platform's separator rules, so the same
    # line works on Windows without hand-editing the split character.
    file_name = os.path.basename(path).split('.')[0]
    video_path = os.path.join('data','s1',f'{file_name}.mpg')
    alignment_path = os.path.join('data','alignments','s1',f'{file_name}.align')
    frames = load_video(video_path)
    alignments = load_alignments(alignment_path)

    return frames, alignments

In [7]:
# Path of the sample clip used by the exploratory cells.
# NOTE(review): the recorded output further down shows OpenCV failing to read
# this file — the name may be a placeholder; confirm it exists.
test_path = './data/s1/yyyyy.mpg'


In [None]:
# Round-trip the path through a tensor and pull out the file stem
# (last '/'-separated component, text before the first '.').
tf.convert_to_tensor(test_path).numpy().decode('utf-8').split('/')[-1].split('.')[0]


In [None]:
# Load frames and encoded alignment for the sample clip; load_data calls
# .numpy() on its argument, hence the conversion to a tensor.
frames, alignments = load_data(tf.convert_to_tensor(test_path))


In [None]:
# Display the frame at index 1 of the loaded clip.
plt.imshow(frames[1])


In [None]:
# Inspect the encoded alignment returned by load_data.
alignments


In [None]:
# Map the ids back to characters with num_to_char and join them into one string tensor.
tf.strings.reduce_join([bytes.decode(x) for x in num_to_char(alignments.numpy()).numpy()])


In [None]:
def mappable_function(path:str) ->List[str]:
    """Wrap ``load_data`` in ``tf.py_function`` so it can be used with ``Dataset.map``.

    The wrapped call is declared to produce one float32 and one int64 output.

    NOTE(review): the annotations on this signature look looser than what is
    actually passed in and returned — confirm before relying on them.
    """
    return tf.py_function(load_data, [path], (tf.float32, tf.int64))

In [None]:
from matplotlib import pyplot as plt


In [None]:
# Input pipeline: list the clips, shuffle once (same order on every pass),
# load each clip, pad and batch in pairs, then prefetch.
data = (
    tf.data.Dataset.list_files('./data/s1/*.mpg')
    .shuffle(500, reshuffle_each_iteration=False)
    .map(mappable_function)
    .padded_batch(2, padded_shapes=([75,None,None,None],[40]))
    .prefetch(tf.data.AUTOTUNE)
)

# Split by batch count: the first 450 batches train, the remainder is held out.
train, test = data.take(450), data.skip(450)

In [None]:
# Pull one batch as NumPy arrays. This rebinds `frames` and `alignments`,
# which earlier held the single clip returned by load_data.
frames, alignments = data.as_numpy_iterator().next()


In [None]:
# Fetch and display one batch from a newly created iterator.
# NOTE(review): this echoes whole arrays into the cell output.
data.as_numpy_iterator().next()


In [None]:
# Keep an iterator around so successive batches can be drawn from it.
sample = data.as_numpy_iterator()


In [None]:
# Draw the next batch and show the frames of its first element.
val = sample.next(); val[0][0]


In [None]:
# Show frame 35 of the first video in the batch.
plt.imshow(val[0][0][35])


In [None]:
# Decode the first label sequence of the batch back into text.
tf.strings.reduce_join([num_to_char(word) for word in val[1][0]])


In [None]:
# `tf` is only an alias bound by `import tensorflow as tf`; an import statement
# needs the real package name, so `from tf.keras...` cannot be resolved.
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv3D, LSTM, Dense, Dropout, Bidirectional, MaxPool3D, Activation, Reshape, SpatialDropout3D, BatchNormalization, TimeDistributed, Flatten
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import ModelCheckpoint, LearningRateScheduler

In [None]:
# Shape of the first video in a batch; compare it with the input_shape given
# to the first Conv3D layer of the model.
data.as_numpy_iterator().next()[0][0].shape


In [None]:
# Three Conv3D -> ReLU -> MaxPool3D stages (pooling only over the two spatial
# axes), a per-time-step flatten, two bidirectional LSTMs with dropout, and a
# softmax over the vocabulary plus one extra class.
model = Sequential([
    Conv3D(128, 3, input_shape=(75,86,140,1), padding='same'),
    Activation('relu'),
    MaxPool3D((1,2,2)),

    Conv3D(256, 3, padding='same'),
    Activation('relu'),
    MaxPool3D((1,2,2)),

    Conv3D(75, 3, padding='same'),
    Activation('relu'),
    MaxPool3D((1,2,2)),

    TimeDistributed(Flatten()),

    Bidirectional(LSTM(128, kernel_initializer='Orthogonal', return_sequences=True)),
    Dropout(.5),

    Bidirectional(LSTM(128, kernel_initializer='Orthogonal', return_sequences=True)),
    Dropout(.5),

    Dense(char_to_num.vocabulary_size()+1, kernel_initializer='he_normal', activation='softmax'),
])

In [None]:
# Print the layer-by-layer summary of the model.
model.summary()


In [None]:
# Run the model on the frames of the sample batch (in notebook order this
# happens before the compile/fit cells further down).
yhat = model.predict(val[0])


In [None]:
# Greedy decode of the first prediction: argmax along axis 1, then map each id to a character.
tf.strings.reduce_join([num_to_char(x) for x in tf.argmax(yhat[0],axis=1)])


In [None]:
# Same decode as the cell above, taking the argmax row by row instead of along an axis.
tf.strings.reduce_join([num_to_char(tf.argmax(x)) for x in yhat[0]])


In [None]:
# Input shape the model expects.
model.input_shape


In [None]:
# Output shape the model produces.
model.output_shape


In [None]:
def scheduler(epoch, lr):
    """Hold the learning rate for the first 30 epochs, then decay it each epoch.

    Args:
        epoch: Epoch index supplied by the learning-rate callback.
        lr: Learning rate currently in use.

    Returns:
        ``lr`` unchanged while ``epoch < 30``; afterwards ``lr * exp(-0.1)``.
        The value is always a plain Python float.
    """
    if epoch < 30:
        return float(lr)
    # Return a float rather than a tf.Tensor: newer Keras releases reject
    # schedule outputs that are not floats.
    return float(lr * np.exp(-0.1))

In [None]:
def CTCLoss(y_true, y_pred):
    """CTC batch cost where every sample uses the full time and label axes.

    The input length of each sample is set to the size of axis 1 of
    ``y_pred`` and the label length to the size of axis 1 of ``y_true``.

    Args:
        y_true: Batch of label sequences.
        y_pred: Batch of per-time-step predictions.

    Returns:
        The result of ``tf.keras.backend.ctc_batch_cost`` for the batch.
    """
    n_samples = tf.cast(tf.shape(y_true)[0], dtype="int64")
    time_steps = tf.cast(tf.shape(y_pred)[1], dtype="int64")
    label_steps = tf.cast(tf.shape(y_true)[1], dtype="int64")

    # Column of ones, one row per sample, used to broadcast the two lengths.
    per_sample = tf.ones(shape=(n_samples, 1), dtype="int64")

    return tf.keras.backend.ctc_batch_cost(
        y_true,
        y_pred,
        time_steps * per_sample,
        label_steps * per_sample,
    )

In [None]:
class ProduceExample(tf.keras.callbacks.Callback):
    """After every epoch, print the labels and decoded predictions for one batch."""

    def __init__(self, dataset) -> None:
        """Remember ``dataset`` and open a NumPy iterator over it.

        Args:
            dataset: Dataset yielding ``(frames, labels)`` batches.
        """
        # Let the Keras base class initialize its own state first.
        super().__init__()
        # Kept so the iterator can be reopened once it has been consumed.
        self._source = dataset
        self.dataset = dataset.as_numpy_iterator()

    def on_epoch_end(self, epoch, logs=None) -> None:
        """Predict on the next batch and print original vs. decoded text."""
        try:
            data = next(self.dataset)
        except StopIteration:
            # More epochs than batches: start over instead of aborting training.
            self.dataset = self._source.as_numpy_iterator()
            data = next(self.dataset)
        yhat = self.model.predict(data[0])
        # One input length per sample, read from the prediction itself, so the
        # callback does not depend on a batch of exactly two 75-step samples.
        input_lengths = [yhat.shape[1]] * len(yhat)
        decoded = tf.keras.backend.ctc_decode(yhat, input_lengths, greedy=False)[0][0].numpy()
        for x in range(len(yhat)):
            print('Original:', tf.strings.reduce_join(num_to_char(data[1][x])).numpy().decode('utf-8'))
            print('Prediction:', tf.strings.reduce_join(num_to_char(decoded[x])).numpy().decode('utf-8'))
            print('~'*100)

In [None]:
from tensorflow.keras.optimizers import legacy


In [None]:
# Compile with the legacy Adam optimizer (learning rate 1e-4) and the CTCLoss function.
model.compile(optimizer=legacy.Adam(learning_rate=0.0001), loss=CTCLoss)


In [None]:
# Checkpoint callback: weights only, written to models/checkpoint, monitoring 'loss'.
checkpoint_callback = ModelCheckpoint(os.path.join('models','checkpoint'), monitor='loss', save_weights_only=True)


In [None]:
# Drive the learning rate from the scheduler function.
schedule_callback = LearningRateScheduler(scheduler)


In [None]:
# The example printer draws its batches from the held-out `test` split.
example_callback = ProduceExample(test)


In [None]:
# Train for 2 epochs on `train`, validating on `test`, with the checkpoint,
# learning-rate and example-printing callbacks attached.
model.fit(train, validation_data=test, epochs=2, callbacks=[checkpoint_callback, schedule_callback, example_callback])


In [None]:
# NOTE(review): the following cell assigns test_path again before anything
# visible here reads this value.
test_path = './data/s1/bbc.mp4'


In [8]:
import cv2
import matplotlib.pyplot as plt

# Load pre-trained face cascade
face_cascade = cv2.CascadeClassifier(cv2.data.haarcascades + 'haarcascade_frontalface_default.xml')

# Specify the path to the video file
test_path = './data/s1/yyyyy.mpg'

# Open video capture
cap = cv2.VideoCapture(test_path)
if not cap.isOpened():
    # Say so explicitly; otherwise the only symptom is "No faces detected." below.
    print(f"Could not open video: {test_path}")

# Histogram-equalized grayscale crops, one per detected face
cropped_faces = []

try:
    # Process video frame by frame
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break

        # Convert frame to grayscale
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)

        # Detect faces in the frame
        faces = face_cascade.detectMultiScale(gray, scaleFactor=1.1, minNeighbors=5, minSize=(30, 30))

        for (x, y, w, h) in faces:
            # Crop from the grayscale frame computed above instead of cropping
            # the colour frame and converting it a second time.
            cropped_face_eq = cv2.equalizeHist(gray[y:y+h, x:x+w])
            cropped_faces.append(cropped_face_eq)
finally:
    # Release video capture even if detection raised
    cap.release()

# Plot the first cropped face if any were detected
if cropped_faces:
    # Index 0 matches the title; a fixed later index fails on short clips.
    plt.imshow(cropped_faces[0], cmap='gray')
    plt.title('First Cropped Face (Enhanced)')
    plt.axis('off')
    plt.show()
else:
    print("No faces detected.")


No faces detected.


OpenCV: Couldn't read video stream from file "./data/s1/yyyyy.mpg"
[ERROR:0@628.030] global cap.cpp:166 open VIDEOIO(CV_IMAGES): raised OpenCV exception:

OpenCV(4.7.0) /Users/xperience/GHA-OCV-Python/_work/opencv-python/opencv-python/opencv/modules/videoio/src/cap_images.cpp:253: error: (-5:Bad argument) CAP_IMAGES: can't find starting number (in the name of file): ./data/s1/yyyyy.mpg in function 'icvExtractPattern'




In [None]:
# Show every cropped face, or say that there are none. A `for ... else`
# runs its `else` whenever the loop finishes without `break`, so the message
# used to be printed even after faces had been displayed.
if cropped_faces:
    for i, face in enumerate(cropped_faces):
        # The stored crops come from cv2.equalizeHist, i.e. they are
        # single-channel, so they are drawn with a gray colormap rather than
        # pushed through a BGR->RGB conversion.
        plt.imshow(face, cmap='gray')
        plt.title(f'Cropped Face {i + 1} (Enhanced)')
        plt.axis('off')
        plt.show()
else:
    print("No faces detected.")

In [None]:
import cv2
import torch
import numpy as np
from torchvision.transforms import ToTensor
import RRDBNet_arch as arch

# Load pre-trained face cascade
face_cascade = cv2.CascadeClassifier(cv2.data.haarcascades + 'haarcascade_frontalface_default.xml')

# Load the pre-trained super-resolution network (the checkpoint name suggests
# ESRGAN x4 — confirm which weights are intended).
# NOTE(review): `model` was bound to the Keras lip-reading model earlier in
# this notebook; this assignment replaces it.
model_path = 'models/RRDB_ESRGAN_x4.pth'
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = arch.RRDBNet(3, 3, 64, 23, gc=32).to(device)
# map_location places the checkpoint tensors on the selected device, so weights
# saved from a GPU can still be loaded on a CPU-only machine.
model.load_state_dict(torch.load(model_path, map_location=device), strict=True)
model.eval()



In [None]:
from torchvision.transforms import ToTensor, ToPILImage
from PIL import Image
import torch

# Read the picture and force three RGB channels
input_image = Image.open("me.png").convert("RGB")

# Image -> tensor, add a leading batch axis, move to the model's device
preprocess = ToTensor()
input_tensor = preprocess(input_image).unsqueeze(0).to(device)

# Inference only: no gradients needed; outputs are limited to [0, 1]
with torch.no_grad():
    enhanced_tensor = torch.clamp(model(input_tensor), 0.0, 1.0)

# Drop the batch axis, bring the result back to the CPU and rebuild an image
postprocess = ToPILImage()
enhanced_image = postprocess(enhanced_tensor.squeeze(0).cpu())

# Write the result to disk and open it in the default viewer
enhanced_image.save("enhanced_image.jpg")
enhanced_image.show()

In [None]:
# NOTE(review): repeats the save/show calls that end the previous cell.
enhanced_image.save("enhanced_image.jpg")
enhanced_image.show()