In [None]:
import os
import torch
import torch.optim as optim
import torch.nn.functional as F
from torch.autograd import Function
import torchvision
import time
import numpy as np
import matplotlib.pyplot as plt
import torch.utils.data as util_data
from torchvision import transforms
from PIL import Image
from tqdm import tqdm
import torchvision.datasets as dsets
from torch.utils.data import Dataset, DataLoader
import torch.nn as nn
from torchvision import models
import cv2
import time

import csv
import json
import pandas as pd
import matplotlib.pyplot as plt
import shutil
from typing import Any
import random
import torchvision

In [None]:
dataset_root = '/kaggle/input/palm6000/plam_6000'
dataset_train_root = os.path.join(dataset_root, 'train')
dataset_val_root = os.path.join(dataset_root, 'test/')
dataset_all_dataset_root = os.path.join(dataset_root, 'all_dataset')
num_workers = 8
num_threads =8 
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

In [None]:
num_epochs =200
# epoch_lr_decrease = 100
learning_rate = 0.004
encode_length =256
num_classes =600


In [None]:

class SessionDataset(Dataset):   
    def __init__(self, root, transform=None):    
        self.labels = np.array([int(x.split('_')[0]) for x in os.listdir(path=root)])  
        self.image_files = np.array([x.path for x in os.scandir(path=root)])     
        self.transform = transform  
    def __getitem__(self, index):       
        img = cv2.imread(self.image_files[index])        
        label = self.labels[index]
        if self.transform:
            img = self.transform(img)
        
        return img, label
    
    
    def __len__(self):
        return np.size(self.image_files)
        

In [None]:
data_transform = {
    "train": transforms.Compose([
        transforms.ToPILImage(),
#         transforms.RandomResizedCrop((224, 224)),
        transforms.RandomResizedCrop((128, 128)),
        transforms.RandomHorizontalFlip(),
        transforms.ToTensor(),#转为Tensor
        transforms.Normalize((0.51568358, 0.51568358, 0.51568358), (0.10332626, 0.10332626, 0.10332626)),#标准化处理
        
    ]),
    "val": transforms.Compose([
        transforms.ToPILImage(),
        transforms.Resize((128, 128)),
        transforms.ToTensor(),
        transforms.Normalize((0.51568358, 0.51568358, 0.51568358), (0.10332626, 0.10332626, 0.10332626))

    ]),
    "all_dataset": transforms.Compose([
        transforms.ToPILImage(),
        transforms.Resize((128, 128)),
        transforms.ToTensor(),
        transforms.Normalize((0.51568358, 0.51568358, 0.51568358), (0.10332626, 0.10332626, 0.10332626))

    ]),
    
}

In [None]:
# 
train_batch_size = 64#64
val_batch_size = 64

session_train_dataset = SessionDataset(root=dataset_train_root,
                                    transform=data_transform['train'])

train_loader = DataLoader(session_train_dataset,
                         batch_size=train_batch_size,
                         shuffle=True,
                         num_workers=num_workers)
session_val_dataset = SessionDataset(root=dataset_val_root,
                                    transform=data_transform['val'])
val_loader = DataLoader(session_val_dataset,
                         batch_size=val_batch_size,
                         shuffle=False,
                         num_workers=num_workers)
sessiondatabaseDataset = SessionDataset(root=dataset_all_dataset_root,
                                    transform=data_transform['all_dataset'])
database_loader = DataLoader(sessiondatabaseDataset,
                         batch_size=train_batch_size,
                         shuffle=False,
                         num_workers=num_workers)
train_num = len(session_train_dataset)
val_num = len(session_val_dataset)
datasetloader_num = len(sessiondatabaseDataset)
print(train_num, val_num,datasetloader_num)


In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F

# Define the basic residual block
class BasicBlock(nn.Module):
    expansion = 4

    def __init__(self, in_planes, planes, stride=1):
        super(BasicBlock, self).__init__()
        self.conv1 = nn.Conv2d(in_planes, planes, kernel_size=1, bias=False)
        self.bn1 = nn.BatchNorm2d(planes)
        self.conv2 = nn.Conv2d(planes, planes, kernel_size=3, stride=stride, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(planes)
        self.conv3 = nn.Conv2d(planes, planes * self.expansion, kernel_size=1, bias=False)
        self.bn3 = nn.BatchNorm2d(planes * self.expansion)

        self.shortcut = nn.Sequential()
        if stride != 1 or in_planes != planes * self.expansion:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_planes, planes * self.expansion, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(planes * self.expansion)
            )

    def forward(self, x):
        out = self.bn1(self.conv1(x))
        out = F.relu(out)
        out = self.bn2(self.conv2(out))
        out = F.relu(out)
        out = self.bn3(self.conv3(out))
        out += self.shortcut(x)
        out = F.relu(out)
        return out

# Define the ResNet-50 model
class ResNet(nn.Module):
    def __init__(self, block, layers, num_classes=1000):
        super(ResNet, self).__init__()
        self.inplanes = 64
        self.conv1 = nn.Conv2d(3, 64, kernel_size=7, stride=2, padding=3, bias=False)
        self.bn1 = nn.BatchNorm2d(64)
        self.relu = nn.ReLU(inplace=True)
        self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)
        self.layer1 = self._make_layer(block, 64, layers[0])
        self.layer2 = self._make_layer(block, 128, layers[1], stride=2)
        self.layer3 = self._make_layer(block, 256, layers[2], stride=2)
        self.layer4 = self._make_layer(block, 512, layers[3], stride=2)
        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Linear(512 * block.expansion, num_classes)

    def _make_layer(self, block, planes, blocks, stride=1):
        layers = []
        layers.append(block(self.inplanes, planes, stride))
        self.inplanes = planes * block.expansion
        for _ in range(1, blocks):
            layers.append(block(self.inplanes, planes))
        return nn.Sequential(*layers)

    def forward(self, x):
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)
        x = self.maxpool(x)

        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)

        x = self.avgpool(x)
        x = x.view(x.size(0), -1)
        x = self.fc(x)

        return x

# Create an instance of the ResNet-50 model
def original_resnet50(pretrained=False, progress=True, **kwargs):
    return ResNet(BasicBlock, [3, 4, 6, 3], **kwargs)

In [None]:
import torch
import torch.nn as nn

# Define the VGG block
class VGGBlock(nn.Module):
    def __init__(self, in_channels, out_channels, num_conv_layers):
        super(VGGBlock, self).__init__()
        layers = []
        for _ in range(num_conv_layers):
            layers.append(nn.Conv2d(in_channels, out_channels, kernel_size=3, padding=1))
            layers.append(nn.ReLU(inplace=True))
            in_channels = out_channels
        layers.append(nn.MaxPool2d(kernel_size=2, stride=2))
        self.block = nn.Sequential(*layers)

    def forward(self, x):
        return self.block(x)

# Define the VGG model
class VGGNet(nn.Module):
    def __init__(self, num_classes=1000):
        super(VGGNet, self).__init__()
        self.features = nn.Sequential(
            VGGBlock(3, 64, 2),
            VGGBlock(64, 128, 2),
            VGGBlock(128, 256, 3),
            VGGBlock(256, 512, 3),
            VGGBlock(512, 512, 3)
        )
        self.avgpool = nn.AdaptiveAvgPool2d((7, 7))
        self.classifier = nn.Sequential(
            nn.Linear(512 * 7 * 7, 4096),
            nn.ReLU(inplace=True),
            nn.Dropout(),
            nn.Linear(4096, 4096),
            nn.ReLU(inplace=True),
            nn.Dropout(),
            nn.Linear(4096, num_classes)
        )

    def forward(self, x):
        x = self.features(x)
        x = self.avgpool(x)
        x = x.view(x.size(0), -1)
        x = self.classifier(x)
        return x

# Create an instance of the VGGNet model
def vggnet(pretrained=False, **kwargs):
    model = VGGNet(**kwargs)
    return model


In [None]:
import torch
from torch import nn

__all__ = ['iresnet50_cbam']

def conv3x3(in_planes, out_planes, stride=1, groups=1, dilation=1):
    """3x3 convolution with padding"""
    return nn.Conv2d(in_planes, out_planes, kernel_size=3, stride=stride, padding=dilation, groups=groups, bias=False, dilation=dilation)

def conv1x1(in_planes, out_planes, stride=1):
    """1x1 convolution"""
    return nn.Conv2d(in_planes, out_planes, kernel_size=1, stride=stride, bias=False)

class CBAM(nn.Module):
    def __init__(self, channels, reduction=16):
        super(CBAM, self).__init__()
        self.channel_gate = nn.Sequential(
            nn.AdaptiveAvgPool2d(1),
            nn.Conv2d(channels, channels // reduction, kernel_size=1),
            nn.ReLU(inplace=True),
            nn.Conv2d(channels // reduction, channels, kernel_size=1),
            nn.Sigmoid()
        )
        self.spatial_gate = nn.Sequential(
            nn.Conv2d(channels, channels // reduction, kernel_size=1),
            nn.ReLU(inplace=True),
            nn.Conv2d(channels // reduction, channels, kernel_size=1),
            nn.Sigmoid()
        )

    def forward(self, x):
        # Channel-wise attention
        channel_avg = self.channel_gate(x)
        x = x * channel_avg

        # Spatial-wise attention
        channel_max = self.spatial_gate(x)
        x = x * channel_max

        return x

class IBasicBlock(nn.Module):
    expansion = 1
    def __init__(self, inplanes, planes, stride=1, downsample=None, groups=1, base_width=64, dilation=1):
        super(IBasicBlock, self).__init__()
        if groups != 1 or base_width != 64:
            raise ValueError('BasicBlock only supports groups=1 and base_width=64')
        if dilation > 1:
            raise NotImplementedError("Dilation > 1 not supported in BasicBlock")
        self.bn1 = nn.BatchNorm2d(inplanes, eps=1e-05,)
        self.conv1 = conv3x3(inplanes, planes)
        self.bn2 = nn.BatchNorm2d(planes, eps=1e-05,)
        self.prelu = nn.PReLU(planes)
        self.conv2 = conv3x3(planes, planes, stride)
        self.bn3 = nn.BatchNorm2d(planes, eps=1e-05,)
        self.downsample = downsample
        self.stride = stride

    def forward(self, x):
        identity = x
        out = self.bn1(x)
        out = self.conv1(out)
        out = self.bn2(out)
        out = self.prelu(out)
        out = self.conv2(out)
        out = self.bn3(out)
        if self.downsample is not None:
            identity = self.downsample(x)
        out += identity
        return out

class IResNet(nn.Module):
    fc_scale = 7 * 7

    def __init__(self, block, layers, dropout=0, num_features=512, zero_init_residual=False, groups=1, width_per_group=64, replace_stride_with_dilation=None, fp16=False):
        super(IResNet, self).__init__()
        self.fp16 = fp16
        self.inplanes = 64
        self.dilation = 1
        if replace_stride_with_dilation is None:
            replace_stride_with_dilation = [False, False, False]
        if len(replace_stride_with_dilation) != 3:
            raise ValueError("replace_stride_with_dilation should be None or a 3-element tuple, got {}".format(replace_stride_with_dilation))
        self.groups = groups
        self.base_width = width_per_group
        self.conv1 = nn.Conv2d(3, self.inplanes, kernel_size=3, stride=1, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(self.inplanes, eps=1e-05)
        self.prelu = nn.PReLU(self.inplanes)
        self.layer1 = self._make_layer(block, 64, layers[0], stride=2)
        self.layer2 = self._make_layer(block, 128, layers[1], stride=2, dilate=replace_stride_with_dilation[0])
        self.layer3 = self._make_layer(block, 256, layers[2], stride=2, dilate=replace_stride_with_dilation[1])
        self.layer4 = self._make_layer(block, 512, layers[3], stride=2, dilate=replace_stride_with_dilation[2])
        self.bn2 = nn.BatchNorm2d(512 * block.expansion, eps=1e-05,)
        self.dropout = nn.Dropout(p=dropout, inplace=True)
        self.cbam = CBAM(512 * block.expansion)
        self.fc = nn.Linear(4 * 512 * block.expansion * self.fc_scale, num_features)
        self.features = nn.BatchNorm1d(num_features, eps=1e-05)
        nn.init.constant_(self.features.weight, 1.0)
        self.features.weight.requires_grad = False

        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                nn.init.normal_(m.weight, 0, 0.1)
            elif isinstance(m, (nn.BatchNorm2d, nn.GroupNorm)):
                nn.init.constant_(m.weight, 1)
                nn.init.constant_(m.bias, 0)

        if zero_init_residual:
            for m in self.modules():
                if isinstance(m, IBasicBlock):
                    nn.init.constant_(m.bn2.weight, 0)

    def _make_layer(self, block, planes, blocks, stride=1, dilate=False):
        downsample = None
        previous_dilation = self.dilation
        if dilate:
            self.dilation *= stride
            stride = 1
        if stride != 1 or self.inplanes != planes * block.expansion:
            downsample = nn.Sequential(
                conv1x1(self.inplanes, planes * block.expansion, stride),
                nn.BatchNorm2d(planes * block.expansion, eps=1e-05, ),
            )
        layers = []
        layers.append(
            block(self.inplanes, planes, stride, downsample, self.groups, self.base_width, previous_dilation))
        self.inplanes = planes * block.expansion
        for _ in range(1, blocks):
            layers.append(
                block(self.inplanes, planes, groups=self.groups, base_width=self.base_width, dilation=self.dilation))

        return nn.Sequential(*layers)

    def forward(self, x):
        with torch.cuda.amp.autocast(self.fp16):
            x = self.conv1(x)
            x = self.bn1(x)
            x = self.prelu(x)
            x = self.layer1(x)
            x = self.layer2(x)
            x = self.layer3(x)
            x = self.layer4(x)
            x = self.bn2(x)
            x = self.dropout(x)
            x = self.cbam(x)
            x = torch.flatten(x, 1)
        x = self.fc(x.float() if self.fp16 else x)
        x = self.features(x)
        return x

def iresnet50_cbam(pretrained=False, progress=True, **kwargs):
    return IResNet(IBasicBlock, [3, 4, 14, 3], **kwargs)

 

In [None]:
import numpy as np

 


def logistic(x, k=1, x0=0):
    """
    Logistic function: L / (1 + exp(-k * (x - x0)))
    
    Parameters:
    - x: Input values
    - k: Steepness of the curve (default: 1)
    - x0: x-value of the sigmoid's midpoint (default: 0)
    
    Returns:
    - Result of the logistic function
    """
    return 1 / (1 + np.exp(-k * (x - x0)))


def new_dropout(x, level=0.5):  
    if level < 0. or level >= 1:
        raise Exception('Dropout level must be in interval [0, 1].')  
    retain_prob = 1. - level  

    sample = torch.from_numpy(np.array(logistic(0.3, 100))).to(device)


    
    x *=sample          
    
    x /= retain_prob   
    return x



In [None]:

class SecureRandomProjectionLayer(nn.Module):
    def __init__(self, input_dim, output_dim):
        super(SecureRandomProjectionLayer, self).__init__()
        self.random_projection_matrix = nn.Parameter(torch.randn(input_dim, output_dim) * 0.01, requires_grad=False)


    def forward(self, x):
        # Apply secure random projection
        transformed_data = torch.matmul(x, self.random_projection_matrix)
        return transformed_data
# new layer
class hash(Function):
    @staticmethod
    def forward(ctx, input):

        return torch.sign(input)    
    @staticmethod
    def backward(ctx, grad_output):
        #input,  = ctx.saved_tensors
        #grad_output = grad_output.data
        return grad_output
def hash_layer(input):
    return hash.apply(input)


class CNN_ResNet(nn.Module):
    def __init__(self, encode_length, num_classes ):
        super(CNN_ResNet, self).__init__()
        self.res = original_resnet50(pretrained=False, num_features =60).to(device)
        
        self.res.fc = nn.Linear(in_features=2048, out_features=1024)
        self.secure_projection = SecureRandomProjectionLayer(1024,1024)
      
        self.fc_plus = nn.Linear(1024, encode_length) 
        
        # Add the secure random projection layer
        
        
        self.fc = nn.Linear( encode_length, num_classes, bias=False)

    def forward(self, x):
        x = self.res(x)
        
        x = new_dropout(x)
        
        # Apply the secure random projection before x = self.fc_plus(x)
        x = self.secure_projection(x)
               
        if x.dim() != 3:
            x = x.view(x.size(0), -1, 1)

        # Get the dimensions
        n, j = x.shape[1], x.shape[2]

        # Reshape x to have dimensions (batch_size, n, j)
        x = x.view(x.size(0), n, j)

        # Find the maximum element in each subspace
        max_values, _ = x.max(dim=2)

 
        x = self.fc_plus(max_values)

        code = hash_layer(x)

   
        output = self.fc(code)
        
        return output, x, code
    
#net = CNN_ResNet(encode_length=encode_length, num_classes=num_classes).to(device)
print('ok')

In [None]:
# new layer
 
class STEBinarizationLayer(torch.autograd.Function):
    @staticmethod
    def forward(ctx, input):
        # Using the STE method for binarization during forward pass
        ctx.save_for_backward(input)
        return torch.sign(input)

    @staticmethod
    def backward(ctx, grad_output):
        # During backward pass, use a straight-through estimator (STE)
        input, = ctx.saved_tensors
        grad_input = grad_output.clone()
        grad_input[input > 1] = 0
        grad_input[input < -1] = 0
        return grad_input

def ste_binarization_layer(input):
    return STEBinarizationLayer.apply(input)

class CNN_ResNet(nn.Module):
    def __init__(self, encode_length, num_classes):
        super(CNN_ResNet, self).__init__()
        self.res = original_resnet50(pretrained=False, num_classes=1000).to(device)
        self.res.fc = nn.Linear(in_features=2048, out_features=1000, bias=True)
        self.fc_plus = nn.Linear(1000, encode_length)
        self.fc = nn.Linear(encode_length, num_classes, bias=False)

    def forward(self, x):
        x = self.res(x)
        x = self.fc_plus(x)
        # Use STE for binarization
        code = ste_binarization_layer(x)
        output = self.fc(code)
        return output, x, code

# Example usage
#net = CNN_ResNet(encode_length=encode_length, num_classes=num_classes).to(device)
print('ok')


In [None]:
 
# new layer
class hash(Function):
    @staticmethod
    def forward(ctx, input):

        return torch.sign(input)    
    @staticmethod
    def backward(ctx, grad_output):
        #input,  = ctx.saved_tensors
        #grad_output = grad_output.data
        return grad_output
def hash_layer(input):
    return hash.apply(input)


    
    
class CNN_ResNet(nn.Module):
    def __init__(self, encode_length, num_classes):
        super(CNN_ResNet, self).__init__()
        self.res =vggnet(pretrained=False,num_classes=1000).to(device)
    
        self.res.fc = nn.Linear(in_features=2048, out_features=1000)
      
        self.fc_plus = nn.Linear(1000, encode_length) 
        
        self.fc = nn.Linear(encode_length, num_classes, bias=False)

    def forward(self, x):
        
        x = self.res(x)
        x = new_dropout(x)
        x = self.fc_plus(x)

        code = hash_layer(x)
   
        output = self.fc(code)
        
  
        return output, x, code
    
net = CNN_ResNet(encode_length=encode_length, num_classes=num_classes).to(device)
print('ok')

In [None]:

criterion = nn.CrossEntropyLoss()

optimizer = torch.optim.SGD(net.parameters(), lr=learning_rate, momentum=0.9, weight_decay=5e-4)

In [None]:
def compress(train, test, model, classes=600):
    retrievalB = list([])
    retrievalL = list([])
    with torch.no_grad():
        for batch_step, (data, target) in enumerate(train):         
            var_data = data.to(device)            
            _,_, code = model(var_data)        
            retrievalB.extend(code.cpu().data.numpy())
            retrievalL.extend(target)
         
        queryB=list([])
        queryL=list([])
        for batch_step, (data, target) in enumerate(test):     
            var_data = data.to(device)
            _,_, code = model(var_data)
            
            queryB.extend(code.cpu().data.numpy())
            queryL.extend(target)
            
        retrievalB=np.array(retrievalB)
        retrievalL=np.eye(classes)[np.array(retrievalL)]
            
        queryB=np.array(queryB)
        queryL=np.eye(classes)[np.array(queryL)]   
        
            
        return retrievalB, retrievalL, queryB, queryL


def calculate_hamming1(B1, B2):
    """
    :param B1:  vector [n]
    :param B2:  vector [r*n]
    :return: hamming distance [r]
    """
    if len(B1.shape) == 1:
        B1 = np.expand_dims(B1, axis=0)  # Convert B1 to a 2D array if it's 1D

    q = B2.shape[1]  # max inner product value
    distH = 0.5 * (q - np.dot(B1, B2.transpose()))
    return distH
  
    
def calculate_hamming(B1, B2):
    """
    :param B1:  vector [n]
    :param B2:  vector [r*n]
    :return: hamming distance [r]
    """
    q = B2.shape[1] # max inner product value
    distH = 0.5 * (q - np.dot(B1, B2.transpose()))
    return distH


def calculate_map(qB, rB, queryL, retrievalL):
    """
       :param qB: {-1,+1}^{mxq} query bits
       :param rB: {-1,+1}^{nxq} retrieval bits
       :param queryL: {0,1}^{mxl} query label
       :param retrievalL: {0,1}^{nxl} retrieval label
       :return:
    """
    num_query = queryL.shape[0]
    map = 0
    for iter in range(num_query):
       
        gnd = (np.dot(queryL[iter, :], retrievalL.transpose()) > 0).astype(np.float32)
        
        tsum = np.sum(gnd).astype(int)
        if tsum == 0:
            continue
        
        hamm = calculate_hamming(qB[iter, :], rB)
        ind = np.argsort(hamm)
        gnd = gnd[ind]

        count = np.linspace(1, tsum, tsum) # [1,2, tsum]
        tindex = np.asarray(np.where(gnd == 1)) + 1.0
        map_ = np.mean(count / (tindex))
       
        map = map + map_
    map = map / num_query
    return map



def calculate_top_map(qB, rB, queryL, retrievalL, topk):
    """
    :param qB: {-1,+1}^{mxq} query bits
    :param rB: {-1,+1}^{nxq} retrieval bits
    :param queryL: {0,1}^{mxl} query label
    :param retrievalL: {0,1}^{nxl} retrieval label
    :param topk:
    :return:
    """
    num_query = queryL.shape[0]
    topkmap = 0
    for iter in range(num_query):
        
       
        gnd = (np.dot(queryL[iter, :], retrievalL.transpose()) > 0).astype(np.float32)
        hamm = calculate_hamming(qB[iter, :], rB)
        ind = np.argsort(hamm)
        gnd = gnd[ind]

        tgnd = gnd[0:topk]
        tsum = np.sum(tgnd).astype(int)
        if tsum == 0:
            continue
        count = np.linspace(1, tsum, tsum)

        tindex = np.asarray(np.where(tgnd == 1)) + 1.0
        topkmap_ = np.mean(count / (tindex))
      
        topkmap = topkmap + topkmap_
    topkmap = topkmap / num_query
    return topkmap



def myCalcTopMap(rB, qB, retrievalL, queryL, topk):
    
    
    num_query = queryL.shape[0]
    
    topkmap = 0
    
    
    for iter in tqdm(range(num_query)):
        
        
        gnd = (np.dot(queryL[iter, :], retrievalL.transpose()) > 0).astype(np.float32)
        
        
        hamm = CalcHammingDist(qB[iter, :], rB)
        
        
        ind = np.argsort(hamm)
        
       
        gnd = gnd[ind]
        
        
        tgnd = gnd[0:topk]
        
       
        tsum = np.sum(tgnd).astype(int)
        if tsum == 0:
            continue
        
       
        count = np.linspace(1, tsum, tsum)
        
        
        tindex = np.asarray(np.where(tgnd == 1)) + 1.0
        
        
        topkmap_ = np.mean(count / (tindex))
        
        topkmap = topkmap + topkmap_
   
    topkmap = topkmap / num_query
    
    return topkmap

In [None]:
# 训练过程
import matplotlib.pyplot as plt

# Initialize an empty list to store accuracy values for each epoch
epoch_accuracies = []
best = 0.0

torch.set_num_threads(num_threads)

if torch.cuda.device_count() > 1:
    net = nn.DataParallel(net)    
net.to(device)
# Train the Model
for epoch in range(num_epochs):  
    net.train()

    
    for i, (images, labels) in enumerate(train_loader):
        images = images.to(device)
        labels = labels.to(device)
        
        # Forward + Backward + Optimize
        optimizer.zero_grad()
        outputs, feature, _ = net(images)


        loss1 = criterion(outputs, labels)
  
        loss2 = torch.mean(torch.abs(torch.pow(torch.abs(feature) - torch.ones(feature.size()).to(device), 3)))
        loss = loss1 + 0.1 * loss2
        loss.backward()
        optimizer.step()

#         if (i + 1) % (len(oxford102TrainDataset) // batch_size / 2) == 0:
        print ('Epoch [%d/%d], Iter [%d/%d] Loss1: %.4f Loss2: %.4f'
               % (epoch + 1, num_epochs, i + 1, len(sessiondatabaseDataset) // train_batch_size,
                  loss1.item(), loss2.item()))

   
    net.eval()  
    correct = 0
    total = 0
    with torch.no_grad():
        for images, labels in val_loader:
            images = images.to(device)
           
            outputs, _, _ = net(images)
          
            _, predicted = torch.max(outputs.cpu().data, 1)

            total += labels.size(0)

            correct += (predicted == labels).sum()

    print('Test Accuracy of the model: %.2f %%' % (100.0 * correct / total))

    if 1.0 * correct / total > best:
        best = 1.0 * correct / total
        torch.save(net.state_dict(), 'temp44.pkl')    
    print('best: %.2f %%' % (best * 100.0))
    net.eval()
    retrievalB, retrievalL, queryB, queryL = compress(train_loader, val_loader, net)
    print('---calculate map1---')
    result1 = calculate_map(qB=queryB, rB=retrievalB, queryL=queryL, retrievalL=retrievalL)
    #torch.save(net.state_dict(), 'map1_{}_module_all_obj.pkl'.format(round(result1,3)))
    print(result1)
    retrievalB, retrievalL, queryB, queryL = compress(database_loader, val_loader, net)
    print('---calculate map2---')
    result2 = calculate_map(qB=queryB, rB=retrievalB, queryL=queryL, retrievalL=retrievalL)
    print(result2)

 

In [None]:
    epoch_accuracies.append(result2)
        #torch.save(net.state_dict(), 'map2_{}_module_all_obj.pkl'.format(round(result2,3)))
    genuine_scores = []  # Store positive scores
    imposter_scores = []  # Store negative scores

    with torch.no_grad():
        for i in range(len(queryB)):
            for j in range(len(retrievalB)):
                if np.all(queryL[i] == retrievalL[j]):  # Genuine pair
                    queryB_tensor = torch.from_numpy(queryB[i])  # Convert to a PyTorch tensor
                    retrievalB_tensor = torch.from_numpy(retrievalB[j])  # Convert to a PyTorch tensor
                    genuine_scores.append(-1 * torch.dist(queryB_tensor, retrievalB_tensor, p=2).item())
                else:  # Imposter pair
                    queryB_tensor = torch.from_numpy(queryB[i])  # Convert to a PyTorch tensor
                    retrievalB_tensor = torch.from_numpy(retrievalB[j])  # Convert to a PyTorch tensor
                    imposter_scores.append(-1 * torch.dist(queryB_tensor, retrievalB_tensor, p=2).item())

    # Set a threshold (you may need to fine-tune this)
    threshold = 0.0  # Adjust the threshold as needed

    # Calculate FAR and FRR
    far = np.mean(np.array(imposter_scores) >= threshold)
    frr = np.mean(np.array(genuine_scores) < threshold)

    # Calculate EER
    eer = 0.5  # Initialize EER
    threshold_range = np.linspace(min(imposter_scores), max(genuine_scores), num=1000)

    for t in threshold_range:
        far_t = np.mean(np.array(imposter_scores) >= t)
        frr_t = np.mean(np.array(genuine_scores) < t)

        if abs(far_t - frr_t) < abs(far - frr):
            far, frr = far_t, frr_t
            eer = 0.5 * (far + frr)

    print(f'FAR: {far * 100:.2f}%')
    print(f'FRR: {frr * 100:.2f}%')
    print(f'EER: {eer * 100:.2f}%')

# Plot the accuracy vs. epoch graph
plt.figure(figsize=(8, 6))
plt.plot(range(1, num_epochs + 1), epoch_accuracies, marker='o', linestyle='-')
plt.title('Accuracy vs. Epoch')
plt.xlabel('Epoch')
plt.ylabel('Accuracy (result2)')
plt.grid(True)