In [None]:
import torch
from torch import nn, optim
from torch.nn.modules import Module


class Model(nn.Module):
    """Fully connected network assembled from ``(size, activation)`` pairs.

    Each pair contributes one ``nn.Linear`` layer whose output width is
    ``size``; when ``activation`` is not ``None`` it is appended right after
    that linear layer.

    Args:
        input_size: Number of input features of the first linear layer.
        layers_data: Iterable of ``(size, activation)`` tuples, where
            ``activation`` is a ``torch.nn.modules.Module`` or ``None``.
        learning_rate: Learning rate passed to the optimizer.
        optimizer: Optimizer class (or factory) called with ``params=`` and
            ``lr=`` keyword arguments.

    Raises:
        AssertionError: If an activation is neither ``None`` nor a ``Module``.
    """

    def __init__(self, input_size, layers_data: list, learning_rate=0.01, optimizer=optim.Adam):
        super().__init__()
        self.layers = nn.ModuleList()
        self.input_size = input_size  # width expected by the first layer

        in_features = input_size
        for out_features, activation in layers_data:
            self.layers.append(nn.Linear(in_features, out_features))
            in_features = out_features  # feeds the next linear layer
            if activation is None:
                continue
            assert isinstance(activation, Module), \
                "Each tuples should contain a size (int) and a torch.nn.modules.Module."
            self.layers.append(activation)

        # The device is only recorded here; the module is not moved to it.
        if torch.cuda.is_available():
            self.device = torch.device('cuda')
        else:
            self.device = torch.device('cpu')
        self.learning_rate = learning_rate
        self.optimizer = optimizer(params=self.parameters(), lr=learning_rate)

    def forward(self, input_data):
        """Pass ``input_data`` through every registered layer in order."""
        result = input_data
        for layer in self.layers:
            result = layer(result)
        return result


# Smoke test: build a small MLP and run a single forward pass on random data.
# `data` and `mlp` are printed by the following cell.
if __name__ == "__main__":
    data_size = 5
    layer1, layer2 = 10, 10
    output_size = 2
    data = torch.randn(data_size)
    # Two hidden layers with ReLU, then an output layer with a sigmoid.
    mlp = Model(data_size, [(layer1, nn.ReLU()), (layer2, nn.ReLU()), (output_size, nn.Sigmoid())])
    output = mlp(data)
    print("done")

In [None]:
# Show the random input vector and the layer structure of the MLP built above.
print(data)
print(mlp)

In [None]:
import numpy as np

class VAE(nn.Module):
    """Convolutional variational autoencoder assembled from per-layer settings.

    Encoder: one ``Conv2d -> ReLU -> BatchNorm2d`` block per entry of
    ``conv_filters``, followed by ``Flatten`` and a ``Linear`` layer that emits
    ``2 * latent_space_dim`` values per sample (split into ``mu`` and
    ``logvar`` in :meth:`forward`).

    Decoder: ``Linear`` and ``Unflatten`` back to the shape produced by the
    last encoder convolution, followed by one
    ``ConvTranspose2d -> ReLU -> BatchNorm2d`` block per encoder block, using
    the kernels and strides in reverse order. The last transposed convolution
    outputs ``input_shape[0]`` channels.

    Every convolution uses a padding of 1. Transposed convolutions whose
    stride is greater than 1 use ``output_padding=1``.
    NOTE(review): with strided layers the decoder output appears to match the
    input size only for suitably even spatial sizes -- confirm before using
    other input shapes.

    Args:
        input_shape: Shape of one sample, channels first, e.g.
            ``[1, n_mfcc, n_samples]``.
        conv_filters: Output channels of each encoder convolution.
        conv_kernels: Kernel size of each encoder convolution.
        conv_strides: Stride of each encoder convolution.
        latent_space_dim: Size of the latent vector.

    Raises:
        ValueError: If ``conv_filters`` is empty or the three per-layer
            sequences differ in length.
    """

    # Padding used by every Conv2d / ConvTranspose2d layer and by the
    # output-shape computation, which must agree with the layers.
    _PADDING = 1

    def __init__(self,
                 input_shape,
                 conv_filters,
                 conv_kernels,
                 conv_strides,
                 latent_space_dim):

        super().__init__()

        if len(conv_filters) == 0:
            raise ValueError("conv_filters must contain at least one layer.")
        if not (len(conv_filters) == len(conv_kernels) == len(conv_strides)):
            raise ValueError(
                "conv_filters, conv_kernels and conv_strides must have the same length.")

        # Shape of the tensor fed to the first conv layer (without batch axis).
        self.input_shape = input_shape  # [1, n_mfcc, n_samples]
        # Per-layer output shapes [channels, height, width] of the encoder convs.
        self.output_shape = []
        self.conv_filters = conv_filters  # [2, 4, 8]
        # Decoder output channels: encoder filters reversed (minus the last one),
        # ending on the input channel count. list() makes this work for tuples
        # too and guarantees the caller's sequence is never mutated.
        self.conv_transpose_filters = list(conv_filters[:-1][::-1])
        self.conv_transpose_filters.append(input_shape[0])
        self.conv_kernels = conv_kernels  # [3, 5, 3]
        self.conv_strides = conv_strides  # [1, 2, 2]
        self.shape_before_bottleneck = None
        self.latent_space_dim = latent_space_dim  # 2
        self.reconstruction_loss_weight = 1000
        self._num_conv_layers = len(conv_filters)
        self.encoder = nn.Sequential()
        self.decoder = nn.Sequential()
        # Maps layer name -> integer position inside encoder / decoder.
        self.layer2id = dict()
        self.layer2id['encoder'] = dict()
        self.layer2id['decoder'] = dict()

        self._compute_output_shape()
        self._build_encoder()
        self._build_decoder()

    def _build_encoder(self):
        """Assemble the encoder: conv blocks followed by the bottleneck."""
        self._add_conv_layers()
        self._add_bottleneck()

    def _build_decoder(self):
        """Assemble the decoder: inverse bottleneck followed by transposed conv blocks."""
        self._invert_bottleneck()
        self._add_conv_transpose_layers()

    def _register_layer(self, part, name, module):
        """Append ``module`` to the encoder or decoder and record its position.

        The index is taken from the container's current length, so it stays
        correct even when other modules (e.g. the decoder's bottleneck layers)
        were added before the convolutional blocks.
        """
        container = getattr(self, part)
        self.layer2id[part][name] = len(container)
        container.add_module(name, module)

    def _add_conv_layers(self):
        """Create all convolutional blocks in encoder."""
        for block_index in range(self._num_conv_layers):
            block_number = block_index + 1
            self._register_layer('encoder',
                                 'conv{}'.format(block_number),
                                 self._add_conv_layer(block_index))
            self._register_layer('encoder',
                                 'relu{}'.format(block_number),
                                 nn.ReLU())
            self._register_layer('encoder',
                                 'batchnorm{}'.format(block_number),
                                 nn.BatchNorm2d(self.conv_filters[block_index]))

    def _add_conv_layer(self, block_index):
        """Build and return the ``Conv2d`` layer of encoder block ``block_index``.

        The first block consumes ``input_shape[0]`` channels; every later block
        consumes the previous block's filters.
        """
        if block_index == 0:
            in_channels = self.input_shape[0]
        else:
            in_channels = self.conv_filters[block_index - 1]

        return nn.Conv2d(
            in_channels,
            self.conv_filters[block_index],
            self.conv_kernels[block_index],
            stride=self.conv_strides[block_index],
            padding=self._PADDING
        )

    def _compute_output_shape(self):
        """Fill ``self.output_shape`` with each encoder conv's output shape.

        Each spatial dimension follows ``floor((W - K + 2P) / S) + 1`` with W
        the input size, K the kernel size, P the padding and S the stride.
        The list is rebuilt from scratch, so calling this twice is harmless.
        """
        shapes = []
        spatial = list(self.input_shape[1:])
        layer_specs = zip(self.conv_filters, self.conv_kernels, self.conv_strides)
        for n_filters, kernel, stride in layer_specs:
            spatial = [int((size - kernel + 2 * self._PADDING) // stride + 1)
                       for size in spatial]
            shapes.append([n_filters] + spatial)
        self.output_shape = shapes

    def _add_conv_transpose_layers(self):
        """Create all convolutional blocks in decoder."""
        for block_index in range(self._num_conv_layers):
            block_number = block_index + 1
            self._register_layer('decoder',
                                 'conv_transpose{}'.format(block_number),
                                 self._add_conv_transpose_layer(block_index))
            self._register_layer('decoder',
                                 'relu{}'.format(block_number),
                                 nn.ReLU())
            self._register_layer('decoder',
                                 'batchnorm{}'.format(block_number),
                                 nn.BatchNorm2d(self.conv_transpose_filters[block_index]))

    def _add_conv_transpose_layer(self, block_index):
        """Build and return the ``ConvTranspose2d`` layer of decoder block ``block_index``.

        The first block consumes the channels of the last encoder convolution
        (read from this instance, not from a module-level variable); later
        blocks consume the previous decoder block's output channels. Kernels
        and strides are taken in reverse encoder order.
        """
        if block_index == 0:
            in_channels = self.conv_filters[-1]
        else:
            in_channels = self.conv_transpose_filters[block_index - 1]

        stride = self.conv_strides[::-1][block_index]
        # A strided transposed conv gets one extra output row/column.
        output_padding = 1 if stride > 1 else 0

        return nn.ConvTranspose2d(
            in_channels,
            self.conv_transpose_filters[block_index],
            self.conv_kernels[::-1][block_index],
            stride=stride,
            padding=self._PADDING,
            output_padding=output_padding
        )

    def _add_bottleneck(self):
        """Flatten encoder last conv layer output data
           and add bottleneck to latent dimension
        """
        self.shape_before_bottleneck = self.output_shape[self._num_conv_layers - 1]
        # int(): nn.Linear should receive a plain Python int, not a numpy integer.
        flat_size = int(np.prod(self.shape_before_bottleneck))
        self.encoder.add_module('bottleneck_flatten', nn.Flatten(1))
        # Twice the latent size: one half for mu, the other for logvar.
        self.encoder.add_module('bottleneck_linear',
                                nn.Linear(flat_size, self.latent_space_dim * 2))

    def _invert_bottleneck(self):
        """Reshape data from bottleneck latent dimension
           to dimension of the output data after the encoder last conv layer
        """
        flat_size = int(np.prod(self.shape_before_bottleneck))
        self.decoder.add_module('bottleneck_linear',
                                nn.Linear(self.latent_space_dim, flat_size))
        self.decoder.add_module('bottleneck_unflatten',
                                nn.Unflatten(1, self.shape_before_bottleneck))

    def _get_encoder_layer(self, name):
        """Return the encoder module registered under ``name`` (e.g. ``'conv2'``)."""
        return self.encoder[self.layer2id['encoder'][name]]

    def _get_encoder_layers(self):
        """Return the whole encoder ``nn.Sequential``."""
        return self.encoder

    def _get_decoder_layer(self, name):
        """Return the decoder module registered under ``name`` (e.g. ``'conv_transpose3'``)."""
        return self.decoder[self.layer2id['decoder'][name]]

    def _get_decoder_layers(self):
        """Return the whole decoder ``nn.Sequential``."""
        return self.decoder

    def _get_output_shape(self, block_index):
        """Return the computed output shape of encoder conv block ``block_index``."""
        return self.output_shape[block_index]

    def reparameterise(self, mu, logvar):
        """Return a latent sample in training mode, or ``mu`` in eval mode.

        In training mode the result is ``mu + eps * std`` with
        ``std = exp(0.5 * logvar)`` and ``eps`` drawn from a standard normal.
        """
        if not self.training:
            return mu
        std = torch.exp(0.5 * logvar)
        eps = torch.randn_like(std)
        return mu + eps * std

    def forward(self, x):
        """Encode ``x``, sample a latent vector and decode it.

        Returns:
            Tuple ``(reconstruction, mu, logvar)``.
        """
        # The encoder emits 2 * latent_space_dim values per sample: first mu, then logvar.
        mu_logvar = self.encoder(x).view(-1, 2, self.latent_space_dim)
        mu = mu_logvar[:, 0, :]
        logvar = mu_logvar[:, 1, :]
        z = self.reparameterise(mu, logvar)
        return self.decoder(z), mu, logvar

# Demo configuration: a random batch standing in for MFCC features.
n_mfcc = 40
n_samples = 44
batch_size = 16

# Batch layout: (batch, channels, n_mfcc, n_samples).
x = torch.rand(batch_size, 1, n_mfcc, n_samples)

# Per-sample shape (batch axis dropped) and one entry per conv layer below.
input_shape=(x.shape[1:])
conv_filters=[32, 64, 64, 64]
conv_kernels=[3, 3, 3, 3]
conv_strides=[1, 2, 2, 1]
latent_space_dim = 2

model = VAE(input_shape,
              conv_filters,
              conv_kernels,
              conv_strides,
              latent_space_dim)

# Optional: look up single layers by name.
#print(model._get_encoder_layer('conv2'))
#print(model._get_decoder_layer('conv_transpose3'))

# Print the full encoder and decoder layer stacks.
print(model._get_encoder_layers())
print(model._get_decoder_layers())

# Optional: flattened size of each encoder conv output.
#for i in range(len(conv_filters)):
#    print(np.prod(model._get_output_shape(i)))

# Optional: channel lists used by the encoder and decoder.
#print(model.conv_filters)
#print(model.conv_transpose_filters)

In [None]:
# Run one forward pass; the model returns (reconstruction, mu, logvar).
print(x.shape)
y = model(x)
# Shape of the reconstruction, to compare with the input shape printed above.
print(y[0].shape)

In [None]:
channels = [1, 2, 4, 8]

# The outer dict has to exist before its 'encoder' / 'decoder' entries can be
# assigned; without this line the cell raises NameError on a fresh kernel.
layer2id = dict()
layer2id['encoder'] = dict()
layer2id['decoder'] = dict()
layer_idx = conv_idx = 0

# One conv entry per consecutive pair of channel counts.
for k in range(len(channels)-1):
    layer2id['encoder']['conv{}'.format(conv_idx)] = layer_idx
    # 'conv0' is written on the first iteration, so this lookup is always valid.
    print(layer2id['encoder']['conv0'])
    layer_idx += 1
    conv_idx += 1
        
#new_dic = {}
#new_dic['encoder'] = {}
#new_dic[1] = {}
#new_dic['encoder'][2] = 5

In [None]:
# Index stored for the second conv layer by the loop in the previous cell.
print(layer2id['encoder']['conv1'])

In [None]:
# Output size of a convolution along one spatial dimension:
#     floor((W - K + 2P) / S) + 1
# W is the input size along that dimension
# K is the kernel size
# P is the padding
# S is the stride
# Shapes in this cell are channels-last: (n_mfcc, n_samples, channels).

n_mfcc = 40
n_samples = 44

input_shape = (n_mfcc, n_samples, 1)
conv_filters = (32, 64, 64, 64)
conv_kernels = (3, 3, 3, 3)
conv_strides = (1, 2, 2, 1)

print(type(conv_kernels))

P = 1  # the same padding is assumed for every layer
inp = input_shape
for k, (K, S) in enumerate(zip(conv_kernels, conv_strides)):
    # Every entry but the last is a spatial size; the last is the channel count.
    out = [int(((W - K + (2 * P)) / S) + 1) for W in inp[:-1]]
    out.append(conv_filters[k])
    inp = out

    print(out)

In [None]:
n_mfcc = 40
n_samples = 44

input_shape = [n_mfcc, n_samples, 1]
conv_filters = [32, 64, 64, 64]

# Each layer outputs its own filter count and consumes the previous layer's;
# the first layer consumes the trailing (channel) entry of the input shape.
out_channels = conv_filters
in_channels = [input_shape[-1]] + conv_filters[:-1]

print(out_channels)
print(in_channels)

In [None]:
a = [1, 2, 3, 4, 5]
# Drop the last element, reverse what is left, then finish with a 1.
b = a[-2::-1] + [1]
print(b)

In [None]:
# Flatten -> Linear -> Linear -> Unflatten round trip on a single sample.
x = torch.randn(64, 10, 11)
print(x.shape)
# Add a leading axis of size 1 so that Flatten(1) keeps it and flattens the rest.
x = x.unsqueeze(0)
print(x.shape)
print(x.shape[1:])

# Collapse everything after axis 0 and project to 2 values.
bottleneck = nn.Sequential(
    nn.Flatten(1),
    nn.Linear(np.prod(x.shape[1:]), 2),
)

# Project the 2 values back and restore the shape of the axes after axis 0.
inv_bottleneck = nn.Sequential(
    nn.Linear(2, np.prod(x.shape[1:])),
    nn.Unflatten(1, x.shape[1:])
)

enc = bottleneck(x)
dec = inv_bottleneck(enc)

print(enc.shape)
print(dec.shape)

In [None]:
# Named `sample` rather than `input` so the builtin input() is not shadowed
# for the rest of the kernel session.
sample = torch.randn(10, 11, 64)
# Alternative kept for reference: add a leading axis first with sample[None, :].
# View the tensor as a single row holding all of its elements.
print(sample.view(-1, sample.numel()).shape)

In [None]:
x = torch.rand(10, 11, 64)
print(x.shape)
# Insert a size-1 axis at the end, at the front and at position 1, in that order.
for axis in (-1, 0, 1):
    print(torch.unsqueeze(x, axis).shape)

In [None]:
# Random batch laid out as (batch, channels, height, width).
x = torch.randn(16, 1, 40, 666)

# Encoder convolutions: the first two use stride 2, the last two stride 1.
conv1 = nn.Conv2d(1, 32, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1))
conv2 = nn.Conv2d(32, 64, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1))
conv3 = nn.Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
conv4 = nn.Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))

# First three decoder layers; the fourth (convt4) is created further down,
# once its padding, output padding and dilation have been computed.
convt1 = nn.ConvTranspose2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
convt2 = nn.ConvTranspose2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
convt3 = nn.ConvTranspose2d(64, 32, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1))

# Goal: compute padding, output padding and dilation (one value per axis) so that
# the output of the last ConvTranspose layer matches the input of the first Conv layer.
# With stride = (2, 2) and kernel = (3, 3) for that last layer, this means solving
# the system given by the two formulas:
# H_{out} = (H_{in} - 1) x stride[0] - 2 x padding[0] + dilation[0] x (kernel_size[0] - 1) + output_padding[0] + 1
# W_{out} = (W_{in} - 1) x stride[1] - 2 x padding[1] + dilation[1] x (kernel_size[1] - 1) + output_padding[1] + 1


def compute_padding(n_mfcc, dim_in, dim_out, stride, kernel):
    """Find transposed-convolution settings that map ``dim_in`` to ``dim_out``.

    Searches all ``(padding, output_padding, dilation)`` triples with entries
    in ``range(n_mfcc)``, smallest total first, for one that satisfies

        dim_out == (dim_in - 1) * stride - 2 * padding
                   + dilation * (kernel - 1) + output_padding + 1

    with ``dilation > 0`` and ``output_padding`` smaller than
    ``max(stride, dilation)`` (the constraint ``ConvTranspose2d`` enforces).
    The matched size is printed twice, as target and as computed value.

    Args:
        n_mfcc: Exclusive upper bound for each of the three searched values.
        dim_in: Input size along one axis. Expected to be an integer.
        dim_out: Required output size along that axis. Expected to be an integer.
        stride: Stride of the transposed convolution along that axis.
        kernel: Kernel size along that axis.

    Returns:
        Tuple ``(padding, output_padding, dilation)``.

    Raises:
        ValueError: If no triple in the search range produces ``dim_out``.
    """
    # Local import keeps the function usable even when the notebook cell that
    # imports itertools has not been executed yet.
    import itertools

    def transposed_size(padding, output_padding, dilation):
        """Output size of a transposed convolution along one axis."""
        return ((dim_in - 1) * stride - 2 * padding
                + dilation * (kernel - 1) + output_padding + 1)

    # sorted() is stable, so triples with equal sums keep itertools.product order.
    candidates = sorted(itertools.product(range(n_mfcc), repeat=3), key=sum)

    for padding, output_padding, dilation in candidates:
        if dilation <= 0:
            continue
        # ConvTranspose2d rejects an output_padding that is not smaller than
        # either the stride or the dilation.
        if output_padding >= max(stride, dilation):
            continue
        size = transposed_size(padding, output_padding, dilation)
        if size == dim_out:
            print(dim_out)
            print(size)
            return padding, output_padding, dilation

    raise ValueError(
        "No (padding, output_padding, dilation) below {} maps size {} to {} "
        "with stride {} and kernel {}.".format(n_mfcc, dim_in, dim_out, stride, kernel))
    

# Encoder output that is fed to the decoder layers below.
y = conv4(conv3(conv2(conv1(x))))

# Size entering the last transposed conv (output of convt3) and the size it
# has to produce (that of x).
# NOTE(review): the convt1 -> convt2 -> convt3 stack is evaluated twice here.
H_in = convt3(convt2(convt1(y))).shape[-2]
W_in = convt3(convt2(convt1(y))).shape[-1]
H_out = x.shape[-2]
W_out = x.shape[-1]
stride = (2, 2)
kernel = (3, 3)
n_mfcc = 40

# Solve for each axis independently: index 0 is height, index 1 is width.
padding_0, output_padding_0, dilation_0 = compute_padding(n_mfcc, H_in, H_out, stride[0], kernel[0])
padding_1, output_padding_1, dilation_1 = compute_padding(n_mfcc, W_in, W_out, stride[1], kernel[1])

print(padding_0)
print(padding_1)
print(output_padding_0)
print(output_padding_1)
print(dilation_0)
print(dilation_1)

# Last decoder layer, built from the values found above.
convt4 = nn.ConvTranspose2d(32, 1, kernel_size=(3, 3), stride=(2, 2),
                            padding=(padding_0, padding_1),
                            output_padding=(output_padding_0, output_padding_1),
                            dilation=(dilation_0, dilation_1))

# Shapes after each encoder layer.
print(conv1(x).shape)
print(conv2(conv1(x)).shape)
print(conv3(conv2(conv1(x))).shape)
print(conv4(conv3(conv2(conv1(x)))).shape)

print("- - - - - - - - - - - - - - -")

# Shapes after each decoder layer.
print(convt1(y).shape)
print(convt2(convt1(y)).shape)
print(convt3(convt2(convt1(y))).shape)
print(convt4(convt3(convt2(convt1(y)))).shape)

print("- - - - - - - - - - - - - - -")

# Same layers wrapped as Sequential models for an end-to-end shape check.
encoder = nn.Sequential(
    conv1,
    conv2,
    conv3,
    conv4
)

decoder = nn.Sequential(
    convt1,
    convt2,
    convt3,
    convt4
)

enc = encoder(x)
dec = decoder(enc)

print(enc.shape)
print(dec.shape)

In [None]:
import itertools

stuff = range(40)
# Every triple with entries taken from `stuff`, in itertools.product order.
stuff_list = list(itertools.product(stuff, repeat=3))

# Stable sort by total: triples with equal sums keep their product order.
print(sorted(stuff_list, key=sum))

In [None]:
num = 3.0

# An int and a float compare equal when they hold the same value.
print(num == 3)

In [None]:
conv_filters = [32, 64, 64, 64]
input_shape = [1, 40, 130]

# Encoder: each layer outputs its own filter count and consumes the previous
# layer's, with the first layer consuming the input's leading (channel) entry.
enc_output_channels = conv_filters
enc_input_channels = [input_shape[0], *conv_filters[:-1]]
# Decoder: the encoder's channel lists swapped and reversed.
dec_input_channels = list(reversed(enc_output_channels))
dec_output_channels = list(reversed(enc_input_channels))

print("input_channel (encoder) = ", enc_input_channels)
print("output_channel (encoder) = ", enc_output_channels)
print("input_channel (decoder) = ", dec_input_channels)
print("output_channel (decoder) = ", dec_output_channels)
