# Exercise 1: TensorFlow Implementation (Filters)

In [None]:
import os
import numpy as np
import matplotlib as mpl
import matplotlib.pyplot as plt
import sklearn
import tensorflow as tf
from sklearn.datasets import load_sample_image
from tensorflow import keras

# Report the TensorFlow and Keras versions in use
print(tf.__version__)
print(keras.__version__)

# Check your device for learning: anything other than the first GPU aborts
device_name = tf.test.gpu_device_name()
if device_name == '/device:GPU:0':
    print('Found GPU at: {}'.format(device_name))
else:
    raise SystemError('GPU device not found')

In [None]:
# Get sample images, dividing every pixel value by 255
china, flower = (
    load_sample_image(name) / 255 for name in ("china.jpg", "flower.jpg")
)

# Combine both images into one batch array
images = np.array([china, flower])

In [None]:
# Print image info: unpack the four batch dimensions, then report them
batch_size, height, width, channels = images.shape
print(
    "batch_size :", batch_size,
    "\nheight * width :", height, "*", width,
    "\nchannels :", channels,
)

In [None]:
# Visualizing: positions of the two sample images inside the batch
CHINA = 0
FLOWER = 1

# Draw channel 0 of each image in grayscale, one figure per image
for image_index in (CHINA, FLOWER):
    plt.imshow(images[image_index][:, :, 0], cmap='gray')
    plt.show()

In [None]:
# Create 2 filters, each 7x7 and spanning every input channel, initialised to zero
filters = np.zeros((7, 7, channels, 2), dtype=np.float32)

# Filter 0: ones down the middle column (vertical line)
filters[:, 3, :, 0] = 1.0
# Filter 1: ones across the middle row (horizontal line)
filters[3, :, :, 1] = 1.0

In [None]:
# Visualizing: channel 0 of the first filter, then of the second filter
for filter_index in range(2):
    plt.imshow(filters[:, :, 0, filter_index], cmap="gray")
    plt.show()

In [None]:
# Convolutional computation using filters (stride 1, "SAME" padding)
outputs = tf.nn.conv2d(images, filters, strides=1, padding="SAME")

# Plot the 1st image's 1st and 2nd feature maps, one figure each
for feature_map in range(2):
    plt.imshow(outputs[CHINA, :, :, feature_map], cmap="gray")
    plt.show()

In [None]:
# Temp function for cropping
def crop(images):
    """Return a fixed window of ``images``: rows 150-219, columns 130-249.

    Only the first two axes are sliced; any further axes are kept whole.
    """
    row_window = slice(150, 220)
    col_window = slice(130, 250)
    return images[row_window, col_window]

In [None]:
# Visualizing: cropped view of channel 0 of the 1st image
print("Input")
input_patch = crop(images[CHINA, :, :, 0])
plt.imshow(input_patch, cmap="gray")
plt.show()

In [None]:
# Visualizing: cropped feature maps of the 1st image, labelled by name
feature_map_names = ("china_vertical", "china_horizontal")
for feature_map_index, filename in enumerate(feature_map_names):
    print(filename)
    feature_patch = crop(outputs[CHINA, :, :, feature_map_index])
    plt.imshow(feature_patch, cmap="gray")
    plt.show()

# Exercise 2: TensorFlow Implementation (Pooling Layers)

In [None]:
# Pooling layer: max pooling with pool size 2, layer dtype float64
max_pool = keras.layers.MaxPool2D(pool_size=2, dtype="float64")
outputs = max_pool(images)

# Visualizing: channel 0 of the 1st image before pooling, then its H*W
china_before = images[CHINA, :, :, 0]
plt.imshow(china_before, cmap="gray")
plt.show()
print("Input's H*W :", china_before.shape)

# Same channel after pooling, then its H*W
china_after = outputs[CHINA, :, :, 0]
plt.imshow(china_after, cmap="gray")
plt.show()
print("Output's H*W :", china_after.shape)

In [None]:
# Get cropped images (as float32) and apply max pooling to them
cropped_images = np.array(list(map(crop, images)), dtype=np.float32)
output = max_pool(cropped_images)

# Show the figures side-by-side; only the top row of the 2x2 grid is filled
fig = plt.figure(figsize=(12, 8))
gs = mpl.gridspec.GridSpec(nrows=2, ncols=2, width_ratios=[2, 2])

# Top-left cell: the cropped 1st image
ax1 = fig.add_subplot(gs[0, 0])
ax1.imshow(cropped_images[CHINA])
ax1.set_title("Input", fontsize=14)
ax1.axis("off")

# Top-right cell: the pooled output for the 1st image
ax2 = fig.add_subplot(gs[0, 1])
ax2.imshow(output[CHINA])
ax2.set_title("Output", fontsize=14)
ax2.axis("off")

plt.show()

In [None]:
# Depth-wise max pooling layer
def depth_pool(images):
    """Max-pool ``images`` along the last axis in non-overlapping groups of 3.

    A fresh ``Lambda`` layer (dtype float64) wrapping ``tf.nn.max_pool`` is
    created on every call and applied to ``images``; its output is returned.
    """
    def pool_over_depth(X):
        # Window and stride are 1 on the first three axes and 3 on the last
        return tf.nn.max_pool(X, ksize=(1, 1, 1, 3), strides=(1, 1, 1, 3), padding="VALID")

    # Pinned to the CPU (presumably because this pooling pattern is not
    # available on the GPU - TODO confirm)
    with tf.device('/CPU:0'):
        pooling_layer = keras.layers.Lambda(pool_over_depth, dtype="float64")
        return pooling_layer(images)

# Apply depth-wise pooling to the full batch and compare shapes (N * H * W * C)
output = depth_pool(images)
for label, batch in (("images :", images), ("output :", output)):
    print(label, batch.shape)

In [None]:
# Compute the depth-pooled crops first, then lay both pictures out side by side
depth_pooled = depth_pool(cropped_images)

plt.figure(figsize=(12, 8))

# Left panel: the cropped 1st image
plt.subplot(1, 2, 1)
plt.imshow(cropped_images[CHINA])
plt.title("Input", fontsize=14)

# Right panel: the single output channel for the 1st image, in grayscale
plt.subplot(1, 2, 2)
plt.imshow(depth_pooled[CHINA, ..., 0], cmap="gray")
plt.title("Output", fontsize=14)

# Called after the second subplot is selected, so it acts on that panel
plt.axis("off")
plt.show()

In [None]:
# Average Pooling with pool size 2, applied to the cropped images
avg_pool = keras.layers.AvgPool2D(pool_size=2)
output_avg = avg_pool(cropped_images)

# Side-by-side layout; only the top row of the 2x2 grid is filled
fig = plt.figure(figsize=(12, 8))
gs = mpl.gridspec.GridSpec(nrows=2, ncols=2, width_ratios=[2, 2])

# Top-left cell: the cropped 1st image
ax1 = fig.add_subplot(gs[0, 0])
ax1.imshow(cropped_images[CHINA])
ax1.set_title("Input", fontsize=14)
ax1.axis("off")

# Top-right cell: the average-pooled output for the 1st image
ax2 = fig.add_subplot(gs[0, 1])
ax2.imshow(output_avg[CHINA])
ax2.set_title("Output", fontsize=14)
ax2.axis("off")

plt.show()

In [None]:
# Global average pooling (layer dtype float64) over the full image batch
global_avg_pool = keras.layers.GlobalAvgPool2D(dtype="float64")
output = global_avg_pool(images)

# Shapes before (N * H * W * C) and after (N * C) pooling
print("images :", images.shape)
print("output :", output.shape)

# One pooled value per channel (R, G, B) for each image
print("china  :", output[CHINA])
print("flower :", output[FLOWER])

# Exercise 3: CNN Architectures


In [None]:
# Simple CNN model
# Convolutional part: conv blocks of 64, 128 and 256 filters, each followed by 2x2 max pooling
conv_layers = [
    keras.layers.Conv2D(
        64, 7, activation="relu", padding="same",
        input_shape=[28, 28, 1],  # Shape from fashion mnist image
    ),
    keras.layers.MaxPooling2D(2),
    keras.layers.Conv2D(128, 3, activation="relu", padding="same"),
    keras.layers.Conv2D(128, 3, activation="relu", padding="same"),
    keras.layers.MaxPooling2D(2),
    keras.layers.Conv2D(256, 3, activation="relu", padding="same"),
    keras.layers.Conv2D(256, 3, activation="relu", padding="same"),
    keras.layers.MaxPooling2D(2),
]

# Dense part: flatten, two hidden layers with 50% dropout, 10-way softmax output
dense_layers = [
    keras.layers.Flatten(),
    keras.layers.Dense(128, activation="relu"),
    keras.layers.Dropout(0.5),
    keras.layers.Dense(64, activation="relu"),
    keras.layers.Dropout(0.5),
    keras.layers.Dense(10, activation="softmax"),
]

model = keras.models.Sequential(conv_layers + dense_layers)

In [None]:
# Load fashion mnist dataset
(X_train_full, y_train_full), (X_test, y_test) = keras.datasets.fashion_mnist.load_data()

# Hold out the last 5000 samples for validation; train on everything before them
X_train, y_train = X_train_full[:-5000], y_train_full[:-5000]
X_valid, y_valid = X_train_full[-5000:], y_train_full[-5000:]

# Normalization statistics come from the training split only;
# the small constant keeps the divisor away from zero
X_mean = X_train.mean(axis=0, keepdims=True)
X_std = X_train.std(axis=0, keepdims=True) + 1e-7

# Standardize each split, then append a trailing axis of size 1
X_train = ((X_train - X_mean) / X_std)[..., np.newaxis]
X_valid = ((X_valid - X_mean) / X_std)[..., np.newaxis]
X_test = ((X_test - X_mean) / X_std)[..., np.newaxis]

In [None]:
# Compile the CNN model (integer labels, hence the sparse loss)
model.compile(
    loss="sparse_categorical_crossentropy",
    optimizer="nadam",
    metrics=["accuracy"],
)

# Training: 10 epochs, validating on the held-out split after each one
history = model.fit(
    X_train,
    y_train,
    epochs=10,
    validation_data=(X_valid, y_valid),
)

# Evaluation on the test split
score = model.evaluate(X_test, y_test)

In [None]:
# Prediction on the first 10 test samples
X_new = X_test[:10]
y_pred = model.predict(X_new)

# Compare the true labels with the highest-scoring class of each prediction
predicted_classes = y_pred.argmax(axis=1)
print("Ground truth data:", y_test[:10])
print("Prediction result:", predicted_classes)

# Exercise 4: ResNet with subclassing & sequential API

In [None]:
# Define a residual unit for ResNet-34
class ResidualUnit(keras.layers.Layer):
    """Residual unit made of two 3x3 convolutions plus a skip connection.

    The main path is conv -> batch norm -> activation -> conv -> batch norm.
    When ``strides > 1`` the skip path is a 1x1 convolution (same strides)
    followed by batch norm; otherwise the input is passed through unchanged.
    The two paths are summed and the activation is applied to the sum.

    Args:
        filters: number of filters of every convolution in the unit.
        strides: strides of the first main-path convolution and of the
            skip-path convolution.
        activation: identifier resolved with ``keras.activations.get``
            ("relu" in this example).
        **kwargs: forwarded to ``keras.layers.Layer``.
    """

    def __init__(self, filters, strides=1, activation="relu", **kwargs):
        super().__init__(**kwargs)
        self.activation = keras.activations.get(activation)

        # Main path; only its first convolution uses the requested strides
        first_conv = keras.layers.Conv2D(
            filters, 3, strides=strides, padding="same", use_bias=False)
        first_norm = keras.layers.BatchNormalization()
        second_conv = keras.layers.Conv2D(
            filters, 3, strides=1, padding="same", use_bias=False)
        second_norm = keras.layers.BatchNormalization()
        self.main_layers = [
            first_conv, first_norm, self.activation, second_conv, second_norm,
        ]

        # Shortcut: a strided 1x1 conv + batch norm when downsampling,
        # otherwise no layers at all (identity)
        if strides > 1:
            self.skip_layers = [
                keras.layers.Conv2D(filters, 1, strides=strides, padding="same", use_bias=False),
                keras.layers.BatchNormalization(),
            ]
        else:
            self.skip_layers = []

    def call(self, inputs):
        """Run both paths on ``inputs`` and return activation(main + skip)."""
        # Main path
        main_path = inputs
        for stage in self.main_layers:
            main_path = stage(main_path)

        # Skip path (left as the raw input when there are no skip layers)
        shortcut = inputs
        for stage in self.skip_layers:
            shortcut = stage(shortcut)

        return self.activation(main_path + shortcut)

In [None]:
# Stem: 7x7 stride-2 convolution, batch norm, ReLU, then 3x3 stride-2 max pooling
model = keras.models.Sequential([
    keras.layers.Conv2D(64, 7, strides=2, input_shape=[28, 28, 1], padding="same", use_bias=False),
    keras.layers.BatchNormalization(),
    keras.layers.Activation("relu"),
    keras.layers.MaxPool2D(pool_size=3, strides=2, padding="same"),
])

# Residual units for ResNet-34:
# [64, 64, 64, 128, 128, 128, 128, 256, 256, 256, 256, 256, 256, 512, 512, 512]
# A unit uses strides=2 exactly when its filter count differs from the previous unit's.
prev_filters = 64
for filters in [64] * 3 + [128] * 4 + [256] * 6 + [512] * 3:
    strides = 2 if filters != prev_filters else 1
    model.add(ResidualUnit(filters, strides=strides))  # Adding Residual model
    prev_filters = filters

# Head: global average pooling, flatten, 10-way softmax
model.add(keras.layers.GlobalAvgPool2D())
model.add(keras.layers.Flatten())
model.add(keras.layers.Dense(10, activation="softmax"))

In [None]:
# Print the layer-by-layer summary of the current `model`
model.summary()

# Exercise 5: Training ResNet-34 for CIFAR10

In [None]:
from tensorflow.keras.datasets import cifar10
from tensorflow.keras import layers, models
from tensorflow.keras.utils import to_categorical

def visualize_data(images, categories, class_names, rows=3, cols=7):
    """Show the leading images in a ``rows`` x ``cols`` grid, labelled by class.

    Args:
        images: indexable collection of images accepted by ``plt.imshow``.
        categories: per-image vectors; ``argmax()`` of entry ``i`` selects the
            class index of image ``i`` (e.g. one-hot labels).
        class_names: sequence mapping a class index to its display name.
        rows: number of grid rows (default 3).
        cols: number of grid columns (default 7).

    At most ``rows * cols`` images are drawn; fewer when less data is given,
    so a short batch no longer raises ``IndexError``.
    """
    # Never index past the end of either input
    count = min(rows * cols, len(images), len(categories))

    fig = plt.figure(figsize=(14, 6))
    fig.patch.set_facecolor('white')
    for i in range(count):
        plt.subplot(rows, cols, i + 1)
        # Hide tick marks; the class name below the image is the only label
        plt.xticks([])
        plt.yticks([])
        plt.imshow(images[i])
        class_index = categories[i].argmax()
        plt.xlabel(class_names[class_index])
    plt.show()

# Define CIFAR-10 class names (list position = class index)
class_names = [
    'airplane', 'automobile', 'bird', 'cat', 'deer',
    'dog', 'frog', 'horse', 'ship', 'truck',
]
num_classes = len(class_names)

(x_train, y_train), (x_test, y_test) = cifar10.load_data()

# Normalization: divide pixel values by 255
x_train, x_test = x_train / 255.0, x_test / 255.0

# One-hot encoding of the labels
y_train = to_categorical(y_train, num_classes)
y_test = to_categorical(y_test, num_classes)

In [None]:
# Visualizing: report the array shapes, then plot a grid of training images
print(x_train.shape, y_train.shape)
visualize_data(images=x_train, categories=y_train, class_names=class_names)

In [None]:
# Stem: 7x7 stride-2 convolution, batch norm, ReLU, then 3x3 stride-2 max pooling
model = keras.models.Sequential([
    keras.layers.Conv2D(64, 7, strides=2, input_shape=[32, 32, 3], padding="same", use_bias=False),
    keras.layers.BatchNormalization(),
    keras.layers.Activation("relu"),
    keras.layers.MaxPool2D(pool_size=3, strides=2, padding="same"),
])

# Residual units:
# [64, 64, 64, 128, 128, 128, 128, 256, 256, 256, 256, 256, 256, 512, 512, 512]
# A unit uses strides=2 exactly when its filter count differs from the previous unit's.
prev_filters = 64
for filters in [64] * 3 + [128] * 4 + [256] * 6 + [512] * 3:
    strides = 2 if filters != prev_filters else 1
    model.add(ResidualUnit(filters, strides=strides))
    prev_filters = filters

# Head: global average pooling, flatten, 10-way softmax
model.add(keras.layers.GlobalAvgPool2D())
model.add(keras.layers.Flatten())
model.add(keras.layers.Dense(10, activation="softmax"))

In [None]:
# Print the layer-by-layer summary of the current `model`
model.summary()

In [None]:
# Compile and train the model (one-hot labels, hence the non-sparse loss)
model.compile(
    loss="categorical_crossentropy",
    optimizer="nadam",
    metrics=["accuracy"],
)

# Note: the test split is used both for per-epoch validation and for the final score
history = model.fit(
    x_train,
    y_train,
    epochs=10,
    validation_data=(x_test, y_test),
)
score = model.evaluate(x_test, y_test)

# Exercise 6: CIFAR10 with data augmentation

In [None]:
# For data augmentation
from tensorflow.keras.preprocessing.image import ImageDataGenerator

# Load CIFAR-10 dataset
(x_train, y_train), (x_test, y_test) = cifar10.load_data()

# Normalization: divide pixel values by 255
x_train, x_test = x_train / 255.0, x_test / 255.0

# One-hot encoding of the labels
y_train = to_categorical(y_train, num_classes)
y_test = to_categorical(y_test, num_classes)

# Visualizing a grid of the (not yet augmented) training images
visualize_data(images=x_train, categories=y_train, class_names=class_names)

In [None]:
# Shift range and flip: both shift ranges are 3/32
width_shift = height_shift = 3 / 32
flip = True

# Data augmentation: horizontal flips plus width/height shifts
datagen = ImageDataGenerator(
    width_shift_range=width_shift,
    height_shift_range=height_shift,
    horizontal_flip=flip,
)
datagen.fit(x_train)

# Define iterator over the training data, without shuffling
it = datagen.flow(x_train, y_train, shuffle=False)

In [None]:
# Visualizing augmented data: pull one batch from the iterator and plot it
batch_images, batch_labels = next(it)
visualize_data(images=batch_images, categories=batch_labels, class_names=class_names)

In [None]:
# Re-train the model with data augmentation
model.compile(loss="categorical_crossentropy", optimizer="nadam", metrics=["accuracy"])

# Training batches must come from the augmenting generator; fitting on the raw
# (x_train, y_train) arrays would bypass `datagen` and apply no augmentation.
# A fresh flow() is used instead of `it` so training batches are shuffled
# (flow() shuffles by default).
# NOTE: compile() does not re-initialise weights, so `model` continues from
# whatever weights it already has.
history = model.fit(
    datagen.flow(x_train, y_train, batch_size=32),
    epochs=10,
    validation_data=(x_test, y_test),
)
score = model.evaluate(x_test, y_test)

# Assignment 1: Define ResNet-50

In [None]:
class ResidualUnit50(keras.layers.Layer):
    """Bottleneck residual unit (1x1 -> 3x3 -> 1x1 convolutions) for ResNet-50.

    The main path ends with ``4 * filters`` channels. The skip path is the
    identity when the input already matches that output; otherwise it is a
    1x1 convolution + batch normalization projecting the input to the same
    spatial size and channel count. The two paths are summed and the
    activation is applied to the sum.

    Args:
        filters: filter count of the first two convolutions; the unit outputs
            ``4 * filters`` channels.
        strides: strides of the first main-path convolution and of the
            projection shortcut.
        activation: identifier resolved with ``keras.activations.get``
            ("relu" in this example).
        **kwargs: forwarded to ``keras.layers.Layer``.
    """

    def __init__(self, filters, strides=1, activation="relu", **kwargs):
        super().__init__(**kwargs)
        # Kept so build() can decide whether a projection shortcut is needed
        self.filters = filters
        self.strides = strides

        # Relu activation in this example
        self.activation = keras.activations.get(activation)
        self.main_layers = [
            keras.layers.Conv2D(filters, 1, strides=strides, padding="same", use_bias=False),
            keras.layers.BatchNormalization(),
            self.activation,
            keras.layers.Conv2D(filters, 3, strides=1, padding="same", use_bias=False),
            keras.layers.BatchNormalization(),
            self.activation,
            keras.layers.Conv2D(4 * filters, 1, strides=1, padding="same", use_bias=False),
            keras.layers.BatchNormalization(),
        ]

        # Skip connection layers; filled in by build() once the input shape is known
        self.skip_layers = []

    def build(self, input_shape):
        """Create the projection shortcut when the identity cannot be added.

        Assumes channels-last inputs, i.e. ``input_shape[-1]`` is the channel
        count.
        """
        out_channels = 4 * self.filters
        # A projection is needed when the spatial size changes (strides > 1)
        # or when the input channel count differs from the main path's output;
        # without it the final addition fails on mismatched shapes.
        if self.strides > 1 or input_shape[-1] != out_channels:
            self.skip_layers = [
                # 1x1 kernel: the shortcut only re-maps channels / subsamples
                keras.layers.Conv2D(out_channels, 1, strides=self.strides, padding="same", use_bias=False),
                keras.layers.BatchNormalization(),
            ]
        super().build(input_shape)

    def call(self, inputs):
        """Run both paths on ``inputs`` and return activation(main + skip)."""
        Z = inputs
        for layer in self.main_layers:
            Z = layer(Z)

        skip_Z = inputs
        for layer in self.skip_layers:
            skip_Z = layer(skip_Z)

        return self.activation(Z + skip_Z)

In [None]:
# Stem: 7x7 stride-2 convolution, batch norm, ReLU, then 3x3 stride-2 max pooling
model = keras.models.Sequential([
    keras.layers.Conv2D(64, 7, strides=2, input_shape=[32, 32, 3], padding="same", use_bias=False),
    keras.layers.BatchNormalization(),
    keras.layers.Activation("relu"),
    keras.layers.MaxPool2D(pool_size=3, strides=2, padding="same"),
])

# Bottleneck units. prev_filters starts at 0 so that even the very first unit
# takes the strides=2 branch: a strides=2 ResidualUnit50 always gets a
# projection shortcut, which the first unit needs because its input has 64
# channels while it outputs 4 * 64.
prev_filters = 0
for filters in [64] * 3 + [128] * 4 + [256] * 6 + [512] * 3:
    strides = 2 if filters != prev_filters else 1
    model.add(ResidualUnit50(filters, strides=strides))
    prev_filters = filters

# Head: global average pooling, flatten, 10-way softmax
model.add(keras.layers.GlobalAvgPool2D())
model.add(keras.layers.Flatten())
model.add(keras.layers.Dense(10, activation="softmax"))

In [None]:
# Compile the model (one-hot labels, hence the non-sparse loss)
model.compile(
    loss="categorical_crossentropy",
    optimizer="nadam",
    metrics=["accuracy"],
)

# Train for 10 epochs on the raw arrays; the test split serves both as
# per-epoch validation data and for the final score
history = model.fit(
    x_train,
    y_train,
    epochs=10,
    validation_data=(x_test, y_test),
)
score = model.evaluate(x_test, y_test)