# Installing Packaging

In [2]:
! pip install ftfy regex tqdm
! pip install git+https://github.com/openai/CLIP.git

Collecting git+https://github.com/openai/CLIP.git
  Cloning https://github.com/openai/CLIP.git to /private/var/folders/zx/46y7zv8x5gd6xfp0s16kr0w00000gp/T/pip-req-build-mur_dlbz
  Running command git clone --filter=blob:none --quiet https://github.com/openai/CLIP.git /private/var/folders/zx/46y7zv8x5gd6xfp0s16kr0w00000gp/T/pip-req-build-mur_dlbz
  Resolved https://github.com/openai/CLIP.git to commit dcba3cb2e2827b402d2701e7e1c7d9fed8a20ef1
  Preparing metadata (setup.py) ... [?25ldone


In [42]:
from pkg_resources import packaging
from collections import OrderedDict
import matplotlib.pyplot as plt
from tqdm.notebook import tqdm
import IPython.display
from os import system
from PIL import Image
import urllib.request
import numpy as np
import skimage
import pickle
import torch
import torch
import time
import clip
import os
%matplotlib inline
%config InlineBackend.figure_format = 'retina'

print("Torch version:", torch.__version__)
print("CLIP Models:",clip.available_models())

Torch version: 2.4.1
CLIP Models: ['RN50', 'RN101', 'RN50x4', 'RN50x16', 'RN50x64', 'ViT-B/32', 'ViT-B/16', 'ViT-L/14', 'ViT-L/14@336px']


# Loading the model

`clip.available_models()` will list the names of available CLIP models.

In [43]:
model, preprocess = clip.load("ViT-B/32")
input_resolution = model.visual.input_resolution
context_length = model.context_length
vocab_size = model.vocab_size

print("Model parameters:", f"{np.sum([int(np.prod(p.shape)) for p in model.parameters()]):,}")
print("Input resolution:", input_resolution)
print("Context length:", context_length)
print("Vocab size:", vocab_size)

Model parameters: 151,277,313
Input resolution: 224
Context length: 77
Vocab size: 49408


## Building features

We normalize the images, tokenize each text input, and run the forward pass of the model to get the image and text features.

In [44]:
def loadImage(url, rows=5, cols=5):
  original_images = []
  urllib.request.urlretrieve(
    url,
    "img.gif")
  im = Image.open("img.gif")
  try:
    while 1:
      im.seek(im.tell()+1)
      original_images.append(im.convert("RGB"))
  except EOFError:
    pass

  print(len(original_images))

  processed_images = []

  for image in original_images:
    processed_images.append(preprocess(image))

  plt.figure(figsize=(20, 10))
  start = 0
  for i in range(rows*cols):
    plt.subplot(rows, cols, i+1)
    plt.imshow(original_images[start+i])
    plt.axis('off')
    #plt.title(str(start+i))
  plt.tight_layout()
  plt.subplots_adjust(wspace=0, hspace=0, left=0, right=1, bottom=0, top=1)

  return original_images, processed_images



In [46]:
from IPython.display import clear_output
from PIL import Image, ImageSequence
import matplotlib.pyplot as plt
from tqdm.notebook import tqdm
import numpy as np
import shutil
import cv2
import os

def split_video_to_mp4(video_path, output_dir, window_size=5):
    if output_dir in os.listdir('.'):
        shutil.rmtree(output_dir)
    os.makedirs(output_dir, exist_ok=True)

    if video_path.endswith('.gif'):
        gif = Image.open(video_path)
        frames = [frame.copy() for frame in ImageSequence.Iterator(gif)]
        total_frames = len(frames)

        width, height = frames[0].size
        duration = gif.info['duration']
        fps = 1000 / duration
    else:
        video = cv2.VideoCapture(video_path)
        fps = video.get(cv2.CAP_PROP_FPS)
        total_frames = int(video.get(cv2.CAP_PROP_FRAME_COUNT))
        width = int(video.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(video.get(cv2.CAP_PROP_FRAME_HEIGHT))
        
        frames = []
        success, frame = video.read()
        while success:
            frames.append(frame)
            success, frame = video.read()
        video.release()

    for i in range(total_frames - window_size + 1):
        output_path = os.path.join(output_dir, f'{i + 1}.mp4')
        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))

        for frame in frames[i:i + window_size]:
            if isinstance(frame, Image.Image):
                frame_rgb = frame.convert('RGB')
                frame_array = np.array(frame_rgb)
                frame_bgr = cv2.cvtColor(frame_array, cv2.COLOR_RGB2BGR)
            else:
                frame_bgr = frame
            out.write(frame_bgr)

        out.release()

def load_basketball(basketball_path, size):
    basketball = Image.open(basketball_path)
    basketball = basketball.resize((size, size), Image.LANCZOS)
    return basketball

def rotate_basketball(basketball):
    random_angle = np.random.randint(0, 360)
    return basketball.rotate(random_angle, expand=True)

def add_basketball_to_frame(frame, basketball):
    frame_pil = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))

    basketball_rotated = rotate_basketball(basketball)

    frame_width, frame_height = frame_pil.size
    basketball_width, basketball_height = basketball_rotated.size

    max_x = frame_width - basketball_width
    max_y = frame_height - basketball_height
    rand_x = np.random.randint(0, max_x)
    rand_y = np.random.randint(0, max_y)

    frame_pil.paste(basketball_rotated, (rand_x, rand_y), basketball_rotated)
    return cv2.cvtColor(np.array(frame_pil), cv2.COLOR_RGB2BGR)

def add_noise(input_video_path, output_video_path, basketball_path, basketball_size):
    basketball = load_basketball(basketball_path, basketball_size)
    cap = cv2.VideoCapture(input_video_path)
    fps = cap.get(cv2.CAP_PROP_FPS)
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')

    out = cv2.VideoWriter(output_video_path, fourcc, fps, (width, height))

    while cap.isOpened():
        ret, frame = cap.read()
        if not ret: break

        frame_with_basketball = add_basketball_to_frame(frame, basketball)
        out.write(frame_with_basketball)

    cap.release()
    out.release()

In [32]:
if 'aug1' in os.listdir('.'):
    shutil.rmtree('aug1')
if 'aug2' in os.listdir('.'):
    shutil.rmtree('aug2')

def get_dim(file_path):
    vid = cv2.VideoCapture(file_path)
    height = vid.get(cv2.CAP_PROP_FRAME_HEIGHT)
    width = vid.get(cv2.CAP_PROP_FRAME_WIDTH)
    return height, width

backflip_files = list(filter(lambda x: x != 'backflip/.DS_Store',[os.path.join('backflip', x) for x in os.listdir('backflip')]))

os.mkdir('aug1')
os.mkdir('aug2')
for backflip in tqdm(backflip_files):
    height, width = get_dim(backflip)
    add_noise(backflip, os.path.join('aug1', backflip.split('/')[1]), '../../../Storage/cruise.png', int(min(height, width) / 3))

for aug1 in tqdm([os.path.join('aug1',x) for x in os.listdir('aug1')]):
    height, width = get_dim(aug1)
    add_noise(aug1, os.path.join('aug2', aug1.split('/')[1]), '../../../Storage/cruise.png', int(min(height, width) / 3))

shutil.rmtree('aug1')

  0%|          | 0/50 [00:00<?, ?it/s]

  0%|          | 0/50 [00:00<?, ?it/s]

In [12]:
split_video_to_mp4('augment/6.mp4', output_dir='output')

In [47]:
cached_images = {}

def loadGifLocal(local_path, rows=5, cols=5):
    if local_path in cached_images: return cached_images[local_path]
    original_images = []

    im = Image.open(local_path)
    
    try:
        im.seek(im.tell())
        original_images.append(im.convert("RGB"))
        while 1:
            im.seek(im.tell()+1)
            original_images.append(im.convert("RGB"))
    except EOFError:
        pass

    processed_images = []

    for image in original_images:
        processed_images.append(preprocess(image))

    cached_images[local_path] = (original_images, processed_images)
    return original_images, processed_images

def loadMP4Local(local_path, rows=5, cols=5):
    video = cv2.VideoCapture(local_path)
    fps = video.get(cv2.CAP_PROP_FPS)
    total_frames = int(video.get(cv2.CAP_PROP_FRAME_COUNT))
    width = int(video.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(video.get(cv2.CAP_PROP_FRAME_HEIGHT))
    
    frames = []
    success, frame = video.read()
    while success:
        frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        frames.append(Image.fromarray(frame))
        success, frame = video.read()
    video.release()

    original_images = frames

    processed_images = []

    for image in original_images:
        processed_images.append(preprocess(image))

    return original_images, processed_images

def findMatch(original_images, processed_images, texts, show_output=False):
    t1 = time.perf_counter()
    image_input = torch.tensor(np.stack(processed_images))
    text_tokens = clip.tokenize(["This is " + desc for desc in texts])

    with torch.no_grad():
        image_features = model.encode_image(image_input).float()
        text_features = model.encode_text(text_tokens).float()

    image_features /= image_features.norm(dim=-1, keepdim=True)
    text_features /= text_features.norm(dim=-1, keepdim=True)
    similarity = text_features.cpu().numpy() @ image_features.cpu().numpy().T
    t2 = time.perf_counter()
    
    if show_output:
        plt.figure(figsize=(18, 6 * len(texts)))
    
    results = []
    for i, text in enumerate(texts):
        if show_output:
            plt.subplot(len(texts), 2, 1 + 2 * i)
            plt.plot(range(len(similarity[i])), similarity[i])
        results.append(np.argmax(similarity[i]))
        if show_output:
            plt.subplot(len(texts), 2, 2 + 2 * i)
            plt.imshow(original_images[np.argmax(similarity[i])])
            plt.title(text, fontdict={'fontsize': 40})
    if show_output:
        plt.tight_layout()
    return results

# BABY Use this code (ignore everything else)

In [34]:
import cv2
from PIL import Image, ImageTk
import tkinter as tk

def showFrames(path, highlight = False):
    video_path = path
    cap = cv2.VideoCapture(video_path)
    
    frames = []
    success, frame = cap.read()
    while success:
        frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        img = Image.fromarray(frame)
        frames.append(img)
        success, frame = cap.read()
    
    cap.release()

    if not frames:
        print("No frames found in the video.")
        return
    
    root = tk.Tk()
    root.title(path)
    
    def display_frame(index):
        frame_label.config(text=f"Frame {index}")
        img = ImageTk.PhotoImage(frames[index])
        frame_canvas.create_image(0, 0, anchor=tk.NW, image=img)
        frame_canvas.image = img
    
    frame_label = tk.Label(root, text="Frame 0", font=('Hack', 14), fg="red" if highlight else "black")
    frame_label.pack()
    
    frame_canvas = tk.Canvas(root, width=frames[0].width, height=frames[0].height)
    frame_canvas.pack()
    
    display_frame(0)
    
    def next_frame(event):
        current_frame = int(frame_label.cget("text").split()[1])
        next_index = (current_frame + 1) % len(frames)
        display_frame(next_index)

    def doubleSkip(event):
        next_frame(event)
        next_frame(event)
    
    def prev_frame(event):
        current_frame = int(frame_label.cget("text").split()[1])
        next_index = (current_frame - 1) % len(frames)
        display_frame(next_index)
    def doublePrev(event):
        prev_frame(event)
        prev_frame(event)
    
    root.bind('<Right>', next_frame)
    root.bind('<Left>', prev_frame)
    root.bind('r', doubleSkip)
    root.bind('l', doublePrev)
    
    root.mainloop()

In [36]:
for i in range(1, 51): 
    showFrames(f'augment/{i}.mp4', highlight= (i in [2,3,5,24,30,38,40,46,48])) # Show frames for a video

--------

In [17]:
orig_imgs, proc_imgs = loadMP4Local('194.mp4')
result = findMatch(orig_imgs, proc_imgs, ['A person performs a backflip.'])[0]
print(result)

55


In [None]:
from IPython.display import clear_output
import pickle

clip_pred = []

backflip_files = os.listdir('augment')
backflip_files.sort(key = lambda x: int(x.split('.')[0]))
backflip_files = [os.path.join('augment', x) for x in backflip_files]

pbar = tqdm(backflip_files)
for video in pbar:
    pbar.set_description(video.split('/')[1])
    orig_imgs, proc_imgs = loadMP4Local(video)
    result = findMatch(orig_imgs, proc_imgs, ['A person performs a backflip.'])[0]
    
    clip_pred.append(result)
    print(str(result), end = " | ")

with open('predictions/clip_pred.pkl', 'wb') as file:
    pickle.dump([x.item() for x in clip_pred], file)

print("Done!")

# Loading Data from `photo_data.csv`

In [None]:
import pandas as pd
import math
photo_data_csv = pd.read_csv("photo_data.csv")
photo_data_csv.head()

In [None]:
all_labeled_data = photo_data_csv.iloc[:, :3]

unfiltered_data_triples = list(all_labeled_data.itertuples(index=False, name=None))

labeled_data_float = list(filter(lambda x: not math.isnan(x[0]), unfiltered_data_triples))

labeled_data = list(map(lambda x: (f'gifs/{int(x[0])}.gif', x[1], int(x[2])), labeled_data_float))

X = labeled_data

In [None]:
X[:5]

In [None]:
from IPython.display import clear_output
from tqdm import tqdm

# y_pred = []
X_lengths = []

X_frames = []

for gif_path, phrase, frame in tqdm(labeled_data):
    orig_imgs, proc_imgs = loadImageLocal(gif_path)
    X_lengths.append(len(orig_imgs))
    curr_frames = []
    for f_index, frame_pic in enumerate(orig_imgs):
        append = ""
        if f_index < frame:
            append = " before" * min(50, frame - f_index)
        elif f_index > frame:
            append = " after" * min(50, f_index - frame)
        else:
            append = ""
        after = ""
        if f_index == frame:
            after = ' now'
        curr_frames.append((frame_pic, append + phrase + after))
    X_frames.append(curr_frames)
    
    # prediction = findMatch(orig_imgs, proc_imgs, [phrase])[0]
    # print(f"Prediction for {gif_path}: Frame {prediction} vs. Actual: {frame}")
    # print("For the phrase '" + phrase + "'")
    # clear_output(wait=True)
    # y_pred.append(prediction)

# y_pred = np.array(y_pred)
X_lengths = np.array(X_lengths)

In [None]:
y_error = np.divide(np.abs(y_pred - photo_data_csv['Correct Frame'][0:25]), X_lengths) * 100
plt.boxplot(y_error)
plt.yticks(np.arange(0, 100, 10), np.char.add(np.arange(0, 100, 10).astype("str"), '%'))
plt.title("Error in % off")
plt.show()

In [None]:
y_error = np.divide(np.abs(y_pred - photo_data_csv['Correct Frame'][0:25]), X_lengths) * 100
plt.boxplot(y_error)
plt.yticks(np.arange(0, 100, 10), np.char.add(np.arange(0, 100, 10).astype("str"), '%'))
plt.title("Error in % off")
plt.show()

In [None]:
y_error = np.divide(np.abs(y_pred - photo_data_csv['Correct Frame'][0:272]), X_lengths) * 100
plt.boxplot(y_error)
plt.yticks(np.arange(0, 100, 10), np.char.add(np.arange(0, 100, 10).astype("str"), '%'))
plt.title("Error in % off")
plt.show()

In [None]:
y_error = np.divide(np.abs(y_pred - photo_data_csv['Correct Frame'][0:272]), X_lengths) * 100
plt.boxplot(y_error)
plt.yticks(np.arange(0, 100, 10), np.char.add(np.arange(0, 100, 10).astype("str"), '%'))
plt.title("Error in % off")
plt.show()

In [None]:
len(X_frames)

In [None]:
X_frames[0]

# Actual Training

In [None]:
pip install transformers

In [None]:
model.load_state_dict(torch.load('saved_model.pth', weights_only=True, map_location=torch.device('cpu')))

In [None]:
# Put the model in evaluation mode
model.eval()

# Initialize a list to store results
correct_predictions = 0
total_samples = 0

# Loop through the test data
with torch.no_grad():  # Disable gradient calculation for efficiency
    for batch in tqdm(test_dataloader, total=len(test_dataloader)):
        images, texts = batch
        print(images[0])
        images = images.to(device)
        texts = texts.to(device)

        # Forward pass
        logits_per_image, logits_per_text = model(images, texts)

        # Calculate accuracy or other metrics
        # Assuming the task is image-text retrieval, where the correct text for an image is its corresponding index
        ground_truth = torch.arange(len(images), dtype=torch.long, device=device)
        image_predictions = logits_per_image.argmax(dim=1)
        text_predictions = logits_per_text.argmax(dim=1)

        print(image_predictions)
        # print(text_predictions)
        print(ground_truth)
        correct_predictions += (image_predictions == ground_truth).sum().item()
        total_samples += len(images)

# Calculate final accuracy
accuracy = correct_predictions / total_samples
print(f"Test Accuracy: {accuracy:.4f}")


# Start

In [None]:
import json
from PIL import Image

from tqdm import tqdm

import torch
import torch.nn as nn
from torch.utils.data import DataLoader

import clip
from transformers import CLIPProcessor, CLIPModel

In [None]:
# Load the CLIP model and processor
model = CLIPModel.from_pretrained("openai/clip-vit-base-patch32")
processor = CLIPProcessor.from_pretrained("openai/clip-vit-base-patch32")

In [None]:
# Choose computation device
device = "cuda:0" if torch.cuda.is_available() else "cpu"

In [None]:
# Load pre-trained CLIP model
model, preprocess = clip.load("ViT-B/32", device=device, jit=False)

In [None]:
# Define a custom dataset
class image_title_dataset():
    def __init__(self, list_image,list_txt):
        # Initialize image paths and corresponding texts
        self.image = list_image
        # Tokenize text using CLIP's tokenizer
        self.title  = clip.tokenize(list_txt)

    def __len__(self):
        return len(self.title)

    def __getitem__(self, idx):
        # Preprocess image using CLIP's preprocessing function
        image = preprocess(self.image[idx])
        title = self.title[idx]
        return image, title

In [None]:
train_cycle = 1  # Adjust this between 1 and 5

slice_length = len(X_frames) // 5

start_index = (train_cycle - 1) * slice_length
end_index = start_index + slice_length

X_test = X_frames[start_index:end_index]

X_train = X_frames[:start_index] + X_frames[end_index:]

print(f"Training data for train_cycle {train_cycle}: {len(X_train)} elements")
print(f"Testing data for train_cycle {train_cycle}: {len(X_test)} elements")


In [None]:
X_train_flatten = []
X_test_flatten = []
for GIF in X_train:
    for frame in GIF:
        X_train_flatten.append(frame)
for GIF in X_test:
    for frame in GIF:
        X_test_flatten.append(frame)

X_train = X_train_flatten
X_test = X_test_flatten

In [None]:
len(X_train)

In [None]:
def loadFromX(X):
    list_image = []
    list_txt = []
    for image, desc in X:
        img_path = image
        caption = desc
        list_image.append(img_path)
        list_txt.append(caption)
    return list_image, list_txt

In [None]:
list_image_train, list_txt_train = loadFromX(X_train)
dataset_train = image_title_dataset(list_image_train, list_txt_train)
train_dataloader = DataLoader(dataset_train, batch_size=100, shuffle=True) #Define your own dataloader

In [None]:
list_image_test, list_txt_test = loadFromX(X_test)
dataset_test = image_title_dataset(list_image_test, list_txt_test)
test_dataloader = DataLoader(dataset_test, batch_size=100, shuffle=True) #Define your own dataloader

In [None]:
# Function to convert model's parameters to FP32 format
def convert_models_to_fp32(model):
    for p in model.parameters():
        p.data = p.data.float()
        p.grad.data = p.grad.data.float()
if device == "cpu":
  model.float()

In [None]:
# Prepare the optimizer
optimizer = torch.optim.Adam(model.parameters(), lr=5e-5,betas=(0.9,0.98),eps=1e-6,weight_decay=0.2) # the lr is smaller, more safe for fine tuning to new dataset


# Specify the loss function
loss_img = nn.CrossEntropyLoss()
loss_txt = nn.CrossEntropyLoss()

In [None]:
num_epochs = 30
for epoch in range(num_epochs):
    pbar = tqdm(train_dataloader, total=len(train_dataloader))
    for batch in pbar:
        optimizer.zero_grad()

        images,texts = batch

        images= images.to(device)
        texts = texts.to(device)

        # Forward pass
        logits_per_image, logits_per_text = model(images, texts)

        # Compute loss
        ground_truth = torch.arange(len(images),dtype=torch.long,device=device)
        total_loss = (loss_img(logits_per_image,ground_truth) + loss_txt(logits_per_text,ground_truth))/2

        # Backward pass
        total_loss.backward()
        if device == "cpu":
            optimizer.step()
        else :
            convert_models_to_fp32(model)
            optimizer.step()
            clip.model.convert_weights(model)

        pbar.set_description(f"Epoch {epoch}/{num_epochs}, Loss: {total_loss.item():.4f}")

In [None]:
!pip install --upgrade decorator==4.4.2
!pip install ffmpeg --upgrade
!pip install moviepy

In [2]:
from moviepy.editor import VideoFileClip, AudioFileClip
import sys

def scale_video(input_file, output_file, target_duration):
    # Load the video file
    video = VideoFileClip(input_file)
    
    # Get the original duration of the video in seconds
    original_duration = video.duration
    
    # Calculate the scaling factor
    if original_duration > target_duration:
        scale_factor = target_duration / original_duration
    else:
        scale_factor = 1  # No scaling needed if the video is shorter than the target duration

    # Apply the speed-up effect to the video
    scaled_video = video.fx(lambda clip: clip.speedx(factor=1/scale_factor))

    new_audio = AudioFileClip("../../Desktop/tauchip.wav")

    # Set the new audio to the video clip
    video_with_new_audio = scaled_video.set_audio(new_audio)
    
    # Write the result to an MP4 file
    video_with_new_audio.write_videofile("output.mp4", codec="libx264", audio_codec="aac")
scale_video('../../Desktop/tauplayback.mov', '../../Desktop/tauscaled.mp4', 6*60 + 28)

Moviepy - Building video output.mp4.
MoviePy - Writing audio in outputTEMP_MPY_wvf_snd.mp4


                                                                                                                                                                                                                                                                                                                                         

MoviePy - Done.
Moviepy - Writing video output.mp4



                                                                                                                                                                                                                                                                                                                                         

Moviepy - Done !
Moviepy - video ready output.mp4


In [None]:
from moviepy.editor import VideoFileClip, AudioFileClip

# Load the video file and the WAV audio file
video_clip = VideoFileClip("../../Desktop/tauscaled.mp4")
new_audio = AudioFileClip("../../Desktop/tauchip.wav")

# Set the new audio to the video clip
video_with_new_audio = video_clip.set_audio(new_audio)

# Write the result to an MP4 file
video_with_new_audio.write_videofile("output.mp4", codec="libx264", audio_codec="aac")

print("Video and audio combined successfully into output.mp4!")
