In [1]:
from sklearn.linear_model import LinearRegression
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error
import tensorflow as tf

import csv
import numpy as np
import decord
import torch
import os, time
import pandas as pd
import random

from gluoncv.torch.utils.model_utils import download
from gluoncv.torch.data.transforms.videotransforms import video_transforms, volume_transforms
from gluoncv.torch.engine.config import get_cfg_defaults
from gluoncv.torch.model_zoo import get_model
from tqdm.notebook import tqdm

ModuleNotFoundError: No module named 'sklearn'

In [None]:
# Load the id -> action-name lookup table from the CSV mapping file.
map_path = 'action_map.csv'
action_dict = {}

with open(map_path, 'r', newline='', encoding='utf-8') as csvfile:
    # Each row supplies an integer-like 'id' column and a 'name' column.
    action_dict.update(
        (int(record['id']), record['name'])
        for record in csv.DictReader(csvfile)
    )

In [None]:
# Define a function to create the linear regression model
def create_linear_regression_model(num_frames=None):
    """Create a fresh, unfitted ``LinearRegression`` estimator.

    Args:
        num_frames: Unused. It is kept, and made optional, so that both the
            existing positional form and the argument-less call used later in
            this notebook work; the estimator is constructed with default
            settings either way.

    Returns:
        A new ``LinearRegression`` instance.
    """
    return LinearRegression()

In [None]:
# Define a function to compute the predicted action class and confidence based on the frames and weights
def compute_predictions_linear_regression(target_data, num_classes, frames, weights, model):
    """Fit ``model`` on the combined frames/weights matrix and predict class and confidence.

    Args:
        target_data: Regression targets. Either a single ``[class, confidence]``
            pair, which is repeated for every input row, or an array with one
            ``[class, confidence]`` row per input row.
        num_classes: Unused; retained so existing callers keep working.
        frames: Array-like with one row per sample. Any trailing dimensions are
            flattened into feature columns.
        weights: Array-like with one row per sample (a 1-D vector is treated as
            a single feature column).
        model: Estimator exposing ``fit(X, y)`` and ``predict(X)``.

    Returns:
        Tuple ``(predicted_action_class, predicted_confidence)``: the first and
        second output columns of ``model.predict`` on the fitted inputs.

    Raises:
        ValueError: If an input has no rows, if a 1-D ``target_data`` is not a
            two-element pair, or if the model does not return two output columns.
    """
    def _as_feature_matrix(values, label):
        # Estimators expect a 2-D (rows, features) matrix.
        matrix = np.asarray(values)
        if matrix.ndim == 0 or matrix.shape[0] == 0:
            raise ValueError(f"{label} must contain at least one row")
        return matrix.reshape(matrix.shape[0], -1)

    # Combine frames and weights into a single input matrix
    input_data = np.concatenate(
        (_as_feature_matrix(frames, "frames"), _as_feature_matrix(weights, "weights")),
        axis=1,
    )

    targets = np.asarray(target_data, dtype=float)
    if targets.ndim == 1:
        if targets.shape[0] != 2:
            raise ValueError("a 1-D target_data must be a [class, confidence] pair")
        # One shared [class, confidence] target for every row.
        targets = np.tile(targets, (input_data.shape[0], 1))

    # Fit the linear regression model
    model.fit(input_data, targets)  # target data is [class, confidence]

    # Predict once on the SAME feature matrix used for fitting; slicing the
    # input columns would change the feature count and break predict().
    predictions = np.asarray(model.predict(input_data))
    if predictions.ndim != 2 or predictions.shape[1] < 2:
        raise ValueError("model must predict two columns: [class, confidence]")

    predicted_action_class = predictions[:, 0]
    predicted_confidence = predictions[:, 1]

    return predicted_action_class, predicted_confidence

In [None]:
def compute_loss(frames, weights, target_class, target_confidence, num_classes, model):
    """Score how far the regression's predictions are from the target class/confidence.

    Args:
        frames: Per-sample frame features, forwarded to
            ``compute_predictions_linear_regression``.
        weights: Per-sample weights, forwarded to
            ``compute_predictions_linear_regression``.
        target_class: Class id the prediction should match.
        target_confidence: Confidence value the prediction should match.
        num_classes: Forwarded unchanged to ``compute_predictions_linear_regression``.
        model: Estimator forwarded to ``compute_predictions_linear_regression``.

    Returns:
        NumPy array of non-negative losses, one per prediction: the absolute
        confidence error, multiplied by 100 wherever the (rounded) predicted
        class differs from ``target_class``.
    """
    # Compute the predicted action class and confidence based on the weights and frames
    predicted_class, predicted_confidence = compute_predictions_linear_regression(
        [target_class, target_confidence], num_classes, frames, weights, model
    )

    # Use the absolute error: with a signed difference the x100 penalty below
    # would make a wrong prediction look *better* whenever the error is negative.
    confidence_error = np.abs(target_confidence - np.asarray(predicted_confidence))

    # The regression output is continuous, so round before comparing class ids,
    # and compare element-wise (a bare `if array != value` is ambiguous).
    class_mismatch = np.rint(np.asarray(predicted_class)) != target_class

    # NOTE(review): everything here is NumPy, so an autodiff framework cannot
    # trace gradients from this loss back to `weights`.
    return np.where(class_mismatch, confidence_error * 100, confidence_error)

In [None]:
def optimize_on_video(frames, target_class, target_confidence, model,
                      num_classes=None, num_steps=1000, learning_rate=0.001):
    """Optimize one weight per entry of ``frames`` with Adam to minimize ``compute_loss``.

    Args:
        frames: Sequence of frames; one weight is created per ``len(frames)`` entry.
        target_class: Class id passed through to ``compute_loss``.
        target_confidence: Confidence passed through to ``compute_loss``.
        model: Regression model passed through to ``compute_loss``.
        num_classes: Value forwarded to ``compute_loss`` (which requires it).
            Defaults to ``None``.
        num_steps: Number of optimization steps (default 1000, as before).
        learning_rate: Adam learning rate (default 0.001, as before).

    Returns:
        NumPy array with the optimized weights.

    Raises:
        ValueError: If the loss returned by ``compute_loss`` provides no
            gradient with respect to the weights.
    """
    # Define the initial weights for frames. They must be floating point:
    # TensorFlow does not compute gradients for integer variables.
    initial_weights = [1.0] * len(frames)

    # Define the optimization algorithm
    optimizer = tf.keras.optimizers.Adam(learning_rate=learning_rate)

    # Perform optimization
    weights = tf.Variable(initial_weights, trainable=True, dtype=tf.float32)

    for step in range(num_steps):
        with tf.GradientTape() as tape:
            # compute_loss takes six arguments; pass the last two by keyword so
            # `model` cannot be bound to the `num_classes` slot.
            loss = compute_loss(frames, weights, target_class, target_confidence,
                                num_classes=num_classes, model=model)

        # Only a TensorFlow tensor recorded on the tape can be differentiated.
        gradients = tape.gradient(loss, weights) if tf.is_tensor(loss) else None
        if gradients is None:
            raise ValueError(
                "compute_loss did not produce a loss that is differentiable with "
                "respect to the frame weights (it must be built from TensorFlow "
                "operations on `weights`)"
            )
        optimizer.apply_gradients([(gradients, weights)])

        # Optional: Print the loss for monitoring
        if step % 100 == 0:
            print("Step:", step, "Loss:", loss.numpy())

    # Retrieve the optimized weights
    optimized_weights = weights.numpy()

    return optimized_weights

In [None]:
def get_frames(file_path, video_length):
    """Read every second frame of a video and run the preprocessing pipeline on it.

    Args:
        file_path: Path of the video file to open with ``decord.VideoReader``.
        video_length: Upper bound (exclusive) on the frame indices to sample.
            It is clamped to the number of frames actually in the file, so
            clips shorter than ``video_length`` yield fewer frames instead of
            an out-of-range read.

    Returns:
        The output of the transform pipeline (resize to short side 256, center
        crop 224x224, ``ClipToTensor``, then ImageNet mean/std normalization).
        Presumably a (C, T, H, W) tensor — TODO confirm against ClipToTensor.

    Raises:
        ValueError: If there is no frame to sample.
    """
    vr = decord.VideoReader(file_path)

    # Never request indices past the end of the clip.
    last_index = min(video_length, len(vr))
    frame_id_list = list(range(0, last_index, 2))
    if not frame_id_list:
        raise ValueError(
            f"No frames to sample from {file_path!r} "
            f"(video_length={video_length}, frames in file={len(vr)})"
        )

    video_data = vr.get_batch(frame_id_list).asnumpy()
    crop_size = 224
    short_side_size = 256
    transform_fn = video_transforms.Compose([
        video_transforms.Resize(short_side_size, interpolation='bilinear'),
        video_transforms.CenterCrop(size=(crop_size, crop_size)),
        volume_transforms.ClipToTensor(),
        video_transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ])
    clip_input = transform_fn(video_data)

    return clip_input

In [None]:
def f(video_input, model):
    """Run ``model`` on a single clip and summarize its softmax output.

    Args:
        video_input: Tensor for one clip; a leading batch dimension of size 1
            is added before it is passed to ``model``.
        model: Callable returning a 2-D tensor of scores (softmax is taken
            over dimension 1).

    Returns:
        Tuple ``(top_class, confidence)`` where ``top_class`` is the argmax of
        the (flattened) probability array and ``confidence`` is the spread
        between the largest and smallest probability.
    """
    with torch.no_grad():
        logits = model(torch.unsqueeze(video_input, dim=0))
        # Softmax directly on the tensor (no NumPy round trip); .cpu() keeps the
        # NumPy conversion valid when the output lives on another device.
        probs = torch.nn.functional.softmax(logits, dim=1).cpu().numpy()

    top_class = np.argmax(probs)
    confidence = np.max(probs) - np.min(probs)

    return top_class, confidence

In [None]:
num_classes = 5   # number of action classes (subfolders) to sample
num_videos = 5    # number of videos to sample per class
video_length = 200  # upper bound on sampled frame indices; TODO confirm against the dataset's clip lengths

# Accumulates [frame, weight] pairs for every processed video.
# Starts empty: no placeholder entry for consumers to skip.
frames_and_weights = []

# Get 5 Random videos from collection
# Built with os.path.join so the path is not tied to Windows separators.
dataset_path = os.path.join('datasets', 'kinetics400_5per', 'train')

subfolders = sorted(
    entry for entry in os.listdir(dataset_path)
    if os.path.isdir(os.path.join(dataset_path, entry))
)
random.seed(2)
# Cap the sample size so a small dataset does not make random.sample raise.
random_subfolders = random.sample(subfolders, min(num_classes, len(subfolders)))

config_file = './i3d_resnet50_v1_kinetics400.yaml'
cfg = get_cfg_defaults()
cfg.merge_from_file(config_file)
f_model = get_model(cfg)
f_model.eval()

# Reverse lookup (action name -> id), built once. setdefault keeps the first
# id for a repeated name, matching a first-match scan over action_dict.
action_id_by_name = {}
for action_id, action_name in action_dict.items():
    action_id_by_name.setdefault(action_name, action_id)

try:
    # Iterate over videos
    for index, subfolder in enumerate(random_subfolders):
        subfolder_path = os.path.join(dataset_path, subfolder)
        class_id = action_id_by_name.get(subfolder, -1)  # -1 when the folder is not in the action map

        # Sort the listing: os.listdir order is arbitrary, and the seeded
        # sample below is only reproducible over a stable ordering.
        files = sorted(os.listdir(subfolder_path))
        clean_files = [filename for filename in files if filename.endswith(".mp4")]
        random_files = random.sample(clean_files, min(num_videos, len(clean_files)))

        for random_file in random_files:
            file_path = os.path.join(subfolder_path, random_file)

            # get weights for video
            frames = get_frames(file_path, video_length)

            # create and train linear regression model
            # (len(frames) is the same count optimize_on_video uses for its weights)
            g_model = create_linear_regression_model(len(frames))

            target_class, target_confidence = f(frames, f_model)
            optimized_frame_weights = optimize_on_video(frames, target_class, target_confidence, g_model)

            # store frames and their weights
            frames_and_weights.extend(
                [frame, weight] for frame, weight in zip(frames, optimized_frame_weights)
            )

except Exception as e:
    # Include the exception type: some messages (e.g. a KeyError's) are
    # meaningless on their own.
    print(f"{type(e).__name__}: {e}")