### Data Processing

In [None]:
import os
import cv2
import numpy as np

#!pip install yt_dlp

import yt_dlp

import matplotlib.pyplot as plt

def download_youtube_video(url, output_path):
    ydl_opts = {
    'format': 'bestvideo[height=1080][ext=mp4]/mp4',
    'outtmpl': output_path,
    }

    with yt_dlp.YoutubeDL(ydl_opts) as ydl:
        ydl.download([url])

    return output_path

def video_to_3d_data_frame(video_path):
    video_data = []
    cap = cv2.VideoCapture(video_path)

    if not cap.isOpened():
        print("Error: Could not open video.")
        return None

    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break

        rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

        video_data.append(rgb_frame)

    cap.release()
    video_data_array = np.array(video_data)
    return video_data_array

def yt_to_data(url):
    # Usage example
    youtube_url = url
    video_path = 'downloaded_video.mp4'
    downloaded_video_path = download_youtube_video(youtube_url, video_path)
    video_data_array = video_to_3d_data_frame(downloaded_video_path)

    # remove the downloaded file after processing
    os.remove(downloaded_video_path)

    return video_data_array


# Usage example
train_url1 = 'https://www.youtube.com/watch?v=j0HoMaaQj9I'
train_url2 = 'https://www.youtube.com/watch?v=1jzJGcRdxPY'
train_url3 = 'https://www.youtube.com/watch?v=Bs58RoTf-g8'
train_url4 = 'https://www.youtube.com/watch?v=XoyYtqi5u54'
#train_url5 = 'https://www.youtube.com/watch?v=rMPkUuMq024'
video_data_array = np.concatenate( (yt_to_data(train_url1),
                                    yt_to_data(train_url2),
                                    yt_to_data(train_url3),
                                    yt_to_data(train_url4),
                                    #yt_to_data(train_url5),
                                    ), axis=0)

test_url = 'https://www.youtube.com/watch?v=YMG_w3XwnrQ'
test_array = yt_to_data(test_url)

In [None]:
import torch
from torch.nn import functional

print(video_data_array.shape)

video_data_array = torch.from_numpy(video_data_array)

### Setting up Training Pairs for Data

In [None]:
def create_training_pairs(video_data_array):
    print("Starting the Numpy!")
    length = len(video_data_array) - 2
    # pre-allocate tensors
    input_frames = torch.zeros((length // 2, 2, *video_data_array.shape[1:]), dtype=video_data_array.dtype)
    target_frames = torch.zeros((length // 2, *video_data_array.shape[1:]), dtype=video_data_array.dtype)

    print("Entering loop")
    for i in range(0, length, 2):
        input_frames[i // 2] = video_data_array[i:i+2]  # two consecutive frames
        target_frames[i // 2] = video_data_array[i+1]   # middle frame
        if i % 100 == 0:
          print(i, "/", length, flush=True)
    print("Finished loop")
    return input_frames, target_frames

input_frames, target_frames = create_training_pairs(video_data_array)
print("Finished the Numpy!")

video_data_array = None # we want to free this from memory now

### Creating the CNN Model

In [None]:
device = "cpu"

#!cat /proc/cpuinfo

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset, TensorDataset

class FrameInterpolationCNN(nn.Module):
    def __init__(self):
        super(FrameInterpolationCNN, self).__init__()
        # Encoding layers for individual frames
        self.encoder = nn.Sequential(
            nn.Conv2d(3, 64, kernel_size=3, stride=2, padding=1),  # downsample
            nn.ReLU(),
            nn.Conv2d(64, 128, kernel_size=3, stride=2, padding=1),
            nn.ReLU(),
            nn.Conv2d(128, 256, kernel_size=3, stride=2, padding=1),
            nn.ReLU()
        )
        # Decoding layers to interpolate the frame
        self.decoder = nn.Sequential(
            nn.ConvTranspose2d(256 * 2, 128, kernel_size=3, stride=2, padding=1, output_padding=1), # upsample
            nn.ReLU(),
            nn.ConvTranspose2d(128, 64, kernel_size=3, stride=2, padding=1, output_padding=1),
            nn.ReLU(),
            nn.ConvTranspose2d(64, 3, kernel_size=3, stride=2, padding=1, output_padding=1),
            nn.Sigmoid()
        )

    def forward(self, frame1, frame2):
        # Encode both frames separately
        enc1 = self.encoder(frame1)
        enc2 = self.encoder(frame2)

        # Concatenate the encoded frames
        enc = torch.cat((enc1, enc2), dim=1)  # concatenate along the channel dimension

        # Decode to get the interpolated frame
        out = self.decoder(enc)
        return out

class FrameDataset(Dataset):
    def __init__(self, input_frames, target_frames):
        self.input_frames = input_frames
        self.target_frames = target_frames

    def __len__(self):
        return len(self.target_frames)

    def __getitem__(self, idx):
        # normalize pixel data to range [0, 1]
        frame1 = self.input_frames[idx][0].float() / 255.0
        frame2 = self.input_frames[idx][1].float() / 255.0
        target_frame = self.target_frames[idx].float() / 255.0

        # permute the dimensions to [C, H, W]
        frame1 = frame1.permute(2, 0, 1)
        frame2 = frame2.permute(2, 0, 1)
        target_frame = target_frame.permute(2, 0, 1)

        return (frame1, frame2), target_frame



model = FrameInterpolationCNN().to(device)

criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

num_epochs = 5

frame_dataset = FrameDataset(input_frames, target_frames)
train_loader = DataLoader(frame_dataset, batch_size=10, shuffle=False, num_workers=12) # I ran it on 12 CPU Cores, use GPU if you can

for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0
    for i, ((frame1, frame2), target_frame) in enumerate(train_loader):
        frame1, frame2, target_frame = frame1.to(device), frame2.to(device), target_frame.to(device)

        # Zero the parameter gradients
        optimizer.zero_grad()

        # Forward pass
        outputs = model(frame1, frame2)

        # Compute loss
        loss = criterion(outputs, target_frame)

        # Backward pass and optimize
        loss.backward()
        optimizer.step()  

        # print statistics so that we can tell the progress of this model as it runs
        running_loss += loss.item()
        if i % 10 == 9:    # print every 10 mini-batches
            print(f'Epoch: {epoch + 1}, Batch: {i + 1}, Loss: {running_loss / 10:.4f}')
            running_loss = 0.0

print('Finished Training')


### Testing the Model

In [None]:
test_input_frames, test_target_frames = create_training_pairs(torch.from_numpy(test_array))

test_dataset = FrameDataset(test_input_frames, test_target_frames)
#test_dataset = FrameDataset(input_frames, target_frames)
test_loader = DataLoader(test_dataset, batch_size=1, shuffle=False)
#test_loader = DataLoader(frame_dataset, batch_size=1, shuffle=True)

model.eval()

import matplotlib.pyplot as plt

def plot_frames(frame1, frame2, true_frame, predicted_frame, title=""):
    fig, axs = plt.subplots(1, 5, figsize=(20, 5))
    axs[0].imshow(frame1.permute(1, 2, 0))
    axs[0].set_title("Frame 1")
    axs[0].axis('off')

    axs[1].imshow(frame2.permute(1, 2, 0))
    axs[1].set_title("Frame 2")
    axs[1].axis('off')

    axs[2].imshow(true_frame.permute(1, 2, 0))
    axs[2].set_title("True Middle Frame")
    axs[2].axis('off')

    axs[3].imshow(predicted_frame.permute(1, 2, 0))
    axs[3].set_title("Predicted Middle Frame")
    axs[3].axis('off')

    frame_avg = (frame1 + frame2) / 2
    axs[4].imshow(frame_avg.permute(1, 2, 0))
    axs[4].set_title("Simple Average Frames")
    axs[4].axis('off')

    plt.show()

with torch.no_grad():  # Disable gradient computation for testing
    for i, ((frame1, frame2), target_frame) in enumerate(test_loader):
        outputs = model(frame1, frame2)
        plot_frames(frame1.squeeze(0), frame2.squeeze(0), target_frame.squeeze(0), outputs.squeeze(0), title=f"Test Sample {i+1}")

from sklearn.metrics import mean_squared_error

mse_scores = []
simple_average_mse_scores = []

with torch.no_grad():
    for (frame1, frame2), target_frame in test_loader:
        outputs = model(frame1, frame2)

        # Flatten the tensors to 1D arrays
        target_flat = target_frame.view(-1).numpy()
        output_flat = outputs.view(-1).numpy()

        mse = mean_squared_error(target_flat, output_flat)
        mse_scores.append(mse)
        
        # Calculate MSE for simple average of frames
        simple_average_frame = (frame1 + frame2) / 2
        simple_average_flat = simple_average_frame.view(-1).numpy()
        simple_average_mse = mean_squared_error(target_flat, simple_average_flat)
        simple_average_mse_scores.append(simple_average_mse)

average_mse = sum(mse_scores) / len(mse_scores)
print("Average MSE on Test Set:", average_mse)

average_simple_average_mse = sum(simple_average_mse_scores) / len(simple_average_mse_scores)
print("Average MSE on Test Set (Simple Average of Frames):", average_simple_average_mse)

Here were some of the outputs when we ran this:

![title](test1/1.png)
![title](test1/2.png)
![title](test1/3.png)
![title](test1/4.png)
![title](test1/5.png)

`Average MSE on Test Set: 0.0008857935173742737`

`Average MSE on Test Set (Simple Average of Frames): 0.3296718009393223`

In [None]:
test_url2 = 'https://www.youtube.com/watch?v=tpzkwouQ6XE'
test_array2 = torch.from_numpy(yt_to_data(test_url2))

test_input_frames2, test_target_frames2 = create_training_pairs(test_array2)

test_dataset2 = FrameDataset(test_input_frames2, test_target_frames2)
#test_dataset = FrameDataset(input_frames, target_frames)
test_loader2 = DataLoader(test_dataset2, batch_size=1, shuffle=False)
#test_loader = DataLoader(frame_dataset, batch_size=1, shuffle=True)

model.eval()

import matplotlib.pyplot as plt

def plot_frames(frame1, frame2, true_frame, predicted_frame, title=""):
    fig, axs = plt.subplots(1, 5, figsize=(20, 5))
    axs[0].imshow(frame1.permute(1, 2, 0))
    axs[0].set_title("Frame 1")
    axs[0].axis('off')

    axs[1].imshow(frame2.permute(1, 2, 0))
    axs[1].set_title("Frame 2")
    axs[1].axis('off')

    axs[2].imshow(true_frame.permute(1, 2, 0))
    axs[2].set_title("True Middle Frame")
    axs[2].axis('off')

    axs[3].imshow(predicted_frame.permute(1, 2, 0))
    axs[3].set_title("Predicted Middle Frame")
    axs[3].axis('off')

    frame_avg = (frame1 + frame2) / 2
    axs[4].imshow(frame_avg.permute(1, 2, 0))
    axs[4].set_title("Simple Average Frames")
    axs[4].axis('off')

    plt.show()

with torch.no_grad():  # Disable gradient computation for testing
    for i, ((frame1, frame2), target_frame) in enumerate(test_loader2):
        outputs = model(frame1, frame2)
        plot_frames(frame1.squeeze(0), frame2.squeeze(0), target_frame.squeeze(0), outputs.squeeze(0), title=f"Test Sample {i+1}")

from sklearn.metrics import mean_squared_error

mse_scores = []
simple_average_mse_scores = []

with torch.no_grad():
    for (frame1, frame2), target_frame in test_loader2:
        outputs = model(frame1, frame2)

        # Flatten the tensors to 1D arrays
        target_flat = target_frame.view(-1).numpy()
        output_flat = outputs.view(-1).numpy()

        mse = mean_squared_error(target_flat, output_flat)
        mse_scores.append(mse)
        
        # Calculate MSE for simple average of frames
        simple_average_frame = (frame1 + frame2) / 2
        simple_average_flat = simple_average_frame.view(-1).numpy()
        simple_average_mse = mean_squared_error(target_flat, simple_average_flat)
        simple_average_mse_scores.append(simple_average_mse)

average_mse = sum(mse_scores) / len(mse_scores)
print("Average MSE on Test Set:", average_mse)

average_simple_average_mse = sum(simple_average_mse_scores) / len(simple_average_mse_scores)
print("Average MSE on Test Set (Simple Average of Frames):", average_simple_average_mse)

Here were some of the outputs when we ran this:

![title](test2/1.png)
![title](test2/2.png)
![title](test2/3.png)
![title](test2/4.png)
![title](test2/5.png)

`Average MSE on Test Set: 0.0165942580395833`

`Average MSE on Test Set (Simple Average of Frames): 0.13649153818886325`

As you can tell, the color looks washed out. This is likely because we only got to run 5 epochs with it. The image would likely look nicer under, say, 10 or 20 epochs

In [None]:
test_url3 = 'https://www.youtube.com/watch?v=ZKTGeS3yuIY'
test_array3 = torch.from_numpy(yt_to_data(test_url3))

test_input_frames3, test_target_frames3 = create_training_pairs(test_array3)

test_dataset3 = FrameDataset(test_input_frames3, test_target_frames3)
#test_dataset = FrameDataset(input_frames, target_frames)
test_loader3 = DataLoader(test_dataset3, batch_size=1, shuffle=False)
#test_loader = DataLoader(frame_dataset, batch_size=1, shuffle=True)

model.eval()

import matplotlib.pyplot as plt

def plot_frames(frame1, frame2, true_frame, predicted_frame, title=""):
    fig, axs = plt.subplots(1, 5, figsize=(20, 5))
    axs[0].imshow(frame1.permute(1, 2, 0))
    axs[0].set_title("Frame 1")
    axs[0].axis('off')

    axs[1].imshow(frame2.permute(1, 2, 0))
    axs[1].set_title("Frame 2")
    axs[1].axis('off')

    axs[2].imshow(true_frame.permute(1, 2, 0))
    axs[2].set_title("True Middle Frame")
    axs[2].axis('off')

    axs[3].imshow(predicted_frame.permute(1, 2, 0))
    axs[3].set_title("Predicted Middle Frame")
    axs[3].axis('off')

    frame_avg = (frame1 + frame2) / 2
    axs[4].imshow(frame_avg.permute(1, 2, 0))
    axs[4].set_title("Simple Average Frames")
    axs[4].axis('off')

    plt.show()

with torch.no_grad():  # Disable gradient computation for testing
    for i, ((frame1, frame2), target_frame) in enumerate(test_loader3):
        outputs = model(frame1, frame2)
        plot_frames(frame1.squeeze(0), frame2.squeeze(0), target_frame.squeeze(0), outputs.squeeze(0), title=f"Test Sample {i+1}")

from sklearn.metrics import mean_squared_error

mse_scores = []
simple_average_mse_scores = []

with torch.no_grad():
    for (frame1, frame2), target_frame in test_loader3:
        outputs = model(frame1, frame2)

        # Flatten the tensors to 1D arrays
        target_flat = target_frame.view(-1).numpy()
        output_flat = outputs.view(-1).numpy()

        mse = mean_squared_error(target_flat, output_flat)
        mse_scores.append(mse)
        
        # Calculate MSE for simple average of frames
        simple_average_frame = (frame1 + frame2) / 2
        simple_average_flat = simple_average_frame.view(-1).numpy()
        simple_average_mse = mean_squared_error(target_flat, simple_average_flat)
        simple_average_mse_scores.append(simple_average_mse)

average_mse = sum(mse_scores) / len(mse_scores)
print("Average MSE on Test Set:", average_mse)

average_simple_average_mse = sum(simple_average_mse_scores) / len(simple_average_mse_scores)
print("Average MSE on Test Set (Simple Average of Frames):", average_simple_average_mse)

Here were some of the outputs when we ran this:

![title](test3/1.png)
![title](test3/2.png)
![title](test3/3.png)
![title](test3/4.png)
![title](test3/5.png)

`Average MSE on Test Set: 0.007915535417450131`

`Average MSE on Test Set (Simple Average of Frames): 0.1258860845993098`