In [None]:
import os
# Select the GPU before any CUDA-using library initializes: enumerate devices
# in PCI bus order and expose only physical device 3 to this process.
os.environ.update({
    "CUDA_DEVICE_ORDER": "PCI_BUS_ID",   # see issue #152
    "CUDA_VISIBLE_DEVICES": "3",
})

In [None]:
# Enable IPython's autoreload extension in mode 2, so imported modules are
# re-imported before each cell execution and edits to project code are picked up.
%load_ext autoreload
%autoreload 2

def sound( x, rate=8000, label=''):
    """Display an inline audio player for `x` in the notebook.

    Args:
        x: Audio data handed unchanged to `IPython.display.Audio`.
        rate: Sampling rate passed to `Audio` (default 8000).
        label: Optional caption. When empty (or None) only the player is
            shown; otherwise the caption and the player are rendered side by
            side in a borderless one-row HTML table.
    """
    from IPython.display import display, Audio, HTML
    # Truthiness check instead of `label is ''`: identity comparison against a
    # string literal is implementation-dependent and raises a SyntaxWarning on
    # Python >= 3.8.
    if not label:
        display( Audio( x, rate=rate))
    else:
        # `[3:]` drops the first three characters of the player's HTML repr
        # (presumably leading whitespace -- confirm against IPython's output).
        display( HTML(
        '<style> table, th, td {border: 0px; }</style> <table><tr><td>' + label +
        '</td><td>' + Audio( x, rate=rate)._repr_html_()[3:] + '</td></tr></table>'
        ))

In [None]:
from linclf_train import dataio_prep
import numpy as np
import speechbrain as sb
import torch
import torchaudio
from tqdm.notebook import tqdm
from hyperpyyaml import load_hyperpyyaml
import pdb

In [None]:
# Parse the visualization config with HyperPyYAML. Later cells read `hparams`
# both for plain settings (paths, flags) and for ready-to-call objects such as
# 'compute_features', 'mean_var_norm' and 'embedding_model'.
with open('./hparams/visualize.yaml') as fin:
    hparams = load_hyperpyyaml(fin)

In [None]:
# Class-name <-> index mapping; its `ind2lab` table is used for plot legends.
# NOTE(review): presumably loads the mapping saved at 'label_encoder_path' and
# creates one only if the file is missing -- confirm against SpeechBrain docs.
label_encoder = sb.dataio.encoder.CategoricalEncoder()
label_encoder.load_or_create(hparams['label_encoder_path'])

In [None]:
# Build the training-split dataset from hparams['csv_path']['train'] and wrap
# it in a dataloader.
# NOTE(review): the train loader reuses 'valid_dataloader_opts'; presumably
# intentional because the data is only passed through the model for
# inference here -- confirm.
train_ds = dataio_prep(
    hparams,
    hparams['csv_path']['train'],
    label_encoder
)
train_loader = sb.dataio.dataloader.make_dataloader(
    train_ds, **hparams['valid_dataloader_opts']
)

# Same pipeline for the test split, using the same label encoder and options.
test_ds = dataio_prep(
    hparams,
    hparams['csv_path']['test'],
    label_encoder
)
test_loader = sb.dataio.dataloader.make_dataloader(
    test_ds, **hparams['valid_dataloader_opts']
)

In [None]:
# Load weights for the pretrained embedder and normalizer from the 'task4'
# sub-directory of the SSL checkpoint directory.
ssl_checkpointer = sb.utils.checkpoints.Checkpointer(
    os.path.join(hparams['ssl_checkpoints_dir'], 'task4'),
    recoverables={
        'embedding_model': hparams['embedding_model'],
        'normalizer': hparams['mean_var_norm'],
    },
)
# Select the checkpoint by its recorded 'loss' (lower is better). Keep the
# return value: `recover_if_possible` yields None when nothing was loaded.
recovered_ckpt = ssl_checkpointer.recover_if_possible(
    min_key='loss',
)
# Freeze the embedder and put it in eval mode: it is used for inference only.
for p in hparams['embedding_model'].parameters():
    p.requires_grad = False
hparams['embedding_model'].eval()
if recovered_ckpt is None:
    # Without this check the notebook would silently visualize embeddings
    # from whatever weights the model was initialized with.
    print("==> WARNING: no checkpoint found at {}; embedder weights were NOT restored".format(ssl_checkpointer.checkpoints_dir))
else:
    print("==> Recovering embedder checkpointer at {}".format(ssl_checkpointer.checkpoints_dir))

In [None]:
# Use the first visible GPU when CUDA is available; otherwise fall back to the
# CPU so the notebook still runs on machines without a usable GPU.
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
# Move the feature extractor, normalizer and embedder to the chosen device.
hparams['compute_features'].to(device)
hparams['mean_var_norm'].to(device)
hparams['embedding_model'].to(device)

In [None]:
def compute_embedding(hparams, batch):
    """Run one batch through the feature -> (dB) -> (normalize) -> embedder chain.

    Args:
        hparams: Mapping providing the callables 'compute_features',
            'mean_var_norm' and 'embedding_model' plus the boolean switches
            'amp_to_db' and 'normalize'.
        batch: Batch object exposing `.to(device)` and a `.sig` attribute that
            unpacks into `(wavs, lens)`.

    Returns:
        The output of `hparams['embedding_model']`, computed without gradient
        tracking ([B, 1, D] according to the original author's note).

    Note:
        Relies on the notebook-level `device` variable for placement.
    """
    batch = batch.to(device)
    wavs, lens = batch.sig
    with torch.no_grad():
        feats = hparams['compute_features'](wavs)

        # Optional amplitude -> decibel conversion.
        # (author's open question: try "magnitude" vs "power"? top_db 80, 50...)
        if hparams["amp_to_db"]:
            to_db = torchaudio.transforms.AmplitudeToDB(stype="power", top_db=80)
            feats = to_db(feats)

        # Optional normalization, given the per-item lengths.
        if hparams["normalize"]:
            feats = hparams["mean_var_norm"](feats, lens)

        return hparams['embedding_model'](feats)  # [B, 1, D]

In [None]:
# Embed every training batch, collecting per-batch NumPy chunks of embeddings
# and encoded class labels.
train_embs, train_labels = [], []
with torch.no_grad():
    for batch in tqdm(train_loader, dynamic_ncols=True):
        emb = compute_embedding(hparams, batch).squeeze(1) # [B, D]
        train_embs.append(emb.cpu().numpy())
        train_labels.append(batch.class_string_encoded.data.cpu().numpy())
# Stack the chunks along the sample axis; drop singleton label dimensions.
train_embs = np.concatenate(train_embs, axis=0)
train_labels = np.squeeze(np.concatenate(train_labels, axis=0))

In [None]:
# Embed every test batch, collecting per-batch NumPy chunks of embeddings and
# encoded class labels.
test_embs, test_labels = [], []
with torch.no_grad():
    for batch in tqdm(test_loader, dynamic_ncols=True):
        emb = compute_embedding(hparams, batch).squeeze(1) # [B, D]
        test_embs.append(emb.cpu().numpy())
        test_labels.append(batch.class_string_encoded.data.cpu().numpy())
# Stack the chunks along the sample axis; drop singleton label dimensions.
test_embs = np.concatenate(test_embs, axis=0)
test_labels = np.squeeze(np.concatenate(test_labels, axis=0))

In [None]:
def plot_PCA_projection(x_train, y_train, x_test, y_test):
    """Scatter-plot training embeddings on their first two principal components.

    One scatter series is drawn per class found in `y_train`; legend entries
    come from the notebook-level `label_encoder.ind2lab` mapping. The shape of
    each class's projected points is printed as a quick sanity check.

    Args:
        x_train: 2-D array of training embeddings, one row per sample.
        y_train: 1-D NumPy array of integer class indices aligned with
            `x_train`.
        x_test: Accepted for call compatibility; not plotted.
        y_test: Accepted for call compatibility; not plotted.
    """
    from sklearn.decomposition import PCA
    import matplotlib.pyplot as plt

    pca_transform = PCA(n_components=2, random_state=0)
    pca_train = pca_transform.fit_transform(x_train)

    plt.figure(figsize=(10,10))
    plt.subplot(111)
    # Iterate over the classes actually present rather than a fixed range(10),
    # so the plot works for any number of classes.
    for label in np.unique(y_train):
        sub_embs = pca_train[y_train==label]
        print(sub_embs.shape)
        # int(...) turns the NumPy scalar into a plain key for the dict lookup.
        plt.scatter(sub_embs[:, 0], sub_embs[:, 1], label=label_encoder.ind2lab[int(label)], alpha=0.3)
    plt.legend(loc='best')

In [None]:
# 2-D PCA view of the training embeddings, colored by class.
plot_PCA_projection(
    x_train=train_embs,
    y_train=train_labels,
    x_test=test_embs,
    y_test=test_labels,
)

In [None]:
def plot_tsne_projection(x_train, y_train, x_test, y_test, pca_dim=None):
    """Scatter-plot a 2-D t-SNE embedding of the training embeddings.

    One scatter series is drawn per class found in `y_train`; legend entries
    come from the notebook-level `label_encoder.ind2lab` mapping. The shape of
    each class's projected points is printed as a quick sanity check.

    Args:
        x_train: 2-D array of training embeddings, one row per sample.
        y_train: 1-D NumPy array of integer class indices aligned with
            `x_train`.
        x_test: Accepted for call compatibility; not plotted.
        y_test: Accepted for call compatibility; not plotted.
        pca_dim: If not None, `x_train` is first reduced to this many
            principal components before t-SNE is run.
    """
    from sklearn.decomposition import PCA
    from sklearn.manifold import TSNE
    import matplotlib.pyplot as plt

    if pca_dim is not None:
        # Honor the requested dimensionality (previously hard-coded to 50,
        # which silently ignored the argument's value).
        pca_transform = PCA(n_components=pca_dim, random_state=0)
        x_train = pca_transform.fit_transform(x_train)

    # NOTE(review): newer scikit-learn releases rename `n_iter` to `max_iter`
    # -- confirm against the installed version.
    tsne_transform = TSNE(n_components=2, random_state=0, verbose=1, perplexity=40, n_iter=300)
    # TSNE only offers fit_transform (no transform for unseen points), so the
    # test set cannot be projected with the fitted model.
    tsne_train = tsne_transform.fit_transform(x_train)

    plt.figure(figsize=(10,10))
    plt.subplot(111)
    # Iterate over the classes actually present rather than a fixed range(10),
    # so the plot works for any number of classes.
    for label in np.unique(y_train):
        sub_embs = tsne_train[y_train==label]
        print(sub_embs.shape)
        # int(...) turns the NumPy scalar into a plain key for the dict lookup.
        plt.scatter(sub_embs[:, 0], sub_embs[:, 1], label=label_encoder.ind2lab[int(label)], alpha=0.3)
    plt.legend(loc='best')

In [None]:
# 2-D t-SNE view of the training embeddings, with no PCA pre-reduction.
plot_tsne_projection(
    x_train=train_embs,
    y_train=train_labels,
    x_test=test_embs,
    y_test=test_labels,
    pca_dim=None,
)