In [1]:
! pip install pydub
! pip install ffmpeg

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pydub
  Downloading pydub-0.25.1-py2.py3-none-any.whl (32 kB)
Installing collected packages: pydub
Successfully installed pydub-0.25.1
Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting ffmpeg
  Downloading ffmpeg-1.4.tar.gz (5.1 kB)
Building wheels for collected packages: ffmpeg
  Building wheel for ffmpeg (setup.py) ... [?25l[?25hdone
  Created wheel for ffmpeg: filename=ffmpeg-1.4-py3-none-any.whl size=6084 sha256=4c4545a6ebb5f35b78ae9cd28991da2ed92ac17d9255bc645f592184344e1e9e
  Stored in directory: /root/.cache/pip/wheels/30/33/46/5ab7eca55b9490dddbf3441c68a29535996270ef1ce8b9b6d7
Successfully built ffmpeg
Installing collected packages: ffmpeg
Successfully installed ffmpeg-1.4


In [2]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [3]:
import numpy as np
import librosa
import tensorflow as tf 
import tensorflow.keras.layers as layers
from sklearn.model_selection import train_test_split
from pydub import AudioSegment
from keras.models import Sequential
from keras.callbacks import ModelCheckpoint
import matplotlib.pyplot as plt
import soundfile as sf
from IPython.display import Audio

In [4]:
def mu_law_encode(signal, quantization_channels):
    # Manual mu-law companding and mu-bits quantization
    mu = (quantization_channels - 1)
    # signal should be in [-1, +1]
    magnitude = np.log1p(mu * np.abs(signal)) / np.log1p(mu)
    signal = np.sign(signal) * magnitude

    # Map signal from [-1, +1] to [0, mu-1]
    quantized_signal = ((signal + 1) / 2 * mu + 0.5)

    return list(quantized_signal)

class Preprocess:

    def __init__(self, time_steps, sampling_rate, datapath, quantization_channels=256):
        self.time_steps = time_steps
        self.sampling_rate = sampling_rate
        self.datapath = datapath # datapath is the directory path that contains the .wav files
        self.inputs = []
        self.quantization_channels = quantization_channels
        self.normalized = False

    def load_data(self):
        # transform mp3 to wav
        mp3_files = librosa.util.find_files(self.datapath, ext=['mp3'])
        print(f"Found {len(mp3_files)} mp3 files.")
        i = 0
        for mp3_file in mp3_files:
            wav_file = mp3_file[:-4] + '.wav'
            sound = AudioSegment.from_file(mp3_file, format="mp3")
            sound.export(wav_file, format="wav")
            i += 1
            print(f"Created {i} .wav files")

        wav_files = librosa.util.find_files(self.datapath, ext=['wav'])

        if len(wav_files) == 0:
            raise FileNotFoundError("No .wav files found in the directory")
        
        print("Found {} .wav files in the directory".format(len(wav_files)))

        for file in wav_files:
            # load the audio file, range from -1 to 1
            audio, sr = librosa.load(file, sr=self.sampling_rate, mono=True)
            self.normalized = True
            # convert the audio file to mono
            # audio = librosa.to_mono(audio)
            # normalize the audio file
            if not self.normalized:
                audio = audio / np.max(np.abs(audio))
            # discretize the audio file
            audio = mu_law_encode(audio, self.quantization_channels)
            self.inputs.append(audio)
    
        print("Finished loading data")

    # takes in a list of inputs, each is a long array
    def create_dataset(self):

        self.load_data()

        x = []
        y = []

        cnt = 0

        for input in self.inputs:
            for i in range(0, len(input) - self.time_steps):
                # preparing input and output sequences
                input_ = input[i:i + self.time_steps]
                output = input[i + self.time_steps]
                x.append(input_)
                y.append(output)
            cnt += 1
            print(f"Loaded {cnt} input data")
        
        x = np.array(x)
        y = np.array(y)
        y = tf.one_hot(y, self.quantization_channels)

        test_size = 0.2
        i = int(len(x) * test_size)
        x_tr = x[i:]
        x_test = x[:i]
        y_tr = y[i:]
        y_test = y[:i]
        
        print("Finished creating dataset")
        return x_tr, x_test, y_tr, y_test

In [17]:
def mu_law_decode(signal, quantization_channels):
    # Calculate inverse mu-law companding and dequantization
    mu = quantization_channels - 1
    # Map signal from [0, mu-1] to [-1, +1]
    signal = 2 * (signal.astype(np.float32) / mu) - 1
    signal = np.sign(signal) * (1.0 / mu) * ((1.0 + mu)**abs(signal) - 1.0)
    return signal

class Wavenet(tf.keras.Model):
    def __init__(self, timesteps = 32, output_dims = 256, quantization_channels=256, **kwargs):
        super().__init__(**kwargs)
        self.timesteps = timesteps
        self.output_dims = output_dims
        self.quantization_channels = quantization_channels
        self.model = tf.keras.Sequential([
            layers.Embedding(self.quantization_channels, 100, input_length=32, trainable=True),
            layers.Conv1D(64, 3, padding='causal', activation='relu'),
            layers.Dropout(0.2),
            layers.MaxPool1D(2),
            layers.Conv1D(128, 3, activation='relu', dilation_rate=2, padding='causal'),
            layers.Dropout(0.2),
            layers.MaxPool1D(2),
            layers.Conv1D(256, 3, activation='relu', dilation_rate=4, padding='causal'),
            layers.Dropout(0.2),
            layers.MaxPool1D(2),
            layers.GlobalMaxPool1D(),
            layers.Dense(256, activation='relu'),
            layers.Dense(self.output_dims, activation='softmax'),
        ])

    def call(self, inputs):
        return self.model(inputs)

    def generate(self, generate_time, sampling_rate):
        mean = self.quantization_channels / 2
        std = mean * 0.909
        no_samples = generate_time * sampling_rate
        inputs = tf.random.normal((no_samples, self.timesteps), mean=mean, stddev=std, dtype=tf.float32)

        # forward pass:
        predicted_output = self.model.predict(inputs)
        print(predicted_output.shape)

        # generate predictions
        labels = np.argmax(predicted_output, axis=-1)
        print(labels.shape)

        # decode the predictions
        self.output = mu_law_decode(labels, self.quantization_channels)
        sf.write("generated.wav", self.output, sampling_rate)
        print("Finished generating audio")

In [7]:
sr = 16000
ts = 32
qc = 256

preprocess = Preprocess(time_steps=ts, sampling_rate=sr, datapath='/content/drive/MyDrive/guzheng', quantization_channels=qc)
x_tr, x_val, y_tr, y_val = preprocess.create_dataset()

Found 0 mp3 files.
Found 1 .wav files in the directory
Finished loading data
Loaded 1 input data
Finished creating dataset


In [8]:
print(x_tr.shape)
print(x_val.shape)
print(y_tr.shape)
print(y_val.shape)

print(x_tr.dtype)

(2611398, 32)
(652849, 32)
(2611398, 256)
(652849, 256)
float32


In [9]:
model = Wavenet(timesteps=ts, output_dims = 256, quantization_channels=qc)
model(x_tr[:1])
model.model.summary()

Model: "sequential"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 embedding (Embedding)       (None, 32, 100)           25600     
                                                                 
 conv1d (Conv1D)             (None, 32, 64)            19264     
                                                                 
 dropout (Dropout)           (None, 32, 64)            0         
                                                                 
 max_pooling1d (MaxPooling1D  (None, 16, 64)           0         
 )                                                               
                                                                 
 conv1d_1 (Conv1D)           (None, 16, 128)           24704     
                                                                 
 dropout_1 (Dropout)         (None, 16, 128)           0         
                                                        

In [11]:
model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])
history = model.fit(x_tr, y_tr, batch_size = 128, epochs=2, validation_data=(x_val, y_val), verbose=1)

Epoch 1/2
Epoch 2/2


In [12]:
history.history

{'loss': [3.4255192279815674, 3.191568374633789],
 'accuracy': [0.08657010644674301, 0.10153795033693314],
 'val_loss': [3.345090866088867, 3.2769808769226074],
 'val_accuracy': [0.10707376152276993, 0.11136725544929504]}

In [18]:
model.generate(generate_time = 10, sampling_rate = sr)

(160000, 256)
(160000,)
Finished generating audio
