### Import libraries

In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torchvision
from PIL import Image
from torch.utils.data import Dataset
from torchvision import transforms
import os
import cv2
import numpy as np
import setproctitle
import argparse

1


In [2]:
class CustomDataset(Dataset):
    def __init__(self, root_dir, transform=None):
        self.root_dir = root_dir
        self.transform = transform
        self.classes = sorted(os.listdir(root_dir))
        self.class_to_idx = {cls: i for i, cls in enumerate(self.classes)}
        self.data = self._load_data()

    def _load_data(self):
        data = []
        for cls in self.classes:
            class_path = os.path.join(self.root_dir, cls)
            for img_name in os.listdir(class_path):
                img_path = os.path.join(class_path, img_name)
                data.append((img_path, self.class_to_idx[cls]))
        return data

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        img_path, label = self.data[idx]
        image = Image.open(img_path).convert('RGB')

        if self.transform:
            image = self.transform(image)

        return image, label

In [3]:
class ResidualBlock(nn.Module):
    def __init__(self, inchannel, outchannel, stride=1):
        super(ResidualBlock, self).__init__()
        self.left = nn.Sequential(
            nn.Conv2d(inchannel, outchannel, kernel_size=3, stride=stride, padding=1, bias=False),
            nn.BatchNorm2d(outchannel),
            nn.ReLU(inplace=True),
            nn.Conv2d(outchannel, outchannel, kernel_size=3, stride=1, padding=1, bias=False),
            nn.BatchNorm2d(outchannel)
        )
        self.shortcut = nn.Sequential()
        if stride != 1 or inchannel != outchannel:
            self.shortcut = nn.Sequential(
                nn.Conv2d(inchannel, outchannel, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(outchannel)
            )
            
    def forward(self, x):
        out = self.left(x)
        out = out + self.shortcut(x)
        out = F.relu(out)
        
        return out


In [4]:
class ResNet(nn.Module): #Resnet18 model
    def __init__(self, ResidualBlock, num_classes=3):
        super(ResNet, self).__init__()
        self.inchannel = 64
        self.conv1 = nn.Sequential(
            nn.Conv2d(3, 64, kernel_size=7, stride=2, padding=3, bias=False),  # Change kernel_size, stride, and padding
            nn.BatchNorm2d(64),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=3, stride=2, padding=1)  # Add MaxPool2d
        )
        self.layer1 = self.make_layer(ResidualBlock, 64, 2, stride=1)
        self.layer2 = self.make_layer(ResidualBlock, 128, 2, stride=2)
        self.layer3 = self.make_layer(ResidualBlock, 256, 2, stride=2)        
        self.layer4 = self.make_layer(ResidualBlock, 512, 2, stride=2)        
        self.fc = nn.Linear(512, num_classes)
        
    def make_layer(self, block, channels, num_blocks, stride):
        strides = [stride] + [1] * (num_blocks - 1)
        layers = []
        for stride in strides:
            layers.append(block(self.inchannel, channels, stride))
            self.inchannel = channels
        return nn.Sequential(*layers)
    
    def forward(self, x):
        out = self.conv1(x)
        out = self.layer1(out)
        out = self.layer2(out)
        out = self.layer3(out)
        out = self.layer4(out)
        out = F.avg_pool2d(out, 7)  # Change average pooling to have a kernel_size of 7 to match input size
        out = out.view(out.size(0), -1)
        out = self.fc(out)
        return out


In [5]:
def ResNet18():
    return ResNet(ResidualBlock)


In [6]:
def set_unique_process_name(unique_name):
    setproctitle.setproctitle(unique_name)


In [15]:
if __name__ == "__main__":
    #set the program specific name
    unique_name = "user_program"
    set_unique_process_name(unique_name)

    # Check GPU
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    # Set hyperparameters
    EPOCH = 150
    pre_epoch = 0
    BATCH_SIZE = 16
    LR = 0.01

    # Prepare dataset and preprocessing
    transform_train = transforms.Compose([
        transforms.Resize((224, 224)),  # Change input size
        transforms.ToTensor(),
    ])

    transform_test = transforms.Compose([
        transforms.Resize((224, 224)),  # Change input size
        transforms.ToTensor(),
    ])
    
    #parse arguments
    #parser = argparse.ArgumentParser(description='Process some integers.')

    #parser.add_argument('--projectname', type=str, help='location of the image')
    #parse the arguments
    #args = parser.parse_args()
    
    #projectname = args.projectname

    #set the directory
    current_directory = os.getcwd()
    relative_path_train = "server/datasets/test/data"
    relative_path_test = "server/datasets/test/data"
    file_path_train = os.path.join(current_directory, relative_path_train)
    file_path_test = os.path.join(current_directory, relative_path_test)

    custom_dataset_train = CustomDataset(root_dir=file_path_train, transform=transform_train)
    custom_dataset_test = CustomDataset(root_dir=file_path_test, transform=transform_test)

    trainloader = torch.utils.data.DataLoader(custom_dataset_train, batch_size=BATCH_SIZE, shuffle=True, num_workers=2)
    testloader = torch.utils.data.DataLoader(custom_dataset_test, batch_size=8, shuffle=True, num_workers=2)

    # Define ResNet18
    net = ResNet18().to(device)

    # Define loss function & optimizer
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.SGD(net.parameters(), lr=LR, momentum=0.9, weight_decay=5e-4)


    for epoch in range(pre_epoch, EPOCH):
        print("jajucharunning")
        print('\nEpoch: %d' % (epoch + 1))
        net.train()
        sum_loss = 0.0
        correct = 0.0
        total = 0.0
        for i, data in enumerate(trainloader, 0):
            # Prepare dataset
            length = len(trainloader)
            inputs, labels = data
            inputs, labels = inputs.to(device), labels.to(device)
            optimizer.zero_grad()
            
            # Forward & backward
            outputs = net(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            
            # Print accuracy & loss in each batch
            sum_loss += loss.item()
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += predicted.eq(labels.data).cpu().sum()
            print('[epoch:%d, iter:%d] Loss: %.03f | Acc: %.3f%% ' 
                % (epoch + 1, (i + 1 + epoch * length), sum_loss / (i + 1), 100. * correct / total))

        torch.save({
            'epoch': epoch + 1,  # Save the current epoch
            'model_state_dict': net.state_dict(),
            'optimizer_state_dict': optimizer.state_dict(),
            'loss': sum_loss,
        }, os.path.join(current_directory,'server/datasets/test/train/checkpoints', f'resnet_model_epoch_{epoch + 1}.pth'))
        
        # Get accuracy with the test dataset in each epoch
        print('Waiting Test...')
        with torch.no_grad():
            correct = 0
            total = 0
            for data in testloader:
                net.eval()
                images, labels = data
                images, labels = images.to(device), labels.to(device)
                #print(images[0].shape)

                # Convert PyTorch tensor to NumPy array
                numpy_image = images[0].cpu().numpy()

                # Rescale values from [0, 1] to [0, 255]
                numpy_image = (numpy_image * 255).astype(np.uint8)

                # Transpose the dimensions from [3, 224, 224] to [224, 224, 3]
                numpy_image = np.transpose(numpy_image, (1, 2, 0))

                # Convert NumPy array to OpenCV format (BGR)
                opencv_image = cv2.cvtColor(numpy_image, cv2.COLOR_RGB2BGR)

                outputs = net(images)
                _, predicted = torch.max(outputs.data, 1)
                total += labels.size(0)
                correct += (predicted == labels).sum()

                #filename = f'{str(predicted[0])+str(epoch)}.png'
                #cv2.imwrite(filename, opencv_image)

            print('Test\'s accuracy is: %.3f%%' % (100 * correct / total))

    print('Training has finished, total epoch is %d' % EPOCH)


jajucharunning

Epoch: 1
[epoch:1, iter:1] Loss: 1.208 | Acc: 25.000% 
[epoch:1, iter:2] Loss: 1.108 | Acc: 31.250% 
[epoch:1, iter:3] Loss: 0.974 | Acc: 35.417% 
[epoch:1, iter:4] Loss: 0.920 | Acc: 46.875% 
[epoch:1, iter:5] Loss: 0.977 | Acc: 50.000% 
[epoch:1, iter:6] Loss: 1.131 | Acc: 46.875% 
[epoch:1, iter:7] Loss: 1.093 | Acc: 48.214% 
[epoch:1, iter:8] Loss: 1.019 | Acc: 53.125% 
[epoch:1, iter:9] Loss: 1.017 | Acc: 50.694% 
[epoch:1, iter:10] Loss: 1.001 | Acc: 50.980% 


RuntimeError: File /resnet_model_epoch_1.pth cannot be opened.

In [10]:
%tb

SystemExit: 2

In [12]:
import os

# get the current working directory
current_working_directory = os.getcwd()

# print output to the console
print(current_working_directory)

/home/tr
