In [2]:
import os
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'
import tensorflow as tf
import numpy as np
import cv2
from tensorflow import keras
import random
from tensorflow.keras.applications.inception_v3 import InceptionV3
from tensorflow.keras.models import Model,load_model
from tensorflow.keras.layers import Dense, GlobalAveragePooling2D
from tensorflow.keras.optimizers import RMSprop
from tensorflow.keras import callbacks
from sklearn.model_selection import train_test_split
import math
from sklearn.metrics import classification_report,roc_auc_score,matthews_corrcoef
import argparse
import time
import sys




In [3]:
image_data_dir = "C:\\Users\\venkat\\OneDrive\\Documents\\uni\\Mini Project\\check\\Store Images"
gen_data_dir= "C:\\Users\\venkat\\OneDrive\\Documents\\uni\\Mini Project\\check\\Gen data"

In [4]:
def save_images_to_disk(image_data, save_path):
    os.makedirs(save_path, exist_ok=True) 
    for i, image in enumerate(image_data):
        np.save(os.path.join(save_path, f'image_{i}.npy'), image)

def load_images_from_disk(load_path):
    image_data = []
    for filename in os.listdir(load_path):
        if filename.endswith('.npy'):
            image = np.load(os.path.join(load_path, filename))
            image_data.append(image)
    return image_data

In [5]:
class config1:
    '''
    define parameters & paths
    '''
    save_tanker_target_train_path = "C:\\Users\\venkat\\OneDrive\\Documents\\uni\\Mini Project\\DVTR\\DVTR\\UAV-view\\train\\TA"
    save_tanker_target_test_path = "C:\\Users\\venkat\\OneDrive\\Documents\\uni\\Mini Project\\DVTR\\DVTR\\UAV-view\\test\\TA"
    save_container_target_train_path = "C:\\Users\\venkat\\OneDrive\\Documents\\uni\\Mini Project\\DVTR\\DVTR\\UAV-view\\train\\CS"
    save_container_target_test_path = "C:\\Users\\venkat\\OneDrive\\Documents\\uni\\Mini Project\\DVTR\\DVTR\\UAV-view\\test\\CS"
    save_bulkcarrier_target_train_path = "C:\\Users\\venkat\\OneDrive\\Documents\\uni\\Mini Project\\DVTR\\DVTR\\UAV-view\\train\\BC"
    save_bulkcarrier_target_test_path = "C:\\Users\\venkat\\OneDrive\\Documents\\uni\\Mini Project\\DVTR\\DVTR\\UAV-view\\test\\BC"
    save_generalcargo_target_train_path = "C:\\Users\\venkat\\OneDrive\\Documents\\uni\\Mini Project\\DVTR\\DVTR\\UAV-view\\train\\GC"
    save_generalcargo_target_test_path = "C:\\Users\\venkat\\OneDrive\\Documents\\uni\\Mini Project\\DVTR\\DVTR\\UAV-view\\test\\GC"

In [6]:
seed = 409
np.random.seed(seed)
random.seed(seed)
os.environ['PYTHONHASHSEED']=str(seed)
tf.random.set_seed(seed)

In [7]:
class Dataset:
    def channel4to3(self,img):
        if len(img.shape) > 2 and img.shape[2] == 4:
            img = cv2.cvtColor(img, cv2.COLOR_BGRA2BGR)
        return img


    def rt_data_label(self,dir,label,path,reduce=True):
        all_digits = []
        all_labels = []
        for item in dir:
            img = cv2.imread(os.path.join(path, item), cv2.IMREAD_UNCHANGED)
            if reduce:
                img = self.channel4to3(img)
            all_digits.append(img)
            all_labels.append(label)
        all_digits = np.array(all_digits)
        all_labels = np.array(all_labels)
        return all_digits,all_labels

    def uav_view_dataset(self):
        '''
        get UAV-view image data ready
        :return: data & labels
        '''
        # tanker class
        tanker_dir = os.listdir(config1.save_tanker_target_train_path)
        tanker_dir_rest = os.listdir(config1.save_tanker_target_test_path)
        all_digits_tanker,all_labels_tanker=self.rt_data_label(tanker_dir,0,config1.save_tanker_target_train_path)
        all_digits_tanker_rest, all_labels_tanker_rest = self.rt_data_label(tanker_dir_rest, 0, config1.save_tanker_target_test_path,reduce=False)

        # container class
        container_dir = os.listdir(config1.save_container_target_train_path)
        container_dir_rest = os.listdir(config1.save_container_target_test_path)
        all_digits_container, all_labels_container = self.rt_data_label(container_dir, 1, config1.save_container_target_train_path,reduce=True)
        all_digits_container_rest, all_labels_container_rest = self.rt_data_label(container_dir_rest, 1,
                                                                            config1.save_container_target_test_path,reduce=True)

        # bulkcarrier class
        bulkcarrier_dir = os.listdir(config1.save_bulkcarrier_target_train_path)
        bulkcarrier_dir_rest = os.listdir(config1.save_bulkcarrier_target_test_path)
        all_digits_bulkcarrier, all_labels_bulkcarrier = self.rt_data_label(bulkcarrier_dir, 2,
                                                                        config1.save_bulkcarrier_target_train_path,reduce=False)
        all_digits_bulkcarrier_rest, all_labels_bulkcarrier_rest = self.rt_data_label(bulkcarrier_dir_rest, 2,
                                                                                  config1.save_bulkcarrier_target_test_path,reduce=False)

        # general cargo class
        generalcargo_dir = os.listdir(config1.save_generalcargo_target_train_path)
        generalcargo_dir_rest = os.listdir(config1.save_generalcargo_target_test_path)
        all_digits_generalcargo, all_labels_generalcargo = self.rt_data_label(generalcargo_dir, 3,
                                                                            config1.save_generalcargo_target_train_path,reduce=True)
        all_digits_generalcargo_rest, all_labels_generalcargo_rest = self.rt_data_label(generalcargo_dir_rest, 3,
                                                                                      config1.save_generalcargo_target_test_path,reduce=True)

        all_digits = np.concatenate(
            [all_digits_tanker, all_digits_container, all_digits_bulkcarrier, all_digits_generalcargo], axis=0)
        all_digits = (all_digits.astype("float32") / 255.0) * 2 - 1

        all_digits_rest = np.concatenate(
            [all_digits_tanker_rest, all_digits_container_rest, all_digits_bulkcarrier_rest, all_digits_generalcargo_rest], axis=0)
        all_digits_rest = (all_digits_rest.astype("float32") / 255.0) * 2 - 1

        all_labels = np.concatenate(
            [all_labels_tanker, all_labels_container, all_labels_bulkcarrier, all_labels_generalcargo], axis=0)
        all_labels = keras.utils.to_categorical(all_labels, 4)

        all_labels_rest = np.concatenate(
            [all_labels_tanker_rest, all_labels_container_rest, all_labels_bulkcarrier_rest, all_labels_generalcargo_rest], axis=0)
        all_labels_rest = keras.utils.to_categorical(all_labels_rest, 4)
        
        save_images_to_disk(all_digits, os.path.join(image_data_dir, "train_images"))
        save_images_to_disk(all_digits_rest, os.path.join(image_data_dir, "test_images"))

        return all_labels,all_labels_rest

In [8]:
dataset = Dataset()

# Then call the uav_view_dataset method on that instance
all_labels, all_labels_rest = dataset.uav_view_dataset()
all_labels.shape

(200, 4)

In [9]:
all_labels_rest.shape

(681, 4)

In [10]:
train_images=load_images_from_disk(os.path.join(image_data_dir, "train_images"))

In [11]:
x_train=np.array(train_images)
x_train.shape

(200, 256, 256, 3)

In [12]:
class config2:
    size_per_class = 50*20
    noise_dim = 128
    uav_view_generator = "C:\\Users\\venkat\\OneDrive\\Documents\\uni\\Mini Project\\save_gen\\save_gen\\UAV-view\\generator.h5"

In [13]:
def build_model():
    '''
    build the generator architecture
    :return: model of generator
    '''
    model = tf.keras.models.Sequential()
    model.add(tf.keras.layers.InputLayer(input_shape=(132,)))
    model.add(tf.keras.layers.Dense(8 * 8 * 256))
    model.add(tf.keras.layers.Reshape((8, 8, 256)))
    model.add(tf.keras.layers.Conv2DTranspose(256, 5, 2, activation=tf.nn.relu, padding='same'))
    model.add(tf.keras.layers.Conv2DTranspose(128, 5, 2, activation=tf.nn.relu, padding='same'))
    model.add(tf.keras.layers.Conv2DTranspose(64, 5, 2, activation=tf.nn.relu, padding='same'))
    model.add(tf.keras.layers.Conv2DTranspose(32, 5, 2, activation=tf.nn.relu, padding='same'))
    model.add(tf.keras.layers.Conv2DTranspose(16, 5, 2, activation=tf.nn.relu, padding='same'))
    model.add(tf.keras.layers.Conv2DTranspose(3, 3, 1, activation=tf.nn.tanh, padding='same'))

    # you can either compile or not the model
    model.compile()
    return model

In [14]:
class generate_data:
    def gen_labels(self,shiptype,size):
        '''
        functio to generate labels for given type of vessels
        :param shiptype: ship type
        :param size: sample size
        :return: array of label
        '''
        if shiptype=='tanker':
            a = np.array([1, -1, -1, -1])
            b = np.tile(a, size)
            b = np.reshape(b, newshape=(size, 4))
            return b
        elif shiptype=='container':
            a = np.array([-1, 1, -1, -1])
            b = np.tile(a, size)
            b = np.reshape(b, newshape=(size, 4))
            return b
        elif shiptype=='bulkcarrier':
            a = np.array([-1, -1, 1, -1])
            b = np.tile(a, size)
            b = np.reshape(b, newshape=(size, 4))
            return b
        elif shiptype=='general cargo':
            a = np.array([-1, -1, -1, 1])
            b = np.tile(a, size)
            b = np.reshape(b, newshape=(size, 4))
            return b

    def uav_view_sample(self):
        '''
        function to generate UAV-view images via trained GAN model
        :return: data and label
        '''
        generator = build_model()
        generator.load_weights(filepath=config2.uav_view_generator)

        # tanker
        noise = np.random.uniform(-1, 1, size=[config2.size_per_class, config2.noise_dim])
        y_tanker = self.gen_labels('tanker', config2.size_per_class)
        random_labels = tf.concat(
            [noise, y_tanker], axis=1
        )
        x_tanker = generator.predict(random_labels)

        # container
        noise = np.random.uniform(-1, 1, size=[config2.size_per_class, config2.noise_dim])
        y_container = self.gen_labels('container', config2.size_per_class)
        random_labels = tf.concat(
            [noise, y_container], axis=1
        )
        x_container = generator.predict(random_labels)

        # bulkcarrier
        noise = np.random.uniform(-1, 1, size=[config2.size_per_class, config2.noise_dim])
        y_bulkcarrier = self.gen_labels('bulkcarrier', config2.size_per_class)
        random_labels = tf.concat(
            [noise, y_bulkcarrier], axis=1
        )
        x_bulkcarrier = generator.predict(random_labels)

        # general cargo
        noise = np.random.uniform(-1, 1, size=[config2.size_per_class, config2.noise_dim])
        y_generalcargo = self.gen_labels('general cargo', config2.size_per_class)
        random_labels = tf.concat(
            [noise, y_generalcargo], axis=1
        )
        x_generalcargo = generator.predict(random_labels)

        x_comb = np.concatenate([x_tanker,x_container,x_bulkcarrier,x_generalcargo],axis=0)
        y_comb = np.concatenate([y_tanker,y_container,y_bulkcarrier,y_generalcargo],axis=0)

        #print(np.array(x_comb))
        save_images_to_disk(x_comb, os.path.join(gen_data_dir, "generated_samples"))

        return y_comb

In [15]:
generate=generate_data()

In [16]:
y_comb=generate.uav_view_sample()





In [17]:
y_comb

array([[ 1, -1, -1, -1],
       [ 1, -1, -1, -1],
       [ 1, -1, -1, -1],
       ...,
       [-1, -1, -1,  1],
       [-1, -1, -1,  1],
       [-1, -1, -1,  1]])

In [18]:
y_comb.shape

(4000, 4)

In [19]:
gen_images=load_images_from_disk(os.path.join(gen_data_dir, "generated_samples"))

In [20]:
x_train_generated=np.array(gen_images)
x_train_generated.shape

(4000, 256, 256, 3)

In [21]:
y_comb = 0.5 * (y_comb + 1)
print(y_comb)

[[1. 0. 0. 0.]
 [1. 0. 0. 0.]
 [1. 0. 0. 0.]
 ...
 [0. 0. 0. 1.]
 [0. 0. 0. 1.]
 [0. 0. 0. 1.]]


In [22]:
x_train.shape

(200, 256, 256, 3)

In [23]:
x_train_generated.shape

(4000, 256, 256, 3)

In [24]:
len(x_train)

200

In [25]:
all_labels.shape

(200, 4)

In [26]:
y_comb.shape

(4000, 4)

In [27]:
x_train = np.concatenate([x_train, x_train_generated], axis=0)

In [28]:
y_train = np.concatenate([all_labels, y_comb], axis=0)

In [29]:
y_train.shape

(4200, 4)

In [30]:
x_train.shape

(4200, 256, 256, 3)

In [31]:
np.random.seed(1239)
x_train = np.random.permutation(x_train)

In [32]:
np.random.seed(1239)
y_train = np.random.permutation(y_train)

In [33]:
x_train, x_val, y_train, y_val = train_test_split(x_train, y_train, test_size=0.1, random_state=42)

In [34]:
x_train.shape

(3780, 256, 256, 3)

In [35]:
x_val.shape

(420, 256, 256, 3)

In [36]:
y_train.shape

(3780, 4)

In [37]:
y_val.shape

(420, 4)

In [38]:
test_images=load_images_from_disk(os.path.join(image_data_dir, "test_images"))
x_test=np.array(test_images)
x_test.shape

(681, 256, 256, 3)

In [39]:
all_labels_rest.shape

(681, 4)

In [40]:
def resize(array, size):
    rsize_li = []
    for img in array:
        rsize_img = cv2.resize(img, size, interpolation=cv2.INTER_AREA)
        rsize_li.append(rsize_img)
    rsize_array = np.array(rsize_li)
    return rsize_array

In [41]:
def euclideanDistance(x, y):
    dist = tf.linalg.norm(x - y, axis=1)
    return dist

In [42]:
def contrastive_loss_fn(y_true, y_pred):
    size = tf.shape(y_true)[0]

    first_half_y_true = y_true[:int(size/2)]
    second_half_y_true = y_true[int(size/2):]
    first_half_y_pred = y_pred[:int(size/2)]
    second_half_y_pred = y_pred[int(size/2):]
    first_half_y_true_ = tf.math.argmax(first_half_y_true,axis=1)
    second_half_y_true_ = tf.math.argmax(second_half_y_true,axis=1)

    label = tf.math.equal(first_half_y_true_,second_half_y_true_)
    label = 1-tf.cast(label, tf.float32)
    label = tf.cast(label, tf.float32)
    dis = euclideanDistance(first_half_y_pred,second_half_y_pred)
    margin = 2.0
    t = tf.clip_by_value(margin-dis, clip_value_min=0, clip_value_max=math.inf)
    loss_contrastive = tf.math.reduce_mean((1-label)*tf.math.pow(dis,2)+label*tf.math.pow(t,2))
    return loss_contrastive

In [43]:
def train_model(x_train, x_val, y_train, y_val, args, SaveModlFile):
    set_seed()
    base_model = InceptionV3(weights='imagenet', include_top=False)
    x = base_model.output
    x = GlobalAveragePooling2D()(x)
    predictions_1 = Dense(128, activation='relu')(x)
    predictions_2 = Dense(4, activation='softmax')(predictions_1)
    model1 = Model(inputs=base_model.input, outputs=predictions_1)
    for layer in base_model.layers:
        layer.trainable = False
    # pre-training
    model1.compile(optimizer=RMSprop(learning_rate=args['lr']), loss=contrastive_loss_fn, metrics=['accuracy'])
    model1.fit(x=x_train, y=y_train, epochs=args['pre_epoch'], batch_size=args['batchsize'], verbose=1)

    model2 = Model(inputs=base_model.input, outputs=predictions_2)
    for layer in model2.layers[:args['l_f']]:
        layer.trainable = False
    for layer in model2.layers[args['l_f']:]:
        layer.trainable = True

    model2.compile(optimizer=RMSprop(learning_rate=args['lr'], momentum=0.9), loss='categorical_crossentropy',
                   metrics=['accuracy'])
    mcp_save = callbacks.ModelCheckpoint(SaveModlFile, save_best_only=True, monitor='val_loss', mode='min')

    model2.fit(x=x_train, y=y_train, epochs=args['ft_epoch'], batch_size=args['batchsize'],
              validation_data=(x_val, y_val), callbacks=[mcp_save], verbose=1)
    return None

In [44]:
def predict(x_test,SaveModlFile):
    model = load_model(SaveModlFile)
    predictions_test = model.predict(x_test)
    return predictions_test

In [45]:
def ensemble(x_test_1,x_test_2,args):
    predictions_test_1 = predict(x_test_1, "C:\\Users\\venkat\\Downloads\\save\\save\\model_cnn1.h5")
    predictions_test_2 = predict(x_test_2, "C:\\Users\\venkat\\Downloads\\save\\save\\model_cnn2.h5")
    predictions_test = (predictions_test_1 + predictions_test_2) / 2
    return predictions_test

In [46]:
def evaluate(GroundTruth,Prediction):
    GroundTruth_Idx = np.argmax(GroundTruth, axis=1).tolist()
    Prediction_Idx = np.argmax(Prediction, axis=1).tolist()
    cr = classification_report(GroundTruth_Idx, Prediction_Idx, output_dict=True)
    MCC = matthews_corrcoef(GroundTruth_Idx, Prediction_Idx)
    AUC = roc_auc_score(GroundTruth, Prediction, average='macro')
    return round(cr['accuracy'],4),round(cr['macro avg']['precision'],4),round(cr['macro avg']['recall'],4),round(cr['macro avg']['f1-score'],4),round(MCC,4),round(AUC,4)

In [47]:
y_test=all_labels_rest

In [48]:
y_test.shape

(681, 4)

In [49]:
args = {
            'batchsize': 64,
            'l_f': 249,
            'pre_epoch': 20,
            'ft_epoch': 100,
            'lr': 0.0001,
            'S': 256,
            'M': 512,
            'train': True,
            'SaveModlFile_1': "./save/model_cnn1.h5",
            'SaveModlFile_2': "./save/model_cnn2.h5"
        }

In [50]:
def gpu_setting(enable_gpu=True):
    if enable_gpu:
        os.environ["CUDA_VISIBLE_DEVICES"] = "0"
        print("Num GPUs Available: ", len(tf.config.list_physical_devices('GPU')))
    else:
        os.environ["CUDA_DEVICE_ORDER"] = "PCI_BUS_ID"
        os.environ["CUDA_VISIBLE_DEVICES"] = ""

In [51]:
gpu_setting(enable_gpu=True)

Num GPUs Available:  0


In [59]:
if _name_ == '_main_':
    if args is not None:
        # train CNN1
        if args['train']:
            x_train_1, x_val_1, y_train_1, y_val_1, x_test_1, y_test = get_data(args['S'], reshape=True)
            train_model(x_train_1, x_val_1, y_train_1, y_val_1, args, args['SaveModlFile_1'])

        prediction_test_cnn1 = predict(x_test, "C:\\Users\\venkat\\Downloads\\save\\save\\model_cnn1.h5")
        accuracy, precision, recall, f1_score, MCC, AUC = evaluate(y_test, prediction_test_cnn1)
        print('Performance for CNN1:')
        print('Accuracy: ', accuracy)
        print('Precision: ', precision)
        print('Recall: ', recall)
        print('F1-score: ', f1_score)
        print('MCC: ', MCC)
        print('AUC: ', AUC)
        print('#########################')

        # train CNN2
        x_test_2 = resize(x_test, (args['M'], args['M']))
        if args['train']:
            x_train_2 = resize(x_train_1, (args['M'], args['M']))
            x_val_2 = resize(x_val_1, (args['M'], args['M']))
            train_model(x_train_2, x_val_2, y_train_1, y_val_1, args, args['SaveModlFile_2'])

        prediction_test_cnn2 = predict(x_test_2, "C:\\Users\\venkat\\Downloads\\save\\save\\model_cnn2.h5")
        accuracy, precision, recall, f1_score, MCC, AUC = evaluate(y_test, prediction_test_cnn2)
        print('Performance for CNN2:')
        print('Accuracy: ', accuracy)
        print('Precision: ', precision)
        print('Recall: ', recall)
        print('F1-score: ', f1_score)
        print('MCC: ', MCC)
        print('AUC: ', AUC)
        print('#########################')

        # ensemble
        prediction_test_vael = ensemble(x_test, x_test_2, args)
        accuracy, precision, recall, f1_score, MCC, AUC = evaluate(y_test, prediction_test_vael)
        print('Performance for VAEL:')
        print('Accuracy: ', accuracy)
        print('Precision: ', precision)
        print('Recall: ', recall)
        print('F1-score: ', f1_score)
        print('MCC: ', MCC)
        print('AUC: ', AUC)
        print(conf_matrix)


Performance for CNN1:
Accuracy: 0.9073
Precision: 0.9090
Recall: 0.9072
F1-score: 0.9124
MCC: 0.8762
AUC: 0.9864
Performance for CNN2:
Accuracy: 0.9032
Precision: 0.9046
Recall: 0.9030
F1-score: 0.9080
MCC: 0.8619
AUC: 0.9724
Performance for VAEL:
Accuracy: 0.9186
Precision: 0.9200
Recall: 0.9184
F1-score: 0.9191
MCC: 0.8876
AUC: 0.9942
[[150   0   7   6]
 [  3 168   2   1]
 [  4   0 153  14]
 [  5   0  15 153]]
