In [6]:
import numpy as np
import matplotlib.pyplot as plt
import torch
from tqdm import tqdm
from scipy.ndimage import gaussian_filter
from torchsummary import summary
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from torch.autograd import Variable
from math import exp
import torch.nn.functional as F


In [7]:
class Generator(nn.Module):
    def __init__(self, neurons=2, kern_sz=3, in_channels=1, out_channels=1):
        super(Generator,self).__init__() 
        #encoder block 
        self.conv1a = nn.Conv2d(in_channels=in_channels, out_channels=neurons, kernel_size=kern_sz, stride=1, padding=1)
        self.conv1b = nn.Conv2d(in_channels=neurons, out_channels=neurons*2, kernel_size=kern_sz, stride=1, padding=1)
        self.bn1 = nn.BatchNorm2d(neurons*2)
        self.pool1 = nn.MaxPool2d(kernel_size = 3, stride=2, padding=1)

        self.conv2a = nn.Conv2d(in_channels=neurons*2, out_channels=neurons*4, kernel_size=kern_sz, stride=1, padding=1)
        self.conv2b = nn.Conv2d(in_channels=neurons*4, out_channels=neurons*4, kernel_size=kern_sz, stride=1, padding=1)
        self.bn2 = nn.BatchNorm2d(neurons*4)
        self.pool2 = nn.MaxPool2d(kernel_size = 3, stride=2,padding=1) 

        self.conv3a = nn.Conv2d(in_channels=neurons*4, out_channels=neurons*8, kernel_size=kern_sz, stride=1, padding=1)
        self.conv3b = nn.Conv2d(in_channels=neurons*8, out_channels=neurons*8, kernel_size=kern_sz, stride=1, padding=1)
        self.bn3 = nn.BatchNorm2d(neurons*8)
        self.pool3 = nn.MaxPool2d(kernel_size = 3, stride=5,padding=1) 

        self.conv4a = nn.Conv2d(in_channels=neurons*8, out_channels=neurons*16, kernel_size=kern_sz, stride=1, padding=1)
        self.conv4b = nn.Conv2d(in_channels=neurons*16, out_channels=neurons*16, kernel_size=kern_sz, stride=1, padding=1)
        self.bn4 = nn.BatchNorm2d(neurons*16)
        self.pool4 = nn.MaxPool2d(kernel_size = 3, stride=5,padding=1) 
    
        # Bottleneck
        self.conv5a = nn.Conv2d(in_channels=neurons*16, out_channels=neurons*32, kernel_size=kern_sz, stride=1, padding=1)
        self.conv5b = nn.Conv2d(in_channels=neurons*32, out_channels=neurons*32, kernel_size=kern_sz, stride=1, padding=1) 
        self.bn5 = nn.BatchNorm2d(neurons*32)
        self.pool5 = nn.MaxPool2d(kernel_size = 3, stride=3,padding=1) 
        
        # Decoder block (Upsampling)
        self.upconv4 = nn.ConvTranspose2d(in_channels=neurons*32, out_channels=neurons*16, kernel_size=3, stride=3,padding=0)
        self.dec4 = nn.Conv2d(in_channels=neurons*32, out_channels=neurons*16, kernel_size=kern_sz, stride=1, padding=1) 
        self.bnc4 = nn.BatchNorm2d(neurons*16)
        
        self.upconv3 = nn.ConvTranspose2d(in_channels=neurons*16, out_channels=neurons*8, kernel_size=3, stride=5,padding=0,output_padding=2)
        self.dec3 = nn.Conv2d(in_channels=neurons*16, out_channels=neurons*8, kernel_size=kern_sz, stride=1, padding=1) 
        self.bnc3 = nn.BatchNorm2d(neurons*8)
        
        self.upconv2 = nn.ConvTranspose2d(in_channels=neurons*8, out_channels=neurons*4, kernel_size=4, stride=5,padding=0,output_padding=1)
        self.dec2 = nn.Conv2d(in_channels=neurons*8, out_channels=neurons*4, kernel_size=kern_sz, stride=1, padding=1) 
        self.bnc2 = nn.BatchNorm2d(neurons*4)
        
        self.upconv1 = nn.ConvTranspose2d(in_channels=neurons*4, out_channels=neurons*2, kernel_size=4, stride=2,padding=1)
        self.dec1 = nn.Conv2d(in_channels=neurons*4, out_channels=neurons*2, kernel_size=kern_sz, stride=1, padding=1)
        self.bnc1 = nn.BatchNorm2d(neurons*2)

        self.fupconv = nn.ConvTranspose2d(in_channels=neurons*2, out_channels=out_channels, kernel_size=4, stride=2,padding=1)
        
    # def forward(self, x):
    #     # Encoder
    #     x1 = self.conv1b(self.conv1a(x))
    #     x1 = self.bn1(self.pool1(x1)) 
    #     x1 = F.relu(x1)
    #     x2 = self.conv2b(self.conv2a(x1))
    #     x2 = self.bn2(self.pool2(x2)) 
    #     x2 = F.relu(x2)
    #     x3 = self.conv3b(self.conv3a(x2))
    #     x3 = self.bn3(self.pool3(x3)) 
    #     x3 = F.relu(x3)
    #     x4 = self.conv4b(self.conv4a(x3))
    #     x4 = self.bn4(self.pool4(x4)) 
    #     x4 = F.relu(x4)

    #     # #bottleneck
    #     x5 = F.relu(self.bn5(self.pool5(self.conv5b(self.conv5a(x4)))))

    #     # # Decoder
    #     x = torch.cat([self.upconv4(x5),x4],dim=1)
    #     x = self.bnc4(self.dec4(x)) 
    #     x = F.relu(x)
    #     x = torch.cat([self.upconv3(x),x3],dim=1)
    #     x = self.bnc3(self.dec3(x)) 
    #     x = F.relu(x)
    #     x = torch.cat([self.upconv2(x),x2],dim=1)
    #     x = self.bnc2(self.dec2(x)) 
    #     x = F.relu(x)
    #     x = torch.cat([self.upconv1(x),x1],dim=1)
    #     x = self.bnc1(self.dec1(x))
    #     x = F.relu(x)
    #     x = F.sigmoid(self.fupconv(x))
        
    #     return x
    
    def forward(self, x):
        # Encoder
        x1 = self.conv1b(self.conv1a(x))
        x1 = self.pool1(x1)
        x1 = F.relu(x1)
        x2 = self.conv2b(self.conv2a(x1))
        x2 = self.pool2(x2)
        x2 = F.relu(x2)
        x3 = self.conv3b(self.conv3a(x2))
        x3 = self.pool3(x3)
        x3 = F.relu(x3)
        x4 = self.conv4b(self.conv4a(x3))
        x4 = self.pool4(x4)
        x4 = F.relu(x4)

        # #bottleneck
        x5 = F.relu(self.pool5(self.conv5b(self.conv5a(x4))))

        # # Decoder
        x = torch.cat([self.upconv4(x5),x4],dim=1)
        x = self.dec4(x) 
        x = F.relu(x)
        x = torch.cat([self.upconv3(x),x3],dim=1)
        x = self.dec3(x)
        x = F.relu(x)
        x = torch.cat([self.upconv2(x),x2],dim=1)
        x = self.dec2(x)
        x = F.relu(x)
        x = torch.cat([self.upconv1(x),x1],dim=1)
        x = self.dec1(x)
        x = F.relu(x)
        x = F.sigmoid(self.fupconv(x))
        
        return x
    


model = Generator(neurons=2, kern_sz=3, in_channels=1, out_channels=1)

summary(model, (1, 300,300));


Layer (type:depth-idx)                   Output Shape              Param #
├─Conv2d: 1-1                            [-1, 2, 300, 300]         20
├─Conv2d: 1-2                            [-1, 4, 300, 300]         76
├─MaxPool2d: 1-3                         [-1, 4, 150, 150]         --
├─Conv2d: 1-4                            [-1, 8, 150, 150]         296
├─Conv2d: 1-5                            [-1, 8, 150, 150]         584
├─MaxPool2d: 1-6                         [-1, 8, 75, 75]           --
├─Conv2d: 1-7                            [-1, 16, 75, 75]          1,168
├─Conv2d: 1-8                            [-1, 16, 75, 75]          2,320
├─MaxPool2d: 1-9                         [-1, 16, 15, 15]          --
├─Conv2d: 1-10                           [-1, 32, 15, 15]          4,640
├─Conv2d: 1-11                           [-1, 32, 15, 15]          9,248
├─MaxPool2d: 1-12                        [-1, 32, 3, 3]            --
├─Conv2d: 1-13                           [-1, 64, 3, 3]            18,4

In [32]:
class DatasetLoad(Dataset):
    def __init__(self, X_train, y_train ):
        """
        Args:
            features (numpy.ndarray): Numpy array containing the features.
            labels (numpy.ndarray): Numpy array containing the labels.
        """
        self.X_train = X_train
        self.y_train = y_train

    def __len__(self):
        return len(self.X_train)
    
    def __getitem__(self, idx):
        X_train = torch.from_numpy(self.X_train[idx]).float()
        y_train = torch.from_numpy(self.y_train[idx]).float()

        return X_train.to(device), y_train.to(device)
    
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print(device)

cuda:0


In [38]:
#### input and output dataset here

X_train = np.random.random((1000,1,300,300))
y_train = np.random.random((1000,1,300,300))
X_val = np.random.random((25,1,300,300))
y_val = np.random.random((25,1,300,300))

In [39]:
X_val_torch = torch.tensor(X_val).float().to(device)
y_val_torch = torch.tensor(y_val).float().to(device)
print(X_val_torch.shape)
print(y_val_torch.shape)

torch.Size([25, 1, 300, 300])
torch.Size([25, 1, 300, 300])


In [40]:
train_dataset = DatasetLoad(X_train,y_train)
train_dataloader = DataLoader(train_dataset, batch_size=10, shuffle=True)

In [41]:
model = Generator(neurons=2, kern_sz=3, in_channels=1, out_channels=1).to(device)
lr = 0.0001
loss_fn_mse = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr = lr)

num_epochs = 250
BS = 20

total_samples = len(X_train)
n_batches = int(total_samples/ BS)
print_batches = (total_samples / BS ) / 4

record_NNLoss = []
record_ssim=[]
val_record_NNLoss = []
val_record_ssim=[np.nan]



In [43]:
#Reproducibility
torch.manual_seed(155)
np.random.seed(155)

traininglog=dict()
e_record_Loss = []
val_store_Loss = []
print('''Start Training Loop!''')
for epoch in range(num_epochs):
    print('''Epoch {} '''.format(epoch))
    
    batch_Loss = []
    for batch_i, (X_batch,y_batch) in enumerate(train_dataloader):

        #zero the parameter gradients
        optimizer.zero_grad()
        
        #activate model training
        model.train()
        #predict
        prediction = model(X_batch)
        #calculate loss 
        Loss = loss_fn_mse(prediction,y_batch) 
        
        #compute metric
        ### add your own metric here
                
        #Backpropagate
        Loss.backward()
        #Apply optimizer
        optimizer.step()
        
        #Print out batch progress
        if batch_i % print_batches == 0:
            print('[%d/%d], Batch:[%d/%d]\t Loss: %.9f \t'
                  % (epoch,num_epochs,batch_i,n_batches,Loss.item()))
        #Save batch losses
        batch_Loss.append(Loss.item())
        
    #Compute epoch losses
    epoch_Loss = np.mean(batch_Loss)
    
    #Print out Epoch Progress
    print('EPOCH %d Training -> Loss: %.9f \t ' 
           %(epoch,epoch_Loss))

    #save epoch progress
    e_record_Loss.append(epoch_Loss)
    
    #save into dictionary
    traininglog['Epoch_Loss'] = e_record_Loss
    
    ###############################
    ######### Validation ##########
    ###############################
    with torch.no_grad():

        #activate model evaluation
        model.eval()
        #predict
        val_prediction = model(X_val_torch)
        #calculate loss
        val_Loss = loss_fn_mse(val_prediction,y_val_torch) 


        #Print validation progress
        print('EPOCH %d Validation -> Loss: %.9f \t'
               %(epoch, val_Loss.item()))
    

    #save into dictionary
    val_store_Loss.append(val_Loss.item())
    traininglog['val_Loss'] = val_store_Loss



Start Training Loop!
Epoch 0 
[0/250], Batch:[0/50]	 Loss: 0.083301976 	
[0/250], Batch:[25/50]	 Loss: 0.083440006 	
[0/250], Batch:[50/50]	 Loss: 0.083375834 	
[0/250], Batch:[75/50]	 Loss: 0.083442397 	
EPOCH 0 Training -> Loss: 0.083396536 	 
EPOCH 0 Validation -> Loss: 0.083376840 	
Epoch 1 
[1/250], Batch:[0/50]	 Loss: 0.083449826 	
[1/250], Batch:[25/50]	 Loss: 0.083297335 	
[1/250], Batch:[50/50]	 Loss: 0.083247833 	
[1/250], Batch:[75/50]	 Loss: 0.083489195 	
EPOCH 1 Training -> Loss: 0.083357497 	 
EPOCH 1 Validation -> Loss: 0.083349720 	
Epoch 2 
[2/250], Batch:[0/50]	 Loss: 0.083445534 	
[2/250], Batch:[25/50]	 Loss: 0.083279826 	
[2/250], Batch:[50/50]	 Loss: 0.083389431 	


KeyboardInterrupt: 