In [None]:
import os
import numpy as np
import shutil
import csv
import PIL
from PIL import Image
import pathlib
import pickle
import matplotlib.pyplot as plt
import tensorflow as tf
from tensorflow.keras.preprocessing.image import load_img
from tensorflow.keras.preprocessing.image import img_to_array
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Conv2D, MaxPooling2D, Flatten, Dense, Dropout, GlobalAveragePooling2D, BatchNormalization
from tensorflow.keras.layers.experimental.preprocessing import RandomFlip, RandomRotation
from tensorflow.keras.optimizers import Adam
from tensorflow_addons.optimizers import AdamW
from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping, CSVLogger
from tensorflow_estimator.python.estimator.canned.dnn import dnn_logit_fn_builder
import tensorflow_hub as hub
from tensorflow.keras.applications import ResNet152

from tensorflow.compat.v1 import ConfigProto
from tensorflow.compat.v1 import InteractiveSession

config = ConfigProto()
config.gpu_options.allow_growth = True
session = InteractiveSession(config=config)

os.environ['CUDA_VISIBLE_DEVICES']='0' # The second
tf.config.set_soft_device_placement(True)


print("Version: ", tf.__version__)
print("Eager mode: ", tf.executing_eagerly())
print("Hub version: ", hub.__version__)
print("GPU is", "available" if tf.config.list_physical_devices("GPU") else "NOT AVAILABLE")

### Constants

In [None]:
folder_name = './dataset'
csv_file = 'newModel_log.csv'    
h5_file = 'newModel_weight.h5'  
load_file = 'newModel_weight.h5'  
acc_file = 'newModel_acc.png'   
loss_file = 'newModel_loss.png'  

image_size = 256
batch_size = 16

### Load data

In [None]:
data_dir = pathlib.Path(folder_name)
print(data_dir)

In [None]:
train_img_count = len(list(data_dir.glob('images_arranged91/train/*/*.jpg')))
vali_img_count = len(list(data_dir.glob('images_arranged91/vali/*/*.jpg')))
test_img_count = len(list(data_dir.glob('test/*.jpg')))
unlabel_img_count = len(list(data_dir.glob('unlabeled_data/*.jpg')))
print(f'training data size: {train_img_count}')
print(f'validation data size: {vali_img_count}')
print(f'testing data size: {test_img_count}')
print(f'unlabel data size: {unlabel_img_count}')

In [None]:
train_img_PosixPath = list(data_dir.glob('images_arranged91/train/*/*.jpg'))
vali_img_PosixPath = list(data_dir.glob('images_arranged91/vali/*/*.jpg'))
test_img_PosixPath = list(data_dir.glob('test/*.jpg'))
unlabel_img_PosixPath = list(data_dir.glob('unlabeled_data/*.jpg'))
print(len(train_img_PosixPath),train_img_PosixPath)
print(len(vali_img_PosixPath),vali_img_PosixPath)
print(len(test_img_PosixPath),test_img_PosixPath)
print(len(unlabel_img_PosixPath),unlabel_img_PosixPath)

In [None]:
# get labels of training images
train_img_name = []
train_img = []
train_label = []
vali_img_name = []
vali_img = []
vali_label = []
test_img_name = []
test_img = []
# unlabel_img_name = []
# unlabel_img = []

for i in range(len(train_img_PosixPath)):
    train_img_name.append(str(train_img_PosixPath[i]).split('/')[4])
    train_img.append(img_to_array(load_img(train_img_PosixPath[i], target_size=(image_size,image_size)))/255.)
    train_label.append(str(train_img_PosixPath[i]).split('/')[3])
print('Training data done.')

for i in range(len(vali_img_PosixPath)):
    vali_img_name.append(str(vali_img_PosixPath[i]).split('/')[4])
    vali_img.append(img_to_array(load_img(vali_img_PosixPath[i], target_size=(image_size,image_size)))/255.)
    vali_label.append(str(vali_img_PosixPath[i]).split('/')[3])
print('Validation data done.')

for i in range(len(test_img_PosixPath)):
    test_img_name.append(str(test_img_PosixPath[i]).split('/')[2])
    test_img.append(img_to_array(load_img(test_img_PosixPath[i], target_size=(image_size,image_size)))/255.)
print('Testing data done.')

In [None]:
my_seed = 7777
np.random.seed(my_seed)
np.random.shuffle(train_img_name)
np.random.seed(my_seed)
np.random.shuffle(train_img)
np.random.seed(my_seed)
np.random.shuffle(train_label)
np.random.seed(my_seed)
np.random.shuffle(vali_img_name)
np.random.seed(my_seed)
np.random.shuffle(vali_img)
np.random.seed(my_seed)
np.random.shuffle(vali_label)

In [None]:
X_train = np.array(train_img)
vali_train = np.array(vali_img)
X_train.shape

In [None]:
# one-hot encode the labels
train_label=np.array(train_label)
vali_label=np.array(vali_label)
y_train = tf.keras.utils.to_categorical(train_label, num_classes=4)
vali_label = tf.keras.utils.to_categorical(vali_label, num_classes=4)

y_train.shape

In [None]:
mean = np.mean(X_train)
std = np.std(X_train)

X_train = (X_train-mean)/(std)
X_train.shape

### Model Design

In [None]:
from tensorflow.keras.preprocessing.image import ImageDataGenerator

datagen = ImageDataGenerator(
    rotation_range=25,
    width_shift_range=0.2,
    height_shift_range=0.2,
    horizontal_flip=True,
    vertical_flip=False,
    zoom_range=0.2
)

train_data = datagen.flow(X_train, y_train, batch_size=batch_size)
valid_data = datagen.flow(vali_train, vali_label, batch_size=batch_size)

In [None]:
from classification_models.tfkeras import Classifiers
Classifiers.models_names()

In [None]:
from tensorflow.keras.applications import VGG16, VGG19, InceptionResNetV2, ResNet152
from tensorflow.keras.applications.inception_v3 import InceptionV3
from tensorflow.keras.applications.mobilenet import MobileNet
from tensorflow.keras.applications.densenet import DenseNet201
SeResNeXT101, preprocess_input = Classifiers.get('seresnext101')
SeResNet101, preprocess_input = Classifiers.get('seresnet101')
Resnext101, preprocess_input = Classifiers.get('resnext101') # not adopted
Mobilenetv2, preprocess_input = Classifiers.get('mobilenetv2')
Resnet152v2, preprocess_input = Classifiers.get('resnet152v2')
Xception, preprocess_input = Classifiers.get('xception')
Densenet169, preprocess_input = Classifiers.get('densenet169')



base_model = SeResNeXT101(
    weights = "imagenet", 
    include_top=False, 
    input_shape=(256,256, 3))
    
base_model.summary()

In [None]:
# Freeze the layers in the pre-trained model
for layer in base_model.layers[:-7]:
    layer.trainable = False

In [None]:
# Add new classification layers on top of the pre-trained model
# last_layer = 'conv_pw_13_relu'  #mobilenet
# last_layer = 'conv_7b_ac' #inceptionResnetv2
# last_layer ='mixed10' # inceptionV3
# last_layer ='conv5_block3_out' # resnet152
# last_layer ='relu' # densenet201
last_layer ='activation_165' # SeResNeXT101
# last_layer ='block14_sepconv2_act' # Xception
# last_layer ='out_relu' # mobilenetV2 
# last_layer ='activation_2349' # SeResNeT101 
# last_layer ='block5_pool' # vgg19
# last_layer ='activation_165' # SeResNeXT101 
# last_layer ='post_relu' # Resnet152V2
last_layer = base_model.get_layer(last_layer)

x = Flatten()(last_layer.output)
x = Dense(512, activation='relu')(x)
x = Dense(256, activation='relu')(x)
x = Dense(128, activation='relu')(x)
x = BatchNormalization()(x)
x = Dense(64, activation='relu')(x)
x = Dropout(0.5)(x)
output = Dense(4, activation='softmax')(x)  # Assuming 4 classes for classification

In [None]:
# Create a new model combining the pre-trained model with the new classification layers
model = Model(inputs=base_model.input, outputs=output)
model.summary()

In [None]:
lr = 1e-4
def lr_scheduler(epoch, lr):
    decay_rate = 0.1
    decay_step = 10
    if epoch % decay_step == 0 and epoch:
        return lr * decay_rate
    return lr

In [None]:
from tensorflow.keras.callbacks import ModelCheckpoint, CSVLogger, EarlyStopping, LearningRateScheduler
from tensorflow.keras.optimizers import Adam, SGD

checkpoint = ModelCheckpoint(filepath=h5_file, verbose=1, save_best_only=True, save_weights_only=True)
csv_logger = CSVLogger(filename=csv_file, separator=',', append=False)
earlystopping = EarlyStopping(monitor="val_loss", patience=30, verbose=1, mode='min')
optimizer = Adam(learning_rate=lr)
reduce_lr = LearningRateScheduler(lr_scheduler, verbose=1)

model.compile(loss='categorical_crossentropy', optimizer=optimizer, metrics=['accuracy'])

In [None]:
history = model.fit(train_data, 
                        epochs=500,
                        verbose=1, 
                        validation_data=valid_data,
                        # class_weight=class_weight,
                        callbacks=[checkpoint, csv_logger, earlystopping,reduce_lr])

### Plot

In [None]:
import pandas as pd
# data = pd.read_csv(csv_file)
# epoch = data[data.columns[0]]
# accuracy = data[data.columns[1]]
# loss = data[data.columns[2]]
# val_accuracy = data[data.columns[3]]
# val_loss = data[data.columns[4]]

# import matplotlib.pyplot as plt

# acc = history.history['accuracy']
# val_acc = history.history['val_accuracy']
# loss = history.history['loss']
# val_loss = history.history['val_loss']
# epochs = range(1, len(acc) + 1)
# plt.figure(1)
# # "bo" is for "blue dot"
# plt.plot(epochs, loss, 'bo', label='Training loss')
# # b is for "solid blue line"
# plt.plot(epochs, val_loss, 'b', label='Validation loss')
# plt.title('Training and validation loss')
# plt.xlabel('Epochs')
# plt.ylabel('Loss')
# plt.legend()
# plt.show()

# plt.figure(2)
# acc_values = history.history['accuracy']
# val_acc_values = history.history['val_accuracy']
# plt.plot(epochs, acc, 'bo', label='Training acc')
# plt.plot(epochs, val_acc, 'b', label='Validation acc')
# plt.title('Training and validation accuracy')
# plt.xlabel('Epochs')
# plt.ylabel('Accuracy')
# plt.legend()
# plt.show()


### Predict

In [None]:
x_test = np.array(test_img)
x_test.shape


In [None]:
mean = np.mean(x_test)
std = np.std(x_test)

x_test = (x_test-mean)/(std)

In [None]:
from tensorflow.keras.models import load_model
model.load_weights(h5_file)

In [None]:
y_test = model.predict(x_test)
y_test = np.argmax(y_test, axis=1)
y_test

In [None]:
# Write predictions to CSV file
output_file = 'output.csv'
with open(output_file, mode='w', newline='') as file:
    writer = csv.writer(file)
    writer.writerow(['Name', 'Type'])
    for i in range(len(test_img_name)):
        writer.writerow([test_img_name[i], y_test[i]])