In [None]:
import os
from pydub import AudioSegment
from tqdm.notebook import tqdm
import librosa
import librosa.display
import matplotlib.pyplot as plt
import numpy as np
import random
import string
from sklearn.model_selection import train_test_split
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv2D, MaxPooling2D, Flatten, Dense, Dropout
from tensorflow.keras.regularizers import l2  # Add this line



In [None]:


#convert from opus to wav
def convert_opus_to_wav(directory):
    """Transcode every ``.opus`` file found under ``directory`` to WAV.

    The tree is walked recursively. Each converted file is written next to its
    source with the extension swapped to ``.wav``; the ``.opus`` file itself is
    left untouched.
    """
    for folder, _subdirs, names in tqdm(os.walk(directory)):
        opus_names = (name for name in names if name.endswith(".opus"))
        for name in opus_names:
            source_path = os.path.join(folder, name)
            stem, _extension = os.path.splitext(source_path)
            segment = AudioSegment.from_file(source_path, format="opus")
            segment.export(stem + ".wav", format="wav")

#resample wav files to 16kHz
def resample_wav_files(directory):
    """Rewrite every ``.wav`` file under ``directory`` in place at 16 kHz.

    The tree is walked recursively and each file is overwritten with its
    resampled version.
    """
    for folder, _subdirs, names in tqdm(os.walk(directory)):
        for name in names:
            if not name.endswith(".wav"):
                continue
            target_path = os.path.join(folder, name)
            resampled = AudioSegment.from_file(target_path, format="wav").set_frame_rate(16000)
            resampled.export(target_path, format="wav")

#Change .wav subtype to PCM_16
def change_wav_subtype(directory):
    """Rewrite every ``.wav`` file under ``directory`` in place as 16-bit PCM.

    The tree is walked recursively and each file is overwritten.

    Args:
        directory: Root folder to scan for ``.wav`` files.
    """
    for root, dirs, files in tqdm(os.walk(directory)):
        for file in files:
            if file.endswith(".wav"):
                wav_path = os.path.join(root, file)
                audio = AudioSegment.from_file(wav_path, format="wav")
                # pydub's export() accepts no `subtype` keyword (passing one
                # raises TypeError). The WAV sample format follows the
                # segment's sample width, so convert to 2 bytes per sample
                # (16-bit PCM) before exporting.
                audio.set_sample_width(2).export(wav_path, format="wav")


def process_wav_duration(directory):
    """Force every ``.wav`` file under ``directory`` to be one second long.

    Clips shorter than one second are padded with trailing silence, longer
    clips are truncated, and every file is re-exported in place. Zero-length
    clips are re-exported unchanged.

    Args:
        directory: Root folder to scan recursively for ``.wav`` files.
    """
    target_ms = 1000

    for root, dirs, files in tqdm(os.walk(directory)):
        for file in files:
            if not file.endswith(".wav"):
                continue

            wav_path = os.path.join(root, file)
            audio = AudioSegment.from_file(wav_path, format="wav")
            length_ms = len(audio)  # pydub reports length in whole milliseconds

            if 0 < length_ms < target_ms:
                # Append a full second of silence at the clip's own frame rate
                # and slice back to the target. Padding by a computed
                # "missing" duration is subject to float/ms rounding and can
                # leave the clip a millisecond short.
                padding = AudioSegment.silent(duration=target_ms, frame_rate=audio.frame_rate)
                audio = (audio + padding)[:target_ms]
            elif length_ms > target_ms:
                audio = audio[:target_ms]

            audio.export(wav_path, format="wav")

#split audio into 1 second chunks
def split_audio_chunks(file_path, output_dir, chunk_length_ms=1000):
    """Cut one audio file into consecutive fixed-length WAV chunks.

    Only whole chunks are produced; any trailing remainder shorter than
    ``chunk_length_ms`` is dropped. Each chunk is written to ``output_dir`` as
    ``<source stem>_chunk<index>.wav`` and its path is printed.
    ``output_dir`` is not created here.
    """
    audio = AudioSegment.from_file(file_path)
    stem = os.path.basename(file_path).rsplit('.', 1)[0]
    total_chunks = len(audio) // chunk_length_ms

    for index in range(total_chunks):
        start_ms = index * chunk_length_ms
        piece = audio[start_ms:start_ms + chunk_length_ms]

        # Guard against a slice that came back shorter than requested.
        if len(piece) < chunk_length_ms:
            continue

        chunk_path = os.path.join(output_dir, f"{stem}_chunk{index}.wav")
        piece.export(chunk_path, format="wav")
        print(f"Saved chunk to: {chunk_path}")

def process_directory_chunks(input_directory, output_directory):
    """Split every ``.wav`` under ``input_directory`` into chunks.

    Each source file gets its own sub-folder of ``output_directory``, named
    after the file without its extension. The sub-folder is created if needed
    and the splitting itself is delegated to ``split_audio_chunks``.
    """
    for folder, _subdirs, names in tqdm(os.walk(input_directory)):
        wav_names = [name for name in names if name.endswith(".wav")]
        for name in wav_names:
            source_path = os.path.join(folder, name)
            stem = os.path.basename(source_path).rsplit('.', 1)[0]
            chunk_dir = os.path.join(output_directory, stem)

            os.makedirs(chunk_dir, exist_ok=True)
            split_audio_chunks(source_path, chunk_dir)


def _list_wav_files(directory):
    """Return the paths of all ``.wav`` files under ``directory`` (recursive)."""
    return [
        os.path.join(root, file)
        for root, dirs, files in os.walk(directory)
        for file in files
        if file.endswith('.wav')
    ]


def overlay_random_wav(source_dir_1, source_dir_2, target_dir, volume_dB=-20, repeat=None):
    """Mix randomly chosen clips from ``source_dir_2`` onto clips from ``source_dir_1``.

    Every 1-second ``.wav`` under ``source_dir_1`` is used as the base track.
    For each one, a random 1-second ``.wav`` from ``source_dir_2`` has its gain
    changed by ``volume_dB`` and is overlaid on the base. The result is written
    under ``target_dir`` at the base file's path relative to ``source_dir_1``.

    Args:
        source_dir_1: Folder of base clips (scanned recursively).
        source_dir_2: Folder of clips to overlay (scanned recursively).
        target_dir: Root folder for the mixed output.
        volume_dB: Gain in dB applied to the overlaid clip before mixing.
        repeat: Number of mixes to write per base clip. ``None`` writes one
            mix under the original file name. An integer writes that many
            mixes with ``_<index>`` appended to the file stem, so ``0``
            writes nothing.

    Returns:
        The number of base clips skipped because they are not exactly
        1000 ms long.

    Raises:
        ValueError: If a mix is requested but ``source_dir_2`` contains no
            ``.wav`` file that is exactly 1000 ms long.
    """
    skipped_files = 0
    wav_files_1 = _list_wav_files(source_dir_1)
    # Overlay candidates. Files found to have the wrong length are removed, so
    # they are never reloaded and the search below is guaranteed to terminate.
    overlay_candidates = _list_wav_files(source_dir_2)
    repeat_count = 1 if repeat is None else repeat

    for file_1 in tqdm(wav_files_1, desc="Processing files"):
        audio_1 = AudioSegment.from_file(file_1)

        # Base clips must be exactly 1 second long.
        if len(audio_1) != 1000:
            print(f"Skipped file: {file_1} (length is not 1 second)")
            skipped_files += 1
            continue

        relative_path = os.path.relpath(file_1, source_dir_1)
        base_target = os.path.join(target_dir, relative_path)
        target_stem, target_ext = os.path.splitext(base_target)

        for i in range(repeat_count):
            # Draw uniformly among the remaining candidates until a 1-second
            # clip turns up; the `else` branch runs once the pool is exhausted.
            while overlay_candidates:
                file_2 = random.choice(overlay_candidates)
                audio_2 = AudioSegment.from_file(file_2)
                if len(audio_2) == 1000:
                    break
                overlay_candidates.remove(file_2)
            else:
                raise ValueError(
                    f"No .wav file of exactly 1 second found in {source_dir_2!r}"
                )

            # `segment + number` in pydub applies a gain change in dB.
            mixed_audio = audio_1.overlay(audio_2 + volume_dB)

            # When repeating, append the repeat index to the file stem.
            if repeat is None:
                target_file = base_target
            else:
                target_file = f"{target_stem}_{i}{target_ext}"

            # os.makedirs("") raises, so only create a parent when there is one.
            target_parent = os.path.dirname(target_file)
            if target_parent:
                os.makedirs(target_parent, exist_ok=True)
            mixed_audio.export(target_file, format="wav")

    return skipped_files


def create_spectrogram(audio_path, save_path):
    """Compute a mel spectrogram for ``audio_path`` and save it to ``save_path``.

    The audio is loaded with ``sr=None`` (no resampling on load) and the
    spectrogram array is written with ``np.save``.
    """
    samples, sample_rate = librosa.load(audio_path, sr=None)
    mel_spectrogram = librosa.feature.melspectrogram(y=samples, sr=sample_rate)
    np.save(save_path, mel_spectrogram)

def process_dir_spectogram(input_dir, output_dir):
    """Create a spectrogram ``.npy`` file for every ``.wav`` under ``input_dir``.

    The input tree is walked recursively. All spectrograms are written flat
    into ``output_dir`` under random 16-character alphanumeric names, so the
    source folder structure and file names are not preserved.

    Args:
        input_dir: Root folder to scan for ``.wav`` files.
        output_dir: Folder that receives the ``.npy`` files; created on demand.
    """
    name_alphabet = string.ascii_letters + string.digits

    for root, dirs, files in tqdm(os.walk(input_dir)):
        for file in files:
            if not file.endswith('.wav'):
                continue

            file_path = os.path.join(root, file)

            # np.save does not create missing folders. Create the output
            # folder lazily so nothing is touched when there is no input, and
            # skip the empty path (current directory), which makedirs rejects.
            if output_dir:
                os.makedirs(output_dir, exist_ok=True)

            # Draw names until an unused one is found, so an (unlikely)
            # collision never overwrites an existing spectrogram.
            while True:
                random_name = ''.join(random.choices(name_alphabet, k=16))
                output_path = os.path.join(output_dir, random_name + '.npy')
                if not os.path.exists(output_path):
                    break

            create_spectrogram(file_path, output_path)
#

In [None]:
# Directory configuration; every path below is set to an empty string here.
wake_word_dir = ""
not_wake_word_dir = ""

background_noise_dir = ""
background_noise_chunks_dir = ""

wake_word_with_noise_dir = ""
not_wake_word_with_noise_dir = ""

wake_word_spectrogram_dir = ""
not_wake_word_spectrogram_dir = ""

# The three raw-audio folders that go through the format-normalisation steps.
raw_audio_dirs = (wake_word_dir, not_wake_word_dir, background_noise_dir)

# Step 1: transcode .opus sources to .wav
print("Converting .opus files to .wav")
for audio_dir in raw_audio_dirs:
    convert_opus_to_wav(audio_dir)

# Step 2: bring every .wav to a 16 kHz frame rate
print("Resampling .wav files to 16kHz")
for audio_dir in raw_audio_dirs:
    resample_wav_files(audio_dir)

# Step 3: rewrite every .wav as PCM_16
print("Changing .wav subtype to PCM_16")
for audio_dir in raw_audio_dirs:
    change_wav_subtype(audio_dir)

# Step 4: pad/trim the spoken clips to exactly one second
# (the background-noise folder is chunked in step 5 instead)
print("Processing audio files to be exactly 1 second long")
for audio_dir in (wake_word_dir, not_wake_word_dir):
    process_wav_duration(audio_dir)

# Step 5: cut the background-noise recordings into 1 second chunks
print("Splitting audio files into 1 second chunks")
process_directory_chunks(background_noise_dir, background_noise_chunks_dir)

# Step 6: mix spoken clips with the noise chunks. The noise chunks are passed
# as the base track (source_dir_1) and the spoken clips as the overlay.
# NOTE(review): overlay_random_wav loops `range(repeat)` when repeat is not
# None, so the second job (repeat=0) writes no files at all and
# not_wake_word_with_noise_dir is left unpopulated by this cell. Confirm that
# is intended; pass repeat=None for a single mix per base clip.
print("Overlaying random background noise into the wake word files")
overlay_jobs = (
    (wake_word_dir, wake_word_with_noise_dir, 170),
    (not_wake_word_dir, not_wake_word_with_noise_dir, 0),
)
for clip_dir, mixed_dir, repeat_count in overlay_jobs:
    overlay_random_wav(background_noise_chunks_dir, clip_dir, mixed_dir, repeat=repeat_count)

# Step 7: turn the mixed clips into spectrogram arrays
print("Creating spectrograms")
spectrogram_jobs = (
    (wake_word_with_noise_dir, wake_word_spectrogram_dir),
    (not_wake_word_with_noise_dir, not_wake_word_spectrogram_dir),
)
for mixed_dir, spectrogram_dir in spectrogram_jobs:
    process_dir_spectogram(mixed_dir, spectrogram_dir)


In [None]:
# Load the spectrograms and create the labels
data = []
labels = []

# Pair each folder with its class label explicitly. Deriving the label from
# the folder's base name labels every sample 0 unless the positive folder
# happens to be called exactly 'wake_word'.
labelled_folders = (
    (wake_word_spectrogram_dir, 1),
    (not_wake_word_spectrogram_dir, 0),
)

for folder, label in labelled_folders:
    # os.listdir order is arbitrary; sort so the sample order (and therefore
    # the seeded split below) is the same on every run.
    for file in sorted(os.listdir(folder)):
        if file.endswith('.npy'):
            spectrogram = np.load(os.path.join(folder, file))
            data.append(spectrogram)
            labels.append(label)

# Convert the lists to numpy arrays
data = np.array(data)
labels = np.array(labels)

# Fail with a clear message when nothing was loaded or the spectrograms do
# not all share one 2-D shape (the reshape below needs a 3-D array).
if data.ndim != 3:
    raise ValueError(
        f"Expected a stack of equally shaped 2-D spectrograms, got array of shape {data.shape}"
    )

# Add a trailing channel axis: (samples, rows, cols) -> (samples, rows, cols, 1)
data = data.reshape((-1, data.shape[1], data.shape[2], 1))

# Hold out 5% of the samples; train_test_split shuffles with a fixed seed
X_train, X_test, y_train, y_test = train_test_split(data, labels, test_size=0.05, random_state=42)

from sklearn.model_selection import cross_val_score

# NOTE(review): neither import in this cell is used by the visible notebook.
# The scikit-learn wrapper module is absent from newer TensorFlow releases, so
# the import is guarded to keep the cell runnable there.
try:
    from tensorflow.keras.wrappers.scikit_learn import KerasClassifier
except ImportError:
    KerasClassifier = None

# Define a function to create the model, required for KerasClassifier
def create_model():
    """Build and compile the binary-classification CNN.

    The input shape is read from the notebook-level ``data`` array (axes 1 and
    2, plus a single channel), so ``data`` must exist before this is called.
    The stack is two Conv2D/MaxPooling2D stages, a 64-unit dense layer with
    50% dropout, and a single sigmoid output. It is compiled with the Adam
    optimizer, binary cross-entropy loss and an accuracy metric.

    Returns:
        The compiled ``Sequential`` model.
    """
    rows, cols = data.shape[1], data.shape[2]
    weight_penalty = 0.01  # L2 factor shared by all regularised layers

    network = Sequential()
    network.add(Conv2D(32, (3, 3), activation='relu',
                       kernel_regularizer=l2(weight_penalty),
                       input_shape=(rows, cols, 1)))
    network.add(MaxPooling2D((2, 2)))
    network.add(Conv2D(64, (3, 3), activation='relu',
                       kernel_regularizer=l2(weight_penalty)))
    network.add(MaxPooling2D((2, 2)))
    network.add(Flatten())
    network.add(Dense(64, activation='relu', kernel_regularizer=l2(weight_penalty)))
    network.add(Dropout(0.5))
    network.add(Dense(1, activation='sigmoid'))

    network.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])
    return network




In [None]:
# Build a fresh, compiled CNN
model = create_model()

# Train on the shuffled training split only, so X_test / y_test stay unseen.
# Keras takes `validation_split` from the *last* samples of the arrays it is
# given, before any shuffling. `data` is ordered by class (all wake-word
# samples first), so splitting it directly would validate on one class only;
# X_train has already been shuffled by train_test_split.
history = model.fit(X_train, y_train, epochs=10, batch_size=10, verbose=2, validation_split=0.2)


In [None]:
# Persist the trained model under the path "working_model_1".
# NOTE(review): the path has no file extension; some Keras versions require a
# `.keras` or `.h5` suffix here — confirm against the installed version.
model.save("working_model_1")
