In [5]:
mport torch
import torch.nn as nn
from dataset import MyDataset
from torch.utils.data import DataLoader
import torch.optim as optim
import torch.nn.functional as F
import os
from models import Logistic_two_stream, VGG16, Vgg_two_stream, Logistic, Mobilenetv2, Mobilenetv2_two_stream, LeNet5_one_stream, LeNet5_two_stream
import csv
import numpy as np
from sklearn.metrics import precision_recall_fscore_support, f1_score

# Set DEVICE to 'cuda' if available, otherwise 'cpu'
DEVICE = 'cuda' if torch.cuda.is_available() else 'cpu'

# Add these lines to create the 'log' directory if it doesn't exist
if not os.path.exists('./log'):
    os.makedirs('./log')
    
# Direct assignment of parameters
args = {
    'path_rgb': './254p RGB Images/',
    'path_ir': './254p Thermal Images/',
    'index': 0,
    'batch_size': 64,
    'lr': 1e-4,
    'classes_num': 3,
    'subset_rate': 0.01,
    'trainset_rate': 0.8,
    'model': 'Logistic',
    'mode': 'rgb',
    'EPOCH': 50,
    'test_interval': 1,
    'log_path': './log/results.csv',
    'log_loss_path': './log/'
}

# Add these lines to replace argparse
path_rgb = args['path_rgb']
path_ir = args['path_ir']
index = args['index']
batch_size = args['batch_size']
lr = args['lr']
classes_num = args['classes_num']
subset_rate = args['subset_rate']
trainset_rate = args['trainset_rate']
model_name = args['model']
mode = args['mode']
EPOCH = args['EPOCH']
test_interval = args['test_interval']
log_path = args['log_path']
log_loss_path = args['log_loss_path']

def test(dataloader, model, device, mode, model_name, epoch, log_path, correct_on_test, correct_on_train, Model_custom_list, flag='test_set', output_flag=False):
    model.eval()
    correct = 0
    total = 0

    with torch.no_grad():
        for (rgb, ir, labels) in dataloader:
            rgb, ir, labels = rgb.to(device), ir.to(device), labels.to(device)

            if model_name in Model_custom_list:
                outputs = model(rgb, ir, mode=mode)
            else:
                if mode == 'rgb':
                    x = rgb
                elif mode == 'ir':
                    x = ir

                outputs = model(x)

            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

    accuracy = 100 * correct / total

    if flag == 'test_set':
        correct_on_test.append(accuracy)
    elif flag == 'train_set':
        correct_on_train.append(accuracy)

    if output_flag:
        print(f'Accuracy on {flag} for Epoch {epoch + 1}: {accuracy}%')

    # Save results to a CSV file
    with open(log_path, mode='a', newline='') as file:
        writer = csv.writer(file)
        if epoch == 0:
            writer.writerow(['Epoch', 'Accuracy'])
        writer.writerow([epoch + 1, accuracy])

    return accuracy

# Model initialization
Model_custom_list = ['Mobilenetv2_two_stream', 'Vgg_two_stream', 'Logistic_two_stream', 'LeNet5_one_stream', 'LeNet5_two_stream']

Transform_flag = False

if model_name == 'VGG16':
    net = VGG16(classes_num).to(DEVICE)
    Transform_flag = True
    target_size = 224
        
elif model_name == 'Vgg_two_stream':
    Transform_flag = True
    target_size = 224
    net = Vgg_two_stream().to(DEVICE)
        
elif model_name == 'Logistic':
    net= Logistic(classes_num).to(DEVICE)
    target_size = 254
        
elif model_name == 'Logistic_two_stream':
    net= Logistic_two_stream(classes_num).to(DEVICE)
    target_size = 254
   
elif model_name == 'Mobilenetv2':
    net = Mobilenetv2(classes_num).to(DEVICE)
    Transform_flag = True
    target_size = 224
        
elif model_name == 'Mobilenetv2_two_stream':
    net = Mobilenetv2_two_stream().to(DEVICE)
    Transform_flag = True
    target_size = 224
               
elif model_name == 'LeNet5_one_stream':
    net = LeNet5_one_stream().to(DEVICE)
    Transform_flag = False
    target_size = 254
        
elif model_name == 'LeNet5_two_stream':
    net = LeNet5_two_stream().to(DEVICE)
    Transform_flag = False
    target_size = 254
        
print(net)

# Dataset initialization
Dataset = MyDataset(path_rgb, path_ir, input_size=target_size, transform=Transform_flag)

Dataset, _ = torch.utils.data.random_split(Dataset, [int(len(Dataset) * subset_rate), len(Dataset) - int(len(Dataset) * subset_rate)])

split_rate = trainset_rate
train_set, val_set = torch.utils.data.random_split(Dataset, [int(len(Dataset) * split_rate), len(Dataset) - int(len(Dataset) * split_rate)])

train_dataloader = DataLoader(dataset=train_set, batch_size=batch_size, shuffle=True)
test_dataloader = DataLoader(dataset=val_set, batch_size=16, shuffle=False)

# Optimizer and Loss function
optimizer = optim.Adam(net.parameters(), lr=lr, weight_decay=0.00)
loss_function = nn.CrossEntropyLoss(label_smoothing=0.2)

# Training loop
correct_on_train = [0]
correct_on_test = [0]

total_step = (int(len(Dataset) * split_rate) // batch_size)

net.train()

max_accuracy = 0
Loss_accuracy = []
test_acc = []
loss_list = []
train_acc = []

for index in range(EPOCH):
    net.train()
    for i, (rgb, ir, y) in enumerate(train_dataloader):
        optimizer.zero_grad()

        if model_name in Model_custom_list:
            y_pre = net(rgb.to(DEVICE), ir.to(DEVICE), mode=mode)
        else:
            if mode == 'rgb':
                x = rgb.to(DEVICE)
            elif mode == 'ir':
                x = ir.to(DEVICE)
            y_pre = net(x)

        loss = loss_function(y_pre, y.to(DEVICE))
        Loss_accuracy.append(loss)
        print(f'Epoch:{index + 1}/{EPOCH}     Step:{i + 1}|{total_step}   loss:{loss.item()}  ')
        loss_list.append(loss.item())

        loss.backward()

        optimizer.step()

    if ((index + 1) % test_interval) == 0:
        current_accuracy = test(test_dataloader, net, DEVICE, mode, model_name, index, log_path, correct_on_test,
                                correct_on_train, Model_custom_list, flag='test_set', output_flag=False)
        test_acc.append(current_accuracy)
        current_accuracy_train = test(train_dataloader, net, DEVICE, mode, model_name, index, log_path, correct_on_test,
                                      correct_on_train, Model_custom_list, flag='train_set', output_flag=False)
        train_acc.append(current_accuracy_train)
        print(f'current max accuracy\t test set:{max(correct_on_test)}%\t train set:{max(correct_on_train)}%')

        if current_accuracy > max_accuracy:
            max_accuracy = current_accuracy

# Example usage of precision_recall and f1_score
y_all_true = torch.tensor([]).to(DEVICE)
y_all_pre = torch.tensor([]).to(DEVICE)
with torch.no_grad():
    net.eval()
    for (rgb, ir, y) in test_dataloader:
        y = y.to(DEVICE)
        y_all_true = torch.cat((y_all_true, y))

        if model_name in Model_custom_list:
            y_pre = net(rgb.to(DEVICE), ir.to(DEVICE), mode=mode)
        else:
            if mode == 'rgb':
                x = rgb.to(DEVICE)
            elif mode == 'ir':
                x = ir.to(DEVICE)
            y_pre = net(x)

        _, label_index = torch.max(y_pre.data, dim=-1)
        y_all_pre = torch.cat((y_all_pre, label_index))

# Calculate precision, recall, and F1 score
precision_recall_result = precision_recall_fscore_support(y_all_true.cpu().numpy(), y_all_pre.cpu().numpy(), average='macro')
f1_score_result = f1_score(y_all_true.cpu().numpy(), y_all_pre.cpu().numpy(), average='macro')

# Print the results
print("Precision, Recall:", precision_recall_result[:2])
print("F1 Score:", f1_score_result)

# Save the results
results_array = np.zeros(3, dtype=object)
results_array[0] = loss_list
results_array[1] = train_acc
results_array[2] = test_acc

np.save(log_loss_path + model_name + '.npy', results_array)


Logistic(
  (logic): Sequential(
    (0): Linear(in_features=193548, out_features=3, bias=True)
  )
)
Epoch:1/50     Step:1|6   loss:1.1313748359680176  
Epoch:1/50     Step:2|6   loss:6.042836666107178  
Epoch:1/50     Step:3|6   loss:7.658213138580322  
Epoch:1/50     Step:4|6   loss:3.13948655128479  
Epoch:1/50     Step:5|6   loss:1.8175681829452515  
Epoch:1/50     Step:6|6   loss:5.128973007202148  
Epoch:1/50     Step:7|6   loss:5.173724174499512  
current max accuracy	 test set:42.99065420560748%	 train set:49.88290398126464%
Epoch:2/50     Step:1|6   loss:4.515814304351807  
Epoch:2/50     Step:2|6   loss:4.77076530456543  
Epoch:2/50     Step:3|6   loss:4.305926322937012  
Epoch:2/50     Step:4|6   loss:2.920238494873047  
Epoch:2/50     Step:5|6   loss:2.0663328170776367  
Epoch:2/50     Step:6|6   loss:1.1113277673721313  
Epoch:2/50     Step:7|6   loss:2.8948562145233154  
current max accuracy	 test set:42.99065420560748%	 train set:49.88290398126464%
Epoch:3/50     Step:1

In [2]:
import torch
import torch.nn as nn
from dataset import MyDataset
from torch.utils.data import DataLoader
import torch.optim as optim
import torch.nn.functional as F
import os
from models import Logistic_two_stream, VGG16, Vgg_two_stream, Logistic, Mobilenetv2, Mobilenetv2_two_stream, LeNet5_one_stream, LeNet5_two_stream
import csv
import numpy as np
from sklearn.metrics import precision_recall_fscore_support, f1_score

# Set DEVICE to 'cuda' if available, otherwise 'cpu'
DEVICE = 'cuda' if torch.cuda.is_available() else 'cpu'

# Add these lines to create the 'log' directory if it doesn't exist
if not os.path.exists('./log'):
    os.makedirs('./log')
    
# Direct assignment of parameters
args = {
    'path_rgb': './254p RGB Images/',
    'path_ir': './254p Thermal Images/',
    'index': 0,
    'batch_size': 64,
    'lr': 1e-4,
    'classes_num': 3,
    'subset_rate': 0.01,
    'trainset_rate': 0.8,
    'model': 'Logistic',
    'mode': 'ir',
    'EPOCH': 50,
    'test_interval': 1,
    'log_path': './log/results.csv',
    'log_loss_path': './log/'
}

# Add these lines to replace argparse
path_rgb = args['path_rgb']
path_ir = args['path_ir']
index = args['index']
batch_size = args['batch_size']
lr = args['lr']
classes_num = args['classes_num']
subset_rate = args['subset_rate']
trainset_rate = args['trainset_rate']
model_name = args['model']
mode = args['mode']
EPOCH = args['EPOCH']
test_interval = args['test_interval']
log_path = args['log_path']
log_loss_path = args['log_loss_path']

def test(dataloader, model, device, mode, model_name, epoch, log_path, correct_on_test, correct_on_train, Model_custom_list, flag='test_set', output_flag=False):
    model.eval()
    correct = 0
    total = 0

    with torch.no_grad():
        for (rgb, ir, labels) in dataloader:
            rgb, ir, labels = rgb.to(device), ir.to(device), labels.to(device)

            if model_name in Model_custom_list:
                outputs = model(rgb, ir, mode=mode)
            else:
                if mode == 'rgb':
                    x = rgb
                elif mode == 'ir':
                    x = ir

                outputs = model(x)

            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

    accuracy = 100 * correct / total

    if flag == 'test_set':
        correct_on_test.append(accuracy)
    elif flag == 'train_set':
        correct_on_train.append(accuracy)

    if output_flag:
        print(f'Accuracy on {flag} for Epoch {epoch + 1}: {accuracy}%')

    # Save results to a CSV file
    with open(log_path, mode='a', newline='') as file:
        writer = csv.writer(file)
        if epoch == 0:
            writer.writerow(['Epoch', 'Accuracy'])
        writer.writerow([epoch + 1, accuracy])

    return accuracy

# Model initialization
Model_custom_list = ['Mobilenetv2_two_stream', 'Vgg_two_stream', 'Logistic_two_stream', 'LeNet5_one_stream', 'LeNet5_two_stream']

Transform_flag = False

if model_name == 'VGG16':
    net = VGG16(classes_num).to(DEVICE)
    Transform_flag = True
    target_size = 224
        
elif model_name == 'Vgg_two_stream':
    Transform_flag = True
    target_size = 224
    net = Vgg_two_stream().to(DEVICE)
        
elif model_name == 'Logistic':
    net= Logistic(classes_num).to(DEVICE)
    target_size = 254
        
elif model_name == 'Logistic_two_stream':
    net= Logistic_two_stream(classes_num).to(DEVICE)
    target_size = 254
   
elif model_name == 'Mobilenetv2':
    net = Mobilenetv2(classes_num).to(DEVICE)
    Transform_flag = True
    target_size = 224
        
elif model_name == 'Mobilenetv2_two_stream':
    net = Mobilenetv2_two_stream().to(DEVICE)
    Transform_flag = True
    target_size = 224
               
elif model_name == 'LeNet5_one_stream':
    net = LeNet5_one_stream().to(DEVICE)
    Transform_flag = False
    target_size = 254
        
elif model_name == 'LeNet5_two_stream':
    net = LeNet5_two_stream().to(DEVICE)
    Transform_flag = False
    target_size = 254
        
print(net)

# Dataset initialization
Dataset = MyDataset(path_rgb, path_ir, input_size=target_size, transform=Transform_flag)

Dataset, _ = torch.utils.data.random_split(Dataset, [int(len(Dataset) * subset_rate), len(Dataset) - int(len(Dataset) * subset_rate)])

split_rate = trainset_rate
train_set, val_set = torch.utils.data.random_split(Dataset, [int(len(Dataset) * split_rate), len(Dataset) - int(len(Dataset) * split_rate)])

train_dataloader = DataLoader(dataset=train_set, batch_size=batch_size, shuffle=True)
test_dataloader = DataLoader(dataset=val_set, batch_size=16, shuffle=False)

# Optimizer and Loss function
optimizer = optim.Adam(net.parameters(), lr=lr, weight_decay=0.00)
loss_function = nn.CrossEntropyLoss(label_smoothing=0.2)

# Training loop
correct_on_train = [0]
correct_on_test = [0]

total_step = (int(len(Dataset) * split_rate) // batch_size)

net.train()

max_accuracy = 0
Loss_accuracy = []
test_acc = []
loss_list = []
train_acc = []

for index in range(EPOCH):
    net.train()
    for i, (rgb, ir, y) in enumerate(train_dataloader):
        optimizer.zero_grad()

        if model_name in Model_custom_list:
            y_pre = net(rgb.to(DEVICE), ir.to(DEVICE), mode=mode)
        else:
            if mode == 'rgb':
                x = rgb.to(DEVICE)
            elif mode == 'ir':
                x = ir.to(DEVICE)
            y_pre = net(x)

        loss = loss_function(y_pre, y.to(DEVICE))
        Loss_accuracy.append(loss)
        print(f'Epoch:{index + 1}/{EPOCH}     Step:{i + 1}|{total_step}   loss:{loss.item()}  ')
        loss_list.append(loss.item())

        loss.backward()

        optimizer.step()

    if ((index + 1) % test_interval) == 0:
        current_accuracy = test(test_dataloader, net, DEVICE, mode, model_name, index, log_path, correct_on_test,
                                correct_on_train, Model_custom_list, flag='test_set', output_flag=False)
        test_acc.append(current_accuracy)
        current_accuracy_train = test(train_dataloader, net, DEVICE, mode, model_name, index, log_path, correct_on_test,
                                      correct_on_train, Model_custom_list, flag='train_set', output_flag=False)
        train_acc.append(current_accuracy_train)
        print(f'current max accuracy\t test set:{max(correct_on_test)}%\t train set:{max(correct_on_train)}%')

        if current_accuracy > max_accuracy:
            max_accuracy = current_accuracy

# Example usage of precision_recall and f1_score
y_all_true = torch.tensor([]).to(DEVICE)
y_all_pre = torch.tensor([]).to(DEVICE)
with torch.no_grad():
    net.eval()
    for (rgb, ir, y) in test_dataloader:
        y = y.to(DEVICE)
        y_all_true = torch.cat((y_all_true, y))

        if model_name in Model_custom_list:
            y_pre = net(rgb.to(DEVICE), ir.to(DEVICE), mode=mode)
        else:
            if mode == 'rgb':
                x = rgb.to(DEVICE)
            elif mode == 'ir':
                x = ir.to(DEVICE)
            y_pre = net(x)

        _, label_index = torch.max(y_pre.data, dim=-1)
        y_all_pre = torch.cat((y_all_pre, label_index))

# Calculate precision, recall, and F1 score
precision_recall_result = precision_recall_fscore_support(y_all_true.cpu().numpy(), y_all_pre.cpu().numpy(), average='macro')
f1_score_result = f1_score(y_all_true.cpu().numpy(), y_all_pre.cpu().numpy(), average='macro')

# Print the results
print("Precision, Recall:", precision_recall_result[:2])
print("F1 Score:", f1_score_result)

# Save the results
results_array = np.zeros(3, dtype=object)
results_array[0] = loss_list
results_array[1] = train_acc
results_array[2] = test_acc

np.save(log_loss_path + model_name + '.npy', results_array)


Logistic(
  (logic): Sequential(
    (0): Linear(in_features=193548, out_features=3, bias=True)
  )
)
Epoch:1/50     Step:1|6   loss:1.0947716236114502  
Epoch:1/50     Step:2|6   loss:4.17678689956665  
Epoch:1/50     Step:3|6   loss:0.8129972219467163  
Epoch:1/50     Step:4|6   loss:2.4710593223571777  
Epoch:1/50     Step:5|6   loss:2.16934871673584  
Epoch:1/50     Step:6|6   loss:1.2618601322174072  
Epoch:1/50     Step:7|6   loss:1.347703218460083  
current max accuracy	 test set:77.57009345794393%	 train set:77.75175644028103%
Epoch:2/50     Step:1|6   loss:1.183098554611206  
Epoch:2/50     Step:2|6   loss:1.1106321811676025  
Epoch:2/50     Step:3|6   loss:1.3236607313156128  
Epoch:2/50     Step:4|6   loss:1.3961237668991089  
Epoch:2/50     Step:5|6   loss:1.2691214084625244  
Epoch:2/50     Step:6|6   loss:1.203107476234436  
Epoch:2/50     Step:7|6   loss:1.300279140472412  
current max accuracy	 test set:84.11214953271028%	 train set:86.18266978922716%
Epoch:3/50     Ste

In [14]:
import torch
import torch.nn as nn
from dataset import MyDataset
from torch.utils.data import DataLoader
import torch.optim as optim
import torch.nn.functional as F
import os
from models import Logistic_two_stream, VGG16, Vgg_two_stream, Logistic, Mobilenetv2, Mobilenetv2_two_stream, LeNet5_one_stream, LeNet5_two_stream
import csv
import numpy as np
from sklearn.metrics import precision_recall_fscore_support, f1_score

# Set DEVICE to 'cuda' if available, otherwise 'cpu'
DEVICE = 'cuda' if torch.cuda.is_available() else 'cpu'

# Add these lines to create the 'log' directory if it doesn't exist
if not os.path.exists('./log'):
    os.makedirs('./log')
    
# Direct assignment of parameters
args = {
    'path_rgb': './254p RGB Images/',
    'path_ir': './254p Thermal Images/',
    'index': 0,
    'batch_size': 64,
    'lr': 1e-4,
    'classes_num': 3,
    'subset_rate': 0.01,
    'trainset_rate': 0.8,
    'model': 'Logistic_two_stream',
    'mode': 'both',
    'EPOCH': 50,
    'test_interval': 1,
    'log_path': './log/results.csv',
    'log_loss_path': './log/'
}

# Add these lines to replace argparse
path_rgb = args['path_rgb']
path_ir = args['path_ir']
index = args['index']
batch_size = args['batch_size']
lr = args['lr']
classes_num = args['classes_num']
subset_rate = args['subset_rate']
trainset_rate = args['trainset_rate']
model_name = args['model']
mode = args['mode']
EPOCH = args['EPOCH']
test_interval = args['test_interval']
log_path = args['log_path']
log_loss_path = args['log_loss_path']

def test(dataloader, model, device, mode, model_name, epoch, log_path, correct_on_test, correct_on_train, Model_custom_list, flag='test_set', output_flag=False):
    model.eval()
    correct = 0
    total = 0

    with torch.no_grad():
        for (rgb, ir, labels) in dataloader:
            rgb, ir, labels = rgb.to(device), ir.to(device), labels.to(device)

            if model_name in Model_custom_list:
                outputs = model(rgb, ir, mode=mode)
            else:
                if mode == 'rgb':
                    x = rgb
                elif mode == 'ir':
                    x = ir

                outputs = model(x)

            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

    accuracy = 100 * correct / total

    if flag == 'test_set':
        correct_on_test.append(accuracy)
    elif flag == 'train_set':
        correct_on_train.append(accuracy)

    if output_flag:
        print(f'Accuracy on {flag} for Epoch {epoch + 1}: {accuracy}%')

    # Save results to a CSV file
    with open(log_path, mode='a', newline='') as file:
        writer = csv.writer(file)
        if epoch == 0:
            writer.writerow(['Epoch', 'Accuracy'])
        writer.writerow([epoch + 1, accuracy])

    return accuracy

# Model initialization
Model_custom_list = ['Mobilenetv2_two_stream', 'Vgg_two_stream', 'Logistic_two_stream', 'LeNet5_one_stream', 'LeNet5_two_stream']

Transform_flag = False

if model_name == 'VGG16':
    net = VGG16(classes_num).to(DEVICE)
    Transform_flag = True
    target_size = 224
        
elif model_name == 'Vgg_two_stream':
    Transform_flag = True
    target_size = 224
    net = Vgg_two_stream().to(DEVICE)
        
elif model_name == 'Logistic':
    net= Logistic(classes_num).to(DEVICE)
    target_size = 254
        
elif model_name == 'Logistic_two_stream':
    net= Logistic_two_stream(classes_num).to(DEVICE)
    target_size = 254
   
elif model_name == 'Mobilenetv2':
    net = Mobilenetv2(classes_num).to(DEVICE)
    Transform_flag = True
    target_size = 224
        
elif model_name == 'Mobilenetv2_two_stream':
    net = Mobilenetv2_two_stream().to(DEVICE)
    Transform_flag = True
    target_size = 224
               
elif model_name == 'LeNet5_one_stream':
    net = LeNet5_one_stream().to(DEVICE)
    Transform_flag = False
    target_size = 254
        
elif model_name == 'LeNet5_two_stream':
    net = LeNet5_two_stream().to(DEVICE)
    Transform_flag = False
    target_size = 254
        
print(net)

# Dataset initialization
Dataset = MyDataset(path_rgb, path_ir, input_size=target_size, transform=Transform_flag)

Dataset, _ = torch.utils.data.random_split(Dataset, [int(len(Dataset) * subset_rate), len(Dataset) - int(len(Dataset) * subset_rate)])

split_rate = trainset_rate
train_set, val_set = torch.utils.data.random_split(Dataset, [int(len(Dataset) * split_rate), len(Dataset) - int(len(Dataset) * split_rate)])

train_dataloader = DataLoader(dataset=train_set, batch_size=batch_size, shuffle=True)
test_dataloader = DataLoader(dataset=val_set, batch_size=16, shuffle=False)

# Optimizer and Loss function
optimizer = optim.Adam(net.parameters(), lr=lr, weight_decay=0.00)
loss_function = nn.CrossEntropyLoss(label_smoothing=0.2)

# Training loop
correct_on_train = [0]
correct_on_test = [0]

total_step = (int(len(Dataset) * split_rate) // batch_size)

net.train()

max_accuracy = 0
Loss_accuracy = []
test_acc = []
loss_list = []
train_acc = []

for index in range(EPOCH):
    net.train()
    for i, (rgb, ir, y) in enumerate(train_dataloader):
        optimizer.zero_grad()

        if model_name in Model_custom_list:
            y_pre = net(rgb.to(DEVICE), ir.to(DEVICE), mode=mode)
        else:
            if mode == 'rgb':
                x = rgb.to(DEVICE)
            elif mode == 'ir':
                x = ir.to(DEVICE)
            y_pre = net(x)

        loss = loss_function(y_pre, y.to(DEVICE))
        Loss_accuracy.append(loss)
        print(f'Epoch:{index + 1}/{EPOCH}     Step:{i + 1}|{total_step}   loss:{loss.item()}  ')
        loss_list.append(loss.item())

        loss.backward()

        optimizer.step()

    if ((index + 1) % test_interval) == 0:
        current_accuracy = test(test_dataloader, net, DEVICE, mode, model_name, index, log_path, correct_on_test,
                                correct_on_train, Model_custom_list, flag='test_set', output_flag=False)
        test_acc.append(current_accuracy)
        current_accuracy_train = test(train_dataloader, net, DEVICE, mode, model_name, index, log_path, correct_on_test,
                                      correct_on_train, Model_custom_list, flag='train_set', output_flag=False)
        train_acc.append(current_accuracy_train)
        print(f'current max accuracy\t test set:{max(correct_on_test)}%\t train set:{max(correct_on_train)}%')

        if current_accuracy > max_accuracy:
            max_accuracy = current_accuracy

# Example usage of precision_recall and f1_score
y_all_true = torch.tensor([]).to(DEVICE)
y_all_pre = torch.tensor([]).to(DEVICE)
with torch.no_grad():
    net.eval()
    for (rgb, ir, y) in test_dataloader:
        y = y.to(DEVICE)
        y_all_true = torch.cat((y_all_true, y))

        if model_name in Model_custom_list:
            y_pre = net(rgb.to(DEVICE), ir.to(DEVICE), mode=mode)
        else:
            if mode == 'rgb':
                x = rgb.to(DEVICE)
            elif mode == 'ir':
                x = ir.to(DEVICE)
            y_pre = net(x)

        _, label_index = torch.max(y_pre.data, dim=-1)
        y_all_pre = torch.cat((y_all_pre, label_index))

# Calculate precision, recall, and F1 score
precision_recall_result = precision_recall_fscore_support(y_all_true.cpu().numpy(), y_all_pre.cpu().numpy(), average='macro')
f1_score_result = f1_score(y_all_true.cpu().numpy(), y_all_pre.cpu().numpy(), average='macro')

# Print the results
print("Precision, Recall:", precision_recall_result[:2])
print("F1 Score:", f1_score_result)

# Save the results
results_array = np.zeros(3, dtype=object)
results_array[0] = loss_list
results_array[1] = train_acc
results_array[2] = test_acc

np.save(log_loss_path + model_name + '.npy', results_array)



Logistic_two_stream(
  (IN): Sequential(
    (0): Linear(in_features=193548, out_features=3, bias=True)
  )
  (IN_both): Sequential(
    (0): Linear(in_features=387096, out_features=3, bias=True)
  )
  (flatten): Flatten(start_dim=1, end_dim=-1)
)
Epoch:1/50     Step:1|6   loss:1.0978562831878662  
Epoch:1/50     Step:2|6   loss:15.052549362182617  
Epoch:1/50     Step:3|6   loss:6.39993953704834  
Epoch:1/50     Step:4|6   loss:4.021249771118164  
Epoch:1/50     Step:5|6   loss:10.489912986755371  
Epoch:1/50     Step:6|6   loss:9.854753494262695  
Epoch:1/50     Step:7|6   loss:7.095717906951904  
current max accuracy	 test set:77.57009345794393%	 train set:73.53629976580797%
Epoch:2/50     Step:1|6   loss:1.7497584819793701  
Epoch:2/50     Step:2|6   loss:2.8186092376708984  
Epoch:2/50     Step:3|6   loss:4.904473304748535  
Epoch:2/50     Step:4|6   loss:5.417214870452881  
Epoch:2/50     Step:5|6   loss:3.700740337371826  
Epoch:2/50     Step:6|6   loss:2.0529956817626953  
Epoc

In [2]:
import torch
import torch.nn as nn
from dataset import MyDataset
from torch.utils.data import DataLoader
import torch.optim as optim
import os
from models import Logistic_two_stream, VGG16, Vgg_two_stream, Logistic, Mobilenetv2, Mobilenetv2_two_stream, LeNet5_one_stream, LeNet5_two_stream
import csv
import numpy as np
from sklearn.metrics import precision_recall_fscore_support, f1_score

# Set DEVICE to 'cuda' if available, otherwise 'cpu'
DEVICE = 'cuda' if torch.cuda.is_available() else 'cpu'

# Add these lines to create the 'log' directory if it doesn't exist
if not os.path.exists('./log'):
    os.makedirs('./log')

# Direct assignment of parameters
args = {
    'path_rgb': './254p RGB Images/',
    'path_ir': './254p Thermal Images/',
    'index': 0,
    'batch_size': 64,
    'lr': 1e-4,
    'classes_num': 3,
    'subset_rate': 0.01,
    'trainset_rate': 0.8,
    'model':'VGG16' ,
    'mode': 'rgb',
    'EPOCH': 5,
    'test_interval': 1,
    'log_path': './log/results.csv',
    'log_loss_path': './log/'
}

# Add these lines to replace argparse
path_rgb = args['path_rgb']
path_ir = args['path_ir']
index = args['index']
batch_size = args['batch_size']
lr = args['lr']
classes_num = args['classes_num']
subset_rate = args['subset_rate']
trainset_rate = args['trainset_rate']
model_name = args['model']
mode = args['mode']
EPOCH = args['EPOCH']
test_interval = args['test_interval']
log_path = args['log_path']
log_loss_path = args['log_loss_path']

def test(dataloader, model, device, mode, model_name, epoch, log_path, correct_on_test, correct_on_train, Model_custom_list, flag='test_set', output_flag=False):
    model.eval()
    correct = 0
    total = 0

    with torch.no_grad():
        for (rgb, ir, labels) in dataloader:
            rgb, ir, labels = rgb.to(device), ir.to(device), labels.to(device)

            if model_name in Model_custom_list:
                outputs = model(rgb, ir, mode=mode)
            else:
                x = rgb if mode == 'rgb' else ir
                outputs = model(x)

            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

    accuracy = 100 * correct / total

    if flag == 'test_set':
        correct_on_test.append(accuracy)
    elif flag == 'train_set':
        correct_on_train.append(accuracy)

    if output_flag:
        print(f'Accuracy on {flag} for Epoch {epoch + 1}: {accuracy}%')

    # Save results to a CSV file
    with open(log_path, mode='a', newline='') as file:
        writer = csv.writer(file)
        if epoch == 0:
            writer.writerow(['Epoch', 'Accuracy'])
        writer.writerow([epoch + 1, accuracy])

    return accuracy

# Model initialization
Model_custom_list = ['Mobilenetv2_two_stream', 'Vgg_two_stream', 'Logistic_two_stream', 'LeNet5_one_stream', 'LeNet5_two_stream']

Transform_flag = False

if model_name == 'VGG16':
    net = VGG16(classes_num).to(DEVICE)
    Transform_flag = True
    target_size = 224

elif model_name == 'Vgg_two_stream':
    Transform_flag = True
    target_size = 224
    net = Vgg_two_stream().to(DEVICE)

elif model_name == 'Logistic':
    net = Logistic(classes_num).to(DEVICE)
    target_size = 254

elif model_name == 'Logistic_two_stream':
    net = Logistic_two_stream(classes_num).to(DEVICE)
    target_size = 254

elif model_name == 'Mobilenetv2':
    net = Mobilenetv2(classes_num).to(DEVICE)
    Transform_flag = True
    target_size = 224

elif model_name == 'Mobilenetv2_two_stream':
    net = Mobilenetv2_two_stream().to(DEVICE)
    Transform_flag = True
    target_size = 224

elif model_name == 'LeNet5_one_stream':
    net = LeNet5_one_stream().to(DEVICE)
    Transform_flag = False
    target_size = 254

elif model_name == 'LeNet5_two_stream':
    net = LeNet5_two_stream().to(DEVICE)
    Transform_flag = False
    target_size = 254

print(net)

# Dataset initialization
Dataset = MyDataset(path_rgb, path_ir, input_size=target_size, transform=Transform_flag)

Dataset, _ = torch.utils.data.random_split(Dataset, [int(len(Dataset) * subset_rate), len(Dataset) - int(len(Dataset) * subset_rate)])

split_rate = trainset_rate
train_set, val_set = torch.utils.data.random_split(Dataset, [int(len(Dataset) * split_rate), len(Dataset) - int(len(Dataset) * split_rate)])

train_dataloader = DataLoader(dataset=train_set, batch_size=batch_size, shuffle=True, num_workers=4)  # Adjust num_workers for parallel loading
test_dataloader = DataLoader(dataset=val_set, batch_size=16, shuffle=False, num_workers=4)

# Optimizer and Loss function
optimizer = optim.Adam(net.parameters(), lr=lr, weight_decay=0.00)
loss_function = nn.CrossEntropyLoss(label_smoothing=0.2)

# Training loop
correct_on_train = [0]
correct_on_test = [0]

total_step = len(train_dataloader)

net.train()

max_accuracy = 0
Loss_accuracy = []
test_acc = []
loss_list = []
train_acc = []

for epoch in range(EPOCH):
    net.train()
    for i, (rgb, ir, y) in enumerate(train_dataloader):
        optimizer.zero_grad()

        if model_name in Model_custom_list:
            y_pre = net(rgb.to(DEVICE), ir.to(DEVICE), mode=mode)
        else:
            x = rgb.to(DEVICE) if mode == 'rgb' else ir.to(DEVICE)
            y_pre = net(x)

        loss = loss_function(y_pre, y.to(DEVICE))
        Loss_accuracy.append(loss.item())
        print(f'Epoch:{epoch + 1}/{EPOCH}     Step:{i + 1}|{total_step}   loss:{loss.item()}  ')
        loss_list.append(loss.item())

        loss.backward()

        optimizer.step()

    if ((epoch + 1) % test_interval) == 0:
        current_accuracy = test(test_dataloader, net, DEVICE, mode, model_name, epoch, log_path, correct_on_test,
                                correct_on_train, Model_custom_list, flag='test_set', output_flag=False)
        test_acc.append(current_accuracy)
        current_accuracy_train = test(train_dataloader, net, DEVICE, mode, model_name, epoch, log_path, correct_on_test,
                                      correct_on_train, Model_custom_list, flag='train_set', output_flag=False)
        train_acc.append(current_accuracy_train)
        print(f'current max accuracy\t test set:{max(correct_on_test)}%\t train set:{max(correct_on_train)}%')

        if current_accuracy > max_accuracy:
            max_accuracy = current_accuracy

# Example usage of precision_recall and f1_score
y_all_true = torch.tensor([]).to(DEVICE)
y_all_pre = torch.tensor([]).to(DEVICE)
with torch.no_grad():
    net.eval()
    for (rgb, ir, y) in test_dataloader:
        y = y.to(DEVICE)
        y_all_true = torch.cat((y_all_true, y))

        if model_name in Model_custom_list:
            y_pre = net(rgb.to(DEVICE), ir.to(DEVICE), mode=mode)
        else:
            x = rgb.to(DEVICE) if mode == 'rgb' else ir.to(DEVICE)
            y_pre = net(x)

        _, label_index = torch.max(y_pre.data, dim=-1)
        y_all_pre = torch.cat((y_all_pre, label_index))

# Calculate precision, recall, and F1 score
precision_recall_result = precision_recall_fscore_support(y_all_true.cpu().numpy(), y_all_pre.cpu().numpy(), average='macro')
f1_score_result = f1_score(y_all_true.cpu().numpy(), y_all_pre.cpu().numpy(), average='macro')

# Print the results
print("Precision, Recall:", precision_recall_result[:2])
print("F1 Score:", f1_score_result)

# Save the results
results_array = np.zeros(3, dtype=object)
results_array[0] = loss_list
results_array[1] = train_acc
results_array[2] = test_acc

np.save(log_loss_path + model_name + '.npy', results_array)




VGG(
  (features): Sequential(
    (0): Conv2d(3, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (1): ReLU(inplace=True)
    (2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (3): ReLU(inplace=True)
    (4): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (5): Conv2d(64, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (6): ReLU(inplace=True)
    (7): Conv2d(128, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (8): ReLU(inplace=True)
    (9): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (10): Conv2d(128, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (11): ReLU(inplace=True)
    (12): Conv2d(256, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (13): ReLU(inplace=True)
    (14): Conv2d(256, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (15): ReLU(inplace=True)
    (16): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1

In [3]:
import torch
import torch.nn as nn
from dataset import MyDataset
from torch.utils.data import DataLoader
import torch.optim as optim
import os
from models import Logistic_two_stream, VGG16, Vgg_two_stream, Logistic, Mobilenetv2, Mobilenetv2_two_stream, LeNet5_one_stream, LeNet5_two_stream
import csv
import numpy as np
from sklearn.metrics import precision_recall_fscore_support, f1_score

# Set DEVICE to 'cuda' if available, otherwise 'cpu'
DEVICE = 'cuda' if torch.cuda.is_available() else 'cpu'

# Add these lines to create the 'log' directory if it doesn't exist
if not os.path.exists('./log'):
    os.makedirs('./log')

# Direct assignment of parameters
args = {
    'path_rgb': './254p RGB Images/',
    'path_ir': './254p Thermal Images/',
    'index': 0,
    'batch_size': 64,
    'lr': 1e-4,
    'classes_num': 3,
    'subset_rate': 0.01,
    'trainset_rate': 0.8,
    'model':'VGG16' ,
    'mode': 'ir',
    'EPOCH': 5,
    'test_interval': 1,
    'log_path': './log/results.csv',
    'log_loss_path': './log/'
}

# Add these lines to replace argparse
path_rgb = args['path_rgb']
path_ir = args['path_ir']
index = args['index']
batch_size = args['batch_size']
lr = args['lr']
classes_num = args['classes_num']
subset_rate = args['subset_rate']
trainset_rate = args['trainset_rate']
model_name = args['model']
mode = args['mode']
EPOCH = args['EPOCH']
test_interval = args['test_interval']
log_path = args['log_path']
log_loss_path = args['log_loss_path']

def test(dataloader, model, device, mode, model_name, epoch, log_path, correct_on_test, correct_on_train, Model_custom_list, flag='test_set', output_flag=False):
    model.eval()
    correct = 0
    total = 0

    with torch.no_grad():
        for (rgb, ir, labels) in dataloader:
            rgb, ir, labels = rgb.to(device), ir.to(device), labels.to(device)

            if model_name in Model_custom_list:
                outputs = model(rgb, ir, mode=mode)
            else:
                x = rgb if mode == 'rgb' else ir
                outputs = model(x)

            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

    accuracy = 100 * correct / total

    if flag == 'test_set':
        correct_on_test.append(accuracy)
    elif flag == 'train_set':
        correct_on_train.append(accuracy)

    if output_flag:
        print(f'Accuracy on {flag} for Epoch {epoch + 1}: {accuracy}%')

    # Save results to a CSV file
    with open(log_path, mode='a', newline='') as file:
        writer = csv.writer(file)
        if epoch == 0:
            writer.writerow(['Epoch', 'Accuracy'])
        writer.writerow([epoch + 1, accuracy])

    return accuracy

# Model initialization
Model_custom_list = ['Mobilenetv2_two_stream', 'Vgg_two_stream', 'Logistic_two_stream', 'LeNet5_one_stream', 'LeNet5_two_stream']

Transform_flag = False

if model_name == 'VGG16':
    net = VGG16(classes_num).to(DEVICE)
    Transform_flag = True
    target_size = 224

elif model_name == 'Vgg_two_stream':
    Transform_flag = True
    target_size = 224
    net = Vgg_two_stream().to(DEVICE)

elif model_name == 'Logistic':
    net = Logistic(classes_num).to(DEVICE)
    target_size = 254

elif model_name == 'Logistic_two_stream':
    net = Logistic_two_stream(classes_num).to(DEVICE)
    target_size = 254

elif model_name == 'Mobilenetv2':
    net = Mobilenetv2(classes_num).to(DEVICE)
    Transform_flag = True
    target_size = 224

elif model_name == 'Mobilenetv2_two_stream':
    net = Mobilenetv2_two_stream().to(DEVICE)
    Transform_flag = True
    target_size = 224

elif model_name == 'LeNet5_one_stream':
    net = LeNet5_one_stream().to(DEVICE)
    Transform_flag = False
    target_size = 254

elif model_name == 'LeNet5_two_stream':
    net = LeNet5_two_stream().to(DEVICE)
    Transform_flag = False
    target_size = 254

print(net)

# Dataset initialization
Dataset = MyDataset(path_rgb, path_ir, input_size=target_size, transform=Transform_flag)

Dataset, _ = torch.utils.data.random_split(Dataset, [int(len(Dataset) * subset_rate), len(Dataset) - int(len(Dataset) * subset_rate)])

split_rate = trainset_rate
train_set, val_set = torch.utils.data.random_split(Dataset, [int(len(Dataset) * split_rate), len(Dataset) - int(len(Dataset) * split_rate)])

train_dataloader = DataLoader(dataset=train_set, batch_size=batch_size, shuffle=True, num_workers=4)  # Adjust num_workers for parallel loading
test_dataloader = DataLoader(dataset=val_set, batch_size=16, shuffle=False, num_workers=4)

# Optimizer and Loss function
optimizer = optim.Adam(net.parameters(), lr=lr, weight_decay=0.00)
loss_function = nn.CrossEntropyLoss(label_smoothing=0.2)

# Training loop
correct_on_train = [0]
correct_on_test = [0]

total_step = len(train_dataloader)

net.train()

max_accuracy = 0
Loss_accuracy = []
test_acc = []
loss_list = []
train_acc = []

for epoch in range(EPOCH):
    net.train()
    for i, (rgb, ir, y) in enumerate(train_dataloader):
        optimizer.zero_grad()

        if model_name in Model_custom_list:
            y_pre = net(rgb.to(DEVICE), ir.to(DEVICE), mode=mode)
        else:
            x = rgb.to(DEVICE) if mode == 'rgb' else ir.to(DEVICE)
            y_pre = net(x)

        loss = loss_function(y_pre, y.to(DEVICE))
        Loss_accuracy.append(loss.item())
        print(f'Epoch:{epoch + 1}/{EPOCH}     Step:{i + 1}|{total_step}   loss:{loss.item()}  ')
        loss_list.append(loss.item())

        loss.backward()

        optimizer.step()

    if ((epoch + 1) % test_interval) == 0:
        current_accuracy = test(test_dataloader, net, DEVICE, mode, model_name, epoch, log_path, correct_on_test,
                                correct_on_train, Model_custom_list, flag='test_set', output_flag=False)
        test_acc.append(current_accuracy)
        current_accuracy_train = test(train_dataloader, net, DEVICE, mode, model_name, epoch, log_path, correct_on_test,
                                      correct_on_train, Model_custom_list, flag='train_set', output_flag=False)
        train_acc.append(current_accuracy_train)
        print(f'current max accuracy\t test set:{max(correct_on_test)}%\t train set:{max(correct_on_train)}%')

        if current_accuracy > max_accuracy:
            max_accuracy = current_accuracy

# Example usage of precision_recall and f1_score
y_all_true = torch.tensor([]).to(DEVICE)
y_all_pre = torch.tensor([]).to(DEVICE)
with torch.no_grad():
    net.eval()
    for (rgb, ir, y) in test_dataloader:
        y = y.to(DEVICE)
        y_all_true = torch.cat((y_all_true, y))

        if model_name in Model_custom_list:
            y_pre = net(rgb.to(DEVICE), ir.to(DEVICE), mode=mode)
        else:
            x = rgb.to(DEVICE) if mode == 'rgb' else ir.to(DEVICE)
            y_pre = net(x)

        _, label_index = torch.max(y_pre.data, dim=-1)
        y_all_pre = torch.cat((y_all_pre, label_index))

# Calculate precision, recall, and F1 score
precision_recall_result = precision_recall_fscore_support(y_all_true.cpu().numpy(), y_all_pre.cpu().numpy(), average='macro')
f1_score_result = f1_score(y_all_true.cpu().numpy(), y_all_pre.cpu().numpy(), average='macro')

# Print the results
print("Precision, Recall:", precision_recall_result[:2])
print("F1 Score:", f1_score_result)

# Save the results
results_array = np.zeros(3, dtype=object)
results_array[0] = loss_list
results_array[1] = train_acc
results_array[2] = test_acc

np.save(log_loss_path + model_name + '.npy', results_array)




VGG(
  (features): Sequential(
    (0): Conv2d(3, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (1): ReLU(inplace=True)
    (2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (3): ReLU(inplace=True)
    (4): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (5): Conv2d(64, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (6): ReLU(inplace=True)
    (7): Conv2d(128, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (8): ReLU(inplace=True)
    (9): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (10): Conv2d(128, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (11): ReLU(inplace=True)
    (12): Conv2d(256, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (13): ReLU(inplace=True)
    (14): Conv2d(256, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (15): ReLU(inplace=True)
    (16): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1

In [2]:
import torch
import torch.nn as nn
from dataset import MyDataset
from torch.utils.data import DataLoader
import torch.optim as optim
import torch.nn.functional as F
import os
from models import Logistic_two_stream, Flame_one_stream, VGG16, Vgg_two_stream, Logistic, Flame_two_stream, Mobilenetv2, Mobilenetv2_two_stream, LeNet5_one_stream, LeNet5_two_stream, Resnet18, Resnet18_two_stream
import csv
import numpy as np
from sklearn.metrics import precision_recall_fscore_support, f1_score

# Set DEVICE to 'cuda' if available, otherwise 'cpu'
DEVICE = 'cuda' if torch.cuda.is_available() else 'cpu'

# Add these lines to create the 'log' directory if it doesn't exist
if not os.path.exists('./log'):
    os.makedirs('./log')
    
# Direct assignment of parameters
args = {
    'path_rgb': './254p RGB Images/',
    'path_ir': './254p Thermal Images/',
    'index': 0,
    'batch_size': 64,
    'lr': 1e-3,
    'classes_num': 3,
    'subset_rate': 0.01,
    'trainset_rate': 0.8,
    'model': 'Flame_one_stream',
    'mode': 'rgb',
    'EPOCH': 20,
    'test_interval': 1,
    'log_path': './log/results.csv',
    'log_loss_path': './log/'
}

# Add these lines to replace argparse
path_rgb = args['path_rgb']
path_ir = args['path_ir']
index = args['index']
batch_size = args['batch_size']
lr = args['lr']
classes_num = args['classes_num']
subset_rate = args['subset_rate']
trainset_rate = args['trainset_rate']
model_name = args['model']
mode = args['mode']
EPOCH = args['EPOCH']
test_interval = args['test_interval']
log_path = args['log_path']
log_loss_path = args['log_loss_path']

def test(dataloader, model, device, mode, model_name, epoch, log_path, correct_on_test, correct_on_train, Model_custom_list, flag='test_set', output_flag=False):
    model.eval()
    correct = 0
    total = 0

    with torch.no_grad():
        for (rgb, ir, labels) in dataloader:
            rgb, ir, labels = rgb.to(device), ir.to(device), labels.to(device)

            if model_name in Model_custom_list:
                outputs = model(rgb, ir, mode=mode)
            else:
                if mode == 'rgb':
                    x = rgb
                elif mode == 'ir':
                    x = ir

                outputs = model(x)

            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

    accuracy = 100 * correct / total

    if flag == 'test_set':
        correct_on_test.append(accuracy)
    elif flag == 'train_set':
        correct_on_train.append(accuracy)

    if output_flag:
        print(f'Accuracy on {flag} for Epoch {epoch + 1}: {accuracy}%')

    # Save results to a CSV file
    with open(log_path, mode='a', newline='') as file:
        writer = csv.writer(file)
        if epoch == 0:
            writer.writerow(['Epoch', 'Accuracy'])
        writer.writerow([epoch + 1, accuracy])

    return accuracy

# Model initialization
Model_custom_list = ['Flame_one_stream', 'Flame_two_stream', 'Mobilenetv2_two_stream', 'Vgg_two_stream', 'Logistic_two_stream', 'Resnet18_two_stream', 'LeNet5_one_stream', 'LeNet5_two_stream']

Transform_flag = False

if model_name == 'Flame_one_stream':
    net = Flame_one_stream().to(DEVICE)
    Transform_flag = False
    target_size = 254
elif model_name == 'Flame_two_stream':
    net = Flame_two_stream().to(DEVICE)
    Transform_flag = False
    target_size = 254
# Add similar blocks for other models
elif model_name == 'VGG16':
    net = VGG16(classes_num).to(DEVICE)
    Transform_flag = True
    target_size = 224
        
elif model_name == 'Vgg_two_stream':
    Transform_flag = True
    target_size = 224
    net = Vgg_two_stream().to(DEVICE)
        
elif model_name == 'Logistic':
    net= Logistic(classes_num).to(DEVICE)
    target_size = 254
        
elif model_name == 'Logistic_two_stream':
    net= Logistic_two_stream(classes_num).to(DEVICE)
    target_size = 254
    
   
elif model_name == 'Mobilenetv2':
    net = Mobilenetv2(classes_num).to(DEVICE)
    Transform_flag = True
    target_size = 224
        
elif model_name == 'Mobilenetv2_two_stream':
    net = Mobilenetv2_two_stream().to(DEVICE)
    Transform_flag = True
    target_size = 224
        
elif model_name == 'Resnet18':
    net = Resnet18(classes_num).to(DEVICE)
    Transform_flag = True
    target_size = 224
        
elif model_name == 'Resnet18_two_stream':
    net = Resnet18_two_stream().to(DEVICE)
    Transform_flag = True
    target_size = 224
        
elif model_name == 'LeNet5_one_stream':
    net = LeNet5_one_stream().to(DEVICE)
    Transform_flag = False
    target_size = 254
        
elif model_name == 'LeNet5_two_stream':
    net = LeNet5_two_stream().to(DEVICE)
    Transform_flag = False
    target_size = 254
        
print(net)

# Dataset initialization
Dataset = MyDataset(path_rgb, path_ir, input_size=target_size, transform=Transform_flag)

Dataset, _ = torch.utils.data.random_split(Dataset, [int(len(Dataset) * subset_rate), len(Dataset) - int(len(Dataset) * subset_rate)])

split_rate = trainset_rate
train_set, val_set = torch.utils.data.random_split(Dataset, [int(len(Dataset) * split_rate), len(Dataset) - int(len(Dataset) * split_rate)])

train_dataloader = DataLoader(dataset=train_set, batch_size=batch_size, shuffle=True)
test_dataloader = DataLoader(dataset=val_set, batch_size=16, shuffle=False)

# Optimizer and Loss function
optimizer = optim.Adam(net.parameters(), lr=lr, weight_decay=0.00)
loss_function = nn.CrossEntropyLoss(label_smoothing=0.2)

# Training loop
correct_on_train = [0]
correct_on_test = [0]

total_step = (int(len(Dataset) * split_rate) // batch_size)

net.train()

max_accuracy = 0
Loss_accuracy = []
test_acc = []
loss_list = []
train_acc = []

for index in range(EPOCH):
    net.train()
    for i, (rgb, ir, y) in enumerate(train_dataloader):
        optimizer.zero_grad()

        if model_name in Model_custom_list:
            y_pre = net(rgb.to(DEVICE), ir.to(DEVICE), mode=mode)
        else:
            if mode == 'rgb':
                x = rgb.to(DEVICE)
            elif mode == 'ir':
                x = ir.to(DEVICE)
            y_pre = net(x)

        loss = loss_function(y_pre, y.to(DEVICE))
        Loss_accuracy.append(loss)
        print(f'Epoch:{index + 1}/{EPOCH}     Step:{i + 1}|{total_step}   loss:{loss.item()}  ')
        loss_list.append(loss.item())

        loss.backward()

        optimizer.step()

    if ((index + 1) % test_interval) == 0:
        current_accuracy = test(test_dataloader, net, DEVICE, mode, model_name, index, log_path, correct_on_test,
                                correct_on_train, Model_custom_list, flag='test_set', output_flag=False)
        test_acc.append(current_accuracy)
        current_accuracy_train = test(train_dataloader, net, DEVICE, mode, model_name, index, log_path, correct_on_test,
                                      correct_on_train, Model_custom_list, flag='train_set', output_flag=False)
        train_acc.append(current_accuracy_train)
        print(f'current max accuracy\t test set:{max(correct_on_test)}%\t train set:{max(correct_on_train)}%')

        if current_accuracy > max_accuracy:
            max_accuracy = current_accuracy

# Example usage of precision_recall and f1_score
y_all_true = torch.tensor([]).to(DEVICE)
y_all_pre = torch.tensor([]).to(DEVICE)
with torch.no_grad():
    net.eval()
    for (rgb, ir, y) in test_dataloader:
        y = y.to(DEVICE)
        y_all_true = torch.cat((y_all_true, y))

        if model_name in Model_custom_list:
            y_pre = net(rgb.to(DEVICE), ir.to(DEVICE), mode=mode)
        else:
            if mode == 'rgb':
                x = rgb.to(DEVICE)
            elif mode == 'ir':
                x = ir.to(DEVICE)
            y_pre = net(x)

        _, label_index = torch.max(y_pre.data, dim=-1)
        y_all_pre = torch.cat((y_all_pre, label_index))

# Calculate precision, recall, and F1 score
precision_recall_result = precision_recall_fscore_support(y_all_true.cpu().numpy(), y_all_pre.cpu().numpy(), average='macro')
f1_score_result = f1_score(y_all_true.cpu().numpy(), y_all_pre.cpu().numpy(), average='macro')

# Print the results
print("Precision, Recall:", precision_recall_result[:2])
print("F1 Score:", f1_score_result)

# Save the results
results_array = np.zeros(3, dtype=object)
results_array[0] = loss_list
results_array[1] = train_acc
results_array[2] = test_acc

np.save(log_loss_path + model_name + '.npy', results_array)


Flame_one_stream(
  (IN): Sequential(
    (0): Conv2d(3, 8, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1))
    (1): BatchNorm2d(8, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): ReLU()
  )
  (IN_both): Sequential(
    (0): Conv2d(6, 8, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1))
    (1): BatchNorm2d(8, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): ReLU()
  )
  (residual): Conv2d(8, 8, kernel_size=(1, 1), stride=(2, 2))
  (block): Sequential(
    (0): SeparableConv2d(
      (depthwise): Conv2d(8, 8, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), groups=8, bias=False)
      (pointwise): Conv2d(8, 8, kernel_size=(1, 1), stride=(1, 1), bias=False)
    )
    (1): BatchNorm2d(8, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): ReLU()
    (3): SeparableConv2d(
      (depthwise): Conv2d(8, 8, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), groups=8, bias=False)
      (pointwise): Conv2d(8, 8, kernel_

In [3]:
import torch
import torch.nn as nn
from dataset import MyDataset
from torch.utils.data import DataLoader
import torch.optim as optim
import torch.nn.functional as F
import os
from models import Logistic_two_stream, Flame_one_stream, VGG16, Vgg_two_stream, Logistic, Flame_two_stream, Mobilenetv2, Mobilenetv2_two_stream, LeNet5_one_stream, LeNet5_two_stream, Resnet18, Resnet18_two_stream
import csv
import numpy as np
from sklearn.metrics import precision_recall_fscore_support, f1_score

# Set DEVICE to 'cuda' if available, otherwise 'cpu'
DEVICE = 'cuda' if torch.cuda.is_available() else 'cpu'

# Add these lines to create the 'log' directory if it doesn't exist
if not os.path.exists('./log'):
    os.makedirs('./log')
    
# Direct assignment of parameters
args = {
    'path_rgb': './254p RGB Images/',
    'path_ir': './254p Thermal Images/',
    'index': 0,
    'batch_size': 64,
    'lr': 1e-3,
    'classes_num': 3,
    'subset_rate': 0.01,
    'trainset_rate': 0.8,
    'model': 'Flame_one_stream',
    'mode': 'ir',
    'EPOCH': 20,
    'test_interval': 1,
    'log_path': './log/results.csv',
    'log_loss_path': './log/'
}

# Add these lines to replace argparse
path_rgb = args['path_rgb']
path_ir = args['path_ir']
index = args['index']
batch_size = args['batch_size']
lr = args['lr']
classes_num = args['classes_num']
subset_rate = args['subset_rate']
trainset_rate = args['trainset_rate']
model_name = args['model']
mode = args['mode']
EPOCH = args['EPOCH']
test_interval = args['test_interval']
log_path = args['log_path']
log_loss_path = args['log_loss_path']

def test(dataloader, model, device, mode, model_name, epoch, log_path, correct_on_test, correct_on_train, Model_custom_list, flag='test_set', output_flag=False):
    model.eval()
    correct = 0
    total = 0

    with torch.no_grad():
        for (rgb, ir, labels) in dataloader:
            rgb, ir, labels = rgb.to(device), ir.to(device), labels.to(device)

            if model_name in Model_custom_list:
                outputs = model(rgb, ir, mode=mode)
            else:
                if mode == 'rgb':
                    x = rgb
                elif mode == 'ir':
                    x = ir

                outputs = model(x)

            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

    accuracy = 100 * correct / total

    if flag == 'test_set':
        correct_on_test.append(accuracy)
    elif flag == 'train_set':
        correct_on_train.append(accuracy)

    if output_flag:
        print(f'Accuracy on {flag} for Epoch {epoch + 1}: {accuracy}%')

    # Save results to a CSV file
    with open(log_path, mode='a', newline='') as file:
        writer = csv.writer(file)
        if epoch == 0:
            writer.writerow(['Epoch', 'Accuracy'])
        writer.writerow([epoch + 1, accuracy])

    return accuracy

# Model initialization
Model_custom_list = ['Flame_one_stream', 'Flame_two_stream', 'Mobilenetv2_two_stream', 'Vgg_two_stream', 'Logistic_two_stream', 'Resnet18_two_stream', 'LeNet5_one_stream', 'LeNet5_two_stream']

Transform_flag = False

if model_name == 'Flame_one_stream':
    net = Flame_one_stream().to(DEVICE)
    Transform_flag = False
    target_size = 254
elif model_name == 'Flame_two_stream':
    net = Flame_two_stream().to(DEVICE)
    Transform_flag = False
    target_size = 254
# Add similar blocks for other models
elif model_name == 'VGG16':
    net = VGG16(classes_num).to(DEVICE)
    Transform_flag = True
    target_size = 224
        
elif model_name == 'Vgg_two_stream':
    Transform_flag = True
    target_size = 224
    net = Vgg_two_stream().to(DEVICE)
        
elif model_name == 'Logistic':
    net= Logistic(classes_num).to(DEVICE)
    target_size = 254
        
elif model_name == 'Logistic_two_stream':
    net= Logistic_two_stream(classes_num).to(DEVICE)
    target_size = 254
    
   
elif model_name == 'Mobilenetv2':
    net = Mobilenetv2(classes_num).to(DEVICE)
    Transform_flag = True
    target_size = 224
        
elif model_name == 'Mobilenetv2_two_stream':
    net = Mobilenetv2_two_stream().to(DEVICE)
    Transform_flag = True
    target_size = 224
        
elif model_name == 'Resnet18':
    net = Resnet18(classes_num).to(DEVICE)
    Transform_flag = True
    target_size = 224
        
elif model_name == 'Resnet18_two_stream':
    net = Resnet18_two_stream().to(DEVICE)
    Transform_flag = True
    target_size = 224
        
elif model_name == 'LeNet5_one_stream':
    net = LeNet5_one_stream().to(DEVICE)
    Transform_flag = False
    target_size = 254
        
elif model_name == 'LeNet5_two_stream':
    net = LeNet5_two_stream().to(DEVICE)
    Transform_flag = False
    target_size = 254
        
print(net)

# Dataset initialization
Dataset = MyDataset(path_rgb, path_ir, input_size=target_size, transform=Transform_flag)

Dataset, _ = torch.utils.data.random_split(Dataset, [int(len(Dataset) * subset_rate), len(Dataset) - int(len(Dataset) * subset_rate)])

split_rate = trainset_rate
train_set, val_set = torch.utils.data.random_split(Dataset, [int(len(Dataset) * split_rate), len(Dataset) - int(len(Dataset) * split_rate)])

train_dataloader = DataLoader(dataset=train_set, batch_size=batch_size, shuffle=True)
test_dataloader = DataLoader(dataset=val_set, batch_size=16, shuffle=False)

# Optimizer and Loss function
optimizer = optim.Adam(net.parameters(), lr=lr, weight_decay=0.00)
loss_function = nn.CrossEntropyLoss(label_smoothing=0.2)

# Training loop
correct_on_train = [0]
correct_on_test = [0]

total_step = (int(len(Dataset) * split_rate) // batch_size)

net.train()

max_accuracy = 0
Loss_accuracy = []
test_acc = []
loss_list = []
train_acc = []

for index in range(EPOCH):
    net.train()
    for i, (rgb, ir, y) in enumerate(train_dataloader):
        optimizer.zero_grad()

        if model_name in Model_custom_list:
            y_pre = net(rgb.to(DEVICE), ir.to(DEVICE), mode=mode)
        else:
            if mode == 'rgb':
                x = rgb.to(DEVICE)
            elif mode == 'ir':
                x = ir.to(DEVICE)
            y_pre = net(x)

        loss = loss_function(y_pre, y.to(DEVICE))
        Loss_accuracy.append(loss)
        print(f'Epoch:{index + 1}/{EPOCH}     Step:{i + 1}|{total_step}   loss:{loss.item()}  ')
        loss_list.append(loss.item())

        loss.backward()

        optimizer.step()

    if ((index + 1) % test_interval) == 0:
        current_accuracy = test(test_dataloader, net, DEVICE, mode, model_name, index, log_path, correct_on_test,
                                correct_on_train, Model_custom_list, flag='test_set', output_flag=False)
        test_acc.append(current_accuracy)
        current_accuracy_train = test(train_dataloader, net, DEVICE, mode, model_name, index, log_path, correct_on_test,
                                      correct_on_train, Model_custom_list, flag='train_set', output_flag=False)
        train_acc.append(current_accuracy_train)
        print(f'current max accuracy\t test set:{max(correct_on_test)}%\t train set:{max(correct_on_train)}%')

        if current_accuracy > max_accuracy:
            max_accuracy = current_accuracy

# Example usage of precision_recall and f1_score
y_all_true = torch.tensor([]).to(DEVICE)
y_all_pre = torch.tensor([]).to(DEVICE)
with torch.no_grad():
    net.eval()
    for (rgb, ir, y) in test_dataloader:
        y = y.to(DEVICE)
        y_all_true = torch.cat((y_all_true, y))

        if model_name in Model_custom_list:
            y_pre = net(rgb.to(DEVICE), ir.to(DEVICE), mode=mode)
        else:
            if mode == 'rgb':
                x = rgb.to(DEVICE)
            elif mode == 'ir':
                x = ir.to(DEVICE)
            y_pre = net(x)

        _, label_index = torch.max(y_pre.data, dim=-1)
        y_all_pre = torch.cat((y_all_pre, label_index))

# Calculate precision, recall, and F1 score
precision_recall_result = precision_recall_fscore_support(y_all_true.cpu().numpy(), y_all_pre.cpu().numpy(), average='macro')
f1_score_result = f1_score(y_all_true.cpu().numpy(), y_all_pre.cpu().numpy(), average='macro')

# Print the results
print("Precision, Recall:", precision_recall_result[:2])
print("F1 Score:", f1_score_result)

# Save the results
results_array = np.zeros(3, dtype=object)
results_array[0] = loss_list
results_array[1] = train_acc
results_array[2] = test_acc

np.save(log_loss_path + model_name + '.npy', results_array)


Flame_one_stream(
  (IN): Sequential(
    (0): Conv2d(3, 8, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1))
    (1): BatchNorm2d(8, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): ReLU()
  )
  (IN_both): Sequential(
    (0): Conv2d(6, 8, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1))
    (1): BatchNorm2d(8, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): ReLU()
  )
  (residual): Conv2d(8, 8, kernel_size=(1, 1), stride=(2, 2))
  (block): Sequential(
    (0): SeparableConv2d(
      (depthwise): Conv2d(8, 8, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), groups=8, bias=False)
      (pointwise): Conv2d(8, 8, kernel_size=(1, 1), stride=(1, 1), bias=False)
    )
    (1): BatchNorm2d(8, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): ReLU()
    (3): SeparableConv2d(
      (depthwise): Conv2d(8, 8, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), groups=8, bias=False)
      (pointwise): Conv2d(8, 8, kernel_

In [4]:
import torch
import torch.nn as nn
from dataset import MyDataset
from torch.utils.data import DataLoader
import torch.optim as optim
import torch.nn.functional as F
import os
from models import Logistic_two_stream, Flame_one_stream, VGG16, Vgg_two_stream, Logistic, Flame_two_stream, Mobilenetv2, Mobilenetv2_two_stream, LeNet5_one_stream, LeNet5_two_stream, Resnet18, Resnet18_two_stream
import csv
import numpy as np
from sklearn.metrics import precision_recall_fscore_support, f1_score

# Set DEVICE to 'cuda' if available, otherwise 'cpu'
DEVICE = 'cuda' if torch.cuda.is_available() else 'cpu'

# Add these lines to create the 'log' directory if it doesn't exist
if not os.path.exists('./log'):
    os.makedirs('./log')
    
# Direct assignment of parameters
args = {
    'path_rgb': './254p RGB Images/',
    'path_ir': './254p Thermal Images/',
    'index': 0,
    'batch_size': 64,
    'lr': 1e-3,
    'classes_num': 3,
    'subset_rate': 0.01,
    'trainset_rate': 0.8,
    'model': 'Flame_one_stream',
    'mode': 'both',
    'EPOCH': 20,
    'test_interval': 1,
    'log_path': './log/results.csv',
    'log_loss_path': './log/'
}

# Add these lines to replace argparse
path_rgb = args['path_rgb']
path_ir = args['path_ir']
index = args['index']
batch_size = args['batch_size']
lr = args['lr']
classes_num = args['classes_num']
subset_rate = args['subset_rate']
trainset_rate = args['trainset_rate']
model_name = args['model']
mode = args['mode']
EPOCH = args['EPOCH']
test_interval = args['test_interval']
log_path = args['log_path']
log_loss_path = args['log_loss_path']

def test(dataloader, model, device, mode, model_name, epoch, log_path, correct_on_test, correct_on_train, Model_custom_list, flag='test_set', output_flag=False):
    model.eval()
    correct = 0
    total = 0

    with torch.no_grad():
        for (rgb, ir, labels) in dataloader:
            rgb, ir, labels = rgb.to(device), ir.to(device), labels.to(device)

            if model_name in Model_custom_list:
                outputs = model(rgb, ir, mode=mode)
            else:
                if mode == 'rgb':
                    x = rgb
                elif mode == 'ir':
                    x = ir

                outputs = model(x)

            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

    accuracy = 100 * correct / total

    if flag == 'test_set':
        correct_on_test.append(accuracy)
    elif flag == 'train_set':
        correct_on_train.append(accuracy)

    if output_flag:
        print(f'Accuracy on {flag} for Epoch {epoch + 1}: {accuracy}%')

    # Save results to a CSV file
    with open(log_path, mode='a', newline='') as file:
        writer = csv.writer(file)
        if epoch == 0:
            writer.writerow(['Epoch', 'Accuracy'])
        writer.writerow([epoch + 1, accuracy])

    return accuracy

# Model initialization
Model_custom_list = ['Flame_one_stream', 'Flame_two_stream', 'Mobilenetv2_two_stream', 'Vgg_two_stream', 'Logistic_two_stream', 'Resnet18_two_stream', 'LeNet5_one_stream', 'LeNet5_two_stream']

Transform_flag = False

if model_name == 'Flame_one_stream':
    net = Flame_one_stream().to(DEVICE)
    Transform_flag = False
    target_size = 254
elif model_name == 'Flame_two_stream':
    net = Flame_two_stream().to(DEVICE)
    Transform_flag = False
    target_size = 254
# Add similar blocks for other models
elif model_name == 'VGG16':
    net = VGG16(classes_num).to(DEVICE)
    Transform_flag = True
    target_size = 224
        
elif model_name == 'Vgg_two_stream':
    Transform_flag = True
    target_size = 224
    net = Vgg_two_stream().to(DEVICE)
        
elif model_name == 'Logistic':
    net= Logistic(classes_num).to(DEVICE)
    target_size = 254
        
elif model_name == 'Logistic_two_stream':
    net= Logistic_two_stream(classes_num).to(DEVICE)
    target_size = 254
    
   
elif model_name == 'Mobilenetv2':
    net = Mobilenetv2(classes_num).to(DEVICE)
    Transform_flag = True
    target_size = 224
        
elif model_name == 'Mobilenetv2_two_stream':
    net = Mobilenetv2_two_stream().to(DEVICE)
    Transform_flag = True
    target_size = 224
        
elif model_name == 'Resnet18':
    net = Resnet18(classes_num).to(DEVICE)
    Transform_flag = True
    target_size = 224
        
elif model_name == 'Resnet18_two_stream':
    net = Resnet18_two_stream().to(DEVICE)
    Transform_flag = True
    target_size = 224
        
elif model_name == 'LeNet5_one_stream':
    net = LeNet5_one_stream().to(DEVICE)
    Transform_flag = False
    target_size = 254
        
elif model_name == 'LeNet5_two_stream':
    net = LeNet5_two_stream().to(DEVICE)
    Transform_flag = False
    target_size = 254
        
print(net)

# Dataset initialization
Dataset = MyDataset(path_rgb, path_ir, input_size=target_size, transform=Transform_flag)

Dataset, _ = torch.utils.data.random_split(Dataset, [int(len(Dataset) * subset_rate), len(Dataset) - int(len(Dataset) * subset_rate)])

split_rate = trainset_rate
train_set, val_set = torch.utils.data.random_split(Dataset, [int(len(Dataset) * split_rate), len(Dataset) - int(len(Dataset) * split_rate)])

train_dataloader = DataLoader(dataset=train_set, batch_size=batch_size, shuffle=True)
test_dataloader = DataLoader(dataset=val_set, batch_size=16, shuffle=False)

# Optimizer and Loss function
optimizer = optim.Adam(net.parameters(), lr=lr, weight_decay=0.00)
loss_function = nn.CrossEntropyLoss(label_smoothing=0.2)

# Training loop
correct_on_train = [0]
correct_on_test = [0]

total_step = (int(len(Dataset) * split_rate) // batch_size)

net.train()

max_accuracy = 0
Loss_accuracy = []
test_acc = []
loss_list = []
train_acc = []

for index in range(EPOCH):
    net.train()
    for i, (rgb, ir, y) in enumerate(train_dataloader):
        optimizer.zero_grad()

        if model_name in Model_custom_list:
            y_pre = net(rgb.to(DEVICE), ir.to(DEVICE), mode=mode)
        else:
            if mode == 'rgb':
                x = rgb.to(DEVICE)
            elif mode == 'ir':
                x = ir.to(DEVICE)
            y_pre = net(x)

        loss = loss_function(y_pre, y.to(DEVICE))
        Loss_accuracy.append(loss)
        print(f'Epoch:{index + 1}/{EPOCH}     Step:{i + 1}|{total_step}   loss:{loss.item()}  ')
        loss_list.append(loss.item())

        loss.backward()

        optimizer.step()

    if ((index + 1) % test_interval) == 0:
        current_accuracy = test(test_dataloader, net, DEVICE, mode, model_name, index, log_path, correct_on_test,
                                correct_on_train, Model_custom_list, flag='test_set', output_flag=False)
        test_acc.append(current_accuracy)
        current_accuracy_train = test(train_dataloader, net, DEVICE, mode, model_name, index, log_path, correct_on_test,
                                      correct_on_train, Model_custom_list, flag='train_set', output_flag=False)
        train_acc.append(current_accuracy_train)
        print(f'current max accuracy\t test set:{max(correct_on_test)}%\t train set:{max(correct_on_train)}%')

        if current_accuracy > max_accuracy:
            max_accuracy = current_accuracy

# Example usage of precision_recall and f1_score
y_all_true = torch.tensor([]).to(DEVICE)
y_all_pre = torch.tensor([]).to(DEVICE)
with torch.no_grad():
    net.eval()
    for (rgb, ir, y) in test_dataloader:
        y = y.to(DEVICE)
        y_all_true = torch.cat((y_all_true, y))

        if model_name in Model_custom_list:
            y_pre = net(rgb.to(DEVICE), ir.to(DEVICE), mode=mode)
        else:
            if mode == 'rgb':
                x = rgb.to(DEVICE)
            elif mode == 'ir':
                x = ir.to(DEVICE)
            y_pre = net(x)

        _, label_index = torch.max(y_pre.data, dim=-1)
        y_all_pre = torch.cat((y_all_pre, label_index))

# Calculate precision, recall, and F1 score
precision_recall_result = precision_recall_fscore_support(y_all_true.cpu().numpy(), y_all_pre.cpu().numpy(), average='macro')
f1_score_result = f1_score(y_all_true.cpu().numpy(), y_all_pre.cpu().numpy(), average='macro')

# Print the results
print("Precision, Recall:", precision_recall_result[:2])
print("F1 Score:", f1_score_result)

# Save the results
results_array = np.zeros(3, dtype=object)
results_array[0] = loss_list
results_array[1] = train_acc
results_array[2] = test_acc

np.save(log_loss_path + model_name + '.npy', results_array)


Flame_one_stream(
  (IN): Sequential(
    (0): Conv2d(3, 8, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1))
    (1): BatchNorm2d(8, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): ReLU()
  )
  (IN_both): Sequential(
    (0): Conv2d(6, 8, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1))
    (1): BatchNorm2d(8, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): ReLU()
  )
  (residual): Conv2d(8, 8, kernel_size=(1, 1), stride=(2, 2))
  (block): Sequential(
    (0): SeparableConv2d(
      (depthwise): Conv2d(8, 8, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), groups=8, bias=False)
      (pointwise): Conv2d(8, 8, kernel_size=(1, 1), stride=(1, 1), bias=False)
    )
    (1): BatchNorm2d(8, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): ReLU()
    (3): SeparableConv2d(
      (depthwise): Conv2d(8, 8, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), groups=8, bias=False)
      (pointwise): Conv2d(8, 8, kernel_

In [5]:
import torch
import torch.nn as nn
from dataset import MyDataset
from torch.utils.data import DataLoader
import torch.optim as optim
import torch.nn.functional as F
import os
from models import Logistic_two_stream, Flame_one_stream, VGG16, Vgg_two_stream, Logistic, Flame_two_stream, Mobilenetv2, Mobilenetv2_two_stream, LeNet5_one_stream, LeNet5_two_stream, Resnet18, Resnet18_two_stream
import csv
import numpy as np
from sklearn.metrics import precision_recall_fscore_support, f1_score

# Set DEVICE to 'cuda' if available, otherwise 'cpu'
DEVICE = 'cuda' if torch.cuda.is_available() else 'cpu'

# Add these lines to create the 'log' directory if it doesn't exist
if not os.path.exists('./log'):
    os.makedirs('./log')
    
# Direct assignment of parameters
args = {
    'path_rgb': './254p RGB Images/',
    'path_ir': './254p Thermal Images/',
    'index': 0,
    'batch_size': 64,
    'lr': 1e-3,
    'classes_num': 3,
    'subset_rate': 0.01,
    'trainset_rate': 0.8,
    'model': 'Flame_two_stream',
    'mode': 'both',
    'EPOCH': 20,
    'test_interval': 1,
    'log_path': './log/results.csv',
    'log_loss_path': './log/'
}

# Add these lines to replace argparse
path_rgb = args['path_rgb']
path_ir = args['path_ir']
index = args['index']
batch_size = args['batch_size']
lr = args['lr']
classes_num = args['classes_num']
subset_rate = args['subset_rate']
trainset_rate = args['trainset_rate']
model_name = args['model']
mode = args['mode']
EPOCH = args['EPOCH']
test_interval = args['test_interval']
log_path = args['log_path']
log_loss_path = args['log_loss_path']

def test(dataloader, model, device, mode, model_name, epoch, log_path, correct_on_test, correct_on_train, Model_custom_list, flag='test_set', output_flag=False):
    model.eval()
    correct = 0
    total = 0

    with torch.no_grad():
        for (rgb, ir, labels) in dataloader:
            rgb, ir, labels = rgb.to(device), ir.to(device), labels.to(device)

            if model_name in Model_custom_list:
                outputs = model(rgb, ir, mode=mode)
            else:
                if mode == 'rgb':
                    x = rgb
                elif mode == 'ir':
                    x = ir

                outputs = model(x)

            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

    accuracy = 100 * correct / total

    if flag == 'test_set':
        correct_on_test.append(accuracy)
    elif flag == 'train_set':
        correct_on_train.append(accuracy)

    if output_flag:
        print(f'Accuracy on {flag} for Epoch {epoch + 1}: {accuracy}%')

    # Save results to a CSV file
    with open(log_path, mode='a', newline='') as file:
        writer = csv.writer(file)
        if epoch == 0:
            writer.writerow(['Epoch', 'Accuracy'])
        writer.writerow([epoch + 1, accuracy])

    return accuracy

# Model initialization
Model_custom_list = ['Flame_one_stream', 'Flame_two_stream', 'Mobilenetv2_two_stream', 'Vgg_two_stream', 'Logistic_two_stream', 'Resnet18_two_stream', 'LeNet5_one_stream', 'LeNet5_two_stream']

Transform_flag = False

if model_name == 'Flame_one_stream':
    net = Flame_one_stream().to(DEVICE)
    Transform_flag = False
    target_size = 254
elif model_name == 'Flame_two_stream':
    net = Flame_two_stream().to(DEVICE)
    Transform_flag = False
    target_size = 254
# Add similar blocks for other models
elif model_name == 'VGG16':
    net = VGG16(classes_num).to(DEVICE)
    Transform_flag = True
    target_size = 224
        
elif model_name == 'Vgg_two_stream':
    Transform_flag = True
    target_size = 224
    net = Vgg_two_stream().to(DEVICE)
        
elif model_name == 'Logistic':
    net= Logistic(classes_num).to(DEVICE)
    target_size = 254
        
elif model_name == 'Logistic_two_stream':
    net= Logistic_two_stream(classes_num).to(DEVICE)
    target_size = 254
    
   
elif model_name == 'Mobilenetv2':
    net = Mobilenetv2(classes_num).to(DEVICE)
    Transform_flag = True
    target_size = 224
        
elif model_name == 'Mobilenetv2_two_stream':
    net = Mobilenetv2_two_stream().to(DEVICE)
    Transform_flag = True
    target_size = 224
        
elif model_name == 'Resnet18':
    net = Resnet18(classes_num).to(DEVICE)
    Transform_flag = True
    target_size = 224
        
elif model_name == 'Resnet18_two_stream':
    net = Resnet18_two_stream().to(DEVICE)
    Transform_flag = True
    target_size = 224
        
elif model_name == 'LeNet5_one_stream':
    net = LeNet5_one_stream().to(DEVICE)
    Transform_flag = False
    target_size = 254
        
elif model_name == 'LeNet5_two_stream':
    net = LeNet5_two_stream().to(DEVICE)
    Transform_flag = False
    target_size = 254
        
print(net)

# Dataset initialization
Dataset = MyDataset(path_rgb, path_ir, input_size=target_size, transform=Transform_flag)

Dataset, _ = torch.utils.data.random_split(Dataset, [int(len(Dataset) * subset_rate), len(Dataset) - int(len(Dataset) * subset_rate)])

split_rate = trainset_rate
train_set, val_set = torch.utils.data.random_split(Dataset, [int(len(Dataset) * split_rate), len(Dataset) - int(len(Dataset) * split_rate)])

train_dataloader = DataLoader(dataset=train_set, batch_size=batch_size, shuffle=True)
test_dataloader = DataLoader(dataset=val_set, batch_size=16, shuffle=False)

# Optimizer and Loss function
optimizer = optim.Adam(net.parameters(), lr=lr, weight_decay=0.00)
loss_function = nn.CrossEntropyLoss(label_smoothing=0.2)

# Training loop
correct_on_train = [0]
correct_on_test = [0]

total_step = (int(len(Dataset) * split_rate) // batch_size)

net.train()

max_accuracy = 0
Loss_accuracy = []
test_acc = []
loss_list = []
train_acc = []

for index in range(EPOCH):
    net.train()
    for i, (rgb, ir, y) in enumerate(train_dataloader):
        optimizer.zero_grad()

        if model_name in Model_custom_list:
            y_pre = net(rgb.to(DEVICE), ir.to(DEVICE), mode=mode)
        else:
            if mode == 'rgb':
                x = rgb.to(DEVICE)
            elif mode == 'ir':
                x = ir.to(DEVICE)
            y_pre = net(x)

        loss = loss_function(y_pre, y.to(DEVICE))
        Loss_accuracy.append(loss)
        print(f'Epoch:{index + 1}/{EPOCH}     Step:{i + 1}|{total_step}   loss:{loss.item()}  ')
        loss_list.append(loss.item())

        loss.backward()

        optimizer.step()

    if ((index + 1) % test_interval) == 0:
        current_accuracy = test(test_dataloader, net, DEVICE, mode, model_name, index, log_path, correct_on_test,
                                correct_on_train, Model_custom_list, flag='test_set', output_flag=False)
        test_acc.append(current_accuracy)
        current_accuracy_train = test(train_dataloader, net, DEVICE, mode, model_name, index, log_path, correct_on_test,
                                      correct_on_train, Model_custom_list, flag='train_set', output_flag=False)
        train_acc.append(current_accuracy_train)
        print(f'current max accuracy\t test set:{max(correct_on_test)}%\t train set:{max(correct_on_train)}%')

        if current_accuracy > max_accuracy:
            max_accuracy = current_accuracy

# Example usage of precision_recall and f1_score
y_all_true = torch.tensor([]).to(DEVICE)
y_all_pre = torch.tensor([]).to(DEVICE)
with torch.no_grad():
    net.eval()
    for (rgb, ir, y) in test_dataloader:
        y = y.to(DEVICE)
        y_all_true = torch.cat((y_all_true, y))

        if model_name in Model_custom_list:
            y_pre = net(rgb.to(DEVICE), ir.to(DEVICE), mode=mode)
        else:
            if mode == 'rgb':
                x = rgb.to(DEVICE)
            elif mode == 'ir':
                x = ir.to(DEVICE)
            y_pre = net(x)

        _, label_index = torch.max(y_pre.data, dim=-1)
        y_all_pre = torch.cat((y_all_pre, label_index))

# Calculate precision, recall, and F1 score
precision_recall_result = precision_recall_fscore_support(y_all_true.cpu().numpy(), y_all_pre.cpu().numpy(), average='macro')
f1_score_result = f1_score(y_all_true.cpu().numpy(), y_all_pre.cpu().numpy(), average='macro')

# Print the results
print("Precision, Recall:", precision_recall_result[:2])
print("F1 Score:", f1_score_result)

# Save the results
results_array = np.zeros(3, dtype=object)
results_array[0] = loss_list
results_array[1] = train_acc
results_array[2] = test_acc

np.save(log_loss_path + model_name + '.npy', results_array)


Flame_two_stream(
  (stream1): Flame(
    (IN): Sequential(
      (0): Conv2d(3, 8, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1))
      (1): BatchNorm2d(8, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (2): ReLU()
    )
    (IN_both): Sequential(
      (0): Conv2d(6, 8, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1))
      (1): BatchNorm2d(8, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (2): ReLU()
    )
    (residual): Conv2d(8, 8, kernel_size=(1, 1), stride=(2, 2))
    (block): Sequential(
      (0): SeparableConv2d(
        (depthwise): Conv2d(8, 8, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), groups=8, bias=False)
        (pointwise): Conv2d(8, 8, kernel_size=(1, 1), stride=(1, 1), bias=False)
      )
      (1): BatchNorm2d(8, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (2): ReLU()
      (3): SeparableConv2d(
        (depthwise): Conv2d(8, 8, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), g