In [None]:
import os
from google.colab import drive
# Mount Google Drive so the dataset stored there is reachable under /content/drive.
drive.mount('/content/drive')
# List the entries of the SubClassification dataset folder as a quick sanity check.
print(os.listdir('/content/drive/MyDrive/6th_sem_immune_dataset/SubClassification'))

Mounted at /content/drive
['Nuclear', 'Cytoplasmic']


In [None]:
# Inspect the Pleomorphic folder; the same path is used as `data_path` further down.
print(os.listdir('/content/drive/MyDrive/6th_sem_immune_dataset/SubClassification/Nuclear/Pleomorphic'))

['AC-14', 'AC-13', 'train.pickle', 'test.pickle', 'weights']


In [None]:
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.image as mpimg
import random
import pickle as pkl
import cv2
import h5py

#from preprocess_data import dataloader

from tqdm import tqdm
from math import ceil

from sklearn.utils import shuffle
from sklearn.model_selection import train_test_split
from skimage.transform import rotate, AffineTransform, warp, rescale
from skimage.util import random_noise

import tensorflow as tf
from tensorflow.keras import Model, Sequential
from tensorflow.keras.layers import Lambda, Input, Flatten, Dense, Concatenate, Conv2D, MaxPooling2D, Dropout
from tensorflow.keras.initializers import RandomNormal
from tensorflow.keras.regularizers import l2
from tensorflow.keras.callbacks import TensorBoard, ModelCheckpoint
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.models import model_from_json
from scipy.interpolate import make_interp_spline, BSpline
import tensorflow.keras.backend as K

In [None]:
import os
import random
import pickle
import numpy as np
import tensorflow as tf
from PIL import Image
from sklearn.model_selection import train_test_split
from tensorflow.keras import Input, Sequential, Model
from tensorflow.keras import backend as K
from tensorflow.keras.callbacks import EarlyStopping
from tensorflow.keras.layers import Conv2D, MaxPooling2D, Flatten, Dense, Lambda, BatchNormalization, Activation, Dropout
from tensorflow.keras.regularizers import l2
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.initializers import HeNormal

class DataLoader:
    """
    Build a pickled dataset of same-class image pairs from a directory tree.

    Every sub-directory of the dataset root (except the ones listed in
    ``_EXCLUDED_DIRS``) is treated as one class. Images are read as grayscale,
    resized to ``width`` x ``height`` and paired with every other image of the
    same class; each pair is additionally stored rotated by 90, 180 and 270
    degrees.
    """

    # Sub-directories that never contain class images (compared case-insensitively).
    _EXCLUDED_DIRS = ("weights",)
    # Rotations (in degrees) added on top of the unrotated pair.
    _ROTATION_ANGLES = (90, 180, 270)

    def __init__(self, width, height, cells, data_path, output_path):
        """
        Args:
            width: Target image width in pixels.
            height: Target image height in pixels.
            cells: Number of channels expected by the consumer. It is only
                stored here; images are always converted to single-channel
                grayscale by this class.
            data_path: Root directory of the dataset, used whenever an empty
                ``folder_name`` is passed to the loading methods.
            output_path: File the pickled dataset is written to by ``load``.
        """
        self.width = width
        self.height = height
        self.cells = cells
        self.data_path = data_path
        self.output_path = output_path

    @staticmethod
    def _to_array(image):
        """Convert a PIL image to a float64 array with a trailing channel axis."""
        data = np.asarray(image).astype('float64')
        return np.expand_dims(data, axis=-1)

    def _read_grayscale(self, path):
        """Read ``path`` as a resized grayscale PIL image and release the file handle."""
        with Image.open(path) as source:
            # convert()/resize() return new in-memory images, so the file can be closed.
            return source.convert('L').resize((self.width, self.height))

    def _resolve_folder(self, folder_name):
        """Return the directory to scan: ``folder_name`` if given, else ``data_path``."""
        return folder_name if folder_name else self.data_path

    def _open_image(self, path):
        """
        Open an image and convert it into a numpy array.

        Returns:
            float64 array of shape (height, width, 1).
        """
        return self._to_array(self._read_grayscale(path))

    def _rotate_image(self, image, angle):
        """
        Rotate a PIL image by ``angle`` degrees and return it as a numpy array.

        Returns:
            float64 array with a trailing channel axis.
        """
        return self._to_array(image.rotate(angle))

    def _load_variants(self, path):
        """Return [original, rot90, rot180, rot270] arrays for the image at ``path``."""
        image = self._read_grayscale(path)
        variants = [self._to_array(image)]
        variants.extend(self._rotate_image(image, angle) for angle in self._ROTATION_ANGLES)
        return variants

    def load_images_from_subfolders(self, folder_name):
        """
        Collect the .jpeg files of every class sub-directory.

        Args:
            folder_name: Directory whose sub-directories are the classes. An
                empty value falls back to ``self.data_path`` (previously this
                raised ``FileNotFoundError`` from ``os.scandir('')``).

        Returns:
            Tuple ``(image_paths, class_labels)``: ``image_paths`` maps the
            sub-directory name to its list of .jpeg paths, ``class_labels`` maps
            the sub-directory name to an integer label.
        """
        image_paths = {}
        class_labels = {}

        with os.scandir(self._resolve_folder(folder_name)) as entries:
            # Sort by name so labels do not depend on the file system's listing order.
            subfolders = sorted(
                (entry for entry in entries
                 if entry.is_dir() and entry.name.lower() not in self._EXCLUDED_DIRS),
                key=lambda entry: entry.name,
            )

        for label, subfolder in enumerate(subfolders):
            subfolder_name = subfolder.name
            class_labels[subfolder_name] = label

            # Only .jpeg files (any letter case) are treated as images.
            image_paths[subfolder_name] = [
                os.path.join(subfolder.path, file)
                for file in sorted(os.listdir(subfolder.path))
                if file.lower().endswith('.jpeg')
            ]

            if not image_paths[subfolder_name]:
                print(f"Warning: Subfolder {subfolder_name} contains no valid .jpeg files.")
            else:
                print(f"Subfolder {subfolder_name} contains {len(image_paths[subfolder_name])} .jpeg files.")

        return image_paths, class_labels

    def load(self, set_name, folder_name):
        """
        Build the pair dataset and pickle it to ``self.output_path``.

        Images are paired only within the same sub-directory, and every pair is
        labelled with that sub-directory's class label. Each pair is stored
        unrotated and rotated by 90, 180 and 270 degrees.

        Args:
            set_name: Name of the set being built; currently unused.
            folder_name: Directory to scan; empty means ``self.data_path``.

        The pickle holds ``[[x_first, x_second], y, names]``; ``names`` is
        always an empty list and is kept for format compatibility.
        """
        source_dir = self._resolve_folder(folder_name)
        print(f'Loading dataset from: {source_dir}')

        x_first = []
        x_second = []
        y = []
        names = []

        image_paths, class_labels = self.load_images_from_subfolders(source_dir)
        print(f"Class labels: {class_labels}")

        for subfolder, paths in image_paths.items():
            label = class_labels[subfolder]
            # Decode every image once; pairs below reuse these arrays instead of
            # re-reading both files from disk for every pair.
            variants = [self._load_variants(path) for path in paths]

            for i, first in enumerate(variants):
                for second in variants[i + 1:]:
                    # Same order as before: original pair, then 90/180/270 degrees.
                    for view_first, view_second in zip(first, second):
                        x_first.append(view_first)
                        x_second.append(view_second)
                        y.append(label)

        print(f'Done loading dataset: {len(x_first)} pairs loaded.')

        with open(self.output_path, 'wb') as f:
            pickle.dump([[x_first, x_second], y, names], f)
        print(f'Dataset saved to {self.output_path}')

from tensorflow.keras.preprocessing.image import ImageDataGenerator
import numpy as np

class SiameseNetwork:
    """
    Two-input CNN whose branches share weights.

    Both inputs are encoded by the same convolutional sub-network; the
    element-wise absolute difference of the two encodings is passed through
    dropout and a softmax layer with ``num_classes`` outputs.
    """

    def __init__(self, seed, width, height, cells, num_classes, loss, metrics, optimizer, dropout_rate):
        """
        Build and compile the network.

        Args:
            seed: Seed applied to Python, NumPy and TensorFlow RNGs.
            width: Input image width in pixels.
            height: Input image height in pixels.
            cells: Number of image channels.
            num_classes: Size of the softmax output.
            loss: Loss passed to ``compile``.
            metrics: Metrics passed to ``compile``.
            optimizer: Optimizer passed to ``compile``.
            dropout_rate: Dropout rate applied to the distance vector.
        """
        K.clear_session()
        self.load_file = None
        self.seed = seed
        self.num_classes = num_classes
        self.initialize_seed()
        self.optimizer = optimizer

        # Image arrays are laid out (rows, cols, channels), i.e. height comes
        # before width. Identical to the previous (width, height, cells) for
        # square inputs, and correct for non-square ones.
        input_shape = (height, width, cells)
        left_input = Input(input_shape)
        right_input = Input(input_shape)

        # The same encoder instance processes both inputs, so weights are shared.
        encoder = self._get_architecture(input_shape)
        encoded_l = encoder(left_input)
        encoded_r = encoder(right_input)

        # Element-wise absolute difference between the two encodings.
        L1_layer = Lambda(lambda tensors: K.abs(tensors[0] - tensors[1]))
        L1_siamese_dist = L1_layer([encoded_l, encoded_r])
        L1_siamese_dist = Dropout(dropout_rate)(L1_siamese_dist)

        # Softmax head for multi-class classification.
        prediction = Dense(self.num_classes, activation='softmax',
                           bias_initializer=self.initialize_bias)(L1_siamese_dist)
        self.siamese_net = Model(inputs=[left_input, right_input], outputs=prediction)
        self.siamese_net.compile(loss=loss, optimizer=optimizer, metrics=metrics)

    def initialize_seed(self):
        """Seed the Python, NumPy and TensorFlow random generators with ``self.seed``."""
        os.environ['PYTHONHASHSEED'] = str(self.seed)
        random.seed(self.seed)
        np.random.seed(self.seed)
        tf.random.set_seed(self.seed)

    def initialize_weights(self, shape, dtype=None):
        """Kernel initializer: He-normal values of the requested shape and dtype."""
        # Forward dtype so the initializer honours the layer's requested dtype.
        return HeNormal()(shape, dtype=dtype)

    def initialize_bias(self, shape, dtype=None):
        """Bias initializer: normal values with mean 0.5 and standard deviation 0.01."""
        return K.random_normal(shape, mean=0.5, stddev=0.01, dtype=dtype, seed=self.seed)

    def _get_architecture(self, input_shape):
        """
        Build the encoder shared by both branches.

        Three Conv2D -> BatchNormalization -> ReLU -> MaxPooling2D stages
        (32, 64 and 128 filters) followed by a 1024-unit sigmoid Dense layer.
        """
        model = Sequential()
        model.add(Conv2D(32, kernel_size=(10, 10), input_shape=input_shape, kernel_initializer=self.initialize_weights,
                         kernel_regularizer=l2(5e-4), name='Conv1'))
        model.add(BatchNormalization())
        model.add(Activation("relu"))
        model.add(MaxPooling2D())

        model.add(Conv2D(64, kernel_size=(8, 8), kernel_initializer=self.initialize_weights,
                         bias_initializer=self.initialize_bias, kernel_regularizer=l2(5e-4), name='Conv2'))
        model.add(BatchNormalization())
        model.add(Activation("relu"))
        model.add(MaxPooling2D())

        model.add(Conv2D(128, kernel_size=(6, 6), kernel_initializer=self.initialize_weights,
                         bias_initializer=self.initialize_bias, kernel_regularizer=l2(5e-4), name='Conv3'))
        model.add(BatchNormalization())
        model.add(Activation("relu"))
        model.add(MaxPooling2D())

        model.add(Flatten())
        model.add(Dense(1024, activation='sigmoid', kernel_initializer=self.initialize_weights,
                        kernel_regularizer=l2(5e-3), bias_initializer=self.initialize_bias))
        return model

    def fit(self, weights_file, train_path, validation_size, batch_size, epochs, early_stopping, patience, min_delta):
        """
        Train on the pickled pair dataset and save the resulting weights.

        Args:
            weights_file: Destination passed to ``save_weights``.
            train_path: Pickle holding ``[[x_first, x_second], y, names]``.
            validation_size: Fraction (or count) held out for validation.
            batch_size: Training and evaluation batch size.
            epochs: Maximum number of epochs.
            early_stopping: Whether to stop early on stalled ``val_loss``.
            patience: Early-stopping patience in epochs.
            min_delta: Minimum ``val_loss`` improvement counted as progress.
        """
        # NOTE: pickle.load executes arbitrary code; only load trusted files.
        with open(train_path, 'rb') as f:
            x_train, y_train, names = pickle.load(f)

        # One call splits left images, right images and labels with the same
        # permutation, so pairs and labels stay aligned by construction.
        x_train_0, x_val_0, x_train_1, x_val_1, y_train_0, y_val_0 = train_test_split(
            x_train[0], x_train[1], y_train, test_size=validation_size, random_state=self.seed)

        x_train_0 = np.array(x_train_0, dtype='float64')
        x_val_0 = np.array(x_val_0, dtype='float64')
        x_train_1 = np.array(x_train_1, dtype='float64')
        x_val_1 = np.array(x_val_1, dtype='float64')

        x_val = [x_val_0, x_val_1]

        # One-hot labels for the softmax output.
        y_train_one_hot = to_categorical(y_train_0, num_classes=self.num_classes)
        y_val_one_hot = to_categorical(y_val_0, num_classes=self.num_classes)

        datagen = ImageDataGenerator(
            rotation_range=20,
            width_shift_range=0.2,
            height_shift_range=0.2,
            zoom_range=0.2,
            horizontal_flip=True
        )

        # Each training image is replaced by one randomly transformed copy.
        # Left and right are transformed alternately, pair by pair, which keeps
        # the order in which random numbers are drawn unchanged.
        augmented_x_train_0 = []
        augmented_x_train_1 = []
        for x_0, x_1 in zip(x_train_0, x_train_1):
            augmented_x_train_0.append(datagen.random_transform(x_0))
            augmented_x_train_1.append(datagen.random_transform(x_1))

        augmented_x_train_0 = np.array(augmented_x_train_0)
        augmented_x_train_1 = np.array(augmented_x_train_1)
        # Labels are not affected by augmentation.
        augmented_y_train = np.array(y_train_one_hot)

        print('Beginning to fit the model')
        callback = []
        if early_stopping:
            es = EarlyStopping(monitor='val_loss', min_delta=min_delta, patience=patience, mode='auto', verbose=1)
            callback.append(es)

        self.siamese_net.fit([augmented_x_train_0, augmented_x_train_1], augmented_y_train, batch_size=batch_size,
                             validation_data=([x_val[0], x_val[1]], y_val_one_hot),
                             epochs=epochs, callbacks=callback, verbose=1)

        self.siamese_net.save_weights(weights_file)

        loss, accuracy = self.siamese_net.evaluate([x_val[0], x_val[1]], y_val_one_hot, batch_size=batch_size)
        print(f'Loss on Validation set: {loss}')
        print(f'Accuracy on Validation set: {accuracy}')

    def evaluate(self, test_file, batch_size):
        """
        Evaluate the network on a pickled pair dataset.

        Args:
            test_file: Pickle holding ``[[x_first, x_second], y, names]``.
            batch_size: Evaluation batch size.

        Returns:
            Tuple ``(loss, accuracy)`` as returned by the compiled model.
        """
        # NOTE: pickle.load executes arbitrary code; only load trusted files.
        with open(test_file, 'rb') as f:
            x_test, y_test, names = pickle.load(f)

        y_test = np.array(y_test, dtype='float64')
        x_test[0] = np.array(x_test[0], dtype='float64')
        x_test[1] = np.array(x_test[1], dtype='float64')

        y_test_one_hot = to_categorical(y_test, num_classes=self.num_classes)

        loss, accuracy = self.siamese_net.evaluate([x_test[0], x_test[1]], y_test_one_hot, batch_size=batch_size)

        return loss, accuracy



Loaded pairs: 3


In [None]:
import os
import random
import time

import numpy as np
import pandas as pd
import tensorflow as tf
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.losses import categorical_crossentropy

# Environment settings
# Detect Colab by actually importing its package. The previous check
# (os.name == 'posix') is also true on every Linux/macOS machine, where the
# google.colab import would then fail.
try:
    from google.colab import drive
    IS_COLAB = True
except ImportError:
    IS_COLAB = False
LOAD_DATA = True  # Rebuild the pickled datasets before training
IS_EXPERIMENT = False  # Toggle experiment mode

train_name = 'train'
test_name = 'test'
WIDTH = HEIGHT = 105
CEELS = 1  # Number of image channels; the (misspelled) name is referenced by other cells
loss_type = "categorical_crossentropy"  # Categorical cross-entropy for multi-class output
validation_size = 0.2
early_stopping = True

if IS_COLAB:
    drive.mount('/content/drive')
    data_path = '/content/drive/MyDrive/6th_sem_immune_dataset/SubClassification/Nuclear/Pleomorphic'  # Adapt to your Google Drive path
else:
    from data_loader import DataLoader
    from siamese_network import SiameseNetwork
    data_path = 'path_to_your_local_dataset'
    os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'

# Function to determine the number of classes based on subfolders
def get_num_classes(data_path, excluded=('weights',)):
    """
    Count the class sub-directories directly under ``data_path``.

    Args:
        data_path: Dataset root whose sub-directories represent classes.
        excluded: Directory names (compared case-insensitively) that are not
            classes. Defaults to the ``weights`` output directory, which the
            data loader also skips; counting it inflated the class count by one.

    Returns:
        Number of sub-directories that are not excluded.
    """
    skipped = {name.lower() for name in excluded}
    with os.scandir(data_path) as entries:
        return sum(1 for entry in entries
                   if entry.is_dir() and entry.name.lower() not in skipped)

def run_combination(l, bs, ep, pat, md, seed, train_path, test_path, num_classes):
    """
    Train and evaluate one hyper-parameter combination.

    Args:
        l: Learning rate for the Adam optimizer.
        bs: Batch size.
        ep: Maximum number of epochs.
        pat: Early-stopping patience.
        md: Early-stopping minimum delta.
        seed: Random seed.
        train_path: Pickled training set.
        test_path: Pickled test set.
        num_classes: Number of output classes.

    Returns:
        Tuple ``(loss, accuracy)`` measured on the test set.
    """
    initialize_seed(seed)
    parameters_name = f'seed_{seed}_lr_{l}_bs_{bs}_ep_{ep}_val_{validation_size}_es_{early_stopping}_pa_{pat}_md_{md}'
    print(f'Running combination with {parameters_name}')

    # exist_ok avoids the separate existence check and the race it implies.
    weights_dir = os.path.join(data_path, 'weights')
    os.makedirs(weights_dir, exist_ok=True)

    # File the trained weights are written to by siamese.fit().
    weights_path = os.path.join(weights_dir, f'weights_{parameters_name}.weights.h5')

    siamese = SiameseNetwork(seed=seed, width=WIDTH, height=HEIGHT, cells=CEELS, num_classes=num_classes,
                             loss=loss_type, metrics=['accuracy'], optimizer=Adam(learning_rate=l), dropout_rate=0.4)

    print(weights_path, train_path, validation_size, bs,ep,early_stopping,pat,md)
    siamese.fit(weights_file=weights_path, train_path=train_path, validation_size=validation_size,
                batch_size=bs, epochs=ep, early_stopping=early_stopping, patience=pat, min_delta=md)

    loss, accuracy = siamese.evaluate(test_file=test_path, batch_size=bs)

    print(f'Loss on Testing set: {loss}')
    print(f'Accuracy on Testing set: {accuracy}')

    return loss, accuracy

def run():
    """
    Build the datasets, run every hyper-parameter combination and save the results.

    Reads the module-level grids ``lr``, ``batch_size``, ``epochs``,
    ``patience``, ``min_delta`` and ``seeds`` and writes one row per
    combination to ``results.csv`` inside ``data_path``. If building the
    datasets fails, the error is printed and the function returns early.
    """
    from itertools import product  # local import keeps this cell self-contained

    data_set_save_type = 'pickle'
    train_path = os.path.join(data_path, f'{train_name}.{data_set_save_type}')  # Path for train file
    test_path = os.path.join(data_path, f'{test_name}.{data_set_save_type}')  # Path for test file

    # Determine the number of classes based on the subfolders in the dataset
    num_classes = get_num_classes(data_path)

    if LOAD_DATA:
        try:
            # Pass the dataset root explicitly: an empty folder name ended up in
            # os.scandir('') and failed with "No such file or directory: ''".
            # NOTE(review): train and test are built from the same folder, so the
            # test set contains the training pairs; test metrics are optimistic.
            for set_name, output_path in ((train_name, train_path), (test_name, test_path)):
                loader = DataLoader(width=WIDTH, height=HEIGHT, cells=CEELS,
                                    data_path=data_path, output_path=output_path)
                loader.load(set_name=set_name, folder_name=data_path)
        except Exception as e:
            print(f"Error loading data: {str(e)}")
            return

    result_path = os.path.join(data_path, 'results.csv')  # Path for results
    columns = ['lr', 'batch_size', 'epochs', 'patience', 'min_delta', 'seed', 'loss', 'accuracy']
    records = []

    # product() iterates in the same order as the equivalent nested loops.
    for l, bs, ep, pat, md, seed in product(lr, batch_size, epochs, patience, min_delta, seeds):
        loss, accuracy = run_combination(l=l, bs=bs, ep=ep, pat=pat, md=md, seed=seed,
                                         train_path=train_path, test_path=test_path, num_classes=num_classes)
        records.append((l, bs, ep, pat, md, seed, loss, accuracy))

    df_results = pd.DataFrame(records, columns=columns)
    df_results.to_csv(result_path)


def initialize_seed(seed):
    """Apply ``seed`` to PYTHONHASHSEED and to the Python, NumPy and TensorFlow RNGs."""
    os.environ['PYTHONHASHSEED'] = str(seed)
    # Same seeding calls, in the same order, expressed as a loop.
    for apply_seed in (random.seed, np.random.seed, tf.random.set_seed):
        apply_seed(seed)


if __name__ == '__main__':
    # Hyper-parameter grids, read by run() as module-level names.
    # Both modes currently use the same single-point grid.
    if IS_EXPERIMENT:
        seeds, lr, batch_size = [0], [0.00005], [32]
        epochs, patience, min_delta = [10], [5], [0.1]
    else:
        seeds, lr, batch_size = [0], [0.00005], [32]
        epochs, patience, min_delta = [10], [5], [0.1]

    print(os.name)
    # Time the whole experiment run.
    start_time = time.time()
    print('Starting the experiments')
    run()
    print(f'Total Running Time: {time.time() - start_time}')



Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
posix
Starting the experiments
Loading dataset from: /content/drive/MyDrive/6th_sem_immune_dataset/SubClassification/Nuclear/Pleomorphic
Error loading data: [Errno 2] No such file or directory: ''
Total Running Time: 0.002100229263305664
