In [2]:
import os
import random
import librosa
import librosa.display
import matplotlib.pyplot as plt
import IPython.display as ipd
from scipy.signal import butter, lfilter
import numpy as np
import pandas as pd
from IPython.display import Audio, display
import numpy as np
import librosa
import tensorflow as tf
from tensorflow.keras import layers, models
from tensorflow.keras import Input, Model
from sklearn.model_selection import train_test_split
import csv
import torch
from tqdm import tqdm
from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping


# GET SAMPLE

In [3]:
num_of_samples = 10

In [4]:
# Function to get all .wav files from a directory
def get_wav_files_from_folder(path):
    return [os.path.join(path, file) for file in os.listdir(path) if file.endswith('.wav')]

# Function to load and display a .wav file
def load_and_display_wav(file_path, num_of_samples):
    try:
        # Load the audio file using librosa
        audio_data, sample_rate = librosa.load(file_path, sr=None)

        # Plot the waveform
        plt.figure(figsize=(10, 4))
        librosa.display.waveshow(audio_data, sr=sample_rate)
        plt.title(f'Waveform of {os.path.basename(file_path)}')
        plt.xlabel('Time (s)')
        plt.ylabel('Amplitude')
        plt.tight_layout()
        plt.show()
        return audio_data, sample_rate
    except Exception as e:
        print(f"Error loading or displaying {file_path}: {e}")

# Function to randomly select files from the lists
def select_random_files(real_files, fake_files, num_real, num_fake):
    if not real_files:
        print("No real files found!")
    if not fake_files:
        print("No fake files found!")

    # Select random files from the real and fake lists
    selected_real_files = random.sample(real_files, min(num_real, len(real_files)))
    selected_fake_files = random.sample(fake_files, min(num_fake, len(fake_files)))

    return selected_real_files, selected_fake_files

# Paths to the real and fake directories (replace with your actual paths)
real_path = "./content/LibriSeVoc/diffwave"
fake_path = "./content/LibriSeVoc/gt"

# Load the lists of .wav files from each directory
real_files = get_wav_files_from_folder(real_path)
fake_files = get_wav_files_from_folder(fake_path)

# Get lists of 10 random real and fake file paths
random_real_files, random_fake_files = select_random_files(real_files, fake_files, num_real=num_of_samples, num_fake=num_of_samples)

Real_Audio = []
Fake_Audio = []

# # Display the real files
# print("Displaying random real files:")
# for file in random_real_files:
#     Real_Audio.append(load_and_display_wav(file, num_of_samples))

# # Display the fake files
# print("Displaying random fake files:")
# for file in random_fake_files:
#     Fake_Audio.append(load_and_display_wav(file, num_of_samples))

# print(len(Real_Audio))
# print(len(Fake_Audio))

In [5]:
def play_audio(audio_data_list):
    for audio_data, sample_rate in audio_data_list:
        print(f"Playing audio with sample rate: {sample_rate} Hz")
        ipd.display(ipd.Audio(data=audio_data, rate=sample_rate))

print("Real Audio")
play_audio(Real_Audio)
print("Fake Audio")
play_audio(Fake_Audio)

Real Audio
Fake Audio


# *PREPROCESSING FUNCTION*

In [6]:
# Check if MPS (Metal Performance Shaders) is available and set the device accordingly
device = torch.device('mps' if torch.backends.mps.is_available() else 'cpu')
print(f"Using device: {device}")

Using device: mps


In [7]:
import torch
from scipy.signal import butter, lfilter

# Check MPS availability and set device
device = torch.device('mps' if torch.backends.mps.is_available() else 'cpu')
print(f"Using device: {device}")

def bandpass_filter(y, sr, lowcut=250, highcut=4000, order=5):
    """
    Applies a bandpass filter to an audio signal.

    Args:
        y (torch.Tensor): The audio signal as a PyTorch tensor.
        sr (int): The sample rate of the audio signal.
        lowcut (int, optional): The lower cutoff frequency. Defaults to 250.
        highcut (int, optional): The upper cutoff frequency. Defaults to 4000.
        order (int, optional): The order of the filter. Defaults to 5.

    Returns:
        torch.Tensor: The filtered audio signal as a PyTorch tensor.
    """
    # Perform the filtering (this part uses scipy and will run on the CPU)
    nyq = 0.5 * sr
    low = lowcut / nyq
    high = highcut / nyq
    b, a = butter(order, [low, high], btype='band')
    y_filtered = lfilter(b, a, y)  # Move to CPU for scipy

    # Move the filtered signal back to the original device
    return torch.tensor(y_filtered, dtype=y.dtype).to(device)

Using device: mps


In [8]:
import torch

def decrease_low_db(y, sr, threshold_db=-50, target_db=-80):
    """
    Giảm độ lớn của các mẫu âm thanh dưới ngưỡng dB cho trước đến độ to mong muốn,
    giữ nguyên thời gian của tín hiệu âm thanh.

    :param y: Tín hiệu âm thanh (tensor)
    :param sr: Tần số lấy mẫu (Hz)
    :param threshold_db: Ngưỡng dB để xác định các mẫu cần giảm độ lớn (ví dụ: -40 dB)
    :param target_db: Độ to mong muốn cho các mẫu dưới ngưỡng (ví dụ: -80 dB)
    :return: Tín hiệu đã được điều chỉnh (tensor)
    """
    # Calculate the absolute amplitude of the signal
    abs_y = torch.abs(y)

    # Calculate the reference amplitude (maximum amplitude)
    ref_amplitude = torch.max(abs_y) if torch.max(abs_y) > 0 else torch.tensor(1.0, dtype=torch.float32).to(device)

    # Calculate the dB level of each sample relative to the reference amplitude
    y_db = 20 * torch.log10(abs_y / ref_amplitude + 1e-10)  # Add epsilon to avoid log(0)

    # Create a mask for samples below the dB threshold
    mask = y_db < threshold_db

    # Calculate the desired amplitude for samples below the dB threshold
    desired_amplitude = 10 ** (target_db / 20) * ref_amplitude  # Example: -80 dB

    # Create a copy of the signal to adjust
    y_adjusted = y.clone()

    # Reduce the amplitude of samples below the dB threshold
    # Avoid division by zero by adding epsilon
    y_adjusted[mask] = y_adjusted[mask] / (abs_y[mask] + 1e-10) * desired_amplitude

    return y_adjusted  # Convert back to numpy array if needed

# SHOW PROCESSED INSTANCES


In [9]:
# Filtered_Real_Audio = []
# for audio_data, sample_rate in Real_Audio:
#     filtered_audio = bandpass_filter(audio_data, sample_rate, lowcut=250, highcut=4000)
#     final_audio = decrease_low_db(filtered_audio, sample_rate)
#     Filtered_Real_Audio.append(final_audio)

# Filtered_Fake_Audio = []
# for audio_data, sample_rate in Fake_Audio:
#     filtered_audio = bandpass_filter(audio_data, sample_rate, lowcut=250, highcut=4000)
#     filtered_audio = decrease_low_db(filtered_audio, sample_rate)
#     Filtered_Fake_Audio.append(filtered_audio)


In [10]:

# def play_and_show_wave_spectrogram(audio_data, sample_rate, title):
#     """Plays audio, displays waveform, and spectrogram."""
#     ipd.display(ipd.Audio(data=audio_data, rate=sample_rate))  # Play audio

#     # Display waveform
#     # plt.figure(figsize=(10, 4))
#     # librosa.display.waveshow(audio_data, sr=sample_rate)
#     # plt.title(f"{title} - Waveform")
#     # plt.xlabel("Time (s)")
#     # plt.ylabel("Amplitude")
#     # plt.tight_layout()
#     # plt.show()

#     # Display spectrogram
#     # plt.figure(figsize=(10, 4))
#     # D = librosa.amplitude_to_db(np.abs(librosa.stft(audio_data)), ref=np.max)
#     # librosa.display.specshow(D, sr=sample_rate, x_axis='time', y_axis='log')
#     # plt.colorbar(format='%+2.0f dB')
#     # plt.title(f"{title} - Spectrogram")
#     # plt.tight_layout()
#     # plt.show()

# # Play, display waveform, and spectrogram for Filtered_Real_Audio
# print("Real Audio Display")
# for i, audio_data in enumerate(Filtered_Real_Audio):
#     play_and_show_wave_spectrogram(audio_data, Real_Audio[i][1], f"Filtered Real Audio {i+1}")

# # Play, display waveform, and spectrogram for Filtered_Fake_Audio
# print("Fake Audio Display")
# for i, audio_data in enumerate(Filtered_Fake_Audio):
#     play_and_show_wave_spectrogram(audio_data, Fake_Audio[i][1], f"Filtered Fake Audio {i+1}")

# TRAIN TEST SPLIT

In [11]:
# Define the main data directory

# List to hold file paths
train_file_paths = []
test_file_paths = []

label_dict = {
    fake_path: 0,
    real_path: 1
}

# Split ratio
train_ratio = 0.8

# Split files in each class directory
for class_dir in [fake_path, real_path]:

    # Get all file paths for the class
    all_files = [os.path.join(class_dir, f) for f in os.listdir(class_dir) if f.endswith('.wav')]

    # Split into train and test sets
    train_files, test_files = train_test_split(all_files, train_size=train_ratio, random_state=42)

    # Append to the respective lists with corresponding labels (class)
    for file_path in train_files:
        train_file_paths.append((file_path, label_dict[class_dir]))  # Store path and label
    for file_path in test_files:
        test_file_paths.append((file_path, label_dict[class_dir]))

print(f"Train file paths: {len(train_file_paths)}")
print(f"Test file paths: {len(test_file_paths)}")


Train file paths: 21120
Test file paths: 5282


In [12]:

# File paths to save the CSVs
train_csv = 'output/train_file_paths.csv'
test_csv = 'output/test_file_paths.csv'

# Save train_file_paths to CSV
with open(train_csv, mode='w', newline='') as train_file:
    writer = csv.writer(train_file)
    writer.writerow(['file_path', 'label'])  # Write the header
    for file_path, label in train_file_paths:
        writer.writerow([file_path, label])  # Write the file path and label

# Save test_file_paths to CSV
with open(test_csv, mode='w', newline='') as test_file:
    writer = csv.writer(test_file)
    writer.writerow(['file_path', 'label'])  # Write the header
    for file_path, label in test_file_paths:
        writer.writerow([file_path, label])  # Write the file path and label

print(f"Train file paths saved to {train_csv}")
print(f"Test file paths saved to {test_csv}")


Train file paths saved to output/train_file_paths.csv
Test file paths saved to output/test_file_paths.csv


# DATA PROCESSING

In [13]:
SEGMENT_LENGTH = 1
NUM_SEGMENT = 30
SR = 24000
BATCH_SIZE = 32
LR = 0.0001
EPOCHS = 30

In [14]:
# Check if MPS (Metal Performance Shaders) is available and set the device accordingly
device = torch.device('mps' if torch.backends.mps.is_available() else 'cpu')
print(f"Using device: {device}")

Using device: mps


In [15]:
import torch
import torchaudio
import torchaudio.transforms as T

# Check MPS availability and set device
device = torch.device('mps' if torch.backends.mps.is_available() else 'cpu')
print(f"Using device: {device}")

def segment_to_spectrogram(segment, sr=24000, n_fft=2048, hop_length=512, n_mels=128):
    """
    Extracts a Mel spectrogram from an audio segment, ensuring execution on the MPS GPU if available.

    Args:
        segment (torch.Tensor): The audio segment as a PyTorch tensor.
        sr (int, optional): The sample rate of the audio segment. Defaults to 24000.
        n_fft (int, optional): The size of the FFT. Defaults to 2048.
        hop_length (int, optional): The hop length for the STFT. Defaults to 512.
        n_mels (int, optional): The number of Mel filterbanks. Defaults to 128.

    Returns:
        torch.Tensor: The Mel spectrogram in decibels (dB).
    """

    # Create the MelSpectrogram transform
    mel_spectrogram = T.MelSpectrogram(
        sample_rate=sr,
        n_fft=n_fft,
        hop_length=hop_length,
        n_mels=n_mels
    ).to(device)  # Ensure the transform is also on the correct device

    # Apply the MelSpectrogram transform
    mel_spectrogram = mel_spectrogram(segment)

    # Convert to decibels (dB)
    spectrogram_db = T.AmplitudeToDB().to(device)  # Move AmplitudeToDB to the device
    spectrogram_db = spectrogram_db(mel_spectrogram)

    return spectrogram_db

Using device: mps


In [16]:
import torchaudio
import torch

import torchaudio.transforms as T

def extract_segments(audio_file, segment_length=SEGMENT_LENGTH, num_segments=NUM_SEGMENT):
    # Load audio file using torchaudio
    waveform, sr = torchaudio.load(audio_file)
    waveform = bandpass_filter(waveform, sr, lowcut=250, highcut=4000)
    waveform = torch.tensor(waveform, dtype=torch.float32).to(device)
    waveform = decrease_low_db(waveform, sr)

    # Resample if necessary
    if sr != SR:
        resampler = T.Resample(orig_freq=sr, new_freq=SR)
        waveform = resampler(waveform)
        sr = SR

    # Calculate the total duration in seconds
    total_duration = waveform.shape[1] / sr

    # Calculate the overlap to ensure exactly num_segments
    overlap = (total_duration - segment_length) / (num_segments - 1)

    # Convert segment length and overlap to samples
    segment_samples = int(segment_length * sr)
    overlap_samples = int(overlap * sr)

    # Extract the segments
    segments = []
    for i in range(num_segments):
        start_sample = i * overlap_samples
        end_sample = start_sample + segment_samples
        segment = waveform[:, start_sample:end_sample]
        spectrogram = segment_to_spectrogram(segment)
        segments.append(spectrogram)

    return segments

# Example usage
audio_file = './content/LibriSeVoc/gt/19_227_000003_000000.wav'
segments = extract_segments(audio_file)
print(f"Extracted {len(segments)} segments")

  waveform = torch.tensor(waveform, dtype=torch.float32).to(device)


Extracted 30 segments


In [17]:
# Load the CSV containing train and validation file paths and labels
train_csv = './output/train_file_paths.csv'  # Path to the train data CSV

train_data = pd.read_csv(train_csv)

train_data_head = train_data.head(1000)
train_data_tail = train_data.tail(1000)
train_data_tail = train_data_tail.reset_index(drop=True)
train_data_head = train_data_head.reset_index(drop=True)


# Concatenate the head and tail along rows (axis=0)
demo_train_data = pd.concat([train_data_head, train_data_tail], axis=0)
demo_train_data = demo_train_data.reset_index(drop=True)

# Display the merged data
print(demo_train_data)


                                              file_path  label
0     ./content/LibriSeVoc/gt/60_121082_000096_00000...      0
1     ./content/LibriSeVoc/gt/8312_279790_000004_000...      0
2     ./content/LibriSeVoc/gt/3168_173564_000017_000...      0
3     ./content/LibriSeVoc/gt/7302_86814_000053_0000...      0
4     ./content/LibriSeVoc/gt/4813_248638_000012_000...      0
...                                                 ...    ...
1995  ./content/LibriSeVoc/diffwave/1116_132851_0000...      1
1996  ./content/LibriSeVoc/diffwave/87_121553_000086...      1
1997  ./content/LibriSeVoc/diffwave/2002_139469_0000...      1
1998  ./content/LibriSeVoc/diffwave/1088_129236_0000...      1
1999  ./content/LibriSeVoc/diffwave/3168_173565_0000...      1

[2000 rows x 2 columns]


In [18]:
SAVE_PATH = './output/train_data_checkpoint/train_data_partial'
SAVE_INTERVAL = 100

def load_data(data, segment_length=SEGMENT_LENGTH, num_segments=NUM_SEGMENT, save_interval=SAVE_INTERVAL, save_path=SAVE_PATH, start_index=0):
    segments = []
    labels = []
    # Check for existing partial files
    partial_files = [f for f in os.listdir(os.path.dirname(save_path)) if f.startswith(os.path.basename(save_path)) and f.endswith('.pt')]
    num_partial_files = len(partial_files)
    if start_index == 0:
        start_index = num_partial_files * save_interval
    else:
        start_index = start_index * save_interval
    print(f"Resuming from index {start_index}")

    if num_partial_files * save_interval >= len(data): # Check if all files have been processed
        print("Already finished processing. Merging.")
        return
    else:
        print("Continuing processing from the last checkpoint.")

    for idx, (_, row) in tqdm(enumerate(data.iterrows()), total=len(data)):
        if idx < start_index:
            continue

        file_path = row['file_path']
        label = row['label']

        # Extract segments from the audio file
        file_segments = extract_segments(file_path, segment_length, num_segments)

        # Stack the segments and convert to torch tensor
        file_segments_stacked = torch.stack([torch.tensor(segment, dtype=torch.float32) for segment in file_segments])

        # Append the stacked segments and their corresponding labels
        segments.append(file_segments_stacked)  # Append the 30 segments as a single element
        labels.append(label)  # Append the label only once per file

        # Save progress every save_interval
        if (idx + 1) % save_interval == 0 or (idx + 1) == len(data):
            partial_path = f"{save_path}_{int((idx + 1))}.pt"
            torch.save({'segments': segments, 'labels': labels}, partial_path)
            print(f"{len(segments)} saved at index {idx + 1} to {partial_path}")  # More informative print
            print({'segments': segments[0], 'labels': labels[0]}) # Check the saved data
            segments = []
            labels = []

    print("All segments saved individually.")

def load_saved_segments(data, save_path=SAVE_PATH):
    all_segments = []
    all_labels = []

    # Load all partial files in the SAVE_PATH subdir
    partial_files = [f for f in os.listdir(os.path.dirname(save_path)) if f.startswith(os.path.basename(save_path)) and f.endswith('.pt')]
    for partial_file in partial_files:
        print(f"Loading {partial_file}")
        partial_data = torch.load(os.path.join(os.path.dirname(save_path), partial_file))
        print(f"Loaded {len(partial_data['segments'])} segments from {partial_file}")
        all_segments.extend(partial_data['segments'])  # Extend directly with the list of stacked segments
        all_labels.extend(partial_data['labels'])  # Extend the labels list
    print("Finished loading, stacking loaded tensors...")
    return torch.stack([segment.cpu() for segment in all_segments]), torch.tensor(all_labels, dtype=torch.float32).cpu()


if os.path.exists(SAVE_PATH + '.pt'):
    # Load train and validation data
    data = torch.load(SAVE_PATH + '.pt')
    train_segments = data['segments']
    print(f"Loaded {len(train_segments)} segments")
    train_labels = data['labels']
else:
    print("Loading and saving train data: ", len(train_data))
    # Load the train and validation data
    load_data(train_data)  # Save segments individually
    train_segments, train_labels = load_saved_segments(train_data)  # Load and combine saved segments
    print("Completed loading train data. Saving...")
    # Save the train and validation data
    torch.save({'segments': train_segments, 'labels': train_labels}, f'{SAVE_PATH}.pt')

train_segments = np.array([segment.cpu().numpy() for segment in train_segments])
train_segments = np.transpose(train_segments, (0, 1, 3, 4, 2))
train_labels = train_labels.cpu().numpy().astype(int)
print(f"Train segments shape: {train_segments.shape}, Train labels shape: {train_labels.shape}")

Loading and saving train data:  21120
Resuming from index 21200
Already finished processing. Merging.
Loading train_data_partial_19700.pt
Loaded 100 segments from train_data_partial_19700.pt
Loading train_data_partial_17800.pt
Loaded 100 segments from train_data_partial_17800.pt
Loading train_data_partial_8600.pt


  partial_data = torch.load(os.path.join(os.path.dirname(save_path), partial_file))


Loaded 100 segments from train_data_partial_8600.pt
Loading train_data_partial_20600.pt
Loaded 100 segments from train_data_partial_20600.pt
Loading train_data_partial_6900.pt
Loaded 100 segments from train_data_partial_6900.pt
Loading train_data_partial_13900.pt
Loaded 100 segments from train_data_partial_13900.pt
Loading train_data_partial_21000.pt
Loaded 100 segments from train_data_partial_21000.pt
Loading train_data_partial_9000.pt
Loaded 100 segments from train_data_partial_9000.pt
Loading train_data_partial_18100.pt
Loaded 100 segments from train_data_partial_18100.pt
Loading train_data_partial_2800.pt
Loaded 100 segments from train_data_partial_2800.pt
Loading train_data_partial_11500.pt
Loaded 100 segments from train_data_partial_11500.pt
Loading train_data_partial_5300.pt
Loaded 100 segments from train_data_partial_5300.pt
Loading train_data_partial_14200.pt
Loaded 100 segments from train_data_partial_14200.pt
Loading train_data_partial_15400.pt
Loaded 100 segments from train