<a href="https://colab.research.google.com/github/shenghaoc/ee5907-ca2/blob/main/cnn.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import numpy as np
from PIL import Image
from pathlib import Path


In [2]:
# CONSTANTS
NUM_SUBJECTS = 68
NUM_CHOSEN = 25
NUM_IMAGES_PER_SUBJECT = 170

TRAIN_RATIO = 0.7
NUM_IMAGES = NUM_CHOSEN * NUM_IMAGES_PER_SUBJECT
NUM_TRAIN_IMAGES_PER_SUBJECT = np.int_(np.around(TRAIN_RATIO * NUM_IMAGES_PER_SUBJECT))
NUM_TRAIN_IMAGES = NUM_CHOSEN * NUM_TRAIN_IMAGES_PER_SUBJECT
NUM_TEST_IMAGES = NUM_IMAGES - NUM_TRAIN_IMAGES

NUM_SELFIES = 10
NUM_TRAIN_SELFIES = np.int_(np.around(TRAIN_RATIO * NUM_SELFIES))
NUM_TEST_SELFIES = NUM_SELFIES - NUM_TRAIN_SELFIES
SELFIE_LABEL = NUM_SUBJECTS + 1

NUM_TOTAL_TRAIN_IMAGES = NUM_TRAIN_IMAGES + NUM_TRAIN_SELFIES
NUM_TOTAL_TEST_IMAGES = NUM_TEST_IMAGES + NUM_TEST_SELFIES

SEED1 = 2021
SEED2 = 2022

WIDTH = 32
HEIGHT = 32
NUM_PIXELS = WIDTH * HEIGHT

# New constants due to need to fit input for tensorflow
NUM_PEOPLE = NUM_CHOSEN + 1  # meaning plus the person with 10 selfies
NUM_CHANNELS = 1


In [3]:
# Ensure that the directory to store figures is created
figures_directory = Path("report") / "figures"
figures_directory.mkdir(exist_ok=True)


In [4]:
# Must start from 1 to accommodate folder naming scheme
# Choose NUM_CHOSEN elements from NUM_SUBJECTS integers without replacement
chosen = np.random.default_rng(SEED1).choice(
    np.arange(1, NUM_SUBJECTS + 1), NUM_CHOSEN, replace=False
)


In [5]:
# Load images from disk
# Use lists for manual looping without use of numpy functions
images = []
labels = []

# Assume PIE is in pwd
directory = Path("PIE")
for i in range(len(chosen)):
    # Do not flatten yet, need to split train and test for each subject
    subject_images = []
    subject_labels = []
    subdirectory = directory / str(chosen[i])
    # Order is arbitrary for glob, but better to shuffle anyway
    files = list(subdirectory.glob("*.jpg"))
    np.random.default_rng(SEED2).shuffle(files)
    for filename in files:
        # PIL is slower but OpenCV is unnecessary
        im = Image.open(filename)
        subject_images.append(np.array(im))
        # For tensorflow input, use sequential label
        subject_labels.append(i)
    images.append(subject_images)
    labels.append(subject_labels)


In [6]:
# Slightly altered code for selfies
selfie_images = []
selfie_labels = []

directory = Path("resized")
# Assume selfies have been resized and folder is in pwd
for filename in directory.glob("*.jpg"):
    im = Image.open(filename)
    selfie_images.append(np.array(im))
    # For tensorflow input, use number of chosen subjects (25) to avoid clashes
    selfie_labels.append(NUM_CHOSEN)


In [7]:
# Further processing without disk access
# Train-test split
images_train, images_test = np.split(
    np.array(images), [NUM_TRAIN_IMAGES_PER_SUBJECT], axis=1
)
labels_train, labels_test = np.split(
    np.array(labels), [NUM_TRAIN_IMAGES_PER_SUBJECT], axis=1
)

selfie_images_train, selfie_images_test = np.split(
    np.array(selfie_images), [NUM_TRAIN_SELFIES]
)
selfie_labels_train, selfie_labels_test = np.split(
    np.array(selfie_labels), [NUM_TRAIN_SELFIES]
)


In [8]:
# Flatterning
# For Conv2D, a 4+D tensor is required, add 1 for the grayscale channel
images_train = images_train.reshape(NUM_TRAIN_IMAGES, WIDTH, HEIGHT, NUM_CHANNELS)
selfie_images_train = selfie_images_train.reshape(
    NUM_TRAIN_SELFIES, WIDTH, HEIGHT, NUM_CHANNELS
)
images_test = images_test.reshape(NUM_TEST_IMAGES, WIDTH, HEIGHT, NUM_CHANNELS)
selfie_images_test = selfie_images_test.reshape(
    NUM_TEST_SELFIES, WIDTH, HEIGHT, NUM_CHANNELS
)

labels_train = labels_train.reshape(NUM_TRAIN_IMAGES)
labels_test = labels_test.reshape(NUM_TEST_IMAGES)

# Combine PIE images and selfies
total_images_train = np.append(
    images_train,
    selfie_images_train,
    axis=0,
)
total_labels_train = np.append(labels_train, selfie_labels_train)

total_images_test = np.append(
    images_test,
    selfie_images_test,
    axis=0,
)
total_labels_test = np.append(labels_test, selfie_labels_test)


In [9]:
# Start of CNN code
import tensorflow as tf


In [10]:
# CONSTANTS
CONV_KERNEL_SIZE = 5
MAX_POOL_KERNEL_SIZE = 2
MAX_POOL_SIZE = 2

BATCH_SIZE = 128
SHUFFLE_BUFFER_SIZE = 100

AUTOTUNE = tf.data.AUTOTUNE


In [11]:
# Load numpy arrays
# Use built-in one-hot encoder, the numerical labels have no meaning, encoding is necessary to avoid misinterpretation
train_dataset = tf.data.Dataset.from_tensor_slices(
    (total_images_train, tf.keras.utils.to_categorical(total_labels_train))
)
test_dataset = tf.data.Dataset.from_tensor_slices(
    (total_images_test, tf.keras.utils.to_categorical(total_labels_test))
)

train_dataset = (
    train_dataset.cache()
    .shuffle(SHUFFLE_BUFFER_SIZE)
    .batch(BATCH_SIZE)
    .prefetch(buffer_size=AUTOTUNE)
)
test_dataset = test_dataset.cache().batch(BATCH_SIZE).prefetch(buffer_size=AUTOTUNE)


In [12]:
tf.keras.backend.clear_session()
model = tf.keras.Sequential(
    [
        # Not really necessary, but good practice?
        tf.keras.layers.Rescaling(1.0 / 255, input_shape=(WIDTH, HEIGHT, NUM_CHANNELS)),
        tf.keras.layers.Conv2D(20, CONV_KERNEL_SIZE, activation="relu"),
        tf.keras.layers.MaxPool2D(
            pool_size=(MAX_POOL_KERNEL_SIZE, MAX_POOL_KERNEL_SIZE),
            strides=(MAX_POOL_SIZE, MAX_POOL_SIZE),
        ),
        tf.keras.layers.Conv2D(50, CONV_KERNEL_SIZE, activation="relu"),
        tf.keras.layers.MaxPool2D(
            pool_size=(MAX_POOL_KERNEL_SIZE, MAX_POOL_KERNEL_SIZE),
            strides=(MAX_POOL_SIZE, MAX_POOL_SIZE),
        ),
        tf.keras.layers.Flatten(),  # too many dimensions after Conv2D
        tf.keras.layers.Dense(500, activation="relu"),
        # Keras documentation: often used for last layer because result can be interpreted as 
        # a probability distribution 
        tf.keras.layers.Dense(NUM_PEOPLE, activation="softmax"),
    ]
)

model.compile(
    optimizer=tf.keras.optimizers.Adam(amsgrad=True), # newest ADAM
    loss=tf.keras.losses.CategoricalCrossentropy(), # multi-class labeling
    metrics=["accuracy"],
)


In [13]:
model.summary()


Model: "sequential"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 rescaling (Rescaling)       (None, 32, 32, 1)         0         
                                                                 
 conv2d (Conv2D)             (None, 28, 28, 20)        520       
                                                                 
 max_pooling2d (MaxPooling2D  (None, 14, 14, 20)       0         
 )                                                               
                                                                 
 conv2d_1 (Conv2D)           (None, 10, 10, 50)        25050     
                                                                 
 max_pooling2d_1 (MaxPooling  (None, 5, 5, 50)         0         
 2D)                                                             
                                                                 
 flatten (Flatten)           (None, 1250)              0

In [14]:
# This is cumulative!
model.fit(train_dataset, epochs=100)


Epoch 1/100
Epoch 2/100
Epoch 3/100
Epoch 4/100
Epoch 5/100
Epoch 6/100
Epoch 7/100
Epoch 8/100
Epoch 9/100
Epoch 10/100
Epoch 11/100
Epoch 12/100
Epoch 13/100
Epoch 14/100
Epoch 15/100
Epoch 16/100
Epoch 17/100
Epoch 18/100
Epoch 19/100
Epoch 20/100
Epoch 21/100
Epoch 22/100
Epoch 23/100
Epoch 24/100
Epoch 25/100
Epoch 26/100
Epoch 27/100
Epoch 28/100
Epoch 29/100
Epoch 30/100
Epoch 31/100
Epoch 32/100
Epoch 33/100
Epoch 34/100
Epoch 35/100
Epoch 36/100
Epoch 37/100
Epoch 38/100
Epoch 39/100
Epoch 40/100
Epoch 41/100
Epoch 42/100
Epoch 43/100
Epoch 44/100
Epoch 45/100
Epoch 46/100
Epoch 47/100
Epoch 48/100
Epoch 49/100
Epoch 50/100
Epoch 51/100
Epoch 52/100
Epoch 53/100
Epoch 54/100
Epoch 55/100
Epoch 56/100
Epoch 57/100
Epoch 58/100
Epoch 59/100
Epoch 60/100
Epoch 61/100
Epoch 62/100
Epoch 63/100
Epoch 64/100
Epoch 65/100
Epoch 66/100
Epoch 67/100
Epoch 68/100
Epoch 69/100
Epoch 70/100
Epoch 71/100
Epoch 72/100
Epoch 73/100
Epoch 74/100
Epoch 75/100
Epoch 76/100
Epoch 77/100
Epoch 78

<keras.callbacks.History at 0x169acc422e0>

In [15]:
model.predict(test_dataset)


array([[9.96000469e-01, 3.23834063e-17, 1.22068917e-14, ...,
        2.99125755e-14, 5.60216151e-10, 1.69231473e-09],
       [9.91360128e-01, 3.62780282e-18, 5.31080883e-14, ...,
        9.33449417e-14, 2.89284063e-09, 2.08377005e-09],
       [9.94833827e-01, 8.78540389e-17, 1.07576615e-15, ...,
        2.35206819e-13, 9.93924179e-11, 2.26093363e-07],
       ...,
       [5.43096590e-08, 2.67539354e-14, 6.14998058e-20, ...,
        3.69697101e-22, 7.55938803e-19, 3.16684581e-02],
       [1.19749124e-15, 1.01459995e-02, 2.46369018e-04, ...,
        3.46680103e-15, 1.48096184e-12, 3.43635445e-03],
       [7.07404881e-12, 2.19561094e-22, 3.06999298e-25, ...,
        8.17932796e-25, 1.69240906e-22, 9.99999166e-01]], dtype=float32)

In [16]:
model.evaluate(test_dataset)




[0.14663906395435333, 0.9702660441398621]