# Experiment

In [None]:
from LibAUC.libauc.losses import MultiLabelAUCMLoss,CrossEntropyLoss
from LibAUC.libauc.optimizers import PESG,Adam
from LibAUC.libauc.models import resnet50 as Resnet50
from LibAUC.libauc.datasets import CheXpert
from LibAUC.libauc.metrics import auc_roc_score # for multi-task

from PIL import Image
import numpy as np
import pandas as pd
import torch 
import torchvision.transforms as transforms
from torch.utils.data import Dataset
import torch.nn.functional as F
import os
import cv2
import shutil
import warnings
import re
from sklearn.metrics import roc_curve, roc_auc_score
import matplotlib.pyplot as plt

In [None]:
def set_all_seeds(SEED):
    """Seed every random number generator this notebook can draw from.

    Seeds Python's ``random`` module, NumPy and PyTorch (CPU and all CUDA
    devices), and switches cuDNN to its deterministic mode so repeated runs
    use the same random streams.

    Args:
        SEED: Integer seed applied to all generators.
    """
    # Local import: the notebook's import cell does not bring in `random`.
    import random

    # REPRODUCIBILITY
    random.seed(SEED)
    np.random.seed(SEED)
    torch.manual_seed(SEED)
    # Explicit CUDA seeding; PyTorch documents this call as a silent no-op
    # when CUDA is unavailable.
    torch.cuda.manual_seed_all(SEED)
    # Trade cuDNN autotuning speed for run-to-run determinism.
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False
warnings.simplefilter(action='ignore',category=FutureWarning) # Suppress FutureWarning messages so they do not clutter the cell output

In [None]:
# Global experiment settings shared by the cells below.
SEED = 123
BATCH_SIZE = 32

# Fix the random state before any data loading or inference happens.
set_all_seeds(SEED)

# Run on the GPU when one is visible to PyTorch, otherwise on the CPU.
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"

# Show the working directory: every dataset/checkpoint path is built from it.
print(os.getcwd())

## Model Load

In [None]:
def _load_resnet50(checkpoint_name):
    """Build a 5-class ResNet50 and load weights from ``pth_files/<checkpoint_name>``.

    Args:
        checkpoint_name: File name of the state dict inside the ``pth_files``
            directory of the current working directory.

    Returns:
        The model with the checkpoint's weights loaded (still on the CPU).
    """
    model = Resnet50(pretrained=False, last_activation=None, activations='relu', num_classes=5)
    checkpoint_path = os.path.join(os.getcwd(), 'pth_files', checkpoint_name)
    # map_location='cpu' lets checkpoints saved from a GPU be loaded on a
    # CPU-only machine; the evaluation cells move the model to `device` later.
    model.load_state_dict(torch.load(checkpoint_path, map_location='cpu'))
    return model


# One model per training subset: the full data, each sex, and each age group.
origin_model = _load_resnet50('origin_model_resnet50.pth')
male_model = _load_resnet50('male_model_resnet50.pth')
female_model = _load_resnet50('female_model_resnet50.pth')
before40_model = _load_resnet50('before40_model_resnet50.pth')
after40_model = _load_resnet50('after40_model_resnet50.pth')

In [None]:
# Evaluate the model trained on the full dataset on its own test split and
# keep the pooled labels/scores for the threshold search further down.
files_lst = ['CheXpert_origin']
models_lst = [origin_model]

test_pred = []
test_true = []
for dataset_dir, model in zip(files_lst, models_lst):
    model.to(device)
    # The trailing '' makes os.path.join end the path with the platform's
    # separator (root is concatenated with 'test.csv' below), instead of a
    # hard-coded Windows backslash.
    root = os.path.join(os.getcwd(), dataset_dir, '')
    test_set = CheXpert(csv_path=root+'test.csv', image_root_path=root, use_upsampling=False, use_frontal=True, image_size=224, mode='valid', class_index=-1, verbose=False)
    test_loader = torch.utils.data.DataLoader(test_set, batch_size=BATCH_SIZE, num_workers=2, shuffle=False)
    model.eval()
    with torch.no_grad():
        for test_data, test_labels in test_loader:
            # Follow `device` rather than forcing CUDA so the CPU fallback works.
            test_data = test_data.to(device)
            # Map the model outputs to (0, 1) scores.
            y_pred = torch.sigmoid(model(test_data))
            test_pred.append(y_pred.cpu().numpy())
            test_true.append(test_labels.numpy())

test_true = np.concatenate(test_true)
test_pred = np.concatenate(test_pred)
# NOTE(review): auc_roc_score presumably returns one AUC per class; the mean
# over them is reported. This is computed on the test split despite the name.
val_auc_mean = np.mean(auc_roc_score(test_true, test_pred))
origin_test_true = test_true
origin_test_pred = test_pred
print(val_auc_mean)

In [None]:
# Evaluate the sex-specific models, each on its own test split, and pool the
# labels/scores of both groups into one set for the threshold search below.
files_lst = ['CheXpert_male', 'CheXpert_female']
models_lst = [male_model, female_model]

test_pred = []
test_true = []
for dataset_dir, model in zip(files_lst, models_lst):
    model.to(device)
    # The trailing '' makes os.path.join end the path with the platform's
    # separator (root is concatenated with 'test.csv' below), instead of a
    # hard-coded Windows backslash.
    root = os.path.join(os.getcwd(), dataset_dir, '')
    test_set = CheXpert(csv_path=root+'test.csv', image_root_path=root, use_upsampling=False, use_frontal=True, image_size=224, mode='valid', class_index=-1, verbose=False)
    test_loader = torch.utils.data.DataLoader(test_set, batch_size=BATCH_SIZE, num_workers=2, shuffle=False)
    model.eval()
    with torch.no_grad():
        for test_data, test_labels in test_loader:
            # Follow `device` rather than forcing CUDA so the CPU fallback works.
            test_data = test_data.to(device)
            # Map the model outputs to (0, 1) scores.
            y_pred = torch.sigmoid(model(test_data))
            test_pred.append(y_pred.cpu().numpy())
            test_true.append(test_labels.numpy())

test_true = np.concatenate(test_true)
test_pred = np.concatenate(test_pred)
# NOTE(review): auc_roc_score presumably returns one AUC per class; the mean
# over them is reported. This is computed on the test split despite the name.
val_auc_mean = np.mean(auc_roc_score(test_true, test_pred))
sex_test_true = test_true
sex_test_pred = test_pred
print(val_auc_mean)

In [None]:
# Evaluate the age-specific models, each on its own test split, and pool the
# labels/scores of both groups into one set for the threshold search below.
files_lst = ['CheXpert_before40', 'CheXpert_after40']
models_lst = [before40_model, after40_model]

test_pred = []
test_true = []
for dataset_dir, model in zip(files_lst, models_lst):
    model.to(device)
    # The trailing '' makes os.path.join end the path with the platform's
    # separator (root is concatenated with 'test.csv' below), instead of a
    # hard-coded Windows backslash.
    root = os.path.join(os.getcwd(), dataset_dir, '')
    test_set = CheXpert(csv_path=root+'test.csv', image_root_path=root, use_upsampling=False, use_frontal=True, image_size=224, mode='valid', class_index=-1, verbose=False)
    test_loader = torch.utils.data.DataLoader(test_set, batch_size=BATCH_SIZE, num_workers=2, shuffle=False)
    model.eval()
    with torch.no_grad():
        for test_data, test_labels in test_loader:
            # Follow `device` rather than forcing CUDA so the CPU fallback works.
            test_data = test_data.to(device)
            # Map the model outputs to (0, 1) scores.
            y_pred = torch.sigmoid(model(test_data))
            test_pred.append(y_pred.cpu().numpy())
            test_true.append(test_labels.numpy())

test_true = np.concatenate(test_true)
test_pred = np.concatenate(test_pred)
# NOTE(review): auc_roc_score presumably returns one AUC per class; the mean
# over them is reported. This is computed on the test split despite the name.
val_auc_mean = np.mean(auc_roc_score(test_true, test_pred))
age_test_true = test_true
age_test_pred = test_pred
print(val_auc_mean)

## Per-symptom experiment

In [None]:
# Per-class decision thresholds for the three experiment families. For each
# class the ROC operating point maximising Youden's J statistic (TPR - FPR)
# is selected and its score threshold stored.
num_classes = origin_test_true.shape[1]
origin_thres = np.zeros(num_classes)
sex_thres = np.zeros(num_classes)
age_thres = np.zeros(num_classes)

threshold_jobs = (
    (origin_thres, origin_test_true, origin_test_pred),
    (sex_thres, sex_test_true, sex_test_pred),
    (age_thres, age_test_true, age_test_pred),
)
for best_thres, y_true, y_score in threshold_jobs:
    for class_idx in range(num_classes):
        fpr, tpr, thresholds = roc_curve(y_true[:, class_idx], y_score[:, class_idx])
        # np.argmax returns the first maximiser, i.e. the first ROC point
        # with the largest TPR - FPR gap.
        best_thres[class_idx] = thresholds[np.argmax(tpr - fpr)]

In [None]:
symptoms = ['Cardiomegaly','Edema','Consolidation','Atelectasis','Pleural Effusion']
# Confusion-matrix counts: one row per outcome, one column per symptom.
symptoms_matrix_pd = pd.DataFrame(0, index=['FN','FP','TN','TP'], columns=symptoms)

# origin_model
model = origin_model
model.to(device)
# The trailing '' makes os.path.join end the path with the platform's
# separator instead of a hard-coded Windows backslash.
root = os.path.join(os.getcwd(), 'CheXpert_origin', '')
test_set = CheXpert(csv_path=root+'test.csv', image_root_path=root, use_upsampling=False, use_frontal=True, image_size=224, mode='valid', class_index=-1, verbose=False)
test_loader = torch.utils.data.DataLoader(test_set, batch_size=BATCH_SIZE, num_workers=2, shuffle=False)

# Create the output directory up front so to_csv cannot fail on a missing folder.
matrix_dir = os.path.join(os.getcwd(), 'matrix_files')
os.makedirs(matrix_dir, exist_ok=True)
matrix_path = os.path.join(matrix_dir, 'origin_model_resnet50_matrix.csv')

model.eval()
with torch.no_grad():
    for jdx, (test_data, test_labels) in enumerate(test_loader):
        # Periodic progress message plus a snapshot of the counts so far.
        if jdx % 1000 == 0 or jdx == len(test_loader)-1:
            print(f'origin_model: {jdx}')
            symptoms_matrix_pd.to_csv(matrix_path)

        # Follow `device` rather than forcing CUDA so the CPU fallback works.
        y_pred = torch.sigmoid(model(test_data.to(device))).cpu().numpy()

        # Boolean masks replace the per-element Python loop. Assumes labels and
        # scores are (batch, len(symptoms)) arrays, as the original indexing did.
        real_negative = test_labels.numpy() < 0.5
        pred_negative = y_pred < origin_thres  # thresholds broadcast per column
        symptoms_matrix_pd.loc['TN'] += (real_negative & pred_negative).sum(axis=0)
        symptoms_matrix_pd.loc['FP'] += (real_negative & ~pred_negative).sum(axis=0)
        symptoms_matrix_pd.loc['FN'] += (~real_negative & pred_negative).sum(axis=0)
        symptoms_matrix_pd.loc['TP'] += (~real_negative & ~pred_negative).sum(axis=0)

symptoms_matrix_pd.to_csv(matrix_path)

In [None]:
symptoms = ['Cardiomegaly','Edema','Consolidation','Atelectasis','Pleural Effusion']
# Confusion-matrix counts: one row per outcome, one column per symptom.
symptoms_matrix_pd = pd.DataFrame(0, index=['FN','FP','TN','TP'], columns=symptoms)

# male_model
model = male_model
model.to(device)
# The trailing '' makes os.path.join end the path with the platform's
# separator instead of a hard-coded Windows backslash.
root = os.path.join(os.getcwd(), 'CheXpert_male', '')
test_set = CheXpert(csv_path=root+'test.csv', image_root_path=root, use_upsampling=False, use_frontal=True, image_size=224, mode='valid', class_index=-1, verbose=False)
test_loader = torch.utils.data.DataLoader(test_set, batch_size=BATCH_SIZE, num_workers=2, shuffle=False)

# Create the output directory up front so to_csv cannot fail on a missing folder.
matrix_dir = os.path.join(os.getcwd(), 'matrix_files')
os.makedirs(matrix_dir, exist_ok=True)
matrix_path = os.path.join(matrix_dir, 'male_model_resnet50_matrix.csv')

model.eval()
with torch.no_grad():
    for jdx, (test_data, test_labels) in enumerate(test_loader):
        # Periodic progress message plus a snapshot of the counts so far.
        if jdx % 1000 == 0 or jdx == len(test_loader)-1:
            print(f'male_model: {jdx}')
            symptoms_matrix_pd.to_csv(matrix_path)

        # Follow `device` rather than forcing CUDA so the CPU fallback works.
        y_pred = torch.sigmoid(model(test_data.to(device))).cpu().numpy()

        # Boolean masks replace the per-element Python loop. Assumes labels and
        # scores are (batch, len(symptoms)) arrays, as the original indexing did.
        real_negative = test_labels.numpy() < 0.5
        # sex_thres comes from the pooled male+female predictions.
        pred_negative = y_pred < sex_thres  # thresholds broadcast per column
        symptoms_matrix_pd.loc['TN'] += (real_negative & pred_negative).sum(axis=0)
        symptoms_matrix_pd.loc['FP'] += (real_negative & ~pred_negative).sum(axis=0)
        symptoms_matrix_pd.loc['FN'] += (~real_negative & pred_negative).sum(axis=0)
        symptoms_matrix_pd.loc['TP'] += (~real_negative & ~pred_negative).sum(axis=0)

symptoms_matrix_pd.to_csv(matrix_path)

In [None]:
symptoms = ['Cardiomegaly','Edema','Consolidation','Atelectasis','Pleural Effusion']
# Confusion-matrix counts: one row per outcome, one column per symptom.
symptoms_matrix_pd = pd.DataFrame(0, index=['FN','FP','TN','TP'], columns=symptoms)

# female_model
model = female_model
model.to(device)
# The trailing '' makes os.path.join end the path with the platform's
# separator instead of a hard-coded Windows backslash.
root = os.path.join(os.getcwd(), 'CheXpert_female', '')
test_set = CheXpert(csv_path=root+'test.csv', image_root_path=root, use_upsampling=False, use_frontal=True, image_size=224, mode='valid', class_index=-1, verbose=False)
test_loader = torch.utils.data.DataLoader(test_set, batch_size=BATCH_SIZE, num_workers=2, shuffle=False)

# Create the output directory up front so to_csv cannot fail on a missing folder.
matrix_dir = os.path.join(os.getcwd(), 'matrix_files')
os.makedirs(matrix_dir, exist_ok=True)
matrix_path = os.path.join(matrix_dir, 'female_model_resnet50_matrix.csv')

model.eval()
with torch.no_grad():
    for jdx, (test_data, test_labels) in enumerate(test_loader):
        # Periodic progress message plus a snapshot of the counts so far.
        if jdx % 1000 == 0 or jdx == len(test_loader)-1:
            print(f'female_model: {jdx}')
            symptoms_matrix_pd.to_csv(matrix_path)

        # Follow `device` rather than forcing CUDA so the CPU fallback works.
        y_pred = torch.sigmoid(model(test_data.to(device))).cpu().numpy()

        # Boolean masks replace the per-element Python loop. Assumes labels and
        # scores are (batch, len(symptoms)) arrays, as the original indexing did.
        real_negative = test_labels.numpy() < 0.5
        # sex_thres comes from the pooled male+female predictions.
        pred_negative = y_pred < sex_thres  # thresholds broadcast per column
        symptoms_matrix_pd.loc['TN'] += (real_negative & pred_negative).sum(axis=0)
        symptoms_matrix_pd.loc['FP'] += (real_negative & ~pred_negative).sum(axis=0)
        symptoms_matrix_pd.loc['FN'] += (~real_negative & pred_negative).sum(axis=0)
        symptoms_matrix_pd.loc['TP'] += (~real_negative & ~pred_negative).sum(axis=0)

symptoms_matrix_pd.to_csv(matrix_path)

In [None]:
symptoms = ['Cardiomegaly','Edema','Consolidation','Atelectasis','Pleural Effusion']
# Confusion-matrix counts: one row per outcome, one column per symptom.
symptoms_matrix_pd = pd.DataFrame(0, index=['FN','FP','TN','TP'], columns=symptoms)

# before40_model
model = before40_model
model.to(device)
# The trailing '' makes os.path.join end the path with the platform's
# separator instead of a hard-coded Windows backslash.
root = os.path.join(os.getcwd(), 'CheXpert_before40', '')
test_set = CheXpert(csv_path=root+'test.csv', image_root_path=root, use_upsampling=False, use_frontal=True, image_size=224, mode='valid', class_index=-1, verbose=False)
test_loader = torch.utils.data.DataLoader(test_set, batch_size=BATCH_SIZE, num_workers=2, shuffle=False)

# Create the output directory up front so to_csv cannot fail on a missing folder.
matrix_dir = os.path.join(os.getcwd(), 'matrix_files')
os.makedirs(matrix_dir, exist_ok=True)
matrix_path = os.path.join(matrix_dir, 'before40_model_resnet50_matrix.csv')

model.eval()
with torch.no_grad():
    for jdx, (test_data, test_labels) in enumerate(test_loader):
        # Periodic progress message plus a snapshot of the counts so far.
        if jdx % 1000 == 0 or jdx == len(test_loader)-1:
            print(f'before40_model: {jdx}')
            symptoms_matrix_pd.to_csv(matrix_path)

        # Follow `device` rather than forcing CUDA so the CPU fallback works.
        y_pred = torch.sigmoid(model(test_data.to(device))).cpu().numpy()

        # Boolean masks replace the per-element Python loop. Assumes labels and
        # scores are (batch, len(symptoms)) arrays, as the original indexing did.
        real_negative = test_labels.numpy() < 0.5
        # age_thres comes from the pooled before40+after40 predictions.
        pred_negative = y_pred < age_thres  # thresholds broadcast per column
        symptoms_matrix_pd.loc['TN'] += (real_negative & pred_negative).sum(axis=0)
        symptoms_matrix_pd.loc['FP'] += (real_negative & ~pred_negative).sum(axis=0)
        symptoms_matrix_pd.loc['FN'] += (~real_negative & pred_negative).sum(axis=0)
        symptoms_matrix_pd.loc['TP'] += (~real_negative & ~pred_negative).sum(axis=0)

symptoms_matrix_pd.to_csv(matrix_path)

In [None]:
symptoms = ['Cardiomegaly','Edema','Consolidation','Atelectasis','Pleural Effusion']
# Confusion-matrix counts: one row per outcome, one column per symptom.
symptoms_matrix_pd = pd.DataFrame(0, index=['FN','FP','TN','TP'], columns=symptoms)

# after40_model
model = after40_model
model.to(device)
# The trailing '' makes os.path.join end the path with the platform's
# separator instead of a hard-coded Windows backslash.
root = os.path.join(os.getcwd(), 'CheXpert_after40', '')
test_set = CheXpert(csv_path=root+'test.csv', image_root_path=root, use_upsampling=False, use_frontal=True, image_size=224, mode='valid', class_index=-1, verbose=False)
test_loader = torch.utils.data.DataLoader(test_set, batch_size=BATCH_SIZE, num_workers=2, shuffle=False)

# Create the output directory up front so to_csv cannot fail on a missing folder.
matrix_dir = os.path.join(os.getcwd(), 'matrix_files')
os.makedirs(matrix_dir, exist_ok=True)
matrix_path = os.path.join(matrix_dir, 'after40_model_resnet50_matrix.csv')

model.eval()
with torch.no_grad():
    for jdx, (test_data, test_labels) in enumerate(test_loader):
        # Periodic progress message plus a snapshot of the counts so far.
        if jdx % 1000 == 0 or jdx == len(test_loader)-1:
            print(f'after40_model: {jdx}')
            symptoms_matrix_pd.to_csv(matrix_path)

        # Follow `device` rather than forcing CUDA so the CPU fallback works.
        y_pred = torch.sigmoid(model(test_data.to(device))).cpu().numpy()

        # Boolean masks replace the per-element Python loop. Assumes labels and
        # scores are (batch, len(symptoms)) arrays, as the original indexing did.
        real_negative = test_labels.numpy() < 0.5
        # age_thres comes from the pooled before40+after40 predictions.
        pred_negative = y_pred < age_thres  # thresholds broadcast per column
        symptoms_matrix_pd.loc['TN'] += (real_negative & pred_negative).sum(axis=0)
        symptoms_matrix_pd.loc['FP'] += (real_negative & ~pred_negative).sum(axis=0)
        symptoms_matrix_pd.loc['FN'] += (~real_negative & pred_negative).sum(axis=0)
        symptoms_matrix_pd.loc['TP'] += (~real_negative & ~pred_negative).sum(axis=0)

symptoms_matrix_pd.to_csv(matrix_path)