In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchvision import datasets, transforms

In [2]:
import pandas as pd
import numpy as np
from mpl_toolkits.mplot3d import Axes3D  # noqa: F401 unused import

In [3]:
from sklearn.model_selection import train_test_split
import matplotlib.pyplot as plt
import matplotlib.gridspec as gridspec

In [4]:
from tqdm.notebook import tqdm
from PIL import Image
import os

In [5]:
## parameters
### 五张图片一组
step=5

In [6]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
device

device(type='cuda')

### read images

In [7]:
path = r'C:\Users\liuya\Downloads\3d_printing_research\clipped_samples_otsu'
image_list = os.listdir(path)

In [8]:
full_path = os.path.join(path, image_list[350])
np.array(Image.open(full_path).convert('RGB')).shape

(250, 730, 3)

In [9]:
image_ls = []

for i in tqdm(image_list):
    full_path = os.path.join(path, i)
    img = Image.open(full_path).convert('L')
    img_array = np.asarray(img)
    image_ls.append(img_array)

  0%|          | 0/4046 [00:00<?, ?it/s]

In [10]:
def sliding_window(datas,steps=1,width=step):
    win_set=[]
    for i in tqdm(np.arange(0,len(datas),steps)):
        temp=datas[i:i+width]
        if len(temp)==width:
            win_set.append(np.array(temp)[None,:,:,:])
    return np.array(win_set)

In [11]:
data_input = sliding_window(image_ls,steps=5,width=step)

  0%|          | 0/810 [00:00<?, ?it/s]

In [12]:
data_input = data_input / 255

In [13]:
data_input = torch.tensor(data_input, dtype=torch.float32)

In [14]:
## 5-dimensions: (N,C,D,H,W) batch, channels=1 (grey), # images in a sequence, height, weight
## for 4-dimensions: (N,C,D,W)
data_input.shape

torch.Size([809, 1, 5, 250, 730])

In [15]:
data_input.view((-1, 1, data_input.shape[3], data_input.shape[4])).shape

torch.Size([4045, 1, 250, 730])

### build the model

In [16]:
class Conv_NN_encoder(nn.Module):
    def __init__(self):
        super().__init__()
        
        self.encoder = nn.Sequential(
            ## 250*730=182500
            nn.Linear(250*730, 512),
            nn.ReLU(),
            nn.Linear(512, 128),
            nn.ReLU(),
            nn.Linear(128, 32))
        
    def forward(self, x):
        batch_size = x.shape[0]
        n_tau = x.shape[2]
        #1 means 1 channel, multiply sample size with tau to create this many of 1*W vectors
        x = x.view((-1, x.shape[3]*x.shape[4]))
        encoded = self.encoder(x)
        #back to the previous dimension (sample_size, tau)
        encoded = encoded.view((batch_size, n_tau, -1))
        return encoded

In [17]:
class Lstm_encoder(nn.Module):
    def __init__(self):
        super().__init__()
        self.lstm1 = nn.LSTM(input_size=32, hidden_size=16)
        
    def forward(self, x):
        #reshape x to fit the input requirement of lstm
        x = x.permute(1, 0, 2)
        output, (hidden, cell) = self.lstm1(x)
        # output include all timestep, while hidden just include the last timestep.
        hidden = hidden.repeat((output.shape[0], 1, 1))
        return hidden

In [18]:
class Lstm_decoder(nn.Module):
    def __init__(self):
        super().__init__()
        self.lstm1 = nn.LSTM(input_size=16, hidden_size=32)
    
    def forward(self, x):
        # not need to reshape
        output, hn = self.lstm1(x)
        #reshape output
        output = output.permute(1, 0, 2)
        return output

In [19]:
class Conv_NN_decoder(nn.Module):
    def __init__(self):
        super().__init__()
        
        self.decoder = nn.Sequential(
            nn.Linear(32, 128),
            nn.ReLU(),
            nn.Linear(128, 512),
            nn.ReLU(),
            nn.Linear(512, 182500),
            nn.Sigmoid())

        
    def forward(self, x):
        x = x.contiguous().view((-1, x.shape[2]))
        decoded = self.decoder(x)
        #output = self.pad(output)
        #back to original size
        decoded = decoded.view((-1, 1, 5, 250, 730))
        return decoded

In [20]:
class net(nn.Module):
    def __init__(self, *args):
        super().__init__()
        self.Conv_NN_encoder = args[0]
        self.Lstm_encoder = args[1]
        self.Lstm_decoder = args[2]
        self.Conv_NN_decoder = args[3]
    
    def forward(self, x):
        output = self.Conv_NN_encoder(x)
        output = self.Lstm_encoder(output)
        output = self.Lstm_decoder(output)
        output = self.Conv_NN_decoder(output)
        return output

In [21]:
model = net(Conv_NN_encoder(), Lstm_encoder(), Lstm_decoder(), Conv_NN_decoder())

### train the model

In [22]:
def train(model, device, train_loader, optimizer, epoch):
    
    model.train() #trian model
    for batch_idx, data in enumerate(train_loader):
        data = data.to(device)
        optimizer.zero_grad()
        output = model(data)

        ##calculate loss
        loss = 0
        for i in range(data.shape[0]):
            loss += F.mse_loss(output[i], data[i], reduction='sum')
        loss /= data.shape[0]
        #loss = F.mse_loss(output, data)
        loss.backward()
        optimizer.step()
        # print result every 10 batch
        if batch_idx % 10 == 0:
            print('Train Epoch: {} ... Batch: {} ... Loss: {:.8f}'.format(epoch, batch_idx, loss))

In [23]:
def test(model, device, test_loader):
    model.eval() #evaluate model
    test_loss = 0
    with torch.no_grad():
        for data in test_loader:
            data = data.to(device)
            output = model(data)
            #calculate sum loss
            test_loss += F.mse_loss(output, data, reduction='sum').item()
    
        test_loss /= len(test_loader.dataset)
        print('------------------- Test set: Average loss: {:.4f} ... Samples: {}'.format(test_loss, len(test_loader.dataset)))

### train test split

In [24]:
train_window_, val_window_ = train_test_split(data_input, test_size=0.2, random_state=2022)

In [25]:
train_loader = torch.utils.data.DataLoader(train_window_, batch_size=16,shuffle=True)
test_loader = torch.utils.data.DataLoader(val_window_, batch_size=16,shuffle=False)

In [26]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
device

device(type='cuda')

In [27]:
model = model.to(device)

In [34]:
optimizer = optim.Adam(model.parameters(), lr=0.5)

In [35]:
epochs = 10

In [36]:
for epoch in range(1, epochs + 1):
    train(model, device, train_loader, optimizer, epoch)
    test(model, device, test_loader)

Train Epoch: 1 ... Batch: 0 ... Loss: 23490.80468750
Train Epoch: 1 ... Batch: 10 ... Loss: 32708.57226562
Train Epoch: 1 ... Batch: 20 ... Loss: 33530.69531250
Train Epoch: 1 ... Batch: 30 ... Loss: 22800.57812500
Train Epoch: 1 ... Batch: 40 ... Loss: 40667.35937500
------------------- Test set: Average loss: 26292.4883 ... Samples: 162
Train Epoch: 2 ... Batch: 0 ... Loss: 25772.32031250
Train Epoch: 2 ... Batch: 10 ... Loss: 36163.54296875
Train Epoch: 2 ... Batch: 20 ... Loss: 25318.92578125
Train Epoch: 2 ... Batch: 30 ... Loss: 23073.33203125
Train Epoch: 2 ... Batch: 40 ... Loss: 26609.96289062
------------------- Test set: Average loss: 26247.7859 ... Samples: 162
Train Epoch: 3 ... Batch: 0 ... Loss: 28436.42578125
Train Epoch: 3 ... Batch: 10 ... Loss: 32434.58984375
Train Epoch: 3 ... Batch: 20 ... Loss: 30098.04101562
Train Epoch: 3 ... Batch: 30 ... Loss: 18843.65039062
Train Epoch: 3 ... Batch: 40 ... Loss: 21970.05273438
------------------- Test set: Average loss: 26702