The network should be signal in, signal out: it takes a 1-D signal as input and returns a 1-D signal as output.

In [7]:
import numpy as np
import tensorflow as tf
# import tensorflow.contrib.slim as slim
from keras import layers
import keras as keras

# LEAKY RELU UNIT
# “pointwise nonlinear leaky rectified linear unit (LReLU) [28] max(0.2x, x)” 
# ([Germain et al., 2018, p. 2](zotero://select/library/items/A6D78SNY)) 
# ([pdf](zotero://open-pdf/library/items/P4HPP4P3?page=2&annotation=9QWWB82R))
def LeakyReLU(x, negative_slope=0.2):
    """Pointwise leaky rectified linear unit: max(negative_slope * x, x).

    Args:
        x: Tensor (or anything `tf.maximum` accepts) to activate.
        negative_slope: Factor applied to `x` in the first argument of the
            maximum. Defaults to 0.2, the value quoted from the paper in the
            comment above this function.

    Returns:
        The element-wise maximum of `negative_slope * x` and `x`.
    """
    return tf.maximum(negative_slope * x, x)

# GENERATE DILATED LAYER FROM 1D SIGNAL
def signal_to_dilated(signal, dilation, n_channels):
    """Fold a rank-4 signal so that samples `dilation` apart become adjacent.

    Args:
        signal: Rank-4 tensor; axis 2 is the one that gets padded and folded.
            Presumably laid out as [batch, 1, time, channels] - TODO confirm.
        dilation: Number of interleaved sub-signals to split axis 2 into.
        n_channels: Size of the last axis, used as the last reshape dimension.

    Returns:
        A tuple `(dilated, pad_elements)`: the folded tensor with axes 1 and 2
        of the reshaped signal swapped, and the number of zeros appended to
        axis 2 before folding (needed to undo the operation).
    """
    dims = tf.shape(signal)
    batch_size = dims[0]
    length = dims[2]

    # Zeros to append so that the length of axis 2 is a multiple of `dilation`.
    n_pad = dilation - 1 - (length + dilation - 1) % dilation
    paddings = [[0, 0], [0, 0], [0, n_pad], [0, 0]]
    padded = tf.pad(signal, paddings)

    folded = tf.reshape(padded, [batch_size, -1, dilation, n_channels])
    interleaved = tf.transpose(folded, perm=[0, 2, 1, 3])
    return interleaved, n_pad


# COLLAPSE DILATED LAYER TO 1D SIGNAL
def dilated_to_signal(dilated, pad_elements, n_channels):
    """Unfold a dilated rank-4 tensor back into a single-row signal.

    Args:
        dilated: Rank-4 tensor, e.g. the first value returned by
            `signal_to_dilated`.
        pad_elements: Number of trailing samples to drop from the unfolded
            signal (the padding count returned by `signal_to_dilated`).
        n_channels: Size of the last axis, used as the last reshape dimension.

    Returns:
        Tensor reshaped to [batch, 1, -1, n_channels] with the last
        `pad_elements` entries of axis 2 removed.
    """
    dims = tf.shape(dilated)
    batch_size = dims[0]
    # Length of the flattened axis once the padding is stripped again.
    kept_length = dims[1] * dims[2] - pad_elements

    swapped = tf.transpose(dilated, perm=[0, 2, 1, 3])
    flat = tf.reshape(swapped, [batch_size, 1, -1, n_channels])
    return flat[:, :, :kept_length, :]


# ADAPTIVE BATCH NORMALIZATION LAYER
#b) Adaptive normalization:
def AdaptiveNormalization(x):
    """Blend the input with its batch-normalized version: alpha*x + beta*BN(x).

    `alpha` is initialized to 1.0 and `beta` to 0.0, so the initial output
    equals `x`.

    Note that every call creates new `alpha` and `beta` variables and a new
    batch-normalization layer; nothing is shared between calls.

    Args:
        x: Tensor to normalize.

    Returns:
        `alpha * x + beta * BatchNormalization()(x)`.
    """
    alpha = tf.Variable(1.0, name='alpha')
    beta = tf.Variable(0.0, name='beta')
    # BatchNormalization is a layer class: it has to be instantiated first and
    # then called on the tensor. Passing `x` to the constructor (as before)
    # does not normalize anything.
    normalized = layers.BatchNormalization()(x)
    return alpha * x + beta * normalized


# IDENTITY INITIALIZATION OF CONV LAYERS
def identity_initializer():
    """Build an initializer that produces an identity-like 4-D kernel.

    Returns:
        A callable `(shape, dtype, partition_info)` returning a constant
        tensor of the given shape that is zero everywhere except for ones at
        `[shape[0] // 2, shape[1] // 2, i, i]` for every `i` below
        `min(shape[2], shape[3])`.
    """
    def _initializer(shape, dtype=tf.float32, partition_info=None):
        kernel = np.zeros(shape, dtype=float)
        center_row = shape[0] // 2
        center_col = shape[1] // 2
        # Indices along the diagonal of the last two axes.
        diagonal = np.arange(np.minimum(shape[2], shape[3]))
        kernel[center_row, center_col, diagonal, diagonal] = 1
        return tf.constant(kernel, dtype=dtype)

    return _initializer


# “In our experiments, simple training losses (e.g., L1) led to noticeably degraded output quality at lower signal-to-noise ratios (SNRs).” ([Germain et al., 2018, p. 2](zotero://select/library/items/A6D78SNY)) ([pdf](zotero://open-pdf/library/items/P4HPP4P3?page=2&annotation=DZN467TR))
def loss_function(target, current, type):
    """Return the mean L1 or L2 distance between two tensors.

    Args:
        target: Reference tensor.
        current: Tensor compared against `target`.
        type: Either 'L1' (mean absolute difference) or 'L2' (mean squared
            difference). The name shadows the builtin but is kept because
            callers pass it by keyword.

    Returns:
        A scalar tensor holding the selected mean distance.

    Raises:
        ValueError: If `type` is neither 'L1' nor 'L2'. Previously the
            function silently returned None in that case.
    """
    if type == 'L1':
        return tf.reduce_mean(tf.abs(target - current))
    if type == 'L2':
        return tf.reduce_mean(tf.square(target - current))
    raise ValueError("Unknown loss type %r; expected 'L1' or 'L2'" % (type,))


“To compute the loss between two waveforms, we apply a pretrained audio classification network to each waveform and compare the internal activation patterns induced in the network by the two signals” ([Germain et al., 2018, p. 1](zotero://select/library/items/A6D78SNY)) ([pdf](zotero://open-pdf/library/items/P4HPP4P3?page=1&annotation=Y3F49L4C))

“The network consists of 15 convolutional layers with 3×1 kernels, batch normalization, LReLU units, and zero padding” ([Germain et al., 2018, p. 2](zotero://select/library/items/A6D78SNY)) ([pdf](zotero://open-pdf/library/items/P4HPP4P3?page=2&annotation=J3JNI54Q))


In [5]:
# FEATURE LOSS NETWORK
import keras as keras
# Keras layers created by LossNetwork, keyed by layer name. Keras has no
# variable-scope `reuse` flag, so weight sharing between two calls is done by
# calling the very same layer objects again.
_LOSS_NET_LAYERS = {}


def _loss_net_layer(name, reuse, layer_cls, *args, **kwargs):
    """Return the cached layer `name` when reusing, else create and cache one."""
    if reuse:
        try:
            return _LOSS_NET_LAYERS[name]
        except KeyError:
            raise ValueError(
                "Cannot reuse layer %r: call LossNetwork with reuse=False first"
                % (name,)
            ) from None
    layer = layer_cls(*args, **kwargs)
    _LOSS_NET_LAYERS[name] = layer
    return layer


def LossNetwork(input, n_layers=14, training=True, reuse=False, norm_type="SBN",
                base_channels=32, blk_channels=5):
    """Apply the feature-loss network and return every layer's activations.

    Each layer is a convolution with a [1, 3] kernel and 'same' padding,
    followed by optional normalization and a LeakyReLU. All layers except the
    last one use a stride of 2 along axis 2.

    Args:
        input: Rank-4 input tensor; presumably [batch, 1, time, channels] -
            TODO confirm against the data pipeline.
        n_layers: Number of convolutional layers.
        training: Forwarded to the batch-normalization layers
            (`norm_type="SBN"` only).
        reuse: If True, reuse the layers created by the most recent call with
            `reuse=False` so both calls share weights. Raises ValueError if no
            such layers exist.
        norm_type: "NM" for `AdaptiveNormalization`, "SBN" for batch
            normalization, anything else for no normalization. With "NM" the
            normalization is delegated to `AdaptiveNormalization` and is not
            cached here.
        base_channels: Number of channels of the first layer.
        blk_channels: The channel count doubles every `blk_channels` layers.

    Returns:
        List with the activation tensor of each layer, in order.
    """
    conv_layers = []
    net = input
    for current_layer in range(n_layers):
        # "The number of channels is doubled every 5 layers, with 32 channels
        # in the first intermediate layer."
        n_channels = base_channels * (2 ** (current_layer // blk_channels))

        # "Each layer is decimated by 2": stride of 2 in the time dimension
        # for the first layer and every layer before the last one.
        if current_layer == 0 or current_layer < n_layers - 1:
            strides = (1, 2)
        else:
            strides = (1, 1)

        scope = 'loss_conv_%d' % current_layer
        conv = _loss_net_layer(scope, reuse, layers.Conv2D, n_channels,
                               kernel_size=(1, 3), strides=strides,
                               padding='same', name=scope)
        # Chain from the previous activation (the original indexed the
        # `layers` module here instead of the list of activations).
        net = conv(net)

        if norm_type == "NM":  # ADAPTIVE BATCH NORM
            net = AdaptiveNormalization(net)
        elif norm_type == "SBN":  # BATCH NORM
            bn_name = scope + '_bn'
            batch_norm = _loss_net_layer(bn_name, reuse,
                                         layers.BatchNormalization, name=bn_name)
            net = batch_norm(net, training=training)
        # else: NO LAYER NORMALIZATION

        net = layers.Activation(LeakyReLU)(net)
        conv_layers.append(net)

    return conv_layers


def featureloss(target, current, loss_weights, loss_layers, n_layers=14, norm_type="SBN", base_channels=32, blk_channels=5):
    """Compute the weighted L1 feature loss between two signals.

    Both signals are passed through `LossNetwork` with identical settings; the
    second pass reuses the layers of the first one so the features are
    comparable. Every call creates a fresh set of loss-network layers.

    Args:
        target: Reference signal fed to the loss network.
        current: Signal whose features are compared against `target`.
        loss_weights: Indexable of per-layer divisors; entry `i` divides the
            L1 loss of layer `i`.
        loss_layers: Number of leading loss-network layers to include.
        n_layers: Number of layers of the loss network.
        norm_type: Normalization type of the loss network.
        base_channels: Channel count of the first loss-network layer.
        blk_channels: Layers per channel-doubling block.

    Returns:
        List of length `loss_layers + 1`: element 0 is the sum of all
        per-layer losses, elements 1.. are the individual per-layer losses.
    """
    net_config = dict(n_layers=n_layers, norm_type=norm_type,
                      base_channels=base_channels, blk_channels=blk_channels)

    feat_current = LossNetwork(current, reuse=False, **net_config)
    feat_target = LossNetwork(target, reuse=True, **net_config)

    layer_losses = [
        loss_function(feat_current[layer], feat_target[layer], type="L1")
        / loss_weights[layer]
        for layer in range(loss_layers)
    ]

    # sum() starts from the integer 0, matching the original accumulator.
    return [sum(layer_losses)] + layer_losses


“receptive field of the pipeline is 2^14 + 1 samples, i.e., about 1 s of audio for fs = 16 kHz.” ([Germain et al., 2018, p. 2](zotero://select/library/items/A6D78SNY)) ([pdf](zotero://open-pdf/library/items/P4HPP4P3?page=2&annotation=WTGLQ8JQ))

In [None]:
# Context aggregation MODULE:
# ENHANCEMENT NETWORK
# Keras layers created by senet, keyed by layer name. Keras has no
# variable-scope `reuse` flag, so weight sharing between two calls is done by
# calling the very same layer objects again.
_SENET_LAYERS = {}


def _senet_layer(name, reuse, layer_cls, *args, **kwargs):
    """Return the cached layer `name` when reusing, else create and cache one."""
    if reuse:
        try:
            return _SENET_LAYERS[name]
        except KeyError:
            raise ValueError(
                "Cannot reuse layer %r: call senet with reuse=False first"
                % (name,)
            ) from None
    layer = layer_cls(*args, **kwargs)
    _SENET_LAYERS[name] = layer
    return layer


def _senet_conv_block(net, n_channels, scope, norm_type, training, reuse):
    """Apply a [1, 3] 'same' convolution, optional normalization and LeakyReLU."""
    conv = _senet_layer(scope, reuse, layers.Conv2D, n_channels,
                        kernel_size=(1, 3), padding='same', name=scope)
    net = conv(net)

    if norm_type == "NM":  # ADAPTIVE BATCH NORM
        net = AdaptiveNormalization(net)
    elif norm_type == "SBN":  # BATCH NORM
        bn_name = scope + '_bn'
        batch_norm = _senet_layer(bn_name, reuse, layers.BatchNormalization,
                                  name=bn_name)
        net = batch_norm(net, training=training)
    # else: NO LAYER NORMALIZATION

    return layers.Activation(LeakyReLU)(net)


def senet(input, n_layers=13, training=True, reuse=False, norm_type="NM", n_channels=32):
    """Apply the enhancement (context aggregation) network to a signal.

    The first layer is a plain [1, 3] convolution. Every following layer folds
    the signal with `signal_to_dilated` using a dilation of
    `2 ** layer_index`, applies the same kind of convolution, and unfolds it
    again with `dilated_to_signal`. One more convolution block and a final
    [1, 1] convolution with a single output channel and no activation follow.

    Args:
        input: Rank-4 input tensor; presumably [batch, 1, time, channels] -
            TODO confirm against the data pipeline.
        n_layers: Number of layers before the two closing convolutions.
        training: Forwarded to the batch-normalization layers
            (`norm_type="SBN"` only).
        reuse: If True, reuse the layers created by the most recent call with
            `reuse=False` so both calls share weights. Raises ValueError if no
            such layers exist.
        norm_type: "NM" for `AdaptiveNormalization`, "SBN" for batch
            normalization, anything else for no normalization. With "NM" the
            normalization is delegated to `AdaptiveNormalization` and is not
            cached here.
        n_channels: Number of feature maps per layer (64 in the paper,
            according to the original comment).

    Returns:
        Output tensor of the final single-channel [1, 1] convolution.
    """
    # Start from the input so that n_layers == 0 does not leave `net` undefined.
    net = input
    for current_layer in range(n_layers):
        scope = 'se_conv_%d' % current_layer

        if current_layer == 0:
            net = _senet_conv_block(net, n_channels, scope, norm_type,
                                    training, reuse)
        else:
            # The content of each intermediate layer is computed from the
            # previous layer via a dilated convolution with 3 x 1 kernels.
            # "Here, we increase the dilation factor exponentially with depth
            # from 2^0 for the 1st intermediate layer to 2^12 for the 13th one."
            dilation_factor = 2 ** current_layer
            net, pad_elements = signal_to_dilated(net, n_channels=n_channels,
                                                  dilation=dilation_factor)
            net = _senet_conv_block(net, n_channels, scope, norm_type,
                                    training, reuse)
            net = dilated_to_signal(net, n_channels=n_channels,
                                    pad_elements=pad_elements)

    net = _senet_conv_block(net, n_channels, 'se_conv_last', norm_type,
                            training, reuse)

    # Final linear projection to one channel: no normalization, no activation.
    projection = _senet_layer('se_fc_last', reuse, layers.Conv2D, 1,
                              kernel_size=(1, 1), padding='same',
                              activation=None, name='se_fc_last')
    output = projection(net)

    return output