# Imports

In [1]:
# Seed first, before any other library is imported or used.
# NOTE(review): `random_seed` is a project-local module; presumably seed_everything
# seeds every RNG this notebook relies on — confirm against its source.
from random_seed import seed_everything, RANDOM_SEED
seed_everything(RANDOM_SEED)

In [91]:
import hpsv2
import os
import optuna
import gc
import pickle

from PIL import Image
import pandas as pd
import numpy as np

from sklearn.ensemble import RandomForestRegressor, RandomForestClassifier
from sklearn.metrics import mean_absolute_error, mean_squared_error, accuracy_score, roc_auc_score
from sklearn.linear_model import LogisticRegression
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split

import torch
import lpips
from torchvision.transforms import ToTensor
from transformers import CLIPProcessor, CLIPModel
from transformers import AutoProcessor, AutoModel
from torchvision.models.optical_flow import Raft_Large_Weights, raft_large

from skimage.metrics import structural_similarity as ssim
from catboost import CatBoostRegressor, Pool
import matplotlib.pyplot as plt

# Preparation

In [3]:
# Annotation table: one row per video with its prompt, file name and human score.
df = pd.read_excel('data.xlsx', index_col=0)

frame_rate = 1
video_frames_folder = "videos_frames"

prompts = df['prompt'].values.tolist()

# Frames of each video live in
#   <video_frames_folder>/<video name without extension>/frame_rate_<frame_rate>
# The extension is removed by dropping the last dot-separated piece and gluing the
# remaining pieces together WITHOUT dots ("a.b.mp4" -> "ab").
# NOTE(review): this must match how the frame folders were created — confirm.
video_paths = [
    os.path.join(
        video_frames_folder,
        ''.join(video_name.split('.')[:-1]),
        f'frame_rate_{frame_rate}',
    )
    for video_name in df['video_name'].values
]

# Target values, in the same row order as `prompts` and `video_paths`.
scores = np.array(df['score'].values.tolist())

In [4]:
# Run the models on the GPU when one is visible, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")

# Shared PIL image -> tensor converter used by the frame-comparison features.
to_tensor = ToTensor()

In [35]:
# Accumulator for every extracted feature: dct[<feature name>][<prompt>] -> per-frame values.
# Re-running this cell discards everything collected so far.
dct = {}

# Feature mining

## CLIP (prompt and frame similarity)

In [30]:
def calculate_clip_similarity(model, processor, prompt, frame):
    """Return the CLIP image-text logit for one prompt and one frame.

    Args:
        model: CLIP model exposing `logits_per_image` on its output.
        processor: matching processor that tokenizes the text and preprocesses the image.
        prompt: text prompt (a single string).
        frame: a single image.

    Returns:
        The single image-text logit as a Python float.
    """
    batch = processor(text=[prompt], images=frame, return_tensors="pt", padding=True)
    batch = batch.to(device)

    # Inference only: no autograd graph is needed.
    with torch.no_grad():
        image_text_logits = model(**batch).logits_per_image

    # One image and one text -> exactly one logit.
    return image_text_logits.squeeze().item()

### clip-vit-base-patch32

In [31]:
# Checkpoint name under the "openai/" hub namespace; also the key for this model's scores in dct.
model_name = 'clip-vit-base-patch32'

In [32]:
# Load the CLIP checkpoint selected by `model_name`, move it to the target device
# and switch it to inference mode (both calls modify the module in place).
model = CLIPModel.from_pretrained(f"openai/{model_name}")
model.to(device)
model.eval()

# Matching text/image preprocessor.
processor = CLIPProcessor.from_pretrained(f"openai/{model_name}")

In [None]:
# Per-frame prompt/frame similarity for every video:
#   dct[model_name][prompt] -> list with one float score per frame.
dct[model_name] = {}

for prompt, video_path in zip(prompts, video_paths):
    # Frames are read as frame_0000.jpg, frame_0001.jpg, ...; the frame count is
    # taken from the number of entries in the directory.
    num_video_frames = len(os.listdir(video_path))

    # Deliberately NOT named `scores`: that global holds the target values read
    # from the spreadsheet and is needed later to build the training matrix.
    frame_scores = []

    for frame_idx in range(num_video_frames):
        # Open one frame at a time and close it immediately, instead of keeping
        # a file handle open for every frame of the video.
        with Image.open(f"{video_path}/frame_{frame_idx:04d}.jpg") as frame:
            frame_scores.append(calculate_clip_similarity(model, processor, prompt, frame))

    dct[model_name][prompt] = frame_scores

In [35]:
# Free the accelerator memory held by the current model before loading the next one:
# move the weights off the device, drop the reference, collect garbage and
# release PyTorch's cached CUDA blocks.
model.cpu()
del model
gc.collect()
torch.cuda.empty_cache()

### clip-vit-large-patch14

In [47]:
# Checkpoint name under the "openai/" hub namespace; also the key for this model's scores in dct.
model_name = 'clip-vit-large-patch14'

In [None]:
# Load the CLIP checkpoint selected by `model_name`, move it to the target device
# and switch it to inference mode (both calls modify the module in place).
model = CLIPModel.from_pretrained(f"openai/{model_name}")
model.to(device)
model.eval()

# Matching text/image preprocessor.
processor = CLIPProcessor.from_pretrained(f"openai/{model_name}")

In [None]:
# Per-frame prompt/frame similarity for every video:
#   dct[model_name][prompt] -> list with one float score per frame.
dct[model_name] = {}

for prompt, video_path in zip(prompts, video_paths):
    # Frames are read as frame_0000.jpg, frame_0001.jpg, ...; the frame count is
    # taken from the number of entries in the directory.
    num_video_frames = len(os.listdir(video_path))

    # Deliberately NOT named `scores`: that global holds the target values read
    # from the spreadsheet and is needed later to build the training matrix.
    frame_scores = []

    for frame_idx in range(num_video_frames):
        # Open one frame at a time and close it immediately, instead of keeping
        # a file handle open for every frame of the video.
        with Image.open(f"{video_path}/frame_{frame_idx:04d}.jpg") as frame:
            frame_scores.append(calculate_clip_similarity(model, processor, prompt, frame))

    dct[model_name][prompt] = frame_scores

In [44]:
# Free the accelerator memory held by the current model before loading the next one:
# move the weights off the device, drop the reference, collect garbage and
# release PyTorch's cached CUDA blocks.
model.cpu()
del model
gc.collect()
torch.cuda.empty_cache()

## SigLIP (prompt and frame similarity)

In [53]:
def calculate_siglip_similarity(model, processor, prompt, frame):
    """Return the SigLIP image-text logit for one prompt and one frame.

    Args:
        model: model exposing `logits_per_image` on its output.
        processor: matching processor; the text is padded with padding="max_length".
        prompt: text prompt (a single string).
        frame: a single image.

    Returns:
        The single image-text logit as a Python float.
    """
    batch = processor(text=[prompt], images=frame, padding="max_length", return_tensors="pt")
    batch = batch.to(device)

    # Inference only: no autograd graph is needed.
    with torch.no_grad():
        image_text_logits = model(**batch).logits_per_image

    # One image and one text -> exactly one logit.
    return image_text_logits.squeeze().item()

### siglip-so400m-patch14-384

In [54]:
# Checkpoint name under the "google/" hub namespace; also the key for this model's scores in dct.
model_name = 'siglip-so400m-patch14-384'

In [55]:
# Load the SigLIP checkpoint selected by `model_name`, move it to the target device
# and switch it to inference mode (both calls modify the module in place).
model = AutoModel.from_pretrained(f"google/{model_name}")
model.to(device)
model.eval()

# Matching text/image preprocessor.
processor = AutoProcessor.from_pretrained(f"google/{model_name}")

In [None]:
# Per-frame prompt/frame similarity for every video:
#   dct[model_name][prompt] -> list with one float score per frame.
dct[model_name] = {}

for prompt, video_path in zip(prompts, video_paths):
    # Frames are read as frame_0000.jpg, frame_0001.jpg, ...; the frame count is
    # taken from the number of entries in the directory.
    num_video_frames = len(os.listdir(video_path))

    # Deliberately NOT named `scores`: that global holds the target values read
    # from the spreadsheet and is needed later to build the training matrix.
    frame_scores = []

    for frame_idx in range(num_video_frames):
        # Open one frame at a time and close it immediately, instead of keeping
        # a file handle open for every frame of the video.
        with Image.open(f"{video_path}/frame_{frame_idx:04d}.jpg") as frame:
            frame_scores.append(calculate_siglip_similarity(model, processor, prompt, frame))

    dct[model_name][prompt] = frame_scores

In [57]:
# Free the accelerator memory held by the current model before loading the next one:
# move the weights off the device, drop the reference, collect garbage and
# release PyTorch's cached CUDA blocks.
model.cpu()
del model
gc.collect()
torch.cuda.empty_cache()

### siglip-large-patch16-384

In [59]:
# Checkpoint name under the "google/" hub namespace; also the key for this model's scores in dct.
model_name = 'siglip-large-patch16-384'

In [None]:
# Load the SigLIP checkpoint selected by `model_name`, move it to the target device
# and switch it to inference mode (both calls modify the module in place).
model = AutoModel.from_pretrained(f"google/{model_name}")
model.to(device)
model.eval()

# Matching text/image preprocessor.
processor = AutoProcessor.from_pretrained(f"google/{model_name}")

In [None]:
# Per-frame prompt/frame similarity for every video:
#   dct[model_name][prompt] -> list with one float score per frame.
dct[model_name] = {}

for prompt, video_path in zip(prompts, video_paths):
    # Frames are read as frame_0000.jpg, frame_0001.jpg, ...; the frame count is
    # taken from the number of entries in the directory.
    num_video_frames = len(os.listdir(video_path))

    # Deliberately NOT named `scores`: that global holds the target values read
    # from the spreadsheet and is needed later to build the training matrix.
    frame_scores = []

    for frame_idx in range(num_video_frames):
        # Open one frame at a time and close it immediately, instead of keeping
        # a file handle open for every frame of the video.
        with Image.open(f"{video_path}/frame_{frame_idx:04d}.jpg") as frame:
            frame_scores.append(calculate_siglip_similarity(model, processor, prompt, frame))

    dct[model_name][prompt] = frame_scores

In [None]:
# Free the accelerator memory held by the current model before loading the next one:
# move the weights off the device, drop the reference, collect garbage and
# release PyTorch's cached CUDA blocks.
model.cpu()
del model
gc.collect()
torch.cuda.empty_cache()

## Jina-CLIP (prompt and frame similarity)

In [41]:
def calculate_jinaclip_similarity(model, prompt, frame):
    """Return the dot product of the text and image embeddings of one prompt/frame pair.

    Args:
        model: object providing `encode_text` and `encode_image`.
        prompt: text prompt, encoded with task='retrieval.query'.
        frame: a single image.

    Returns:
        `text_embedding @ image_embedding.T`, with both embeddings truncated to
        512 dimensions. The result type is whatever the embeddings' `@` yields.
    """
    # Inference only: no autograd graph is needed.
    with torch.no_grad():
        query_vector = model.encode_text(prompt, task='retrieval.query', truncate_dim=512)
        frame_vector = model.encode_image(frame, truncate_dim=512)

    return query_vector @ frame_vector.T

In [23]:
# Checkpoint name under the "jinaai/" hub namespace; also the key for this model's scores in dct.
model_name = 'jina-clip-v2'

In [44]:
# Load the Jina-CLIP checkpoint selected by `model_name`.
# SECURITY: trust_remote_code=True executes Python code shipped with the hub
# repository — only use it with a repository you trust.
model = AutoModel.from_pretrained(f'jinaai/{model_name}', trust_remote_code=True)

# Move to the target device and switch to inference mode (both act in place).
model.to(device)
model.eval()

In [None]:
# Per-frame prompt/frame similarity for every video:
#   dct[model_name][prompt] -> list with one score per frame.
dct[model_name] = {}

for prompt, video_path in zip(prompts, video_paths):
    # Frames are read as frame_0000.jpg, frame_0001.jpg, ...; the frame count is
    # taken from the number of entries in the directory.
    num_video_frames = len(os.listdir(video_path))

    # Deliberately NOT named `scores`: that global holds the target values read
    # from the spreadsheet and is needed later to build the training matrix.
    frame_scores = []

    for frame_idx in range(num_video_frames):
        # Open one frame at a time and close it immediately, instead of keeping
        # a file handle open for every frame of the video.
        with Image.open(f"{video_path}/frame_{frame_idx:04d}.jpg") as frame:
            frame_scores.append(calculate_jinaclip_similarity(model, prompt, frame))

    dct[model_name][prompt] = frame_scores

In [None]:
# Free the accelerator memory held by the current model before moving on:
# move the weights off the device, drop the reference, collect garbage and
# release PyTorch's cached CUDA blocks.
model.cpu()
del model
gc.collect()
torch.cuda.empty_cache()

## HPS (prompt and frame similarity)

In [None]:
# HPS v2.1 score of every frame against the video's prompt:
#   dct['hps'][prompt] -> numpy array built from hpsv2.score's result for the whole frame list.
dct['hps'] = {}

for prompt, video_path in zip(prompts, video_paths):
    # Frames are read as frame_0000.jpg, frame_0001.jpg, ...; the frame count is
    # taken from the number of entries in the directory.
    frame_paths = [
        f"{video_path}/frame_{frame_idx:04d}.jpg"
        for frame_idx in range(len(os.listdir(video_path)))
    ]
    video_frames = list(map(Image.open, frame_paths))

    # All frames of the video are scored in one call.
    dct['hps'][prompt] = np.array(hpsv2.score(video_frames, prompt, hps_version="v2.1"))

## LPIPS (frames comparison)

In [None]:
# LPIPS distance between every pair of consecutive frames:
#   dct['lpips'][prompt] -> numpy array with (number of frames - 1) distances.
dct['lpips'] = {}
lpips_model = lpips.LPIPS(net="alex").to(device)

for i, (prompt, video_path) in enumerate(zip(prompts, video_paths)):
    print(i)  # progress indicator: index of the video being processed

    # Frames are read as frame_0000.jpg, frame_0001.jpg, ...; the frame count is
    # taken from the number of entries in the directory.
    num_video_frames = len(os.listdir(video_path))

    lpips_distances = []
    previous_tensor = None  # only the previous frame is kept on the device

    for frame_idx in range(num_video_frames):
        # Open one frame at a time and close the file as soon as it is converted.
        with Image.open(f"{video_path}/frame_{frame_idx:04d}.jpg") as frame:
            current_tensor = to_tensor(frame.convert("RGB")).unsqueeze(0).to(device)

        if previous_tensor is not None:
            # ToTensor yields values in [0, 1], while LPIPS expects [-1, 1];
            # normalize=True makes LPIPS rescale the inputs itself.
            # no_grad: inference only, no autograd graph is needed.
            with torch.no_grad():
                distance = lpips_model(previous_tensor, current_tensor, normalize=True).item()
            lpips_distances.append(distance)

        previous_tensor = current_tensor

    dct['lpips'][prompt] = np.array(lpips_distances)

## SSIM (frames comparison)

In [None]:
# Structural similarity between every pair of consecutive grayscale frames:
#   dct['ssim'][prompt] -> numpy array with (number of frames - 1) scores.
dct['ssim'] = {}

for i, (prompt, video_path) in enumerate(zip(prompts, video_paths)):
    # Load each frame once, as a grayscale ("L" mode) numpy array.
    gray_frames = [
        np.array(Image.open(f"{video_path}/frame_{frame_idx:04d}.jpg").convert("L"))
        for frame_idx in range(len(os.listdir(video_path)))
    ]
    print(i)  # progress indicator: index of the video being processed

    # Pair every frame with its successor; keep only the mean SSIM value and
    # discard the full similarity image returned because of full=True.
    ssim_scores = [
        ssim(earlier, later, full=True)[0]
        for earlier, later in zip(gray_frames, gray_frames[1:])
    ]

    dct['ssim'][prompt] = np.array(ssim_scores)

## Optical Flow (frames comparison)

In [118]:
# RAFT optical flow between every pair of consecutive frames, summarised as 18
# statistics per pair (6 for the raw flow, 6 for its magnitude, 6 for its angle):
#   dct['optical_flow'][prompt] -> numpy array, one row of 18 values per frame pair.
dct['optical_flow'] = {}
weights = Raft_Large_Weights.DEFAULT
# eval(): the model is used for inference only, so it must not run in training
# mode (normalization layers behave differently there).
model = raft_large(weights=weights, progress=False).to(device).eval()
transform = weights.transforms()


def get_flow_features(x):
    """Summarise a tensor as (sum, median, mean, std, min, max).

    Named differently from the numpy-based `get_features` helper used in the
    feature-engineering section, so that neither definition shadows the other
    when cells are re-run out of order.

    Args:
        x: torch tensor of any shape.

    Returns:
        1-D torch tensor with the six statistics, in the order listed above.
    """
    return torch.tensor((
        x.sum(),
        x.median(),
        x.mean(),
        x.std(),
        x.min(),
        x.max()
    ))


for prompt, video_path in zip(prompts, video_paths):
    # Frames are read as frame_0000.jpg, frame_0001.jpg, ...; the frame count is
    # taken from the number of entries in the directory.
    num_video_frames = len(os.listdir(video_path))

    optical_flow_features = []
    previous_tensor = None  # each frame is decoded and converted only once

    for frame_idx in range(num_video_frames):
        # Open one frame at a time and close the file as soon as it is converted.
        with Image.open(f"{video_path}/frame_{frame_idx:04d}.jpg") as frame:
            current_tensor = to_tensor(frame.convert("RGB")).unsqueeze(0).to(device)

        if previous_tensor is not None:
            frame_1, frame_2 = transform(previous_tensor, current_tensor)

            with torch.no_grad():
                # The model returns a list of flow estimates; the last one is used.
                flow = model(frame_1, frame_2)[-1]

            # Drop the batch dimension; flow[0] and flow[1] are the two flow channels.
            flow = flow.squeeze(0)
            flow_features = get_flow_features(flow)

            magnitude = torch.sqrt(flow[0]**2 + flow[1]**2)
            magnitude_features = get_flow_features(magnitude)

            # NOTE(review): the conventional call is arctan2(vertical, horizontal);
            # the argument order here is the other way round if channel 0 is the
            # horizontal component. Left unchanged to keep feature values stable — confirm.
            angle = torch.arctan2(flow[0], flow[1])
            angle_features = get_flow_features(angle)

            optical_flow_features.append(torch.cat([flow_features, magnitude_features, angle_features]))

        previous_tensor = current_tensor

    dct['optical_flow'][prompt] = np.array(optical_flow_features)

# Feature Engineering

In [None]:
# Persist the raw per-frame features so the expensive extraction above does not
# have to be repeated.
with open('dict_with_features.pickle', 'wb') as pickle_file:
    pickle.Pickler(pickle_file).dump(dct)

In [87]:
# Number of per-frame HPS values stored for each entry of dct['hps'].
matrix = np.array([len(frame_values) for frame_values in dct['hps'].values()])

# Hold out 20% of the counts so that statistics are looked at on the train part only.
train, test = train_test_split(matrix, test_size=0.2, random_state=RANDOM_SEED)

In [93]:
np.median(train) # median number of frames per video on the train split

121.0

In [10]:
# Fixed number of per-frame values kept for every feature series: longer series
# are truncated to this length and shorter ones are zero-padded.
num_frames = 100

In [11]:
def calculate_ema(values, alpha):
    """Return the final exponential moving average of a sequence.

    The average is seeded with the first element and then updated with every
    element in order (the first one included) as
    `alpha * value + (1 - alpha) * previous_average`.

    Args:
        values: non-empty indexable sequence of numbers.
        alpha: weight given to the newest value at each step.

    Returns:
        The moving average after the last element.

    Raises:
        IndexError: if `values` is empty.
    """
    smoothed = values[0]

    for current in values:
        smoothed = alpha * current + (1 - alpha) * smoothed

    return smoothed

def get_features(values):
    """Summarise a numeric series as 15 aggregate features.

    Args:
        values: non-empty 1-D sequence of numbers.

    Returns:
        A list of 15 values, in this order: the final exponential moving average
        for alpha = 0.1, 0.3, 0.5, 0.7 and 0.9, then sum, mean, median, standard
        deviation, minimum, maximum, 25% quantile, 75% quantile, interquartile
        range and the minimum / maximum ratio.
    """
    ema_features = [
        calculate_ema(values, alpha=alpha)
        for alpha in (0.1, 0.3, 0.5, 0.7, 0.9)
    ]

    lowest = np.min(values)
    highest = np.max(values)
    lower_quartile = np.quantile(values, 0.25)
    upper_quartile = np.quantile(values, 0.75)

    features = [
        np.sum(values),
        np.mean(values),
        np.median(values),
        np.std(values),
        lowest,
        highest,
        lower_quartile,
        upper_quartile,
        upper_quartile - lower_quartile,
        lowest / highest,
    ]

    return ema_features + features

In [72]:
# Flat feature vector per prompt, built by walking the feature groups in sorted
# key order:
#   * 'optical_flow': 15 aggregates for each column of the per-pair matrix;
#   * every other key: 15 aggregates of the series, followed by its first
#     `num_frames` raw values (zero-padded when the series is shorter).
feature_dct = {}

for prompt in prompts:
    video_features = []

    for key in sorted(dct.keys()):
        values = dct[key][prompt]

        if key == 'optical_flow':
            # Column by column: all aggregates of column 0, then of column 1, ...
            per_column_features = [get_features(values[:, i]) for i in range(values.shape[1])]
            all_features = [feature for column in per_column_features for feature in column]
        else:
            new_features = get_features(values)  # aggregates over the whole series

            frames_values = values[:num_frames]

            if len(frames_values) < num_frames:
                padded = np.zeros(num_frames)
                padded[:len(frames_values)] = frames_values
                frames_values = padded

            all_features = np.concatenate((new_features, frames_values))

        video_features.extend(all_features)

    feature_dct[prompt] = video_features

In [73]:
# Width of a feature vector, read from the first stored entry rather than from a
# loop variable left over by an earlier cell.
num_features = len(next(iter(feature_dct.values())))
num_objs = len(df)

# One row per video; the extra last column holds the target score.
matrix = np.zeros((num_objs, num_features + 1))

In [74]:
# Read the targets straight from the annotation table: the global `scores` name
# is reassigned by the feature-extraction cells, so it cannot be relied on here.
target_scores = df['score'].to_numpy()

for i, (prompt, score) in enumerate(zip(prompts, target_scores)):
    features = np.array(feature_dct[prompt])

    # Features fill the leading columns; the target goes into the last column.
    matrix[i, :len(features)] = features
    matrix[i, -1] = score

In [75]:
# Column names for the feature matrix, generated in exactly the order in which
# the feature values are laid out in feature_dct.
feature_lst = []

# Aggregates produced by get_features for a series, in output order.
aggregations = [
        'ema_01',
        'ema_03',
        'ema_05',
        'ema_07',
        'ema_09',
        'sum',
        'mean',
        'median',
        'std',
        'min',
        'max',
        'q_25',
        'q_75',
        'IQR',
        'min/max'
    ]

# Per-frame-pair statistics stored for each optical-flow quantity, in column order.
optical_flow_aggregations = [
    'sum',
    'median',
    'mean',
    'std',
    'min',
    'max',
]

for key in sorted(dct.keys()):
    if key == 'optical_flow':
        # The optical-flow values are produced column by column: for every column
        # (quantity x per-pair statistic) all series aggregates come together.
        # The series aggregation must therefore be the INNERMOST loop, otherwise
        # the names do not line up with the values.
        # NOTE: 'angel' is a misspelling of 'angle'; it is kept so that existing
        # column names stay stable.
        for type_metric in ['flow', 'magnitude', 'angel']:
            for aggregation in optical_flow_aggregations:
                for upper_aggregation in aggregations:
                    feature_lst.append(f'{key}_{upper_aggregation}_{type_metric}_{aggregation}')
    else:
        # Series aggregates first, then the raw per-frame values.
        for aggregation in aggregations:
            feature_lst.append(f'{key}_{aggregation}')

        for num_frame in range(num_frames):
            feature_lst.append(f'{key}_{num_frame}')

In [85]:
# Named feature columns plus the target, with the prompt attached as the last column.
feature_df = pd.DataFrame(matrix, columns=[*feature_lst, 'target']).assign(prompt=prompts)

In [86]:
# Save the assembled feature table (features, target and prompt) without the row index.
feature_df.to_csv('feature_df.csv', index=False)