In [None]:
class MyLinearLayer(nn.Module):
    """ Custom Linear layer but mimics a standard linear layer """
    def __init__(self, size_in, size_out):
        super().__init__()
        self.size_in, self.size_out = size_in, size_out
        weights = torch.Tensor(size_out, size_in)
        self.weights = nn.Parameter(weights)  # nn.Parameter is a Tensor that's a module parameter.
        bias = torch.Tensor(size_out)
        self.bias = nn.Parameter(bias)

        # initialize weights and biases
        nn.init.kaiming_uniform_(self.weights, a=math.sqrt(5)) # weight init
        fan_in, _ = nn.init._calculate_fan_in_and_fan_out(self.weights)
        bound = 1 / math.sqrt(fan_in)
        nn.init.uniform_(self.bias, -bound, bound)  # bias init

    def forward(self, x):
        w_times_x = torch.mm(x, self.weights.t())
        return torch.add(w_times_x, self.bias)  # w times x + b

In [2]:
import torch
import torch.nn as nn

# !nvidia-smi
# print (torch.__version__)
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")

# Text Translation

In [3]:
class SelfAttention(nn.Module):
    def __init__(self, embed_size, heads):
        super(SelfAttention, self).__init__()
        self.embed_size = embed_size
        self.heads = heads
        self.head_dim = embed_size // heads

        # assert debug
        assert (
          self.head_dim * heads == embed_size
        ), "Embedding size needs to be divisible by heads"

        # obtain Q K V matrices by linear transformation
        self.values = nn.Linear(self.head_dim, self.head_dim, bias=False)
        self.keys = nn.Linear(self.head_dim, self.head_dim, bias=False)
        self.queries = nn.Linear(self.head_dim, self.head_dim, bias=False)
        self.fc_out = nn.Linear(heads * self.head_dim, embed_size)

    def forward(self, values, keys, query, mask):
        # Get number of training examples
        N = query.shape[0]

        value_len, key_len, query_len = values.shape[1], keys.shape[1], query.shape[1]

        # Split the embedding into self.heads different pieces
        values = values.reshape(N, value_len, self.heads, self.head_dim)
        keys = keys.reshape(N, key_len, self.heads, self.head_dim)
        query = query.reshape(N, query_len, self.heads, self.head_dim)

        values = self.values(values)  # (N, value_len, heads, head_dim)
        keys = self.keys(keys)  # (N, key_len, heads, head_dim)
        queries = self.queries(query)  # (N, query_len, heads, heads_dim)

        # Einsum does matrix mult. for query*keys for each training example
        # with every other training example, don't be confused by einsum
        # it's just how I like doing matrix multiplication & bmm

        energy = torch.einsum("nqhd,nkhd->nhqk", [queries, keys])
        # queries shape: (N, query_len, heads, heads_dim),
        # keys shape: (N, key_len, heads, heads_dim)
        # energy: (N, heads, query_len, key_len)

        # Mask padded indices so their weights become 0
        if mask is not None:
          energy = energy.masked_fill(mask == 0, float("-1e20"))

        # Normalize energy values similarly to seq2seq + attention
        # so that they sum to 1. Also divide by scaling factor for
        # better stability
        attention = torch.softmax(energy / (self.embed_size ** (1 / 2)), dim=3)
        # attention shape: (N, heads, query_len, key_len)

        out = torch.einsum("nhql,nlhd->nqhd", [attention, values]).reshape(
          N, query_len, self.heads * self.head_dim
        )
        # attention shape: (N, heads, query_len, key_len)
        # values shape: (N, value_len, heads, heads_dim)
        # out after matrix multiply: (N, query_len, heads, head_dim), then
        # we reshape and flatten the last two dimensions.

        out = self.fc_out(out)
        # Linear layer doesn't modify the shape, final shape will be
        # (N, query_len, embed_size)

        return out

class TransformerBlock(nn.Module):
    def __init__(self, embed_size, heads, dropout, forward_expansion):
        super(TransformerBlock, self).__init__()
        self.attention = SelfAttention(embed_size, heads)
        self.norm1 = nn.LayerNorm(embed_size)
        self.norm2 = nn.LayerNorm(embed_size)

        self.feed_forward = nn.Sequential(
          nn.Linear(embed_size, forward_expansion * embed_size),
          nn.ReLU(),
          nn.Linear(forward_expansion * embed_size, embed_size),
        )

        self.dropout = nn.Dropout(dropout)

    def forward(self, value, key, query, mask):
        attention = self.attention(value, key, query, mask)

        # Add skip connection, run through normalization and finally dropout
        x = self.dropout(self.norm1(attention + query))
        forward = self.feed_forward(x)
        out = self.dropout(self.norm2(forward + x))
        return out

class Encoder(nn.Module):
    def __init__(
        self,
        src_vocab_size,
        embed_size,
        num_layers,
        heads,
        device,
        forward_expansion,
        dropout,
        max_length,
        ):

        super(Encoder, self).__init__()
        self.embed_size = embed_size
        self.device = device
        self.word_embedding = nn.Embedding(src_vocab_size, embed_size)
        self.position_embedding = nn.Embedding(max_length, embed_size)

        self.layers = nn.ModuleList(
          [
              TransformerBlock(
                  embed_size,
                  heads,
                  dropout=dropout,
                  forward_expansion=forward_expansion,
              )
              for _ in range(num_layers)
          ]
        )

        self.dropout = nn.Dropout(dropout)

    def forward(self, x, mask):
        N, seq_length = x.shape
        positions = torch.arange(0, seq_length).expand(N, seq_length).to(self.device)
        out = self.dropout(
          (self.word_embedding(x) + self.position_embedding(positions))
        )

        # In the Encoder the query, key, value are all the same, it's in the
        # decoder this will change. This might look a bit odd in this case.
        for layer in self.layers:
          out = layer(out, out, out, mask)

        print(out.shape)

        return out

class DecoderBlock(nn.Module):
    def __init__(self, embed_size, heads, forward_expansion, dropout, device):
        super(DecoderBlock, self).__init__()
        self.norm = nn.LayerNorm(embed_size)
        self.attention = SelfAttention(embed_size, heads=heads)
        self.transformer_block = TransformerBlock(
          embed_size, heads, dropout, forward_expansion
        )
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, value, key, src_mask, trg_mask):
        attention = self.attention(x, x, x, trg_mask)
        query = self.dropout(self.norm(attention + x))
        out = self.transformer_block(value, key, query, src_mask)
        return out

class Decoder(nn.Module):
    def __init__(
        self,
        trg_vocab_size,
        embed_size,
        num_layers,
        heads,
        forward_expansion,
        dropout,
        device,
        max_length,
        ):
        super(Decoder, self).__init__()
        self.device = device
        self.word_embedding = nn.Embedding(trg_vocab_size, embed_size)
        self.position_embedding = nn.Embedding(max_length, embed_size)

        self.layers = nn.ModuleList(
          [
              DecoderBlock(embed_size, heads, forward_expansion, dropout, device)
              for _ in range(num_layers)
          ]
        )
        self.fc_out = nn.Linear(embed_size, trg_vocab_size)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, enc_out, src_mask, trg_mask):
        N, seq_length = x.shape
        positions = torch.arange(0, seq_length).expand(N, seq_length).to(self.device)
        x = self.dropout((self.word_embedding(x) + self.position_embedding(positions)))

        for layer in self.layers:
          x = layer(x, enc_out, enc_out, src_mask, trg_mask)

        out = self.fc_out(x)

        return out

class Transformer(nn.Module):
    def __init__(
        self,
        src_vocab_size,
        trg_vocab_size,
        src_pad_idx,
        trg_pad_idx,
        embed_size=1024,
        num_layers=6,
        forward_expansion=4,
        heads=8,
        dropout=0,
        device="cpu",
        max_length=100,
        ):

        super(Transformer, self).__init__()

        self.encoder = Encoder(
          src_vocab_size,
          embed_size,
          num_layers,
          heads,
          device,
          forward_expansion,
          dropout,
          max_length,
        )

        self.decoder = Decoder(
          trg_vocab_size,
          embed_size,
          num_layers,
          heads,
          forward_expansion,
          dropout,
          device,
          max_length,
        )

        self.src_pad_idx = src_pad_idx
        self.trg_pad_idx = trg_pad_idx
        self.device = device

    def make_src_mask(self, src):
        src_mask = (src != self.src_pad_idx).unsqueeze(1).unsqueeze(2)
        # (N, 1, 1, src_len)
        return src_mask.to(self.device)

    def make_trg_mask(self, trg):
        N, trg_len = trg.shape
        trg_mask = torch.tril(torch.ones((trg_len, trg_len))).expand(
          N, 1, trg_len, trg_len
        )

        return trg_mask.to(self.device)

    def forward(self, src, trg):
        src_mask = self.make_src_mask(src)
        trg_mask = self.make_trg_mask(trg)
        enc_src = self.encoder(src, src_mask)
        out = self.decoder(trg, enc_src, src_mask, trg_mask)
        return out

In [4]:
if True:
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print(device)

    # x = torch.tensor(data).to(device)
    # shape: (training examples, sequence length)
    x = torch.tensor([[1, 5, 6, 4, 3, 9, 5, 2, 0], [1, 8, 7, 3, 4, 5, 6, 7, 2]]).to(device)
    trg = torch.tensor([[1, 7, 4, 3, 5, 9, 2, 0], [1, 5, 6, 2, 4, 7, 6, 2]]).to(device)
    # trg = torch.tensor(fakeY).to(device)

    src_pad_idx = 0
    trg_pad_idx = 0
    src_vocab_size = 10
    trg_vocab_size = 10
    model = Transformer(src_vocab_size, trg_vocab_size, src_pad_idx, trg_pad_idx, device=device).to(
      device
    )
    out = model(x, trg[:, :-1])
    # print(out.shape)
    # print(out[0,0,:])

cpu
torch.Size([2, 9, 1024])


In [None]:
print(x.shape)
print(trg.shape)

torch.Size([2, 9])
torch.Size([2, 8])


# ADRELINE

In [None]:
import argparse
from .modules import AbstractLocalizationModule, FeatureExtraction, LocalizationOutput
import numpy as np
import torch
import torch.nn as nn
from typing import Tuple
from utils import SELLoss


class ADRENALINEEncoder(nn.Module):
  """This class implements the encoder module for a sequence-to-sequence-based sound event localization neural
  network. It uses a feature extraction front-end based on convolutional layers, as proposed in
      Sharath Adavanne, Archontis Politis, Joonas Nikunen, Tuomas Virtanen: "Sound Event Localization and Detection
          of Overlapping Sources Using Convolutional Recurrent Neural Networks" (2018)
  and implements a standard encoder structure based on gated recurrent units.
  """
  def __init__(self,
                hparams: argparse.Namespace) -> None:
      super(ADRENALINEEncoder, self).__init__()

      self.hidden_dim = hparams.hidden_dim
      self.num_layers = hparams.num_layers

      num_steps_per_chunk = int(2 * hparams.chunk_length / hparams.frame_length)
      self.feature_extraction = FeatureExtraction(num_steps_per_chunk,
                                                  hparams.num_fft_bins,
                                                  dropout_rate=hparams.dropout_rate)

      feature_dim = int(hparams.num_fft_bins / 4)

      self.initial_state = nn.Parameter(
          torch.randn((2 * hparams.num_layers, 1, hparams.hidden_dim), dtype=torch.float32), requires_grad=True
      )

      self.gru = nn.GRU(feature_dim, hparams.hidden_dim, batch_first=True, bidirectional=True,
                        num_layers=hparams.num_layers, dropout=hparams.dropout_rate)

  def forward(self,
              audio_features: torch.Tensor) -> Tuple[torch.Tensor, torch.Tensor]:
      extracted_features = self.feature_extraction(audio_features)
      batch_size = extracted_features.shape[0]

      output, hidden = self.gru(extracted_features, self.initial_state.repeat(1, batch_size, 1))

      hidden = hidden.view(2, 2, batch_size, -1).permute(0, 2, 3, 1).reshape(self.num_layers, batch_size, 2 * self.hidden_dim)

      return output, hidden


class ADRENALINEDecoder(nn.Module):
  """This class implements an attention-based decoder module for a sequence-to-sequence-based sound event localization
  neural network. It exploits a standard architecture based on the scaled dot-product for computing attention values
  and gated recurrent units as the recurrent part.
  """
  def __init__(self,
                hparams: argparse.Namespace) -> None:
      super(ADRENALINEDecoder, self).__init__()

      self.hidden_dim = hparams.hidden_dim
      self.num_layers = hparams.num_layers

      self.scale_matrix = nn.Linear(2 * hparams.hidden_dim, 2 * hparams.hidden_dim, bias=False)

      self.gru = nn.GRU(2 * hparams.hidden_dim + 3 * hparams.max_num_sources, 2 * hparams.hidden_dim,
                        batch_first=True, num_layers=hparams.num_layers, dropout=hparams.dropout_rate)

      self.localization_output = LocalizationOutput(2 * hparams.hidden_dim, max_num_sources=hparams.max_num_sources)

  def forward(self,
              source_activity_input: torch.Tensor,
              direction_of_arrival_input: torch.Tensor,
              hidden: torch.Tensor,
              encoder_outputs: torch.Tensor) -> Tuple[torch.Tensor, torch.Tensor, torch.Tensor, torch.Tensor]:
      """
      :param source_activity_input: input vector indicating source activity at the current time step
      :param direction_of_arrival_input: direction-of-arrival input vector at the current time step
      :param hidden: decoder hidden state from the previous time step
      :param encoder_outputs: all encoder outputs
      :return: source_activity_output, direction_of_arrival_output, hidden: corresponding outputs
      """
      batch_size, sequence_length, _ = encoder_outputs.shape

      # Compute attention weights via dot product between current hidden state and encoder outputs.
      expanded_hidden = hidden[self.num_layers - 1, :].unsqueeze(0).permute(1, 0, 2).repeat(1, sequence_length, 1)

      scaled_dot_product = (self.scale_matrix(encoder_outputs) * expanded_hidden).sum(-1) / np.sqrt(self.hidden_dim)
      attention_weights = torch.softmax(scaled_dot_product, dim=-1)

      context_vector = torch.bmm(attention_weights.unsqueeze(1), encoder_outputs)

      input_with_context = torch.cat((source_activity_input,
                                      direction_of_arrival_input.view(batch_size, 1, -1),
                                      context_vector), dim=-1)

      output, next_hidden = self.gru(input_with_context, hidden)

      source_activity_output, direction_of_arrival_output = self.localization_output(output)

      return source_activity_output, direction_of_arrival_output, next_hidden, attention_weights


class ADRENALINE(AbstractLocalizationModule):
  """Implementation of the Attention-based Deep REcurrent Network for locALizINg acoustic Events (ADRENALINE)."""
  def __init__(self,
                dataset_path: str,
                cv_fold_idx: int,
                hparams: argparse.Namespace) -> None:
      super(ADRENALINE, self).__init__(dataset_path, cv_fold_idx, hparams)

      self.max_num_sources = hparams.max_num_sources

      self.encoder = ADRENALINEEncoder(hparams)
      self.decoder = ADRENALINEDecoder(hparams)

  def get_loss_function(self) -> nn.Module:
      return SELLoss(self.hparams.max_num_sources, alpha=self.hparams.alpha)

  def forward(self,
              audio_features: torch.Tensor) -> Tuple[torch.Tensor, torch.Tensor, dict]:
      batch_size, _, sequence_length, _ = audio_features.shape
      device = audio_features.device

      source_activity = torch.zeros((batch_size, 1, self.max_num_sources)).to(device)
      direction_of_arrival = torch.zeros((batch_size, 1, self.max_num_sources, 2)).to(device)

      encoder_outputs, hidden = self.encoder(audio_features)
      source_activity_output = torch.zeros((batch_size, sequence_length, self.max_num_sources)).to(device)
      direction_of_arrival_output = torch.zeros((batch_size, sequence_length, self.max_num_sources, 2)).to(device)

      attention_map = []

      for step_idx in range(sequence_length):
          source_activity, direction_of_arrival, hidden, attention_weights = self.decoder(
              source_activity, direction_of_arrival, hidden, encoder_outputs)

          attention_map.append(attention_weights.unsqueeze(-1))

          source_activity_output[:, step_idx, :] = source_activity.squeeze()
          direction_of_arrival_output[:, step_idx, :, :] = direction_of_arrival.squeeze()

      attention_map = torch.cat(attention_map, dim=-1)

      meta_data = {
          'attention_map': attention_map
      }

      return source_activity_output, direction_of_arrival_output, meta_data

# SSSL

### Data (fake)

In [None]:
import numpy as np
import random


# (training examples, 4 locations, Nfreq, Ntime, 4)
data = np.random.rand(50, 4, 501, 69, 4)
label = np.random.rand(50, 1)
fakeY = np.random.rand(50, 4, 501, 69, 2)
for i in range(data.shape[0]):
  label[i,0] = random.randint(0, data.shape[1]-1)

In [None]:
x = torch.tensor(data)
print(x.shape)

### Transformer

In [2]:
class mySelfAttention(nn.Module):
  def __init__(self, embed_size, heads):
      super(mySelfAttention, self).__init__()
      self.embed_size = embed_size
      self.heads = heads
      self.head_dim = embed_size // heads

      # assert debug
      assert (
          self.head_dim * heads == embed_size
      ), "Embedding size needs to be divisible by heads"

      # obtain Q K V matrices by linear transformation
      self.values = nn.Linear(self.head_dim, self.head_dim, bias=False)
      self.keys = nn.Linear(self.head_dim, self.head_dim, bias=False)
      self.queries = nn.Linear(self.head_dim, self.head_dim, bias=False)
      self.fc_out = nn.Linear(heads * self.head_dim, embed_size)

  def forward(self, values, keys, query):
      # Get number of training examples
      N = query.shape[0]

      value_len, key_len, query_len = values.shape[1], keys.shape[1], query.shape[1]

      # Split the embedding into self.heads different pieces
      values = values.reshape(N, value_len, self.heads, self.head_dim)
      keys = keys.reshape(N, key_len, self.heads, self.head_dim)
      query = query.reshape(N, query_len, self.heads, self.head_dim)

      values = self.values(values)  # (N, value_len, heads, head_dim)
      keys = self.keys(keys)  # (N, key_len, heads, head_dim)
      queries = self.queries(query)  # (N, query_len, heads, heads_dim)

      # Einsum does matrix mult. for query*keys for each training example
      # with every other training example, don't be confused by einsum
      # it's just how I like doing matrix multiplication & bmm

      energy = torch.einsum("nqhd,nkhd->nhqk", [queries, keys])
      # queries shape: (N, query_len, heads, heads_dim),
      # keys shape: (N, key_len, heads, heads_dim)
      # energy: (N, heads, query_len, key_len)

      # Normalize energy values similarly to seq2seq + attention
      # so that they sum to 1. Also divide by scaling factor for
      # better stability
      attention = torch.softmax(energy / (self.embed_size ** (1 / 2)), dim=3)
      # attention shape: (N, heads, query_len, key_len)

      out = torch.einsum("nhql,nlhd->nqhd", [attention, values]).reshape(
          N, query_len, self.heads * self.head_dim
      )
      # attention shape: (N, heads, query_len, key_len)
      # values shape: (N, value_len, heads, heads_dim)
      # out after matrix multiply: (N, query_len, heads, head_dim), then
      # we reshape and flatten the last two dimensions.

      out = self.fc_out(out)
      # Linear layer doesn't modify the shape, final shape will be
      # (N, query_len, embed_size)
      print(out.shape)

      return out

class myTransformerBlock(nn.Module):
  def __init__(self, src_seq_size, heads, dropout, forward_expansion):
      super(myTransformerBlock, self).__init__()
      self.attention = mySelfAttention(src_seq_size, heads)
      self.norm1 = nn.LayerNorm(src_seq_size)
      self.norm2 = nn.LayerNorm(src_seq_size)

      self.feed_forward = nn.Sequential(
          nn.Linear(src_seq_size, forward_expansion * src_seq_size),
          nn.ReLU(),
          nn.Linear(forward_expansion * src_seq_size, src_seq_size),
      )

      self.dropout = nn.Dropout(dropout)

  def forward(self, value, key, query):
      attention = self.attention(value, key, query)

      # Add skip connection, run through normalization and finally dropout
      x = self.dropout(self.norm1(attention + query))
      forward = self.feed_forward(x)
      out = self.dropout(self.norm2(forward + x))
      return out

class myEncoder(nn.Module):
  def __init__(self, src_seq_size, num_layers, heads, device, forward_expansion, dropout, max_length):
    super(myEncoder, self).__init__()
    self.embed_size = src_seq_size
    self.device = device
    self.layers = nn.ModuleList(
        [
            myTransformerBlock(
                src_seq_size,
                heads,
                dropout=dropout,
                forward_expansion=forward_expansion,
            )
            for _ in range(num_layers)
        ]
    )

    self.dropout = nn.Dropout(dropout)

  def forward(self, x, enc_out):
    N, seq_length = x.shape
    positions = torch.arange(0, seq_length).expand(N, seq_length).to(self.device)
    out = x

    # In the Encoder the query, key, value are all the same, it's in the
    # decoder this will change. This might look a bit odd in this case.
    for layer in self.layers:
        out = layer(out, out, out)

    return out

class myDecoderBlock(nn.Module):
  def __init__(self, src_seq_size, heads, forward_expansion, dropout, device):
      super(myDecoderBlock, self).__init__()
      self.norm = nn.LayerNorm(src_seq_size)
      self.attention = mySelfAttention(src_seq_size, heads=heads)
      self.transformer_block = myTransformerBlock(
          src_seq_size, heads, dropout, forward_expansion
      )
      self.dropout = nn.Dropout(dropout)

  def forward(self, x, value, key):
      attention = self.attention(x, x, x, trg_mask)
      query = self.dropout(self.norm(attention + x))
      out = self.transformer_block(value, key, query, src_mask)
      return out

class myDecoder(nn.Module):
  def __init__(
      self,
      trg_vocab_size,
      src_seq_size,
      num_layers,
      heads,
      forward_expansion,
      dropout,
      device,
      max_length,
  ):
      super(myDecoder, self).__init__()
      self.device = device

      self.layers = nn.ModuleList(
          [
              DecoderBlock(src_seq_size, heads, forward_expansion, dropout, device)
              for _ in range(num_layers)
          ]
      )
      self.fc_out = nn.Linear(src_seq_size, trg_vocab_size)
      self.dropout = nn.Dropout(dropout)

  def forward(self, x, enc_out, src_mask, trg_mask):
      N, seq_length = x.shape
      positions = torch.arange(0, seq_length).expand(N, seq_length).to(self.device)
      x = self.dropout((self.word_embedding(x) + self.position_embedding(positions)))

      for layer in self.layers:
          x = layer(x, enc_out, enc_out, src_mask, trg_mask)

      out = self.fc_out(x)

      return out






In [5]:
x = torch.tensor([[1, 5, 6, 4, 3, 9, 5, 2, 0], [1, 8, 7, 3, 4, 5, 6, 7, 2]]).to(device)
trg = torch.tensor([[1, 7, 4, 3, 5, 9, 2, 0], [1, 5, 6, 2, 4, 7, 6, 2]]).to(device)
  

enc = myEncoder(512, 6, 8, 'cuda', 4, 0, 1024)
out = enc(x, trg)
# print(temp)

RuntimeError: ignored