In [20]:
from concurrent.futures import ProcessPoolExecutor
# from loky import ProcessPoolExecutor  # for Windows users

def parallel(func, iterable):
    e = ProcessPoolExecutor()
    return e.map(func, iterable)

In [36]:
import numpy as np
import cv2
import os
from pathlib import Path
from PIL import Image

In [37]:
image_file_extensions = ('.png', '.jpg', '.jpeg', '.tiff', '.bmp', '.gif')

def is_image_path_valid(path: Path):
    return path.is_file() and path.suffix in image_file_extensions

def verify_image(fn):
    "Confirm that `fn` can be opened"
    try:
        im = Image.open(fn)
        im.draft(im.mode, (32,32))
        im.load()
        return True
    except: return False

def load_image(path):
    return Image.open(path)

In [40]:
def load_images_recursively(root_dir: Path):
    ls = os.listdir
    
    images = []
    label2image = []
    
    def append_if_image(root: Path, filename: str):
        path = root / filename
        
        if is_image_path_valid(path):
            images.append(path)
            label2image.append(root.stem)
        
    for filename in ls(root_dir):
        file_path = root_dir / filename
            
        if file_path.is_dir():
            for nested_filename in ls(file_path):
                append_if_image(file_path, nested_filename)
        else:
            append_if_image(root_dir, filename)
            
    return images, label2image

In [41]:
images, labels = load_images_recursively(Path('./data/new_images/'))

In [42]:
are_valid = parallel(verify_image, images)

In [43]:
valid_images = [images[i] for i, is_valid in enumerate(are_valid) if is_valid]

In [44]:
len(images), len(valid_images)

(1512, 1503)

In [45]:
import numpy as np
import matplotlib.pyplot as plt

def resize_with_padding(image, desired_size=450):
    old_size = image.size # old_size[0] is in (width, height) format

    ratio = float(desired_size) / max(old_size)
    new_size = tuple([int(x * ratio) for x in old_size])
    
    image = np.array(image)
    image = cv2.resize(image, (new_size[0], new_size[1])) 

    delta_w = desired_size - new_size[0]
    delta_h = desired_size - new_size[1]
    top, bottom = delta_h//2, delta_h-(delta_h//2)
    left, right = delta_w//2, delta_w-(delta_w//2)

    color = [0, 0, 0]
    new_im = cv2.copyMakeBorder(image, top, bottom, left, right, cv2.BORDER_CONSTANT,
        value=color)

    return new_im

In [56]:
import math
import torch
import warnings

np.random.seed(42)

def load_and_prepropess_image(path):
    image = load_image(path)
    image = resize_with_padding(image)
    return torch.Tensor(image)
    

def generate_batch(data = images, labels = labels, batch_size = 32, shuffle=True):
    
    num_of_samples = len(data)
    data = np.copy(data)
    
    if shuffle:
        idx = np.random.permutation(num_of_samples)
    else:
        idx = np.arange(num_of_samples)
        
    batch_samples = math.ceil(num_of_samples / batch_size)
    

    for ndx in range(0, num_of_samples, batch_size):
        batch_idx = idx[ndx: np.min([ndx + batch_size, num_of_samples - 1])]
        
        batch_image_paths = data[batch_idx]
        batch_labels = np.array(labels)[batch_idx]
        
        batch_images = []
        
        for image_path in batch_image_paths:
            try:
                image = load_and_prepropess_image(image_path)
                
                if image.shape[-1] == 3:
                    batch_images.append(image)
                else:
                    warnings.warn(f"{image_path} had {image.shape[-1]} channels")
            except:
                warnings.warn(f"{image_path} could not be loaded")
                continue

        batch_images = torch.stack(batch_images)
        
        yield batch_images, batch_image_paths, batch_labels

In [57]:
batch_images, batch_paths, batch_labels = next(iter(generate_batch()))



In [58]:
from crop_face import prepare_multi_face_model, convert_face_tensor_to_numpy_array

In [59]:
batch_images.shape

torch.Size([31, 450, 450, 3])

In [60]:
mtcnn = prepare_multi_face_model()

In [61]:
faces = mtcnn(batch_images)

In [62]:
import warnings

destination_path = Path('./data/new_image_crops')

def save_faces(faces, labels, paths):
    for i, image_faces in enumerate(faces):
        if image_faces != None and image_faces.shape[0] != 2:
            warnings.warn(f"Warning: Image {paths[i]} does not have 2 faces, but has {len(image_faces)} instead")
        elif image_faces == None:
            warnings.warn(f"Warning: Image {paths[i]} could not be found")
        else:
            for j, face in enumerate(image_faces):
                im_arr = convert_face_tensor_to_numpy_array(face)

                directory = destination_path / f"{labels[i]}_{i}"

                if not directory.exists():
                    directory.mkdir()

                Image.fromarray(im_arr).save(directory / f"{j}.jpg")
        

In [63]:
for batch_images, batch_paths, batch_labels in generate_batch():
    faces = mtcnn(batch_images)
    save_faces(faces, batch_labels, batch_paths)



In [65]:
import glob

len(glob.glob("./data/new_image_crops/*"))

595