In [3]:
import torch
from torch.nn import functional as F
from torch import nn
import pytorch_lightning as pl
from pytorch_lightning.core.lightning import LightningModule

import h5py

In [5]:
class ConvolutionBlock(nn.Module):
    """Conv2d -> optional activation -> dropout -> optional pooling.

    Parameters
    ----------
    in_channels, out_channels, conv_kernel
        Passed positionally to ``nn.Conv2d``.
    dropout : float
        Probability handed to ``nn.Dropout``. The default ``0.`` leaves the
        values unchanged.
    activation : callable or None
        Applied to the convolution output. ``None`` skips the step.
    pool : callable or None
        Applied last. ``None`` skips pooling, mirroring how ``activation``
        is treated.
    **kwargs
        Extra keyword arguments forwarded to ``nn.Conv2d``.

    Notes
    -----
    The default ``activation`` and ``pool`` objects are created once, when the
    ``def`` statement runs, so every block built with the defaults shares the
    same two instances.
    """

    def __init__(self, in_channels, out_channels, conv_kernel, dropout=0., activation=nn.ReLU(), pool=nn.MaxPool2d(2), **kwargs):
        super().__init__()
        self.conv = nn.Conv2d(in_channels, out_channels, conv_kernel, **kwargs)
        self.activation = activation
        self.dropout = nn.Dropout(dropout)
        self.pool = pool

    def forward(self, X: torch.Tensor) -> torch.Tensor:
        """Run ``X`` through the block and return the resulting tensor."""
        output = self.conv(X)
        if self.activation is not None:
            output = self.activation(output)
        output = self.dropout(output)
        # Pooling is optional; previously ``pool=None`` raised a TypeError here.
        if self.pool is not None:
            output = self.pool(output)
        return output


class DecoderBlock(nn.Module):
    """Bilinear upsampling -> Conv2d -> optional activation -> dropout.

    Parameters
    ----------
    in_channels, out_channels, conv_kernel
        Passed positionally to ``nn.Conv2d``.
    dropout : float
        Probability handed to ``nn.Dropout``.
    activation : callable or None
        Applied to the convolution output. ``None`` skips the step.
    upsample_size
        Used as ``scale_factor`` for ``F.interpolate``.
    **kwargs
        Extra keyword arguments forwarded to ``nn.Conv2d``.
    """

    def __init__(self, in_channels, out_channels, conv_kernel, dropout=0., activation=nn.ReLU(), upsample_size=10, **kwargs):
        super().__init__()
        self.upsample_size = upsample_size
        self.conv = nn.Conv2d(in_channels, out_channels, conv_kernel, **kwargs)
        self.activation = activation
        self.dropout = nn.Dropout(dropout)

    def forward(self, X: torch.Tensor) -> torch.Tensor:
        """Upsample ``X``, convolve it, then apply activation and dropout."""
        enlarged = F.interpolate(
            X,
            scale_factor=self.upsample_size,
            mode="bilinear",
            align_corners=True,
        )
        features = self.conv(enlarged)
        activated = features if self.activation is None else self.activation(features)
        return self.dropout(activated)


class BaseEncoder(nn.Module):
    """Stack of six ``ConvolutionBlock`` stages taking 1 channel to 256.

    Channel widths go 1 -> 8 -> 16 -> 32 -> 64 -> 128 -> 256 with kernel
    sizes 11, 7, 5, 3, 3 and 1. Stages 1, 3 and 5 pool by 4; the others use
    ``ConvolutionBlock``'s default pooling. The final stage has no activation.
    """

    def __init__(self):
        super().__init__()
        self.conv_1 = ConvolutionBlock(1, 8, 11, pool=nn.MaxPool2d(4))
        self.conv_2 = ConvolutionBlock(8, 16, 7)
        self.conv_3 = ConvolutionBlock(16, 32, 5, pool=nn.MaxPool2d(4))
        self.conv_4 = ConvolutionBlock(32, 64, 3)
        self.conv_5 = ConvolutionBlock(64, 128, 3, pool=nn.MaxPool2d(4))
        self.conv_6 = ConvolutionBlock(128, 256, 1, activation=None)
        # Plain list used only for iteration in forward(); the stages are
        # already registered as submodules via the conv_N attributes above.
        self.layers = [getattr(self, f"conv_{index}") for index in range(1, 7)]

    def forward(self, X: torch.Tensor) -> torch.Tensor:
        """Apply the six stages to ``X`` in order and return the result."""
        output = X
        for stage in self.layers:
            output = stage(output)
        return output


class BaseDecoder(nn.Module):
    """Stack of seven ``DecoderBlock`` stages taking 256 channels to 1.

    Channel widths go 256 -> 128 -> 64 -> 32 -> 16 -> 8 -> 1 -> 1, with
    upsampling factors 8, 8, 6, 2, 2, 2 and 1.

    Parameters
    ----------
    sigmoid : bool
        When true (the default) the last stage ends in ``nn.Sigmoid``. When
        false the last stage has no activation and returns the raw
        convolution output. This flag used to be ignored, so the sigmoid
        was always applied.
    """

    def __init__(self, sigmoid=True):
        super().__init__()
        self.conv_1 = DecoderBlock(256, 128, 5, upsample_size=8)
        self.conv_2 = DecoderBlock(128, 64, 7, upsample_size=8)
        self.conv_3 = DecoderBlock(64, 32, 5, upsample_size=6)
        self.conv_4 = DecoderBlock(32, 16, 3, upsample_size=2)
        self.conv_5 = DecoderBlock(16, 8, 3, upsample_size=2)
        self.conv_6 = DecoderBlock(8, 1, 3, upsample_size=2)
        # Honour the constructor flag instead of hard-coding the sigmoid.
        final_activation = nn.Sigmoid() if sigmoid else None
        self.conv_7 = DecoderBlock(1, 1, 3, activation=final_activation, upsample_size=1)
        # Plain list used only for iteration in forward(); the stages are
        # already registered as submodules via the conv_N attributes above.
        self.layers = [self.conv_1, self.conv_2, self.conv_3, self.conv_4, self.conv_5, self.conv_6, self.conv_7]

    def forward(self, X: torch.Tensor):
        """Apply the seven stages to ``X`` in order and return the result."""
        output = X
        for layer in self.layers:
            output = layer(output)
        return output


class AutoEncoder(LightningModule):
    """Lightning module that chains an encoder and a decoder.

    Parameters
    ----------
    encoder
        Module mapping an input batch to a latent representation.
    decoder
        Module mapping the latent representation to the prediction.
    lr : float
        Learning rate passed to Adam in ``configure_optimizers``.
    """

    def __init__(self, encoder, decoder, lr=1e-3):
        super().__init__()
        self.encoder = encoder
        self.decoder = decoder
        self.lr = lr

    def forward(self, X: torch.Tensor) -> torch.Tensor:
        """Encode ``X`` and decode the latent code."""
        z = self.encoder(X)
        return self.decoder(z)

    def configure_optimizers(self):
        """Return an Adam optimizer over all parameters with learning rate ``self.lr``."""
        return torch.optim.Adam(self.parameters(), lr=self.lr)

    def training_step(self, batch, batch_idx):
        """Compute the binary cross-entropy between the reconstruction and ``Y``.

        ``batch`` is unpacked as ``(X, Y)``. ``F.binary_cross_entropy`` expects
        the prediction to hold probabilities, so the decoder must end in a
        sigmoid (assumes ``Y`` lies in [0, 1] with the prediction's shape;
        TODO confirm against the dataset).

        Returns a dict with the loss under ``"loss"`` and a ``"log"`` entry
        holding ``{"train_loss": loss}``.
        """
        X, Y = batch
        # Call the module itself rather than ``forward`` so registered hooks run.
        pred_Y = self(X)
        loss = F.binary_cross_entropy(pred_Y, Y)
        tensorboard_logs = {"train_loss": loss}
        return {"loss": loss, "log": tensorboard_logs}

In [3]:
def calculate_shape(N, kernel, stride, padding):
    """Return ``stride * (N - 1) + kernel - 2 * padding``.

    This equals the per-dimension output length of a transposed convolution
    with dilation 1 and no output padding, for an input of length ``N``.
    """
    spread = stride * (N - 1)
    trimmed = 2 * padding
    return spread + kernel - trimmed

In [291]:
# Random batch of one single-channel 1200x1200 image, used to shape-check the encoder.
test = torch.rand(1, 1, 1200, 1200)

In [52]:
# Stand-alone copies of the encoder stages for interactive experimentation.
# NOTE(review): block6 keeps the default ReLU, whereas BaseEncoder.conv_6 is
# built with activation=None. Confirm which one is intended.
(block1, block2, block3, block4, block5, block6) = (
    ConvolutionBlock(1, 8, 11, pool=nn.MaxPool2d(4)),
    ConvolutionBlock(8, 16, 7),
    ConvolutionBlock(16, 32, 5, pool=nn.MaxPool2d(4)),
    ConvolutionBlock(32, 64, 3),
    ConvolutionBlock(64, 128, 3, pool=nn.MaxPool2d(4)),
    ConvolutionBlock(128, 256, 1),
)

In [107]:
# Encoder with freshly initialised weights, used for the shape check below.
encoder = BaseEncoder()

In [108]:
# Shape of the encoder output for the 1200x1200 `test` batch.
# NOTE(review): the saved output below is a NameError because this cell was
# last run as In [108], while `test` is created by the cell labelled In [291].
# Re-run the notebook top to bottom to refresh it.
encoder(test).shape

NameError: name 'test' is not defined

In [236]:
# Random latent code: batch of 1, 256 channels (BaseDecoder's input width), 1x1 spatial.
z = torch.rand(1, 256, 1, 1)

In [285]:
# Decoder with freshly initialised weights, used for the shape check below.
decoder = BaseDecoder()

In [286]:
# Output shape of the decoder for the 1x1 latent code `z`.
decoder(z).shape

torch.Size([1, 1, 1200, 1200])

In [146]:
# Full tensor dump. The recorded values are all identical, which is consistent
# with `z` being 1x1 spatially: upsampling a single pixel gives a constant map,
# and the unpadded convolutions keep it constant. Prefer `.shape` or a small
# slice when only a sanity check is needed.
decoder(z)

tensor([[[[0.4966, 0.4966, 0.4966,  ..., 0.4966, 0.4966, 0.4966],
          [0.4966, 0.4966, 0.4966,  ..., 0.4966, 0.4966, 0.4966],
          [0.4966, 0.4966, 0.4966,  ..., 0.4966, 0.4966, 0.4966],
          ...,
          [0.4966, 0.4966, 0.4966,  ..., 0.4966, 0.4966, 0.4966],
          [0.4966, 0.4966, 0.4966,  ..., 0.4966, 0.4966, 0.4966],
          [0.4966, 0.4966, 0.4966,  ..., 0.4966, 0.4966, 0.4966]]]],
       grad_fn=<SigmoidBackward>)

In [12]:
# Model to train: a fresh encoder/decoder pair with the default learning rate (1e-3).
autoencoder = AutoEncoder(BaseEncoder(), BaseDecoder())

## Data setup

In [14]:
from dii.pipeline.datautils import CompositeH5Dataset
from dii.pipeline.transforms import default_pipeline
from torch.utils.data import DataLoader

In [19]:
# Training dataset read from the raw ion-image HDF5 file, restricted to indices 0..19.
train = CompositeH5Dataset(
    "../../data/raw/ion_images.h5",
    "true",
    default_pipeline,
    indices=list(range(20)),
)

In [20]:
# Batches of 16 samples loaded by 4 worker processes.
# NOTE(review): `shuffle` is left at its default. Confirm that is intended for training.
train_loader = DataLoader(train, batch_size=16, num_workers=4)

In [21]:
# Trainer with all-default settings; the recorded output below reports no GPU or TPU in use.
trainer = pl.Trainer()

GPU available: False, used: False
TPU available: False, using: 0 TPU cores


In [22]:
# Train the autoencoder on `train_loader`. The recorded progress bar shows about
# 13.8 s per iteration, with over two hours remaining in the first epoch.
trainer.fit(autoencoder, train_loader)


  | Name    | Type        | Params
----------------------------------------
0 | encoder | BaseEncoder | 145 K 
1 | decoder | BaseDecoder | 1 M   


Epoch 1:   1%|▏         | 9/625 [02:04<2:21:37, 13.79s/it, loss=0.579, v_num=11]


1