In [1]:
import pandas as pd
import cv2 as cv
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from joblib import Parallel, delayed
from scipy.stats import skew
from sklearn.preprocessing import MinMaxScaler
    

In [2]:
# Load the train / test / validation split manifests. Each CSV carries an
# 'index' column that becomes the DataFrame index; later cells use it as the
# join key between clip metadata and extracted features.
train_df = pd.read_csv("./dataset/splits/train.csv", index_col='index')
test_df = pd.read_csv("./dataset/splits/test.csv", index_col='index')
val_df = pd.read_csv("./dataset/splits/validation.csv", index_col='index')
# Preview the first rows of the training manifest.
train_df.head()

Unnamed: 0_level_0,clip_name,clip_path,label,encoded_label
index,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
0,v_Diving_g03_c01.avi,./dataset/Diving/v_Diving_g03_c01.avi,Diving,0
1,v_Diving_g19_c03.avi,./dataset/Diving/v_Diving_g19_c03.avi,Diving,0
2,v_Diving_g03_c04.avi,./dataset/Diving/v_Diving_g03_c04.avi,Diving,0
3,v_Diving_g05_c04.avi,./dataset/Diving/v_Diving_g05_c04.avi,Diving,0
4,v_Diving_g15_c03.avi,./dataset/Diving/v_Diving_g15_c03.avi,Diving,0


In [3]:
# Tunable settings for the feature-extraction step.
CONFIG = {
    'frame_skip': 5,  # Process every 5th frame to speed up
    'resize_dim': (224, 224),  # Target size handed to cv.resize before colour statistics are computed
    'n_jobs': -1  # Use all CPU cores for feature extraction
}

In [4]:
class VideoExtractorFeature:
    """Extract per-frame colour statistics from video clips and min-max scale them.

    For every sampled frame the extractor computes the mean, standard
    deviation and skewness of each HSV channel plus the mean of each RGB
    channel (12 features per frame). Features are scaled with a single
    ``MinMaxScaler`` that is fitted by ``process_train_df`` and reused by
    ``process_test_df``, so the training split must be processed first.

    Input DataFrames must expose an ``index`` key (index name or column) and
    a ``clip_path`` column; the module-level ``CONFIG`` supplies
    ``resize_dim`` and ``n_jobs``.
    """

    def __init__(self, skip_frame:int =5):
        """Create an extractor that samples every ``skip_frame``-th frame.

        Raises:
            ValueError: if ``skip_frame`` is smaller than 1 (it is used as a
                modulus when sampling frames).
        """
        if skip_frame < 1:
            raise ValueError(f"skip_frame must be >= 1, got {skip_frame}")
        self.skip_frame = skip_frame
        self.scalar = MinMaxScaler()

    def _get_frame_color_features(self, frame):
        """Return a dict of colour statistics for one BGR frame.

        Keys are ``moments_{mean,std,skew}_{h,s,v}`` and ``avg_color_{r,g,b}``.
        """
        hsv_frame = cv.cvtColor(frame, cv.COLOR_BGR2HSV)
        rgb_frame = cv.cvtColor(frame, cv.COLOR_BGR2RGB)

        features = {}
        for i, column_name in enumerate(['h', 's', 'v']):
            channel = hsv_frame[:, :, i]

            mean = np.mean(channel)
            std = np.std(channel)

            # A (near-)constant channel has undefined skewness; report 0
            # instead of letting NaN leak into the feature table.
            if std < 1e-6:
                skewness = 0
            else:
                skewness = skew(channel.flatten())
                if np.isnan(skewness):
                    skewness = 0

            features[f'moments_mean_{column_name}'] = mean
            features[f'moments_std_{column_name}'] = std
            features[f'moments_skew_{column_name}'] = skewness

        avg_rgb  = np.mean(rgb_frame, axis=(0, 1))
        features['avg_color_r'] = avg_rgb[0]
        features['avg_color_g'] = avg_rgb[1]
        features['avg_color_b'] = avg_rgb[2]
        return features

    def _process_video(self, row: dict):
        """Return one feature dict per sampled frame of the clip described by ``row``.

        ``row`` must provide ``'index'`` (copied into every feature dict as
        the join key) and ``'clip_path'``. A clip that cannot be opened or
        read yields an empty list.
        """
        video_id = row['index']
        video_path = row['clip_path']
        cap = cv.VideoCapture(video_path)
        features = []
        frame_cnt = 0

        # Release the capture handle even if resizing or feature extraction
        # raises part-way through the clip.
        try:
            while cap.isOpened():
                ret, frame = cap.read()
                if not ret:
                    break
                if frame_cnt % self.skip_frame == 0:
                    frame = cv.resize(frame, CONFIG['resize_dim'], interpolation=cv.INTER_AREA)
                    feat = self._get_frame_color_features(frame)
                    feat['index'] = video_id
                    features.append(feat)
                frame_cnt += 1
        finally:
            cap.release()
        return features

    def _normalize_features(self, feature_df: pd.DataFrame, is_test: bool = False):
        """Min-max scale every feature column of ``feature_df`` in place and return it.

        ``'index'`` and ``'encoded_label'`` are left untouched. With
        ``is_test=False`` the scaler is (re)fitted on this data; with
        ``is_test=True`` the previously fitted scaler is applied.
        """
        feature_names = [col for col in feature_df.columns if col not in ['index', 'encoded_label']]
        feature_df[feature_names] = feature_df[feature_names].astype(float)
        if is_test:
            feature_df[feature_names] = self.scalar.transform(feature_df[feature_names])
        else:
            feature_df[feature_names] = self.scalar.fit_transform(feature_df[feature_names])
        return feature_df

    def _process_dataset(self, df: pd.DataFrame, is_test=False):
        """Extract and scale per-frame features for every clip listed in ``df``.

        Clips are processed in parallel using ``CONFIG['n_jobs']`` workers.

        Raises:
            ValueError: if no frame could be extracted from any clip.
        """
        rows = df.reset_index().to_dict('records')
        # Honour the notebook-level setting instead of a hard-coded worker count.
        nested_results = Parallel(n_jobs=CONFIG['n_jobs'])(
            delayed(self._process_video)(row) for row in rows
        )

        flatten_results = [item for sub_list in nested_results for item in sub_list]
        if not flatten_results:
            raise ValueError(
                "No frames could be extracted from the given clips; "
                "check that every 'clip_path' points to a readable video."
            )
        feature_df = pd.DataFrame(flatten_results)
        return self._normalize_features(feature_df, is_test)

    def process_train_df(self, df: pd.DataFrame):
        """Fit the scaler on ``df``'s clips and return ``(features, labels)``.

        The per-frame features are joined back onto ``df`` via ``'index'``;
        the returned feature frame contains only the feature columns and the
        returned Series holds the matching ``encoded_label`` of each row.
        """
        process_df = self._process_dataset(df)
        # Join against the frame that was passed in, not a notebook global.
        merged_df = pd.merge(df, process_df, on='index', how='inner')
        output = merged_df['encoded_label']
        merged_df = merged_df.drop(columns=['index', 'clip_path', 'clip_name', 'label', 'encoded_label'])
        return merged_df, output

    def process_test_df(self, df: pd.DataFrame):
        """Scale ``df``'s clips with the already-fitted scaler and return the joined frame.

        Must be called after ``process_train_df``. The result keeps
        ``'index'``, ``'encoded_label'`` and the feature columns.
        """
        process_df = self._process_dataset(df, True)
        # Join against the frame that was passed in so validation/test rows
        # receive their own metadata and labels.
        merged_df = pd.merge(df, process_df, on='index', how='inner')
        merged_df = merged_df.drop(columns=['clip_path', 'clip_name', 'label'])
        return merged_df


In [5]:
# Take the frame-sampling stride from CONFIG rather than the constructor's
# built-in default, so the config cell is the single place to tune it.
extractor = VideoExtractorFeature(skip_frame=CONFIG['frame_skip'])

In [6]:
# Extract per-frame features for the training clips and fit the scaler on them.
# X holds one row of scaled features per sampled frame; Y holds the matching encoded_label.
X, Y = extractor.process_train_df(train_df)

In [7]:
# Extract features for the validation clips, scaled with the scaler fitted in
# the previous cell (so this cell must run after process_train_df).
M = extractor.process_test_df(val_df)

In [8]:
# Preview the first rows of the scaled training feature table.
X.head()

Unnamed: 0,moments_mean_h,moments_std_h,moments_skew_h,moments_mean_s,moments_std_s,moments_skew_s,moments_mean_v,moments_std_v,moments_skew_v,avg_color_r,avg_color_g,avg_color_b
0,0.437377,0.703766,0.469759,0.184177,0.320461,0.443086,0.672033,0.708757,0.265971,0.621076,0.682887,0.697304
1,0.419381,0.691181,0.472115,0.173806,0.312256,0.452479,0.671253,0.7074,0.266933,0.624704,0.684147,0.697224
2,0.445359,0.637138,0.472924,0.174393,0.315061,0.467496,0.670058,0.709306,0.268808,0.62434,0.68427,0.694419
3,0.452045,0.706548,0.463693,0.168384,0.309778,0.46808,0.672741,0.704658,0.267192,0.63004,0.685004,0.69994
4,0.458268,0.636646,0.467077,0.161103,0.303536,0.48331,0.673845,0.710806,0.268158,0.634046,0.689022,0.699814


In [9]:
# Preview the encoded labels aligned row-for-row with X.
Y.head()

0    0
1    0
2    0
3    0
4    0
Name: encoded_label, dtype: int64