In [None]:
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd
import torch
import torchvision
import h5py
import os
import sys
import scipy
import damselfly as df

# Project root; the three locations below are built by joining onto it.
PATH = '/storage/home/adz6/group/project'
# NOTE(review): by their names these look like the results, plots and dataset
# directories -- confirm against the code that reads or writes them.
RESULTPATH = os.path.join(PATH, 'results/damselfly')
PLOTPATH = os.path.join(PATH, 'plots/damselfly')
DATAPATH = os.path.join(PATH, 'damselfly/data/datasets')

# Header note for the notebook. This is a bare string expression, not a
# docstring; as the last expression of the cell it is shown as the cell output.
"""
Date: 7/20/2021
Description: prototype autoencoder
"""

In [None]:
class ConvDecoder2(torch.nn.Module):
    """Decoder half of the convolutional autoencoder.

    Pipeline: linear block -> unflatten to (channels, length) -> max-unpool ->
    deconvolution block -> max-unpool -> deconvolution block.

    Parameters
    ----------
    deconv_block1, deconv_block2 : sequence of tuples
        Layer specifications handed to ``DeconvBlock``. ``deconv_block1[0][0]``
        is also the channel count the linear output is reshaped to.
    maxunpool_kerns : sequence of int
        Kernel sizes of the first and the second ``MaxUnpool1d`` layer.
    linear_block : sequence of (in_features, out_features)
        Layer specifications handed to ``DecLinearBlock``.

    Raises
    ------
    ValueError
        If the output width of the last linear layer is not a multiple of
        ``deconv_block1[0][0]`` and therefore cannot be reshaped.
    """

    def __init__(self, deconv_block1, deconv_block2, maxunpool_kerns, linear_block):
        super(ConvDecoder2, self).__init__()

        self.layer1 = DecLinearBlock(linear_block)

        # The tensor being reshaped is the OUTPUT of the last linear layer, so
        # the per-channel length has to be derived from that width. Using the
        # latent width (linear_block[0][0]) only works when it happens to equal
        # out_features // channels.
        channels = deconv_block1[0][0]
        flat_features = linear_block[-1][1]
        if flat_features % channels != 0:
            raise ValueError(
                "last linear layer outputs %d features, which cannot be split "
                "into %d channels" % (flat_features, channels)
            )
        self.unflatten = torch.nn.Unflatten(
            dim=1, unflattened_size=(channels, flat_features // channels)
        )
        self.maxunpool1 = torch.nn.MaxUnpool1d(kernel_size=maxunpool_kerns[0])
        self.layer2 = DeconvBlock(deconv_block1)
        self.maxunpool2 = torch.nn.MaxUnpool1d(kernel_size=maxunpool_kerns[1])
        self.layer3 = DeconvBlock(deconv_block2)

    def forward(self, x, indices):
        """Decode ``x``.

        ``indices`` is a sequence of max-pool index tensors; the last entry is
        used by the first unpooling layer and the second-to-last entry by the
        second unpooling layer.
        """
        x = self.layer1(x)

        x = self.unflatten(x)
        x = self.maxunpool1(x, indices[-1])
        x = self.layer2(x)
        x = self.maxunpool2(x, indices[-2])
        x = self.layer3(x)

        return x
    
class ConvEncoder2(torch.nn.Module):
    """Encoder: conv block -> max-pool -> conv block -> max-pool -> flatten -> linear block.

    Both pooling layers are created with ``return_indices=True``; the index
    tensors are returned from ``forward`` next to the encoded output.
    """

    def __init__(self, conv_block1, conv_block2, maxpool_kerns, linear_block ):
        super().__init__()

        first_pool_kernel = maxpool_kerns[0]
        second_pool_kernel = maxpool_kerns[1]

        # Convolution / pooling stages, registered in execution order.
        self.layer1 = ConvBlock(conv_block1)
        self.maxpool1 = torch.nn.MaxPool1d(kernel_size=first_pool_kernel, return_indices=True)
        self.layer2 = ConvBlock(conv_block2)
        self.maxpool2 = torch.nn.MaxPool1d(kernel_size=second_pool_kernel, return_indices=True)
        self.flatten = torch.nn.Flatten()

        # Fully connected stage applied to the flattened feature map.
        self.layer3 = EncLinearBlock(linear_block)

    def forward(self, x):
        """Return ``(encoded, (indices1, indices2))`` for input ``x``."""
        pooled1, indices1 = self.maxpool1(self.layer1(x))
        pooled2, indices2 = self.maxpool2(self.layer2(pooled1))
        encoded = self.layer3(self.flatten(pooled2))

        return encoded, (indices1, indices2)
    
class ConvBlock(torch.nn.Module):
    """Stack of ``Conv1d`` + ReLU pairs with circular padding.

    Parameters
    ----------
    conv_list : sequence of (in_channels, out_channels, kernel_size, dilation)
        One entry per convolution, applied in order.
    """

    def __init__(self, conv_list):
        super(ConvBlock, self).__init__()

        self.activation = torch.nn.ReLU
        self.layer1 = self._make_block(conv_list, self.activation)

    def forward(self, x):
        """Apply every convolution/activation pair to ``x`` in order."""
        return self.layer1(x)

    def _make_block(self, conv_list, activation):
        """Build the ``Sequential`` described by ``conv_list``.

        ``activation`` is a module class; a new instance follows each convolution.
        """
        layers = []

        for item in conv_list:
            in_ch, out_ch, kernel, dilation = item[0], item[1], item[2], item[3]
            # A dilated kernel spans dilation * (kernel - 1) + 1 samples, so the
            # padding has to scale with the dilation to keep the sequence length
            # unchanged (odd kernels, stride 1). For dilation == 1 this is the
            # same kernel // 2 that was used before.
            padding = dilation * (kernel // 2)
            layers.append(torch.nn.Conv1d(in_ch, out_ch, kernel_size=kernel,
                                          dilation=dilation, stride=1, padding=padding,
                                          padding_mode='circular'))
            layers.append(activation())

        return torch.nn.Sequential(*layers)
    
class DeconvBlock(torch.nn.Module):
    """Stack of ReLU + ``ConvTranspose1d`` pairs (activation first).

    Parameters
    ----------
    deconv_list : sequence of (in_channels, out_channels, kernel_size[, dilation])
        One entry per transposed convolution, applied in order. The dilation
        entry is optional and defaults to 1.
    """

    def __init__(self, deconv_list):
        super(DeconvBlock, self).__init__()

        self.activation = torch.nn.ReLU
        self.layer1 = self._make_block(deconv_list, self.activation)

    def forward(self, x):
        """Apply every activation/transposed-convolution pair to ``x`` in order."""
        return self.layer1(x)

    def _make_block(self, deconv_list, activation):
        """Build the ``Sequential`` described by ``deconv_list``.

        ``activation`` is a module class; a new instance precedes each layer.
        """
        layers = []

        for item in deconv_list:
            in_ch, out_ch, kernel = item[0], item[1], item[2]
            # The specs carry a dilation in the same slot as the ConvBlock specs;
            # honour it instead of silently dropping it. Three-element specs are
            # still accepted and mean dilation 1.
            dilation = item[3] if len(item) > 3 else 1
            layers.append(activation())
            # padding = dilation * (kernel // 2) keeps the length unchanged for
            # odd kernels at stride 1; with dilation 1 it equals kernel // 2.
            layers.append(torch.nn.ConvTranspose1d(in_ch, out_ch, kernel_size=kernel,
                                                   stride=1, padding=dilation * (kernel // 2),
                                                   dilation=dilation))

        return torch.nn.Sequential(*layers)
    
class EncLinearBlock(torch.nn.Module):
    """Fully connected stack with a ReLU between layers but none after the last.

    ``linear_block`` is a sequence of ``(in_features, out_features)`` pairs.
    """

    def __init__(self, linear_block):
        super().__init__()

        self.activation = torch.nn.ReLU
        self.layer1 = self._make_block(linear_block, self.activation)

    def forward(self, x):
        """Run ``x`` through the linear stack."""
        return self.layer1(x)

    def _make_block(self, linear_block, activation):
        """Assemble Linear layers, putting ``activation()`` after all but the final one."""
        modules = []
        final_position = len(linear_block) - 1

        for position, spec in enumerate(linear_block):
            modules.append(torch.nn.Linear(spec[0], spec[1]))
            if position != final_position:
                modules.append(activation())

        return torch.nn.Sequential(*modules)

class DecLinearBlock(torch.nn.Module):
    """Fully connected stack in which every Linear layer is preceded by a ReLU.

    ``linear_block`` is a sequence of ``(in_features, out_features)`` pairs.
    """

    def __init__(self, linear_block):
        super().__init__()

        self.activation = torch.nn.ReLU
        self.layer1 = self._make_block(linear_block, self.activation)

    def forward(self, x):
        """Run ``x`` through the activation/linear stack."""
        return self.layer1(x)

    def _make_block(self, linear_block, activation):
        """Assemble ``activation()`` + Linear pairs, one pair per spec."""
        modules = []

        for position, _ in enumerate(linear_block):
            spec = linear_block[position]
            modules.extend([activation(), torch.nn.Linear(spec[0], spec[1])])

        return torch.nn.Sequential(*modules)

In [None]:
class ConvAE(torch.nn.Module):
    """Autoencoder that chains a ``ConvEncoder2`` and a ``ConvDecoder2``.

    The first four arguments are passed to the encoder and the last four to
    the decoder, each group in the order given.
    """

    def __init__(self, 
                 conv_block1, 
                 conv_block2, 
                 maxpool_kerns, 
                 enc_linear_block, 
                 deconv_block1, 
                 deconv_block2, 
                 maxunpool_kerns, 
                 dec_linear_block
                ):
        super().__init__()

        encoder_args = (conv_block1, conv_block2, maxpool_kerns, enc_linear_block)
        decoder_args = (deconv_block1, deconv_block2, maxunpool_kerns, dec_linear_block)

        self.encoder = ConvEncoder2(*encoder_args)
        self.decoder = ConvDecoder2(*decoder_args)

    def forward(self, x):
        """Encode ``x``, then decode using the pooling indices the encoder returned."""
        latent, pool_indices = self.encoder(x)
        return self.decoder(latent, pool_indices)

In [None]:
# Layer specs are (in_channels, out_channels, kernel_size, dilation).
convblock_list1 = [
    (3, 20, 13, 1),
    (20, 20, 13, 1),
]
convblock_list2 = [
    (20, 40, 7, 1),
    (40, 40, 7, 1),
]

deconvblock_list1 = [
    (40, 40, 7, 1),
    (40, 20, 7, 1),
]
deconvblock_list2 = [
    (20, 20, 13, 1),
    (20, 3, 13, 1),
]

# Pooling kernels for the encoder, and the unpooling kernels in reverse order.
maxpool_kerns = [8, 4]
maxunpool_kerns = [4, 8]

# 40 channels * 8192 samples, reduced by the two pooling kernels (8 * 4).
enc_linear_block = [(40 * 8192 // (8 * 4), 512 ), (512, 256)]
dec_linear_block = [(256, 512), (512, 40 * 8192 // (8 * 4))]

encoder_list = [convblock_list1, convblock_list2, maxpool_kerns, enc_linear_block]
decoder_list = [deconvblock_list1, deconvblock_list2, maxunpool_kerns, dec_linear_block]

# Stand-alone pieces for inspection, followed by the combined autoencoder.
enc = ConvEncoder2(*encoder_list)
block = ConvBlock(convblock_list1)
dec = ConvDecoder2(*decoder_list)

ae = ConvAE(*encoder_list, *decoder_list)

In [None]:
# Show the module's repr (its nested layer listing) as the cell output.
ae

In [None]:
# Random batch: 10 samples, 3 channels, 8192 points.
x = torch.randn((10, 3, 8192))
# One pass yields both outputs; running the encoder twice only repeated the
# same computation. Calling the module (not .forward) also runs any hooks.
y, enc_indices = enc(x)

In [None]:
# Decode with the pooling indices from the encoder pass. The module is called
# directly so that registered hooks run; .forward() would skip them.
z = dec(y, enc_indices)
print(z.shape)

In [None]:
# Round trip through the full autoencoder; call the module rather than
# .forward() so that registered hooks run.
ae(x).shape

In [None]:
def ConvRelu(in_f, out_f, kernel, dilation):
    """Return ``Sequential(Conv1d, ReLU)``; the convolution has stride 1 and no padding."""
    convolution = torch.nn.Conv1d(in_f, out_f, kernel_size=kernel, stride=1, dilation=dilation)
    activation = torch.nn.ReLU()

    return torch.nn.Sequential(convolution, activation)
def ConvTransRelu(in_f, out_f, kernel, dilation):
    """Return ``Sequential(ReLU, ConvTranspose1d)``; stride 1, no padding."""
    activation = torch.nn.ReLU()
    transposed_conv = torch.nn.ConvTranspose1d(in_f, out_f, kernel_size=kernel, stride=1, dilation=dilation)

    return torch.nn.Sequential(activation, transposed_conv)


def ConvMaxpoolBlock(conv_in_f, conv_out_f, conv_kernel, conv_dilation, maxpool_kernel):
    """Return a ``Sequential`` of ConvRelu stages followed by one ``MaxPool1d``.

    The four ``conv_*`` sequences are zipped together, one ConvRelu per tuple.
    """
    stages = [
        ConvRelu(n_in, n_out, width, dil)
        for n_in, n_out, width, dil in zip(conv_in_f, conv_out_f, conv_kernel, conv_dilation)
    ]
    stages.append(torch.nn.MaxPool1d(kernel_size=maxpool_kernel))

    return torch.nn.Sequential(*stages)

def ConvMaxUnpoolBlock(conv_in_f, conv_out_f, conv_kernel, conv_dilation, maxunpool_kernel):
    """Return a ``Sequential`` of one upsampling layer followed by ConvTransRelu stages.

    The four ``conv_*`` sequences are zipped together, one ConvTransRelu per
    tuple. ``maxunpool_kernel`` is the factor by which the sequence length is
    expanded before the transposed convolutions.

    ``MaxUnpool1d`` cannot be used here: its forward needs the ``indices``
    produced by the matching pooling layer, and ``Sequential`` passes a single
    tensor from layer to layer, so the block raised ``TypeError`` on its first
    call. Nearest-neighbour ``Upsample`` performs the same length expansion
    without indices and, like ``MaxUnpool1d``, has no parameters.
    """
    conv_trans_relu_blocks = [
        ConvTransRelu(n_in, n_out, width, dil)
        for n_in, n_out, width, dil in zip(conv_in_f, conv_out_f, conv_kernel, conv_dilation)
    ]

    return torch.nn.Sequential(
                        torch.nn.Upsample(scale_factor=maxunpool_kernel, mode='nearest'),
                        *conv_trans_relu_blocks
                        )

def StackConvMaxpool(conv_max_list):
    """Chain one ConvMaxpoolBlock per entry of ``conv_max_list`` into a ``Sequential``.

    Each entry supplies the five positional arguments of ConvMaxpoolBlock.
    """
    stages = [
        ConvMaxpoolBlock(spec[0], spec[1], spec[2], spec[3], spec[4])
        for spec in conv_max_list
    ]

    return torch.nn.Sequential(*stages)

def StackConvMaxUnpool(conv_unmax_list):
    """Chain one ConvMaxUnpoolBlock per entry of ``conv_unmax_list`` into a ``Sequential``.

    Each entry supplies the five positional arguments of ConvMaxUnpoolBlock.
    """
    stages = [
        ConvMaxUnpoolBlock(spec[0], spec[1], spec[2], spec[3], spec[4])
        for spec in conv_unmax_list
    ]

    return torch.nn.Sequential(*stages)

def LinearRelu(in_f, out_f):
    """Return ``Sequential(Linear(in_f, out_f), ReLU)``."""
    dense = torch.nn.Linear(in_f, out_f)
    activation = torch.nn.ReLU()

    return torch.nn.Sequential(dense, activation)

def EncLinear(in_f, out_f):
    """Build Linear layers from paired widths, with a ReLU after all but the last.

    ``in_f[i]`` and ``out_f[i]`` give the input and output width of layer ``i``.
    """
    modules = []
    final_position = len(in_f) - 1

    for position, width_in in enumerate(in_f):
        modules.append(torch.nn.Linear(width_in, out_f[position]))
        if position != final_position:
            modules.append(torch.nn.ReLU())

    return torch.nn.Sequential(*modules)

def DecLinear(in_f, out_f):
    """Build ReLU + Linear pairs from paired widths, activation first.

    ``in_f[i]`` and ``out_f[i]`` give the input and output width of layer ``i``.
    """
    modules = []

    for position, width_in in enumerate(in_f):
        modules.extend([torch.nn.ReLU(), torch.nn.Linear(width_in, out_f[position])])

    return torch.nn.Sequential(*modules)


class ConvEncoder(torch.nn.Module):
    """Encoder: stacked conv/max-pool stages -> flatten -> linear stack.

    ``conv_list`` is handed to StackConvMaxpool; ``linear_list[0]`` and
    ``linear_list[1]`` are the two arguments given to EncLinear.
    """

    def __init__(self, conv_list, linear_list):
        super().__init__()

        linear_first, linear_second = linear_list[0], linear_list[1]

        self.conv = StackConvMaxpool(conv_list)
        self.flatten = torch.nn.Flatten(start_dim=1)
        self.linear = EncLinear(linear_first, linear_second)

    def forward(self, x):
        """Return the encoding of ``x``."""
        features = self.flatten(self.conv(x))
        return self.linear(features)
    
class ConvDecoder(torch.nn.Module):
    """Decoder: linear stack -> unflatten -> stacked unpool/deconvolution stages.

    ``linear_list[0]`` and ``linear_list[1]`` are the two arguments given to
    DecLinear; ``conv_list`` is handed to StackConvMaxUnpool.
    """

    def __init__(self, conv_list, linear_list):
        super().__init__()

        # Channel count of the first deconvolution; the linear output is
        # reshaped to (channels, remaining length).
        channels = conv_list[0][0][0]
        length = linear_list[-1][-1] // channels

        self.linear = DecLinear(linear_list[0], linear_list[1])
        self.unflatten = torch.nn.Unflatten(dim=1, unflattened_size=(channels, length))
        self.deconv = StackConvMaxUnpool(conv_list)

    def forward(self, x):
        """Return the decoding of ``x``."""
        reshaped = self.unflatten(self.linear(x))
        return self.deconv(reshaped)
        
        
        
        
def CalcConvMaxpoolOutputSize(conv_max_list, ninput_ch, ninput):
    """Return the flattened feature count a conv/max-pool stack produces.

    A stack is built from ``conv_max_list`` and run once on a random tensor of
    shape ``(1, ninput_ch, ninput)``; the result is the number of output
    elements for that single sample, as an ``int``.
    """
    stack = StackConvMaxpool(conv_max_list)

    probe = torch.rand((1, ninput_ch, ninput))
    response = stack(probe)

    # Collapse every non-batch dimension into one and read its width.
    flattened = response.flatten(start_dim=1)

    return int(flattened.shape[-1])
       

In [None]:
# Encoder stages, one entry per conv/max-pool block. Within an entry the items
# are, in order: input channels, output channels, kernel sizes and dilations of
# the convolutions, then the max-pool kernel size.
enc_conv_list = [
                [
                    [3, 20], # in_f 
                    [20, 20], # out_f
                    [12, 12], # conv_kernels
                    [1, 1], # dilations
                    12 # maxpool kernel size
                ],
                [
                    [20, 40],
                    [40, 40],
                    [6, 6],
                    [1, 1],
                    6
                ],
                [
                    [40, 80],
                    [80, 80],
                    [3, 3],
                    [1, 1],
                    3
                ]
            ]

# Flattened width of the encoder's convolution output for a 3-channel,
# 8192-sample input. The argument must be enc_conv_list: no `conv_list` exists
# at notebook scope, so the old call raised NameError on a fresh kernel.
# Computed once and shared by both linear specifications below.
# NOTE(review): assumes df.models.CalcConvMaxpoolOutputSize behaves like the
# function of the same name defined in this notebook -- confirm.
enc_conv_out_features = df.models.CalcConvMaxpoolOutputSize(enc_conv_list, 3, 8192)

# Passed to EncLinear as (in_f, out_f): entry 0 is used as the input widths,
# entry 1 as the output widths.
enc_linear_list = [
                [enc_conv_out_features, 416],
                [416, 213],
            ]

# Decoder stages: the encoder stages in reverse order with the channel counts
# mirrored.
dec_conv_list = [
                [
                    [80,80],
                    [80, 40],
                    [3, 3],
                    [1, 1],
                    3
                ],
                [
                    [40, 40],
                    [40, 20],
                    [6, 6],
                    [1, 1],
                    6
                ],
                [
                    [20, 20], # in_f 
                    [20, 3], # out_f
                    [12, 12], # conv_kernels
                    [1, 1], # dilations
                    12 # unpool kernel size
                ],
            ]

# Passed to DecLinear as (in_f, out_f), same layout as enc_linear_list.
dec_linear_list = [
                [213, 416],
                [416, enc_conv_out_features]
            ]

In [None]:
# Instantiate the Sequential-based encoder and decoder from the configuration
# lists defined in the previous cell.
encoder = ConvEncoder(enc_conv_list, enc_linear_list)
decoder = ConvDecoder(dec_conv_list, dec_linear_list)

In [None]:
# Random test batch of shape (12, 3, 8192); the 3 matches the input channel
# count of the first encoder convolution. Rebinds the earlier `x`.
x = torch.randn(12, 3, 8192)

In [None]:
# Shape of the encoding; call the module rather than .forward() so that
# registered hooks run.
encoder(x).shape

In [None]:
# Show the encoder's repr (its nested layer listing) as the cell output.
encoder

In [None]:
# Show the decoder's repr (its nested layer listing) as the cell output.
decoder

In [None]:
# Encode, then decode, and report the reconstructed shape. The modules are
# called directly so that registered hooks run; .forward() would skip them.
y = encoder(x)
print(decoder(y).shape)

In [None]:
def norm1d(planes):
    """Return a ``BatchNorm1d`` layer over ``planes`` channels."""
    batch_norm = torch.nn.BatchNorm1d(num_features=planes)
    return batch_norm

def conv1xn(in_planes, out_planes, kernel_size, stride = 1):
    """Return a bias-free ``Conv1d`` with circular padding of ``kernel_size // 2``."""
    half_width = kernel_size // 2

    return torch.nn.Conv1d(
        in_planes,
        out_planes,
        kernel_size=kernel_size,
        stride=stride,
        dilation=1,
        padding=half_width,
        padding_mode='circular',
        bias=False,
    )

def conv1x1(in_planes, out_planes, stride=1):
    """Return a bias-free ``Conv1d`` with kernel size 1."""
    pointwise = torch.nn.Conv1d(in_planes, out_planes, kernel_size=1, stride=stride, bias=False)
    return pointwise

def downsample(in_planes, out_planes, stride = 1):
    """Return ``Sequential(conv1x1, norm1d)`` mapping ``in_planes`` to ``out_planes`` channels."""
    projection = conv1x1(in_planes, out_planes, stride = stride)
    normalisation = norm1d(out_planes)

    return torch.nn.Sequential(projection, normalisation)

class ResBlock(torch.nn.Module):
    """Residual block: conv -> BN -> ReLU -> conv -> BN, plus a skip connection, then ReLU.

    Parameters
    ----------
    inplanes, planes : int
        Input and output channel counts.
    in_kernel, out_kernel : int
        Kernel sizes of the first and the second convolution.
    stride : int, optional
        Stride of the first convolution (and of the skip projection).

    The skip path is the identity when the main path keeps both the channel
    count and the stride; otherwise it is a ``downsample`` projection.
    """

    def __init__(self, inplanes, planes, in_kernel, out_kernel, stride=1):

        super(ResBlock, self).__init__()

        self.inplanes = inplanes
        self.planes = planes

        self.conv1 = conv1xn(self.inplanes, self.planes, in_kernel, stride=stride)
        self.bn1 = norm1d(self.planes)
        self.relu = torch.nn.ReLU(inplace=True)
        self.conv2 = conv1xn(self.planes, self.planes, out_kernel)
        self.bn2 = norm1d(self.planes)

        # A strided first convolution shortens the sequence even when the
        # channel count is unchanged, so the skip path must be projected in
        # that case as well; otherwise `out += identity` fails on mismatched
        # lengths.
        if stride != 1 or self.inplanes != self.planes:
            self.downsample = downsample(self.inplanes, self.planes, stride=stride)
        else:
            self.downsample = None

    def forward(self, x):
        """Return the block output for ``x``."""
        identity = x

        out = self.conv1(x)
        out = self.bn1(out)
        out = self.relu(out)

        out = self.conv2(out)
        out = self.bn2(out)

        if self.downsample is not None:
            identity = self.downsample(x)

        out += identity
        out = self.relu(out)

        return out
        
        
        
        

In [None]:
class resnet1d(torch.nn.Module):
    """1-D ResNet-style classifier.

    Stem (conv -> BN -> ReLU -> max-pool), four residual stages whose widths
    are 1x, 2x, 4x and 8x the base width of 64, adaptive average pooling and a
    linear layer with ``nclass`` outputs. The stem convolution takes 2 input
    channels.

    Parameters
    ----------
    block : callable
        Residual block constructor, called as
        ``block(inplanes, planes, kernel, kernel[, stride=...])``.
    block_list : sequence of int
        Number of blocks in each of the four stages.
    stride : int, optional
        Stride given to the first block of a stage whose width changes.
    nclass : int, optional
        Number of outputs of the final linear layer.
    """

    def __init__(self, block, block_list, stride=4, nclass = 2):

        super().__init__()

        self.inplanes = 64
        self.kernel_size = 7
        self.stride = stride
        # Stored only; nothing in this class reads it.
        self.output_size = 4096 // 4 ** 3

        base = self.inplanes
        kernel = self.kernel_size

        # Stem.
        self.conv1 = conv1xn(2, base, 7, stride=1)
        self.bn1 = norm1d(base)
        self.relu = torch.nn.ReLU(inplace=True)
        self.maxpool = torch.nn.MaxPool1d(2, padding = 0)

        # Residual stages; the width doubles from stage 2 onwards.
        self.layer1 = self._make_layer(block, base, base, kernel, block_list[0], self.stride)
        self.layer2 = self._make_layer(block, base, 2 * base, kernel, block_list[1], self.stride)
        self.layer3 = self._make_layer(block, 2 * base, 4 * base, kernel, block_list[2], self.stride)
        self.layer4 = self._make_layer(block, 4 * base, 8 * base, kernel, block_list[3], self.stride)

        # Head.
        self.avgpool = torch.nn.AdaptiveAvgPool1d(1)
        self.fc = torch.nn.Linear(8 * base, nclass)

    def _make_layer(self, block, inplanes, outplanes, kernel_size, blocks, stride):
        """Build one stage of ``blocks`` residual blocks.

        The first block maps ``inplanes`` to ``outplanes`` and receives
        ``stride`` only when the two differ; the remaining blocks keep
        ``outplanes`` and use the block's default stride.
        """
        if inplanes == outplanes:
            first = block(inplanes, outplanes, kernel_size, kernel_size)
        else:
            first = block(inplanes, outplanes, kernel_size, kernel_size, stride=stride)

        stage = [first]
        stage.extend(
            block(outplanes, outplanes, kernel_size, kernel_size)
            for _ in range(1, blocks)
        )

        return torch.nn.Sequential(*stage)

    def forward(self, x):
        """Return the ``nclass`` outputs of the final linear layer for ``x``."""
        features = self.maxpool(self.relu(self.bn1(self.conv1(x))))

        for stage in (self.layer1, self.layer2, self.layer3, self.layer4):
            features = stage(features)

        pooled = torch.flatten(self.avgpool(features), start_dim=1)

        return self.fc(pooled)
        
        
        
        

In [None]:
# Four stages of two ResBlocks each; stride and nclass keep their defaults.
mymodel = resnet1d(ResBlock, [2,2,2,2])

In [None]:
# Random batch of shape (10, 2, 8192); the 2 matches the input channel count
# of resnet1d's first convolution. Rebinds the earlier `x`.
x = torch.randn((10, 2, 8192))

In [None]:
# Forward pass through the classifier; call the module rather than .forward()
# so that registered hooks run.
mymodel(x)

In [None]:
# Stand-alone pooling layer with the same arguments as resnet1d.maxpool, for
# checking its effect on the sequence length.
maxpool = torch.nn.MaxPool1d(2, padding = 0)

In [None]:
# Kernel size 2 (the stride defaults to the kernel size) halves the last
# dimension of `x`.
maxpool(x).shape

In [None]:
# `**` binds tighter than `//`, so this is 4096 // 256 == 16.
4096 // 4 ** 4