- Based on: https://arxiv.org/pdf/1708.00838v1.pdf
- Code: https://github.com/kunalrdeshmukh/End-to-end-compression

In [None]:
from google.colab import drive
drive.mount('/content/drive/')

%cd drive/My\ Drive/SWE

Mounted at /content/drive/
/content/drive/My Drive/SWE


In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import tqdm
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline

### Data

In [None]:
ls datasets/

[0m[01;34mmini15002[0m/  mini.tar.xz  timesteps.npy


In [None]:
timesteps = np.load('datasets/timesteps.npy')
print(timesteps.shape)

(1000, 1, 300, 300)


### Network

In [None]:
from copy import deepcopy

old_timesteps = deepcopy(timesteps)

In [None]:
CHANNELS = 1
HEIGHT = timesteps.shape[2]
WIDTH = timesteps.shape[2]
EPOCHS = 200
#BATCH_SIZE = 2
#n_batches = int(timesteps.shape[0]/BATCH_SIZE)
LOG_INTERVAL = int(timesteps.shape[0]/10)
TRAIN_SIZE = int(0.8 * timesteps.shape[0])

#timesteps = old_timesteps

In [None]:
# timesteps = (timesteps - np.min(timesteps)) / (np.max(timesteps) - np.min(timesteps))
def standard_tts(timesteps):
    
  train = timesteps[:TRAIN_SIZE]
  test = timesteps[TRAIN_SIZE:]

  train_mean = np.mean(train)
  train_std = np.mean(train)

  # Normalize
  train = (train - train_mean) / (train_std)
  test = (test - train_mean) / (train_std)

  return train, test, train_mean, train_std

def min_max_tts(timesteps):
    
  train = timesteps[:TRAIN_SIZE]
  test = timesteps[TRAIN_SIZE:]

  train_max = np.max(train)
  train_min = np.min(train)

  # Normalize
  train = (train - train_min) / (train_max - train_min)
  test = (test - train_min) / (train_max - train_min)

  return train, test, train_max, train_min

In [None]:
train, test, train_max, train_min = min_max_tts(timesteps)

In [None]:
class Interpolate(nn.Module):
    def __init__(self, size, mode):
        super(Interpolate, self).__init__()
        self.interp = nn.functional.interpolate
        self.size = size
        self.mode = mode
        
    def forward(self, x):
        x = self.interp(x, size=self.size, mode=self.mode, align_corners=False)
        return x

In [None]:
class End_to_end(nn.Module):
  def __init__(self):
    super(End_to_end, self).__init__()
    
    # Encoder
    self.conv1 = nn.Conv2d(CHANNELS, 64, kernel_size=3, stride=1, padding=1)
    self.conv2 = nn.Conv2d(64, 64, kernel_size=3, stride=2, padding=0)
    self.bn1 = nn.BatchNorm2d(64, affine=False)
    self.conv3 = nn.Conv2d(64, CHANNELS, kernel_size=3, stride=1, padding=1)
    
    # Decoder
    self.interpolate = Interpolate(size=HEIGHT, mode='bilinear')
    self.deconv1 = nn.Conv2d(CHANNELS, 64, 3, stride=1, padding=1)
    self.deconv2 = nn.Conv2d(64, 64, 3, stride=1, padding=1)
    self.bn2 = nn.BatchNorm2d(64, affine=False)
    
    self.deconv_n = nn.Conv2d(64, 64, 3, stride=1, padding=1)
    self.bn_n = nn.BatchNorm2d(64, affine=False)

    
    self.deconv3 = nn.ConvTranspose2d(64, CHANNELS, 3, stride=1, padding=1)
    
    self.relu = nn.ReLU()
  
  def encode(self, x):
    out = self.relu(self.conv1(x))
    out = self.relu(self.conv2(out))
    out = self.bn1(out)
    return self.conv3(out)
    
  
  def reparameterize(self, mu, logvar):
    pass
  
  def decode(self, z):
    upscaled_image = self.interpolate(z)
    out = self.relu(self.deconv1(upscaled_image))
    out = self.relu(self.deconv2(out))
    out = self.bn2(out)
    for _ in range(5):
      out = self.relu(self.deconv_n(out))
      out = self.bn_n(out)
    out = self.deconv3(out)
    final = upscaled_image + out
    return final,out,upscaled_image

    
  def forward(self, x):
    com_img = self.encode(x)
    final,out,upscaled_image = self.decode(com_img)
    return final, out, upscaled_image, com_img, x

In [None]:
CUDA = torch.cuda.is_available()
if CUDA:
  model = End_to_end().cuda()
else :
  model = End_to_end()

print("GPU available ? "+str(CUDA))
  
optimizer = optim.Adam(model.parameters(), lr=1e-2)

GPU available ? True


### Training

In [None]:
def loss_function(final_img,residual_img,upscaled_img,com_img,orig_img):
    
    com_loss = nn.MSELoss(size_average=False)(orig_img, final_img)
    rec_loss = nn.MSELoss(size_average=False)(residual_img,orig_img-upscaled_img)

    return com_loss + rec_loss

In [None]:
train_losses = []
test_losses = []

for epoch in range(EPOCHS):
    x = 0
    
    # Shuffle to remove sequentiality bias
    np.random.shuffle(train)

    train_loss = 0

    # TODO: create batches and train the net with them  
    for b in range(timesteps.shape[0]):

        model.train()
        train_loss = 0

        optimizer.zero_grad()
        # data = torch.Tensor(train[x:x+BATCH_SIZE, :, :, :WIDTH])
        data = torch.Tensor(train[x, :, :, :WIDTH]).expand(1, -1, -1, -1)
            
        if CUDA:
          final, residual_img, upscaled_image, com_img, orig_im = model(data.cuda())
        else :
          final, residual_img, upscaled_image, com_img, orig_im = model(data)

        loss = loss_function(final, residual_img, upscaled_image, com_img, orig_im)
        
        loss.backward()
        
        train_loss += loss.item()
        
        if b % int(train.shape[0]/100) == 0 and b is not 0:
          # train loss
          train_losses.append(loss.item())

          # test loss
          data = torch.Tensor(test[int(test.shape[0]/2), :, :, :WIDTH]).expand(1,1, -1, -1)          
          final, residual_img, upscaled_image, com_img, orig_im = model(data.cuda())
          test_loss = loss_function(final, residual_img, upscaled_image, com_img, orig_im)
          test_losses.append(test_loss.item())

          # plot
          plt.title("Epoch {}-{}".format(epoch, b))
          plt.plot(train_losses, label="train")
          plt.plot(test_losses, label="test")
          plt.legend()
          plt.show()

          #plt.matshow(orig_im.detach().cpu().numpy()[0, 0])
          plt.matshow(final.detach().cpu().numpy()[0, 0])
          plt.show()
        
        optimizer.step()

    # -----
    train_losses = []
    test_losses = []
    print('====> Epoch: {} Batch {} Average loss: {:.4f}'.format(epoch, b, train_loss / timesteps.shape[0]))

    # test predict and comparison
    data = torch.Tensor(test[int(test.shape[0]/2), :, :, :WIDTH]).expand(1, -1, -1, -1)
    final, residual_img, upscaled_image, com_img, orig_im = model(data.cuda())

    torch.save(model.state_dict(), "datasets/model")

Output hidden; open in https://colab.research.google.com to view.

In [None]:
data = torch.Tensor(timesteps[600, :, :, :WIDTH]).expand(1,1, -1, -1)          
final, residual_img, upscaled_image, com_img, orig_im = model(data.cuda())
plt.matshow(orig_im.cpu().detach().numpy()[0,0])
plt.matshow(final.cpu().detach().numpy()[0,0])