<a href="https://colab.research.google.com/github/j-hartmann/automated-image-analysis/blob/branch_s/pipeline.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Imports 

In [None]:
%%capture
# install yolo
!git clone https://github.com/ultralytics/yolov5  # clone
!cd yolov5
!pip install -r requirements.txt  # install
!cd content

In [None]:
%%capture
# install the image-quality (IQA) library and import its BRISQUE module;
# `brisque.score` is called by _iter when bool_brisque is enabled
!pip install image-quality
import imquality.brisque as brisque

In [None]:
import cv2
import pandas as pd
import os
import numpy as np
import matplotlib.pyplot as plt
import shutil
import re
#import distutils.dir_util
import torch
import skimage.measure
import copy
from sklearn.preprocessing import MinMaxScaler
import PIL.Image 
#from matplotlib.colorbar import Colorbar
import matplotlib.pyplot as plt
import math
import time
from google.colab.patches import cv2_imshow
import cv2

In [None]:
# load the YOLOv5s model through torch.hub
# `model` is used as a notebook-level global by _iter when yolo_extraction_bool is True
model = torch.hub.load('ultralytics/yolov5', 'yolov5s')

# Link drive (optional)

In [None]:
# mount Google Drive at /content/drive/ (only needed when the paths below point to Drive)
from google.colab import drive
drive.mount("/content/drive/")

# Paths 
Paths to either a Google Drive folder or a local Google Colab (GC) directory. You need to configure these yourself; every folder path must end with a `/`.

In [None]:
# NOTE: file names are appended to these paths by plain string concatenation,
# so every folder path must end with a "/"

# path to image folder, e.g. "/content/drive/My Drive/images/" 
path_images = ""
# optional: folder to which the min/max images of each feature are exported
# (only used when bool_save_min_max_img is True)
path_images_out_min_max = ""
# path to the CSV output folder, e.g. "/content/drive/My Drive/csvs/"
path_csv_out = ""
# show the current working directory as a reference for relative paths
print(os.getcwd())


# Control variables
Note: enabling `bool_brisque` or `bool_visualbalance` drastically increases computation time.

In [None]:
# control variables for feature extraction
# start/end: slice [start:end] of the image directory listing that is processed
start = 0 #@param {type:"integer", min:0}
end =  50#@param {type:"integer", min:0}
# resize: size in percent of the original image for the resized copy scored by BRISQUE
resize = 25 #@param {type:"integer"}
# toggles for the optional (slow) features: BRISQUE score, visual balance, YOLOv5 detection
bool_brisque = False #@param ["False", "True"] {type:"raw"}
bool_visualbalance = False #@param ["False", "True"] {type:"raw"}
yolo_extraction_bool = False #@param ["False", "True"] {type:"raw"}
# print_values: print the extracted values while processing
print_values = False #@param ["False", "True"] {type:"raw"}
# rescale_w/rescale_h: pixel size to which investigate() resizes the min/max images for display
rescale_w = 480 #@param {type:"integer", min:0}
rescale_h = 480 #@param {type:"integer", min:0}
# bool_save_min_max_img: let investigate() also write the min/max images to disk
bool_save_min_max_img = False #@param ["False", "True"] {type:"raw"}

# Functions

In [None]:
# NOTE: WORK IN PROGRESS

# calculate the euclidean distance between two images -> symmetrical
def euclid_dist_symmetrical(image):

  '''
  Left/right visual balance: mean colour distance between mirrored pixels.

  This function:
  0. Takes in an image as an array of shape (height, width, channels);
     only the first three channels are compared
  1. calls helper function adjust_image_shape(img) to drop the last row
     and/or column when height or width is uneven
  2. splits the image into a left and a right half
  3. mirrors the right half and sums the euclidean colour distance of every
     pixel to its mirror partner across the vertical centre line
  4. returns that sum divided by the pixel count (width*height) of the
     evenly cropped image, as a Python float (0.0 if no pixels are left)
  '''

  # extract h,w
  height, width, channels = image.shape
  if (height % 2 != 0) or (width % 2 != 0):
    # both halves need the same size -> crop to even dimensions
    image = adjust_image_shape(image)
    # normalise by what is actually compared, not by the uncropped size
    height, width = image.shape[:2]

  # nothing left to compare (e.g. a 1x1 input after cropping)
  if height == 0 or width == 0:
    return 0.0

  slice_by = width // 2
  # cast to float before subtracting: uint8 arithmetic would wrap around
  # (e.g. 0 - 3 == 253) and corrupt the distances
  left = image[:, :slice_by, :3].astype(np.float64)
  # reverse the column order so that column j of the left half lines up with
  # its mirror column (the j-th column counted from the right edge)
  right_mirrored = image[:, slice_by:, :3][:, ::-1].astype(np.float64)

  # euclidean distance of the colour tuples
  # reference: https://en.wikipedia.org/wiki/Euclidean_distance
  distances = np.sqrt(((left - right_mirrored) ** 2).sum(axis=2))

  return float(distances.sum() / (width * height))

In [None]:
# NOTE: WORK IN PROGRESS

# calculate the diagonal symmetrical distance between images
def euclid_dist_diagonal(image):

  '''
  Diagonal visual balance: mean colour distance between point-reflected pixels.

  This function:
  0. Takes in an image as an array of shape (height, width, channels);
     only the first three channels are compared
  1. calls helper function adjust_image_shape(img) to drop the last row
     and/or column when height or width is uneven
  2. splits the image into a top and a bottom half
  3. rotates the bottom half by 180 degrees and sums the euclidean colour
     distance of every top pixel to its diagonal partner, i.e. the pixel
     reflected through the image centre
  4. returns that sum divided by the pixel count (width*height) of the
     evenly cropped image, as a Python float (0.0 if no pixels are left)
  '''

  # extract h,w
  height, width, channels = image.shape
  if (height % 2 != 0) or (width % 2 != 0):
    # both halves need the same size -> crop to even dimensions
    image = adjust_image_shape(image)
    # normalise by what is actually compared, not by the uncropped size
    height, width = image.shape[:2]

  # nothing left to compare (e.g. a 1x1 input after cropping)
  if height == 0 or width == 0:
    return 0.0

  # cut image horizontally into top/bottom
  slice_by = height // 2
  # cast to float before subtracting: uint8 arithmetic would wrap around
  top = image[:slice_by, :, :3].astype(np.float64)
  # reversing rows and columns pairs top[i][j] with the pixel that is i rows
  # above the bottom edge and j columns left of the right edge
  bottom_rotated = image[slice_by:, :, :3][::-1, ::-1].astype(np.float64)

  # euclidean distance of the colour tuples
  # reference: https://en.wikipedia.org/wiki/Euclidean_distance
  distances = np.sqrt(((top - bottom_rotated) ** 2).sum(axis=2))

  return float(distances.sum() / (width * height))

In [None]:
# helper function for euclid_dist_diagonal/symmetrical
def adjust_image_shape(img):
  '''
  Crop an image array so that both its height and its width are even.

  An uneven height loses its last row and an uneven width loses its last
  column; dimensions that are already even are left untouched.
  Returns the (possibly cropped) image.
  '''

  n_rows = img.shape[0]
  # drop the last row when the height is uneven
  trimmed = img[:n_rows - 1, :] if n_rows % 2 else img

  n_cols = trimmed.shape[1]
  # drop the last column when the width is uneven
  return trimmed[:, :n_cols - 1] if n_cols % 2 else trimmed

In [None]:
def _iter(path_imgs, path_csv_out, resize_percent, start, end, bool_brisque, bool_visualbalance, yolo_extraction_bool, print_values):

  '''
  Extract visual features from a slice of an image folder and export them as CSV.

  This function:
  0. lists the files in path_imgs (sorted by name) and keeps names[start:end]
  1. reads every image with OpenCV; files that cannot be decoded are skipped
  2. performs feature extraction (size, colour means, HSV means, warm/cold
     hue share, image clarity, greyscale mean, RMS contrast, sharpness,
     Shannon entropy, file size) plus the optional BRISQUE score, visual
     balance distances and YOLOv5 detections
  3. exports one row per image to
     path_csv_out + "Pipeline_CV_features_start_{start}_end_{end}.csv"

  Parameters:
    path_imgs: folder containing the images.
    path_csv_out: output location; the file name is appended by plain string
      concatenation (the cells reading the file build the name the same way),
      so a folder must end with a path separator.
    resize_percent: size in percent of the original image of the copy that is
      scored by BRISQUE.
    start, end: bounds of the slice of the sorted directory listing.
    bool_brisque: compute the BRISQUE score with the notebook-level `brisque` module.
    bool_visualbalance: compute euclid_dist_symmetrical / euclid_dist_diagonal.
    yolo_extraction_bool: run the notebook-level YOLOv5 `model` on the image.
    print_values: print every extracted value of every image.

  Returns:
    The DataFrame that was written to the CSV file.
  '''

  # column order of the exported file
  columns = ["filename",
             "height",
             "width",
             "r_mean",
             "g_mean",
             "b_mean",
             "hue_avg",
             "saturation_avg",
             "brightness_avg",
             "greyscale_avg",
             "shannon_entropy",
             "size_kB",
             "visual_complexity_kB",
             "visual_balance_euclid_dist_symmetrical",
             "visual_balance_euclid_dist_diagonal",
             "image_clarity",
             "sharpness",
             "brisque_IQA",
             "warm_hues_perc",
             "cold_hues_perc",
             "xmin",
             "ymin",
             "xmax",
             "ymax",
             "confidence",
             "label",
             "RMS_contrast"]

  # os.listdir returns names in arbitrary order; sort them so that the
  # [start:end] slice selects the same files on every run
  names = sorted(os.listdir(path_imgs))[start:end]

  # one dict per image; the DataFrame is built once at the end
  records = []

  for name in names:
    image_path = os.path.join(path_imgs, name)

    # read image as BGR; imread returns None for anything it cannot decode
    img = cv2.imread(image_path)
    if img is None:
      print(f"skipping unreadable file: {name}")
      continue

    height, width = img.shape[:2]

    # colour conversions must go through cvtColor: the COLOR_* codes are not
    # valid imread flags and do not convert anything when passed to imread
    img_rgb = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
    img_hsv = cv2.cvtColor(img, cv2.COLOR_BGR2HSV)
    img_grey = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)

    # BRISQUE image quality score on a resized copy; left empty when disabled
    # or when scoring fails (0 would be a valid score and therefore misleading)
    brisque_val = ""
    if bool_brisque:
      try:
        resized = cv2.resize(img, (int(width * resize_percent / 100), int(height * resize_percent / 100)))
        brisque_val = brisque.score(resized)
      except Exception as err:
        print(f"brisque error for {name}: {err}")

    # sharpness: variance of the Laplacian
    sharpness = cv2.Laplacian(img, cv2.CV_64F).var()

    # visual balance; left empty when disabled or when the computation fails
    # important: have to verify if this follows description from paper
    euclid_dist_sym = ""
    euclid_dist_diag = ""
    if bool_visualbalance:
      try:
        euclid_dist_sym = euclid_dist_symmetrical(img_rgb)
      except Exception as err:
        print(f"error: symmetrical visual balance failed for {name}: {err}")

      # euclid dist diagonal visual balance
      # own idea, needs to be verified and tested if useful
      try:
        euclid_dist_diag = euclid_dist_diagonal(img_rgb)
      except Exception as err:
        print(f"error: diagonal visual balance failed for {name}: {err}")

    # mean of every colour channel; the channel axis is the last one
    r_mean, g_mean, b_mean = (float(value) for value in img_rgb.mean(axis=(0, 1)))

    # HSV channels (8-bit OpenCV images: hue 0-179, saturation/value 0-255)
    hues = img_hsv[:, :, 0]
    hue = float(hues.mean())
    saturation = float(img_hsv[:, :, 1].mean())
    brightness = float(img_hsv[:, :, 2].mean())

    # share of pixels per hue class
    # NOTE(review): the 30..110 "warm" thresholds are kept unchanged from the
    # previous implementation - confirm them against the paper and against
    # OpenCV's halved hue scale
    warm_mask = (hues >= 30) & (hues <= 110)
    warm_hues_perc = float(warm_mask.mean())
    cold_hues_perc = float((~warm_mask).mean())

    # image clarity: share of pixels whose brightness, normalised to [0, 1],
    # is at least 0.7
    # https://pubsonline.informs.org/doi/suppl/10.1287/mnsc.2021.4175/suppl_file/mnsc.2021.4175.sm1.pdf
    # NOTE(review): normalised as V/255 so values are comparable across
    # images; the previous per-column min-max scaling depended on the content
    # of every single pixel column - confirm this matches the paper
    image_clarity = float((img_hsv[:, :, 2] / 255.0 >= 0.7).mean())

    # RMS contrast is the standard deviation of the grey image
    RMS_contrast = float(img_grey.std())
    avg_grey = float(img_grey.mean())

    # os.path.getsize returns bytes -> 1024 bytes per kB
    file_size_kB = os.path.getsize(image_path) / 1024
    visual_complexity_kB = file_size_kB / (width * height)

    # Shannon-Entropy
    shannon_entropy = skimage.measure.shannon_entropy(img)

    record = {
        "filename": name,
        "height": height,
        "width": width,
        "r_mean": r_mean,
        "g_mean": g_mean,
        "b_mean": b_mean,
        "hue_avg": hue,
        "saturation_avg": saturation,
        "brightness_avg": brightness,
        "greyscale_avg": avg_grey,
        "shannon_entropy": shannon_entropy,
        "size_kB": file_size_kB,
        "visual_complexity_kB": visual_complexity_kB,
        "visual_balance_euclid_dist_symmetrical": euclid_dist_sym,
        "visual_balance_euclid_dist_diagonal": euclid_dist_diag,
        "image_clarity": image_clarity,
        "sharpness": sharpness,
        "brisque_IQA": brisque_val,
        "warm_hues_perc": warm_hues_perc,
        "cold_hues_perc": cold_hues_perc,
        "RMS_contrast": RMS_contrast,
    }

    # object detection inference yolov5 (model is loaded in an earlier cell)
    if yolo_extraction_bool:
      # the hub model expects RGB channel order for numpy images
      predictions = model(img_rgb).pandas().xyxy[0]
      if print_values:
        print(f"predictions={predictions}")
        # one row per detected bounding box
        print(f"numb_objects={len(predictions)}")

      # one list per column, holding the values of all detections
      record["xmin"] = predictions["xmin"].to_list()
      record["ymin"] = predictions["ymin"].to_list()
      record["xmax"] = predictions["xmax"].to_list()
      record["ymax"] = predictions["ymax"].to_list()
      record["confidence"] = predictions["confidence"].to_list()
      record["label"] = predictions["name"].to_list()

    if print_values:
      for key, value in record.items():
        print(f"{key}={value}")

    records.append(record)

  # keys missing from a record (the detection columns when YOLO is off) become NaN
  df_new = pd.DataFrame(records, columns=columns)

  # export extracted features
  df_new.to_csv(path_csv_out + f"Pipeline_CV_features_start_{start}_end_{end}.csv")

  return df_new

In [None]:
# read full csv in
# with filename
def investigate(rescale_width, rescale_height, filepath, img_out):
  '''
  Show, for every numeric feature, the image with the lowest and the image
  with the highest value side by side.

  This function:
  0. reads the feature file written by _iter from filepath
  1. keeps every numeric column that holds at least one value
  2. for each of these columns loads the min and the max image from the
     notebook-level path_images folder, resizes both to
     (rescale_width, rescale_height) and displays them next to each other
     (left: min, right: max)
  3. if the notebook-level bool_save_min_max_img is set, writes the unscaled
     min/max images to img_out + "<feature>_min_<filename>" /
     img_out + "<feature>_max_<filename>"
  '''

  # read previously computed feature file
  df = pd.read_csv(filepath)
  # store names in list; row labels of the freshly read frame are positions
  names = df["filename"].to_list()

  # pick the comparable columns from the data itself instead of from the
  # current values of the feature flags: the flags may differ from those used
  # when the file was written, which would raise a KeyError or leave all-empty
  # columns in the comparison
  features = (
      df.drop(columns=["Unnamed: 0"], errors="ignore")  # saved row index, not a feature
        .select_dtypes(include="number")                # drops filename and detection lists
        .dropna(axis=1, how="all")                      # drops features that were not computed
  )

  for col in features.columns:
    indexmin = features[col].idxmin()
    indexmax = features[col].idxmax()
    min_name = names[int(indexmin)]
    max_name = names[int(indexmax)]

    # imread returns None when a file is missing or cannot be decoded
    imagemin = cv2.imread(path_images + min_name)
    imagemax = cv2.imread(path_images + max_name)
    if imagemin is None or imagemax is None:
      print(f"\nFeature:  {col}\nskipped: could not read {min_name} or {max_name}")
      continue

    # bring both images to the same size so they can be shown side by side
    image_min_scaled = cv2.resize(imagemin, (rescale_width, rescale_height))
    image_max_scaled = cv2.resize(imagemax, (rescale_width, rescale_height))

    min_val = features.at[indexmin, col]
    max_val = features.at[indexmax, col]

    double_window = np.concatenate((image_min_scaled, image_max_scaled), axis=1)
    print(f"\nFeature:  {col}\nLeft(min) value: {min_val} image: {min_name}")
    print(f"Right(max) value: {max_val} image: {max_name}")
    cv2_imshow(double_window)

    # if save min/max selected
    if bool_save_min_max_img:
      # save min
      cv2.imwrite(img_out + col + "_min_" + min_name, imagemin)
      # save max
      cv2.imwrite(img_out + col + "_max_" + max_name, imagemax)




# Analyze images

In [None]:
# gather the settings from the configuration cells, then time one extraction run
extraction_settings = {
    "path_imgs": path_images,
    "path_csv_out": path_csv_out,
    "resize_percent": resize,
    "start": start,
    "end": end,
    "bool_brisque": bool_brisque,
    "bool_visualbalance": bool_visualbalance,
    "yolo_extraction_bool": yolo_extraction_bool,
    "print_values": print_values,
}

starttime = time.time()
_iter(**extraction_settings)
endtime = time.time()

print(f"Elapsed time for computing: {endtime-starttime}")

# Inspect extracted features (open to see outputs)

In [None]:
# load the feature file written by the extraction step
features_csv = path_csv_out + f"Pipeline_CV_features_start_{start}_end_{end}.csv"
df = pd.read_csv(features_csv)

# columns of features that were switched off only contain NaNs -> remove them
optional_columns = [
    (bool_brisque, ["brisque_IQA"]),
    (bool_visualbalance, ["visual_balance_euclid_dist_symmetrical","visual_balance_euclid_dist_diagonal"]),
    (yolo_extraction_bool, ["ymax","ymin", "xmax", "xmin", "confidence", "label"]),
]
for enabled, cols in optional_columns:
  if enabled == False:
    df = df.drop(cols, axis=1)

# remove the saved row index if it is the leading column
if df.columns[0] == "Unnamed: 0":
  df = df.drop("Unnamed: 0", axis=1)

df

# Investigate max/min feature values (open to see outputs)
Note: Since images might be too small or too big, they are resized for display to `rescale_w` × `rescale_h` pixels (480 × 480 by default, see the control variables).

In [None]:
# show the min/max image of every feature in the most recently written feature file
investigate(
    rescale_width=rescale_w,
    rescale_height=rescale_h,
    filepath=path_csv_out + f"Pipeline_CV_features_start_{start}_end_{end}.csv",
    img_out=path_images_out_min_max,
)