In [31]:
import os
import matplotlib.pyplot as plt

import cv2
import tensorflow as tf
import numpy as np
import re

# Project root directory. Set the FEDERATED_ML_ROOT environment variable to point the
# notebook at a checkout on another machine; the fallback is the previously hard-coded
# location, so existing setups keep working unchanged.
ROOT = os.environ.get(
    "FEDERATED_ML_ROOT",
    "/Users/nicolastobis/PycharmProjects/federated-machine-learning",
)

# Directory layout below ROOT:
#   Data/Raw Data/Pain/{training,test}
#   Data/Preprocessed Data/Pain/{training,test}
DATA = os.path.join(ROOT, "Data")

RAW_DATA = os.path.join(DATA, "Raw Data")
RAW_PAIN_DATA = os.path.join(RAW_DATA, "Pain")
RAW_PAIN_TRAINING_DATA = os.path.join(RAW_PAIN_DATA, "training")
RAW_PAIN_TEST_DATA = os.path.join(RAW_PAIN_DATA, "test")

PREPROCESSED_DATA = os.path.join(DATA, "Preprocessed Data")
PREPROCESSED_PAIN_DATA = os.path.join(PREPROCESSED_DATA, "Pain")
PREPROCESSED_PAIN_TRAINING_DATA = os.path.join(PREPROCESSED_PAIN_DATA, "training")
PREPROCESSED_PAIN_TEST_DATA = os.path.join(PREPROCESSED_PAIN_DATA, "test")

# Aliases standing in for 'from tensorflow.keras import models, layers, optimizers'
# (PyCharm import issue workaround): the sub-modules are reached through attribute
# access on the already imported `tf` package instead of an import statement.
models, layers, optimizers = (
    tf.keras.models,
    tf.keras.layers,
    tf.keras.optimizers,
)

In [29]:
def build_cnn(input_shape, num_classes=10, conv_activation=None):
    """
    Build and return a simple, UNCOMPILED CNN model for image recognition.

    Configuration:
    Layer 1: Convolution Layer | Filters: 32 | Kernel Size: 5x5 | Activation: conv_activation
    Layer 2: Max Pooling Layer | Filter: 2x2
    Layer 3: Convolution Layer | Filters: 64 | Kernel Size: 5x5 | Activation: conv_activation
    Layer 4: Max Pooling Layer | Filter: 2x2
    Layer 5: Flatten Layer
    Layer 6: Dense Layer       | Neurons: 512         | Activation: Relu
    Layer 7: Dense Layer       | Neurons: num_classes | Activation: Softmax

    No optimizer, loss or metric is attached here: the caller has to run
    model.compile(...) on the returned model before training it.

    :param input_shape:     image input shape (tuple), e.g. (28, 28, 1)
    :param num_classes:     number of units of the softmax output layer (default: 10)
    :param conv_activation: activation passed to both convolution layers. The default
                            (None) keeps them linear, which is what this function has
                            always built.
                            NOTE(review): earlier documentation of this function listed
                            Relu for the convolution layer - confirm whether linear
                            convolutions are intended.

    :return:
        model               uncompiled tensorflow model
    """

    # Set up model type
    model = models.Sequential()

    # Add layers, inspired by https://www.tensorflow.org/beta/tutorials/images/intro_to_cnns
    # Only the first layer of a Sequential model needs the input shape.
    model.add(layers.Conv2D(32, (5, 5), activation=conv_activation, input_shape=input_shape))
    model.add(layers.MaxPooling2D((2, 2)))
    model.add(layers.Conv2D(64, (5, 5), activation=conv_activation))
    model.add(layers.MaxPooling2D((2, 2)))
    model.add(layers.Flatten())
    model.add(layers.Dense(512, activation='relu'))
    model.add(layers.Dense(num_classes, activation='softmax'))

    return model


def train_cnn(model, train_data, train_labels, epochs=30, callbacks=None):
    """
    Fit the given model on the training data and hand the very same model object back.

    Training always uses a batch size of 10 and no validation split.

    :param model:           compiled tensorflow model (anything exposing a Keras-style fit())
    :param train_data:      numpy array, forwarded to fit() as first positional argument
    :param train_labels:    numpy array, forwarded to fit() as second positional argument
    :param epochs:          number of training epochs (i.e. iterations over train_data)
    :param callbacks:       array of callback functions, forwarded to fit() unchanged

    :return:
         model              the model passed in, after fit() has been called on it
    """

    fit_options = {
        "epochs": epochs,
        "batch_size": 10,
        "validation_split": 0,
        "callbacks": callbacks,
    }
    model.fit(train_data, train_labels, **fit_options)
    return model

In [2]:
def load_and_preprocess_image(path):
    """
    Load an image as greyscale and apply histogram equalization to it.

    :param path:            path of the image file to load

    :return:
        img                 result of cv2.equalizeHist applied to the greyscale image

    :raises IOError:        if OpenCV cannot read or decode the file at `path`
    """
    img = cv2.imread(path, cv2.IMREAD_GRAYSCALE)    # Load image into greyscale
    if img is None:
        # cv2.imread reports a missing or undecodable file by returning None instead of
        # raising; fail here with the offending path rather than inside equalizeHist.
        raise IOError("Could not read image: {}".format(path))
    return cv2.equalizeHist(img)                    # Histogram equalization

In [3]:
def mirror_folder_structure(inputpath, outputpath):
    """
    Recreate the directory tree found under `inputpath` below `outputpath`.

    Only directories are created, no files are copied. For every directory that already
    exists in the target tree a notice is printed.

    :param inputpath:       root of the directory tree to mirror (a trailing path
                            separator is allowed)
    :param outputpath:      root of the mirrored tree; created together with any missing
                            parent directories
    """
    for dirpath, _dirnames, _filenames in os.walk(inputpath):
        # relpath is independent of a trailing separator on inputpath, unlike slicing
        # dirpath by len(inputpath) + 1.
        relative = os.path.relpath(dirpath, inputpath)
        structure = os.path.normpath(os.path.join(outputpath, relative))
        if not os.path.isdir(structure):
            # makedirs (not mkdir) so a missing parent of outputpath is created as well
            os.makedirs(structure)
        else:
            print("Folder does already exits!")

In [4]:
def bulk_process_images(inputpath, outputpath, extension):
    """
    Preprocess every matching image below `inputpath` and write the result to the same
    relative location below `outputpath`.

    Each file whose name ends with `extension` is passed through
    load_and_preprocess_image and saved with cv2.imwrite under its original file name.

    :param inputpath:       root of the directory tree to read images from (a trailing
                            path separator is allowed)
    :param outputpath:      root of the directory tree to write images to
    :param extension:       file name suffix (or tuple of suffixes) selecting the files
                            to process, e.g. ".jpg"
    """
    for dirpath, _dirnames, filenames in os.walk(inputpath):
        # relpath is independent of a trailing separator on inputpath, unlike slicing
        # dirpath by len(inputpath) + 1.
        relative = os.path.relpath(dirpath, inputpath)
        structure = os.path.normpath(os.path.join(outputpath, relative))
        for file in filenames:
            if not file.endswith(extension):
                continue
            # cv2.imwrite does not create directories, so make sure the target folder
            # exists before writing into it.
            os.makedirs(structure, exist_ok=True)
            src = os.path.join(dirpath, file)
            dest = os.path.join(structure, file)
            img = load_and_preprocess_image(src)
            cv2.imwrite(dest, img)

In [21]:
def tf_load_image(path):
    """
    Read a JPEG file with TensorFlow and return it as a float32 image.

    :param path:            path of the JPEG file, handed to tf.io.read_file

    :return:
        image               decoded image after tf.image.convert_image_dtype(..., tf.float32)
    """
    raw_bytes = tf.io.read_file(path)
    # channels=0: use the number of channels stored in the JPEG itself
    decoded = tf.image.decode_jpeg(raw_bytes, channels=0)
    as_float = tf.image.convert_image_dtype(decoded, dtype=tf.float32)
    return as_float

In [22]:
def get_image_paths(root_path):
    """
    Collect the paths of all files found anywhere below `root_path`.

    Every file is returned, whatever its name or extension; paths come in the order
    in which os.walk visits them.

    :param root_path:       directory to search recursively

    :return:
        image_paths         list of file paths (each joined onto its containing directory)
    """
    return [
        os.path.join(folder, name)
        for folder, _subfolders, names in os.walk(root_path)
        for name in names
    ]

In [23]:
def get_labels(image_paths, label_type='pain'):
    """
    Extract one integer label per image from the image file names.

    A file name (without directory and extension) is split on "_"; the fields are read
    as person, session, culture, frame and pain, in that order.

    :param image_paths:     iterable of image file paths
    :param label_type:      which field to return: 'person', 'session', 'culture',
                            'frame' or 'pain' (default)

    :return:
        labels              list of ints, one per path, in input order
    """
    field_names = ('person', 'session', 'culture', 'frame', 'pain')
    field_positions = {name: position for position, name in enumerate(field_names)}

    def _label_of(path):
        # Strip directory and extension, then pick the requested "_"-separated field.
        stem, _ext = os.path.splitext(os.path.basename(path))
        fields = stem.split("_")
        return int(fields[field_positions[label_type]])

    return [_label_of(path) for path in image_paths]

In [49]:
def load_all_images_into_tf_dataset(path):
    """
    Build a tf.data dataset of (image, label) pairs from all files below `path`.

    File paths come from get_image_paths(path), images are loaded lazily with
    tf_load_image, and labels are taken from the file names with get_labels (default
    label type).

    :param path:            directory that is searched recursively for image files

    :return:
        image_label_ds      zipped dataset yielding (image, label) pairs, label as int64
        ds_size             number of labels, i.e. number of files found
    """
    # Use the directory passed in by the caller rather than a fixed module constant.
    img_paths = get_image_paths(path)

    path_ds = tf.data.Dataset.from_tensor_slices(img_paths)
    image_ds = path_ds.map(tf_load_image, num_parallel_calls=tf.data.experimental.AUTOTUNE)

    labels = get_labels(img_paths)
    label_ds = tf.data.Dataset.from_tensor_slices(tf.cast(labels, tf.int64))

    # Both datasets are derived from the same img_paths list, so zipping them pairs
    # every image with its own label.
    image_label_ds = tf.data.Dataset.zip((image_ds, label_ds))
    return image_label_ds, len(labels)

In [56]:
def prepare_dataset_for_training(ds, batch_size, ds_size):
    """
    Turn a dataset into an endlessly repeating, shuffled and batched training input.

    Steps, in order: cache, shuffle, repeat, batch, prefetch.

    :param ds:              dataset to prepare
    :param batch_size:      number of elements per batch
    :param ds_size:         used as shuffle buffer size; a buffer as large as the dataset
                            ensures that the data is completely shuffled

    :return:
        ds                  the prepared dataset
    """
    shuffled = ds.cache().shuffle(buffer_size=ds_size)
    batched = shuffled.repeat().batch(batch_size)
    # `prefetch` lets the dataset fetch batches in the background while the model is training.
    return batched.prefetch(buffer_size=tf.data.experimental.AUTOTUNE)

In [57]:
# Build the (image, label) dataset; ds_size is the number of labels that were loaded.
ds_train, ds_size = load_all_images_into_tf_dataset(
    path=PREPROCESSED_PAIN_TRAINING_DATA,
)

In [58]:
# Cache, shuffle, repeat, batch and prefetch the dataset loaded into `ds_train`.
# The helper is named prepare_dataset_for_training, and its input is `ds_train`;
# `ds` itself does not exist before this cell on a fresh kernel.
ds = prepare_dataset_for_training(ds_train, batch_size=32, ds_size=ds_size)

In [32]:
# Model for inputs of shape (250, 250, 1).
model = build_cnn(input_shape=(250, 250, 1))

In [37]:
# Attach optimizer, loss and metric to the model before training.
model.compile(
    optimizer='sgd',
    loss='sparse_categorical_crossentropy',
    metrics=['accuracy'],
)

In [44]:
# prepare_dataset_for_training makes the dataset repeat indefinitely, so the length of an
# epoch has to be given explicitly: ceil(number of training images / batch size of 32).
# The value is computed as a plain Python int, the type Keras documents for steps_per_epoch.
model.fit(
    ds,
    epochs=1,
    steps_per_epoch=int(np.ceil(len(get_image_paths(PREPROCESSED_PAIN_TRAINING_DATA)) / 32)),
)



KeyboardInterrupt: 