In [None]:
class RecurrentAutoencoder(nn.Module):
    def __init__(self, num_features):
        super(RecurrentAutoencoder, self).__init__()
        self.layer_sizes = [512, 128]
        self.num_features = num_features

        self.encode_rnn1 = nn.LSTM(
            input_size=self.num_features,
            hidden_size=self.layer_sizes[0],
            num_layers=1,
            batch_first=True
        ).cuda()

        self.encode_rnn2= nn.LSTM(
            input_size=self.layer_sizes[0],
            hidden_size=self.layer_sizes[1],
            num_layers=1,
            batch_first=True
        ).cuda()

        self.decode_rnn1 = nn.LSTM(
            input_size=self.layer_sizes[1],
            hidden_size=self.layer_sizes[1],
            num_layers=1,
            batch_first=True
        ).cuda()

        self.decode_rnn2 = nn.LSTM(
            input_size=self.layer_sizes[1],
            hidden_size=self.layer_sizes[0],
            num_layers=1,
            batch_first=True
        ).cuda()

        self.output_layer = nn.Linear(self.layer_sizes[0], self.num_features).cuda()

    def forward(self, x):
        seq_len = x.shape[1]
        print(x.shape)
        x, (_, _) = self.encode_rnn1(x)
        _, (encoded, _) = self.encode_rnn2(x)
        
        encoded = encoded.repeat(seq_len, self.num_features)
        encoded = encoded.reshape((self.num_features, seq_len, self.layer_sizes[1]))
        encoded, (_, _) = self.decode_rnn1(encoded)
        encoded, (_, _) = self.decode_rnn2(encoded)
        encoded = encoded.reshape((seq_len, self.layer_sizes[0]))
        return self.output_layer(encoded).reshape((1, seq_len, 1))
    
# def MSETripletLoss(X, encoded_X, encoded_pos, encoded_neg):
#     CE_loss = nn.MSE(encoded_X, X)
#     sub = torch.subtract(torch.dot(encoded_X, encoded_neg), torch.dot(encoded_X, encoded_pos))
#     exp = torch.exp(sub)
#     triplet_loss = torch.log(torch.add(1, exp))
#     return torch.add(CE_loss, triplet_loss)


In [None]:
class Encoder(nn.Module):
    def __init__(self, input_dim, out_dim, h_dims, h_activ, out_activ):
        super(Encoder, self).__init__()

        layer_dims = [input_dim] + h_dims + [out_dim]
        self.num_layers = len(layer_dims) - 1
        self.layers = nn.ModuleList()
        for index in range(self.num_layers):
            layer = nn.Linear(
                layer_dims[index],
                layer_dims[index + 1],
            )
            self.layers.append(layer)

        self.h_activ, self.out_activ = h_activ, out_activ   

    def forward(self, x):
        for index, layer in enumerate(self.layers):
            x = layer(x)

            if self.h_activ and index < self.num_layers - 1:
                x = self.h_activ(x)
            elif self.out_activ and index == self.num_layers - 1:
                return self.out_activ(x)

        return x


class Decoder(nn.Module):
    def __init__(self, input_dim, out_dim, h_dims, h_activ):
        super(Decoder, self).__init__()

        layer_dims = [input_dim] + h_dims + [h_dims[-1]]
        self.num_layers = len(layer_dims) - 1
        self.layers = nn.ModuleList()
        for index in range(self.num_layers):
            layer = nn.Linear(
                layer_dims[index],
                layer_dims[index + 1],
            )
            self.layers.append(layer)

        self.h_activ = h_activ
        self.dense_matrix = nn.Parameter(
            torch.rand((layer_dims[-1], out_dim), dtype=torch.float),
            requires_grad=True
        )

    def forward(self, x):

        for index, layer in enumerate(self.layers):
            x = layer(x)

            if self.h_activ and index < self.num_layers - 1:
                x = self.h_activ(x)

        return torch.mm(x, self.dense_matrix)


######
# MAIN
######


class AE(nn.Module):
    def __init__(self, input_dim, encoding_dim, h_dims=[], h_activ=nn.ReLU(),
                 out_activ=nn.Tanh()):
        super(AE, self).__init__()

        self.encoder = Encoder(input_dim, encoding_dim, h_dims, h_activ,
                               out_activ).cuda()
        self.decoder = Decoder(encoding_dim, input_dim, h_dims[::-1],
                               h_activ).cuda()

    def forward(self, x):
        x = self.encoder(x)
        x = self.decoder(x)

        return x


In [None]:
class VariationalEncoder(nn.Module):
    def __init__(self, input_dim, emb_dim, hidden_dims, activ):
        super(VariationalEncoder, self).__init__()
        self.dims = [input_dim] + hidden_dims + [emb_dim]
        self.layers = nn.ModuleList()
        self.activ = activ
        for dim1, dim2 in zip(self.dims, self.dims[1:-1]):
            layer = nn.Linear(dim1, dim2).cuda()
            self.layers.append(layer)
        
        self.mu_layer = nn.Linear(self.dims[-2], self.dims[-1])
        self.sigma_layer = nn.Linear(self.dims[-2], self.dims[-1])

        self.N = torch.distributions.Normal(0, 1)
        self.N.loc = self.N.loc.cuda()  # hack to get sampling on the GPU
        self.N.scale = self.N.scale.cuda()
        self.kl = 0

    def forward(self, x):
        for layer in self.layers:
            x = self.activ(layer(x))

        mu = self.mu_layer(x)
        sigma = torch.exp(self.sigma_layer(x))
        z = mu + sigma*self.N.sample(mu.shape)
        self.kl = (sigma**2 + mu**2 - torch.log(sigma) - 1/2).sum()
        return z

class Decoder(nn.Module):
    def __init__(self, input_dim, emb_dim, hidden_dims, activ):
        super(Decoder, self).__init__()
        self.dims = [input_dim] + hidden_dims + [emb_dim]
        self.dims = self.dims[::-1]
        self.layers = nn.ModuleList()
        self.activ = activ
        for dim1, dim2 in zip(self.dims, self.dims[1:]):
            layer = nn.Linear(dim1, dim2).cuda()
            self.layers.append(layer)

    def forward(self, z):
        for layer in self.layers[:-1]:
            z = self.activ(layer(z))
        z = self.layers[-1](z)
        return z

class VariationalAutoencoder(nn.Module):
    def __init__(self, input_dim, emb_dim, hidden_dims=[], activ=F.relu):
        super(VariationalAutoencoder, self).__init__()
        self.encoder = VariationalEncoder(input_dim, emb_dim, hidden_dims, activ)
        self.decoder = Decoder(input_dim, emb_dim, hidden_dims, activ)

    def forward(self, x):
        z = self.encoder(x)
        return self.decoder(z)

In [None]:

class ContrastiveLoss(nn.Module):
    def __init__(self, margin=1.0):
        super(ContrastiveLoss, self).__init__()
        self.margin = margin
        
    def calc_euclidean(self, x1, x2):
        return (x1 - x2).pow(2).sum(1)
    
    def forward(self, anchor: torch.Tensor, positive: torch.Tensor, negative: torch.Tensor) -> torch.Tensor:
        distance_positive = self.calc_euclidean(anchor, positive)
        distance_negative = self.calc_euclidean(anchor, negative)
        losses_positive = torch.pow(distance_positive, 2) / 2
        losses_negative = torch.pow(1 / distance_negative, 2) / 2

        return (losses_positive + losses_negative).mean()