In [None]:
import matplotlib.pyplot as plt
import numpy as np
import tensorflow as tf
import tensorflow_addons as tfa
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras.models import Sequential, Model
from tensorflow.keras.layers import Dense, GlobalAveragePooling2D, BatchNormalization, Flatten, Dropout, Activation, Input, Lambda
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras import optimizers
from tensorflow.keras.callbacks import ReduceLROnPlateau, EarlyStopping
from tensorflow.keras.applications import InceptionV3
import os
# Restrict this process to the CUDA device with index "0".
os.environ.update({'CUDA_VISIBLE_DEVICES': "0"})


# Enumerate the physical devices TensorFlow can see and report them.
gpus, cpus = (
    tf.config.experimental.list_physical_devices(device_type=kind)
    for kind in ('GPU', 'CPU')
)
print(gpus, cpus)

# Turn on memory growth for every listed GPU.
for gpu in gpus:
    tf.config.experimental.set_memory_growth(gpu, enable=True)


In [None]:
# Number of output classes (used for one-hot encoding and the softmax head).
num_classes = 2
# Model input shape; presumably (height, width, channels) -- confirm against the .npy contents.
input_shape = (150, 150, 3)

# Load the image array and the label array from disk.
data_x = np.load('../Data/COVID CXR/covid_lung_Unet_Crop_finish_150_X.npy')
data_y = np.load('../Data/COVID CXR/covid_lung_Unet_Crop_finish_150_Y.npy')



In [None]:
# Display the labels exactly as loaded from disk.
data_y

In [None]:
# Shuffle images and labels with one shared permutation so rows stay aligned.
# The permutation is drawn from a seeded generator so that the sample order
# (and every split derived from it) is identical after Restart & Run All.
SHUFFLE_SEED = 101
index = np.random.default_rng(SHUFFLE_SEED).permutation(data_x.shape[0])

data_x2 = data_x[index]
data_y2 = data_y[index]

In [None]:
# Display the labels after the shuffle permutation has been applied.
data_y2

In [None]:
# Scale pixel values by 1/255 and one-hot encode the labels.
# Both arrays are derived from the untouched `data_x` / `data_y` plus the
# shuffle `index`, so re-running this cell gives the same result instead of
# dividing by 255 a second time or one-hot encoding already-encoded labels.
data_x2 = data_x[index] / 255.0
data_y2 = tf.keras.utils.to_categorical(data_y[index], num_classes)

In [None]:
# Display the labels after one-hot encoding.
data_y2

In [None]:
def window_partition(x, window_size):
    """Cut a rank-4 feature map into square, non-overlapping windows.

    Args:
        x: Tensor whose static shape unpacks as (_, height, width, channels).
        window_size: Side length of each window.

    Returns:
        Tensor of shape (-1, window_size, window_size, channels), one entry
        per window.
    """
    height, width, channels = x.shape[1:]
    rows = height // window_size
    cols = width // window_size

    # Split each spatial axis into (window index, offset inside the window).
    grid = tf.reshape(
        x, shape=(-1, rows, window_size, cols, window_size, channels)
    )
    # Put the two window-index axes next to each other, ahead of the offsets.
    grid = tf.transpose(grid, perm=(0, 1, 3, 2, 4, 5))
    return tf.reshape(grid, shape=(-1, window_size, window_size, channels))


def window_reverse(windows, window_size, height, width, channels):
    """Stitch windows back into a (-1, height, width, channels) feature map.

    Applies the reshape/transpose sequence of `window_partition` in reverse.

    Args:
        windows: Tensor reshapeable to
            (-1, height // window_size, width // window_size,
             window_size, window_size, channels).
        window_size: Side length of each window.
        height: Height of the reassembled map.
        width: Width of the reassembled map.
        channels: Number of channels.

    Returns:
        Tensor of shape (-1, height, width, channels).
    """
    rows = height // window_size
    cols = width // window_size

    grid = tf.reshape(
        windows,
        shape=(-1, rows, cols, window_size, window_size, channels),
    )
    # Interleave window indices with in-window offsets along each spatial axis.
    grid = tf.transpose(grid, perm=(0, 1, 3, 2, 4, 5))
    return tf.reshape(grid, shape=(-1, height, width, channels))


class DropPath(layers.Layer):
    """Stochastic depth: randomly zero whole samples of a residual branch.

    During training each sample in the batch is kept with probability
    `1 - drop_prob` and rescaled by `1 / (1 - drop_prob)`. Outside of
    training, or when `drop_prob` is `None`/0, the layer is the identity.

    Args:
        drop_prob: Probability of dropping a sample's branch output.
    """

    def __init__(self, drop_prob=None, **kwargs):
        super().__init__(**kwargs)
        self.drop_prob = drop_prob

    def call(self, x, training=None):
        # Nothing to drop: also covers the default `drop_prob=None`, which
        # would otherwise fail on `1 - None`.
        if not self.drop_prob:
            return x
        # Only perturb activations while training; evaluation and prediction
        # must be deterministic. Keras fills in `training` from the calling
        # context when the caller does not pass it explicitly.
        if not training:
            return x

        keep_prob = 1 - self.drop_prob
        batch_size = tf.shape(x)[0]
        rank = x.shape.rank
        # One random draw per sample, broadcast over all remaining axes.
        shape = (batch_size,) + (1,) * (rank - 1)
        random_tensor = keep_prob + tf.random.uniform(shape, dtype=x.dtype)
        path_mask = tf.floor(random_tensor)
        return tf.math.divide(x, keep_prob) * path_mask


In [None]:
class WindowAttention(layers.Layer):
    """Multi-head self-attention inside one window, with a learned relative position bias.

    Args:
        dim: Channel width; the qkv projection outputs `dim * 3` and the
            output projection outputs `dim`.
        window_size: Pair (window_height, window_width), indexed as [0] and [1].
        num_heads: Number of attention heads.
        qkv_bias: Whether the qkv projection uses a bias term.
        dropout_rate: Rate of the dropout layer applied to the attention
            weights and to the projected output.
        prefix: String prepended to the names of the sub-layers and weights.
    """

    def __init__(
        self, dim, window_size, num_heads, qkv_bias=True, dropout_rate=0.0, prefix='', **kwargs
    ):
        super().__init__(**kwargs)
        self.dim = dim
        self.window_size = window_size
        self.num_heads = num_heads
        # Scaling applied to the queries: (per-head width) ** -0.5.
        self.scale = (dim // num_heads) ** -0.5
        self.prefix = prefix
        # A single Dense layer produces queries, keys and values together.
        self.qkv = layers.Dense(dim * 3, use_bias=qkv_bias, name=f'{self.prefix}/attn/qkv')
        self.dropout = layers.Dropout(dropout_rate)
        self.proj = layers.Dense(dim, name=f'{self.prefix}/attn/proj')

    def build(self, input_shape):
        """Create the relative position bias table and its lookup index."""
        # One table row per possible (row offset, column offset) pair.
        num_window_elements = (2 * self.window_size[0] - 1) * (
            2 * self.window_size[1] - 1
        )
        self.relative_position_bias_table = self.add_weight(f'{self.prefix}/attn/relative_position_bias_table',
            shape=(num_window_elements, self.num_heads),
            initializer=tf.initializers.Zeros(),
            trainable=True,
        )
        # Coordinates of every position in the window, flattened to (2, Wh*Ww).
        coords_h = np.arange(self.window_size[0])
        coords_w = np.arange(self.window_size[1])
        coords_matrix = np.meshgrid(coords_h, coords_w, indexing="ij")
        coords = np.stack(coords_matrix)
        coords_flatten = coords.reshape(2, -1)
        # Pairwise coordinate differences, rearranged to (Wh*Ww, Wh*Ww, 2).
        relative_coords = coords_flatten[:, :, None] - coords_flatten[:, None, :]
        relative_coords = relative_coords.transpose([1, 2, 0])
        # Shift both offsets so they start at 0, then scale the row offset by
        # the number of possible column offsets so the sum is a flat table index.
        relative_coords[:, :, 0] += self.window_size[0] - 1
        relative_coords[:, :, 1] += self.window_size[1] - 1
        relative_coords[:, :, 0] *= 2 * self.window_size[1] - 1
        relative_position_index = relative_coords.sum(-1)

        # Stored as a non-trainable variable; used in call() to gather biases.
        self.relative_position_index = tf.Variable(
            initial_value=tf.convert_to_tensor(relative_position_index), trainable=False,
            name=f'{self.prefix}/attn/relative_position_index'
        )

    def call(self, x, mask=None):
        """Apply windowed attention.

        Args:
            x: Tensor whose static shape unpacks as (_, size, channels).
            mask: Optional tensor added to the attention logits before the
                softmax; its first dimension is read as the number of windows.

        Returns:
            Tensor reshaped to (-1, size, channels) and passed through the
            output projection and dropout.
        """
        _, size, channels = x.shape
        head_dim = channels // self.num_heads
        x_qkv = self.qkv(x)
        # Separate q/k/v and heads, then move the q/k/v axis to the front.
        x_qkv = tf.reshape(x_qkv, shape=(-1, size, 3, self.num_heads, head_dim))
        x_qkv = tf.transpose(x_qkv, perm=(2, 0, 3, 1, 4))
        q, k, v = x_qkv[0], x_qkv[1], x_qkv[2]
        q = q * self.scale
        # Transpose the last two axes of k so that q @ k yields (size, size) logits per head.
        k = tf.transpose(k, perm=(0, 1, 3, 2))
        attn = q @ k

        # Look up the bias for every pair of positions and add it to the logits.
        num_window_elements = self.window_size[0] * self.window_size[1]
        relative_position_index_flat = tf.reshape(
            self.relative_position_index, shape=(-1,)
        )
        relative_position_bias = tf.gather(
            self.relative_position_bias_table, relative_position_index_flat
        )
        relative_position_bias = tf.reshape(
            relative_position_bias, shape=(num_window_elements, num_window_elements, -1)
        )
        # Heads first, then add a leading axis so it broadcasts over the batch.
        relative_position_bias = tf.transpose(relative_position_bias, perm=(2, 0, 1))
        attn = attn + tf.expand_dims(relative_position_bias, axis=0)

        if mask is not None:
            # Expose the window axis so the per-window mask can be broadcast
            # over the leading axis and the heads, then fold it back.
            nW = mask.get_shape()[0]
            mask_float = tf.cast(
                tf.expand_dims(tf.expand_dims(mask, axis=1), axis=0), tf.float32
            )
            attn = (
                tf.reshape(attn, shape=(-1, nW, self.num_heads, size, size))
                + mask_float
            )
            attn = tf.reshape(attn, shape=(-1, self.num_heads, size, size))
            attn = tf.keras.activations.softmax(attn, axis=-1)
        else:
            attn = tf.keras.activations.softmax(attn, axis=-1)
        attn = self.dropout(attn)

        # Weighted sum of values, then merge the heads back into `channels`.
        x_qkv = attn @ v
        x_qkv = tf.transpose(x_qkv, perm=(0, 2, 1, 3))
        x_qkv = tf.reshape(x_qkv, shape=(-1, size, channels))
        x_qkv = self.proj(x_qkv)
        # The same Dropout layer instance is reused on the projected output.
        x_qkv = self.dropout(x_qkv)
        return x_qkv


In [None]:
class SwinTransformer_Block(layers.Layer):
    """One Swin Transformer block: (shifted) window attention followed by an MLP.

    Both sub-blocks are wrapped as `x + drop_path(f(norm(x)))`.

    Args:
        dim: Channel width of the token sequence.
        num_patch: Pair (height, width) of the patch grid; the input sequence
            is reshaped to this grid before windowing.
        num_heads: Number of attention heads.
        window_size: Side length of the attention window.
        shift_size: Cyclic shift applied before windowing (0 disables it).
        num_mlp: Hidden width of the MLP.
        qkv_bias: Whether the qkv projection uses a bias term.
        dropout_rate: Rate used for dropout and for drop-path.
        prefix: String prepended to the names of the sub-layers and variables.
    """

    def __init__(
        self,
        dim,
        num_patch,
        num_heads,
        window_size=7,
        shift_size=0,
        num_mlp=1024,
        qkv_bias=True,
        dropout_rate=0.0,
        prefix='',
        **kwargs,
    ):
        super().__init__(**kwargs)

        # When the patch grid is smaller than the window, use one window that
        # covers the whole grid and disable shifting. This must be decided
        # BEFORE the attention layer is created, otherwise its relative
        # position bias table is sized for the wrong window.
        if min(num_patch) < window_size:
            shift_size = 0
            window_size = min(num_patch)

        self.dim = dim  # number of input dimensions
        self.num_patch = num_patch  # number of embedded patches
        self.num_heads = num_heads  # number of attention heads
        self.window_size = window_size  # size of window
        self.shift_size = shift_size  # size of window shift
        self.num_mlp = num_mlp  # number of MLP nodes
        self.prefix = prefix
        self.norm1 = layers.LayerNormalization(epsilon=1e-5, name=f'{self.prefix}/norm1')
        self.attn = WindowAttention(
            dim,
            window_size=(self.window_size, self.window_size),
            num_heads=num_heads,
            qkv_bias=qkv_bias,
            dropout_rate=dropout_rate,
            prefix=self.prefix
        )
        self.drop_path = DropPath(dropout_rate)
        self.norm2 = layers.LayerNormalization(epsilon=1e-5, name=f'{self.prefix}/norm2')

        self.mlp = tf.keras.Sequential(
            [
                layers.Dense(num_mlp, name=f'{prefix}/mlp/fc1'),
                layers.Activation(tf.keras.activations.gelu),
                layers.Dropout(dropout_rate),
                layers.Dense(dim, name=f'{prefix}/mlp/fc2'),
                layers.Dropout(dropout_rate),
            ]
        )

    def build(self, input_shape):
        """Create the attention mask used when the windows are shifted."""
        if self.shift_size == 0:
            self.attn_mask = None
        else:
            height, width = self.num_patch
            # Three bands per axis: the untouched part, the part between the
            # last window boundary and the shift, and the wrapped-around part.
            h_slices = (
                slice(0, -self.window_size),
                slice(-self.window_size, -self.shift_size),
                slice(-self.shift_size, None),
            )
            w_slices = (
                slice(0, -self.window_size),
                slice(-self.window_size, -self.shift_size),
                slice(-self.shift_size, None),
            )
            # Give every (row band, column band) region its own integer id.
            mask_array = np.zeros((1, height, width, 1))
            count = 0
            for h in h_slices:
                for w in w_slices:
                    mask_array[:, h, w, :] = count
                    count += 1
            mask_array = tf.convert_to_tensor(mask_array)

            # mask array to windows
            mask_windows = window_partition(mask_array, self.window_size)
            mask_windows = tf.reshape(
                mask_windows, shape=[-1, self.window_size * self.window_size]
            )
            # Pairs of positions with different region ids get -100 added to
            # their attention logit; pairs from the same region get 0.
            attn_mask = tf.expand_dims(mask_windows, axis=1) - tf.expand_dims(
                mask_windows, axis=2
            )
            attn_mask = tf.where(attn_mask != 0, -100.0, attn_mask)
            attn_mask = tf.where(attn_mask == 0, 0.0, attn_mask)
            self.attn_mask = tf.Variable(initial_value=attn_mask, trainable=False, name=f'{self.prefix}/attn_mask')

    def call(self, x):
        """Run the block on a tensor whose static shape unpacks as (_, tokens, channels)."""
        height, width = self.num_patch
        _, num_patches_before, channels = x.shape

        # --- attention sub-block ---
        x_skip = x
        x = self.norm1(x)
        x = tf.reshape(x, shape=(-1, height, width, channels))
        if self.shift_size > 0:
            # Cyclically shift the grid so the windows straddle the previous
            # window boundaries.
            shifted_x = tf.roll(
                x, shift=[-self.shift_size, -self.shift_size], axis=[1, 2]
            )
        else:
            shifted_x = x

        x_windows = window_partition(shifted_x, self.window_size)
        x_windows = tf.reshape(
            x_windows, shape=(-1, self.window_size * self.window_size, channels)
        )
        attn_windows = self.attn(x_windows, mask=self.attn_mask)

        attn_windows = tf.reshape(
            attn_windows, shape=(-1, self.window_size, self.window_size, channels)
        )
        shifted_x = window_reverse(
            attn_windows, self.window_size, height, width, channels
        )
        if self.shift_size > 0:
            # Undo the cyclic shift.
            x = tf.roll(
                shifted_x, shift=[self.shift_size, self.shift_size], axis=[1, 2]
            )
        else:
            x = shifted_x

        x = tf.reshape(x, shape=(-1, height * width, channels))
        x = self.drop_path(x)
        x = x_skip + x

        # --- MLP sub-block ---
        x_skip = x
        x = self.norm2(x)
        x = self.mlp(x)
        x = self.drop_path(x)
        x = x_skip + x
        return x


In [None]:
class PatchExtract(layers.Layer):
    """Split images into non-overlapping patches and flatten each patch.

    Args:
        patch_size: Pair whose first entry is the patch extent along image
            axis 1 and whose second entry is the extent along image axis 2.
    """

    def __init__(self, patch_size, **kwargs):
        super().__init__(**kwargs)
        self.patch_size_x = patch_size[0]
        # Use the second entry for the second axis (previously the first
        # entry was used for both, silently ignoring patch_size[1]).
        self.patch_size_y = patch_size[1]

    def call(self, images):
        """Return a tensor of shape (batch, number of patches, values per patch)."""
        batch_size = tf.shape(images)[0]
        patches = tf.image.extract_patches(
            images=images,
            sizes=(1, self.patch_size_x, self.patch_size_y, 1),
            strides=(1, self.patch_size_x, self.patch_size_y, 1),
            rates=(1, 1, 1, 1),
            padding="VALID",
        )
        patch_dim = patches.shape[-1]
        # Count patches along each axis separately so non-square patch grids
        # are flattened correctly.
        patch_rows = patches.shape[1]
        patch_cols = patches.shape[2]
        return tf.reshape(patches, (batch_size, patch_rows * patch_cols, patch_dim))


class PatchEmbedding(layers.Layer):
    """Linearly project flattened patches and add a learned position embedding.

    Args:
        num_patch: Number of patches per image; also the size of the
            position-embedding table.
        embed_dim: Output width of the projection and of the embedding.
        **kwargs: Forwarded to `layers.Layer`. The layer name defaults to
            'patch_embed' but can be overridden with `name=...`.
    """

    def __init__(self, num_patch, embed_dim, **kwargs):
        # Default the name instead of hard-coding it, so passing `name=` no
        # longer raises "got multiple values for keyword argument 'name'".
        kwargs.setdefault('name', 'patch_embed')
        super().__init__(**kwargs)
        self.num_patch = num_patch
        self.proj = layers.Dense(embed_dim, name='proj')
        self.pos_embed = layers.Embedding(input_dim=num_patch, output_dim=embed_dim, name='embedding')

    def call(self, patch):
        # Position ids 0 .. num_patch - 1; their embeddings broadcast over the batch.
        pos = tf.range(start=0, limit=self.num_patch, delta=1)
        return self.proj(patch) + self.pos_embed(pos)


class PatchMerging(tf.keras.layers.Layer):
    """Merge each 2x2 group of neighbouring patches into one token.

    The four patches of a group are concatenated along the channel axis
    (4 * C values) and passed through a bias-free Dense layer with
    `2 * embed_dim` outputs.

    Args:
        num_patch: Pair (height, width) of the incoming patch grid.
        embed_dim: Embedding width used to size the Dense layer.
        prefix: String prepended to the Dense layer's name.
    """

    def __init__(self, num_patch, embed_dim, prefix=''):
        super().__init__()
        self.num_patch = num_patch
        self.embed_dim = embed_dim
        self.linear_trans = layers.Dense(
            2 * embed_dim,
            use_bias=False,
            name=f'{prefix}/downsample/reduction',
        )

    def call(self, x):
        grid_h, grid_w = self.num_patch
        _, _, channels = x.get_shape().as_list()
        grid = tf.reshape(x, shape=(-1, grid_h, grid_w, channels))

        # Strided views in the order (row, col) offsets
        # (0, 0), (1, 0), (0, 1), (1, 1).
        corners = [
            grid[:, row_off::2, col_off::2, :]
            for col_off in (0, 1)
            for row_off in (0, 1)
        ]
        merged = tf.concat(corners, axis=-1)
        merged = tf.reshape(
            merged, shape=(-1, (grid_h // 2) * (grid_w // 2), 4 * channels)
        )
        return self.linear_trans(merged)


In [None]:
patch_size = (3, 3)  # 3-by-3 sized patches
dropout_rate = 0.03  # Dropout rate
num_heads = 8  # Attention heads
embed_dim = 128  # Embedding dimension
num_mlp = 256  # MLP layer size
qkv_bias = True  # Convert embedded patches to query, key, and values with a learnable additive value
window_size = 2  # Size of attention window
shift_size = 1  # Size of shifting window
image_dimension = 150  # Initial image size



# Optimisation settings.
learning_rate = 1e-3  # Learning rate passed to the optimizer
batch_size = 32  # Mini-batch size
num_epochs = 300  # Number of training epochs
validation_split = 0.2  # Fraction of the training data used for validation
weight_decay = 0.0001  # Weight decay passed to the optimizer
label_smoothing = 0.1  # Label smoothing passed to the cross-entropy loss


In [None]:
def CNNSwinTransformer3(input_size, patch_size):
    """Build the two-branch InceptionV3 + Swin Transformer classifier.

    Both branches read the same input tensor:

    * CNN branch: the output of InceptionV3's 'mixed1' layer (randomly
      initialised, no top), global-average pooled, batch-normalised and
      projected to 64 units.
    * Swin branch: patch extraction and embedding, one unshifted and one
      shifted Swin block, patch merging, global-average pooling and a
      projection to 64 units.

    The two 64-unit vectors are concatenated, reduced to 32 units and
    classified with a softmax over `num_classes`.

    Relies on the notebook-level settings `embed_dim`, `num_heads`,
    `window_size`, `shift_size`, `num_mlp`, `qkv_bias`, `dropout_rate` and
    `num_classes`.

    Args:
        input_size: Shape passed to `layers.Input`; entries 0 and 1 are
            divided by the patch size to get the patch grid.
        patch_size: Pair of patch extents.

    Returns:
        An uncompiled `keras.Model`.
    """
    inputs = layers.Input(input_size)

    # ---- CNN branch ----
    backbone = InceptionV3(include_top=False, input_tensor=inputs, weights=None)
    cnn_features = backbone.get_layer('mixed1').output
    cnn_branch = layers.GlobalAveragePooling2D(name="avg_pool")(cnn_features)
    cnn_branch = layers.BatchNormalization()(cnn_branch)
    cnn_branch = layers.Dense(64)(cnn_branch)

    # ---- Swin Transformer branch ----
    patch_grid = (
        input_size[0] // patch_size[0],
        input_size[1] // patch_size[1],
    )

    tokens = PatchExtract(patch_size)(inputs)
    tokens = PatchEmbedding(patch_grid[0] * patch_grid[1], embed_dim)(tokens)

    # First block uses regular windows, second block uses shifted windows.
    for block_prefix, block_shift in (("swinbloc1", 0), ("swinbloc2", shift_size)):
        tokens = SwinTransformer_Block(
            dim=embed_dim,
            num_patch=patch_grid,
            num_heads=num_heads,
            window_size=window_size,
            shift_size=block_shift,
            num_mlp=num_mlp,
            qkv_bias=qkv_bias,
            dropout_rate=dropout_rate,
            prefix=block_prefix,
        )(tokens)

    tokens = PatchMerging(patch_grid, embed_dim=embed_dim, prefix='patch_merge')(tokens)
    swin_branch = layers.GlobalAveragePooling1D()(tokens)
    swin_branch = layers.Dense(64)(swin_branch)

    # ---- Fusion head ----
    fused = layers.concatenate([cnn_branch, swin_branch])
    fused = layers.Dense(32)(fused)
    outputs = layers.Dense(num_classes, activation="softmax", name='output')(fused)

    return keras.Model(inputs=inputs, outputs=outputs)

In [None]:
from sklearn.model_selection import StratifiedKFold
from sklearn.metrics import roc_auc_score
# Multiply the learning rate by 0.1 when validation accuracy has not improved
# for 10 epochs.
reduce_lr = ReduceLROnPlateau(monitor='val_accuracy', factor=0.1,
                              patience=10)
# Stop training when validation accuracy has not improved for 15 epochs.
earlystop = EarlyStopping(monitor='val_accuracy', patience=15)


In [None]:
seed = 101
kfold = StratifiedKFold(n_splits=5, shuffle=True, random_state=seed)
cvacc = []  # per-fold test accuracy, in percent
cvauc = []  # per-fold test ROC AUC, in [0, 1]

input_size = (150,150,3)
patch_size = (3,3)

# Stratify on labels that are row-aligned with `data_x2`. The raw `data_y`
# is still in the pre-shuffle order, so it must not be used here; recover the
# integer class ids from the (shuffled) one-hot labels instead.
strat_labels = np.argmax(data_y2, axis=1)

for i, (train, test) in enumerate(kfold.split(data_x2, strat_labels), start=1):
    # Drop the previous fold's model/graph state so memory does not grow
    # with every fold.
    keras.backend.clear_session()

    model = CNNSwinTransformer3(input_size, patch_size)
    model.compile(
        loss=keras.losses.CategoricalCrossentropy(label_smoothing=label_smoothing),
        optimizer=tfa.optimizers.AdamW(
            learning_rate=learning_rate, weight_decay=weight_decay
        ),
        metrics=[
            keras.metrics.CategoricalAccuracy(name="accuracy"),
        ],
    )

    # Start every fold from a clean checkpoint file.
    checkpoint_filepath = "../Data/models/fold{0}_inceptionv3swintransformer_lung_binary.h5".format(i)
    if os.path.exists(checkpoint_filepath):
        os.remove(checkpoint_filepath)
    checkpoint = keras.callbacks.ModelCheckpoint(
        checkpoint_filepath, save_weights_only=True, save_best_only=True, verbose=1,monitor='val_accuracy',mode='max'
    )

    model.fit(
        data_x2[train],
        data_y2[train],
        validation_split=validation_split,
        epochs=num_epochs,
        batch_size=batch_size,
        verbose=1,
        shuffle=True,
        callbacks=[checkpoint, reduce_lr, earlystop],
    )
    # Evaluate with the weights from the epoch with the best validation accuracy.
    model.load_weights(checkpoint_filepath)
    evals = model.evaluate(data_x2[test], data_y2[test], verbose=1)
    y_pred = model.predict(data_x2[test])

    # Persist per-sample results. The format is kept as-is because the
    # evaluation cells below parse it: the one-hot label is written via
    # str(), and the prediction is the probability of class 0.
    with open('../Data/models/fold{0}_inceptionv3swintransformer_lung_binary.csv'.format(i),'w') as file:
        file.write('true_label,predict_label'+'\n')
        for a,b in zip(data_y2[test],y_pred):
            file.write(str(a) + ','+str(b[0])+'\n')

    acc = evals[1]
    auc = roc_auc_score(data_y2[test],y_pred)

    # `acc` is a fraction, so scale it before printing it with a percent sign.
    print("%s: %.2f%%" % (model.metrics_names[1], acc * 100))
    print("auc: %.2f"%(auc))

    cvacc.append(acc*100)
    cvauc.append(auc)

print("mean acc %.2f%% (+/- %.2f%%)" % (np.mean(cvacc), np.std(cvacc)))
# AUC is not a percentage, so it is reported without a percent sign.
print("mean auc %.2f (+/- %.2f)" % (np.mean(cvauc), np.std(cvauc)))


In [None]:
# Per-fold test accuracies (stored in percent).
cvacc

In [None]:
# Mean test accuracy over the folds.
np.mean(cvacc)

In [None]:
# Per-fold test ROC AUC values.
cvauc

In [None]:
# Mean test ROC AUC over the folds.
np.mean(cvauc)

In [None]:
import numpy as np
import pandas as pd
from sklearn.metrics import confusion_matrix,accuracy_score,f1_score,roc_auc_score,recall_score,precision_score

In [None]:
def calculate_metric(gt, pred):
    """Compute binary classification metrics from scores and 0/1 labels.

    Scores strictly greater than 0.5 are counted as class 1, everything
    else as class 0. AUC is computed from the raw scores; all other metrics
    use the thresholded predictions.

    Args:
        gt: Iterable of ground-truth labels.
        pred: Iterable of predicted scores.

    Returns:
        Tuple (accuracy, auc, f1, precision, sensitivity, specificity),
        each rounded to 4 decimals.
    """
    hard_pred = [1 if score > 0.5 else 0 for score in pred]

    cm = confusion_matrix(gt, hard_pred)
    true_pos = cm[1, 1]
    true_neg = cm[0, 0]
    false_pos = cm[0, 1]
    false_neg = cm[1, 0]

    total = float(true_pos + true_neg + false_pos + false_neg)
    accuracy = (true_pos + true_neg) / total
    auc_value = roc_auc_score(gt, pred)
    precision_value = true_pos / float(true_pos + false_pos)
    f1_value = f1_score(gt, hard_pred)
    sensitivity_value = true_pos / float(true_pos + false_neg)
    specificity_value = true_neg / float(true_neg + false_pos)

    ordered = (
        accuracy,
        auc_value,
        f1_value,
        precision_value,
        sensitivity_value,
        specificity_value,
    )
    return tuple(round(value, 4) for value in ordered)

In [None]:
# Fold 1: load the saved per-sample results. `true_label` was written as the
# str() of a label array, so strip the first and last character, split on
# spaces and keep the first token as a float.
df1 = pd.read_csv(
    '../Data/models/fold1_inceptionv3swintransformer_lung_binary.csv'
).assign(
    true_label=lambda frame: frame['true_label']
    .str[1:-1]
    .str.split(' ')
    .str[0]
    .astype(float)
)

calculate_metric(df1['true_label'], df1['predict_label'])

In [None]:
# Fold 2: load the saved per-sample results. `true_label` was written as the
# str() of a label array, so strip the first and last character, split on
# spaces and keep the first token as a float.
df2 = pd.read_csv(
    '../Data/models/fold2_inceptionv3swintransformer_lung_binary.csv'
).assign(
    true_label=lambda frame: frame['true_label']
    .str[1:-1]
    .str.split(' ')
    .str[0]
    .astype(float)
)

calculate_metric(df2['true_label'], df2['predict_label'])

In [None]:
# Fold 3: load the saved per-sample results. `true_label` was written as the
# str() of a label array, so strip the first and last character, split on
# spaces and keep the first token as a float.
df3 = pd.read_csv(
    '../Data/models/fold3_inceptionv3swintransformer_lung_binary.csv'
).assign(
    true_label=lambda frame: frame['true_label']
    .str[1:-1]
    .str.split(' ')
    .str[0]
    .astype(float)
)

calculate_metric(df3['true_label'], df3['predict_label'])

In [None]:
# Fold 4: load the saved per-sample results. `true_label` was written as the
# str() of a label array, so strip the first and last character, split on
# spaces and keep the first token as a float.
df4 = pd.read_csv(
    '../Data/models/fold4_inceptionv3swintransformer_lung_binary.csv'
).assign(
    true_label=lambda frame: frame['true_label']
    .str[1:-1]
    .str.split(' ')
    .str[0]
    .astype(float)
)

calculate_metric(df4['true_label'], df4['predict_label'])

In [None]:
# Fold 5: load the saved per-sample results. `true_label` was written as the
# str() of a label array, so strip the first and last character, split on
# spaces and keep the first token as a float.
df5 = pd.read_csv(
    '../Data/models/fold5_inceptionv3swintransformer_lung_binary.csv'
).assign(
    true_label=lambda frame: frame['true_label']
    .str[1:-1]
    .str.split(' ')
    .str[0]
    .astype(float)
)

calculate_metric(df5['true_label'], df5['predict_label'])