#### Notebook for using CamAge model weights to re-train on any age-related dataset ####

In [None]:
#Importing useful libraries
import numpy as np
from PIL import Image
import cv2 as cv
import os
import pandas as pd
from tqdm import tqdm
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
import pickle
import random
import pandas as pd
from sklearn.metrics import precision_recall_fscore_support
from sklearn.metrics import accuracy_score,recall_score , precision_score , classification_report , f1_score,roc_auc_score,average_precision_score,confusion_matrix
import torch
from torch import nn
from torchvision import transforms
from torch.utils.data import Dataset,DataLoader
from sklearn.model_selection import train_test_split
from PIL import Image
import numpy as np
from torchvision.models import Inception3
import pandas as pd
import os

In [None]:
def read_data(data_folder="/CPT_Dataset/", sample_sizes=None):
    """Collect image paths and their class-folder names into a DataFrame.

    ``data_folder`` is the dataset the user wants to train on (the CPT
    dataset from Zenodo can be used for a trial run). Every sub-directory is
    treated as one class; plain files directly inside ``data_folder`` are
    ignored.

    Args:
        data_folder: Directory containing one sub-directory per class.
        sample_sizes: Optional mapping ``{keyword: k}``. For the first keyword
            that occurs in a class-folder name, ``k`` directory entries are
            drawn at random from that folder. Defaults to the CPT sample
            sizes. Pass an empty mapping to keep every file.

    Returns:
        pandas.DataFrame with the columns ``image_path`` and ``class_name``
        (the folder name). The columns exist even when no image was found.
    """
    if sample_sizes is None:
        sample_sizes = {
            "CPT1": 1863,
            "CPT2": 2415,
            "CPT3": 2418,
            "CPT4": 2324,
            "CPT5": 2086,
            "Untreated": 3901,
        }
    image_suffixes = ("jpg", "jpeg", "png")

    dataset = []
    for class_name in os.listdir(data_folder):
        class_folder = os.path.join(data_folder, class_name)
        if os.path.isfile(class_folder):
            continue

        list_of_images = os.listdir(class_folder)
        for keyword, size in sample_sizes.items():
            if keyword in class_name:
                # Cap k at the folder size: random.sample raises ValueError
                # when asked for more items than are available.
                # The draw happens before the extension filter below, so
                # non-image entries can take up part of the sample.
                list_of_images = random.sample(
                    list_of_images, k=min(size, len(list_of_images)))
                break

        for image in tqdm(list_of_images):
            if image.endswith(image_suffixes):
                image_path = os.path.join(class_folder, image)
                dataset.append(dict(image_path=image_path, class_name=class_name))

    # Naming the columns keeps them present for an empty result, so callers
    # can index dataset["class_name"] without a KeyError.
    return pd.DataFrame(dataset, columns=["image_path", "class_name"])
# Scan the dataset folder, then swap every class-folder name for its integer label.
dataset = read_data()
class_numeric = {
    "Untreated": 0,
    "CPT1": 1,
    "CPT2": 2,
    "CPT3": 3,
    "CPT4": 4,
    "CPT5": 5,
}

# A folder name missing from class_numeric raises KeyError here.
dataset["class_name"] = dataset["class_name"].apply(
    lambda folder_name: class_numeric[folder_name]
)

dataset

In [None]:
# Custom dataset and data-loader helpers
class CustomDataset(Dataset):
    """Dataset that loads images from paths and pairs them with labels.

    Args:
        x: Iterable of image file paths.
        y: Iterable of labels, in the same order as ``x``.
        transform: Callable applied to each RGB ``PIL.Image`` after loading.
    """

    def __init__(self, x, y, transform):
        super().__init__()
        # Materialise both iterables so samples can be addressed by position.
        self.x = [*x]
        self.y = [*y]
        self.transform = transform

    def __len__(self):
        return len(self.x)

    def __getitem__(self, index):
        """Return ``(transformed_image, label)``; the label is a ``torch.long`` tensor."""
        rgb_image = Image.open(self.x[index]).convert("RGB")
        transformed = self.transform(rgb_image)
        label = torch.tensor(self.y[index], dtype=torch.long)
        return (transformed, label)
def get_dataloader(dataset, image_resize, batch_size, test_size=0.25, random_state=None):
    """Split ``dataset`` into stratified train/test parts and wrap them in DataLoaders.

    Args:
        dataset: DataFrame with the columns ``image_path`` and ``class_name``.
        image_resize: Target size handed to ``transforms.Resize``.
        batch_size: Batch size used by both loaders.
        test_size: Fraction of the samples held out for testing.
        random_state: Optional seed for ``train_test_split``; set it to get
            the same split on every run.

    Returns:
        ``(train_dataloader, test_dataloader)``.
    """
    x_train, x_test, y_train, y_test = train_test_split(
        dataset["image_path"],
        dataset["class_name"],
        test_size=test_size,
        stratify=dataset["class_name"],
        random_state=random_state,
    )

    # Channel statistics used by torchvision's ImageNet-pretrained models.
    normalize = transforms.Normalize(mean=(0.485, 0.456, 0.406),
                                     std=(0.229, 0.224, 0.225))
    # Random flips are augmentation and therefore belong to training only.
    train_transform = transforms.Compose([
        transforms.ToTensor(),
        transforms.Resize(image_resize),
        transforms.RandomHorizontalFlip(),
        transforms.RandomVerticalFlip(),
        normalize,
    ])
    # Test images get the deterministic part of the pipeline, so repeated
    # evaluations of the same weights give the same metrics.
    test_transform = transforms.Compose([
        transforms.ToTensor(),
        transforms.Resize(image_resize),
        normalize,
    ])

    train_dataset = CustomDataset(x_train, y_train, train_transform)
    test_dataset = CustomDataset(x_test, y_test, test_transform)
    train_dataloader = DataLoader(dataset=train_dataset, batch_size=batch_size, shuffle=True)
    # No shuffling for the test split: predictions then come out in the same
    # order as test_dataset.x and can be matched back to the image paths.
    test_dataloader = DataLoader(dataset=test_dataset, batch_size=batch_size, shuffle=False)
    return train_dataloader, test_dataloader

In [None]:
## model structure

class ImageInceptionResNetV2(nn.Module):
    """Inception v3 backbone followed by a small fully connected head.

    The backbone is loaded through ``torch.hub`` (``inception_v3`` from
    ``pytorch/vision:v0.10.0``). Its 1000 outputs feed a head that ends in
    10 softmax outputs. Layer names and sizes must stay as they are so that
    saved ``state_dict`` files keep loading.
    """

    def __init__(self):
        super().__init__()
        self.image_feature_extractor = torch.hub.load('pytorch/vision:v0.10.0', 'inception_v3', pretrained=True)

        self.fc = nn.Sequential(
            nn.Linear(in_features=1000, out_features=128),
            nn.ReLU(),
            nn.BatchNorm1d(128),
            nn.Dropout(),
            nn.Linear(in_features=128, out_features=10),
            nn.Softmax(dim=1))

    def forward(self, x):
        """Return the head's softmax output for the image batch ``x``."""
        features = self.image_feature_extractor(x)
        # In training mode torchvision's Inception v3 returns a
        # (logits, aux_logits) named tuple instead of a tensor; keep the
        # main logits so the model also works after model.train().
        if isinstance(features, tuple):
            features = features[0]
        return self.fc(features)




class TrainModel:
    """Train a classifier and report test-set metrics after every epoch.

    Args:
        model: ``nn.Module`` to train; it is moved to CUDA when available.
        dataset: DataFrame passed on to ``get_dataloader``.
        batch_size: Batch size for both data loaders.
        n_epochs: Number of epochs to run.
        image_resize: Image size passed on to ``get_dataloader``.
        lr: Learning rate of the Adam optimiser.
        model_name: Stored on the instance only; checkpoints are written to
            ``model_epoch{epoch}.pt``.

    Files written to the current working directory for every epoch:
        ``epoch_{epoch}_image_names.txt`` and
        ``epoch_{epoch}_image_names_train.txt`` (image paths of the test and
        train split), ``ground_truth_epoch{epoch}.npy``,
        ``predicted_probabilities_epoch{epoch}.npy`` and
        ``model_epoch{epoch}.pt``.
    """

    def __init__(self, model, dataset, batch_size, n_epochs, image_resize, lr, model_name):
        self.model_name = model_name
        self.dataset = dataset
        self.batch_size = batch_size
        self.n_epochs = n_epochs
        self.image_resize = image_resize
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.train_dataloader, self.test_dataloader = get_dataloader(
            self.dataset, self.image_resize, self.batch_size)
        self.model = model.to(self.device)
        self.lr = lr
        self.optimizer = torch.optim.Adam(self.model.parameters(), lr=self.lr)
        # NOTE(review): the model classes in this notebook end in nn.Softmax,
        # while CrossEntropyLoss expects raw logits - confirm this is intended.
        self.criterion = nn.CrossEntropyLoss()

    def train_test(self):
        """Run the full schedule: one training pass, then one test pass, per epoch."""
        for epoch in range(self.n_epochs):
            self._save_image_names(epoch)
            self._train_one_epoch(epoch)
            self._evaluate()
            self._report(epoch)

    def _save_image_names(self, epoch):
        """Write the image paths of the test and train split to text files."""
        targets = (
            (f"epoch_{epoch}_image_names.txt", self.test_dataloader),
            (f"epoch_{epoch}_image_names_train.txt", self.train_dataloader),
        )
        for file_name, loader in targets:
            with open(file_name, 'w') as f:
                f.write('\n'.join(loader.dataset.x))

    def _train_one_epoch(self, epoch):
        """Run one optimisation pass over the whole training loader.

        The train loader is iterated on its own. Zipping it with the test
        loader would stop at the shorter of the two and silently skip the
        remaining training batches.
        """
        total_loss_train = 0.0
        for index, (train_image, train_label) in enumerate(self.train_dataloader):
            train_image = train_image.to(self.device)
            train_label = train_label.to(self.device)
            self.optimizer.zero_grad()
            output_train = self.model(train_image)
            loss_train = self.criterion(output_train, train_label)
            loss_train.backward()
            self.optimizer.step()
            total_loss_train += loss_train.item()
            if index % 10 == 0:
                print("|EPOCH : {0}|{1},BATCH : {2}|{3} , LOSS train: {4}".format(
                    epoch + 1, self.n_epochs, index, len(self.train_dataloader), total_loss_train))
        return total_loss_train

    def _evaluate(self):
        """Predict the whole test loader and store labels, predictions and scores."""
        self.ground_truth = []
        self.prediction = []
        self.probability = []
        # Remember each sub-module's mode: the caller may have put parts of
        # the model in eval mode on purpose, and that must survive the test pass.
        previous_modes = [(module, module.training) for module in self.model.modules()]
        self.model.eval()  # no dropout, no BatchNorm statistics updates on test data
        try:
            with torch.no_grad():  # inference only, no autograd graph needed
                for test_image, test_label in self.test_dataloader:
                    output_test = self.model(test_image.to(self.device))
                    self.ground_truth.extend(test_label.tolist())
                    self.prediction.extend(torch.argmax(output_test, dim=1).cpu().tolist())
                    self.probability.extend(output_test.cpu().tolist())
        finally:
            for module, was_training in previous_modes:
                module.training = was_training
        self.probability = np.array(self.probability)

    @staticmethod
    def _macro_specificity(ground_truth, prediction, n_classes):
        """Return the mean one-vs-rest specificity, TN / (TN + FP), as a fraction.

        A class without any negative sample contributes 0.0.
        """
        truth = np.asarray(ground_truth)
        predicted = np.asarray(prediction)
        per_class = []
        for label in range(n_classes):
            negatives = truth != label
            n_negatives = int(negatives.sum())
            if n_negatives == 0:
                per_class.append(0.0)
                continue
            true_negatives = int((negatives & (predicted != label)).sum())
            per_class.append(true_negatives / n_negatives)
        return float(np.mean(per_class)) if per_class else 0.0

    def _report(self, epoch):
        """Save the epoch's artefacts once, then compute and print the metrics once."""
        # The class count comes from the model output, not a hard-coded 6.
        n_classes = self.probability.shape[1]
        self.one_hot_encoding = np.eye(n_classes, dtype=int)[self.ground_truth]

        # Persist before computing metrics, so a metric error cannot cost
        # the checkpoint.
        np.save("ground_truth_epoch{}.npy".format(epoch), np.array(self.ground_truth))
        np.save("predicted_probabilities_epoch{}.npy".format(epoch), self.probability)
        torch.save(self.model.state_dict(), "model_epoch{}.pt".format(epoch))

        average_precision = {
            label: average_precision_score(self.one_hot_encoding[:, label],
                                           self.probability[:, label])
            for label in range(n_classes)
        }
        metrics = {
            'Accuracy Score': accuracy_score(self.ground_truth, self.prediction) * 100,
            'Recall Score': recall_score(self.ground_truth, self.prediction, average="weighted") * 100,
            'Precision': precision_score(self.ground_truth, self.prediction, average="weighted") * 100,
            'F1 Score': f1_score(self.ground_truth, self.prediction, average="weighted") * 100,
            'ROC_AUC Score': roc_auc_score(self.ground_truth, self.probability, multi_class="ovo") * 100,
            'Average Precision': np.mean(list(average_precision.values())) * 100,
            'Specificity': self._macro_specificity(self.ground_truth, self.prediction, n_classes) * 100,
        }
        # One row per epoch; assigning scalars to an empty DataFrame would
        # leave it without rows.
        self.classification_report_dataframe = pd.DataFrame([metrics])

        print("Epoch Report.....>>>>>>>>>>>>>>>>", epoch)
        print("Accuracy : ", metrics['Accuracy Score'])
        print("Recall : ", metrics['Recall Score'])
        print("Precision : ", metrics['Precision'])
        print("F1_Score : ", metrics['F1 Score'])
        print("ROC AUC Score : ", metrics['ROC_AUC Score'])
        print("AUPRC :", metrics['Average Precision'])
        print("Classification Report : ")
        print(classification_report(self.ground_truth, self.prediction))
        print("Specificity : ", metrics['Specificity'])

    def train_test_dl_model(self):
        """Entry point kept for callers; identical to ``train_test``."""
        self.train_test()



In [None]:
# Step 1: Load Pre-trained Model with User's Weights
model = ImageInceptionResNetV2()
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
weights_path = "/storage/subhadeepd/CamAge/Images/Phase_Contrast_Images/Refined_Models/10_Label_Based/IR_05_09_23/Final_model_full_data/model_epoch200.pt"
# map_location remaps the stored tensors onto the device available here, so
# weights saved on a GPU also load on a CPU-only machine.
model.load_state_dict(torch.load(weights_path, map_location=device))
model.eval()

In [None]:
# Step 2: Modify Model Output Layer
class ImageInceptionResNetV2Modified(nn.Module):
    """Reuse a pretrained model's feature extractor with a new classification head.

    Args:
        pretrained_model: Object exposing ``image_feature_extractor``; that
            module is shared with the new model, not copied.
        num_classes: Number of outputs of the new head.

    The head (``fc``) is built fresh here, so its weights are newly
    initialised and not taken from ``pretrained_model``.
    """

    def __init__(self, pretrained_model, num_classes):
        super().__init__()
        self.image_feature_extractor = pretrained_model.image_feature_extractor  # Use the same feature extractor
        self.fc = nn.Sequential(
            nn.Linear(in_features=1000, out_features=128),
            nn.ReLU(),
            nn.BatchNorm1d(128),
            nn.Dropout(),
            nn.Linear(in_features=128, out_features=num_classes),  # Modify output features to the number of classes
            nn.Softmax(dim=1))

    def forward(self, x):
        """Return the head's softmax output for the image batch ``x``."""
        features = self.image_feature_extractor(x)
        # In training mode torchvision's Inception v3 returns a
        # (logits, aux_logits) named tuple instead of a tensor; keep the
        # main logits so the model also works after model.train().
        if isinstance(features, tuple):
            features = features[0]
        features = features.view(features.size(0), -1)
        return self.fc(features)
# Build the model to re-train: it shares the feature extractor of `model`,
# while its classification head (`fc`) is created fresh for `num_classes` outputs.
num_classes = 6  # Specify the number of classes in your dataset
modified_model = ImageInceptionResNetV2Modified(pretrained_model=model,
                                                num_classes=num_classes)
# Optional: also try to load the saved weights into the modified model.
# NOTE(review): strict=False relaxes key matching; confirm it also tolerates the
# 10-vs-num_classes shape difference of the last fc layer before enabling this.
# modified_model.load_state_dict(torch.load(weights_path), strict=False)

# Optional: re-create the last fully connected layer with random weights.
# modified_model.fc[4] = nn.Linear(128, num_classes)



In [None]:

## Here the user can change the number of epochs, batch size and learning rate

# All per-epoch output files are written relative to this directory.
os.chdir("path/to working /directory")
training_options = dict(
    batch_size=32,
    n_epochs=500,
    image_resize=(299, 299),
    lr=0.0001,
    model_name="trained_new_model.pt",
)
train_model = TrainModel(model=modified_model, dataset=dataset, **training_options)
train_model.train_test_dl_model()
