In [1]:
import numpy as np
import random
import string
from collections import Counter, defaultdict
import cv2
from PIL import Image
import os
import itertools as it
import shutil

In [2]:
# Group images by size to increase computational efficiency:

def Image_Optimizer(name_size):
    """Bucket file names by their size key, keeping only keys shared by 2+ files.

    Parameters
    ----------
    name_size : sequence
        Entries where index 0 is a file name and index 1 is a hashable
        value to group on (the caller in this notebook passes the image's
        ``shape``). The sequence is traversed more than once.

    Returns
    -------
    collections.defaultdict or None
        Maps every key that occurs more than once to a one-element list
        wrapping the list of file names carrying that key, i.e.
        ``{key: [[name, name, ...]]}``, in first-seen key order.
        ``None`` (after printing a message) if the two independent tallies
        below disagree.
    """

    # First tally: how many entries carry each key; keep keys seen 2+ times.

    occurrences = Counter(entry[1] for entry in name_size)
    repeated_keys = [key for key, hits in occurrences.items() if hits > 1]

    # Second tally: key -> file names, in the order the entries were given.

    names_by_key = {}
    for entry in name_size:
        names_by_key.setdefault(entry[1], []).append(entry[0])

    # Keep only the buckets holding more than one file name.

    groups = defaultdict(list)
    for key, names in names_by_key.items():
        if len(names) > 1:
            groups[key].append(names)

    # Cross-check: both tallies must agree on the number of repeated keys.

    if len(repeated_keys) != len(groups):
        print("Error detected.")
        return None

    print('>> Created {} groups of images to compare\n'.format(len(repeated_keys)))
    return groups

In [3]:
# check for redundant images in <path> directory:

def Check_Duplicates(path):
    """Find pixel-identical images in a directory and optionally delete the extras.

    Steps:
      1. Read every entry of ``path`` with ``cv2.imread`` and record its
         ``(filename, shape)``; entries that cannot be decoded are reported.
      2. Group the readable files by shape with ``Image_Optimizer``.
      3. Compare every pair of files inside each group element by element.
      4. List the redundant files and, after the user answers ``y``, delete
         them. Within each set of identical images the earliest-listed file
         is kept.

    Parameters
    ----------
    path : str
        Directory holding the images. A trailing path separator is optional.

    Returns
    -------
    None
        All results are reported with ``print``.

    Side effects
    ------------
    * Removes a ``.ipynb_checkpoints`` directory found inside ``path``.
    * Prompts on stdin and, on confirmation, deletes files from ``path``.
    """

    duplicates_list, corrupted_list, name_size = [], [], []

    def report_corrupted(fname):
        # Record each unreadable file once, however many pairs it appears in.
        if fname not in corrupted_list:
            print('Bad file(s) detected:', fname)
            corrupted_list.append(fname)

    # create list of tuples in (filename, shape) format & check for errors:

    for fname in os.listdir(path):
        full_path = os.path.join(path, fname)

        if fname == '.ipynb_checkpoints' and os.path.isdir(full_path):
            shutil.rmtree(full_path)     # remove jupyter labs checkpoint folder if present
            continue

        image = cv2.imread(full_path)

        # cv2.imread signals an unreadable file by returning None, not by raising
        if image is None:
            print('Bad image found:', fname)
            continue

        name_size.append((fname, image.shape))

    print('Original image list size:', len(name_size))

    # optimize processing by organizing images into groups of equal size:

    prepped_images = Image_Optimizer(name_size)

    if prepped_images is None:
        # Image_Optimizer printed its own error message; there is nothing to compare
        return

    # conduct pairwise comparison of images sharing a key:

    for grouped_names in prepped_images.values():
        fnames = list(it.chain.from_iterable(grouped_names))    # flatten values list
        n_files = len(fnames)

        # number of unordered pairs, computed arithmetically so that no
        # iterator of pairs is built (and exhausted) just to be counted
        print('Image combinations being processed:', n_files * (n_files - 1) // 2)

        for idx, first in enumerate(fnames):
            original = cv2.imread(os.path.join(path, first))    # loaded once per outer file

            if original is None:
                report_corrupted(first)
                continue

            for second in fnames[idx + 1:]:
                duplicate = cv2.imread(os.path.join(path, second))

                if duplicate is None:
                    report_corrupted(second)
                    continue

                # exact element-wise comparison (also False on a shape mismatch);
                # cv2.subtract saturates negative differences to 0 and can
                # therefore report two different images as equal
                if np.array_equal(original, duplicate):
                    pair = (first, second)
                    print('Images are completely Equal:', pair)
                    duplicates_list.append(pair)    # append duplicate filenames to list

    print('Duplicates: ', len(duplicates_list))
    print('Corrupted: ', len(corrupted_list))

    if not duplicates_list:
        return

    # every file that is the second member of an identical pair is redundant.
    # Pairs follow listing order, so the earliest-listed copy of each set of
    # identical images is never a second member and is therefore kept.
    # The set drops names that occur in several pairs; sorting makes the
    # listing deterministic:

    dump_list = sorted({second for _first, second in duplicates_list})

    print('Dump List:', len(dump_list))
    print(dump_list)

    # user confirmation to delete files:

    user_answer = input('Remove duplicate files? [y] or [n]')

    if user_answer.strip().lower() == 'y':
        ctr = 0
        for fname in dump_list:
            os.remove(os.path.join(path, fname))
            ctr += 1
        print('Deleted {} files.'.format(ctr))

In [None]:
# Run duplicates function:
# NOTE(review): `path` is not assigned in any of the cells visible in this
# notebook; it has to be defined (as the image directory to scan) before this
# cell is run, otherwise the call below raises NameError.

Check_Duplicates(path)