In [1]:
import numpy as np
import pandas as pd
import torch
import os
import torch.nn as nn
import torch.nn.functional as F
from torch.autograd import Variable
import torchvision.transforms as transforms
from PIL import Image
from torch.utils.data import ConcatDataset, DataLoader, Subset, Dataset
from torchvision.datasets import DatasetFolder, VisionDataset
from tqdm.auto import tqdm
import random
from torchvision import transforms as T
import matplotlib.pyplot as plt
from PIL import Image


# Fix every random number generator the notebook can touch so that a
# "Restart & Run All" reproduces the same shuffling and weight initialisation.
myseed = 6666
# Ask cuDNN for deterministic kernels and disable its auto-tuner, which may
# otherwise pick different algorithms from run to run.
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False
random.seed(myseed)  # Python's own RNG was imported but previously left unseeded
np.random.seed(myseed)
torch.manual_seed(myseed)
if torch.cuda.is_available():
    torch.cuda.manual_seed_all(myseed)

# Both pipelines currently perform the same two steps: resize every image to
# a fixed 128 x 128 shape, then convert it to a tensor. They are kept as two
# separate objects so that training-only steps can be added to `train_tfm`
# without affecting evaluation.
train_tfm = transforms.Compose(
    [transforms.Resize((128, 128)), transforms.ToTensor()]
)

test_tfm = transforms.Compose(
    [transforms.Resize((128, 128)), transforms.ToTensor()]
)

class FoodDataset(Dataset):
    """Image dataset over the ``.jpg`` files of one directory.

    The label of a sample is parsed from its file name: the integer in front
    of the first underscore (``"3_12.jpg"`` -> ``3``). Files whose name does
    not start with such an integer (for example ``"0001.jpg"``) get the
    label ``-1``.

    Args:
        path: Directory that is scanned for ``.jpg`` files when ``files`` is
            not given. It is also used in the log message.
        tfm: Callable applied to each PIL image; defaults to ``test_tfm``.
        files: Optional explicit list of image paths. When provided, the
            directory is not scanned at all.
    """

    def __init__(self, path, tfm=test_tfm, files=None):
        super(FoodDataset, self).__init__()
        self.path = path
        if files is not None:
            # An explicit file list wins; skip the directory listing so that
            # ``path`` does not have to exist in this case.
            self.files = files
        else:
            self.files = sorted(
                os.path.join(path, x) for x in os.listdir(path) if x.endswith(".jpg")
            )
        if self.files:
            print(f"One {path} sample:", self.files[0])
        else:
            # Previously an empty list raised IndexError in the log line above.
            print(f"No .jpg files found for {path}")
        self.transform = tfm

    def __len__(self):
        """Return the number of image files in the dataset."""
        return len(self.files)

    def __getitem__(self, idx):
        """Load one sample.

        Returns:
            ``(image, label)`` where ``image`` is the transformed picture and
            ``label`` is the integer parsed from the file name (``-1`` when
            the name carries no label), or ``None`` if the file could not be
            read.
        """
        fname = self.files[idx]
        try:
            # The context manager closes the underlying file handle even when
            # decoding or the transform fails.
            with Image.open(fname) as raw:
                # Always convert to RGB (not only RGBA inputs) so grayscale,
                # palette or CMYK files also yield three-channel tensors and
                # can be stacked into one batch.
                image = self.transform(raw.convert('RGB'))
        except OSError:
            # NOTE: the default DataLoader collate function cannot batch a
            # ``None`` sample; use a collate_fn that filters them out if
            # unreadable files are expected.
            print(f"Skip error image: {fname}")
            return None

        try:
            label = int(os.path.basename(fname).split("_")[0])
        except ValueError:
            # No integer prefix in the file name -> unlabeled sample.
            label = -1

        return image, label
    
    # Data loading
# Root directory holding the "training" and "testing" image folders.
# Set the FOOD_DATASET_DIR environment variable to use another location
# without editing this cell; the default is the original path.
DATA_ROOT = os.environ.get("FOOD_DATASET_DIR", "/home/yuchi/AI/Dataset")
BATCH_SIZE = 64

train_dataset = FoodDataset(os.path.join(DATA_ROOT, "training"), tfm=train_tfm)
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)

test_dataset = FoodDataset(os.path.join(DATA_ROOT, "testing"), tfm=test_tfm)
# Keep the test order fixed: the submission cell pairs predictions with ids
# by position.
test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False)

  from .autonotebook import tqdm as notebook_tqdm


One /home/yuchi/AI/Dataset/training sample: /home/yuchi/AI/Dataset/training/0_0.jpg
One /home/yuchi/AI/Dataset/testing sample: /home/yuchi/AI/Dataset/testing/0001.jpg


In [1]:
import torch
from torch import nn

class Bottleneck(nn.Module):
    """Residual bottleneck block: 1x1 conv -> 3x3 conv -> 1x1 conv plus a skip path.

    Args:
        inplanes: Number of input channels.
        planes: Channel width of the two inner convolutions; the block
            outputs ``planes * extention`` channels.
        stride: Stride of the first 1x1 convolution.
        downsample: Optional module applied to the input before it is added
            to the main path; ``None`` means the input is added unchanged.
    """

    # Expansion factor between ``planes`` and the block's output channels.
    extention = 4

    def __init__(self, inplanes, planes, stride, downsample=None):
        super().__init__()
        expanded = planes * self.extention

        # 1x1 convolution: reduces the channel count (and carries the stride).
        self.conv1 = nn.Conv2d(inplanes, planes, kernel_size=1, stride=stride, bias=False)
        self.bn1 = nn.BatchNorm2d(planes)

        # 3x3 convolution: stride 1 with padding 1, so spatial size is kept.
        self.conv2 = nn.Conv2d(planes, planes, kernel_size=3, stride=1, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(planes)

        # 1x1 convolution: expands the channels by the expansion factor.
        self.conv3 = nn.Conv2d(planes, expanded, kernel_size=1, stride=1, bias=False)
        self.bn3 = nn.BatchNorm2d(expanded)

        self.relu = nn.ReLU(inplace=True)
        # Applied to the skip path in forward() when it is not None.
        self.downsample = downsample
        self.stride = stride

    def forward(self, x):
        """Return ``relu(main_path(x) + skip(x))``."""
        # Main path: conv -> batch norm (-> ReLU) three times; the last
        # stage has no activation before the residual sum.
        out = self.relu(self.bn1(self.conv1(x)))
        out = self.relu(self.bn2(self.conv2(out)))
        out = self.bn3(self.conv3(out))

        # Skip path: the raw input, or its projection when one was supplied.
        if self.downsample is None:
            shortcut = x
        else:
            shortcut = self.downsample(x)

        # Residual sum followed by the final activation.
        return self.relu(out + shortcut)

class ResNet50(nn.Module):
    """ResNet-style classifier assembled from four stages of residual blocks.

    Args:
        block: Block class. It must expose an ``extention`` class attribute
            and accept ``(inplanes, planes, stride=..., downsample=...)``.
        layers: Sequence with the number of blocks in each of the four stages.
        num_class: Size of the output of the final linear layer.
    """

    def __init__(self, block, layers, num_class):
        super().__init__()
        # Channel count fed into the next block; updated by make_layer().
        self.inplane = 64

        self.block = block
        self.layers = layers

        # Stem: 7x7 convolution (stride 2, padding 3) followed by batch norm,
        # ReLU and a 3x3 max pooling with stride 2.
        self.conv1 = nn.Conv2d(3, self.inplane, kernel_size=7, stride=2, padding=3, bias=False)
        self.bn1 = nn.BatchNorm2d(self.inplane)
        self.relu = nn.ReLU()
        self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)

        # Four stages with growing width; every stage except the first
        # uses stride 2 in its first block.
        stage_specs = ((64, 1), (128, 2), (256, 2), (512, 2))
        for position, (width, stride) in enumerate(stage_specs):
            stage = self.make_layer(self.block, width, layers[position], stride=stride)
            setattr(self, "stage%d" % (position + 1), stage)

        # Adaptive pooling reduces any spatial size to 1x1 before the
        # fully connected classification layer.
        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Linear(512 * block.extention, num_class)

    def forward(self, x):
        """Map a batch of images to ``num_class`` scores per sample."""
        # Stem: convolution, normalisation, activation, pooling.
        features = self.maxpool(self.relu(self.bn1(self.conv1(x))))

        # The four residual stages, in order.
        for stage in (self.stage1, self.stage2, self.stage3, self.stage4):
            features = stage(features)

        # Global pooling, flatten everything but the batch axis, classify.
        pooled = torch.flatten(self.avgpool(features), 1)
        return self.fc(pooled)

    def make_layer(self, block, plane, block_num, stride=1):
        """Stack ``block_num`` blocks of width ``plane`` into one stage.

        Only the first block receives ``stride`` and, if needed, a
        projection for the skip path. ``self.inplane`` is updated to the
        stage's output channel count as a side effect.
        """
        out_channels = plane * block.extention

        # A projection on the skip path is required whenever the first block
        # changes the spatial size or the number of channels.
        projection = None
        if not (stride == 1 and self.inplane == out_channels):
            projection = nn.Sequential(
                nn.Conv2d(self.inplane, out_channels, stride=stride, kernel_size=1, bias=False),
                nn.BatchNorm2d(out_channels)
            )

        # First block: handles the stride and the projection.
        stacked = [block(self.inplane, plane, stride=stride, downsample=projection)]
        self.inplane = out_channels

        # Remaining blocks: stride 1 and no projection.
        stacked.extend(block(self.inplane, plane, stride=1) for _ in range(block_num - 1))

        return nn.Sequential(*stacked)
'''
if __name__ == "__main__":
    resnet = ResNet50(Bottleneck, [3, 4, 6, 3], 11)  # Assuming you want 11 classes
    x = torch.randn(64, 3, 128, 128)  # Change to 128x128 input
    x = resnet(x)
    print(x.shape)  # Should output: torch.Size([64, 11])
'''


torch.Size([64, 11])


In [3]:
import torch
import torch.nn as nn
from torch.utils.data import DataLoader,Dataset
import os
import torchvision.models as models
 
 
def _train_one_epoch(model, loader, criterion, optimizer, device, epoch):
    """Run one optimisation pass over ``loader`` and log the running loss/accuracy."""
    model.train()
    num_batches = len(loader)
    sum_loss = 0.0
    correct = 0
    total = 0
    for batch_idx, (images, labels) in enumerate(loader):
        images, labels = images.to(device), labels.to(device)
        optimizer.zero_grad()
        outputs = model(images)  # shape: [batch_size, num_class]
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        sum_loss += loss.item()
        _, predicted = torch.max(outputs.detach(), dim=1)
        total += labels.size(0)
        # .item() keeps the counters plain Python numbers instead of tensors.
        correct += predicted.eq(labels).sum().item()
        print('[epoch:%d, iter:%d] Loss: %.03f | Acc: %.3f%% '
              % (epoch + 1, (batch_idx + 1 + epoch * num_batches), sum_loss / (batch_idx + 1), 100. * correct / total))


def _evaluate(model, loader, device):
    """Return the accuracy in percent over the labelled samples of ``loader``.

    Samples with a negative label (the value ``FoodDataset`` assigns when a
    file name carries no class prefix) are ignored. Returns ``None`` when the
    loader contains no labelled sample at all.
    """
    model.eval()
    correct = 0
    total = 0
    with torch.no_grad():
        for images, labels in loader:
            images, labels = images.to(device), labels.to(device)
            labelled = labels >= 0
            if not bool(labelled.any()):
                continue  # nothing to score in this batch
            outputs = model(images)
            _, predicted = torch.max(outputs, dim=1)
            total += int(labelled.sum().item())
            correct += int((predicted[labelled] == labels[labelled]).sum().item())
    if total == 0:
        return None
    return 100.0 * correct / total


def main(num_epochs=15, learning_rate=0.0001, ckpt_path="/home/yuchi/AI/M1354024_best.ckpt"):
    """Fine-tune a pretrained ResNet-50 on ``train_loader`` and checkpoint it.

    After every epoch the model is scored on ``test_loader``. The weights are
    written to ``ckpt_path`` only when the accuracy is at least as good as
    every previous epoch. If ``test_loader`` has no labelled samples (all
    labels negative), no "best" epoch can be determined, so the latest
    weights are saved after every epoch instead.

    Args:
        num_epochs: Number of passes over the training data.
        learning_rate: Learning rate for the Adam optimiser.
        ckpt_path: Destination file for the saved ``state_dict``.

    Returns:
        List with the validation accuracy (percent) of every epoch that
        could be scored.
    """
    # 2. load model
    num_class = 11
    # NOTE(review): recent torchvision releases prefer the `weights=` argument
    # over `pretrained=`; confirm against the installed version.
    model = models.resnet50(pretrained=True)
    fc_inputs = model.fc.in_features
    # Replace the classification head with one sized for this task.
    model.fc = nn.Linear(fc_inputs, num_class)
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    model = model.to(device)

    # 3. prepare hyper-parameters
    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

    # 4. train
    val_acc_list = []
    for epoch in range(num_epochs):
        print('\nEpoch: %d' % (epoch + 1))
        _train_one_epoch(model, train_loader, criterion, optimizer, device, epoch)

        # Score the model after each epoch.
        print('Waiting Val...')
        acc_val = _evaluate(model, test_loader, device)
        if acc_val is None:
            print("No labelled samples in test_loader; saving the latest weights.")
            torch.save(model.state_dict(), ckpt_path)
            continue

        print('Val\'s ac is: %.3f%%' % acc_val)
        val_acc_list.append(acc_val)

        # Save only when this epoch is the best so far (ties also save). The
        # former unconditional save overwrote the best checkpoint every epoch.
        if acc_val >= max(val_acc_list):
            torch.save(model.state_dict(), ckpt_path)
            print("save epoch {} model".format(epoch + 1))

    return val_acc_list


if __name__ == "__main__":
    main()




Epoch: 1


  return Variable._execution_engine.run_backward(  # Calls into the C++ engine to run the backward pass


[epoch:1, iter:1] Loss: 2.487 | Acc: 9.375% 
[epoch:1, iter:2] Loss: 2.406 | Acc: 10.156% 
[epoch:1, iter:3] Loss: 2.352 | Acc: 16.146% 
[epoch:1, iter:4] Loss: 2.308 | Acc: 17.578% 
[epoch:1, iter:5] Loss: 2.247 | Acc: 21.562% 
[epoch:1, iter:6] Loss: 2.199 | Acc: 23.958% 
[epoch:1, iter:7] Loss: 2.155 | Acc: 27.232% 
[epoch:1, iter:8] Loss: 2.091 | Acc: 30.078% 
[epoch:1, iter:9] Loss: 2.046 | Acc: 31.944% 
[epoch:1, iter:10] Loss: 2.004 | Acc: 33.750% 
[epoch:1, iter:11] Loss: 1.972 | Acc: 34.375% 
[epoch:1, iter:12] Loss: 1.920 | Acc: 37.500% 
[epoch:1, iter:13] Loss: 1.881 | Acc: 38.822% 
[epoch:1, iter:14] Loss: 1.848 | Acc: 40.067% 
[epoch:1, iter:15] Loss: 1.808 | Acc: 41.875% 
[epoch:1, iter:16] Loss: 1.780 | Acc: 42.676% 
[epoch:1, iter:17] Loss: 1.753 | Acc: 43.474% 
[epoch:1, iter:18] Loss: 1.721 | Acc: 44.444% 
[epoch:1, iter:19] Loss: 1.694 | Acc: 45.641% 
[epoch:1, iter:20] Loss: 1.666 | Acc: 46.797% 
[epoch:1, iter:21] Loss: 1.635 | Acc: 47.917% 
[epoch:1, iter:22] Loss

In [4]:
# Rebuild the architecture used for training. The checkpoint loaded below
# replaces every weight, so there is no need to fetch ImageNet weights first.
model_best = models.resnet50(pretrained=False)
fc_inputs = model_best.fc.in_features
model_best.fc = nn.Linear(fc_inputs, 11)
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = model_best.to(device)
# map_location lets a checkpoint saved on a GPU load on a CPU-only machine.
model_best.load_state_dict(torch.load("/home/yuchi/AI/M1354024_best.ckpt", map_location=device))
model_best.eval()
prediction = []
with torch.no_grad():
    for data, _ in test_loader:
        test_pred = model_best(data.to(device))
        test_label = np.argmax(test_pred.cpu().numpy(), axis=1)
        # argmax over axis 1 is already one-dimensional. Squeezing it turned a
        # final batch of one sample into a scalar, and `+=` then failed.
        prediction += test_label.tolist()
#create test csv
def pad4(i):
    """Return ``str(i)`` left-padded with ``"0"`` to a width of at least four characters."""
    return str(i).rjust(4, "0")
# Build the submission table in one step: ids are the 1-based sample
# positions formatted by pad4, categories are the collected predictions.
df = pd.DataFrame({
    "Id": [pad4(i) for i in range(1, len(test_dataset) + 1)],
    "Category": prediction,
})
df.to_csv("/home/yuchi/AI/submission.csv", index=False)
print("done!")

done!
