## Imports

In [13]:
import os
import sys
from typing import Dict, List, Optional, Sequence, TextIO
import numpy as np
import pandas as pd
import urllib.request
import pickle

# Set the log level *before* importing TensorFlow: the native runtime already
# emits log messages while the package is being imported, so setting the
# variable afterwards is too late to silence them.
os.environ.setdefault("TF_CPP_MIN_LOG_LEVEL", "2")  # Report only TF errors by default

import tensorflow as tf

from DS_2_2024_HW2_efficient_net import EfficientNet

In [14]:
# Fix the global random seed up front so that repeated runs start from the
# same random state.
tf.keras.utils.set_random_seed(seed=16)

## Data

In [15]:
# Input image geometry: height, width and number of channels.
# These define the shape of the model's input layer and the crop size used
# by the training augmentation.
H: int = 224
W: int = 224
C: int = 3
# Number of target classes; this is the width of the final softmax layer.
LABELS: int = 34
    
# Declare functions
def parse(example) -> Dict[str, tf.Tensor]:
    """Decode one serialized record into image, mask and label tensors.

    Args:
        example: A scalar string tensor holding one serialized example with
            the features "image" (JPEG bytes), "mask" (PNG bytes) and
            "label" (int64).

    Returns:
        A dict with the same three keys, where "image" is a 3-channel float32
        tensor and "mask" is a 1-channel float32 tensor (both converted with
        `tf.image.convert_image_dtype`), and "label" is left as parsed.
    """
    feature_spec = {
        "image": tf.io.FixedLenFeature([], tf.string),
        "mask": tf.io.FixedLenFeature([], tf.string),
        "label": tf.io.FixedLenFeature([], tf.int64),
    }
    features = tf.io.parse_single_example(example, feature_spec)

    # Decode the compressed bytes first, then convert to float32.
    decoded_image = tf.image.decode_jpeg(features["image"], channels=3)
    decoded_mask = tf.image.decode_png(features["mask"], channels=1)

    features["image"] = tf.image.convert_image_dtype(decoded_image, tf.float32)
    features["mask"] = tf.image.convert_image_dtype(decoded_mask, tf.float32)
    return features

In [16]:
# Load and decode the data in one step per split.
# The train and dev splits are stored as TFRecord files
# (see https://www.tensorflow.org/api_docs/python/tf/data/TFRecordDataset);
# every serialized record is turned into image/mask/label tensors by `parse`.
train = tf.data.TFRecordDataset("hw2_trainsample.tfrecord").map(parse)
dev = tf.data.TFRecordDataset("hw2_devsample.tfrecord").map(parse)

# Build Model

Continue by initializing EfficientNet, building the model, defining the augmentation pipeline, and training.

In [17]:
# Construct the EfficientNet backbone and bind it to `base_model`.
# Constructor parameters described by the assignment:
# - width_coefficient, depth_coefficient: scale the width and depth of the
#   network; 1.0 for both is a reasonable starting point.
# - default_resolution: resolution of an input image.
# - dropout_rate: dropout rate used throughout the network.
# - include_top: if True the network includes the final classification layer
#   and produces a prediction for the 1000 ImageNet classes; if False it
#   returns 'image features' (the result of the last global average pooling).
# - weights: path to the pre-trained weights file; use 'efficientnet.h5'
#   downloaded from the Kaggle competition.
# dropout_rate and include_top are not passed below, so the constructor's own
# defaults apply to them.
base_model = EfficientNet(
    width_coefficient=1.0,
    depth_coefficient=1.0,
    default_resolution=224,
    weights="efficientnet.h5",
)




In [18]:
# Calling the constructor of EfficientNet produced a tf.keras.Model object.
# Only the tail of the pre-trained network is fine-tuned: every layer is
# frozen except the last N_TRAINABLE_LAYERS ones (the last two blocks).
# Use .summary() to inspect the number of trainable / non-trainable parameters.
N_TRAINABLE_LAYERS = 34

total_layers = len(base_model.layers)
# Index of the first layer that stays trainable. A negative value (network
# shorter than N_TRAINABLE_LAYERS) simply leaves every layer trainable.
first_trainable = total_layers - N_TRAINABLE_LAYERS

for index, layer in enumerate(base_model.layers):
    layer.trainable = index >= first_trainable

# NOTE(review): the unfrozen tail may contain BatchNormalization layers, which
# then update their moving statistics during fine-tuning. The Keras
# transfer-learning guide recommends keeping those frozen - consider it.

# Report the layer count explicitly; a bare `len(...)` expression in the
# middle of a cell is evaluated but never displayed.
print(f"Total layers: {total_layers}, trainable tail: {min(N_TRAINABLE_LAYERS, total_layers)}")
base_model.summary()


Model: "efficientnet"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input_5 (InputLayer)           [(None, 224, 224, 3  0           []                               
                                )]                                                                
                                                                                                  
 stem_conv (Conv2D)             (None, 112, 112, 32  864         ['input_5[0][0]']                
                                )                                                                 
                                                                                                  
 stem_bn (BatchNormalization)   (None, 112, 112, 32  128         ['stem_conv[0][0]']              
                                )                                                      

In [19]:
# Build the classifier with the Keras functional API:
# - the input layer takes 3-channel images and feeds them to the EfficientNet
#   backbone,
# - the backbone outputs are pooled and concatenated into one feature vector,
# - a small regularised fully connected head maps the features to a vector of
#   per-breed probabilities.
from tensorflow.keras.regularizers import l2

# Input image
inputs = tf.keras.layers.Input([H, W, C])

# Run the backbone; it returns several tensors at once.
efficientnet_output = base_model(inputs)

# Name the individual backbone outputs (shapes as reported by model.summary()).
(
    output_1000,         # (None, 1000)
    output_7_7_1280,     # (None, 7, 7, 1280)
    output_14_14_112,    # (None, 14, 14, 112)
    output_28_28_40,     # (None, 28, 28, 40)
    output_56_56_24,     # (None, 56, 56, 24)
    output_112_112_16,   # (None, 112, 112, 16)
) = (efficientnet_output[position] for position in range(6))

# Collapse every spatial feature map to a vector with global average pooling,
# deepest map first.
feature_maps = [
    output_7_7_1280,
    output_14_14_112,
    output_28_28_40,
    output_56_56_24,
    output_112_112_16,
]
pooled_features = [
    tf.keras.layers.GlobalAveragePooling2D()(feature_map)
    for feature_map in feature_maps
]

# Single feature vector: the 1000-way backbone output followed by the pooled maps.
concatenated = tf.keras.layers.Concatenate()([output_1000, *pooled_features])

# Fully connected head: each Dense layer is L2-regularised and followed by dropout.
hidden = concatenated
for units in (1024, 512):
    hidden = tf.keras.layers.Dense(units, activation='relu', kernel_regularizer=l2(0.01))(hidden)
    hidden = tf.keras.layers.Dropout(0.5)(hidden)

# Output layer: one softmax probability per label.
outputs = tf.keras.layers.Dense(LABELS, activation='softmax')(hidden)

# Create model
model = tf.keras.Model(inputs=inputs, outputs=outputs)

# Print model summary
model.summary()

Model: "model_3"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input_6 (InputLayer)           [(None, 224, 224, 3  0           []                               
                                )]                                                                
                                                                                                  
 efficientnet (Functional)      [(None, 1000),       5330564     ['input_6[0][0]']                
                                 (None, 7, 7, 1280)                                               
                                , (None, 14, 14, 11                                               
                                2),                                                               
                                 (None, 28, 28, 40)                                         

In [20]:
# Compile the model.
# Labels are integer class ids (not one-hot), hence the sparse loss and metric.
# `metrics` is passed as a list, which is the form documented by Keras; a bare
# metric object is not accepted by every Keras version.
model.compile(
    optimizer=tf.keras.optimizers.Adam(),
    loss=tf.keras.losses.SparseCategoricalCrossentropy(),
    metrics=[tf.keras.metrics.SparseCategoricalAccuracy()],
)


In [21]:
# We define pipeline that will consist of augmentation and batching

def augment_train(input):
    """Apply random training-time augmentation to one parsed example.

    Args:
        input: Mapping with at least the keys 'image' and 'label'. (The
            parameter name shadows the builtin `input`; it is kept so existing
            callers are unaffected.)

    Returns:
        A tuple `(image, label)` where `image` has been randomly flipped,
        colour-jittered, JPEG-degraded, rotated by a multiple of 90 degrees
        and cropped to `[H, W, C]`; `label` is passed through unchanged.
    """
    image = input['image']
    label = input['label']

    # Augmentation procedures (modify to your wish)

    # Random horizontal flip
    image = tf.image.random_flip_left_right(image)
    # Random vertical flip
    image = tf.image.random_flip_up_down(image)
    # Random brightness change
    image = tf.image.random_brightness(image, max_delta=0.2)
    # Random contrast change
    image = tf.image.random_contrast(image, lower=0.8, upper=1.2)
    # Random saturation change
    image = tf.image.random_saturation(image, lower=0.8, upper=1.2)
    # Random hue change
    image = tf.image.random_hue(image, max_delta=0.1)
    # JPEG quality adjustment
    image = tf.image.random_jpeg_quality(image, 80, 100)

    # Random rotation by 1-3 quarter turns (same range as before).
    # The number of turns must be drawn with a TensorFlow op: Dataset.map
    # traces this function once, so a Python/NumPy random call would be
    # evaluated a single time and the same rotation applied to every image.
    quarter_turns = tf.random.uniform([], minval=1, maxval=4, dtype=tf.int32)
    image = tf.image.rot90(image, k=quarter_turns)

    # Random crop
    image = tf.image.random_crop(image, [H, W, C])

    return image, label

def augment_dev(input):
    """Turn one parsed dev example into an `(image, label)` pair.

    No augmentation is applied here; the function only selects the two
    fields that the model consumes. Augmentation procedures can be added
    if desired, but note that the test data are not augmented.

    Args:
        input: Mapping with at least the keys 'image' and 'label'.

    Returns:
        The tuple `(input['image'], input['label'])`, unchanged.
    """
    return input['image'], input['label']

# Build the input pipelines: (shuffle ->) augment -> batch -> prefetch.
# NOTE: this cell rebinds `train` and `dev`; re-running it without re-running
# the loading cell would stack a second map/batch on top of the first.
batch_size = 32
shuffle_buffer = 256  # number of parsed examples held in memory for shuffling

# Keras `fit` does not shuffle tf.data inputs, so the training set has to be
# shuffled here. Shuffling happens before augmentation and batching so that
# individual examples (not whole batches) are re-ordered every epoch.
train = (
    train
    .shuffle(shuffle_buffer)
    .map(augment_train)
    .batch(batch_size)
    .prefetch(tf.data.AUTOTUNE)  # overlap preprocessing with training
)

# The dev set is evaluated in a fixed order; no shuffling needed.
dev = dev.map(augment_dev).batch(batch_size).prefetch(tf.data.AUTOTUNE)

In [22]:
# Train the model, stopping early once the validation loss stops improving.
from tensorflow.keras.callbacks import EarlyStopping

# Upper bound on the number of epochs; early stopping may end training sooner.
num_epochs = 100

# Stop after 5 epochs without a lower validation loss and roll the model back
# to the weights of the best epoch seen.
early_stopping = EarlyStopping(
    monitor='val_loss',
    mode="min",
    patience=5,
    restore_best_weights=True,
)

# Fit on the training pipeline, validating on the dev pipeline after each epoch.
history = model.fit(
    train,
    validation_data=dev,
    epochs=num_epochs,
    callbacks=[early_stopping],
)


Epoch 1/100
Epoch 2/100
Epoch 3/100
Epoch 4/100
Epoch 5/100
Epoch 6/100
Epoch 7/100
Epoch 8/100
Epoch 9/100
Epoch 10/100
Epoch 11/100
Epoch 12/100
Epoch 13/100
Epoch 14/100
Epoch 15/100
Epoch 16/100
Epoch 17/100
Epoch 18/100
Epoch 19/100
Epoch 20/100
Epoch 21/100


# Evaluate

Load the test dataset, predict labels and store the results. Note that (for educational purposes) the test data are loaded from a pickled list, so they must first be converted to a TensorFlow dataset.

In [23]:
# Load the test dataset, which is stored as a pickled list.
# SECURITY: pickle.load can execute arbitrary code contained in the file;
# only load this file if it comes from a trusted source.
with open('hw2_outofsample.pickle', 'rb') as handle:
    test_images = pickle.load(handle)

# NOTE(review): train/dev images are converted to float32 by `parse`; confirm
# that the pickled test images use the same dtype and value range.

# Produce a batched dataset from the list (kept under its own name so the raw
# list and the dataset are not the same variable).
test = tf.data.Dataset.from_tensor_slices(test_images).batch(batch_size)

# Predict class probabilities, then take the most probable class per row.
test_prediction_prob = model.predict(test)
test_prediction = np.argmax(test_prediction_prob, axis=1)

# Store results: one row per test image, with the row index written as "ID".
pd.DataFrame({'TARGET': test_prediction}).to_csv('./prediction.csv', index=True, index_label="ID", header=True)


