In [None]:
from tensorflow.keras import models, layers, backend
import tensorflow as tf
import tensorflow_addons as tfa
from pathlib import Path
from PIL import Image
import numpy as np
import matplotlib.pyplot as plt
import matplotlib as mpl
import os, math, random, imagesize

In [None]:
# GLOBALS
# Root directory handed to image_dataset_from_directory below.
DATA_DIR = "data/"

# Images are loaded as IMAGE_SIZE x IMAGE_SIZE (see image_size=IMAGE_SHAPE below).
IMAGE_SIZE = 256 
IMAGE_SHAPE = (IMAGE_SIZE, IMAGE_SIZE)
CHANNELS = 3
BATCH_SIZE = 32
EPOCHS = 30
# Shape of one full batch of images: (batch, height, width, channels).
IMG_TENSOR_SHAPE = (BATCH_SIZE, IMAGE_SIZE, IMAGE_SIZE, CHANNELS)

# shuffle=False: the dataset is built un-shuffled so that element order is stable.
# NOTE(review): presumably this keeps elements aligned with `file_paths` - confirm against the Keras docs.
BATCHED_DATASET = tf.keras.preprocessing.image_dataset_from_directory(DATA_DIR, shuffle=False,  image_size=IMAGE_SHAPE, batch_size=BATCH_SIZE)
# Number of batches in the dataset (file_path_batch below pads the last batch up to BATCH_SIZE).
NUM_BATCHES = len(BATCHED_DATASET)

# Attributes exposed by the dataset returned from image_dataset_from_directory.
FILE_PATHS = BATCHED_DATASET.file_paths
CLASS_NAMES = BATCHED_DATASET.class_names
NUM_CLASSES = len(CLASS_NAMES)

In [None]:
# Unbatch the data into per-image (image, label) pairs and materialise them as lists.
# BATCHED_DATASET is built with shuffle=False, so images[i] / labels[i] should line up
# with FILE_PATHS[i] (the file paths themselves are NOT part of the dataset elements).
UNBATCHED_DATASET = BATCHED_DATASET.unbatch()
# Iterate the dataset exactly once: two separate .map() + list() passes would read and
# decode every image file twice.
_unbatched_pairs = list(UNBATCHED_DATASET)
images = [image for image, _ in _unbatched_pairs]
labels = [label for _, label in _unbatched_pairs]
del _unbatched_pairs  # the tensors stay referenced by `images` / `labels`

In [None]:
def file_path_batch(file_paths, num_batches, batch_size, shape, dtype=tf.string):
    """Pack a flat list of file paths into a tensor with one slot per batch element.

    The list is right-padded with the placeholder string "N/a" until it holds
    ``num_batches * batch_size`` entries, so an incomplete last batch still fills
    a rectangular tensor.

    Parameters
    ----------
    file_paths : sequence of str
        File paths in dataset order. The input is not modified.
    num_batches : int
        Number of batches in the dataset.
    batch_size : int
        Number of elements per (full) batch.
    shape : list or tuple of int
        Target shape passed to ``tf.reshape``, e.g. ``[num_batches, batch_size]``.
    dtype : tf.DType, optional
        Dtype of the resulting constant (default ``tf.string``).

    Returns
    -------
    tf.Tensor
        The padded paths reshaped to ``shape``.
    """
    # Use the arguments, not the notebook globals, so the helper works for any dataset.
    padding_needed = num_batches * batch_size - len(file_paths)
    # A non-positive count yields an empty padding list, i.e. no padding.
    padded_paths = list(file_paths) + ["N/a"] * padding_needed
    return tf.reshape(tf.constant(padded_paths, dtype=dtype), shape)

def add_file_paths_to_dataset(ds=None):
    """Map an image dataset so that each element also carries the dataset's file paths.

    Adapted from
    https://stackoverflow.com/questions/70260531/how-to-attach-or-get-filenames-from-mapdataset-from-image-dataset-from-directory

    Parameters
    ----------
    ds : dataset exposing a ``file_paths`` attribute, optional
        When omitted, an un-shuffled dataset is built from ``DATA_DIR`` with
        ``BATCH_SIZE``.

    Returns
    -------
    The mapped dataset. Each element is ``(images, images, paths)``: the images cast
    to uint8 (returned twice) and a string tensor built from ALL of ``ds.file_paths``,
    not only the paths of the current batch.
    """
    if ds is None:
        ds = tf.keras.utils.image_dataset_from_directory(DATA_DIR, shuffle=False, batch_size=BATCH_SIZE)
    all_paths = ds.file_paths

    def map_inputs(images, labels, paths):
        images = tf.cast(images, dtype=tf.uint8)
        paths = tf.constant(paths, tf.string)
        # NOTE(review): `labels` is dropped and the images are emitted twice, as in the
        # referenced answer (image -> image target). Confirm this is intended here.
        return images, images, paths

    return ds.map(lambda images, labels: map_inputs(images, labels, paths=all_paths))

In [None]:
# Pad the file paths with "N/a" placeholders and reshape them to [NUM_BATCHES, BATCH_SIZE].
FILE_NAMES_TENSOR = file_path_batch(FILE_PATHS, NUM_BATCHES, BATCH_SIZE, [NUM_BATCHES, BATCH_SIZE])
# No dataset is passed, so add_file_paths_to_dataset builds its own from DATA_DIR.
batched_with_file_paths = add_file_paths_to_dataset()
# Last expression of the cell: display both objects.
FILE_NAMES_TENSOR, batched_with_file_paths

In [None]:
def resize_images(data_dir: str, shape: tuple=(256,256), keep_aspect_ratio: bool=True):
    """Resize every image file directly inside ``data_dir`` and save the result as JPEG.

    Output goes to ``<data_dir>/resize/keep_aspect`` when ``keep_aspect_ratio`` is true
    and to ``<data_dir>/resize/no_aspect`` otherwise. Both folders are always created.
    Files keep their original file name (including the original extension) but are
    written in JPEG format.

    Parameters
    ----------
    data_dir : str
        Folder whose direct children matching ``*.*`` are processed.
    shape : tuple of int
        Target ``(width, height)`` handed to Pillow.
    keep_aspect_ratio : bool
        True  -> ``Image.thumbnail``: fits the image inside ``shape`` without
                 distortion (per the Pillow docs this only ever shrinks).
        False -> ``Image.resize``: stretches the image to exactly ``shape``.
    """
    source_dir = Path(data_dir)
    keep_aspect_dir = source_dir / "resize" / "keep_aspect"
    no_aspect_dir = source_dir / "resize" / "no_aspect"
    # pathlib joins with the platform separator; hard-coded "\\" only works on Windows.
    keep_aspect_dir.mkdir(parents=True, exist_ok=True)
    no_aspect_dir.mkdir(parents=True, exist_ok=True)

    # Image.ANTIALIAS was an alias of LANCZOS and was removed in Pillow 10.
    resample = Image.LANCZOS

    for img_path in source_dir.glob("*.*"):
        if not img_path.is_file():
            continue  # e.g. a sub-directory whose name contains a dot
        with Image.open(img_path) as img:  # accepts: .png & .jpg
            if keep_aspect_ratio:
                img.thumbnail(shape, resample)  # in-place, aspect ratio preserved
                resized, target_dir = img, keep_aspect_dir
            else:
                resized, target_dir = img.resize(shape, resample), no_aspect_dir
            # JPEG cannot store alpha or palette modes (typical for PNG input).
            if resized.mode not in ("RGB", "L", "CMYK"):
                resized = resized.convert("RGB")
            resized.save(target_dir / img_path.name, "JPEG")


def move_ill_shaped_images(data_dir: str, shape: tuple=(256,256)):
    """Move images whose dimensions differ from ``shape`` into a quarantine folder.

    Each offending file ``<data_dir>/<label>/<file>`` is moved to
    ``<data_dir>/_problem_children/<label>/<file>``; the target folder is created on
    demand and an existing file of the same name is overwritten (``Path.replace``).

    Parameters
    ----------
    data_dir : str
        Root data folder containing one sub-folder per label.
    shape : tuple of int
        Expected image size, forwarded to ``_find_images_with_incorrect_dims``.
    """
    root = Path(data_dir)
    quarantine_root = root / "_problem_children"
    for img_path in _find_images_with_incorrect_dims(data_dir, shape):
        # The finder reports entries as "<label>\<file>"; split on that first backslash
        # and rebuild the path with pathlib so it is valid on every platform.
        label, _, file_name = img_path.partition("\\")
        target_dir = quarantine_root / label
        target_dir.mkdir(parents=True, exist_ok=True)
        (root / label / file_name).replace(target_dir / file_name)


def delete_ill_shaped_images(data_dir: str, shape: tuple):
    """Permanently delete images whose dimensions differ from ``shape``.

    Parameters
    ----------
    data_dir : str
        Root data folder containing one sub-folder per label.
    shape : tuple of int
        Expected image size, forwarded to ``_find_images_with_incorrect_dims``.
    """
    root = Path(data_dir)
    for img_path in _find_images_with_incorrect_dims(data_dir, shape):
        # The finder reports entries as "<label>\<file>". Rebuild the path with pathlib:
        # a backslash-joined string names no real file on POSIX, and with missing_ok=True
        # the deletion would then silently do nothing.
        label, _, file_name = img_path.partition("\\")
        (root / label / file_name).unlink(missing_ok=True)


def _find_images_with_incorrect_dims(relative_data_path: str, desired_shape: tuple, ignore_problem_children: bool=True):
    """Return the images below ``relative_data_path`` whose size is not ``desired_shape``.

    Parameters
    ----------
    relative_data_path : str
        Root data folder; every direct sub-folder is scanned (one level deep).
    desired_shape : tuple of int
        Expected size, compared against the value returned by ``imagesize.get``
        (expected to be ``(width, height)`` - confirm against the imagesize docs).
    ignore_problem_children : bool
        Skip the ``_problem_children`` quarantine folder when true.

    Returns
    -------
    list of str
        One entry per mismatching file, formatted as ``"<folder name>\\<file name>"``
        (backslash-separated on every platform; callers split on that separator).
    """
    # Size results are compared as tuples; coerce so a list argument also matches.
    expected = tuple(desired_shape)
    size_mismatch = []
    for class_dir in Path(relative_data_path).glob("*"):
        if not class_dir.is_dir():
            continue  # loose files next to the label folders are not scanned
        if ignore_problem_children and class_dir.name == "_problem_children":
            continue
        # Glob on the Path itself: appending "\\" to its string form only names the
        # same directory on Windows and matches nothing elsewhere.
        for image_file in class_dir.glob("*"):
            if imagesize.get(str(image_file)) != expected:
                size_mismatch.append(f"{class_dir.name}\\{image_file.name}")
    return size_mismatch

In [None]:
def split_tf_dataset_train_val_test(ds, train_split=0.8, val_split=0.1, test_split=0.1, shuffle=True, shuffle_size=10000, seed=None):
    """Split a finite dataset into consecutive train / validation / test parts.

    Parameters
    ----------
    ds : dataset supporting ``len()``, ``shuffle``, ``take`` and ``skip``
        For a batched dataset the sizes are counted in batches.
    train_split, val_split : float
        Fractions of ``len(ds)`` for the train and validation parts (rounded with
        ``round``).
    test_split : float
        Kept for a symmetric signature. The test part always receives every element
        left over after the train and validation parts.
    shuffle : bool
        Shuffle once before splitting.
    shuffle_size : int
        Shuffle buffer size.
    seed : int, optional
        Seed forwarded to ``ds.shuffle`` for a reproducible split.

    Returns
    -------
    tuple
        ``(train_set, val_set, test_set)``.

    Raises
    ------
    ValueError
        If a fraction is outside [0, 1] or train + validation exceeds 1.
    """
    if not (0 <= train_split <= 1 and 0 <= val_split <= 1):
        raise ValueError("train_split and val_split must each be within [0, 1]")
    if train_split + val_split > 1 + 1e-9:
        raise ValueError("train_split + val_split must not exceed 1")

    # Size everything from the dataset that was passed in, not from a notebook global.
    total = len(ds)
    if shuffle:
        # reshuffle_each_iteration=False freezes the order after the first pass.
        # Otherwise each iteration reshuffles and take/skip would select different
        # elements every epoch, leaking samples between the three splits.
        ds = ds.shuffle(shuffle_size, seed=seed, reshuffle_each_iteration=False)

    train_count = round(total * train_split)
    val_count = round(total * val_split)

    train_set = ds.take(train_count)
    val_set = ds.skip(train_count).take(val_count)
    test_set = ds.skip(train_count + val_count)  # everything that is left
    return train_set, val_set, test_set

In [None]:
# Shuffle and split the batched dataset using the function's default 0.8 / 0.1 / 0.1 fractions.
train_set, val_set, test_set = split_tf_dataset_train_val_test(BATCHED_DATASET, shuffle=True)
# BATCHED_DATASET is batched, so these sizes are numbers of batches, not of images.
print(len(train_set), len(val_set), len(test_set))

In [None]:
def predict_image(model, img):
    """Classify a single image and report the winning class with its score.

    Parameters
    ----------
    model : object with a ``predict`` method
        Called with a batch that contains only ``img``.
    img : image or array accepted by ``img_to_array``

    Returns
    -------
    tuple
        ``(pred_label, confidence)``: the ``CLASS_NAMES`` entry at the arg-max of the
        model output, and the maximum output value times 100, rounded to 2 decimals.
    """
    pixels = tf.keras.preprocessing.image.img_to_array(img)
    # The model is fed a batch, so add a leading axis of size 1.
    single_image_batch = tf.expand_dims(pixels, 0)
    scores = model.predict(single_image_batch)[0]
    best_index = np.argmax(scores)
    return CLASS_NAMES[best_index], round(100 * (np.max(scores)), 2)


def display_single_prediction_from_set(from_set, model=None):
    """Show one random image from the first batch of ``from_set`` with its prediction.

    Prints the predicted label, the actual label and the confidence, then draws the
    image on the current matplotlib axes.

    Parameters
    ----------
    from_set : batched dataset yielding ``(images, labels)``
    model : object with a ``predict`` method, optional
        Model used for the prediction. Defaults to the notebook-level global ``CNN``,
        which was the only option before; pass the model explicitly to avoid depending
        on whatever ``CNN`` is currently bound to.
    """
    if model is None:
        model = CNN  # resolved at call time, same lookup the function always did
    for images_batch, labels_batch in from_set.take(1):
        index = random.randrange(len(images_batch))
        image = images_batch[index].numpy().astype("uint8")
        actu_label = CLASS_NAMES[labels_batch[index].numpy()]
        pred_label, conf = predict_image(model, image)
        print(f"Predicted Label: {pred_label}")
        print(f"Actual Label:    {actu_label}")    
        print(f"Confidence:      {conf}") 
        plt.axis("off")
        plt.imshow(image)


def display_multiple_predictions(from_set, num_rows, num_cols, model=None):
    """Plot a grid of images from the first batch of ``from_set`` with their predictions.

    Parameters
    ----------
    from_set : batched dataset yielding ``(images, labels)``
    num_rows, num_cols : int
        Grid layout; at most ``num_rows * num_cols`` images are drawn.
    model : object with a ``predict`` method, optional
        Model used for the predictions. Defaults to the notebook-level global ``CNN``,
        which was the only option before; pass the model explicitly to avoid depending
        on whatever ``CNN`` is currently bound to.
    """
    if model is None:
        model = CNN  # resolved at call time, same lookup the function always did
    num_images = num_rows*num_cols
    plt.figure(figsize=(16, num_images*2))
    for images, labels in from_set.take(1):
        # A batch can hold fewer images than the grid has cells (small batch size or a
        # partial last batch); never index past the end of the batch.
        for i in range(min(num_images, len(images))):
            image = images[i].numpy()
            pred_label, conf = predict_image(model, image)
            actual_label = CLASS_NAMES[int(labels[i])]
            plt.subplot(num_rows, num_cols, i+1)
            plt.title(f"Actual: {actual_label}, \n Predicted: {pred_label}, \n Confidence: {conf}")
            plt.axis("off")
            plt.imshow(image.astype("uint8"))

In [None]:
# Number of images per class
# Key and count are both derived from the same CLASS_NAMES entry. Zipping os.listdir()
# (arbitrary order, may contain extra entries) against counts computed from CLASS_NAMES
# can attach a count to the wrong class name.
dict_class_size = {label: len(os.listdir(os.path.join(DATA_DIR, label))) for label in CLASS_NAMES}
dict_class_size

In [None]:
"""
Undersampling Algorithms for Imbalanced Classification
https://machinelearningmastery.com/undersampling-algorithms-for-imbalanced-classification/
- Random OVERSAMPLING duplicates examples from the minority class in the training dataset 
  and can result in overfitting for some models.
- Random UNDERSAMPLING deletes examples from the majority class and can result in losing 
  information invaluable to a model.
https://machinelearningmastery.com/tactics-to-combat-imbalanced-classes-in-your-machine-learning-dataset/
https://machinelearningmastery.com/tour-of-evaluation-metrics-for-imbalanced-classification/
https://www.analyticsvidhya.com/blog/2020/07/10-techniques-to-deal-with-class-imbalance-in-machine-learning/
"""
import imblearn as imb
from imblearn.under_sampling import NearMiss, CondensedNearestNeighbour, TomekLinks, EditedNearestNeighbours, OneSidedSelection, NeighbourhoodCleaningRule
from collections import Counter
from sklearn.datasets import make_classification
from numpy import where

# Candidate undersamplers (imblearn). Only the ones listed in the loop below are run.
NearMiss1 = NearMiss(version=1, n_neighbors=3)
NearMiss2 = NearMiss(version=2, n_neighbors=3)
NearMiss3 = NearMiss(version=3, n_neighbors=3)

# In practice, the Tomek Links procedure is often combined with other methods,
# such as the Condensed Nearest Neighbor Rule (this combination is OSS, below).
tomeks_links = TomekLinks()

# One-Sided Selection: Tomek Links are identified and removed in the majority class;
# CNN then removes redundant majority class examples far from the decision boundary.
OSS = OneSidedSelection(n_neighbors=1, n_seeds_S=200)

# Condensed Nearest Neighbour: keeps adding misclassified examples to the store
# (the transformed dataset) while seeking to balance the class distribution.
# NOTE(review): the prediction helpers earlier in this notebook read a global named
# `CNN` as the model handed to predict_image. This assignment rebinds that name to an
# undersampler, so pass the model to those helpers explicitly or rename one of the two.
CNN = CondensedNearestNeighbour(n_neighbors=1)

# Edited Nearest Neighbours: also best when combined with another undersampling procedure.
ENN = EditedNearestNeighbours(n_neighbors=3) 

# Neighbourhood Cleaning Rule: combines the CNN rule (remove redundant examples) with
# the ENN rule (remove noisy or ambiguous examples).
NCR = NeighbourhoodCleaningRule(n_neighbors=3, threshold_cleaning=0.5)

# Synthetic two-feature dataset in which weights=[0.99] puts about 99% of the samples
# in the first class.
dataset_params = dict(n_samples=10000, n_features=2, n_redundant=0, n_clusters_per_class=1, weights=[0.99], flip_y=0, random_state=1)

for undersample in [NCR]:
    X, y = make_classification(**dataset_params)
    initial_dist = Counter(y)
    X, y = undersample.fit_resample(X, y)
    resulting_dist = Counter(y)
    # Iterate the classes left after resampling (the original referenced an undefined
    # name `counter` here, which raised NameError).
    for label in resulting_dist:
        row_ix = where(y == label)[0]
        plt.scatter(X[row_ix, 0], X[row_ix, 1], label=str(label))
    print(initial_dist, resulting_dist)
    plt.title(type(undersample).__name__)
    plt.legend()
    plt.show()