### Splitting the dataset into 25% randomly and dividing it into 5 equal sets to apply different distortions and get labels.

The dataset is split into 5 equal sets to apply differents types of distortions to each set. These distortion types are as follows:
- Brightness/Luminance
- Contrast
- Sharpness
- Noise
- Pixel Blocking

The distortions will be applied to each set on different severity levels. Once the distortion is applied, a score will be given to each video based on how much distortion was applied and the score will be saved in a csv along with the type of distortion type, video name, severity score.

In [1]:
import os
import random
import cv2
import numpy as np
import shutil
import pandas as pd
import tqdm
import skimage as ski
from concurrent.futures import ThreadPoolExecutor, as_completed

In [2]:
random.seed(10)

In [None]:
# Defining the paths of input and output folders.
original_folder = 'EPIC-KITCHENS-100/EPIC-KITCHENS-100-train/train'
output_folder = 'Code/Epic_vid_balanced_new_7Sept'
os.makedirs(output_folder, exist_ok=True)

In [4]:
# creating a new dataframe for storing details regarding the videos.

df = pd.DataFrame(columns=['Video Name', 'Type', 'Degrade Type Factor', 'Quality Score'])

In [5]:
# Getting the paths of the videos in the folder
all_videos = [os.path.join(original_folder, video) for video in os.listdir(original_folder) if video.endswith('.mp4')]

# Calculating the total number of videos in the folder and creating a subset of 25%
total_videos = len(all_videos)
subset_size = int(total_videos * 0.25)

selected_videos = random.sample(all_videos, subset_size)

In [6]:
# Creating 5 different sets for each distortion type
distort_set_size = subset_size // 5

# brightness_set = random.sample(selected_videos, distort_set_size)
# contrast_set = random.sample(selected_videos, distort_set_size)
# sharpness_set = random.sample(selected_videos, distort_set_size)
# noise_set = random.sample(selected_videos, distort_set_size)
# blockiness_set = random.sample(selected_videos, distort_set_size)

# Make a copy of selected_videos to modify safely
remaining_videos = selected_videos.copy()

# Create each set by sampling from the remaining available videos
brightness_set = random.sample(remaining_videos, distort_set_size)
# Remove selected videos from the remaining pool
remaining_videos = [video for video in remaining_videos if video not in brightness_set]

contrast_set = random.sample(remaining_videos, distort_set_size)
remaining_videos = [video for video in remaining_videos if video not in contrast_set]

sharpness_set = random.sample(remaining_videos, distort_set_size)
remaining_videos = [video for video in remaining_videos if video not in sharpness_set]

noise_set = random.sample(remaining_videos, distort_set_size)
remaining_videos = [video for video in remaining_videos if video not in noise_set]

blockiness_set = random.sample(remaining_videos, distort_set_size)
remaining_videos = [video for video in remaining_videos if video not in blockiness_set]



In [7]:
dict = {"Brightness Set" : brightness_set,
        "Contrast Set" : contrast_set,
        "Sharpness Set" : sharpness_set,
        "Noise Set": noise_set,
        "Pixel Blocking Set": blockiness_set}

In [8]:
df_selected = pd.DataFrame(data = dict)

In [9]:
df_selected.to_csv("selected_videos.csv")

In [10]:
# print(len(set(blockiness_set)))

LUMINANCE

In [11]:
# function to change luminance of the video

def adjust_luminance(frame, luminance_factor=1.0):

    # Convert the frame from BGR to YUV color space
    yuv_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2YUV)

    # Split the channels
    y, u, v = cv2.split(yuv_frame)

    # Apply luminance adjustment
    y = cv2.convertScaleAbs(y, alpha=luminance_factor, beta=0)

    # Merge the channels back
    adjusted_yuv_frame = cv2.merge((y, u, v))

    # Convert back to BGR color space
    adjusted_frame = cv2.cvtColor(adjusted_yuv_frame, cv2.COLOR_YUV2BGR)
    
    return adjusted_frame


def luminance_quality_score(luminance_factor):
    """
    Calculate a quality score between 0 and 1 based on the luminance adjustment factor.
    """
    if luminance_factor < 1:
        return luminance_factor  # The factor itself is between 0 and 1, which can directly represent the score.
    else:  # luminance_factor > 1
        return max(0, 1 - ((luminance_factor - 1.1) / (3 - 1.1)))  # Linearly scale the score between 1 and 0
    

def change_video_luminance(video_path, output_path):
    """
    Adjust the luminance of a video.
    """
    cap = cv2.VideoCapture(video_path)

    if not cap.isOpened():
        print(f"Error: Could not open video {video_path}")
        return
    
    fourcc = cv2.VideoWriter_fourcc(*'MP4V')
    out = cv2.VideoWriter(output_path, fourcc, cap.get(cv2.CAP_PROP_FPS),
                          (int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)), int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))))
    
    # Randomly choose between increasing the luminance or not
    choice = random.choice(['increase','decrease'])
    
    # Increase luminance with a factor > 1 (e.g., luminance_factor = 1.5)
    # Decrease luminance with a factor < 1 (e.g., luminance_factor = 0.7)
    if choice == "increase":
        luminance = random.triangular(1.1,3)
    elif choice == "decrease":
        luminance = random.random()

    while True:
        ret, frame = cap.read()
        if not ret:
            break

        # Adjust the luminance of the frame
        adjusted_frame = adjust_luminance(frame, luminance_factor=luminance)

        # Write the adjusted frame to output
        out.write(adjusted_frame)

    cap.release()
    out.release()

    # Calculate quality score for the entire video
    quality_score = luminance_quality_score(luminance)

    return quality_score , luminance


In [12]:
# Creating bad data by changing brightness

good_vids_brightness = random.sample(brightness_set, len(brightness_set) // 4)

for video in tqdm.tqdm(brightness_set, desc="Processing Videos", unit="video"):
    video_name = os.path.basename(video).split('\\')[-1]
    if video in good_vids_brightness:
        shutil.copy(video, output_folder)
        quality_score = 1  # Assign a perfect score to original videos
        new_row = {'Video Name': video_name, 
                   'Type' : 'Luminance',
                   'Degrade Type Factor' : np.nan,
                   'Quality Score': quality_score}
        df = pd.concat([df, pd.DataFrame([new_row])], ignore_index=True)
    else:
        bad_video_path = os.path.join(output_folder, f"{video_name}")
        quality_score, luminance_factor = change_video_luminance(video, bad_video_path)
        new_row = {'Video Name': video_name,
                   'Type' : 'Luminance',
                   'Degrade Type Factor' : luminance_factor,
                   'Quality Score': quality_score}
        df = pd.concat([df, pd.DataFrame([new_row])], ignore_index=True)

  df = pd.concat([df, pd.DataFrame([new_row])], ignore_index=True)
Processing Videos: 100%|██████████| 3360/3360 [04:20<00:00, 12.89video/s]


CONTRAST

In [13]:
# function to change contrast of the video

def contrast_quality_score(alpha):
    """
    Calculate a quality score between 0 and 1 based on the contrast adjustment factor.
    """
    if alpha < 1:
        return alpha  # The factor itself is between 0 and 1, which can directly represent the score.
    else:  # contrast_factor > 1
        return max(0, 1 - ((alpha - 1.1) / (3 - 1.1)))  # Linearly scale the score between 1 and 0


def adjust_contrast_in_video(video_path, output_path):
    """
    Adjust the contrast of a video.

    Parameters:
    - video_path: Path to the input video file.
    - output_path: Path to save the output video file.
    - alpha: Contrast control (1.0-3.0), default is 1.0.
    """
    cap = cv2.VideoCapture(video_path)

    if not cap.isOpened():
        print(f"Error: Could not open video {video_path}")
        return
    
    fourcc = cv2.VideoWriter_fourcc(*'MP4V')
    out = cv2.VideoWriter(output_path, fourcc, cap.get(cv2.CAP_PROP_FPS),
                          (int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)), int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))))
    
    # Randomly choose between increasing the contrast or not
    choice = random.choice(['increase','decrease'])
    
    # Increase contrast by setting alpha > 1 (e.g., alpha = 1.5)
    # Decrease contrast by setting alpha < 1 (e.g., alpha = 0.7)
    if choice == "increase":
        alpha = random.triangular(1.1,3)
    elif choice == "decrease":
        alpha = random.random()

    while True:
        ret, frame = cap.read()
        if not ret:
            break

        # Adjust the contrast of the frame
        contrast_frame = cv2.convertScaleAbs(frame, alpha=alpha)

        # Write the contrast-adjusted frame to output
        out.write(contrast_frame)

    cap.release()
    out.release()

    # Calculate quality score for the entire video
    quality_score = contrast_quality_score(alpha)

    return quality_score , alpha


In [14]:
# Creating bad data by changing contrast

good_vids_contrast = random.sample(contrast_set, len(contrast_set) // 4)

for video in tqdm.tqdm(contrast_set, desc="Processing Videos", unit="video"):
    video_name = os.path.basename(video).split('\\')[-1]
    if video in good_vids_contrast:
        shutil.copy(video, output_folder)
        quality_score = 1  # Assign a perfect score to original videos
        new_row = {'Video Name': video_name, 
                   'Type' : 'Contrast',
                   'Degrade Type Factor' : np.nan,
                   'Quality Score': quality_score}
        df = pd.concat([df, pd.DataFrame([new_row])], ignore_index=True)
    else:
        bad_video_path = os.path.join(output_folder, f"{video_name}")
        quality_score, contrast_factor = adjust_contrast_in_video(video, bad_video_path)
        new_row = {'Video Name': video_name, 
                   'Type' : 'Contrast',
                   'Degrade Type Factor' : contrast_factor,
                   'Quality Score': quality_score}
        df = pd.concat([df, pd.DataFrame([new_row])], ignore_index=True)

Processing Videos: 100%|██████████| 3360/3360 [02:53<00:00, 19.33video/s]


SHARPNESS

In [15]:
def apply_blur(video_path, output_path):
    cap = cv2.VideoCapture(video_path)

    if not cap.isOpened():
        print(f"Error: Could not open video {video_path}")
        return
    
    fourcc = cv2.VideoWriter_fourcc(*'MP4V')
    out = cv2.VideoWriter(output_path, fourcc, cap.get(cv2.CAP_PROP_FPS),
                          (int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)), int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))))
    
    # Randomly choose a blur factor between 1 and 10
    # sigma closer to 10 blurs the video drastically.
    sigma = random.randint(1, 10)
    
    while True:
        ret, frame = cap.read()
        if not ret:
            break
        
        # Apply Gaussian blur using skimage
        blurred = ski.filters.gaussian(frame, sigma=sigma, truncate=3.5, channel_axis=-1)

        # Convert the float image back to uint8 format for OpenCV compatibility
        blurred = (blurred * 255).astype(np.uint8)
        
        # Write the blurred frame to output
        out.write(blurred)

    cap.release()
    out.release()

    # Calculate the quality score based on the blur factor
    quality_score = max(0, 1 - (sigma / 10))

    return quality_score, sigma


In [16]:
# Creating bad data adding blur to the video

good_vids_sharpness = random.sample(sharpness_set, len(sharpness_set) // 4)

for video in tqdm.tqdm(sharpness_set, desc="Processing Videos", unit="video"):
    video_name = os.path.basename(video).split('\\')[-1]
    if video in good_vids_sharpness:
        shutil.copy(video, output_folder)
        quality_score = 1  # Assign a perfect score to original videos
        new_row = {'Video Name': video_name, 
                   'Type' : 'Sharpness',
                   'Degrade Type Factor' : np.nan,
                   'Quality Score': quality_score}
        df = pd.concat([df, pd.DataFrame([new_row])], ignore_index=True)
    else:
        bad_video_path = os.path.join(output_folder, f"{video_name}")
        quality_score, sharpness_factor = apply_blur(video, bad_video_path)
        new_row = {'Video Name': video_name, 
                   'Type' : 'Sharpness',
                   'Degrade Type Factor' : sharpness_factor,
                   'Quality Score': quality_score}
        df = pd.concat([df, pd.DataFrame([new_row])], ignore_index=True)

Processing Videos: 100%|██████████| 3360/3360 [39:13<00:00,  1.43video/s]  


NOISE

In [17]:
def apply_noise(video_path, output_path):

    cap = cv2.VideoCapture(video_path)
    
    if not cap.isOpened():
        print(f"Error: Could not open video {video_path}")
        return

    fourcc = cv2.VideoWriter_fourcc(*'MP4V')
    out = cv2.VideoWriter(output_path, fourcc, cap.get(cv2.CAP_PROP_FPS),
                          (int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)), int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))))
    
    # Randomly choose a noise factor between 0 and 1
    # noise_level closer to 1 creates a more noiser video.
    noise_level = random.uniform(0.1, 1)
    
    while True:
        ret, frame = cap.read()
        if not ret:
            break
        
        # Apply Gaussian noise
        # noise = np.random.normal(0, noise_level, frame.shape).astype(np.uint8)
        # noisy_frame = cv2.add(frame, noise)

        # Apply Speckle noise
        noise = np.random.normal(0, noise_level, frame.shape)
        noisy_frame = frame + frame * noise
        noisy_frame = np.clip(noisy_frame, 0, 255).astype(np.uint8)
        
        # Write the noisy frame to output
        out.write(noisy_frame)
        
    cap.release()
    out.release()

    quality_score = 1 - (noise_level - 0.1) / (1 - 0.1)

    return quality_score, noise_level

In [18]:
# Creating bad data by adding noise to the video

good_vids_noise = random.sample(noise_set, len(noise_set) // 4)

for video in tqdm.tqdm(noise_set, desc="Processing Videos", unit="video"):
    video_name = os.path.basename(video).split('\\')[-1]
    if video in good_vids_noise:
        shutil.copy(video, output_folder)
        quality_score = 1  # Assign a perfect score to original videos
        new_row = {'Video Name': video_name, 
                   'Type' : 'Noise',
                   'Degrade Type Factor' : np.nan,
                   'Quality Score': quality_score}
        df = pd.concat([df, pd.DataFrame([new_row])], ignore_index=True)
    else:
        bad_video_path = os.path.join(output_folder, f"{video_name}")
        quality_score, noise_factor = apply_noise(video, bad_video_path)
        new_row = {'Video Name': video_name, 
                   'Type' : 'Noise',
                   'Degrade Type Factor' : noise_factor,
                   'Quality Score': quality_score}
        df = pd.concat([df, pd.DataFrame([new_row])], ignore_index=True)

Processing Videos: 100%|██████████| 3360/3360 [31:45<00:00,  1.76video/s]  


BLOCKINESS

In [None]:
# def apply_pixel_blocking(frame, block_size):
#     """
#     Apply pixel blocking (pixelation) to a frame.
    
#     Parameters:
#     - frame: The input video frame (numpy array).
#     - block_size: The size of each block of pixels.
    
#     Returns:
#     - pixelated_frame: The pixelated version of the input frame.
#     """
#     (h, w) = frame.shape[:2]  # Get the height and width of the frame

#     # Calculate the number of blocks along each dimension
#     x_steps = np.arange(0, w, block_size)
#     y_steps = np.arange(0, h, block_size)
    
#     # Copy the frame to create a pixelated version
#     pixelated_frame = frame.copy()

#     # Loop through each block position
#     for x in x_steps:
#         for y in y_steps:
#             # Determine the block region
#             x_end = min(x + block_size, w)
#             y_end = min(y + block_size, h)
            
#             # Extract the block region
#             block = frame[y:y_end, x:x_end]
            
#             # Compute the mean color of the block
#             (B, G, R) = [int(x) for x in np.mean(block, axis=(0, 1))]
            
#             # Assign the mean color to all pixels in the block
#             pixelated_frame[y:y_end, x:x_end] = (B, G, R)
    
#     return pixelated_frame

# def apply_pixel_blocking_to_video(video_path, output_path):
#     """
#     Apply pixel blocking to a video.
    
#     Parameters:
#     - video_path: Path to the input video file.
#     - output_path: Path to save the output video file.
#     - block_size: The size of each block of pixels.
#     """
#     cap = cv2.VideoCapture(video_path)

#     if not cap.isOpened():
#         print(f"Error: Could not open video {video_path}")
#         return

#     fourcc = cv2.VideoWriter_fourcc(*'MP4V')
#     out = cv2.VideoWriter(output_path, fourcc, cap.get(cv2.CAP_PROP_FPS),
#                           (int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)), int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))))
    
#     # Randomly choose a block_size between 1 and 10
#     # block_size closer to 10 creates more blocky videos.
#     block_size = random.randint(1, 10)

#     while True:
#         ret, frame = cap.read()
#         if not ret:
#             break

#         # Apply pixel blocking to the frame
#         pixelated_frame = apply_pixel_blocking(frame, block_size=block_size)

#         # Write the pixelated frame to output
#         out.write(pixelated_frame)

#     cap.release()
#     out.release()

#     # Calculate the quality score based on the block size factor
#     quality_score = max(0, 1 - (block_size / 10))

#     return quality_score, block_size



In [19]:
import cv2
import numpy as np
import random


def apply_pixel_blocking(frame, block_size):
    """
    Apply pixel blocking (pixelation) to a frame using OpenCV's resize method.
    
    Parameters:
    - frame: The input video frame (numpy array).
    - block_size: The size of each block of pixels.
    
    Returns:
    - pixelated_frame: The pixelated version of the input frame.
    """
    (h, w) = frame.shape[:2]
    
    # Resize down by the block size, then resize up to original size
    temp = cv2.resize(frame, (w // block_size, h // block_size), interpolation=cv2.INTER_LINEAR)
    pixelated_frame = cv2.resize(temp, (w, h), interpolation=cv2.INTER_NEAREST)
    
    return pixelated_frame

def process_frame_batch(frames, block_size):
    """
    Helper function to apply pixel blocking to a batch of frames.
    """
    return [apply_pixel_blocking(frame, block_size) for frame in frames]

def apply_pixel_blocking_to_video(video_path, output_path):
    """
    Apply pixel blocking to a video using multi-threading with batch processing.
    
    Parameters:
    - video_path: Path to the input video file.
    - output_path: Path to save the output video file.
    """
    cap = cv2.VideoCapture(video_path)

    if not cap.isOpened():
        print(f"Error: Could not open video {video_path}")
        return

    fourcc = cv2.VideoWriter_fourcc(*'MP4V')
    out = cv2.VideoWriter(output_path, fourcc, cap.get(cv2.CAP_PROP_FPS),
                          (int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)), int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))))

    # Randomly choose a block_size between 1 and 10
    block_size = random.randint(1, 10)

    # Read all frames into memory
    frames = []
    while True:
        ret, frame = cap.read()
        if not ret:
            break
        frames.append(frame)

    # Process frames in batches using ThreadPoolExecutor
    batch_size = 10  # Process 10 frames in parallel
    with ThreadPoolExecutor(max_workers=4) as executor:  # You can adjust the number of workers
        futures = []
        for i in range(0, len(frames), batch_size):
            batch = frames[i:i+batch_size]
            futures.append(executor.submit(process_frame_batch, batch, block_size))

        for future in as_completed(futures):
            processed_frames = future.result()
            for processed_frame in processed_frames:
                out.write(processed_frame)

    cap.release()
    out.release()

    # Calculate the quality score based on the block size factor
    quality_score = max(0, 1 - (block_size / 10))

    return quality_score, block_size


In [20]:
# Creating bad data by applying pixel blocking

good_vids_blockiness = random.sample(blockiness_set, len(blockiness_set) // 4)

for video in tqdm.tqdm(blockiness_set, desc="Processing Videos", unit="video"):
    video_name = os.path.basename(video).split('\\')[-1]
    if video in good_vids_blockiness:
        shutil.copy(video, output_folder)
        quality_score = 1  # Assign a perfect score to original videos
        new_row = {'Video Name': video_name, 
                   'Type' : 'Blockiness',
                   'Degrade Type Factor' : np.nan,
                   'Quality Score': quality_score}
        df = pd.concat([df, pd.DataFrame([new_row])], ignore_index=True)
    else:
        bad_video_path = os.path.join(output_folder, f"{video_name}")
        quality_score, blockiness_factor = apply_pixel_blocking_to_video(video, bad_video_path)
        new_row = {'Video Name': video_name, 
                   'Type' : 'Blockiness',
                   'Degrade Type Factor' : blockiness_factor,
                   'Quality Score': quality_score}
        df = pd.concat([df, pd.DataFrame([new_row])], ignore_index=True)

Processing Videos: 100%|██████████| 3360/3360 [03:57<00:00, 14.14video/s]


In [21]:
# saving the dataframe as csv
df.to_csv("balanced_dataset_details_7thSept.csv")