In [1]:
import image_similarity_measures
from image_similarity_measures.quality_metrics import metric_functions
from itertools import combinations
import cv2
import pandas as pd
import numpy as np
from typing import List, Tuple, Dict
import time
import matplotlib.pyplot as plt
import seaborn as sns
import seaborn_image as isns
from PIL import Image, ImageOps
import glob
import sys, os
import json

In [2]:
#VAR declarations
#TODO manage this via config
all_metrics =  ['sre']
#maximum number of frames to capture
#max_frames = 150
comparison_images = {}
keyframes = []
max_keyframes = 4

parent_dir = "data/"
video_path = "data/videos/"
test_video = 'testvid.mov'

user_id = "0042"


In [3]:
#create necessary dictionaries
def create_folder_structure(user_id: str):
    #Create folder structure for new dataset and export names to .json file for further usage

    folder_dict = {
                "input": f"{user_id}_input",
                "mask": f"{user_id}_mask", 
                "models": f"{user_id}_models",
                "output": f"{user_id}_output",
                "target": f"{user_id}_target"
    }
    
    
    for key, directory in list(folder_dict.items()):
        path = os.path.join(parent_dir,directory)
        print(path)
        try:
            os.mkdir(path)
            print(f"{directory} successfully created")
        except OSError as error:
            print("Folder already exists. Skipping..")

        #add parent_dict to .json. This is necessary for keeping flexibility if refactoring is necessary
        folder_dict["ParentDir"] = parent_dir
        with open ("folderstructure.json","w") as f:
            json.dump(folder_dict,f)


create_folder_structure(user_id)


data/0042_input
Folder already exists. Skipping..
data/0042_mask
Folder already exists. Skipping..
data/0042_models
Folder already exists. Skipping..
data/0042_output
Folder already exists. Skipping..
data/0042_target
Folder already exists. Skipping..


In [4]:
#Neas video
#cap = cv2.VideoCapture('testvid.mov')
#Richards video
cap = cv2.VideoCapture(f"{video_path}{test_video}")
#Or most common ID for webcams:
#cap = cv2.VideoCapture(0)

In [5]:
fps = cap.get(cv2.CAP_PROP_FPS)
print(f"Frames per second using video.get(cv2.CAP_PROP_FPS) : {fps}")

Frames per second using video.get(cv2.CAP_PROP_FPS) : 30.029060381013885


In [6]:
#Add all frames as we'll need them later
#Maybe some padding for frame_name

frame_nr = 0
while cap.isOpened():
    ret, frame = cap.read()

    if ret:
        #TODO: change the filename like ffmpeg ones if we split the video into images
        frame_name = f"frame_{frame_nr}.jpg" 
        cv2.imwrite(f"{parent_dir}{user_id}_input/{frame_name}",frame)
    else:
        cap.release()
        break


In [7]:
#TODO ensure min_frames, max_frames for comparison pool

frame_nr = 0
#TODO ensure how to handle exceptions
#TODO add all frames/framenames somewhere as we'l need them later
#TODO ensure max_len of video or the pre-selection must be adapted

while cap.isOpened():
    ret, frame = cap.read()

    if ret:
        #TODO: change the filename like ffmpeg ones if we split the video into images
        frame_name = f"frame_{frame_nr}.jpg" 
        cv2.imwrite(f"{parent_dir}{user_id}_input/{frame_name}",frame)
        #print(frame_name)
        #cv2.imwrite(f"frame{frame_nr}.jpg", frame)
        #i.e. at 30 fps, this advances one second
        #int(fps) for rounding
        frame_nr += int(fps)
        cap.set(cv2.CAP_PROP_POS_FRAMES, frame_nr)
        comparison_images[frame_name] = frame
        #comparison_images.append(tuple([frame_name,frame]))
    else:
        cap.release()
        break


In [8]:
def compare(df: pd.DataFrame, img_name_a: str, img_name_b: str, img_a: np.ndarray, img_b: np.ndarray, metrics: List[str]) -> pd.DataFrame:
    for metric in metrics:
        metric_func = metric_functions[metric]
        start_time = time.time()
        out_value = float(metric_func(img_a, img_b))
        end_time = time.time()
        output_dict = {}
        output_dict['metric'] = metric
        output_dict['x'] = img_name_a
        output_dict['y'] = img_name_b
        output_dict['value'] = out_value
        output_dict['walltime'] = end_time - start_time
        df = df.append(output_dict,ignore_index=True)
        output_dict['y'] = img_name_a
        output_dict['x'] = img_name_b
        df = df.append(output_dict,ignore_index=True)
        
    return df


In [9]:
#Build some nice plots for better evaluation
#Select one metric and pivot the dataframe for creating the heatmap plot

df = pd.DataFrame()
for pair in combinations(comparison_images.keys(), 2):
    #print(pair)
    #*pair : instead handing over a tuple(a,b) , hand over each element as separate argument a,b
    df = compare(df, *pair, comparison_images[pair[0]], comparison_images[pair[1]], all_metrics)
    
plot_df = df[df.metric == 'sre'].pivot('x', 'y', 'value')
ax = sns.heatmap(plot_df, annot=True)

AttributeError: 'DataFrame' object has no attribute 'metric'

In [None]:
def select_keyframes(comparison_images: Dict[str,np.ndarray], max_keyframes: int) -> List[np.ndarray]:
    
    compare_options = list(combinations(comparison_images.keys(), max_keyframes))
    compare_df = pd.DataFrame()
    
    #Calculate and accumalate the values on all 2-combinations of each 4-tuple
    #(frame_0, frame_13, frame_60, frame_115)
    # metric(frame_0,frame_13) + metric(frame_0,frame_60) +...
    for option in compare_options:
        value = 0.0
        for x, y in combinations(option, 2):
            #get the value of the current combination and square it
            #add to the final value of the 4-tuple
            value += plot_df[x][y] ** 2

        compare_df = compare_df.append({"option": option, "value": value}, ignore_index=True)

    # select the 4-tuple with lowest metric value as keyframes
    keyframes = list(compare_df.sort_values(by=['value'], ascending=True).iloc[0].option)
    
    return keyframes


In [None]:
keyframes = select_keyframes(comparison_images, max_keyframes)
keyframes

In [None]:
for img_name in keyframes:
    cv2.imwrite(img_name, comparison_images[img_name])
    
xlen = len(keyframes)
fig, axes = plt.subplots(1, xlen)
fig.set_size_inches(20,10)
for ax, img_path in zip(axes, keyframes):
    img = Image.open(img_path)
    isns.imgplot(img, ax=ax, cbar=False)
    ax.invert_yaxis()
    ax.set_xlabel(img_path)