# All the Code

#### Imports

In [None]:
import logging
import os

# TF_CPP_MIN_LOG_LEVEL is consumed by TensorFlow's native (C++) runtime, so it
# must already be in the environment when `import tensorflow` executes;
# setting it afterwards does not silence messages emitted during the import.
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'

import tensorflow as tf
from tensorflow import keras
from keras import layers, models, datasets, callbacks, regularizers
import matplotlib.pyplot as plt
import numpy as np

# Also restrict TensorFlow's Python-side logger to errors only.
logger = tf.get_logger()
logger.setLevel(logging.ERROR)  # or logging.INFO, logging.WARNING, etc.


#### DenseNets

In [2]:
# '''
# rewrite of https://github.com/naver-ai/rdnet/blob/main/rdnet/rdnet.py in tensorflow

# poor man's dense net
# '''

# class DenseNet:
#     def __init__(self):
#         # rdnet_tiny:
#         # n_layer = 7
#         # num_init_features: 64,
#         # growth_rates: [64] + [104] + [128] * 4 + [224],
#         # num_blocks_list: [3] * n_layer,
#         # is_downsample_block: (None, True, True, False, False, False, True),
#         # transition_compression_ratio: 0.5,

#         input_shape = (32, 32, 3)

#         self.class_num = 10

#         self.n_layers = 5 
#         self.num_init_features = 64 
#         # self.growth_rates = [64] + [104] + [128] * 4 + [224]
#         self.growth_rates = [32] + [64] + [96] * 2 + [128]
#         self.is_downsample_block = (False, True, True, False, False, False, True)
#         self.transition_compression_ratio = 0.5
#         self.dropout_rate = 0.2

#         self.epsilon = 1e-4

#         self.model = self.build_network(input_shape)

#         assert self.n_layers == len(self.growth_rates)

#     def transition_layer(self, x, scope):
#         '''compresses concat features'''
#         x = layers.Conv2D(int(x.shape[-1] * self.transition_compression_ratio), kernel_size=2, strides=2, padding='same', use_bias=False, name=f'{scope}_conv1')(x)
#         x = layers.LayerNormalization()(x)
#         return x

#     def dense_block(self, x, inter_channels, out_channels, scope, block_idx, layer_idx):
#         x = layers.Conv2D(int(x.shape[-1]), kernel_size=7, strides=1, padding='same', activation='relu', use_bias=False, 
#                         name=f'{scope}_block{block_idx}_layer{layer_idx}_conv1')(x)
#         x = layers.BatchNormalization()(x)
#         x = layers.Conv2D(inter_channels, kernel_size=1, strides=1, padding='same', activation='relu', use_bias=False, 
#                         name=f'{scope}_block{block_idx}_layer{layer_idx}_conv2')(x)
#         x = layers.BatchNormalization()(x)
#         x = layers.Conv2D(out_channels, kernel_size=1, strides=1, padding='same', activation='relu', use_bias=False, 
#                         name=f'{scope}_block{block_idx}_layer{layer_idx}_conv3')(x)
#         x = layers.BatchNormalization()(x)
#         return x

#     def dense_stage(self, x, i, layer_name):
#         layers_concat = [x]
#         for j in range(3):  # loop over layers in the dense block
#             dense_block = self.dense_block(
#                 x, inter_channels=(self.growth_rates[i] * 4), out_channels=self.growth_rates[i], 
#                 scope=layer_name, block_idx=i, layer_idx=j
#             )
#             layers_concat.append(dense_block)
#             x = layers.Concatenate(axis=-1, name=f'{layer_name}_concat_{j}')(layers_concat)
#             x = layers.LayerNormalization()(x)
#         return x

#     def build_network(self, input_shape):
#         inputs = keras.Input(shape=input_shape)

#         x = layers.Conv2D(self.num_init_features, kernel_size=7, strides=2, padding='same', activation='relu', use_bias=False, name='conv0')(inputs)
#         x = layers.MaxPooling2D(pool_size=(3, 3), strides=2, padding='same')(x)
#         x = layers.BatchNormalization()(x)

#         for i in range(self.n_layers - 1):
#             x = self.dense_stage(x, i=i, layer_name=f'dense_{i + 1}')
#             if i != 0 and self.is_downsample_block[i]:
#                 x = self.transition_layer(x, scope=f'trans_{i + 1}')
        
#         x = self.dense_stage(x, i=-1, layer_name='dense_final')

#         # x = layers.LayerNormalization(epsilon=self.epsilon, name='linear_batch')(x)
#         # x = layers.Dropout(rate=self.dropout_rate)(x)
#         x = layers.GlobalAveragePooling2D(name='global_avg_pool')(x)
#         outputs = layers.Dense(units=self.class_num, activation='softmax')(x)
#         return keras.Model(inputs, outputs)



In [3]:
# tf.config.optimizer.set_jit(False)  # disable xla

# print("Num GPUs Available: ", len(tf.config.list_physical_devices('GPU')))

# gpus = tf.config.list_physical_devices('GPU')
# if gpus:
#     try:
#         for gpu in gpus:
#             tf.config.experimental.set_memory_growth(gpu, True)
#     except RuntimeError as e:
#         print(e)

# with tf.device('/GPU:0'):
#     model = DenseNet().model
#     # model = DenseNetSimple()

#     print(model.summary())

#     learning_rate = 0.001
#     epochs = 300

#     optimizer = tf.keras.optimizers.Adam(
#         learning_rate=learning_rate,
#     )

#     model.compile(
#         optimizer=optimizer,
#         loss=tf.keras.losses.SparseCategoricalCrossentropy(),
#         metrics=['accuracy']
#     )

#     history = model.fit(
#         train_dataset, 
#         epochs=epochs, 
#         validation_data=test_dataset,
#         callbacks=[EarlyStopping(monitor='val_loss', patience=5, restore_best_weights=True)]
#     )

### Simple CNN

In [None]:
# Load CIFAR-10 dataset
(train_images, train_labels), (test_images, test_labels) = datasets.cifar10.load_data()

# Rescale pixels by 1/255 (assumes the raw images are 0-255 integers).
# Casting to float32 first avoids NumPy's default promotion to float64, which
# would double the memory held by the image arrays.
train_images = train_images.astype(np.float32) / 255.0
test_images = test_images.astype(np.float32) / 255.0

# Each label row holds a single class id; take column 0 to get a flat vector.
train_labels = train_labels[:, 0].astype(np.int8)
test_labels = test_labels[:, 0].astype(np.int8)

# Training pipeline: shuffle so that batches differ from epoch to epoch, then
# batch and prefetch. A 10k-element buffer keeps shuffle memory modest.
train_dataset = (
    tf.data.Dataset.from_tensor_slices((train_images, train_labels))
    .shuffle(buffer_size=10_000)
    .batch(64)
    .prefetch(tf.data.AUTOTUNE)
)

# Evaluation pipeline: deterministic order, same batch size.
test_dataset = (
    tf.data.Dataset.from_tensor_slices((test_images, test_labels))
    .batch(64)
    .prefetch(tf.data.AUTOTUNE)
)

# Data augmentation
# Random perturbations bundled into one reusable Keras pipeline: a horizontal
# flip, a rotation with factor 0.1 and a zoom with factor 0.1.
_augmentation_layers = [
    layers.RandomFlip("horizontal"),
    layers.RandomRotation(0.1),
    layers.RandomZoom(0.1),
]
data_augmentation = tf.keras.Sequential(_augmentation_layers)

# Model definition
def create_model(input_shape=(32, 32, 3), num_classes=10, weight_decay=1e-4):
    """Build the (uncompiled) simple CNN classifier.

    Architecture: the module-level ``data_augmentation`` pipeline, three
    convolutional stages (64, 128 and 256 filters, with dropout 0.2, 0.3 and
    0.4), then Flatten -> Dense(512) -> BatchNorm -> Dropout(0.5) -> softmax.

    The input shape is declared with an explicit ``Input`` layer. Previously
    it was passed as ``input_shape=`` to the first Conv2D, but that layer is
    not the first layer of the Sequential (the augmentation pipeline is), so
    the argument had no effect and the model stayed unbuilt until training.

    Args:
        input_shape: Shape of one input image, without the batch dimension.
        num_classes: Number of units in the softmax output layer.
        weight_decay: L2 regularisation factor applied to every Conv2D kernel
            and to the hidden Dense kernel.

    Returns:
        A ``Sequential`` model; the caller is responsible for compiling it.
    """

    def _conv_stage(filters, dropout_rate):
        """Return the layers of one stage: 2x (3x3 conv + BN), pool, dropout."""
        stage = []
        for _ in range(2):
            stage.append(
                layers.Conv2D(
                    filters,
                    (3, 3),
                    activation='relu',
                    padding='same',
                    kernel_regularizer=regularizers.l2(weight_decay),
                )
            )
            stage.append(layers.BatchNormalization())
        stage.append(layers.MaxPooling2D((2, 2)))
        stage.append(layers.Dropout(dropout_rate))
        return stage

    model = models.Sequential([
        layers.Input(shape=input_shape),
        data_augmentation,

        *_conv_stage(64, 0.2),
        *_conv_stage(128, 0.3),
        *_conv_stage(256, 0.4),

        layers.Flatten(),
        layers.Dense(512, activation='relu', kernel_regularizer=regularizers.l2(weight_decay)),
        layers.BatchNormalization(),
        layers.Dropout(0.5),
        layers.Dense(num_classes, activation='softmax'),
    ])
    return model

# Learning rate scheduler
def scheduler(epoch, lr):
    """Return the learning rate to use for ``epoch``.

    The rate is left untouched for the first 10 epochs, then multiplied by
    exp(-0.1) every epoch and clamped so it never drops below 1e-4.

    Clamping with ``max`` (instead of testing ``lr <= 1e-4`` before decaying)
    prevents the rate from dipping under the floor for one epoch and then
    jumping back up to it.

    Args:
        epoch: Zero-based epoch index supplied by the callback.
        lr: Learning rate currently in use.

    Returns:
        The learning rate for this epoch as a Python float.
    """
    if epoch < 10:
        return lr
    # np.exp on a Python scalar avoids creating a TensorFlow op every epoch.
    return max(1e-4, float(lr * np.exp(-0.1)))


# Compile the model
with tf.device('/GPU:0'):
    model = create_model()
    model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=1e-3),
                loss='sparse_categorical_crossentropy',
                metrics=['accuracy'])

    lr_scheduler = callbacks.LearningRateScheduler(scheduler)

    # Stop once val_loss has not improved for 5 epochs and roll back to the
    # best weights seen so far.
    early_stopping = callbacks.EarlyStopping(monitor='val_loss', patience=5, restore_best_weights=True)

    # Train the model
    history = model.fit(
        train_dataset,
        validation_data=test_dataset,
        epochs=50,
        callbacks=[lr_scheduler, early_stopping],
        verbose=1
    )

    # Make sure the target directory exists so a long training run is not
    # lost to a missing-folder error at save time.
    os.makedirs('../models', exist_ok=True)
    model.save('../models/simple_cnn.h5')

### Simple ResNet

In [5]:
# # Load CIFAR-10 dataset
# (train_images, train_labels), (test_images, test_labels) = datasets.cifar10.load_data()
# train_images, test_images = train_images / 255.0, test_images / 255.0
# train_labels = [train_label[0] for train_label in train_labels]
# test_labels = [test_label[0] for test_label in test_labels]
# train_labels = np.array(train_labels, dtype=np.int8)
# test_labels = np.array(test_labels, dtype=np.int8)
# train_dataset = tf.data.Dataset.from_tensor_slices((train_images, train_labels))
# train_dataset = train_dataset.batch(64).prefetch(tf.data.AUTOTUNE)
# test_dataset = tf.data.Dataset.from_tensor_slices((test_images, test_labels))
# test_dataset = test_dataset.batch(64).prefetch(tf.data.AUTOTUNE)

# # Data augmentation
# data_augmentation = tf.keras.Sequential([
#     layers.RandomFlip("horizontal"),
#     layers.RandomRotation(0.1),
#     layers.RandomZoom(0.1),
# ])

# class ResidualBlock(layers.Layer):
#     def __init__(self, filters, kernel_size=3, strides=1):
#         super().__init__()
#         self.conv1 = layers.Conv2D(filters, kernel_size, strides=strides, padding="same", use_bias=False)
#         self.bn1 = layers.BatchNormalization()
#         self.relu = layers.ReLU()
#         self.conv2 = layers.Conv2D(filters, kernel_size, strides=1, padding="same", use_bias=False)
#         self.bn2 = layers.BatchNormalization()

#     def call(self, inputs):
#         x = self.conv1(inputs)
#         x = self.bn1(x)
#         x = self.relu(x)
#         x = self.conv2(x)
#         x = self.bn2(x)
#         return self.relu(x + inputs)  # residual connection

# def create_resnet_model():
#     inputs = layers.Input(shape=(32, 32, 3))
#     x = data_augmentation(inputs)
#     x = layers.Conv2D(64, (3, 3), padding="same", use_bias=False)(x)
#     x = layers.BatchNormalization()(x)
#     x = layers.ReLU()(x)

#     for _ in range(3):  # stack residual blocks
#         x = ResidualBlock(64)(x)

#     x = layers.Conv2D(128, (3, 3), strides=2, padding="same", use_bias=False)(x)
#     x = layers.BatchNormalization()(x)
#     x = layers.ReLU()(x)

#     for _ in range(3):
#         x = ResidualBlock(128)(x)

#     x = layers.GlobalAveragePooling2D()(x)
#     x = layers.Dropout(0.5)(x)
#     outputs = layers.Dense(10, activation="softmax")(x)

#     model = models.Model(inputs, outputs)
#     return model

# clr = tf.keras.optimizers.schedules.CosineDecayRestarts(
#     initial_learning_rate=1e-3,
#     first_decay_steps=2000,
#     t_mul=2.0,
#     m_mul=0.5,
#     alpha=1e-5
# )

# optimizer = tf.keras.optimizers.Adam(learning_rate=clr)

# # Compile the model
# model = create_resnet_model()
# model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=1e-3),
#               loss='sparse_categorical_crossentropy',
#               metrics=['accuracy'])

# # Train the model
# history = model.fit(
#     train_dataset,
#     validation_data=test_dataset,
#     epochs=50,
#     callbacks=[callbacks.EarlyStopping(monitor='val_loss', patience=5, restore_best_weights=True)],
#     # verbose=1
# )

# model.save('../models/simple_resnet.keras')

### Simple ViT

In [6]:
# # Hyperparameters
# IMAGE_SIZE = 32
# PATCH_SIZE = 4
# NUM_CLASSES = 10
# EMBED_DIM = 64
# NUM_HEADS = 4
# NUM_LAYERS = 8
# MLP_DIM = 128
# DROPOUT_RATE = 0.1
# BATCH_SIZE = 128
# EPOCHS = 50

# # Preprocess data
# (train_images, train_labels), (test_images, test_labels) = datasets.cifar10.load_data()
# train_images, test_images = train_images / 255.0, test_images / 255.0
# train_labels = [train_label[0] for train_label in train_labels]
# test_labels = [test_label[0] for test_label in test_labels]
# train_labels = np.array(train_labels, dtype=np.int8)
# test_labels = np.array(test_labels, dtype=np.int8)
# train_dataset = tf.data.Dataset.from_tensor_slices((train_images, train_labels))
# train_dataset = train_dataset.batch(64).prefetch(tf.data.AUTOTUNE)
# test_dataset = tf.data.Dataset.from_tensor_slices((test_images, test_labels))
# test_dataset = test_dataset.batch(64).prefetch(tf.data.AUTOTUNE)

# # Vision Transformer Layers
# class PatchEmbedding(layers.Layer):
#     def __init__(self, patch_size, embed_dim):
#         super().__init__()
#         self.patch_size = patch_size
#         self.embed_dim = embed_dim
#         self.projection = layers.Conv2D(embed_dim, patch_size, patch_size, padding="valid")
#         self.flatten = layers.Reshape((-1, embed_dim))

#     def call(self, x):
#         patches = self.projection(x)
#         return self.flatten(patches)

# class TransformerBlock(layers.Layer):
#     def __init__(self, embed_dim, num_heads, mlp_dim, dropout_rate):
#         super().__init__()
#         self.attn = layers.MultiHeadAttention(num_heads=num_heads, key_dim=embed_dim)
#         self.mlp = tf.keras.Sequential([
#             layers.Dense(mlp_dim, activation="relu"),
#             layers.Dense(embed_dim),
#         ])
#         self.norm1 = layers.LayerNormalization()
#         self.norm2 = layers.LayerNormalization()
#         self.dropout1 = layers.Dropout(dropout_rate)
#         self.dropout2 = layers.Dropout(dropout_rate)

#     def call(self, x):
#         attn_output = self.attn(x, x)
#         x = self.norm1(x + self.dropout1(attn_output))
#         mlp_output = self.mlp(x)
#         return self.norm2(x + self.dropout2(mlp_output))

# class VisionTransformer(models.Model):
#     def __init__(self, image_size, patch_size, num_classes, embed_dim, num_heads, num_layers, mlp_dim, dropout_rate):
#         super().__init__()
#         self.embed_dim = embed_dim
#         self.patch_embed = PatchEmbedding(patch_size, embed_dim)
#         self.cls_token = tf.Variable(tf.zeros((1, 1, embed_dim)), trainable=True)
#         self.pos_embed = tf.Variable(tf.random.normal((1, (image_size // patch_size) ** 2 + 1, embed_dim)), trainable=True)
#         self.transformer_blocks = [TransformerBlock(embed_dim, num_heads, mlp_dim, dropout_rate) for _ in range(num_layers)]
#         self.mlp_head = tf.keras.Sequential([
#             layers.LayerNormalization(),
#             layers.Dense(num_classes, activation='softmax'),
#         ])

#     def call(self, x):
#         batch_size = tf.shape(x)[0]
#         x = self.patch_embed(x)
#         cls_tokens = tf.broadcast_to(self.cls_token, (batch_size, 1, self.embed_dim))
#         x = tf.concat([cls_tokens, x], axis=1)
#         x += self.pos_embed
#         for block in self.transformer_blocks:
#             x = block(x)
#         cls_output = x[:, 0]
#         return self.mlp_head(cls_output)

# # Build model
# model = VisionTransformer(
#     image_size=IMAGE_SIZE,
#     patch_size=PATCH_SIZE,
#     num_classes=NUM_CLASSES,
#     embed_dim=EMBED_DIM,
#     num_heads=NUM_HEADS,
#     num_layers=NUM_LAYERS,
#     mlp_dim=MLP_DIM,
#     dropout_rate=DROPOUT_RATE,
# )

# clr = tf.keras.optimizers.schedules.CosineDecayRestarts(
#     initial_learning_rate=1e-3,
#     first_decay_steps=2000,
#     t_mul=2.0,
#     m_mul=0.5,
#     alpha=1e-5
# )

# model.compile(
#     optimizer=tf.keras.optimizers.Adam(learning_rate=clr),
#     loss="sparse_categorical_crossentropy",
#     metrics=["accuracy"],
# )

# # Train model
# history = model.fit(
#     train_dataset,
#     validation_data=test_dataset,
#     epochs=EPOCHS,
#     callbacks=[callbacks.EarlyStopping(monitor='val_loss', patience=5, restore_best_weights=True)],
# )

# model.save('../models/simple_vit.keras')

### Plot Loss/Accuracy

In [None]:
# Training vs. validation accuracy per epoch, drawn on the current axes.
accuracy_axes = plt.gca()
for series_name in ('accuracy', 'val_accuracy'):
    accuracy_axes.plot(history.history[series_name], label=series_name)
accuracy_axes.set_xlabel('Epoch')
accuracy_axes.set_ylabel('Accuracy')
accuracy_axes.set_ylim([0.5, 1])
accuracy_axes.legend(loc='lower right')

# Final metrics on the held-out arrays (one summary line is printed).
test_loss, test_acc = model.evaluate(
    test_images,
    test_labels,
    verbose=2,
)

In [None]:
# Training vs. validation loss per epoch, drawn on the current axes.
loss_axes = plt.gca()
for series_name in ('loss', 'val_loss'):
    loss_axes.plot(history.history[series_name], label=series_name)
loss_axes.set_xlabel('Epoch')
loss_axes.set_ylabel('Loss')
loss_axes.set_ylim([0, 1.75])
loss_axes.legend(loc='lower right')

# Final metrics on the held-out arrays (one summary line is printed).
test_loss, test_acc = model.evaluate(
    test_images,
    test_labels,
    verbose=2,
)

### Competition Dataset Test

In [9]:
# # COMPETITION

# #########################################
# # DO NOT MODIFY THIS SECTION

# # load the competition data
# # The data is in the numpy array format:
# #   competition_images: (100,32,32,3) contains 100 images
# #   competition_labels: (100,1) contains class lables (0 to 9)
# import numpy as np
# competition_data = np.load('../comp-template/competition_data.npz') 
# competition_images = competition_data['competition_images']
# competition_labels = competition_data['competition_labels']


# #########################################
# # YOUR CODE/MODEL GOES HERE:

# # load your model and/or trained weights
# import tensorflow as tf
# from tensorflow import keras
# from keras import models

# my_model = models.load_model('../models/simple_cnn.keras')

# config = my_model.get_config()
# print(config)

# my_model.save('../comp-template/simple_cnn.h5')

# # # evaluate your model on the competition data
# # # make any adjustment to the data format as needed to run your model
# # # you must return accuracy of your model on the competition data 
# # competition_loss, competion_acc = my_model.evaluate(competition_images,  competition_labels)

# # # MUST PRINT OUT THE ACCURACY OF YOUR MODEL ON THE COMPETITION DATA
# # print('Accuracy:', competion_acc) 

