In [None]:
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
from scipy import fft
import wave
import sys
import os
from scipy.io.wavfile import write
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from scipy.io import wavfile
import scipy.signal as signal
import librosa
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
from sklearn.model_selection import train_test_split
from sklearn.model_selection import train_test_split
from torch.cuda.amp import autocast, GradScaler

In [None]:
path = 'C:/Users/User/Desktop/AppStat/MachineLearning/AppliedML2024/final_project/data/nsynth-valid/audio/'

In [None]:
def read_files_in_dir(directory):
    filenames = os.listdir(directory)
    return filenames
filenames = read_files_in_dir(path)
#pianos = [filename for filename in filenames if "piano" in filename] #empty
bass = [filename for filename in filenames if "bass" in filename]
guitar = [filename for filename in filenames if "guitar" in filename]
#drum = [filename for filename in filenames if "drum" in filename] #empty
flutes = [filename for filename in filenames if "flute" in filename]
keyboards = [filename for filename in filenames if "keyboard" in filename] 
guitar_acoustic = [filename for filename in filenames if "guitar_acoustic" in filename]
flute_acoustic = [filename for filename in filenames if "flute_acoustic" in filename]
bass_electronic = [filename for filename in filenames if "bass_electronic" in filename]

In [None]:
import numpy as np

def audio_to_waveform(audio):
    waveform, sr = librosa.load(audio, sr=None)
    return waveform, sr

def waveform_to_spectogram(waveform):
    if waveform.dtype != np.float32:
        waveform = waveform.astype(np.float32)
    spectrogram = librosa.stft(waveform)
    return np.abs(spectrogram)

def pad_spectrogram(spec, target_shape):
    padded_spec = np.zeros(target_shape,dtype = np.float32)
    min_shape = np.minimum(target_shape, spec.shape)
    padded_spec[:min_shape[0], :min_shape[1]] = spec[:min_shape[0], :min_shape[1]]
    return padded_spec

def spectogram_to_audio(spectrogram, sr,output_wav):
    waveform = librosa.istft(spectrogram)
    waveform = np.nan_to_num(waveform)
    waveform = waveform/np.max(np.abs(waveform))
    return write(output_wav, sr, (waveform*32767).astype(np.int16))


def normalize_spectrogram(spectrogram):
    min_val = np.min(spectrogram)
    max_val = np.max(spectrogram)
    normalized_spectrogram = (spectrogram - min_val) / (max_val - min_val + 1e-6)
    return normalized_spectrogram

def pick_samples_and_classify(arrays):
    #Picks a random number of samples, and returns their filepath and label
    instruments = []
    #pick at minimum two instruments
    number_of_instruments = 2
    #np.random.randint(2, len(arrays) + 1)
    labels = np.zeros(len(arrays))
    already_picked = []

    while len(instruments) < number_of_instruments:
        random_pick = np.random.randint(0, len(arrays))
        if random_pick in already_picked:
            break
        else: 
            already_picked.append(random_pick)
            pick = np.random.choice(arrays[random_pick], 1)
            instruments.append(pick)
            labels[random_pick] = 1

    return instruments, labels

        
        

#read the filenames, and add their data to 5 lists
def add_waveform_to_list(filenames):
    waveforms = []
    for filename in filenames:
        waveform, params = audio_to_waveform(path + filename[0])
        waveforms.append(waveform)
    return waveforms

def find_longest_array(arrays):
    longest = 0
    for array in arrays:
        if len(array) > longest:
            longest = len(array)
    return longest

def combine_waveforms(waveforms):
    normalization = 1 / len(waveforms)
    # changed to be equal to the length of the longest waveform
    out = np.zeros(find_longest_array(waveforms), dtype=np.float32)
    for w in waveforms:
        out += w.astype(np.float32) * normalization
    return out # note, this retuns a float32 array - it is needed to convert this to int16 before saving it to a wav file



In [None]:
def nu_gen_spectro(N, target_shape=(1025, 126)):
    data = []
    labels = []
    original_labels = []
    #instrument_list = [bass, guitar, flutes]
    instrument_list = [bass_electronic, flute_acoustic]

    for i in range(N):
        paths, label = pick_samples_and_classify(instrument_list)
        original_labels.append(label)
        waveforms = add_waveform_to_list(paths)
        mixed_waveform = combine_waveforms(waveforms)
        
        #audio = audio_to_waveform(mixed_waveform)
        mixed_spectro = waveform_to_spectogram(mixed_waveform)
        #mixed_spectro = audio_to_spectrogram(mixed_waveform)
        mixed_spectro_padded = pad_spectrogram(mixed_spectro, target_shape)
        mixed_spectro_normalized = normalize_spectrogram(mixed_spectro_padded)
        
        inter_waveforms = []
        

        inst_i = 0
        for n, i in enumerate(label):
            if i == 1:
                spectro = waveform_to_spectogram(waveforms[inst_i])
                spectro_padded = pad_spectrogram(spectro, target_shape)
                inter_waveforms.append(spectro_padded)
                
                inst_i += 1

            if i == 0:
                inter_waveforms.append(np.zeros(target_shape))

        data.append(mixed_spectro_normalized)
        #inter_waveforms.append(np.zeros(target_shape))
        labels.append(inter_waveforms)
    
    data = np.array(data)
    
    return data, np.array(labels), np.array(original_labels)# remove last line if you want to return only data and labels
    

In [None]:
target_shape = (1025, 126)
N = 2000  # Number of samples


data, labels, original_labels = nu_gen_spectro(N, target_shape)

print(np.shape(data))  # Should be (N, 1025, 128)
print(np.shape(labels))
#print(np.shape(piano_labels))


In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
from sklearn.model_selection import train_test_split


def reset_cuda_memory():
    # Print memory usage before reset
    print("Before reset:")
    print(f"Allocated memory: {torch.cuda.memory_allocated()}")
    print(f"Reserved memory: {torch.cuda.memory_reserved()}")

    # Reset PyTorch memory allocator
    torch.cuda.reset_peak_memory_stats()
    torch.cuda.reset_accumulated_memory_stats()

    # Clear cache
    torch.cuda.empty_cache()

    # Synchronize to ensure all operations are complete
    torch.cuda.synchronize()

    # Print memory usage after reset
    print("After reset:")
    print(f"Allocated memory: {torch.cuda.memory_allocated()}")
    print(f"Reserved memory: {torch.cuda.memory_reserved()}")

# Call the function to reset CUDA memory
reset_cuda_memory()


In [None]:
# Write a multi-source audio seperation function using a unet model from torch
class AudioDataset(Dataset):
    def __init__(self, data, labels):
        self.data = data
        self.labels = labels

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        return self.data[idx], self.labels[idx]


class UNet(nn.Module):
    def __init__(self, input_channels=1, output_channels=2):
        super(UNet, self).__init__()

        # Define the contracting/downsampling path
        self.conv1 = nn.Conv2d(input_channels, 8, (3,125), padding=(1,62)) # (3,125), padding=(1,62).
        self.bn1 = nn.BatchNorm2d(8)
        self.conv2 = nn.Conv2d(8, 16, 3, padding=1)
        self.bn2 = nn.BatchNorm2d(16)
        self.dropout = nn.Dropout(0.2)
        self.conv3 = nn.Conv2d(16, 32, 3, padding=1)
        self.bn3 = nn.BatchNorm2d(32)
        self.conv4 = nn.Conv2d(32, 64, 3, padding=1)
        self.bn4 = nn.BatchNorm2d(64)
        self.conv5 = nn.Conv2d(64, 128, 3, padding=1)
        self.bn5 = nn.BatchNorm2d(128)
        self.conv6 = nn.Conv2d(128, 256, 3, padding=1)
        self.bn6 = nn.BatchNorm2d(256)
        self.conv7 = nn.Conv2d(256, 512, 3, padding=1)
        self.bn7 = nn.BatchNorm2d(512)
        self.pool = nn.MaxPool2d(2, 2)

        # Add transpose convolutional layers for upsampling
        self.upconv1 = nn.ConvTranspose2d(512, 256, 2, stride=2)
        self.conv8 = nn.Conv2d(512, 256, 3, padding=1)
        self.upconv2 = nn.ConvTranspose2d(256, 128, 2, stride=2)
        self.conv9 = nn.Conv2d(256, 128, 3, padding=1)
        self.upconv3 = nn.ConvTranspose2d(128, 64, 2, stride=2)
        self.conv10 = nn.Conv2d(128, 64, 3, padding=1)
        self.upconv4 = nn.ConvTranspose2d(64, 32, 2, stride=2)
        self.conv11 = nn.Conv2d(64, 32, 3, padding=1)
        self.upconv5 = nn.ConvTranspose2d(32, 16, 2, stride=2)
        self.conv12 = nn.Conv2d(32, 16, 3, padding=1)
        self.upconv6 = nn.ConvTranspose2d(16, 8, 2, stride=2)
        self.conv13 = nn.Conv2d(16, 8, 3, padding=1)
        self.final_conv = nn.Conv2d(8, output_channels, 3,padding = 1)

    def forward(self, x):
        # Contracting path
        x = self.dropout(x)
        x1 = nn.functional.relu(self.bn1(self.conv1(x)))
        x2 = self.pool(nn.functional.relu(self.bn2(self.conv2(x1))))
        x3 = self.pool(nn.functional.relu(self.bn3(self.conv3(x2))))
        x4 = self.pool(nn.functional.relu(self.bn4(self.conv4(x3))))
        x5 = self.pool(nn.functional.relu(self.bn5(self.conv5(x4))))
        x6 = self.pool(nn.functional.relu(self.bn6(self.conv6(x5))))
        x7 = self.pool(nn.functional.relu(self.bn7(self.conv7(x6))))
        # Print sizes for debugging
        #print(f"x1 shape: {x1.shape}")
        #print(f"x2 shape: {x2.shape}")
        #print(f"x3 shape: {x3.shape}")
        #print(f"x4 shape: {x4.shape}")
        #print(f"x5 shape: {x5.shape}")
        #print(f"x6 shape: {x6.shape}")
        #print(f"x7 shape: {x7.shape}")
        x = self.upconv1(x7)
        x = F.interpolate(x, size=(x6.size(2), x6.size(3)), mode='nearest')
    
        #print(f"x shape before concatenation: {x.shape}")
        x = torch.cat([x, x6], dim=1)
        #print(f"x shape after concatenation: {x.shape}")
        x = nn.functional.relu(self.conv8(x))

        x = self.upconv2(x)
        x = F.interpolate(x, size=(x5.size(2), x5.size(3)), mode='nearest')
   
        #print(f"x shape before concatenation: {x.shape}")
        x = torch.cat([x, x5], dim=1)
        #print(f"x shape after concatenation: {x.shape}")
        x = nn.functional.relu(self.conv9(x))

        x = self.upconv3(x)
        x = F.interpolate(x, size=(x4.size(2), x4.size(3)), mode='nearest')
        #print(f"x shape before concatenation: {x.shape}")
        x = torch.cat([x, x4], dim=1)
        #print(f"x shape before concatenation: {x.shape}")
        x = nn.functional.relu(self.conv10(x))
        
        x = self.upconv4(x)
        x = F.interpolate(x, size=(x3.size(2), x3.size(3)), mode='nearest')
        #print(f"x2_interp shape: {x3.shape}")
        #print(f"x shape before concatenation: {x.shape}")
    
        x = torch.cat([x, x3], dim=1)
        #print(f"x shape before concatenation: {x.shape}")
        x = nn.functional.relu(self.conv11(x))

        x = self.upconv5(x)
        x = F.interpolate(x, size=(x2.size(2), x2.size(3)), mode='nearest')
        #print(f"x shape before concatenation: {x.shape}")
        x = torch.cat([x, x2], dim=1)
        #print(f"x shape before concatenation: {x.shape}")
        x = nn.functional.relu(self.conv12(x))


        x = self.upconv6(x)
        #print(f'upconv6 shape: {x.shape}')
        x = F.interpolate(x, size=(x1.size(2), x1.size(3)), mode='nearest')
        x = torch.cat([x, x1], dim=1)
        x = F.relu(self.conv13(x))
        
        x = self.final_conv(x)

        return x


In [None]:
# Split the data into training and testing sets
#data = data[..., np.newaxis]
X_train, X_test, y_train, y_test = train_test_split(data, labels, test_size=0.2, random_state=42)
print(y_train.shape)
X_train = np.expand_dims(X_train, axis=1)
X_test = np.expand_dims(X_test, axis=1)
# Set all values in x_train and x_test to be 0 for values lower than e-2
X_train[X_train < 1e-2] = 0
X_test[X_test < 1e-2] = 0
# Create the datasets and dataloaders
train_dataset = AudioDataset(X_train, y_train)
test_dataset = AudioDataset(X_test, y_test)
train_loader = DataLoader(train_dataset, batch_size=60, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=60, shuffle=False)
#print(train_dataset[0][0].shape)

# Get a batch of data
inputs, targets = next(iter(train_loader))
# Print the shape of the inputs
print(inputs.shape)

In [None]:
def torch_normalize(spectogram):
    min_val = torch.min(spectogram)
    max_val = torch.max(spectogram)
    normalized_spectogram = (spectogram - min_val) / (max_val - min_val)
    return normalized_spectogram

In [None]:
class MSELoss(nn.Module):
    def __init__(self):
        super(MSELoss, self).__init__()

    def forward(self, y_pred, y_true,X):
        #print(y_pred)
        l1_loss0 = F.mse_loss(y_pred[:,0,:], y_true[:,0,:])
        l1_loss1 = F.mse_loss(y_pred[:,1,:], y_true[:,1,:])
        #l1_loss2 = F.mse_loss(y_pred[:,2,:], y_true[:,2,:])
        #y_pred = torch.clamp(y_pred, min=0)
        sq_X = torch.squeeze(X)
        summed_spectogram = torch.sum(y_pred, dim=1)
        diff = torch.abs(torch_normalize(summed_spectogram) - torch_normalize(sq_X))
        regulator = 1000
        penalty = torch.mean(diff)
        #print(penalty)
        l1_loss = torch.mean(l1_loss0 + l1_loss1 ) + regulator*penalty
        return l1_loss

In [None]:
import torch.nn as nn
import torch.nn.functional as F
class SDRLoss(nn.Module):
    def __init__(self, smoothness_lambda=0.1, deviation_lambda=0.1,diversity_lambda = 1):
        super(SDRLoss, self).__init__()
        self.smoothness_lambda = smoothness_lambda
        self.deviation_lambda = deviation_lambda
        self.diversity_lambda = diversity_lambda
    def forward(self, y_pred, y_true, X, model_parameters=None):

        #y_true = torch_normalize(y_true)
        #y_pred = torch_normalize(y_pred)
        delta = 1e-7  # avoid numerical errors
        num = torch.sum(torch.square(torch.squeeze(y_true)), dim=(2, 3))
        den = torch.sum(torch.square(torch.squeeze(y_true - y_pred)), dim=(2, 3))
        num += delta
        den += delta
        scores = 10 * torch.log10(num / den)



        sq_X = torch.squeeze(X)
        summed_spectogram = torch.sum(y_pred, dim=1)
        diff = torch.abs(summed_spectogram - sq_X)

        diversity_penalty = 0
        num_sources = y_pred.size(1)
        for i in range(num_sources):
            for j in range(i + 1, num_sources):
                similarity = torch.norm(y_pred[:, i, :, :] - y_pred[:, j, :, :], p=2).mean()
                diversity_penalty += 1 / (similarity + 1e-7)  # Adding a small constant to avoid division by zero
        diversity_penalty *= self.diversity_lambda

        penalty_positive  = 0
        for i in range(y_true.size(0)):
            for j in range(y_true.size(1)):
                mask = torch.gt(y_true[i, j, :, :], 0.0)
                num_nonzero = torch.sum(mask)
                if num_nonzero != 0:
                    penalty_positive += torch.sum(torch.square(y_pred[i, j, :, :][mask] - y_true[i, j, :, :][mask])) / num_nonzero
                    continue




        # Calculate penalty based on the minimum negative SDR score
        negative_scores = torch.clamp(scores, max=0)
        penalty_negative = -torch.sum(negative_scores)


        total_penalty = (
            diversity_penalty +
            torch.mean(diff) * 1 + 0.1 * penalty_positive +  0.1*penalty_negative) # +self.smoothness_lambda * pit_loss_value)   
        
        
        
        print(f"Loss : {-torch.mean(scores) + total_penalty}, SDR : {torch.mean(scores)}, Penalty : {total_penalty}", end = '\r')
        return -torch.mean(scores) + total_penalty



In [None]:
# Initialize the model, loss function, and optimizer
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
input_channels = X_train.shape[1]  # Get the number of input channels from the data
print(input_channels)
scaler = torch.cuda.amp.GradScaler()
model = UNet(input_channels=input_channels,output_channels=2).to(device, dtype=torch.float32)


criterion = MSELoss()
optimizer = torch.optim.SGD(model.parameters(), lr=0.01)
#print(model)
# Train the model

num_epochs = 300
best_loss = np.inf
patience = 0
for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0
    for X, y in train_loader:
        X, y = X.to(device, dtype=torch.float32), y.to(device, dtype=torch.float32)
        optimizer.zero_grad() 
        with torch.cuda.amp.autocast():
            y_pred = model(X)
            loss = criterion(y_pred, y, X)
        #print(f"Shape of x : {X.shape},Shape of y {y.shape}")
        
        # Scales the loss, calls backward(), and unscales the gradients
        scaler.scale(loss).backward()
        scaler.unscale_(optimizer)
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
        # Unscales the gradients and updates the parameters
        scaler.step(optimizer)
        scaler.update()
        running_loss += loss.item() * X.size(0) # Accumulate loss
        #print(f"Loss: {loss.item()}")
    val_loss = 0.0
    model.eval()
    #Implement early stopping
    with torch.no_grad():
        for X, y in test_loader:
            X, y = X.to(device, dtype=torch.float32), y.to(device, dtype=torch.float32)
            with torch.cuda.amp.autocast():
                y_pred = model(X)
                loss = criterion(y_pred, y, X)
            
            val_loss += 4*loss.item() * X.size(0)
    if val_loss < best_loss:
        best_loss = val_loss
        best_model = model.state_dict()
        patience = 0
    else:
        patience += 1
    if patience > 5:
        print(f'Early stopping at epoch {epoch}')
        break
    

    epoch_loss = running_loss
    print(f'Epoch {epoch + 1}, Loss: {epoch_loss:.4f}')
    print(f'Validation Loss: {val_loss:.4f}')
   
torch.save(model.state_dict(), 'UNET_SourceSeperation_Simple.pth')


In [None]:
torch.save(model.state_dict(), 'UNET_SourceSeperation_Simple.pth')

In [None]:
#Generate a random waveform
instrument_list = [bass_electronic, flute_acoustic]
#Pick a random sample of each instrument
filepaths, labels = pick_samples_and_classify(instrument_list)
print(filepaths)
#Extract .wav data into to a list using librosa
waveforms = add_waveform_to_list(filepaths)

#Combine the waveforms
combined_waveform = combine_waveforms(waveforms)

# Convert the combined waveform to a spectrogram
spectrogram = waveform_to_spectogram(combined_waveform)

# Set all values lower than e-2 to 0
spectogram_0 = np.where(spectrogram < 1e-2, 0, spectrogram)
# Normalize the spectrogram
normalized_spectrogram = normalize_spectrogram(spectrogram)
normalized_spectrogram_0 = normalize_spectrogram(spectogram_0)
# Convert the normalized spectrogram back to audio and save it
spectogram_to_audio(spectogram_0, 16000, "combo1.wav")
spectogram_to_audio(spectrogram, 16000, "combo2.wav")

# Function to plot spectrogram using librosa
def plot_spectrogram(waveform, sr, title):
    # Compute the STFT
    D = librosa.stft(waveform)
    # Convert to magnitude
    D_mag = np.abs(D)
    print(D_mag.shape)
    # Plot the spectrogram
    plt.figure(figsize=(10, 6))
    librosa.display.specshow(librosa.amplitude_to_db(D_mag, ref=np.max), sr=sr, x_axis='time', y_axis='log')
    plt.colorbar(format='%+2.0f dB')
    plt.title(title)
    plt.ylabel('Frequency [Hz]')
    plt.xlabel('Time [sec]')
    plt.show()

# Plot the spectrograms for the individual waveforms
sr = 16000  # Assuming a sample rate of 16kHz

plot_spectrogram(waveforms[0], sr, 'Spectrogram of First Waveform')
plot_spectrogram(waveforms[1], sr, 'Spectrogram of Second Waveform')

# If you want to plot the spectrogram of the combined waveform as well
plot_spectrogram(combined_waveform, sr, 'Spectrogram of Combined Waveform')

In [None]:
import torch
import numpy as np
import scipy.signal as signal
import wave
import soundfile as sf
import librosa
# Initialize and load the model
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = UNet(input_channels=1,output_channels=2).to(device, dtype=torch.float32)
model.load_state_dict(torch.load('UNET_SourceSeperation_Simple.pth', map_location=device))
model.eval()


with wave.open('combo2.wav', 'rb') as wf:
    input_audio = wf.readframes(-1)
    input_sr = wf.getframerate()

#Load the combined audio waveform
combined_waveform, input_sr = librosa.load('combo2.wav', sr=None)

# Compute spectrogram using scipy signal
#f, t, Zxx = signal.spectrogram(combined_waveform, fs=input_sr, nperseg=256)
spectrogram = waveform_to_spectogram(combined_waveform)
#print(spectrogram.shape)
# Convert spectrogram to PyTorch tensor
input_tensor = torch.tensor(spectrogram, dtype=torch.float32).unsqueeze(0).unsqueeze(0).to(device, dtype=torch.float32)  # Add batch and channel dimensions
#print(input_tensor[0])
# Pass input through the model to get predictions
with torch.no_grad():
    output_tensor = model(input_tensor)
# Renormalize the output tensor
def plot_separated_spectrograms(output_tensor, sr):
    for i, spec in enumerate(output_tensor):
        plt.figure(figsize=(15, 5))
        librosa.display.specshow(librosa.amplitude_to_db(spec, ref=np.max), sr=sr, x_axis='time', y_axis='log')
        plt.colorbar(format='%+2.0f dB')
        plt.ylabel('Frequency [Hz]')
        plt.xlabel('Time [sec]')
        plt.title(f'Separated Spectrogram {i + 1}')
        plt.show()
        
# Convert output tensor to numpy array and save as audio files
output_tensor = output_tensor.squeeze().cpu().numpy()
#print(output_tensor[0])
# Plot the separated spectrograms
plot_separated_spectrograms(output_tensor, input_sr)
output_folder = "output_audio"
#print(output_tensor)
# Create the output folder if it doesn't exist
os.makedirs(output_folder, exist_ok=True)

# Convert each source back from spectrogram to audio and save
for i, source in enumerate(output_tensor):
    # Convert the spectrogram back to audio

    # Save as .wav file
    output_filepath = os.path.join(output_folder, f'source_{i}.wav')
    spectogram_to_audio(source, input_sr, output_filepath)
    print(f"Saved {output_filepath}")

