In [1]:
# Mount Google Drive at /content/drive so later cells can read files stored there
# (google.colab is only importable inside a Colab runtime).
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [2]:
# Shell escape: print the GPU, driver and CUDA version attached to this runtime.
!nvidia-smi

Wed Jun 30 05:10:34 2021       
+-----------------------------------------------------------------------------+
| NVIDIA-SMI 465.27       Driver Version: 460.32.03    CUDA Version: 11.2     |
|-------------------------------+----------------------+----------------------+
| GPU  Name        Persistence-M| Bus-Id        Disp.A | Volatile Uncorr. ECC |
| Fan  Temp  Perf  Pwr:Usage/Cap|         Memory-Usage | GPU-Util  Compute M. |
|                               |                      |               MIG M. |
|   0  Tesla T4            Off  | 00000000:00:04.0 Off |                    0 |
| N/A   56C    P8    10W /  70W |      0MiB / 15109MiB |      0%      Default |
|                               |                      |                  N/A |
+-------------------------------+----------------------+----------------------+
                                                                               
+-----------------------------------------------------------------------------+
| Proces

In [3]:
# Directory on the mounted shared drive that the next cell changes into; the
# relative "../dataset/..." and "../<model name>/state.pt" paths used later are
# resolved against it.
path = '/content/drive/Shareddrives/CLSE/test'

In [4]:
# Change the kernel's working directory to `path` (IPython automagic form of %cd).
cd $path

/content/drive/Shareddrives/CLSE/test


In [5]:
!pip install einops



In [6]:
import torch
import torch.nn as nn
from torch.nn import functional as F
import numpy as np
import random
import math
from tqdm import tqdm
from functools import partial
from einops.layers.torch import Rearrange, Reduce
import gc

import csv
from IPython.display import Audio

In [7]:
# Use the first CUDA GPU when one is visible, otherwise fall back to the CPU.
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

In [8]:
def _fit_length(signal, length, pad_mode):
  """Return exactly `length` samples of `signal`.

  Signals shorter than `length` are extended at the end with
  np.pad(mode=pad_mode); all others are cropped at an offset drawn from the
  `random` module.
  """
  missing = length - len(signal)
  if missing > 0:
    return np.pad(signal, (0, missing), pad_mode)
  start = random.randrange(0, len(signal) - length + 1)
  return signal[start:start + length]


def _safe_div(signal, scale, eps=1e-12):
  """Divide by `scale`, flooring it at `eps` so a constant (e.g. silent) clip gives zeros, not NaN."""
  return signal / max(scale, eps)


def get_datas(speechs, noises, snrs, length:int=16384):
  """Mix each speech clip with its paired noise clip at the requested SNR.

  For every pair the clips are brought to `length` samples (speech is padded
  with 'wrap', noise with 'reflect', longer clips are randomly cropped), made
  zero-mean and scaled to unit L2 norm. The noise is then attenuated by
  10 ** (snr / 20) before being added to the speech.

  Args:
    speechs: sequence of 1-D speech waveforms.
    noises: sequence of 1-D noise waveforms, same number of items as `speechs`.
    snrs: per-pair speech-to-noise ratio in dB, indexed like `speechs`.
    length: number of samples in every returned clip.

  Returns:
    (clean_sounds, noise_sounds, noisy_sounds), each an array of shape
    (batch, 1, length). Every clip is divided by its own standard deviation,
    so the three outputs are individually unit-variance and the returned
    noisy clip is not the sum of the returned clean and noise clips.

  Raises:
    AssertionError: if `speechs` and `noises` differ in length.
  """
  assert len(speechs) == len(noises), 'The number of voices must be equal to the number of noises'
  bz = len(speechs)
  clean_sounds = []
  noise_sounds = []
  noisy_sounds = []
  for i in range(bz):
    speech = _fit_length(np.array(speechs[i]), length, 'wrap')
    speech = speech - np.mean(speech)
    speech = _safe_div(speech, np.linalg.norm(speech,2))
    speech_std = np.std(speech)

    noise = _fit_length(np.array(noises[i]), length, 'reflect')
    noise = noise - np.mean(noise)
    noise = _safe_div(noise, np.linalg.norm(noise,2))
    # Both clips now have unit energy, so this attenuation alone sets the SNR.
    noise = noise / (10.0 ** (0.05 * snrs[i]))
    noise_std = np.std(noise)

    noisy = speech + noise

    clean_sounds.append(_safe_div(speech, speech_std))
    noise_sounds.append(_safe_div(noise, noise_std))
    noisy_sounds.append(_safe_div(noisy, np.std(noisy)))

  def as_batch(clips):
    # (batch, length) -> (batch, 1, length)
    return np.expand_dims(np.stack(clips, 0), 1)

  return as_batch(clean_sounds), as_batch(noise_sounds), as_batch(noisy_sounds)

In [9]:
class SI_SNR(torch.nn.Module):
  """Scale-invariant signal-to-noise ratio in dB, averaged over the batch.

  The prediction is projected onto the target to obtain the target-aligned
  component; the remainder is treated as noise. Energies are summed over
  dims 1 and 2, so inputs must be 3-D tensors of identical shape. Higher is
  better; the value is not negated.

  Args:
    eps: lower bound applied to every energy term to avoid division by zero
      and log of zero. It is stored as a plain float, so the module does not
      depend on any global device and works for inputs on any device.
  """
  def __init__(self,eps=1e-04):
    super(SI_SNR, self).__init__()
    self.eps = float(eps)

  def forward(self, pred, target,):
    """Return the mean SI-SNR (0-dim tensor) between `pred` and `target`."""
    dims = (1,2)
    target_energy = torch.clamp(torch.sum(target**2,dims,keepdim=True),min=self.eps)
    s_target = target*(torch.sum(pred*target,dims,keepdim=True)/target_energy)
    e_noise = pred-s_target

    signal_energy = torch.clamp(torch.sum(s_target**2,dims,keepdim=True),min=self.eps)
    noise_energy = torch.clamp(torch.sum(e_noise**2,dims,keepdim=True),min=self.eps)
    sisnr = 10*torch.log10(signal_energy/noise_energy)
    return sisnr.mean()

In [10]:
class Block(nn.Module):
  """Residual 1-D convolution block with a split hidden representation.

  conv1 expands `in_dim` channels to 2*`dim`; the result is split in half
  along the channel axis. The second half goes through the grouped
  convolution stack conv2 and is added to the first half, then conv3 maps
  the sum back to `in_dim` channels and the block input is added on top.

  Args:
    in_dim: channels of the block input and output.
    dim: channels of each hidden half.
    k_size: kernel size of the (possibly dilated) grouped convolution.
    dilation: dilation of that convolution. When it is not 1, an extra
      undilated grouped convolution with kernel 2*dilation-1 is placed
      before it.
    chunk_size: grouped convolutions use dim//chunk_size groups.
  """
  def __init__(self, in_dim, dim, k_size, dilation=1, chunk_size=1,):
    super().__init__()
    n_groups = dim//chunk_size

    # Pointwise expansion to twice the hidden width.
    self.conv1 = nn.Sequential(
        nn.BatchNorm1d(in_dim),
        nn.Conv1d(in_dim, dim*2, kernel_size=1),
        nn.GELU(),
      )

    if dilation==1:
      mixer = [
          nn.BatchNorm1d(dim),
          nn.Conv1d(dim, dim, k_size, padding=k_size//2, groups=n_groups),
          nn.GELU(),
        ]
    else:
      pre_kernel = dilation*2-1
      mixer = [
          nn.BatchNorm1d(dim),
          nn.Conv1d(dim, dim, pre_kernel, padding=pre_kernel//2, groups=n_groups),
          nn.GELU(),
          nn.Conv1d(dim, dim, k_size, padding=(k_size//2)*dilation, groups=n_groups, dilation=dilation),
          nn.GELU(),
        ]
    self.conv2 = nn.Sequential(*mixer)

    # Pointwise projection back to the input width.
    self.conv3 = nn.Sequential(
        nn.BatchNorm1d(dim),
        nn.Conv1d(dim, in_dim, kernel_size=1),
        nn.GELU(),
      )

  def forward(self, x):
    """Apply the block; the output has the same shape as `x`."""
    passthrough, branch = torch.chunk(self.conv1(x), 2, dim=1)
    merged = passthrough + self.conv2(branch)
    return self.conv3(merged) + x

In [11]:
class Model(nn.Module):
  """Waveform-to-waveform network: two parallel encoders, Block stacks, patch decoder.

  The input is a (batch, 1, time) waveform. `enc` folds it into patches of
  `patch_size` samples and applies a convolution, while `cnn_enc` applies
  three conv/pool stages that each shrink time by 4 (64 in total). Their
  outputs are concatenated along channels, so both must have the same
  length: this holds for patch_size 64 and a time axis divisible by 64.
  `project_net` and `predict_net` are stacks of Block, and `dec` maps the
  features back to a (batch, 1, time) waveform.

  Args:
    patch_size: samples per patch in `enc` and `dec`.
    dim: feature channels carried between the sub-networks.
    expansion_factor: hidden width of each Block as a multiple of `dim`.
    proj_depth: number of Blocks in `project_net`.
    pred_depth: number of Blocks in `predict_net`.
    k_size, dilation, chunk_size: forwarded to every Block.
  """
  def __init__(self, patch_size = 64,
            dim = 128,
            expansion_factor = 2,
            proj_depth = 4,
            pred_depth = 2,
            k_size = 9,
            dilation = 16,
            chunk_size = 8,):
    super().__init__()

    def conv_stage(c_in, c_out, groups):
      # One encoder stage: conv -> GELU -> 4x max-pool -> batch norm.
      return [
          nn.Conv1d(c_in, c_out, 5, padding=2, groups=groups),
          nn.GELU(),
          nn.MaxPool1d(4,4),
          nn.BatchNorm1d(c_out),
        ]

    def block_stack(depth):
      return [Block(dim, dim*expansion_factor, k_size, dilation, chunk_size,) for _ in range(depth)]

    quarter = dim//4
    half = dim//2
    self.cnn_enc = nn.Sequential(
          *conv_stage(1, quarter, 1),
          *conv_stage(quarter, half, 4),
          *conv_stage(half, dim, 4),
        )

    self.enc = nn.Sequential(
              Rearrange('b c (l p) -> b (p c) l', p = patch_size),
              nn.Conv1d(patch_size, dim, kernel_size=3, padding=1),
            )

    # The first layer fuses the two concatenated encoder outputs (2*dim channels).
    self.project_net = nn.Sequential(
              nn.Conv1d(dim*2, dim, kernel_size=1),
              *block_stack(proj_depth),
            )

    self.predict_net = nn.Sequential(*block_stack(pred_depth))

    self.dec = nn.Sequential(
              nn.Conv1d(dim, patch_size, kernel_size=3, padding=1),
              Rearrange('b (p c) l -> b c (l p)', p = patch_size),
            )

  def forward(self, x):
    """Run project -> predict -> decode on waveform `x`."""
    return self.decode(self.predict(self.project(x)))

  def project(self, x):
    """Encode `x` with both encoders and pass the concatenation through `project_net`."""
    patch_features = self.enc(x)
    conv_features = self.cnn_enc(x)
    fused = torch.cat([patch_features, conv_features], dim=1)
    return self.project_net(fused)

  def predict(self, z):
    """Apply `predict_net` to projected features `z`."""
    return self.predict_net(z)

  def decode(self, p):
    """Map features `p` back to a waveform with `dec`."""
    return self.dec(p)

In [12]:
# Hyper-parameters of the network being evaluated. chunk_size is set to 32
# here, overriding the Model default of 8.
patch_size = 64
dim = 128
expansion_factor = 2
proj_depth = 4
pred_depth = 2
k_size = 9
dilation = 16
chunk_size = 32

# Keyword arguments keep the call independent of the parameter order.
model = Model(
    patch_size=patch_size,
    dim=dim,
    expansion_factor=expansion_factor,
    proj_depth=proj_depth,
    pred_depth=pred_depth,
    k_size=k_size,
    dilation=dilation,
    chunk_size=chunk_size,
).to(device)

In [13]:
# Total number of elements across all model parameters.
sum(p.numel() for p in model.parameters())

2666368

In [14]:
# Load the test speech and noise collections; the paths are relative to the
# current working directory.
# NOTE(review): allow_pickle=True lets np.load unpickle arbitrary Python objects,
# so these files must come from a trusted source.
test_speechs = np.load("../dataset/test_speechs.npy",allow_pickle=True)
test_noises = np.load("../dataset/test_noises.npy",allow_pickle=True)

In [15]:
# Checkpoints to compare; each name is a sibling directory holding a state.pt file.
model_names = ['Normal','SimSiam','SimSiam-pretrain','SimSiam-round','BYOL','BYOL-round','BYOL-round-100-step']

# Seed Python's RNG so the noise pairing below and the random crops drawn
# inside get_datas are identical on every run of the notebook.
SEED = 0
random.seed(SEED)

speechs = test_speechs
# Pair every speech clip with a noise clip sampled with replacement.
noises = test_noises[random.choices(range(len(test_noises)), k=len(speechs))]

# All three dicts are keyed by the SNR formatted as a string, e.g. '-7.5';
# pred_sounds[key] is filled per model name by the inference cell.
clean_sounds = {}
noisy_sounds = {}
pred_sounds = {}
for snr in [-7.5, -2.5, 2.5, 7.5]:
    key = '{}'.format(snr)
    # Clips are 2*16384 samples long; each SNR draws its own random crops.
    clean_sounds[key], _, noisy_sounds[key] = get_datas(speechs, noises, [snr] * len(speechs), 16384*2)
    pred_sounds[key] = {}

In [16]:
# Evaluate every checkpoint on the prepared noisy batches. no_grad disables
# autograd, so the outputs are already detached and need no `.data`.
with torch.no_grad():
    for name in model_names:
        print(name)
        # `device` is already a torch.device, so it is passed to map_location directly.
        checkpoint = torch.load("../{}/state.pt".format(name),map_location=device)
        model.load_state_dict(checkpoint['model_state_dict'])
        model.eval()

        for snr in [-7.5, -2.5, 2.5, 7.5]:
            key = '{}'.format(snr)
            # The whole test set for this SNR goes through the model as one batch;
            # results are moved to the CPU so they do not hold GPU memory.
            pred_sounds[key][name] = model(torch.tensor(noisy_sounds[key],device=device)).cpu()

Normal


  return torch.max_pool1d(input, kernel_size, stride, padding, dilation, ceil_mode)


SimSiam
SimSiam-pretrain
SimSiam-round
BYOL
BYOL-round
BYOL-round-100-step


In [17]:
# Choose five distinct test items to listen to: shuffle all indices, keep the first five.
display_indices = np.arange(len(speechs))
np.random.shuffle(display_indices)
display_indices = display_indices[:5]
display_indices

array([ 10,  99, 151,  25, 356])

## SNR: -7.5 dB

In [18]:
# For each selected item play the clean clip, the noisy clip, then every
# model's estimate, all at 16 kHz; items are separated by a dashed line.
snr_key = '-7.5'
for idx in display_indices:
    clips = [("Clean Speech", clean_sounds[snr_key]), ("Noisy Speech", noisy_sounds[snr_key])]
    clips += [("{} Estimate Speech".format(name), pred_sounds[snr_key][name]) for name in model_names]
    for label, batch in clips:
        print(label)
        display(Audio(batch[idx,0],rate=16000))
    if idx != display_indices[-1]:
        print("------------------------------------------------------")

Output hidden; open in https://colab.research.google.com to view.

## SNR: -2.5 dB

In [19]:
# For each selected item play the clean clip, the noisy clip, then every
# model's estimate, all at 16 kHz; items are separated by a dashed line.
snr_key = '-2.5'
for idx in display_indices:
    clips = [("Clean Speech", clean_sounds[snr_key]), ("Noisy Speech", noisy_sounds[snr_key])]
    clips += [("{} Estimate Speech".format(name), pred_sounds[snr_key][name]) for name in model_names]
    for label, batch in clips:
        print(label)
        display(Audio(batch[idx,0],rate=16000))
    if idx != display_indices[-1]:
        print("------------------------------------------------------")

Output hidden; open in https://colab.research.google.com to view.

## SNR: 2.5 dB

In [20]:
# For each selected item play the clean clip, the noisy clip, then every
# model's estimate, all at 16 kHz; items are separated by a dashed line.
snr_key = '2.5'
for idx in display_indices:
    clips = [("Clean Speech", clean_sounds[snr_key]), ("Noisy Speech", noisy_sounds[snr_key])]
    clips += [("{} Estimate Speech".format(name), pred_sounds[snr_key][name]) for name in model_names]
    for label, batch in clips:
        print(label)
        display(Audio(batch[idx,0],rate=16000))
    if idx != display_indices[-1]:
        print("------------------------------------------------------")

Output hidden; open in https://colab.research.google.com to view.

## SNR: 7.5 dB

In [21]:
# For each selected item play the clean clip, the noisy clip, then every
# model's estimate, all at 16 kHz; items are separated by a dashed line.
snr_key = '7.5'
for idx in display_indices:
    clips = [("Clean Speech", clean_sounds[snr_key]), ("Noisy Speech", noisy_sounds[snr_key])]
    clips += [("{} Estimate Speech".format(name), pred_sounds[snr_key][name]) for name in model_names]
    for label, batch in clips:
        print(label)
        display(Audio(batch[idx,0],rate=16000))
    if idx != display_indices[-1]:
        print("------------------------------------------------------")

Output hidden; open in https://colab.research.google.com to view.