In [1]:
import torch
import torchvision
import torchvision.transforms as transforms
from torchvision.models import resnet50,vgg16, mobilenet_v3_large ,resnet101
from torch.utils.data import DataLoader
from torch import nn, optim
import numpy as np
from sklearn.metrics import accuracy_score
from tqdm import tqdm
import torch.nn.functional as F
import pandas as pd
import matplotlib.pyplot as plt
from typing import List
from collections import Counter
from sklearn.model_selection import StratifiedShuffleSplit
import numpy as np
from torch.utils.data import Subset
from torchvision.datasets import CIFAR10
from sklearn.model_selection import train_test_split
from torchvision import datasets
from torch.utils.data import DataLoader, Subset
from sklearn.model_selection import train_test_split
import numpy as np
from collections import Counter
from data.ImbalanceCIFAR import IMBALANCECIFAR10, IMBALANCECIFAR100
from data import dataloader
from pathlib import Path
from sklearn.metrics import confusion_matrix
import csv
def load_cifar10(batch_size, num_workers, val_indices):
    # Adjust these transforms to the needs of your neural network
    transform = transforms.Compose([
        transforms.Resize(224),  # Resize images to 224x224
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ])

    full_trainset = torchvision.datasets.CIFAR10(root='./data', train=True, download=True, transform=transform)

    # Split the trainset into train and validation sets using given indices
    valset = torch.utils.data.Subset(full_trainset, val_indices)
    train_indices = [idx for idx in range(len(full_trainset)) if idx not in val_indices]
    trainset = torch.utils.data.Subset(full_trainset, train_indices)

    trainloader = DataLoader(trainset, batch_size=batch_size, shuffle=False, num_workers=num_workers)
    valloader = DataLoader(valset, batch_size=batch_size, shuffle=False, num_workers=num_workers)

    testset = torchvision.datasets.CIFAR10(root='./data', train=False, download=True, transform=transform)
    testloader = DataLoader(testset, batch_size=batch_size, shuffle=False, num_workers=num_workers)

    return trainloader, valloader, testloader




def generate_indices(val_size, total_size, seed):
    np.random.seed(seed)
    indices = np.random.permutation(total_size)
    val_indices = indices[:val_size]
    return val_indices


def compute_uce(probs, targets, n_bins=10):
    _, nattrs =probs.size()
    nattrs = torch.tensor(nattrs) 
    bin_boundaries = np.linspace(0, 1, n_bins + 1)
    bin_lowers = bin_boundaries[:-1]
    bin_uppers = bin_boundaries[1:]
    uce = 0
    bin_uncertainties = []
    bin_errors = []
    prop_in_bin_values = []
    bin_n_samples = []
    bin_variances = []
    # Compute the uncertainty values (entropy)
    uncertainties = (1/torch.log(nattrs))*(-torch.sum(probs * torch.log(probs + 1e-12), dim=1))
    for bin_lower, bin_upper in zip(bin_lowers, bin_uppers):
        in_bin = (uncertainties >= bin_lower) * (uncertainties < bin_upper)
        prop_in_bin = in_bin.float().mean()
        prop_in_bin_values.append(prop_in_bin.item() if prop_in_bin.item() > 0 else None)
        if prop_in_bin.item() > 0:
            sample_indices = torch.where(in_bin)[0]
            bin_targets = targets[sample_indices]
            bin_probs = probs[sample_indices]
            error_in_bin = (bin_targets != torch.argmax(bin_probs, dim=1)).float().mean()
            avg_uncertainty_in_bin = uncertainties[in_bin].mean()
            uce += torch.abs(avg_uncertainty_in_bin - error_in_bin) * prop_in_bin
            bin_uncertainties.append(avg_uncertainty_in_bin.item())
            bin_errors.append(error_in_bin.item())
            n_samples_in_bin = sample_indices.size(0)
            bin_n_samples.append(n_samples_in_bin)
            bin_variances.append(torch.var((bin_targets != torch.argmax(bin_probs, dim=1)).float()).item())
        else:
            bin_uncertainties.append(None)
            bin_errors.append(None)
            bin_n_samples.append(None)
            bin_variances.append(None)

    return uce, bin_uncertainties, bin_errors, prop_in_bin_values, bin_n_samples, bin_variances
def create_model(mc_dropout=False, dropout_rate=0.1):
    model = resnet50(pretrained=True)
    num_ftrs = model.fc.in_features
    if mc_dropout:
        model.fc = nn.Sequential(nn.Dropout(dropout_rate), nn.Linear(num_ftrs, 10))
    else:
        model.fc = nn.Linear(num_ftrs, 10)
    return model

def validate(model, valloader, criterion, device):
    model.eval()
    running_loss = 0.0
    all_preds = []
    all_labels = []
    with torch.no_grad():
        for images, labels,_ in valloader:
            images = images.to(device)
            labels = labels.to(device)
            outputs = model(images)
            loss = criterion(outputs, labels)
            preds = outputs.argmax(dim=1)

            # Append labels and predictions for each batch
            all_labels.append(labels.cpu())
            all_preds.append(preds.cpu())
            running_loss += loss.item() * images.size(0)

    accuracy = accuracy_score(torch.cat(all_labels).numpy(), torch.cat(all_preds).numpy())
    average_loss = running_loss / len(valloader.dataset)
    return average_loss, accuracy

def train(model, trainloader, valloader, criterion, optimizer, device, num_epochs, model_save_path):
    best_accuracy = 0.0
    for epoch in range(num_epochs):
        model.train()
        running_loss = 0.0
        for i, (images, labels,_) in tqdm(enumerate(trainloader)):
            images = images.to(device)
            labels = labels.to(device)

            optimizer.zero_grad()
            outputs = model(images)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            running_loss += loss.item() * images.size(0)
        
        training_loss = running_loss / len(trainloader.dataset)
        print(f"Epoch {epoch+1}/{num_epochs} - Training Loss: {training_loss}")

        # Validate after each epoch and save model if validation accuracy improves
        val_loss, val_accuracy = validate(model, valloader, criterion, device)
        print(f"Validation Loss: {val_loss}, Validation Accuracy: {val_accuracy}")

        if val_accuracy > best_accuracy:
            best_accuracy = val_accuracy
            print(f"Improved validation accuracy: {best_accuracy}, saving model...\n")
            torch.save(model.state_dict(), model_save_path)

    return model

def test(model, testloader, criterion, device, mc_dropout=False, T=100):
    model.eval()
    if mc_dropout:
        enable_dropout(model)  # Only activate Dropout layers

    running_loss = 0.0
    all_preds = []
    all_labels = []
    all_probs = []

    with torch.no_grad():
        for images, labels,_ in tqdm(testloader):
            images = images.to(device)
            labels = labels.to(device)

            if mc_dropout:
                # Apply dropout by predicting T times and averaging
                outputs = torch.stack([model(images) for _ in range(T)], dim=0)
                output_mean = outputs.mean(dim=0)
                loss = criterion(output_mean, labels)
                preds = output_mean.argmax(dim=1)
                all_probs.append(F.softmax(output_mean, dim=1).cpu())  # Store output_mean for MC Dropout mode
            else:
                outputs = model(images)
                loss = criterion(outputs, labels)
                preds = outputs.argmax(dim=1)
                all_probs.append(F.softmax(outputs, dim=1).cpu())
                
            # Append labels and predictions for each batch
            all_labels.append(labels.cpu())
            all_preds.append(preds.cpu())

    all_probs = torch.cat(all_probs).numpy()
    all_labels = torch.cat(all_labels).numpy()
    all_preds = torch.cat(all_preds).numpy()
    
    calibration_error = running_loss / len(testloader.dataset)
    accuracy = accuracy_score(all_labels, all_preds)
    uce, _, _, _, _, _ = compute_uce(torch.from_numpy(all_probs), torch.from_numpy(all_labels))

    return calibration_error, accuracy, uce

def enable_dropout(model):
    """ Function to enable the dropout layers during test-time """
    for m in model.modules():
        if m.__class__.__name__.startswith('Dropout'):
            m.train()
def load_model(model, load_path):
    model.load_state_dict(torch.load(load_path))
    return model

class FocalLoss(nn.Module):
    def __init__(self, gamma=2, alpha=0.5, reduction='mean'):
        super(FocalLoss, self).__init__()
        self.gamma = gamma
        self.alpha = alpha
        self.reduction = reduction
        self.ce_loss = nn.CrossEntropyLoss(reduction='none')

    def forward(self, inputs, targets):
        ce_loss = self.ce_loss(inputs, targets)
        pt = torch.exp(-ce_loss)
        focal_loss = self.alpha * (1 - pt) ** self.gamma * ce_loss

        if self.reduction == 'mean':
            return focal_loss.mean()
        elif self.reduction == 'sum':
            return focal_loss.sum()
        else:
            return focal_loss
        
def plot_and_save_confusion_matrix(logits, labels, save_path=None):
    # Calculate predicted labels
    _, preds = torch.max(logits, dim=1)

    # Calculate confusion matrix
    cm = confusion_matrix(labels.cpu().numpy(), preds.cpu().numpy())

    fig, ax = plt.subplots()
    im = ax.imshow(cm, interpolation='nearest', cmap=plt.cm.Blues)
    ax.figure.colorbar(im, ax=ax)

    # Transform labels to numpy for x and y axis
    classes = labels.unique().cpu().numpy()

    # Set the x and y axis labels
    tick_marks = np.arange(len(classes))
    plt.xticks(tick_marks, classes, rotation=45)
    plt.yticks(tick_marks, classes)

    # Loop over data dimensions and create text annotations
    for i in range(cm.shape[0]):
        for j in range(cm.shape[1]):
            ax.text(j, i, format(cm[i, j], 'd'),
                    ha="center", va="center",
                    color="white" if cm[i, j] > cm.max() / 2. else "black")

    plt.tight_layout()
    plt.ylabel('True label')
    plt.xlabel('Predicted label')

    # Save the figure if a path is provided
    if save_path is not None:
        plt.savefig(save_path)
        print(f"Saved confusion matrix to {save_path}")

    # Show the figure
    plt.show()


  from .autonotebook import tqdm as notebook_tqdm


In [2]:
def create_dataloaders(seed, batch_size_,imb_ratio):

    
    train_loader = dataloader.load_data(data_root='./cifar-10-batches-py', dataset='CIFAR10_LT'
                                        , phase='train', batch_size=batch_size_, shuffle=True,cifar_imb_ratio = imb_ratio)
    val_loader = dataloader.load_data(data_root='./cifar-10-batches-py', dataset='CIFAR10_LT'
                                      , phase='val', batch_size=batch_size_, shuffle=False,cifar_imb_ratio = imb_ratio)
    test_loader = dataloader.load_data(data_root='./cifar-10-batches-py', dataset='CIFAR10_LT'
                                       , phase='test', batch_size=batch_size_, shuffle=False,cifar_imb_ratio = imb_ratio)

    
#     train_dataset=IMBALANCECIFAR10('train', imbalance_ratio=20, root='dataset/')
#     val_dataset=IMBALANCECIFAR10('val', imbalance_ratio=20, root='dataset/')
#     test_dataset=IMBALANCECIFAR10('test', imbalance_ratio=20, root='dataset/')
#     # 創建數據加載器
#     train_loader = DataLoader(train_dataset, batch_size=train_batch_size, shuffle=True, num_workers=4)
#     val_loader = DataLoader(val_dataset, batch_size=val_batch_size, shuffle=False, num_workers=4)
#     test_loader = DataLoader(test_dataset, batch_size=test_batch_size, num_workers=4)

    return train_loader, val_loader, test_loader


### 測試不同的DATASET SPLILT

In [3]:
def main(seed_list,ep,save_model,parm,imb_ratio):
    learning_rate = 0.0005
    batch_size = 512
    

    num_epochs = ep
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

    # Define the size of validation set and total size of dataset
    val_size = 5000
    total_size = 50000

    for seed in seed_list:
        print(f"Training model with different split = {seed}")

        trainloader, valloader, testloader = create_dataloaders(seed, batch_size,imb_ratio)        

        criterion = FocalLoss(alpha=parm[0], gamma=parm[1])

#         dropout_rates = [0.5, 0.4, 0.3 ,0.2, 0.1]
        dropout_rates = [0.5]

        results = []

        for mc_dropout in [False]:  # use MC dropout
            for dropout_rate in dropout_rates:
                print(f"Training model with MC Dropout = {mc_dropout}, Dropout Rate = {dropout_rate}")
                model = create_model(mc_dropout, dropout_rate)
                model = model.to(device)
                
#                 optimizer = torch.optim.SGD(model.parameters(), learning_rate,
#                             momentum=0.9,
#                             weight_decay=2e-4)
                optimizer = optim.Adam(model.parameters(), lr=learning_rate)

                # Create a file name to save the best model for each dropout rate
                best_model_save_path = f"2_best_model_seed_{seed}_dropout_{dropout_rate}_false_ep{num_epochs}_{save_model}.pt"

                # Train the model, validating and saving the best model after each epoch
                model = train(model, trainloader, valloader, criterion, optimizer, device, num_epochs, best_model_save_path)

                calibration_error, accuracy, uce = test(model, testloader, criterion, device, mc_dropout)
                print(f"Test Calibration Error: {calibration_error}, Test Accuracy: {accuracy}, UCE: {uce}\n")

                results.append({
                    'dropout_rate': dropout_rate,
                    'calibration_error': calibration_error,
                    'accuracy': accuracy,
                    'uce': uce
                })

        #         # Save the final model state as well, if needed
        #         final_model_save_path = f"final_model_seed_{seed}_dropout_{dropout_rate}_true_.pt"
        #         torch.save(model.state_dict(), final_model_save_path)

            # Save results to csv file
#             df = pd.DataFrame(results)
#             df.to_csv(str(seed)+'_'+'dropout_experiments_results.csv', index=False)

            # Print out results
            for result in results:
                print(f"Dropout Rate: {result['dropout_rate']}, Test Calibration Error: {result['calibration_error']}, Test Accuracy: {result['accuracy']}, UCE: {result['uce']}")

In [3]:
# if __name__ == "__main__":
    
#     main([3],10,1,parm = [1.,0.],imb_ratio= 50)
#     main([4],35,1,parm = [0.6,3.],imb_ratio= 50)
#     main([5],20,3,parm = [0.5,8.],imb_ratio= 50)

In [4]:
def find_error_rates(uncertainties, bin_uncertainties, bin_errors):
    error_rates = []
    for uncertainty in uncertainties:
        found = False
        for idx, (bin_uncertainty_lower, bin_uncertainty_upper, bin_error) in enumerate(zip(bin_uncertainties[:-1], bin_uncertainties[1:], bin_errors)):
            if bin_uncertainty_lower is not None and bin_uncertainty_upper is not None and bin_error is not None:
                if bin_uncertainty_lower <= uncertainty.item() < bin_uncertainty_upper:

                    error_rates.append(bin_error)
                    found = True
                    break
        if not found:
            found_= False
            if bin_uncertainties[0] is not None: 
                if 0 <=uncertainty< bin_uncertainties[0]:
                    error_rates.append(bin_errors[0])
                    found_= True
                else:    
                    for bin_error in reversed(bin_errors):
                        if bin_error is not None:
                            error_rates.append(bin_error)
                            found_= True
                            break
            else:
                if bin_uncertainties[1] is not None: 
                    error_rates.append(bin_errors[1])
                    found_= True
                else:
                    error_rates.append(bin_errors[2])
                    found_= True
                    
            if not found_:
                if bin_uncertainties[4] is not None and 0 <=uncertainty< bin_uncertainties[4]: 
                    error_rates.append(bin_errors[4])
                    found_= True
            if not found_:
                if bin_uncertainties[8] is not None and bin_uncertainties[8] <=uncertainty< 1: 
                    error_rates.append(bin_errors[8])
                    found_= True
                

    return torch.tensor(error_rates)

def cal_accuracy(y_true, y_pred):

    # 計算正確預測的數量
    correct_predictions = torch.sum(y_true == y_pred)

    # 計算準確度
    accuracy = correct_predictions.item() / y_true.size(0)

    return accuracy
def choose_best_expert_ex(probs_expert1, probs_expert2, targets,val_uce_list_ep1,val_uce_list_ep2,weight_ep1,weight_ep2, n_bins=10):
    # Compute UCE and bin values for both experts

#     uce_expert1, bin_uncertainties_expert1, bin_errors_expert1, prop_in_bin_values_expert1,bin_n_samples_ep1, bin_variances_ep1 = compute_uce(probs_expert1, targets, n_bins)
#     uce_expert2, bin_uncertainties_expert2, bin_errors_expert2, prop_in_bin_values_expert2,bin_n_samples_ep2, bin_variances_ep2 = compute_uce(probs_expert2, targets, n_bins)

    uce_expert1, bin_uncertainties_expert1, bin_errors_expert1, prop_in_bin_values_expert1,bin_n_samples_ep1,bin_variances_ep1 = val_uce_list_ep1[0],val_uce_list_ep1[1],val_uce_list_ep1[2],val_uce_list_ep1[3],val_uce_list_ep1[4],val_uce_list_ep1[5]
    uce_expert2, bin_uncertainties_expert2, bin_errors_expert2, prop_in_bin_values_expert2,bin_n_samples_ep2,bin_variances_ep2 = val_uce_list_ep2[0],val_uce_list_ep2[1],val_uce_list_ep2[2],val_uce_list_ep2[3],val_uce_list_ep2[4],val_uce_list_ep2[5]

    # Compute uncertainties for both experts
    _, nattrs = probs_expert1.size()
    nattrs = torch.tensor(nattrs)
#     print(probs_expert1)
    uncertainties_expert1 = (1/torch.log(nattrs))*(-torch.sum(probs_expert1 * torch.log(probs_expert1 + 1e-12), dim=1))
    uncertainties_expert2 = (1/torch.log(nattrs))*(-torch.sum(probs_expert2 * torch.log(probs_expert2 + 1e-12), dim=1))
    # Find error rates for both experts
#     print(uncertainties_expert1,bin_uncertainties_expert1,bin_errors_expert1)
    error_rates_expert1 = find_error_rates(uncertainties_expert1, bin_uncertainties_expert1, bin_errors_expert1)
    error_rates_expert2 = find_error_rates(uncertainties_expert2, bin_uncertainties_expert2, bin_errors_expert2)
    # Choose the expert with lower error rate for each sample
    
    error_rates_expert1 = (error_rates_expert1/weight_ep1)
    error_rates_expert2 = (error_rates_expert2/weight_ep2)
    
    chosen_expert = (error_rates_expert1 < error_rates_expert2).cuda()

    # Get the predictions from both experts
    preds_expert1 = torch.argmax(probs_expert1, dim=1)
    preds_expert2 = torch.argmax(probs_expert2, dim=1)

    # Choose the final prediction based on the chosen expert
    final_predictions = torch.where(chosen_expert, preds_expert1, preds_expert2)

    return final_predictions


def choose_best_three_expert(probs_expert1,probs_expert2,probs_expert3, targets,val_uce_list_ep1,val_uce_list_ep2,val_uce_list_ep3,weight_ep1,weight_ep2,weight_ep3, n_bins=10):
    device = torch.device("cuda:0")
    
#     uce_expert1, bin_uncertainties_expert1, bin_errors_expert1, prop_in_bin_values_expert1,bin_n_samples_ep1, bin_variances_ep1 = compute_uce(probs_expert1, targets, n_bins)
#     uce_expert2, bin_uncertainties_expert2, bin_errors_expert2, prop_in_bin_values_expert2,bin_n_samples_ep2, bin_variances_ep2 = compute_uce(probs_expert2, targets_pairs, n_bins)
#     uce_expert3, bin_uncertainties_expert3, bin_errors_expert3, prop_in_bin_values_expert3,bin_n_samples_ep3, bin_variances_ep3 = compute_uce(probs_expert3, targets, n_bins)
    uce_expert1, bin_uncertainties_expert1, bin_errors_expert1, prop_in_bin_values_expert1,bin_n_samples_ep1,bin_variances_ep1 = val_uce_list_ep1[0],val_uce_list_ep1[1],val_uce_list_ep1[2],val_uce_list_ep1[3],val_uce_list_ep1[4],val_uce_list_ep1[5]
    uce_expert2, bin_uncertainties_expert2, bin_errors_expert2, prop_in_bin_values_expert2,bin_n_samples_ep2,bin_variances_ep2 = val_uce_list_ep2[0],val_uce_list_ep2[1],val_uce_list_ep2[2],val_uce_list_ep2[3],val_uce_list_ep2[4],val_uce_list_ep2[5]
    uce_expert3, bin_uncertainties_expert3, bin_errors_expert3, prop_in_bin_values_expert3,bin_n_samples_ep3,bin_variances_ep3 = val_uce_list_ep3[0],val_uce_list_ep3[1],val_uce_list_ep3[2],val_uce_list_ep3[3],val_uce_list_ep3[4],val_uce_list_ep3[5]



    # Compute uncertainties for both experts
    _, nattrs = probs_expert1.size()
    _, nattrs = probs_expert2.size()
    nattrs = torch.tensor(nattrs)
    uncertainties_expert1 = (1/torch.log(nattrs))*(-torch.sum(probs_expert1 * torch.log(probs_expert1 + 1e-12), dim=1))
    uncertainties_expert2 = (1/torch.log(nattrs))*(-torch.sum(probs_expert2 * torch.log(probs_expert2 + 1e-12), dim=1))
    uncertainties_expert3 = (1/torch.log(nattrs))*(-torch.sum(probs_expert3 * torch.log(probs_expert3 + 1e-12), dim=1))

    # Find error rates for both experts
    error_rates_expert1 = find_error_rates(uncertainties_expert1, bin_uncertainties_expert1, bin_errors_expert1)
    error_rates_expert2 = find_error_rates(uncertainties_expert2, bin_uncertainties_expert2, bin_errors_expert2)
    error_rates_expert3 = find_error_rates(uncertainties_expert3, bin_uncertainties_expert3, bin_errors_expert3)
    # Choose the expert with lower error rate for each sample
    error_rates_expert1 = (error_rates_expert1/weight_ep1)
    error_rates_expert2 = (error_rates_expert2/weight_ep2)
    error_rates_expert3 = (error_rates_expert3/weight_ep3)
    
    
    # Get the predictions from both experts
    preds_expert1 = torch.argmax(probs_expert1, dim=1)
    preds_expert2 = torch.argmax(probs_expert2, dim=1)
    preds_expert3 = torch.argmax(probs_expert3, dim=1)

    # 將三個錯誤率堆疊成一個張量
    error_rates = torch.stack([error_rates_expert1, error_rates_expert2, error_rates_expert3])

    # 找出最小錯誤率的索引
    _, min_error_rate_indices = torch.min(error_rates, dim=0)
    min_error_rate_indices = min_error_rate_indices.to(device)

    # 根據最小錯誤率的索引選擇最終的預測
    final_predictions = torch.where(min_error_rate_indices == 0, preds_expert1, 
                                    torch.where(min_error_rate_indices == 1, preds_expert2, preds_expert3))
    std_deviation_per_position = torch.std(error_rates, dim=0)
    mean_value = torch.mean(std_deviation_per_position)
    print("mean: ",mean_value)
    return final_predictions



def voting(preds_expert1: List[int], preds_expert2: List[int], preds_expert3: List[int], default_expert: int) -> torch.Tensor:
    assert default_expert in [1, 2, 3], "Default expert must be either 1, 2, or 3"
    
    final_preds = []
    for p1, p2, p3 in zip(preds_expert1, preds_expert2, preds_expert3):
        vote_counts = Counter([p1, p2, p3])
        max_vote_count = max(vote_counts.values())
        most_common = [k for k, v in vote_counts.items() if v == max_vote_count]
        
        if len(most_common) > 1:
            if default_expert == 1:
                final_preds.append(p1)
            elif default_expert == 2:
                final_preds.append(p2)
            else:  # default_expert == 3
                final_preds.append(p3)
        else:
            final_preds.append(most_common[0])
    

    final_preds = torch.tensor(final_preds, dtype=torch.int64)
    return final_preds
def weighted_voting(preds_expert1: List[int], preds_expert2: List[int], preds_expert3: List[int], weights: List[float], default_expert: int) -> torch.Tensor:
    assert default_expert in [1, 2, 3], "Default expert must be either 1, 2, or 3"
    
    final_preds = []
    for p1, p2, p3 in zip(preds_expert1, preds_expert2, preds_expert3):
        weighted_vote_counts = Counter()
        for pred, weight in zip([p1, p2, p3], weights):
            weighted_vote_counts[pred] += weight
        
        max_vote_count = max(weighted_vote_counts.values())
        most_common = [k for k, v in weighted_vote_counts.items() if v == max_vote_count]
        
        if len(most_common) > 1:
            if default_expert == 1:
                final_preds.append(p1)
            elif default_expert == 2:
                final_preds.append(p2)
            else:  # default_expert == 3
                final_preds.append(p3)
        else:
            final_preds.append(most_common[0])
    
    final_preds = torch.tensor(final_preds, dtype=torch.int64)

    # 将列表转换为一个 PyTorch 张量
    final_preds = torch.tensor(final_preds)

    return final_preds


def voting2(preds_expert1: List[int], preds_expert2: List[int], default_expert: int) -> torch.Tensor:
    assert default_expert in [1, 2], "Default expert must be either 1 or 2"
    
    final_preds = []
    for p1, p2 in zip(preds_expert1, preds_expert2):
        vote_counts = Counter([p1, p2])
        max_vote_count = max(vote_counts.values())
        most_common = [k for k, v in vote_counts.items() if v == max_vote_count]
        
        if len(most_common) > 1:
            if default_expert == 1:
                final_preds.append(p1)
            else:  # default_expert == 2
                final_preds.append(p2)
        else:
            final_preds.append(most_common[0])
    
    final_preds = torch.tensor(final_preds, dtype=torch.int64)
    return final_preds

def weighted_voting2(preds_expert1: List[int], preds_expert2: List[int], weights: List[float], default_expert: int) -> torch.Tensor:
    assert default_expert in [1, 2], "Default expert must be either 1 or 2"
    
    final_preds = []
    for p1, p2 in zip(preds_expert1, preds_expert2):
        weighted_vote_counts = Counter()
        for pred, weight in zip([p1, p2], weights):
            weighted_vote_counts[pred] += weight
        
        max_vote_count = max(weighted_vote_counts.values())
        most_common = [k for k, v in weighted_vote_counts.items() if v == max_vote_count]
        
        if len(most_common) > 1:
            if default_expert == 1:
                final_preds.append(p1)
            else:  # default_expert == 2
                final_preds.append(p2)
        else:
            final_preds.append(most_common[0])
    
    final_preds = torch.tensor(final_preds, dtype=torch.int64)
    return final_preds
def product_of_experts(predictions):
    # Multiply predictions together
    product = torch.prod(predictions, dim=0)
    
    # Normalize result
    product /= torch.sum(product)
    
    return product

from matplotlib.colors import Normalize
import matplotlib.cm as cm

def plot_dot_UCE_diagram(uce_value, bin_uncertainties, bin_errors, prop_in_bin_values, bin_n_samples, bin_variances, model_index, threshold=0.005):
    global save_name
    plt.figure(figsize=(6, 6))
    plt.plot([0, 1], [0, 1], linestyle="--", color="gray", label="Perfect calibration")
    
    # 筛选prop_in_bin值大于等于threshold的点
    valid_indices = [i for i, prop in enumerate(prop_in_bin_values) if prop is not None and prop >= threshold]
    valid_bin_uncertainties = [bin_uncertainties[i] for i in valid_indices]
    valid_bin_errors = [bin_errors[i] for i in valid_indices]
    valid_prop_in_bin_values = [prop_in_bin_values[i] for i in valid_indices]
    valid_bin_n_samples  = [bin_n_samples[i] for i in valid_indices]
    valid_bin_variances  = [bin_variances[i] for i in valid_indices]
    
    # 计算中心点
    centers = [0.05 + 0.1 * i for i in range(10)]
    bins = [0 + 0.1 * i for i in range(11)]  # Including the rightmost edge for binning
    
    # 对bin_uncertainties值进行分箱，并找到对应的中心点
    hist_values = np.digitize(valid_bin_uncertainties, bins) - 1
    hist_centers = [centers[i] for i in hist_values]
    
    # Use Normalize and colormap to change the color of the bars based on bin_n_samples
    norm = Normalize(vmin=min(valid_bin_n_samples), vmax=max(valid_bin_n_samples))
    colormap = cm.cividis   # Changed to plasma colormap
    colors = [colormap(norm(value)) for value in valid_bin_n_samples]
    
    plt.bar(hist_centers, valid_bin_errors, width=0.05, color=colors)
    plt.xlabel("Uncertainty", fontsize=14)
    plt.ylabel("Error", fontsize=14)
    plt.title("Reliability Diagram for Model {} (UCE={:.4f})".format(model_index , uce_value.item()), fontsize=16)
    plt.xlim(0, 1)
    plt.ylim(0, 1)
    plt.xticks(np.arange(0, 1.1, 0.1), fontsize=12)
    plt.yticks(np.arange(0, 1.1, 0.1), fontsize=12)
    plt.grid(color='gray', linestyle='-', linewidth=0.5, alpha=0.3)
    plt.gca().set_axisbelow(True)
    plt.legend(fontsize=12)
    plt.tight_layout()
    
    sm = cm.ScalarMappable(cmap=colormap, norm=norm)
    sm.set_array([])  # You can set a dummy array
    cbar = plt.colorbar(sm, orientation='vertical', label='Number of Samples')
    cbar.set_label('Number of Samples', rotation=270, labelpad=15)

    for i, txt in enumerate(valid_bin_n_samples):
        plt.annotate("n={}".format(txt), (hist_centers[i], valid_bin_errors[i]), fontsize=8, ha='center', va='bottom', textcoords="offset points", xytext=(0,5))
#         plt.annotate("var={:.2f}".format(valid_bin_variances[i]), (hist_centers[i], valid_bin_errors[i]), fontsize=8, ha='center', va='bottom', textcoords="offset points", xytext=(0,20))

    Path('plt/').mkdir(parents=True, exist_ok=True)
    plt.savefig('plt/'+save_name +"UCE_model_{}.svg".format(model_index))
    plt.close()

In [5]:
def choose_best_three_expert(probs_expert1,probs_expert2,probs_expert3 ,targets,val_uce_list_ep1,val_uce_list_ep2,val_uce_list_ep3, n_bins=10):

    device = torch.device("cuda:0")

#     uce_expert1, bin_uncertainties_expert1, bin_errors_expert1, prop_in_bin_values_expert1,bin_n_samples_ep1, bin_variances_ep1 = compute_uce(probs_expert1, targets, n_bins)
#     uce_expert2, bin_uncertainties_expert2, bin_errors_expert2, prop_in_bin_values_expert2,bin_n_samples_ep2, bin_variances_ep2 = compute_uce(probs_expert2, targets_pairs, n_bins)
#     uce_expert3, bin_uncertainties_expert3, bin_errors_expert3, prop_in_bin_values_expert3,bin_n_samples_ep3, bin_variances_ep3 = compute_uce(probs_expert3, targets, n_bins)
    uce_expert1, bin_uncertainties_expert1, bin_errors_expert1, prop_in_bin_values_expert1,bin_n_samples_ep1,bin_variances_ep1 = val_uce_list_ep1[0],val_uce_list_ep1[1],val_uce_list_ep1[2],val_uce_list_ep1[3],val_uce_list_ep1[4],val_uce_list_ep1[5]
    uce_expert2, bin_uncertainties_expert2, bin_errors_expert2, prop_in_bin_values_expert2,bin_n_samples_ep2,bin_variances_ep2 = val_uce_list_ep2[0],val_uce_list_ep2[1],val_uce_list_ep2[2],val_uce_list_ep2[3],val_uce_list_ep2[4],val_uce_list_ep2[5]
    uce_expert3, bin_uncertainties_expert3, bin_errors_expert3, prop_in_bin_values_expert3,bin_n_samples_ep3,bin_variances_ep3 = val_uce_list_ep3[0],val_uce_list_ep3[1],val_uce_list_ep3[2],val_uce_list_ep3[3],val_uce_list_ep3[4],val_uce_list_ep3[5]



    # Compute uncertainties for both experts
    _, nattrs = probs_expert1.size()
    nattrs = torch.tensor(nattrs)
    uncertainties_expert1 = (1/torch.log(nattrs))*(-torch.sum(probs_expert1 * torch.log(probs_expert1 + 1e-12), dim=1))
    uncertainties_expert2 = (1/torch.log(nattrs))*(-torch.sum(probs_expert2 * torch.log(probs_expert2 + 1e-12), dim=1))
    uncertainties_expert3 = (1/torch.log(nattrs))*(-torch.sum(probs_expert3 * torch.log(probs_expert3 + 1e-12), dim=1))

    # Find error rates for both experts
    error_rates_expert1 = find_error_rates(uncertainties_expert1, bin_uncertainties_expert1, bin_errors_expert1)
    error_rates_expert2 = find_error_rates(uncertainties_expert2, bin_uncertainties_expert2, bin_errors_expert2)
    error_rates_expert3 = find_error_rates(uncertainties_expert3, bin_uncertainties_expert3, bin_errors_expert3)
    # Choose the expert with lower error rate for each sample

    # Get the predictions from both experts
    preds_expert1 = torch.argmax(probs_expert1, dim=1)
    preds_expert2 = torch.argmax(probs_expert2, dim=1)
    preds_expert3 = torch.argmax(probs_expert3, dim=1)

    # 將三個錯誤率堆疊成一個張量
    error_rates = torch.stack([error_rates_expert1, error_rates_expert2, error_rates_expert3])

    # 找出最小錯誤率的索引
    _, min_error_rate_indices = torch.min(error_rates, dim=0)
    min_error_rate_indices = min_error_rate_indices.to(device)
    # 根據最小錯誤率的索引選擇最終的預測
    final_predictions = torch.where(min_error_rate_indices == 0, preds_expert1,
                                    torch.where(min_error_rate_indices == 1, preds_expert2, preds_expert3))
    
    
    POE_probs_ = torch.stack([probs_expert1, probs_expert2,probs_expert3])
    POE_probs = product_of_experts(POE_probs_).to(device)
    POE_pred = torch.argmax(POE_probs, axis=1)
    
    POE_pred = torch.tensor(POE_pred)  # Convert numpy array to torch tensor

    SOE_probs_ = (probs_expert1+probs_expert2+probs_expert3)/3
    SOE_probs_=SOE_probs_.to(device)
    SOE_pred = torch.argmax(SOE_probs_, axis=1).to(device)
    
    
    

    initial_predictions_probs = torch.where(min_error_rate_indices.unsqueeze(-1) == 0, probs_expert1,
                                           torch.where(min_error_rate_indices.unsqueeze(-1) == 1, probs_expert2, probs_expert3))
    
    uce_expert_SPE, _, _, _,_,_ =compute_uce(initial_predictions_probs, targets)
    uce_expert_SOE, _, _, _,_,_ =compute_uce(SOE_probs_, targets)
    uce_expert_POE, _, _, _,_,_ =compute_uce(POE_probs, targets)
    print("SPE_UCE: ",uce_expert_SPE,"SOE_UCE: ",uce_expert_SOE,"POE_UCE: ",uce_expert_POE)
    return final_predictions

In [6]:
def ensemble_learning(models, data_loaders, criterion, device, phase,mc_dropout=False, T=1):
    
    global val_uce_list_ep1
    global val_uce_list_ep2
    global val_uce_list_ep3
    global save_name
    
    model_1, model_2, model_3 = models['model_3'], models['model_4'], models['model_5']

    model_1.eval()
    model_2.eval()
    model_3.eval()
    
    if mc_dropout:
        enable_dropout(model_1)  
        enable_dropout(model_2)  
        enable_dropout(model_3)  

    data_loader_1, data_loader_2, data_loader_3 = data_loaders['data_loader_3'][phase], data_loaders['data_loader_4'][phase], data_loaders['data_loader_5'][phase]

    running_loss_1 = 0.0
    running_loss_2 = 0.0
    running_loss_3 = 0.0
    
    probas_1 = []
    probas_2 = []
    probas_3 = []
    all_preds_1 = []
    all_preds_2 = []
    all_preds_3 = []
    all_labels_1 = []
    all_labels_2 = []
    all_labels_3 = []

    with torch.no_grad():
        if mc_dropout:
            print("use MC")
            for data_1, labels_1 in data_loader_1:
                data_1 = data_1.to(device)
                labels_1 = labels_1.to(device)

                outputs_1 = torch.zeros_like(model_1(data_1))  # create a zero tensor of the same size as the model output
                for _ in range(T):
                    outputs_1 += model_1(data_1)
                outputs_1 /= T  # take the average over T runs

                probas_1.append(F.softmax(outputs_1, dim=1).cpu())
                all_preds_1.append(outputs_1.argmax(dim=1).cpu())
                all_labels_1.append(labels_1.cpu())

            for data_2, labels_2 in data_loader_2:
                data_2 = data_2.to(device)
                labels_2 = labels_2.to(device)

                outputs_2 = torch.zeros_like(model_2(data_2))  # create a zero tensor of the same size as the model output
                for _ in range(T):
                    outputs_2 += model_2(data_2)
                outputs_2 /= T  # take the average over T runs

                probas_2.append(F.softmax(outputs_2, dim=1).cpu())
                all_preds_2.append(outputs_2.argmax(dim=1).cpu())
                all_labels_2.append(labels_2.cpu())

            for data_3, labels_3 in data_loader_3:
                data_3 = data_3.to(device)
                labels_3 = labels_3.to(device)

                outputs_3 = torch.zeros_like(model_3(data_3))  # create a zero tensor of the same size as the model output
                for _ in range(T):
                    outputs_3 += model_3(data_3)
                outputs_3 /= T  # take the average over T runs

                probas_3.append(F.softmax(outputs_3, dim=1).cpu())
                all_preds_3.append(outputs_3.argmax(dim=1).cpu())
                all_labels_3.append(labels_3.cpu())
        else:
            print("use no MC")
            for data_1, labels_1,_ in data_loader_1:
                data_1 = data_1.to(device)
                labels_1 = labels_1.to(device)

                outputs_1 = model_1(data_1)
                probas_1.append(F.softmax(outputs_1, dim=1).cpu())
                all_preds_1.append(outputs_1.argmax(dim=1).cpu())
                all_labels_1.append(labels_1.cpu())

            for data_2, labels_2,_ in data_loader_2:
                data_2 = data_2.to(device)
                labels_2 = labels_2.to(device)

                outputs_2 = model_2(data_2)
                probas_2.append(F.softmax(outputs_2, dim=1).cpu())
                all_preds_2.append(outputs_2.argmax(dim=1).cpu())
                all_labels_2.append(labels_2.cpu())

            for data_3, labels_3,_ in data_loader_3:
                data_3 = data_3.to(device)
                labels_3 = labels_3.to(device)

                outputs_3 = model_3(data_3)
                probas_3.append(F.softmax(outputs_3, dim=1).cpu())
                all_preds_3.append(outputs_3.argmax(dim=1).cpu())
                all_labels_3.append(labels_3.cpu())

    probas_1 = torch.cat(probas_1)
    probas_2 = torch.cat(probas_2)
    probas_3 = torch.cat(probas_3)

    all_preds_1 = torch.cat(all_preds_1).numpy()
    all_preds_2 = torch.cat(all_preds_2).numpy()
    all_preds_3 = torch.cat(all_preds_3).numpy()

    all_labels_1 = torch.cat(all_labels_1).numpy()
    all_labels_2 = torch.cat(all_labels_2).numpy()
    all_labels_3 = torch.cat(all_labels_3).numpy()

    accuracy_1 = (all_labels_1 == all_preds_1).mean()
    accuracy_2 = (all_labels_2 == all_preds_2).mean()
    accuracy_3 = (all_labels_3 == all_preds_3).mean()

    
    print(phase+"acc_1: ",accuracy_1,phase+"acc_2: ",accuracy_2,phase+"acc_3: ",accuracy_3)
    
    calibration_error = 0
    

    probs_1 = probas_1.cuda()
    probs_2 = probas_2.cuda()
    probs_3 = probas_3.cuda()
    
    all_labels_1 = torch.from_numpy(all_labels_1).cuda()
    all_labels_2 = torch.from_numpy(all_labels_2).cuda()
    all_labels_3 = torch.from_numpy(all_labels_3).cuda()
    all_labels = all_labels_1
    
    all_preds_1 = torch.from_numpy(all_preds_1).cuda()
    all_preds_2 = torch.from_numpy(all_preds_2).cuda()
    all_preds_3 = torch.from_numpy(all_preds_3).cuda()
    
    if phase == "val" or 'train':
        
        uce_expert1, bin_uncertainties_expert1, bin_errors_expert1, prop_in_bin_values_expert1,bin_n_samples_ep1, bin_variances_ep1 = compute_uce(probs_1,all_labels_1)
        val_uce_list_ep1.append(uce_expert1)
        val_uce_list_ep1.append(bin_uncertainties_expert1)
        val_uce_list_ep1.append(bin_errors_expert1)
        val_uce_list_ep1.append(prop_in_bin_values_expert1)
        val_uce_list_ep1.append(bin_n_samples_ep1)
        val_uce_list_ep1.append(bin_variances_ep1)
        
        uce_expert2, bin_uncertainties_expert2, bin_errors_expert2, prop_in_bin_values_expert2,bin_n_samples_ep2, bin_variances_ep2 = compute_uce(probs_2, all_labels_2)
        val_uce_list_ep2.append(uce_expert2)
        val_uce_list_ep2.append(bin_uncertainties_expert2)
        val_uce_list_ep2.append(bin_errors_expert2)
        val_uce_list_ep2.append(prop_in_bin_values_expert2)
        val_uce_list_ep2.append(bin_n_samples_ep2)
        val_uce_list_ep2.append(bin_variances_ep2)
        
        uce_expert3, bin_uncertainties_expert3, bin_errors_expert3, prop_in_bin_values_expert3,bin_n_samples_ep3, bin_variances_ep3 = compute_uce(probs_3, all_labels_3)
        val_uce_list_ep3.append(uce_expert3)
        val_uce_list_ep3.append(bin_uncertainties_expert3)
        val_uce_list_ep3.append(bin_errors_expert3)
        val_uce_list_ep3.append(prop_in_bin_values_expert3)
        val_uce_list_ep3.append(bin_n_samples_ep3)
        val_uce_list_ep3.append(bin_variances_ep3)

    
    if phase == "test": 
        
        uce_expert1, bin_uncertainties_expert1, bin_errors_expert1, prop_in_bin_values_expert1,bin_n_samples_ep1, bin_variances_ep1 = compute_uce(probs_1,all_labels_1)       
        uce_expert2, bin_uncertainties_expert2, bin_errors_expert2, prop_in_bin_values_expert2,bin_n_samples_ep2, bin_variances_ep2 = compute_uce(probs_2, all_labels_2)
        uce_expert3, bin_uncertainties_expert3, bin_errors_expert3, prop_in_bin_values_expert3,bin_n_samples_ep3, bin_variances_ep3 = compute_uce(probs_3, all_labels_3)
        print(phase+" uce_expert1: ",float(uce_expert1),phase+" uce_expert2: ",float(uce_expert2),phase+" uce_expert3: ",float(uce_expert3))
        
#         plot_and_save_confusion_matrix(probs_1,all_labels_1,'plt/confusion_matrix/'+save_name+'1.svg')
#         plot_and_save_confusion_matrix(probs_2,all_labels_2,'plt/confusion_matrix/'+save_name+'2.svg')
#         plot_and_save_confusion_matrix(probs_3,all_labels_3,'plt/confusion_matrix/'+save_name+'3.svg')
        
        plot_dot_UCE_diagram( uce_expert1, bin_uncertainties_expert1, bin_errors_expert1, prop_in_bin_values_expert1,bin_n_samples_ep1, bin_variances_ep1,1)
        plot_dot_UCE_diagram( uce_expert2, bin_uncertainties_expert2, bin_errors_expert2, prop_in_bin_values_expert2,bin_n_samples_ep2, bin_variances_ep2,2)
        plot_dot_UCE_diagram( uce_expert3, bin_uncertainties_expert3, bin_errors_expert3, prop_in_bin_values_expert3,bin_n_samples_ep3, bin_variances_ep3,3)
        
        
        weight_ep1 = 1.0
        weight_ep2 = 1.0
        weight_ep3 = 1.0
        
#         print(probs_1.size())
        table_pred_ep12 =   choose_best_expert_ex(probs_1, probs_2, all_labels,val_uce_list_ep1,val_uce_list_ep2,weight_ep1,weight_ep2)
#         print(table_pred_ep12.size(),all_labels.size())
        table_acc_ep12 = cal_accuracy(table_pred_ep12,all_labels)

        table_expert_ep13 = choose_best_expert_ex(probs_1, probs_3, all_labels,val_uce_list_ep1,val_uce_list_ep3,weight_ep1,weight_ep3)
        table_acc_ep13 = cal_accuracy(table_expert_ep13,all_labels)

        table_pred_ep23 =   choose_best_expert_ex(probs_2, probs_3, all_labels,val_uce_list_ep2,val_uce_list_ep3,weight_ep2,weight_ep3)
        table_acc_ep23 = cal_accuracy(table_pred_ep23,all_labels)

        # 計算3位專家綜合準確度
        tabel_pred_ep123 = choose_best_three_expert(probs_1,probs_2,probs_3,all_labels,val_uce_list_ep1,val_uce_list_ep2,val_uce_list_ep3)
        tabel_acc_ep123 = cal_accuracy(tabel_pred_ep123,all_labels)
        print("-----------------------------ensemble_learning-----------------------------------")
        print("12:",table_acc_ep12,"23",table_acc_ep23,"13",table_acc_ep13,"tabel_acc_ep123",tabel_acc_ep123)
        
        
        simple_voting_pred =voting(all_preds_1, all_preds_2, all_preds_3,3).cuda()
        simple_voting_acc_123 =cal_accuracy(simple_voting_pred,all_labels)
        
        simple_voting_pred =voting2(all_preds_1, all_preds_2,2).cuda()
        simple_voting_acc_12 =cal_accuracy(simple_voting_pred,all_labels)
        
        simple_voting_pred =voting2(all_preds_1, all_preds_3,1).cuda()
        simple_voting_acc_13 =cal_accuracy(simple_voting_pred,all_labels)
        
        simple_voting_pred =voting2(all_preds_2, all_preds_3,2).cuda()
        simple_voting_acc_23 =cal_accuracy(simple_voting_pred,all_labels)
        
        #加權投票，都不一樣選2
        weighted_voting_pred =weighted_voting(all_preds_1, all_preds_2, all_preds_3,[0.6,0.3,0.1],2).cuda()
        weighted_voting_acc_123 =cal_accuracy(weighted_voting_pred,all_labels)
    
        weighted_voting_pred =weighted_voting2(all_preds_1, all_preds_2, [0.8,0.2],2).cuda()
        weighted_voting_acc_12 =cal_accuracy(weighted_voting_pred,all_labels)
        
        weighted_voting_pred =weighted_voting2(all_preds_1, all_preds_3, [0.7,0.3],1).cuda()
        weighted_voting_acc_13 =cal_accuracy(weighted_voting_pred,all_labels)
        
        weighted_voting_pred =weighted_voting2(all_preds_2, all_preds_3, [0.8,0.2],1).cuda()
        weighted_voting_acc_23 =cal_accuracy(weighted_voting_pred,all_labels)
        
        #Product of Experts 綜合ep13
        def POE_acc(probs_1,probs_2,all_labels):
            POE_probs_ = torch.stack([probs_1, probs_2])
            POE_probs = product_of_experts(POE_probs_)
            POE_pred = torch.argmax(POE_probs, axis=1)
            POE_acc =cal_accuracy(POE_pred,all_labels)
            return POE_acc
        def SOE_acc(probs_1,probs_2,all_labels):
            SOE_probs = (probs_1+probs_2)/2
            SOE_pred = torch.argmax(SOE_probs, axis=1)
            SOE_acc =cal_accuracy(SOE_pred,all_labels)
            return SOE_acc
        POE_12=POE_acc(probs_1,probs_2,all_labels)
        POE_13=POE_acc(probs_1,probs_3,all_labels)
        POE_23=POE_acc(probs_2,probs_3,all_labels)
        
        SOE_12=SOE_acc(probs_1,probs_2,all_labels)
        SOE_13=SOE_acc(probs_1,probs_3,all_labels)
        SOE_23=SOE_acc(probs_2,probs_3,all_labels)
        
        POE_probs_ = torch.stack([probs_1, probs_2,probs_3])
        POE_probs = product_of_experts(POE_probs_)
        POE_pred = torch.argmax(POE_probs, axis=1)
        POE_123 =cal_accuracy(POE_pred,all_labels)
        
        SOE_probs = (probs_1+probs_2+probs_3)/3
        SOE_pred = torch.argmax(SOE_probs, axis=1)
        SOE_123 =cal_accuracy(SOE_pred,all_labels)
        
        
        print("simple_voting_acc: ")
        print("12",simple_voting_acc_12,"23",simple_voting_acc_23,"13",simple_voting_acc_13,"123",simple_voting_acc_123)
        print("weighted_voting_pred: ")
        print("12",weighted_voting_acc_12,"23",weighted_voting_acc_23,"13",weighted_voting_acc_13,"123",weighted_voting_acc_123)
        print("POE_12: ",POE_12,"POE_23: ",POE_23,"POE_13: ",POE_13,"POE_123: ",POE_123,)
        print("SOE_12: ",SOE_12,"SOE_23: ",SOE_23,"SOE_13: ",SOE_13,"SOE_123: ",SOE_123,)

        
        with open(save_name+'output_data.csv', 'a', newline='') as csvfile:  # 使用'a'模式以追加數據到文件
            csv_writer = csv.writer(csvfile)

            # 寫入標題
            csv_writer.writerow(['Type', '12', '13', '23', '123'])

            csv_writer.writerow(['table_acc', table_acc_ep12, table_acc_ep13, table_acc_ep23,tabel_acc_ep123])
            # 寫入simple_voting_acc資料

            # 寫入SOE資料
            csv_writer.writerow(['SOE', SOE_12, SOE_13, SOE_23, SOE_123]) 

            # 寫入POE資料
            csv_writer.writerow(['POE', POE_12, POE_13, POE_23, POE_123])

            csv_writer.writerow(['simple_voting_acc', simple_voting_acc_12, simple_voting_acc_13, simple_voting_acc_23, simple_voting_acc_123])

            # 寫入weighted_voting_pred資料
            csv_writer.writerow(['weighted_voting_pred', weighted_voting_acc_12, weighted_voting_acc_13, weighted_voting_acc_23, weighted_voting_acc_123])


            # 寫入準確度數據
            csv_writer.writerow(['accuracy', accuracy_1, accuracy_2, accuracy_3])

            # 寫入UCE數據
            csv_writer.writerow(['uce_expert', float(uce_expert1), float(uce_expert2), float(uce_expert3)])

        print("資料已存入output_data.csv")        
        
    accuracy = 0
    
    
        
        
        
    return calibration_error, accuracy

In [7]:
import os
def choose_best_three_expert(probs_expert1,probs_expert2,probs_expert3 ,targets,val_uce_list_ep1,val_uce_list_ep2,val_uce_list_ep3, n_bins=10):
    device = torch.device("cuda:0")
    
#     uce_expert1, bin_uncertainties_expert1, bin_errors_expert1, prop_in_bin_values_expert1,bin_n_samples_ep1, bin_variances_ep1 = compute_uce(probs_expert1, targets, n_bins)
#     uce_expert2, bin_uncertainties_expert2, bin_errors_expert2, prop_in_bin_values_expert2,bin_n_samples_ep2, bin_variances_ep2 = compute_uce(probs_expert2, targets_pairs, n_bins)
#     uce_expert3, bin_uncertainties_expert3, bin_errors_expert3, prop_in_bin_values_expert3,bin_n_samples_ep3, bin_variances_ep3 = compute_uce(probs_expert3, targets, n_bins)
    uce_expert1, bin_uncertainties_expert1, bin_errors_expert1, prop_in_bin_values_expert1,bin_n_samples_ep1,bin_variances_ep1 = val_uce_list_ep1[0],val_uce_list_ep1[1],val_uce_list_ep1[2],val_uce_list_ep1[3],val_uce_list_ep1[4],val_uce_list_ep1[5]
    uce_expert2, bin_uncertainties_expert2, bin_errors_expert2, prop_in_bin_values_expert2,bin_n_samples_ep2,bin_variances_ep2 = val_uce_list_ep2[0],val_uce_list_ep2[1],val_uce_list_ep2[2],val_uce_list_ep2[3],val_uce_list_ep2[4],val_uce_list_ep2[5]
    uce_expert3, bin_uncertainties_expert3, bin_errors_expert3, prop_in_bin_values_expert3,bin_n_samples_ep3,bin_variances_ep3 = val_uce_list_ep3[0],val_uce_list_ep3[1],val_uce_list_ep3[2],val_uce_list_ep3[3],val_uce_list_ep3[4],val_uce_list_ep3[5]



    # Compute uncertainties for both experts
    _, nattrs = probs_expert1.size()
    nattrs = torch.tensor(nattrs)
    uncertainties_expert1 = (1/torch.log(nattrs))*(-torch.sum(probs_expert1 * torch.log(probs_expert1 + 1e-12), dim=1))
    uncertainties_expert2 = (1/torch.log(nattrs))*(-torch.sum(probs_expert2 * torch.log(probs_expert2 + 1e-12), dim=1))
    uncertainties_expert3 = (1/torch.log(nattrs))*(-torch.sum(probs_expert3 * torch.log(probs_expert3 + 1e-12), dim=1))

    # Find error rates for both experts
    error_rates_expert1 = find_error_rates(uncertainties_expert1, bin_uncertainties_expert1, bin_errors_expert1)
    error_rates_expert2 = find_error_rates(uncertainties_expert2, bin_uncertainties_expert2, bin_errors_expert2)
    error_rates_expert3 = find_error_rates(uncertainties_expert3, bin_uncertainties_expert3, bin_errors_expert3)
    # Choose the expert with lower error rate for each sample

    # Get the predictions from both experts
    preds_expert1 = torch.argmax(probs_expert1, dim=1)
    preds_expert2 = torch.argmax(probs_expert2, dim=1)
    preds_expert3 = torch.argmax(probs_expert3, dim=1)

    # 將三個錯誤率堆疊成一個張量
    error_rates = torch.stack([error_rates_expert1, error_rates_expert2, error_rates_expert3]).to(device)

    # 找出最小錯誤率的索引
    _, min_error_rate_indices = torch.min(error_rates, dim=0)

    # 根據最小錯誤率的索引選擇最終的預測
    final_predictions = torch.where(min_error_rate_indices == 0, preds_expert1,
                                    torch.where(min_error_rate_indices == 1, preds_expert2, preds_expert3))
    
    compute_mae_error_and_uncertainty(probs_expert1, probs_expert2, probs_expert3, targets,
    uncertainties_expert1, uncertainties_expert2, uncertainties_expert3, error_rates_expert1, error_rates_expert2, error_rates_expert3)
    
    
    
    global save_name
    df = analyze_errors(error_rates, final_predictions, targets)
    
    # Check if the directory 'df' exists, if not, create it
    if not os.path.exists('df'):
        os.makedirs('df')
    
    # Save the dataframe to a CSV file in the 'df' directory
    df.to_csv('df/'+save_name+'_error_analysis.csv', index=True)
    
    
    
    return final_predictions

std_deviation_per_position = None
error_rates = None
POE_pred = None
final_predictions = None

def choose_best_three_expert_new(probs_expert1,probs_expert2,probs_expert3 ,targets,val_uce_list_ep1,val_uce_list_ep2,val_uce_list_ep3, n_bins=10):
    global std_deviation_per_position
    global error_rates
    global threshold_
    global POE_pred
    global final_predictions
    uce_expert1, bin_uncertainties_expert1, bin_errors_expert1, prop_in_bin_values_expert1,bin_n_samples_ep1,bin_variances_ep1 = val_uce_list_ep1[0],val_uce_list_ep1[1],val_uce_list_ep1[2],val_uce_list_ep1[3],val_uce_list_ep1[4],val_uce_list_ep1[5]
    uce_expert2, bin_uncertainties_expert2, bin_errors_expert2, prop_in_bin_values_expert2,bin_n_samples_ep2,bin_variances_ep2 = val_uce_list_ep2[0],val_uce_list_ep2[1],val_uce_list_ep2[2],val_uce_list_ep2[3],val_uce_list_ep2[4],val_uce_list_ep2[5]
    uce_expert3, bin_uncertainties_expert3, bin_errors_expert3, prop_in_bin_values_expert3,bin_n_samples_ep3,bin_variances_ep3 = val_uce_list_ep3[0],val_uce_list_ep3[1],val_uce_list_ep3[2],val_uce_list_ep3[3],val_uce_list_ep3[4],val_uce_list_ep3[5]



    # Compute uncertainties for both experts
    _, nattrs = probs_expert1.size()
    nattrs = torch.tensor(nattrs)
    uncertainties_expert1 = (1/torch.log(nattrs))*(-torch.sum(probs_expert1 * torch.log(probs_expert1 + 1e-12), dim=1))
    uncertainties_expert2 = (1/torch.log(nattrs))*(-torch.sum(probs_expert2 * torch.log(probs_expert2 + 1e-12), dim=1))
    uncertainties_expert3 = (1/torch.log(nattrs))*(-torch.sum(probs_expert3 * torch.log(probs_expert3 + 1e-12), dim=1))

    # Find error rates for both experts
    error_rates_expert1 = find_error_rates(uncertainties_expert1, bin_uncertainties_expert1, bin_errors_expert1)
    error_rates_expert2 = find_error_rates(uncertainties_expert2, bin_uncertainties_expert2, bin_errors_expert2)
    error_rates_expert3 = find_error_rates(uncertainties_expert3, bin_uncertainties_expert3, bin_errors_expert3)
    # Choose the expert with lower error rate for each sample

    # Get the predictions from both experts
    preds_expert1 = torch.argmax(probs_expert1, dim=1)
    preds_expert2 = torch.argmax(probs_expert2, dim=1)
    preds_expert3 = torch.argmax(probs_expert3, dim=1)

    # 將三個錯誤率堆疊成一個張量
    error_rates = torch.stack([error_rates_expert1, error_rates_expert2, error_rates_expert3])
    
    std_deviation_per_position = torch.std(error_rates, dim=0)
    mean_value = torch.mean(std_deviation_per_position)
    print("mean: ",mean_value)
#     print(std_deviation_per_position)
    # 找出最小錯誤率的索引
    _, min_error_rate_indices = torch.min(error_rates, dim=0)
    
    POE_probs_ = torch.stack([probs_expert1, probs_expert2,probs_expert3])
    POE_probs = product_of_experts(POE_probs_)
    POE_pred = np.argmax(POE_probs, axis=1)
    
    POE_pred = torch.tensor(POE_pred)  # Convert numpy array to torch tensor

    SOE_probs_ = (probs_expert1+probs_expert2+probs_expert3)/3
    SOE_pred = np.argmax(SOE_probs_, axis=1)

    # 根據最小錯誤率的索引選擇最終的預測
    final_predictions = torch.where(min_error_rate_indices == 0, preds_expert1,
                                      torch.where(min_error_rate_indices == 1, preds_expert2, preds_expert3))

    initial_predictions_probs = torch.where(min_error_rate_indices.unsqueeze(-1) == 0, probs_expert1,
                                           torch.where(min_error_rate_indices.unsqueeze(-1) == 1, probs_expert2, probs_expert3))
    
    POE_probs_ = torch.stack([POE_probs,SOE_probs_,initial_predictions_probs])
    POE_initial_predictions_probs = product_of_experts(POE_probs_)
    SOE_initial_predictions_probs = (SOE_probs_+POE_probs+initial_predictions_probs)/2
    
    POE_final_predictions = np.argmax(POE_initial_predictions_probs, axis=1)
    
    SOE_final_predictions = np.argmax(SOE_initial_predictions_probs, axis=1)
    
    POE_acc =accuracy(POE_final_predictions,targets)
    SOE_acc =accuracy(SOE_final_predictions,targets)
    print("POE_SPE_acc: ",POE_acc,"SOE_SPE_acc: ",SOE_acc)
#     threshold = torch.quantile(std_deviation_per_position, threshold_)

#     threshold = torch.quantile(uncertainties_, threshold_)
#     print("threshold: ",threshold)
    # 根據 std_deviation_per_position 更新預測
#     final_predictions = torch.where(std_deviation_per_position < threshold, POE_pred, initial_predictions)
#     final_predictions = torch.where(std_deviation_per_position < threshold, initial_predictions , POE_pred)

    return SOE_final_predictions

def compute_uce(probs, targets, n_bins=10):
    _, nattrs =probs.size()
    nattrs = torch.tensor(nattrs)
    bin_boundaries = np.linspace(0, 1, n_bins + 1)
    bin_lowers = bin_boundaries[:-1]
    bin_uppers = bin_boundaries[1:]
    uce = 0
    bin_uncertainties = []
    bin_errors = []
    prop_in_bin_values = []
    bin_n_samples = []
    bin_variances = []
    # Compute the uncertainty values (entropy)
    uncertainties = (1/torch.log(nattrs))*(-torch.sum(probs * torch.log(probs + 1e-12), dim=1))
    for bin_lower, bin_upper in zip(bin_lowers, bin_uppers):
        in_bin = (uncertainties >= bin_lower) * (uncertainties < bin_upper)
        prop_in_bin = in_bin.float().mean()
        prop_in_bin_values.append(prop_in_bin.item() if prop_in_bin.item() > 0 else None)
        if prop_in_bin.item() > 0:
            sample_indices = torch.where(in_bin)[0]
            bin_targets = targets[sample_indices]
            bin_probs = probs[sample_indices]
            error_in_bin = (bin_targets != torch.argmax(bin_probs, dim=1)).float().mean()
            avg_uncertainty_in_bin = uncertainties[in_bin].mean()
            uce += torch.abs(avg_uncertainty_in_bin - error_in_bin) * prop_in_bin
            bin_uncertainties.append(avg_uncertainty_in_bin.item())
            bin_errors.append(error_in_bin.item())
            n_samples_in_bin = sample_indices.size(0)
            bin_n_samples.append(n_samples_in_bin)
            bin_variances.append(torch.var((bin_targets != torch.argmax(bin_probs, dim=1)).float()).item())
        else:
            bin_uncertainties.append(None)
            bin_errors.append(None)
            bin_n_samples.append(None)
            bin_variances.append(None)

    return uce, bin_uncertainties, bin_errors, prop_in_bin_values, bin_n_samples, bin_variances


def compute_mae_error_and_uncertainty(probs_expert1, probs_expert2, probs_expert3, targets, uncertainties_expert1, uncertainties_expert2, uncertainties_expert3, error_rates_expert1, error_rates_expert2, error_rates_expert3):
    device = torch.device("cuda:0")
    avg_uncertainty = (uncertainties_expert1 + uncertainties_expert2 + uncertainties_expert3) / 3
    error_rates_expert1 = error_rates_expert1.to(device)
    error_rates_expert2 = error_rates_expert2.to(device)
    error_rates_expert3 = error_rates_expert3.to(device)
    
    # Get the predictions from each expert
    preds_expert1 = torch.argmax(probs_expert1, dim=1)
    preds_expert2 = torch.argmax(probs_expert2, dim=1)
    preds_expert3 = torch.argmax(probs_expert3, dim=1)

    # Calculate the real error rates
    real_error_expert1 = (preds_expert1 != targets).float().mean()
    real_error_expert2 = (preds_expert2 != targets).float().mean()
    real_error_expert3 = (preds_expert3 != targets).float().mean()
    
    
    # Calculate MAE for error rates
    mae_error_expert1 = torch.abs(error_rates_expert1 - real_error_expert1).mean()
    mae_error_expert2 = torch.abs(error_rates_expert2 - real_error_expert2).mean()
    mae_error_expert3 = torch.abs(error_rates_expert3 - real_error_expert3).mean()
    

#     # Calculate MAE for uncertainties using average uncertainty as the reference
#     mae_uncertainty_expert1 = torch.abs(uncertainties_expert1 - avg_uncertainty).mean()
#     mae_uncertainty_expert2 = torch.abs(uncertainties_expert2 - avg_uncertainty).mean()
#     mae_uncertainty_expert3 = torch.abs(uncertainties_expert3 - avg_uncertainty).mean()

    # Print results
    print("MAE Error Expert 1:", mae_error_expert1.item())
    print("MAE Error Expert 2:", mae_error_expert2.item())
    print("MAE Error Expert 3:", mae_error_expert3.item())
#     print("MAE Uncertainty Expert 1:", mae_uncertainty_expert1.item())
#     print("MAE Uncertainty Expert 2:", mae_uncertainty_expert2.item())
#     print("MAE Uncertainty Expert 3:", mae_uncertainty_expert3.item())
    
    
def analyze_errors(error_rates, final_predictions, targets, n_samples=500):
    samples_idx = torch.randint(0, len(targets), (n_samples,))
    
    expert1_error_rates = error_rates[0][samples_idx].tolist()
    expert2_error_rates = error_rates[1][samples_idx].tolist()
    expert3_error_rates = error_rates[2][samples_idx].tolist()
    
    spe_results = final_predictions[samples_idx].tolist()
    real_results = targets[samples_idx].tolist()
    
    # Checking if SPE results are correct
    spe_is_correct = [1 if spe_results[i] == real_results[i] else 0 for i in range(n_samples)]
    
    data = {
        "專家1錯誤率": expert1_error_rates,
        "專家2錯誤率": expert2_error_rates,
        "專家3錯誤率": expert3_error_rates,
        "SPE選擇結果": spe_results,
        "真實結果": real_results,
        "SPE結果是否正確": spe_is_correct
    }
    
    df = pd.DataFrame(data, index=[f"樣本{i+1}" for i in range(n_samples)])
    
    # Analyze the variance
    correct_predictions = df[df["SPE結果是否正確"] == 1]
    incorrect_predictions = df[df["SPE結果是否正確"] == 0]
    
    variance_correct = (correct_predictions[["專家1錯誤率", "專家2錯誤率", "專家3錯誤率"]].var(axis=1).mean())
    variance_incorrect = (incorrect_predictions[["專家1錯誤率", "專家2錯誤率", "專家3錯誤率"]].var(axis=1).mean())
    
    print("Average variance for correctly predicted samples:", variance_correct)
    print("Average variance for incorrectly predicted samples:", variance_incorrect)
    
    return df


In [8]:
k_fold = "1_"
save_name = '1_focal_best'
batch_size = 512
num_workers = 4
imb_ratio = 50

# 建立一個字典來存儲每個數據加載器
data_loaders = {}

val_uce_list_ep1 = []
val_uce_list_ep2 = []
val_uce_list_ep3 = []


for seed in [3,4,5]:
    print(f"Training model with different split = {seed}")

    trainloader, valloader, testloader = create_dataloaders(seed, batch_size,imb_ratio)        
    
    # 存儲進字典，作為該模型的數據加載器
    data_loaders[f'data_loader_{seed}'] = {
        'train': trainloader, 
        'val': valloader, 
        'test': testloader
    }


device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
# seeds = [3, 4]
dropout_rate = 0.5

criterion = nn.CrossEntropyLoss()

models = {}  # Store models in a dictionary

num_epochs = 10
save_model = 1

seeds = [3,4,5]
for seed in seeds:
    print(f"\nLoading and testing model with seed = {seed} and dropout_rate = {dropout_rate}")
    model = create_model(mc_dropout=False, dropout_rate=dropout_rate)  # Assuming same model structure
    model = model.to(device)
    model_path = f"./model/"+save_model +"_best_model_seed_{seed}.pt"
    model = load_model(model, model_path)  # Load from saved model
    models[f"model_{seed}"] = model  # Store the model in the dictionary with a unique key

    

print("\nTesting on validation set:")
val_calibration_error, val_accuracy = ensemble_learning(models, data_loaders, criterion, device,"train",mc_dropout=False)
print(f"Validation Calibration Error: {val_calibration_error}, Validation Accuracy: {val_accuracy}\n")

print("\nTesting on test set:")
print("------------------------epochs",num_epochs ,"------------------------------------------")
print("dropout_rate: ",dropout_rate)
test_calibration_error, test_accuracy = ensemble_learning(models, data_loaders, criterion, device,"test",mc_dropout=False)
print(f"Test Calibration Error: {test_calibration_error}, Test Accuracy: {test_accuracy}\n")



Training model with different split = 3
Loading data from ./data/CIFAR10_LT/CIFAR10_LT_train.txt
====> CIFAR10 Imbalance Ratio:  50
Files already downloaded and verified
train Mode: Contain 13996 images
13996
No sampler.
Shuffle is True.
Loading data from ./data/CIFAR10_LT/CIFAR10_LT_val.txt
====> CIFAR10 Imbalance Ratio:  50
Files already downloaded and verified
val Mode: Contain 10000 images
10000
No sampler.
Shuffle is False.
Loading data from ./data/CIFAR10_LT/CIFAR10_LT_test.txt
====> CIFAR10 Imbalance Ratio:  50
Files already downloaded and verified
test Mode: Contain 10000 images
10000
No sampler.
Shuffle is False.
Training model with different split = 4
Loading data from ./data/CIFAR10_LT/CIFAR10_LT_train.txt
====> CIFAR10 Imbalance Ratio:  50
Files already downloaded and verified
train Mode: Contain 13996 images
13996
No sampler.
Shuffle is True.
Loading data from ./data/CIFAR10_LT/CIFAR10_LT_val.txt
====> CIFAR10 Imbalance Ratio:  50
Files already downloaded and verified
val 

  f"The parameter '{pretrained_param}' is deprecated since 0.13 and will be removed in 0.15, "



Loading and testing model with seed = 4 and dropout_rate = 0.5

Loading and testing model with seed = 5 and dropout_rate = 0.5

Testing on validation set:
use no MC
trainacc_1:  0.7344241211774792 trainacc_2:  0.7229208345241498 trainacc_3:  0.7219205487282081
Validation Calibration Error: 0, Validation Accuracy: 0


Testing on test set:
------------------------epochs 20 ------------------------------------------
dropout_rate:  0.5
use no MC
testacc_1:  0.6207 testacc_2:  0.6195 testacc_3:  0.6474
test uce_expert1:  0.11675038933753967 test uce_expert2:  0.05922088027000427 test uce_expert3:  0.23270516097545624
MAE Error Expert 1: 0.22738304734230042
MAE Error Expert 2: 0.2504487633705139
MAE Error Expert 3: 0.19599711894989014
Average variance for correctly predicted samples: 0.011073343945574503
Average variance for incorrectly predicted samples: 0.01119238963279057
-----------------------------ensemble_learning-----------------------------------
12: 0.6369 23 0.6536 13 0.6521 tabe



simple_voting_acc: 
12 0.6195 23 0.6474 13 0.6207 123 0.6474
weighted_voting_pred: 
12 0.6207 23 0.6195 13 0.6207 123 0.6207
POE_12:  0.6277 POE_23:  0.6484 POE_13:  0.632 POE_123:  0.6357
SOE_12:  0.6301 SOE_23:  0.6476 SOE_13:  0.6331 SOE_123:  0.6379
資料已存入output_data.csv
Test Calibration Error: 0, Test Accuracy: 0



In [15]:
k_fold = "2_"
save_name = '2_focal_best'
batch_size = 512
num_workers = 4
imb_ratio = 50

# 建立一個字典來存儲每個數據加載器
data_loaders = {}

val_uce_list_ep1 = []
val_uce_list_ep2 = []
val_uce_list_ep3 = []


for seed in [3,4,5]:
    print(f"Training model with different split = {seed}")

    trainloader, valloader, testloader = create_dataloaders(seed, batch_size,imb_ratio)        
    
    # 存儲進字典，作為該模型的數據加載器
    data_loaders[f'data_loader_{seed}'] = {
        'train': trainloader, 
        'val': valloader, 
        'test': testloader
    }


device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
# seeds = [3, 4]
dropout_rate = 0.5

criterion = nn.CrossEntropyLoss()

models = {}  # Store models in a dictionary

num_epochs = 10
save_model = 1

seeds = [3,4,5]
for seed in seeds:
    print(f"\nLoading and testing model with seed = {seed} and dropout_rate = {dropout_rate}")
    model = create_model(mc_dropout=False, dropout_rate=dropout_rate)  # Assuming same model structure
    model = model.to(device)
    model_path = f"./model/"+save_model +"_best_model_seed_{seed}.pt"
    model = load_model(model, model_path)  # Load from saved model
    models[f"model_{seed}"] = model  # Store the model in the dictionary with a unique key

    

print("\nTesting on validation set:")
val_calibration_error, val_accuracy = ensemble_learning(models, data_loaders, criterion, device,"train",mc_dropout=False)
print(f"Validation Calibration Error: {val_calibration_error}, Validation Accuracy: {val_accuracy}\n")

print("\nTesting on test set:")
print("------------------------epochs",num_epochs ,"------------------------------------------")
print("dropout_rate: ",dropout_rate)
test_calibration_error, test_accuracy = ensemble_learning(models, data_loaders, criterion, device,"test",mc_dropout=False)
print(f"Test Calibration Error: {test_calibration_error}, Test Accuracy: {test_accuracy}\n")



Training model with different split = 3
Loading data from ./data/CIFAR10_LT/CIFAR10_LT_train.txt
====> CIFAR10 Imbalance Ratio:  50
Files already downloaded and verified
train Mode: Contain 13996 images
13996
No sampler.
Shuffle is True.
Loading data from ./data/CIFAR10_LT/CIFAR10_LT_val.txt
====> CIFAR10 Imbalance Ratio:  50
Files already downloaded and verified
val Mode: Contain 10000 images
10000
No sampler.
Shuffle is False.
Loading data from ./data/CIFAR10_LT/CIFAR10_LT_test.txt
====> CIFAR10 Imbalance Ratio:  50
Files already downloaded and verified
test Mode: Contain 10000 images
10000
No sampler.
Shuffle is False.
Training model with different split = 4
Loading data from ./data/CIFAR10_LT/CIFAR10_LT_train.txt
====> CIFAR10 Imbalance Ratio:  50
Files already downloaded and verified
train Mode: Contain 13996 images
13996
No sampler.
Shuffle is True.
Loading data from ./data/CIFAR10_LT/CIFAR10_LT_val.txt
====> CIFAR10 Imbalance Ratio:  50
Files already downloaded and verified
val 



simple_voting_acc: 
12 0.7276 23 0.6149 13 0.6297 123 0.6149
weighted_voting_pred: 
12 0.6297 23 0.7276 13 0.6297 123 0.6297
POE_12:  0.6813 POE_23:  0.7101 POE_13:  0.6433 POE_123:  0.6809
SOE_12:  0.683 SOE_23:  0.7162 SOE_13:  0.6431 SOE_123:  0.6906
資料已存入output_data.csv
Test Calibration Error: 0, Test Accuracy: 0



In [16]:
k_fold = "3_"
save_name = '3_focal_best'
batch_size = 512
num_workers = 4
imb_ratio = 50

# 建立一個字典來存儲每個數據加載器
data_loaders = {}

val_uce_list_ep1 = []
val_uce_list_ep2 = []
val_uce_list_ep3 = []


for seed in [3,4,5]:
    print(f"Training model with different split = {seed}")

    trainloader, valloader, testloader = create_dataloaders(seed, batch_size,imb_ratio)        
    
    # 存儲進字典，作為該模型的數據加載器
    data_loaders[f'data_loader_{seed}'] = {
        'train': trainloader, 
        'val': valloader, 
        'test': testloader
    }


device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
# seeds = [3, 4]
dropout_rate = 0.5

criterion = nn.CrossEntropyLoss()

models = {}  # Store models in a dictionary

num_epochs = 10
save_model = 1

seeds = [3,4,5]
for seed in seeds:
    print(f"\nLoading and testing model with seed = {seed} and dropout_rate = {dropout_rate}")
    model = create_model(mc_dropout=False, dropout_rate=dropout_rate)  # Assuming same model structure
    model = model.to(device)
    model_path = f"./model/"+save_model +"_best_model_seed_{seed}.pt"
    model = load_model(model, model_path)  # Load from saved model
    models[f"model_{seed}"] = model  # Store the model in the dictionary with a unique key

    

print("\nTesting on validation set:")
val_calibration_error, val_accuracy = ensemble_learning(models, data_loaders, criterion, device,"train",mc_dropout=False)
print(f"Validation Calibration Error: {val_calibration_error}, Validation Accuracy: {val_accuracy}\n")

print("\nTesting on test set:")
print("------------------------epochs",num_epochs ,"------------------------------------------")
print("dropout_rate: ",dropout_rate)
test_calibration_error, test_accuracy = ensemble_learning(models, data_loaders, criterion, device,"test",mc_dropout=False)
print(f"Test Calibration Error: {test_calibration_error}, Test Accuracy: {test_accuracy}\n")



Training model with different split = 3
Loading data from ./data/CIFAR10_LT/CIFAR10_LT_train.txt
====> CIFAR10 Imbalance Ratio:  50
Files already downloaded and verified
train Mode: Contain 13996 images
13996
No sampler.
Shuffle is True.
Loading data from ./data/CIFAR10_LT/CIFAR10_LT_val.txt
====> CIFAR10 Imbalance Ratio:  50
Files already downloaded and verified
val Mode: Contain 10000 images
10000
No sampler.
Shuffle is False.
Loading data from ./data/CIFAR10_LT/CIFAR10_LT_test.txt
====> CIFAR10 Imbalance Ratio:  50
Files already downloaded and verified
test Mode: Contain 10000 images
10000
No sampler.
Shuffle is False.
Training model with different split = 4
Loading data from ./data/CIFAR10_LT/CIFAR10_LT_train.txt
====> CIFAR10 Imbalance Ratio:  50
Files already downloaded and verified
train Mode: Contain 13996 images
13996
No sampler.
Shuffle is True.
Loading data from ./data/CIFAR10_LT/CIFAR10_LT_val.txt
====> CIFAR10 Imbalance Ratio:  50
Files already downloaded and verified
val 



simple_voting_acc: 
12 0.7361 23 0.6522 13 0.6233 123 0.6522
weighted_voting_pred: 
12 0.6233 23 0.7361 13 0.6233 123 0.6233
POE_12:  0.6924 POE_23:  0.7314 POE_13:  0.6427 POE_123:  0.6951
SOE_12:  0.6973 SOE_23:  0.738 SOE_13:  0.6433 SOE_123:  0.7027
資料已存入output_data.csv
Test Calibration Error: 0, Test Accuracy: 0

