In [17]:
import torch 
import torch.nn as nn
import numpy as np
import librosa
from torchsummary import summary
from torch.utils.data import DataLoader, Dataset
from scipy.io import wavfile

In [22]:
class WaveNetLoader(Dataset):
    """Map-style dataset of paired (input, target) audio windows.

    Two WAV files with the same sample rate are read, trimmed to a common
    length, reduced to their first channel, cut into windows of
    ``sample_time`` seconds and split 60/20/20 into train/valid/test.
    The input windows are standardised with the mean/std of the training
    inputs; the targets are left as they are.

    Args:
        in_path: path of the WAV file fed to the model.
        out_path: path of the WAV file the model should reproduce.
        normalize: if true, peak-normalise both signals to [-1, 1].
        train, val, test: select which split this instance serves. When
            several are set the precedence is val, then test, then train.
    """

    def __init__(self, in_path, out_path, normalize = False, train = False, val = False, test = False):
        self.input = in_path
        self.output = out_path
        self.train, self.val, self.test = train, val, test

        # Load audio files
        in_rate, in_data = wavfile.read(self.input)
        out_rate, out_data = wavfile.read(self.output)
        assert in_rate == out_rate, "in_file and out_file must have same sample rate"

        # Trim the length of audio to equal the smaller wav file
        common_len = min(len(in_data), len(out_data))
        if len(in_data) > common_len:
            print("Trimming input audio to match output audio")
        if len(out_data) > common_len:
            print("Trimming output audio to match input audio")
        in_data = in_data[:common_len]
        out_data = out_data[:common_len]

        # If stereo data, use channel 0
        if len(in_data.shape) > 1:
            print("[WARNING] Stereo data detected for in_data, only using first channel (left channel)")
            in_data = in_data[:,0]
        if len(out_data.shape) > 1:
            print("[WARNING] Stereo data detected for out_data, only using first channel (left channel)")
            out_data = out_data[:,0]

        # Convert PCM16 to floating point. This runs before normalisation so
        # that the peak is never computed on int16 values (abs(-32768)
        # overflows in int16).
        if in_data.dtype == np.int16:
            in_data = in_data/32767
            print("In data converted from PCM16 to FP32")
        if out_data.dtype == np.int16:
            out_data = out_data/32767
            print("Out data converted from PCM16 to FP32")

        # The helper has its own name: a nested function called `normalize`
        # would shadow the `normalize` flag and the test below would never
        # be true.
        if normalize:
            in_data = self._peak_normalize(in_data)
            out_data = self._peak_normalize(out_data)

        # Cut both signals into non-overlapping windows of `sample_time` seconds.
        self.sample_time = 100e-3
        sample_size = int(in_rate * self.sample_time)
        length = len(in_data) - len(in_data) % sample_size

        x = in_data[:length].reshape((-1, 1, sample_size)).astype(np.float32)
        y = out_data[:length].reshape((-1, 1, sample_size)).astype(np.float32)

        # 60 % train, 20 % validation, 20 % test (in file order, no shuffling).
        split = lambda d: np.split(d, [int(len(d) * 0.6), int(len(d) * 0.8)])

        self.d = {}
        self.d["x_train"], self.d["x_valid"], self.d["x_test"] = split(x)
        self.d["y_train"], self.d["y_valid"], self.d["y_test"] = split(y)
        # Statistics come from the training inputs only and are applied to
        # every input split.
        self.d["mean"], self.d["std"] = self.d["x_train"].mean(), self.d["x_train"].std()
        for key in "x_train", "x_valid", "x_test":
            self.d[key] = (self.d[key] - self.d["mean"]) / self.d["std"]

    @staticmethod
    def _peak_normalize(data):
        """Scale `data` so that its largest absolute sample becomes 1."""
        data = np.asarray(data, dtype=np.float64)
        peak = np.max(np.abs(data)) if data.size else 0.0
        if peak == 0:
            # Silent (or empty) signal: nothing to scale, avoid dividing by zero.
            return data
        return data / peak

    def _split_name(self):
        """Return the suffix ("valid", "test" or "train") of the selected split.

        Raises:
            ValueError: if none of train/val/test was requested.
        """
        if self.val:
            return "valid"
        if self.test:
            return "test"
        if self.train:
            return "train"
        raise ValueError("WaveNetLoader needs one of train=True, val=True or test=True")

    def __len__(self):
        """Number of windows in the selected split."""
        return len(self.d["x_" + self._split_name()])

    def __getitem__(self, index):
        """Return the (input, target) window pair at `index` as tensors.

        Each tensor has shape (1, sample_size); batching is left to the
        DataLoader.
        """
        name = self._split_name()
        x_item = self.d["x_" + name][index]
        y_item = self.d["y_" + name][index]
        return torch.from_numpy(x_item), torch.from_numpy(y_item)


# NOTE(review): these are absolute, machine-specific paths; the notebook only
# runs where this directory layout exists.
in_path = "D:\\Documents\\CMU_SUBJECTS\\BlackBoxAudioFx\\PedalNetRT\\data\\ts9_test1_in_FP32.wav"
out_path = "D:\\Documents\\CMU_SUBJECTS\\BlackBoxAudioFx\\PedalNetRT\\data\\ts9_test1_out_FP32.wav"  
train_data = WaveNetLoader(in_path, out_path, normalize = True, train = True)
test_data = WaveNetLoader(in_path, out_path, normalize = True, test = True)
val_data = WaveNetLoader(in_path, out_path, normalize = True, val = True)

# num_workers is 0 (load in the main process): worker processes are spawned on
# Windows and cannot import a Dataset class that only exists inside the
# notebook's __main__ namespace.
num_workers, batch_size = 0, 64
# Only the training loader is shuffled; validation/test keep file order.
train_loader = DataLoader(train_data, batch_size = batch_size, shuffle = True, num_workers = num_workers)
val_loader = DataLoader(val_data, batch_size = batch_size, num_workers = num_workers)
test_loader = DataLoader(test_data, batch_size = batch_size, num_workers = num_workers)


In [5]:
"""
The loss function is the error-to-signal ratio (ESR), computed on pre-emphasised signals.
"""
def pre_emp(x, coeff = 0.95):
    """First-order pre-emphasis filter: y[n] = x[n] - coeff * x[n-1].

    The first sample along the last axis is passed through unchanged.
    Coefficient adapted from paper : 0.95

    Args:
        x: tensor indexed as (dim0, dim1, time); the filter runs over dim 2.
        coeff: pre-emphasis coefficient.

    Returns:
        Tensor of the same shape as `x`.
    """
    # torch.cat expects a sequence of tensors as its first argument.
    return torch.cat((x[:, :, 0:1], x[:, :, 1:] - coeff * x[:, :, :-1]), dim = 2)


def error_to_signal(y, y_pred):
    """Error-to-signal ratio between target `y` and prediction `y_pred`.

    Both signals are pre-emphasised first; the squared error is summed over
    the last axis and divided by the summed squared target. Only torch
    operations are used so the result stays differentiable.

    Returns:
        Tensor with the last (time) axis reduced away.
    """
    y, y_pred = pre_emp(y), pre_emp(y_pred)
    # 1e-10 keeps the ratio finite for an all-zero target.
    return (y - y_pred).pow(2).sum(dim = 2) / (y.pow(2).sum(dim = 2) + 1e-10)


# sample_data = "D:\\Documents\\CMU_SUBJECTS\\BlackBoxAudioFx\\NeuralAudioModelling\\ts9_test1_in_FP32.wav"
# audio,_ = librosa.load(sample_data)

# audio_copy = audio.copy()
# print(error_to_signal(audio,audio_copy))


In [7]:
"""
Dilated causal convolutions in WaveNet

Causal convolutions 
Causal comes from causality, which means if we have a canonical 'direction' we are reading our data, then data that is ahead of the current position cannot factor 
into the calculation. This is most obvious in time series, so only previous timesteps factor into the current and not something 'future' relative to the current. 
Note that it can also be applied to other forms of data, such as 2D images (e.g. in PixelCNN).

The causal convolution concept comes about because when you do convolution, the kernel may overlap with the data from the 'future' points thus breaking causality. 
We don't want this so usually we introduce some kind of zero masking onto these points. This masking procedure is what sets apart causal convolution from standard 
convolution.

"""

class CausalConv1d(torch.nn.Conv1d):
    """1-D convolution whose output is trimmed so it never looks ahead.

    The underlying Conv1d is built with ``(kernel_size - 1) * dilation``
    samples of padding; `forward` then drops that many samples from the end
    of the result, so (for stride 1) the output has the same length as the
    input and output[n] depends only on input[..n].
    """

    def __init__(self, in_channels, out_channels, kernel_size, stride=1, dilation=1, groups=1, bias=True):
        # Number of samples padded by Conv1d and cut off again in forward().
        self.__padding = dilation * (kernel_size - 1)

        conv_options = dict(
            kernel_size=kernel_size,
            stride=stride,
            padding=self.__padding,
            dilation=dilation,
            groups=groups,
            bias=bias,
        )
        super().__init__(in_channels, out_channels, **conv_options)

    def forward(self, input):
        padded_out = super().forward(input)
        if self.__padding == 0:
            # kernel_size == 1: nothing was padded, nothing to trim.
            return padded_out
        return padded_out[:, :, : -self.__padding]

def _conv_stack(dilations, in_channels, out_channels, kernel_size):
    """
    Build one CausalConv1d per entry of `dilations` (in the given order) and
    wrap them in an nn.ModuleList. All layers share the same channel counts
    and kernel size; only the dilation differs.
    Dilated stack as outlined in the WaveNet paper:
    https://arxiv.org/pdf/1609.03499.pdf
    """
    layers = []
    for rate in dilations:
        layer = CausalConv1d(
            in_channels=in_channels,
            out_channels=out_channels,
            dilation=rate,
            kernel_size=kernel_size,
        )
        layers.append(layer)
    return nn.ModuleList(layers)

class WaveNet(nn.Module):
    """WaveNet-style stack of gated, dilated causal convolutions.

    Args:
        num_channels: channels carried on the residual path.
        dilation_depth: number of layers per repeat; dilations are
            1, 2, 4, ..., 2 ** (dilation_depth - 1).
        num_repeat: how many times the dilation sequence is repeated.
        kernel_size: kernel size of the gated (hidden) convolutions.

    `forward` takes a tensor with one input channel, (batch, 1, time), and
    returns a tensor with one output channel.
    """

    def __init__(self, num_channels, dilation_depth, num_repeat, kernel_size = 2 ): 
        super(WaveNet,self).__init__()
        # Dilations grow exponentially (powers of two). `2 * d` would start
        # at 0, which is not a valid dilation.
        dilation = [2 ** d for d in range(dilation_depth)] * num_repeat
        # The hidden convs emit twice the channels: one half feeds tanh, the
        # other half feeds sigmoid in the gated activation.
        internal_channels = int(num_channels * 2)
        self.hidden = _conv_stack(dilation, num_channels, internal_channels, kernel_size)
        self.residuals = _conv_stack(dilation, num_channels, num_channels, 1)
        self.input_layer = CausalConv1d(
            in_channels=1,
            out_channels=num_channels,
            kernel_size=1,
        )

        # 1x1 conv mixing the concatenated skip outputs of every layer
        # (dilation_depth * num_repeat layers, num_channels each) to 1 channel.
        self.linear_mix = nn.Conv1d(
            in_channels=num_channels * dilation_depth * num_repeat,
            out_channels=1,
            kernel_size=1,
        )
        self.num_channels = num_channels

    def forward(self,x): 
        out = x
        skips = []
        out = self.input_layer(out)

        for hidden, residual in zip(self.hidden, self.residuals):
            x = out
            out_hidden = hidden(x)

            # gated activation
            # split the 2 * num_channels hidden output along the channel axis
            # into the tanh half and the sigmoid half
            out_hidden_split = torch.split(out_hidden, self.num_channels, dim=1)
            out = torch.tanh(out_hidden_split[0]) * torch.sigmoid(out_hidden_split[1])

            skips.append(out)

            # residual connection around the layer
            out = residual(out)
            out = out + x[:, :, -out.size(2) :]

        # modified "postprocess" step:
        out = torch.cat([s[:, :, -out.size(2) :] for s in skips], dim=1)
        out = self.linear_mix(out)
        return out


        

In [14]:
"""
hyperparameters 
"""

num_channels = 4
dilation_depth = 9
num_repeat = 2 
kernel_size = 3
learning_rate, batch_size = 3e-3, 64
wavenet_model = WaveNet(
            num_channels,
            dilation_depth,
            num_repeat,
            kernel_size
        )
summary(wavenet_model) 

# Pick the GPU when one is available. This must be an assignment: the former
# `device == "cuda"` only compared the strings and left the model on the CPU.
device = "cuda" if torch.cuda.is_available() else "cpu"
wavenet_model = wavenet_model.to(device)

# The optimizer is created after the move so it tracks the parameters on `device`.
optimizer = torch.optim.Adam(wavenet_model.parameters(), lr= learning_rate)


Layer (type:depth-idx)                   Param #
├─ModuleList: 1-1                        --
|    └─CausalConv1d: 2-1                 104
|    └─CausalConv1d: 2-2                 104
|    └─CausalConv1d: 2-3                 104
|    └─CausalConv1d: 2-4                 104
|    └─CausalConv1d: 2-5                 104
|    └─CausalConv1d: 2-6                 104
|    └─CausalConv1d: 2-7                 104
|    └─CausalConv1d: 2-8                 104
|    └─CausalConv1d: 2-9                 104
|    └─CausalConv1d: 2-10                104
|    └─CausalConv1d: 2-11                104
|    └─CausalConv1d: 2-12                104
|    └─CausalConv1d: 2-13                104
|    └─CausalConv1d: 2-14                104
|    └─CausalConv1d: 2-15                104
|    └─CausalConv1d: 2-16                104
|    └─CausalConv1d: 2-17                104
|    └─CausalConv1d: 2-18                104
├─ModuleList: 1-2                        --
|    └─CausalConv1d: 2-19                20
|    └─Ca

In [15]:

# NOTE(review): `hparams` is not defined in any cell visible in this notebook,
# while `learning_rate` and `batch_size` are already assigned directly in the
# hyperparameter cell — confirm where `hparams` is supposed to come from, or
# drop these two lookups.
learning_rate = hparams["learning_rate"]
batch_size = hparams["batch_size"]

NameError: name 'wavenet_model' is not defined