In [1]:
import numpy as np
import os, sys, librosa
from librosa import display
from scipy import signal
from matplotlib import pyplot as plt
import matplotlib.gridspec as gridspec
import IPython.display as ipd
import pandas as pd
from numba import jit
from IPython.display import Audio 
import IPython

import cdpam

from scipy.stats import pearsonr, spearmanr

import scipy.spatial as sp

import torch
import torch.nn as nn
from torch.autograd import Variable
import torch.optim as optim
import numpy as np

import librosa
from IPython.display import Audio, display
from PIL import Image
import matplotlib.pyplot as plt
import scipy.stats as stats
import collections as c

from torch.nn.modules.module import _addindent

import copy
import os
import math

import soundfile as sf
from matplotlib.pyplot import figure

from sklearn.preprocessing import MinMaxScaler

%matplotlib inline
import warnings
warnings.filterwarnings("ignore")

### Per Antognini et al. (Synthesizing Diverse, High-Quality Audio Textures) - https://arxiv.org/abs/1806.08002:
#### Input representation 
1. Ensure hop_length is less than half of win_length
2. Get absolute value of stft, add 1 and take natural logarithm. 
   Adding 1 guarantees that the log-spectrogram is finite and positive

#### Network architecture
1. 6 CNN layers, with filter sizes 2^n
2. Each layer with 512 filters
3. Each filter randomly drawn from Glorot initialization (In PyTorch - torch.nn.init.xavier_uniform) 


In [2]:
BOOTSTRAP_ITERS = 10 #To generate SEMs and Means.

N_PARALLEL_CNNS=6

N_FFT = 512 
K_HOP = 128 
N_FREQ= 257
N_FILTERS = 512

possible_kernels = [2,4,8,16,64,128,256,512,1024,2048]
filters = [0]*N_PARALLEL_CNNS
for j in range(N_PARALLEL_CNNS):
    filters[j]=possible_kernels[j]
    

use_cuda = torch.cuda.is_available() #use GPU if available
print('GPU available =',use_cuda)
dtype = torch.cuda.FloatTensor if use_cuda else torch.FloatTensor

GPU available = False


In [3]:
def read_audio_spectrum(filename):
    #x, fs  = sf.read(filename)
    x, fs  = librosa.load(filename, sr=16000)
    R = np.abs(librosa.stft(x, n_fft=N_FFT, hop_length=K_HOP, win_length=N_FFT,  center=False))  
    R += 1
    R = np.log(R)
    return R,fs


'''
Make input such that frequency bins of Spectrogram are channels.
i.e. if Spectrogram shape is 257 X 247 => Input to convolution should be 1 X 257 X 1 X 247 
(PyTorch uses batch X channels X Height X Width)
'''
def prepare_input(filename):
    R, fs = read_audio_spectrum(filename)
    a_style = np.ascontiguousarray(R[None,None,:,:])
    a_style = torch.from_numpy(a_style).permute(0,2,1,3) 
    converted_img = Variable(a_style).type(dtype)
    return converted_img

# Glorot initialization
def weights_init(m):
    classname = m.__class__.__name__
    if classname.find('Conv') != -1:
        torch.nn.init.xavier_uniform_(m.weight)

In [4]:

# This variable lets you 'tap' into the style_net to retrieve Gram Matrices at the layer matching that name.
style_layers_default = ['relu_2']


class style_net(nn.Module):
    """Here create the network you want to use by adding/removing layers in nn.Sequential"""
    def __init__(self, num_channels, num_filters, filter_size):
        super(style_net, self).__init__()
        self.layers = nn.Sequential(c.OrderedDict([
                            ('conv1',nn.Conv2d(num_channels, num_filters, kernel_size=(1,filter_size),bias=False)),
                            ('relu1',nn.ReLU())]))

            
    def forward(self,input):
        out = self.layers(input)
        return out

class GramMatrix(nn.Module):
    '''Compute the feature correlations'''
    def forward(self, input):
        a, b, c, d = input.size()         
        features = input.view(b, a * c * d)
        features2=features.unsqueeze(0)
        G = torch.matmul(features2, torch.transpose(features2, 1,2))
        return G.div(a * c * d)


    
'''
Initialize parallel CNNs as suggested by Antognini et al.
Typical values:
num_cnns = N_PARALLEL_CNNS
per_cnn_num_channels = N_FREQ
per_cnn_num_filters = N_FILTERS
per_cnn_filter_sizes = [2,4,8,16,64,128]
'''
def initialize_parallel_cnns(num_cnns, per_cnn_num_channels, per_cnn_num_filters, per_cnn_filter_sizes):
    cnnlist=[] 
    for j in range(num_cnns) :
        cnn = style_net(per_cnn_num_channels, per_cnn_num_filters, per_cnn_filter_sizes[j])
        cnn.apply(lambda x: weights_init(x))
        for param in cnn.parameters():
            param.requires_grad = False
        if use_cuda:
            cnn = cnn.cuda()

        cnnlist.append(cnn)
        
    return cnnlist


In [5]:
def get_gram(cnn, formatted_input, style_layers=style_layers_default):
    
    result = []
    cnn = copy.deepcopy(cnn)
    
    model = nn.Sequential()
    layer_list = list(cnn.layers)
    gram = GramMatrix()
     
    i = 1  
    for layer in layer_list:
        if isinstance(layer, nn.Conv2d): 
            name = "conv_" + str(i)
            model.add_module(name, layer)
            
            if name in style_layers: 
                target_feature = model(formatted_input).clone()
                target_feature_gram = gram(target_feature)
                target_feature_gram=torch.flatten(target_feature_gram)
                target_feature_gram=target_feature_gram.numpy()
                target_feature_gram=target_feature_gram.reshape(512,512)
                target_feature_gram= MinMaxScaler().fit_transform(target_feature_gram)
                
                result.append((target_feature_gram))

        if isinstance(layer, nn.ReLU):
            name = "relu_" + str(i)
            model.add_module(name, layer)
            
            if name in style_layers:
                target_feature = model(formatted_input).clone()
                target_feature_gram = gram(target_feature)
                target_feature_gram=torch.flatten(target_feature_gram)
                target_feature_gram=target_feature_gram.numpy()
                target_feature_gram=target_feature_gram.reshape(512,512)
                target_feature_gram= MinMaxScaler().fit_transform(target_feature_gram)
                result.append((target_feature_gram))
               
        if isinstance(layer, nn.MaxPool2d): 
            name = "pool_" + str(i)
            model.add_module(name, layer)
            
            if name in style_layers:
                target_feature = model(formatted_input).clone()
                target_feature_gram = gram(target_feature)
                target_feature_gram=torch.flatten(target_feature_gram)
                target_feature_gram=target_feature_gram.numpy()
                target_feature_gram=target_feature_gram.reshape(512,512)
                target_feature_gram= MinMaxScaler().fit_transform(target_feature_gram)
                result.append((target_feature_gram))
                
        i += 1
    return result

## Define Gram Loss (normalized w.r.t a target as defined in Antognini et. al)

### Based on the normalized gram loss defined in Antognini et al. (paper link at the top of this notebook)

#### Find Gram loss between two Gram Matrices. Normalized by the 'referenced' or 'target' gram matrix

$$Gram\_Loss_{G, \hat{G}} = \frac{\sum_{k,\mu,v}(G_{\mu,v}^k - \hat{G}_{\mu,v}^k)^2}{\sum_{k,\mu,v}(\hat{G}_{\mu,v}^k)^2}$$


In [6]:
def get_antognini_gram_loss(gm_1, gm_2):
    # Here both gm_1 and gm_2 are of shape 6 X 1 X 512 X 512
    frobenius_sum_of_gm_diff = 0
    frobenius_sum_of_target_norm = 0
    for layer_num in range(gm_1.shape[0]):
        frobenius_sum_of_gm_diff += np.linalg.norm(gm_1[layer_num][0] - gm_2[layer_num][0])**2
        frobenius_sum_of_target_norm += np.linalg.norm(gm_2[layer_num][0])**2
    return frobenius_sum_of_gm_diff/frobenius_sum_of_target_norm

In [7]:
def compute_cos_distance(gram1,gram2): 
    cos = nn.CosineSimilarity(dim=0, eps=1e-6)
    gram1_ = np.squeeze(gram1, axis=1)
    gram2_ = np.squeeze(gram2, axis=1)
    distance = np.zeros((6,1))
    for i in range(6): 
        temp= cos(torch.from_numpy(gram1_[i].flatten()), torch.from_numpy(gram2_[i].flatten())) 
        distance[i]=temp
    return 1-distance.mean()

In [8]:
cnnlist = initialize_parallel_cnns(N_PARALLEL_CNNS, N_FREQ, N_FILTERS, filters)

In [14]:
#
# We currently use compute_cos_distance. This can be easily switched to using get_antognini_gram_loss
#
def get_param_sense_losses(audio_dir, is_gan=True, num_interp_steps=10):
    example_dirs = [_ for _ in os.listdir(audio_dir) if _ !='.DS_Store']
    print(example_dirs)
    all_gm_losses = np.empty((0,num_interp_steps))
    for example_dir in example_dirs:

        print('Analysing example ', example_dir)
        if is_gan:
            example_loc_path = os.path.join(audio_dir, example_dir, 'one_z_pitch_sweep')
        else:
            example_loc_path = os.path.join(audio_dir, example_dir)

        example_loc = os.listdir(example_loc_path)
        example_loc = [_ for _ in os.listdir(example_loc_path) if '.wav' in _]
        if is_gan:
            example_loc.sort(key=lambda x:int(x.split('_')[3].split('.')[0]))
        else:
            example_loc.sort(key=lambda x:float(x.split('_')[1].split('.wav')[0]))
        #print(example_loc)
        audio_file_list = []
        for example_audio in example_loc:
            if '.wav' in example_audio and example_audio:
                audio_file_list.append(os.path.join(example_loc_path, example_audio))
        
        audio_0 = prepare_input(audio_file_list[0])
        audio_0_gm = []
        for j in range(N_PARALLEL_CNNS):
            temp = get_gram(cnnlist[j], audio_0, style_layers=style_layers_default)
            audio_0_gm.append(temp)
        audio_0_gm = np.array(audio_0_gm)

        example_gm_losses = np.array([])
        for audio_file in audio_file_list:
            audio_i = prepare_input(audio_file)

            audio_i_gm = []
            for j in range(N_PARALLEL_CNNS):
                temp = get_gram(cnnlist[j], audio_i, style_layers=style_layers_default)
                audio_i_gm.append(temp)
            audio_i_gm = np.array(audio_i_gm)
            example_gm_losses = np.append(example_gm_losses, compute_cos_distance(audio_0_gm, audio_i_gm))
        #print(example_gm_losses)
        all_gm_losses = np.append(all_gm_losses, np.array([example_gm_losses]), axis=0)

    print('GM Loss array', np.mean(all_gm_losses, axis=0))
    return np.mean(all_gm_losses, axis=0)

# MorphGAN

In [15]:
morphgan_audio='/Users/purnimakamath/appdir/Github/ieee-tx-on-mm/data/water-wind-new/morphgan/audio/morph'
morphgan_paramsense_losses = get_param_sense_losses(morphgan_audio, is_gan=True, num_interp_steps=11)

lin_line = [a for a in np.arange(0,1.1,0.1)]
print('MorphGAN: Corr coef for Morph Param Sensitivity = ', pearsonr(lin_line, morphgan_paramsense_losses))

['2022-10-17 22:50', '2022-10-17 22:51', '2022-10-17 22:56', '2022-10-17 22:45', '2022-10-17 22:54', '2022-10-17 22:55', '2022-10-17 22:52', '2022-10-17 22:46', '2022-10-17 22:49', '2022-10-17 22:47']
Analysing example  2022-10-17 22:50
Analysing example  2022-10-17 22:51
Analysing example  2022-10-17 22:56
Analysing example  2022-10-17 22:45
Analysing example  2022-10-17 22:54
Analysing example  2022-10-17 22:55
Analysing example  2022-10-17 22:52
Analysing example  2022-10-17 22:46
Analysing example  2022-10-17 22:49
Analysing example  2022-10-17 22:47
GM Loss array [0.         0.03500256 0.07726649 0.14428613 0.22187538 0.27423
 0.29757555 0.30237291 0.32440894 0.34319202 0.35346957]
MorphGAN: Corr coef for Morph Param Sensitivity =  (0.9610206267161887, 2.522054880006216e-06)


# One Hot GAN

In [16]:
onehot_audio='/Users/purnimakamath/appdir/Github/ieee-tx-on-mm/data/water-wind/onehot/audio/morph'
onehot_paramsense_losses = get_param_sense_losses(onehot_audio, is_gan=True, num_interp_steps=11)

lin_line = [a for a in np.arange(0,1.1,0.1)]
print('One Hot: Corr coef for Morph Param Sensitivity = ', pearsonr(lin_line, onehot_paramsense_losses))

['2022-09-02 15:13', '2022-09-02 15:14', '2022-09-02 15:15', '2022-09-02 15:12', '2022-09-02 15:09', '2022-09-02 15:19', '2022-09-02 15:21', '2022-09-02 15:17', '2022-09-02 15:10', '2022-09-02 15:18']
Analysing example  2022-09-02 15:13
Analysing example  2022-09-02 15:14
Analysing example  2022-09-02 15:15
Analysing example  2022-09-02 15:12
Analysing example  2022-09-02 15:09
Analysing example  2022-09-02 15:19
Analysing example  2022-09-02 15:21
Analysing example  2022-09-02 15:17
Analysing example  2022-09-02 15:10
Analysing example  2022-09-02 15:18
GM Loss array [0.         0.01987916 0.03990053 0.06795186 0.1137172  0.18776335
 0.25538763 0.27665292 0.27850003 0.28538674 0.28483344]
One Hot: Corr coef for Morph Param Sensitivity =  (0.9622982016888724, 2.174868328545306e-06)


# Morph2

In [17]:
morph2_audio='/Users/purnimakamath/appdir/Github/ieee-tx-on-mm/data/water-wind/morph2/audio/morph'
morph2_paramsense_losses = get_param_sense_losses(morph2_audio, is_gan=False, num_interp_steps=11)

lin_line = [a for a in np.arange(0,1.1,0.1)]
print('Morph2: Corr coef for Morph Param Sensitivity = ', pearsonr(lin_line, morph2_paramsense_losses))

['9', '0', '7', '6', '1', '10', '8', '4', '3', '2', '5']
Analysing example  9
Analysing example  0
Analysing example  7
Analysing example  6
Analysing example  1
Analysing example  10
Analysing example  8
Analysing example  4
Analysing example  3
Analysing example  2
Analysing example  5
GM Loss array [0.         0.18931039 0.19119945 0.22618087 0.34717031 0.39623025
 0.42982481 0.45498322 0.46450749 0.46383756 0.33671759]
Morph2: Corr coef for Morph Param Sensitivity =  (0.8353060703884401, 0.0013701879567764117)
