In [1]:
 ### Installations ###
#####################

# scene detection
!pip install --upgrade scenedetect[opencv]

Collecting scenedetect[opencv]
  Downloading scenedetect-0.6.1-py3-none-any.whl (115 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m115.1/115.1 kB[0m [31m1.7 MB/s[0m eta [36m0:00:00[0m
Installing collected packages: scenedetect
Successfully installed scenedetect-0.6.1


In [2]:
### Imports ###
###############

# general
import numpy as np
import pandas as pd
import shutil
import os
from tqdm import tqdm

# image processing
import cv2
from google.colab.patches import cv2_imshow

# scene detection
from scenedetect import detect, ContentDetector

# audio processing
import librosa

In [3]:
### Set data directory
##################

# Mount Google Drive into the Colab runtime under /content/drive so that files stored on
# the drive can be addressed through ordinary file paths
from google.colab import drive
drive.mount('/content/drive')

# Folder on the mounted drive from which the zipped video and audio archives are copied
data_dir = '/content/drive/MyDrive/0_Masterarbeit/2_Pipelines/Data'

Mounted at /content/drive


In [4]:
### Upload video and audio files ###
####################################

# Name of the data set; the archives are expected as Video_<data_file>.zip / Audio_<data_file>.zip
data_file = 'ferrari'

# Handle both media types the same way: copy the archive into the working directory,
# create the target folder and unpack the archive into it
for media in ('Video', 'Audio'):
  archive_name = f'{media}_{data_file}.zip'

  # copy zip file from the data directory
  shutil.copy(os.path.join(data_dir, archive_name), './')

  # create folder to unpack the zip file to
  # (exist_ok=True keeps the cell re-runnable; without it a second run raises FileExistsError)
  os.makedirs(f'./{media}', exist_ok=True)

  # unpack zip file
  shutil.unpack_archive(f'./{archive_name}', extract_dir = f'./{media}')

### Extract Color Values

In [5]:
### Function to extract color values ###
########################################

def cal_color_values(video, n_parts = 3):
  """Average the HSV channels of a video over `n_parts` equally long parts.

  The video is cut into `n_parts` consecutive parts with the same number of
  frames. Inside every part one frame per `fps` frames (i.e. roughly one frame
  per second) is converted to HSV; the per-frame channel means of these frames
  are averaged.

  Parameters
  ----------
  video : str
      Path of the video file handed to cv2.VideoCapture.
  n_parts : int, default 3
      Number of equally long parts the video is divided into.

  Returns
  -------
  tuple of three lists
      (average_hue, average_sat, average_value). Every list holds `n_parts`
      values in the value range produced by cv2.COLOR_BGR2HSV (no
      normalisation is applied here). A part for which no frame could be
      sampled is reported as NaN.
  """

  # Open the video file
  cap = cv2.VideoCapture(video)

  try:
    # Get the number of frames and frame rate
    num_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    fps = int(cap.get(cv2.CAP_PROP_FPS))

    # Proportion of frames that are considered (one per second).
    # A reported frame rate below 1 would make the modulo below divide by zero,
    # so fall back to considering every frame in that case.
    interval = max(fps, 1)

    # Calculate the number of frames in each part
    frames_per_part = int(num_frames / n_parts)

    # Lists to store the average of each channel for each part
    average_hue = []
    average_sat = []
    average_value = []

    ### Loop to consider each defined part separately ###
    for i in range(n_parts):

      # Set the position of the video file to the starting frame of the part
      cap.set(cv2.CAP_PROP_POS_FRAMES, i * frames_per_part)

      # Running totals of the per-frame channel means and number of considered frames
      total_hue = 0
      total_sat = 0
      total_value = 0
      num_fpp_considered = 0

      ### Loop over the frames in the part ###
      for j in range(frames_per_part):

        # Read the frame; stop if no more frames can be decoded
        ret, frame = cap.read()
        if not ret:
          break

        # Only the first frame of every interval is evaluated
        if j % interval != 0:
          continue

        # Convert the frame to the HSV color space
        hsv_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV)

        # Add the mean of each channel over the frame to the running totals
        total_hue += hsv_frame[:, :, 0].mean()    # color information
        total_sat += hsv_frame[:, :, 1].mean()    # saturation / intensity / purity
        total_value += hsv_frame[:, :, 2].mean()  # brightness / lightness

        # Count considered frames for part
        num_fpp_considered += 1

      if num_fpp_considered == 0:
        # No frame could be sampled (e.g. fewer frames than parts, or the container
        # reports more frames than can be decoded): report NaN instead of dividing by zero
        average_hue.append(np.nan)
        average_sat.append(np.nan)
        average_value.append(np.nan)
      else:
        # Calculate the averages of the part and add to lists
        average_hue.append(total_hue / num_fpp_considered)
        average_sat.append(total_sat / num_fpp_considered)
        average_value.append(total_value / num_fpp_considered)

  finally:
    # Release the video capture object even if processing failed
    cap.release()

  return average_hue, average_sat, average_value

In [6]:
### Apply function to all videos ###
###################################

# Number of equally long parts every video is divided into
n_parts = 3

# Result containers: one entry per processed video
video_id_list, hue_list, sat_list, value_list = [], [], [], []

# Walk over all unpacked videos and collect their per-part colour averages
for video_file in tqdm(os.listdir('./Video')):
  video_path = os.path.join('./Video', video_file)
  average_hue, average_sat, average_value = cal_color_values(video_path, n_parts = n_parts)

  # video id = file name without its last four characters (the extension)
  video_id_list.append(video_file[:-4])

  for collected, part_means in ((hue_list, average_hue), (sat_list, average_sat), (value_list, average_value)):
    collected.append(part_means)

100%|██████████| 182/182 [11:13<00:00,  3.70s/it]


In [7]:
### Create pandas dataframe ###
###############################

# Normalize values according to opencv documentation:
# -> https://opencv24-python-tutorials.readthedocs.io/en/latest/py_tutorials/py_imgproc/py_colorspaces/py_colorspaces.html

# Column labels: "<channel>_part_<k>" for every video part, k starting at 1
hue_name_dict = dict((part, f"hue_part_{part+1}") for part in range(n_parts))
sat_name_dict = dict((part, f"sat_part_{part+1}") for part in range(n_parts))
value_name_dict = dict((part, f"value_part_{part+1}") for part in range(n_parts))

# Ids of the processed videos, in the same order as the feature lists
video_id_df = pd.Series(video_id_list, name = 'video_id')

# Scale every channel by the divisor used for its value range (hue: 179, saturation/value: 255)
# and label the columns
hue_df = pd.DataFrame(np.array(hue_list) / 179).rename(columns = hue_name_dict)
sat_df = pd.DataFrame(np.array(sat_list) / 255).rename(columns = sat_name_dict)
value_df = pd.DataFrame(np.array(value_list) / 255).rename(columns = value_name_dict)

# Put id and the three channel blocks side by side
color_df = pd.concat([video_id_df, hue_df, sat_df, value_df], axis=1)

### Extract scene dynamics values

In [8]:
### Define functions ###
########################

# Own helper to get magnitude and angle from motion vectors, written because of an observed problem
# with the cv2 implementation, which returns 'inf' when the vectors are very small
# -> https://github.com/opencv/opencv/issues/19506
# -> https://github.com/numpy/numpy/issues/5228#issue-46746558
def cartToPol(x, y):
    """Convert cartesian components to polar form.

    Returns the tuple (magnitude, angle): magnitude is np.hypot(x, y) and
    angle is np.arctan2(y, x). Scalars and arrays are both accepted, as they
    are handed straight to numpy.
    """
    return np.hypot(x, y), np.arctan2(y, x)

# own function, which attributes shot boundaries to n equal parts over the video
def shot_boundary_temp_distr(scene_list, n_parts):
  """Shot boundaries per second for each of `n_parts` equally long video parts.

  Parameters
  ----------
  scene_list : list of (start, end) pairs
      Scenes of one video. `end - start` must yield an object offering
      get_seconds().
  n_parts : int
      Number of equally long parts the video is divided into.

  Returns
  -------
  list
      `n_parts` values. Each value is the number of scene ends that fall into
      the part, divided by the duration of one part in seconds. The end of the
      last scene is counted as well, so it always falls into the last part.
      If `scene_list` is empty a list of `n_parts` zeros is returned.
  """

  # if no scenes are detected (e.g. whole video in one shot)
  if len(scene_list) == 0:
    return [0] * n_parts

  # cumulative scene lengths in seconds = positions of the scene ends in the video
  scene_len_list = np.array([(t[1]-t[0]).get_seconds() for t in scene_list])
  scene_len_cs = np.cumsum(scene_len_list)

  # divide the video into n equal parts and get the start/stop values of the parts in seconds
  video_parts_ls = np.linspace(start = 0, stop = scene_len_cs[-1], num = n_parts + 1)

  # number of scene ends per video part (left-open, right-closed intervals)
  shot_boundaries_per_part = [
      ((scene_len_cs > video_parts_ls[i]) & (scene_len_cs <= video_parts_ls[i+1])).sum()
      for i in range(n_parts)
  ]

  # duration of a single part; derived from n_parts so that the rate stays
  # correct for any number of parts
  part_len_seconds = scene_len_cs[-1] / n_parts

  # return list of shot boundary counts per second for each video part
  return list(np.array(shot_boundaries_per_part) / part_len_seconds)



# function to detect average optical flow per scene
# -> https://www.geeksforgeeks.org/python-opencv-dense-optical-flow/
def average_optical_flow(video, scene_list, n_parts = 3):
  """Average dense optical flow magnitude for every scene of a video.

  Inside every scene a frame is sampled every `int(fps/4)` frames, dense
  Farneback optical flow is computed between consecutive sampled frames, the
  flow fields are averaged per pixel and the mean magnitude of the averaged
  field over all pixels is stored.

  Parameters
  ----------
  video : str
      Path of the video file handed to cv2.VideoCapture.
  scene_list : list of (start, end) pairs
      Scenes of the video; start and end must offer get_frames(). If the list
      is empty the video is split into `n_parts` equally long parts, which are
      then treated as scenes.
  n_parts : int, default 3
      Number of parts used when `scene_list` is empty; ignored otherwise.

  Returns
  -------
  list
      One value per scene (or per part if `scene_list` is empty). NaN marks
      scenes that are too short or for which no flow field could be computed,
      so that they can be skipped with np.nanmean().
  """

  # initialize video capturing
  cap = cv2.VideoCapture(video)

  try:
    # if no scene can be detected (e.g. whole video in one shot)
    if len(scene_list) == 0:

      # extract number of frames in the video
      num_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

      # split the video in n equal parts (frame indices of the part borders)
      equal_parts = np.linspace(0, num_frames, n_parts + 1, dtype = int).tolist()

      # start frame and length (in frames) of each part
      scene_start_frames = equal_parts[:-1]
      scene_len_in_frames = [stop - start for start, stop in zip(equal_parts, equal_parts[1:])]

    else:

      # start frame and length (in frames) of each scene
      scene_start_frames = [i[0].get_frames() for i in scene_list]
      scene_len_in_frames = [i[1].get_frames() - i[0].get_frames() for i in scene_list]

    fps = int(cap.get(cv2.CAP_PROP_FPS))

    # interval of frames to consider; at least 1, because int(fps/4) is 0 for
    # frame rates below 4 and would make the modulo below divide by zero
    frame_interval = max(int(fps/4), 1)

    # min scene length to look at: long enough to sample at least two frames
    # given frame_interval (equals int(fps/2) for all frame rates of 4 and above)
    min_scene_len = max(int(fps/2), 2 * frame_interval)

    of_per_scene = [] # average optical flow value per scene

    # loop to access start and length of the individual scenes
    for start_frame, len_in_frames in zip(scene_start_frames, scene_len_in_frames):

      # skip scenes that are too short
      if len_in_frames < min_scene_len:
        of_per_scene.append(np.nan) # nan is ignored by np.nanmean() later
        continue

      # set video to beginning of respective scene
      cap.set(cv2.CAP_PROP_POS_FRAMES, start_frame)

      # running totals of the flow components and number of flow fields they contain
      total_flow_x = 0
      total_flow_y = 0
      num_flows = 0
      prev_gray = None

      # Loop through each frame of the scene
      for f in range(len_in_frames):

        # Read frame; stop if there are no more frames
        ret, frame = cap.read()
        if not ret:
          break

        # only consider frames that are start points of the frame interval; (f+1) shifts the
        # sampling by one frame to not exactly catch the shot boundary at the beginning of the
        # scene, which would distort the optical flow calculation
        if (f+1) % frame_interval != 0:
          continue

        # Convert the frame to grayscale
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)

        # If this is not the first sampled frame, compute the optical flow to the previous one
        if prev_gray is not None:

          # calculate flow values for each pixel
          flow = cv2.calcOpticalFlowFarneback(prev_gray, gray, None, 0.5, 3, 15, 3, 5, 1.2, 0)

          # Add the x and y components of the flow to the running totals
          total_flow_x = total_flow_x + flow[...,0]
          total_flow_y = total_flow_y + flow[...,1]
          num_flows += 1

        # Save the current frame as the previous frame for the next iteration
        prev_gray = gray

      # fewer than two frames could be sampled -> no flow field available
      if num_flows == 0:
        of_per_scene.append(np.nan)
        continue

      # Average over the flow fields that were actually summed. k sampled frames give k-1
      # flow fields, so dividing by the number of sampled frames would underestimate the flow.
      avg_flow_x = total_flow_x / num_flows
      avg_flow_y = total_flow_y / num_flows

      # compute the magnitude of the flow vector for each pixel
      avg_flow_mag, _ = cartToPol(avg_flow_x, avg_flow_y)

      # append the average over all pixels
      of_per_scene.append(np.mean(avg_flow_mag))

  finally:
    # free the capture handle even if processing failed
    cap.release()

  return of_per_scene


# function to calc the optical flow over n equal parts of the video
def avg_flow_temp_distr(shot_boundary_counts, of_per_scene):
  """Average the per-scene optical flow over the parts of a video.

  `shot_boundary_counts` holds, for every video part, the number of scenes that
  end in it; its running sum (cast to int) gives the index range of the scenes
  belonging to each part. The flow of a scene that overlaps two parts is
  therefore attributed to the later part. NaN entries of `of_per_scene` are
  ignored by np.nanmean().

  Returns a list with one averaged flow value per part.
  """

  # index borders of the scenes per part: 0 followed by the running scene count
  scene_bounds = np.concatenate(([0], np.cumsum(shot_boundary_counts).astype(int)))

  # part without a shot boundary (the scene is longer than the whole part): take the flow
  # of the scene that is still running; otherwise average over the scenes ending in the
  # part (the most common case for the youtube short videos)
  return [
      np.nanmean(of_per_scene[first]) if first == last else np.nanmean(of_per_scene[first:last])
      for first, last in zip(scene_bounds[:-1], scene_bounds[1:])
  ]

In [None]:
### Apply ###
#############

# Result containers: one entry per processed video
video_id_list = []
sb_list = []
flow_list = []

# Number of equally long parts every video is divided into
n_parts = 3

for video_file in tqdm(os.listdir('./Video')):

  # save video id (file name without its last four characters, i.e. the extension)
  video_id_list.append(video_file[:-4])

  # path to current video
  current_video = os.path.join('./Video', video_file)

  # extract scene list of current video
  scene_list = detect(current_video, ContentDetector())

  # shot boundaries per second for each of the n parts of the video
  sb_counts = shot_boundary_temp_distr(scene_list, n_parts = n_parts)
  sb_list.append(sb_counts)

  # extract optical flow over n parts of the video
  if len(scene_list) == 0:
    # no scenes detected: average_optical_flow already returns one value per video part
    flow_list.append(average_optical_flow(current_video, scene_list))

  else:
    # avg_flow_temp_distr indexes the per-scene flow values with the cumulative *number* of
    # scenes per part. sb_counts holds boundaries per second, which would be truncated to
    # (mostly) zero there, so convert the rates back to integer counts first:
    # count = rate * duration of one part.
    total_seconds = sum((stop - start).get_seconds() for start, stop in scene_list)
    scenes_per_part = np.rint(np.array(sb_counts) * total_seconds / n_parts).astype(int)

    flow_list.append(avg_flow_temp_distr(scenes_per_part, average_optical_flow(current_video, scene_list)))

  0%|          | 0/182 [00:00<?, ?it/s]INFO:pyscenedetect:Downscale factor set to 2, effective resolution: 360 x 640
INFO:pyscenedetect:Detecting scenes...


In [None]:
# Column labels: "<feature>_part_<k>" for every video part, k starting at 1
sb_name_dict = dict((part, f"sb_part_{part+1}") for part in range(n_parts))
flow_name_dict = dict((part, f"flow_part_{part+1}") for part in range(n_parts))

# Ids of the processed videos, in the same order as the feature lists
video_id_df = pd.Series(video_id_list, name = 'video_id')

# One row per video, one labelled column per video part
sb_df = pd.DataFrame(sb_list).rename(columns = sb_name_dict)
flow_df = pd.DataFrame(flow_list).rename(columns = flow_name_dict)

# Put id, shot boundary and optical flow columns side by side
dynamics_df = pd.concat([video_id_df, sb_df, flow_df], axis=1)

In [None]:
# Display the scene-dynamics feature table (video id, shot boundary and optical flow columns)
dynamics_df

### Audio values

In [None]:
def low_level_audio_features(audio_data, n_parts, sr = 22050):
  """Average low-level audio features over `n_parts` equally long parts.

  Computes frame-wise RMS energy, zero crossing rate, spectral centroid and
  spectral bandwidth (frame size 1024, hop length 512) and averages each of
  them over `n_parts` consecutive parts with the same number of frames.
  Frames left over after the integer division are not used.

  Parameters
  ----------
  audio_data : np.ndarray
      Audio time series as returned by librosa.load.
  n_parts : int
      Number of equally long parts.
  sr : number, default 22050
      Sampling rate of `audio_data`. It is an explicit parameter so the
      function no longer depends on a notebook-level variable; the default is
      the rate librosa.load resamples to when no `sr` is given.

  Returns
  -------
  tuple of four lists
      (average_rmse, average_zcr, average_sc, average_sbw), each holding
      `n_parts` values.
  """

  # analysis window and hop size in samples
  FRAME_SIZE = 1024
  HOP_LENGTH = 512

  # Root mean squared energy (from spectrogram -> more accurate)
  S, _ = librosa.magphase(librosa.stft(audio_data, n_fft=FRAME_SIZE, hop_length=HOP_LENGTH))
  rmse_audio = librosa.feature.rms(S=S, frame_length=FRAME_SIZE, hop_length=HOP_LENGTH)[0]

  # Zero crossing rate
  zcr_audio = librosa.feature.zero_crossing_rate(audio_data, frame_length=FRAME_SIZE, hop_length=HOP_LENGTH)[0]

  # Spectral centroid
  sc_audio = librosa.feature.spectral_centroid(y=audio_data, sr=sr, n_fft=FRAME_SIZE, hop_length=HOP_LENGTH)[0]

  # Spectral bandwidth
  sbw_audio = librosa.feature.spectral_bandwidth(y=audio_data, sr=sr, n_fft=FRAME_SIZE, hop_length=HOP_LENGTH)[0]

  # number of feature frames per part (derived from the rmse track)
  part_len = int(len(rmse_audio) / n_parts)

  # mean value per measure for each video part
  average_rmse, average_zcr, average_sc, average_sbw = (
      [track[i*part_len: (i+1) * part_len].mean() for i in range(n_parts)]
      for track in (rmse_audio, zcr_audio, sc_audio, sbw_audio)
  )

  return average_rmse, average_zcr, average_sc, average_sbw

In [None]:
### Apply function to all videos ###
###################################

# Create empty lists to store the results (one entry per audio file)
video_id_list = []
rmse_list = []
zcr_list = []
sc_list = []
sbw_list = []

# Number of equally long parts every audio track is divided into; kept in n_parts (instead of
# a literal in the call) because the column labels of the audio dataframe are built from it
n_parts = 3

# Loop through audio files and extract low-level audio features
for audio_file in tqdm(os.listdir('./Audio')):
  audio_data, sr = librosa.load(os.path.join('./Audio', audio_file))
  average_rmse, average_zcr, average_sc, average_sbw = low_level_audio_features(audio_data, n_parts = n_parts)

  # video id = file name without its last four characters (the extension)
  video_id_list.append(audio_file[:-4])
  rmse_list.append(average_rmse)
  zcr_list.append(average_zcr)
  sc_list.append(average_sc)
  sbw_list.append(average_sbw)

In [None]:
### Create pandas dataframe ###
###############################

# Column labels: "<feature>_part_<k>" for every part, k starting at 1
rmse_name_dict = dict((part, f"rmse_part_{part+1}") for part in range(n_parts))
zcr_name_dict = dict((part, f"zcr_part_{part+1}") for part in range(n_parts))
sc_name_dict = dict((part, f"sc_part_{part+1}") for part in range(n_parts))
sbw_name_dict = dict((part, f"sbw_part_{part+1}") for part in range(n_parts))

# Ids of the processed audio files, in the same order as the feature lists
video_id_df = pd.Series(video_id_list, name = 'video_id')

# rmse and zero crossing rate are taken over as they are
rmse_df = pd.DataFrame(rmse_list).rename(columns = rmse_name_dict)
zcr_df = pd.DataFrame(zcr_list).rename(columns = zcr_name_dict)

# spectral centroid and bandwidth are divided by 11025
sc_df = pd.DataFrame(np.array(sc_list) / 11025).rename(columns = sc_name_dict)
sbw_df = pd.DataFrame(np.array(sbw_list) / 11025).rename(columns = sbw_name_dict)

# Put id and the four feature blocks side by side
low_level_audio_df = pd.concat([video_id_df, rmse_df, zcr_df, sc_df, sbw_df], axis=1)

In [None]:
### Merge all 3 dataframes into one ###
#######################################

# join the colour table with the dynamics and audio tables, one after the other, on the video id
low_level_features = color_df
for feature_table in (dynamics_df, low_level_audio_df):
  low_level_features = low_level_features.merge(feature_table, on='video_id')

# take the video_id column out and put it back in at position 0
first_column = low_level_features.pop('video_id')
low_level_features.insert(0, 'video_id', first_column)

In [None]:
### Save as csv file ###
########################
# Folder on the mounted drive that receives the feature file
save_dir = '/content/drive/MyDrive/0_Masterarbeit/2_Pipelines/Feature_outputs'

# Make sure the target folder exists: if it is missing, shutil.copy would write the csv
# to a *file* with the folder's name instead of placing it inside the folder
os.makedirs(save_dir, exist_ok=True)

# Write the features locally first, then copy the file to the drive
csv_path = f'./low_level_features_{data_file}.csv'
low_level_features.to_csv(csv_path)
shutil.copy(csv_path, save_dir)