# 1. Import

In [1]:
from os import path
import config


def get_path(filename: str) -> tuple[str, str]:
    return (
        path.join(config.dataset_path, f"{filename}_Extract.npy"),
        path.join(config.dataset_path, f"{filename}.npy")
    )


In [2]:
import os
import toolset
import numpy as np
from npy_append_array import NpyAppendArray

if os.path.exists(toolset.temp_x_path()):
    os.remove(toolset.temp_x_path())
if os.path.exists(toolset.temp_y_path()):
    os.remove(toolset.temp_y_path())

with NpyAppendArray(toolset.temp_x_path()) as array_x,\
        NpyAppendArray(toolset.temp_y_path()) as array_y:
    for name in config.file_names:
        paths = get_path(name)

        current_x = np.load(paths[0], mmap_mode="r")
        current_x = current_x / 255.0
        array_x.append(current_x)

        current_y = np.load(paths[1], mmap_mode="r")
        current_y = 1*current_y
        array_y.append(current_y)


# 2. Preprocess

In [3]:
data_x = np.load(toolset.temp_x_path(), "r")
data_y = np.load(toolset.temp_y_path(), "r")
if data_x.shape[0] != data_y.shape[0]:
    print("!")


!


In [4]:
from sklearn.model_selection import KFold

kfold = KFold(config.kfold_nsplits, shuffle=True, random_state=42)


In [5]:
import torch


class Data(torch.utils.data.Dataset):
    def __init__(self, data: np.ndarray, label: np.ndarray) -> None:
        self.x = data
        self.y = label

    def __len__(self):
        return self.y.shape[0]

    def __getitem__(self, idx):
        return self.x[idx], self.y[idx]


  from .autonotebook import tqdm as notebook_tqdm


# 3. Model

In [6]:
from torch import nn


class CNNModel(nn.Module):
    def __init__(self) -> None:
        super(CNNModel, self).__init__()
        # Kernel_size: size of filter block
        # Stride: The distance between position of the filter
        # Padding: Non-sense at border, so that all data is preserved
        # (in case there is remainder when dividing by kernel_size or something)

        # 28x28x1 -> 26x26x32
        self.conv1 = nn.Conv2d(in_channels=1, out_channels=32, kernel_size=3)
        self.maxpool1 = nn.MaxPool2d(kernel_size=2, stride=2)
        # 13x13x32 -> 13x13x64
        self.conv2 = nn.Conv2d(
            in_channels=32, out_channels=64, kernel_size=3, padding="same")
        self.avgpool = nn.AvgPool2d(kernel_size=2, stride=2)
        # 6x6x64
        self.linear1 = nn.Linear(6*6*64, 32)
        self.relu = nn.ReLU()
        self.linear2 = nn.Linear(32, 10)

    def forward(self, x: np.ndarray) -> np.ndarray:
        # Go through all layers
        x = self.conv1(x)
        x = self.maxpool1(x)
        x = self.conv2(x)
        x = self.avgpool(x)
        x = torch.flatten(x, 1)
        x = self.linear1(x)
        x = self.relu(x)
        x = self.linear2(x)
        return x


# 4. Train

In [7]:
model = CNNModel()

use_cuda = torch.cuda.is_available()
if not use_cuda:
    print("CUDA not used!")
device = torch.device("cuda" if use_cuda else "cpu")

criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters())

model = model.to(device)
criterion = criterion.to(device)


  self.weight = Parameter(torch.empty(


In [11]:
def train(train_idx: np.ndarray) -> tuple[float, float]:
    print(data_x.shape, data_y.shape)
    train = Data(data_x[train_idx], data_y[judge_idx])
    print(data_y)
    train_dataloader = torch.utils.data.DataLoader(
        train, batch_size=config.batch_size)
    total_loss_train = 0
    total_accumulate_train = 0
    for x, y in tqdm(train_dataloader):
        x = x.to(device)
        y = y.to(device)

        output = model(x.float())

        batch_loss = criterion(output, y)
        total_loss_train += batch_loss

        accumulate = (output.argmax(dim=1) == y).sum()
        total_accumulate_train += accumulate

        optimizer.zero_grad()
        batch_loss.backward()
        optimizer.step()

    total_loss_train = total_loss_train.item()
    total_accumulate_train = total_accumulate_train.item()
    return (total_loss_train, total_accumulate_train)


In [9]:
def judge(judge_idx: np.ndarray) -> tuple[float, float]:
    judge = Data(data_x[judge_idx], data_y[judge_idx])
    judge_dataloader = torch.utils.data.DataLoader(
        judge, batch_size=config.batch_size)

    total_loss_judge = 0
    total_accumulate_judge = 0
    with torch.no_grad():
        for x, y in tqdm(judge_dataloader):
            x = x.to(device)
            y = y.to(device)

            output = model(x.float())

            batch_loss = criterion(output, y)
            total_loss_judge += batch_loss

            accumulate = (output.argmax(dim=1) == y).sum()
            total_accumulate_judge += accumulate

    total_loss_judge = total_loss_judge.item()
    total_accumulate_judge = total_accumulate_judge.item()
    return (total_loss_judge, total_accumulate_judge)


In [12]:
from tqdm import tqdm

min_judge_loss = float('inf')


for epoch, (train_idx, judge_idx) in enumerate(kfold.split(data_x)):
    total_loss_train, total_accumulate_train = train(train_idx)
    total_loss_judge, total_accumulate_judge = judge(judge_idx)
    print(
        f'''Epochs: {epoch+1} 
        | Train Loss: {total_loss_train / len(train):.3f}
        | Train Accuracy: {total_accumulate_train/len(train):.3f}
        | Val Loss: {total_loss_judge/len(judge):.3f}
        | Val Accuracy: {total_accumulate_judge/len(judge):.3f}
        | Train idx: {train_idx}
        | Val idx: {judge_idx}'''
    )
    if min_judge_loss > total_loss_judge/len(judge):
        min_judge_loss = total_loss_judge/len(judge)
        torch.save(model.state_dict(), "simplemodel.pt")
        print(f"Save model because val loss improve loss {min_judge_loss:.3f}")


(787, 144, 256, 3) (701,)
[0 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 0 1 1 1 1 1 1 1 1 1 1
 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 0 0 1 1 1 1 1 1 1 1 1 1 1 1 1
 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 0 0 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1
 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1
 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1
 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1
 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 0 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1
 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 0 0 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1
 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1
 1 1 1 1 1 1 0 0 0 0 0 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 0 0 0
 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 0 1 1 1 1 1 1 1
 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 0 0 1 1 1 1 1 1 1 1 1 1 1
 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1

  0%|          | 0/1 [00:00<?, ?it/s]


RuntimeError: Could not infer dtype of numpy.int32