In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

In [None]:
import torch
import torch.nn as nn
from torchvision import models
from torch.nn.functional import relu

---

### model

In [None]:
class model_UNet

### dataset

In [None]:
import os, random
from torch.utils.data import Dataset, DataLoader
from typing import Union

#---------------------------------------------
# custom dataset
#https://discuss.pytorch.org/t/custom-data-loader-for-big-data/129361
class CustomDataset(Dataset):
    def __init__(self, path_dir_X:str, path_dir_Y:str, n_test:Union[int,float], n_val:Union[int,float]): # n_test -> float:ratio of test, int:number of test
        # list_data_X_file_name_all, list_data_Y_file_name_all
        self.path_dir_X = path_dir_X
        self.path_dir_Y = path_dir_Y
        self.list_data_X_file_name_all = os.listdir(path_dir_X)
        self.list_data_Y_file_name_all = os.listdir(path_dir_Y)
        self.n_data_all = len(self.list_data_X_file_name_all)
        #check
        if len(self.list_data_X_file_name_all) != len(self.list_data_Y_file_name_all):
            raise ValueError("error!!!")
        # suffle
        random.shuffle(self.list_data_X_file_name_all)
        random.shuffle(self.list_data_Y_file_name_all)
        # n_test
        if type(n_test)==int:
            self.n_test = n_test
        elif type(n_test)==float:
            self.n_test = int(len(self.list_data_X_file_name_all)*n_test)
        else:
            raise ValueError("error!!!")
        # n_val
        if type(n_val)==int:
            self.n_val = n_val
        elif type(n_val)==float:
            self.n_val = int(len(self.list_data_X_file_name_all)*n_val)
        else:
            raise ValueError("error!!!")
        #check
        if self.n_data_all <= self.n_test+self.n_val:
            raise ValueError("error!!!")
        # list_data_X_file_name_test / _val / _train
        self.list_data_X_file_name_test = self.list_data_X_file_name_all[:self.n_test]
        self.list_data_X_file_name_val = self.list_data_X_file_name_all[self.n_test:self.n_test+self.n_val]
        self.list_data_X_file_name_train = self.list_data_X_file_name_all[self.n_test+self.n_val:]
        # list_data_Y_file_name_test / _val / _train
        self.list_data_Y_file_name_test = self.list_data_Y_file_name_all[:self.n_test]
        self.list_data_Y_file_name_val = self.list_data_Y_file_name_all[self.n_test:self.n_test+self.n_val]
        self.list_data_Y_file_name_train = self.list_data_Y_file_name_all[self.n_test+self.n_val:]
        
    def __len__(self):
        return len(self.list_data_X_file_name_train)
    
    def __getitem__(self, x):
        #data_X
        data_X_file_name = self.list_data_X_file_name_train[x]
        path_file_X = "{0}/{1}".format(self.path_dir_X, data_X_file_name)
        data_X = np.load(path_file_X, allow_pickle=True)
        data_X = torch.from_numpy(data_X)
        #data_Y
        data_Y_file_name =data_X_file_name #= self.list_data_Y_file_name[x]
        path_file_Y = "{0}/{1}".format(self.path_dir_Y, data_Y_file_name)
        data_Y = np.load(path_file_Y, allow_pickle=True)
        data_Y = torch.from_numpy(data_Y)
        #return
        return data_X, data_Y
    
    def getitem_val_test(self, list_data_X_file_name, x):
        #data_X
        data_X_file_name = list_data_X_file_name[x]
        path_file_X = "{0}/{1}".format(self.path_dir_X, data_X_file_name)
        data_X = np.load(path_file_X, allow_pickle=True)
        data_X = torch.from_numpy(data_X)
        #data_Y
        data_Y_file_name = data_X_file_name #= self.list_data_Y_file_name[x]
        path_file_Y = "{0}/{1}".format(self.path_dir_Y, data_Y_file_name)
        data_Y = np.load(path_file_Y, allow_pickle=True)
        data_Y = torch.from_numpy(data_Y)
        #return
        return data_X, data_Y
    
    def return_n_data_all(self):
        return self.n_data_all
    
    def return_n_test(self):
        return self.n_test
    
    def return_n_val(self):
        return self.n_val
    
    def return_n_train(self):
        return self.n_data_all - self.n_val - self.n_test
    
    def return_test_data(self):
        #https://www.tutorialspoint.com/how-to-join-tensors-in-pytorch
        #data_X_test, data_Y_test
        list_data_X_file_name = self.list_data_X_file_name_test
        n_data = self.n_test
        data_X_test = torch.stack([self.getitem_val_test(list_data_X_file_name, i)[0] for i in range(n_data)])
        data_Y_test = torch.stack([self.getitem_val_test(list_data_X_file_name, i)[1] for i in range(n_data)])
        return data_X_test, data_Y_test
    
    def return_val_data(self):
        #https://www.tutorialspoint.com/how-to-join-tensors-in-pytorch
        #data_X_val, data_Y_val
        list_data_X_file_name = self.list_data_X_file_name_val
        n_data = self.n_val
        data_X_val = torch.stack([self.getitem_val_test(list_data_X_file_name, i)[0] for i in range(n_data)])
        data_Y_val = torch.stack([self.getitem_val_test(list_data_X_file_name, i)[1] for i in range(n_data)])
        return data_X_val, data_Y_val


#---------------------------------------------
#var
path_dir_X = "../data_X"
path_dir_Y = "../data_Y_Task3"
n_test = 100
n_val = 100
batch_size = 10

#---------------------------------------------
#instance
dataset = CustomDataset(path_dir_X=path_dir_X, path_dir_Y=path_dir_Y, n_test=n_test, n_val=n_val)
dataloder = DataLoader(dataset, batch_size=batch_size, shuffle=True)

### criterion (loss)

In [None]:
#---------------------------------------------
#https://neptune.ai/blog/pytorch-loss-functions
criterion = nn.MSELoss()

### train

In [None]:
#----------------------------
#var (train)
num_epochs = 20
val_flag = True

#----------------------------
#var (model)
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu") 
model = model_UNet(z_dim).to(device)
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
scheduler = torch.optim.lr_scheduler.MultiStepLR(optimizer, milestones=[15], gamma=0.1)

#----------------------------
#results
history = {"train_loss": [], "val_loss": []}

#----------------------------
#学習
for epoch in range(num_epochs):
  #----------------------------
  # train
  model.train()
  for i, (x, y) in enumerate(dataloder):
    #----------------------------
    #forward
    input = x.to(device).view(-1, 28*28).to(torch.float32)
    output = model(input)
    loss = criterion(y, output)
    #----------------------------
    #backward
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()
    #----------------------------
    #print & result
    if (i+1) % 50 == 0:
      print(f'Epoch: {epoch+1}, train_loss: {loss: 0.4f}')
    history["train_loss"].append(loss)

  #----------------------------
  # eval
  if val_flag == True:
    model.eval()
    with torch.no_grad():
      #----------------------------
      #forward
      x, y = dataloder.return_val_data
      input = x.to(device).view(-1, 28*28).to(torch.float32)
      output = model(input)
      loss = criterion(y, output)
      #----------------------------
      #print & result
      history["val_loss"].append(loss)
      print(f'Epoch: {epoch+1}, val_loss: {loss: 0.4f}')
  
  #----------------------------
  # scheduler
  scheduler.step()

### result_train

### result_test

---

# Test (CustomDataset)

In [63]:
import os, random
from torch.utils.data import Dataset, DataLoader
from typing import Union

#---------------------------------------------
# custom dataset
#https://discuss.pytorch.org/t/custom-data-loader-for-big-data/129361
class CustomDataset(Dataset):
    def __init__(self, path_dir_X:str, path_dir_Y:str, n_test:Union[int,float], n_val:Union[int,float]): # n_test -> float:ratio of test, int:number of test
        # list_data_X_file_name_all, list_data_Y_file_name_all
        self.path_dir_X = path_dir_X
        self.path_dir_Y = path_dir_Y
        self.list_data_X_file_name_all = os.listdir(path_dir_X)
        self.list_data_Y_file_name_all = os.listdir(path_dir_Y)
        self.n_data_all = len(self.list_data_X_file_name_all)
        #check
        if len(self.list_data_X_file_name_all) != len(self.list_data_Y_file_name_all):
            raise ValueError("error!!!")
        # suffle
        random.shuffle(self.list_data_X_file_name_all)
        random.shuffle(self.list_data_Y_file_name_all)
        # n_test
        if type(n_test)==int:
            self.n_test = n_test
        elif type(n_test)==float:
            self.n_test = int(len(self.list_data_X_file_name_all)*n_test)
        else:
            raise ValueError("error!!!")
        # n_val
        if type(n_val)==int:
            self.n_val = n_val
        elif type(n_val)==float:
            self.n_val = int(len(self.list_data_X_file_name_all)*n_val)
        else:
            raise ValueError("error!!!")
        #check
        if self.n_data_all <= self.n_test+self.n_val:
            raise ValueError("error!!!")
        # list_data_X_file_name_test / _val / _train
        self.list_data_X_file_name_test = self.list_data_X_file_name_all[:self.n_test]
        self.list_data_X_file_name_val = self.list_data_X_file_name_all[self.n_test:self.n_test+self.n_val]
        self.list_data_X_file_name_train = self.list_data_X_file_name_all[self.n_test+self.n_val:]
        # list_data_Y_file_name_test / _val / _train
        self.list_data_Y_file_name_test = self.list_data_Y_file_name_all[:self.n_test]
        self.list_data_Y_file_name_val = self.list_data_Y_file_name_all[self.n_test:self.n_test+self.n_val]
        self.list_data_Y_file_name_train = self.list_data_Y_file_name_all[self.n_test+self.n_val:]
        
    def __len__(self):
        return len(self.list_data_X_file_name_train)
    
    def __getitem__(self, x):
        #data_X
        data_X_file_name = self.list_data_X_file_name_train[x]
        path_file_X = "{0}/{1}".format(self.path_dir_X, data_X_file_name)
        data_X = np.load(path_file_X, allow_pickle=True)
        data_X = torch.from_numpy(data_X)
        #data_Y
        data_Y_file_name =data_X_file_name #= self.list_data_Y_file_name[x]
        path_file_Y = "{0}/{1}".format(self.path_dir_Y, data_Y_file_name)
        data_Y = np.load(path_file_Y, allow_pickle=True)
        data_Y = torch.from_numpy(data_Y)
        #return
        return data_X, data_Y
    
    def getitem_val_test(self, list_data_X_file_name, x):
        #data_X
        data_X_file_name = list_data_X_file_name[x]
        path_file_X = "{0}/{1}".format(self.path_dir_X, data_X_file_name)
        data_X = np.load(path_file_X, allow_pickle=True)
        data_X = torch.from_numpy(data_X)
        #data_Y
        data_Y_file_name = data_X_file_name #= self.list_data_Y_file_name[x]
        path_file_Y = "{0}/{1}".format(self.path_dir_Y, data_Y_file_name)
        data_Y = np.load(path_file_Y, allow_pickle=True)
        data_Y = torch.from_numpy(data_Y)
        #return
        return data_X, data_Y
    
    def return_n_data_all(self):
        return self.n_data_all
    
    def return_n_test(self):
        return self.n_test
    
    def return_n_val(self):
        return self.n_val
    
    def return_n_train(self):
        return self.n_data_all - self.n_val - self.n_test
    
    def return_test_data(self):
        #https://www.tutorialspoint.com/how-to-join-tensors-in-pytorch
        #data_X_test, data_Y_test
        list_data_X_file_name = self.list_data_X_file_name_test
        n_data = self.n_test
        data_X_test = torch.stack([self.getitem_val_test(list_data_X_file_name, i)[0] for i in range(n_data)])
        data_Y_test = torch.stack([self.getitem_val_test(list_data_X_file_name, i)[1] for i in range(n_data)])
        return data_X_test, data_Y_test
    
    def return_val_data(self):
        #https://www.tutorialspoint.com/how-to-join-tensors-in-pytorch
        #data_X_val, data_Y_val
        list_data_X_file_name = self.list_data_X_file_name_val
        n_data = self.n_val
        data_X_val = torch.stack([self.getitem_val_test(list_data_X_file_name, i)[0] for i in range(n_data)])
        data_Y_val = torch.stack([self.getitem_val_test(list_data_X_file_name, i)[1] for i in range(n_data)])
        return data_X_val, data_Y_val


#---------------------------------------------
#var
path_dir_X = "../data_X"
path_dir_Y = "../data_Y_Task3"
n_test = 100
n_val = 100
batch_size = 10

#---------------------------------------------
#instance
dataset = CustomDataset(path_dir_X=path_dir_X, path_dir_Y=path_dir_Y, n_test=n_test, n_val=n_val)
dataloder = DataLoader(dataset, batch_size=batch_size, shuffle=True)

In [None]:
for i, (data_X, data_Y) in enumerate(dataloder):
   # print(data_X.shape, data_Y.shape)
   print(data_Y)

In [64]:
dataset.return_n_data_all()

16117

In [65]:
dataset.return_n_test()

100

In [66]:
dataset.return_n_val()

100

In [76]:
dataset.return_val_data()[1].shape

torch.Size([100, 75, 1])

In [77]:
dataset.return_test_data()[1].shape

torch.Size([100, 75, 1])

---

# Test (model)

In [None]:
class UNet(nn.Module):
    def __init__(self, n_class):
        super().__init__()
        
        # Encoder
        # In the encoder, convolutional layers with the Conv2d function are used to extract features from the input image. 
        # Each block in the encoder consists of two convolutional layers followed by a max-pooling layer, with the exception of the last block which does not include a max-pooling layer.
        # -------
        # input: 572x572x3
        self.e11 = nn.Conv2d(3, 64, kernel_size=3, padding=1) # output: 570x570x64
        self.e12 = nn.Conv2d(64, 64, kernel_size=3, padding=1) # output: 568x568x64
        self.pool1 = nn.MaxPool2d(kernel_size=2, stride=2) # output: 284x284x64

        # input: 284x284x64
        self.e21 = nn.Conv2d(64, 128, kernel_size=3, padding=1) # output: 282x282x128
        self.e22 = nn.Conv2d(128, 128, kernel_size=3, padding=1) # output: 280x280x128
        self.pool2 = nn.MaxPool2d(kernel_size=2, stride=2) # output: 140x140x128

        # input: 140x140x128
        self.e31 = nn.Conv2d(128, 256, kernel_size=3, padding=1) # output: 138x138x256
        self.e32 = nn.Conv2d(256, 256, kernel_size=3, padding=1) # output: 136x136x256
        self.pool3 = nn.MaxPool2d(kernel_size=2, stride=2) # output: 68x68x256

        # input: 68x68x256
        self.e41 = nn.Conv2d(256, 512, kernel_size=3, padding=1) # output: 66x66x512
        self.e42 = nn.Conv2d(512, 512, kernel_size=3, padding=1) # output: 64x64x512
        self.pool4 = nn.MaxPool2d(kernel_size=2, stride=2) # output: 32x32x512

        # input: 32x32x512
        self.e51 = nn.Conv2d(512, 1024, kernel_size=3, padding=1) # output: 30x30x1024
        self.e52 = nn.Conv2d(1024, 1024, kernel_size=3, padding=1) # output: 28x28x1024


        # Decoder
        self.upconv1 = nn.ConvTranspose2d(1024, 512, kernel_size=2, stride=2)
        self.d11 = nn.Conv2d(1024, 512, kernel_size=3, padding=1)
        self.d12 = nn.Conv2d(512, 512, kernel_size=3, padding=1)

        self.upconv2 = nn.ConvTranspose2d(512, 256, kernel_size=2, stride=2)
        self.d21 = nn.Conv2d(512, 256, kernel_size=3, padding=1)
        self.d22 = nn.Conv2d(256, 256, kernel_size=3, padding=1)

        self.upconv3 = nn.ConvTranspose2d(256, 128, kernel_size=2, stride=2)
        self.d31 = nn.Conv2d(256, 128, kernel_size=3, padding=1)
        self.d32 = nn.Conv2d(128, 128, kernel_size=3, padding=1)

        self.upconv4 = nn.ConvTranspose2d(128, 64, kernel_size=2, stride=2)
        self.d41 = nn.Conv2d(128, 64, kernel_size=3, padding=1)
        self.d42 = nn.Conv2d(64, 64, kernel_size=3, padding=1)

        # Output layer
        self.outconv = nn.Conv2d(64, n_class, kernel_size=1)


    def forward(self, x):
        # Encoder
        xe11 = relu(self.e11(x))
        xe12 = relu(self.e12(xe11))
        xp1 = self.pool1(xe12)

        xe21 = relu(self.e21(xp1))
        xe22 = relu(self.e22(xe21))
        xp2 = self.pool2(xe22)

        xe31 = relu(self.e31(xp2))
        xe32 = relu(self.e32(xe31))
        xp3 = self.pool3(xe32)

        xe41 = relu(self.e41(xp3))
        xe42 = relu(self.e42(xe41))
        xp4 = self.pool4(xe42)

        xe51 = relu(self.e51(xp4))
        xe52 = relu(self.e52(xe51))

        # Decoder
        xu1 = self.upconv1(xe52)
        xu11 = torch.cat([xu1, xe42], dim=1)
        xd11 = relu(self.d11(xu11))
        xd12 = relu(self.d12(xd11))

        xu2 = self.upconv2(xd12)
        xu22 = torch.cat([xu2, xe32], dim=1)
        xd21 = relu(self.d21(xu22))
        xd22 = relu(self.d22(xd21))

        xu3 = self.upconv3(xd22)
        xu33 = torch.cat([xu3, xe22], dim=1)
        xd31 = relu(self.d31(xu33))
        xd32 = relu(self.d32(xd31))

        xu4 = self.upconv4(xd32)
        xu44 = torch.cat([xu4, xe12], dim=1)
        xd41 = relu(self.d41(xu44))
        xd42 = relu(self.d42(xd41))

        # Output layer
        out = self.outconv(xd42)

        return out