Script to trim one of the 4-hour recordings I collected for negatives down to a 2200-second excerpt, which yields about 220 ten-second segments once it is split.

In [None]:
import os
!pip install pydub
from pydub import AudioSegment

def trim_audio(input_file, start_time, duration, output_dir):
    """Cut a fixed-length excerpt out of a WAV file and save it as WAV.

    The excerpt is written to ``output_dir`` under the same base name as
    ``input_file``. If ``output_dir`` is the directory that already holds
    ``input_file``, the source recording is therefore overwritten.

    Args:
        input_file: Path to the source WAV file.
        start_time: Offset of the excerpt from the start of the file, in seconds.
        duration: Length of the excerpt, in seconds.
        output_dir: Directory to write the excerpt into; created if missing.

    Returns:
        Path of the exported excerpt.

    Raises:
        ValueError: If the requested window contains no audio (for example
            because ``start_time`` lies beyond the end of the recording).
    """
    # Load audio file
    audio = AudioSegment.from_wav(input_file)

    # pydub slices in milliseconds; use whole numbers so fractional seconds
    # never turn into float slice bounds
    start_ms = int(round(start_time * 1000))
    end_ms = start_ms + int(round(duration * 1000))

    # Trim audio
    trimmed_audio = audio[start_ms:end_ms]

    # Refuse to export an empty clip: it would silently replace a real
    # recording whenever the output path equals the input path
    if len(trimmed_audio) == 0:
        raise ValueError(
            f"No audio between {start_time}s and {start_time + duration}s in {input_file}"
        )

    # Output keeps the input's base name inside output_dir
    os.makedirs(output_dir, exist_ok=True)
    output_file = os.path.join(output_dir, os.path.basename(input_file))

    # Save trimmed audio
    trimmed_audio.export(output_file, format="wav")
    return output_file

# Recording to cut the excerpt from
input_file = "ExtraNegative/20221208_180000.WAV"

# Excerpt window, in seconds: start 1 h 30 min into the recording and keep 2200 s
start_time = 90 * 60
duration = 2200

# Folder that receives the excerpt (same folder as the source recording here,
# and trim_audio reuses the source's base name)
output_dir = "ExtraNegative"
os.makedirs(output_dir, exist_ok=True)

trim_audio(
    input_file=input_file,
    start_time=start_time,
    duration=duration,
    output_dir=output_dir,
)




The next cell splits audio files of any length into 10-second segments and saves them to the specified output directory. The last segment of a file may be shorter than 10 seconds.

In [None]:
import os
import math
import torch
import torchaudio

def split_all_audio_files(input_dir, output_dir, segment_length_sec=10):
    """Split every WAV file in a directory into fixed-length segments.

    Each ``.wav``/``.WAV`` file in ``input_dir`` is cut into consecutive
    pieces of ``segment_length_sec`` seconds (the last piece may be shorter)
    and every piece is saved as 16-bit PCM to
    ``<output_dir>/<original stem>_segment<i>.wav``.

    A file that fails to load or save is reported with ``print`` and skipped,
    so one bad file does not stop the run.

    Note: when ``input_dir`` and ``output_dir`` are the same folder, running
    this twice will split the previously written segments again.

    Args:
        input_dir: Directory containing the source WAV files.
        output_dir: Directory for the segments; created if missing.
        segment_length_sec: Segment length in seconds (must be positive).

    Raises:
        ValueError: If ``segment_length_sec`` is not positive.
    """
    if segment_length_sec <= 0:
        raise ValueError("segment_length_sec must be positive")

    # Make sure output directory exists
    os.makedirs(output_dir, exist_ok=True)

    wav_suffixes = ('.wav', '.WAV')

    # Loop over all files in the input directory
    for filename in os.listdir(input_dir):
        if not filename.endswith(wav_suffixes):
            continue

        # Drop exactly the 4-character extension. str.rstrip('.wav') would
        # strip any trailing '.', 'w', 'a', 'v' characters ("data.wav" ->
        # "dat") and would leave an upper-case ".WAV" in place.
        stem = filename[:-len('.wav')]

        # Built before the try so the error message can always name the file
        audio_path = os.path.join(input_dir, filename)
        try:
            waveform, sample_rate = torchaudio.load(audio_path)

            # Whole number of samples per segment, so slicing also works for
            # fractional segment lengths
            num_samples_segment = int(segment_length_sec * sample_rate)
            total_segments = math.ceil(waveform.shape[1] / num_samples_segment)

            for i in range(total_segments):
                start = i * num_samples_segment
                segment = waveform[:, start:start + num_samples_segment]
                segment_path = os.path.join(output_dir, f"{stem}_segment{i}.wav")

                # Convert to 16-bit PCM; clamp first so a sample outside
                # [-1, 1] cannot wrap around in the integer conversion
                segment = (segment.clamp(-1.0, 1.0) * 32767).short()
                torchaudio.save(segment_path, segment, sample_rate)
        except Exception as e:
            print(f"Error processing file {audio_path}: {str(e)}")

In [None]:
# Source and destination are the same folder here, so the segments are written
# next to the recordings they were cut from
input_dir = output_dir = 'ExtraPositive'
split_all_audio_files(input_dir=input_dir, output_dir=output_dir, segment_length_sec=10)

**Optional**

Zip the split files and download them to your local machine, or copy them to Google Drive instead.

Files stored on Google Colab do not persist between sessions. I download these files so that I can review the positives again, because some segments no longer contain our call after splitting.

In [None]:
from google.colab import files

# Recursively pack the split negative segments into a single archive
!zip -r /content/SplitNegative.zip /content/SplitNegative
# Uncomment to pack the split positive segments as well
#!zip -r /content/SplitPositive.zip /content/SplitPositive



In [None]:
# Download the negative archive from the Colab VM
# (`files` is google.colab.files, imported in the zip cell)
files.download("/content/SplitNegative.zip")
# Uncomment to download the positive archive as well
#files.download("/content/SplitPositive.zip")

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

After reviewing the files, I placed them in Google Drive. Mount the drive to access them:

In [None]:
from google.colab import drive

drive.mount('/content/gdrive')
%cd gdrive/MyDrive/
%cd Rana_Draytonii/ # Location of data

Mounted at /content/gdrive
/content/gdrive/MyDrive
[Errno 2] No such file or directory: 'Rana_Draytonii/ # Location of data'
/content/gdrive/MyDrive


In [None]:
%cd Rana_Draytonii/

/content/gdrive/MyDrive/Rana_Draytonii


In [None]:
# List the contents of the current working directory
%ls

[0m[01;34m'Negative samples'[0m/   [01;34mRana3[0m/   [01;34mReducedNegative[0m/   ResultRana7.zip
[01;34m'Positive samples'[0m/   [01;34mRana4[0m/   [01;34mReducedPositive[0m/   [01;34mSplitNegative[0m/
 [01;34mRana1[0m/               [01;34mRana7[0m/   [01;34mResampledAudio[0m/    [01;34mSplitPositive[0m/


In [None]:
# Check how many CPU cores are available
!cat /proc/cpuinfo | grep 'processor' | wc -l


2


**Reduce frequency range of files**

This takes way too long, I will need to find a better way of doing this


In [None]:
import os
import torch
import torchaudio
import numpy as np
from scipy.signal import butter, lfilter
from multiprocessing import Pool

def process_file(args):
    """Pool worker: band-pass one file if it is a WAV, otherwise do nothing.

    Args:
        args: ``(audio_path, save_dir)`` tuple, packed into one argument so
            the function can be used with ``Pool.map``.
    """
    source_path, target_dir = args
    # Only '.wav' / '.WAV' files are processed; anything else is ignored
    if not source_path.endswith(('.wav', '.WAV')):
        return
    written_path = process_audio(source_path, target_dir)
    print(f"File {written_path} finished")

def butter_bandpass(lowcut, highcut, fs, order=5):
    """Design a Butterworth band-pass filter.

    Args:
        lowcut: Lower band edge, in the same unit as ``fs``.
        highcut: Upper band edge, in the same unit as ``fs``.
        fs: Sampling rate of the signal the filter is meant for.
        order: Order passed to ``scipy.signal.butter``.

    Returns:
        The ``(b, a)`` coefficient pair returned by ``scipy.signal.butter``.
    """
    nyquist = 0.5 * fs
    # butter takes the band edges as fractions of the Nyquist frequency
    band_edges = [lowcut / nyquist, highcut / nyquist]
    numerator, denominator = butter(order, band_edges, btype='band')
    return numerator, denominator

def butter_bandpass_filter(data, lowcut, highcut, fs, order=5):
    """Band-pass ``data`` with a Butterworth filter, along its last axis.

    The filter is designed and applied as second-order sections rather than
    as a single (b, a) transfer function: the polynomial form loses precision
    for higher orders and band edges that are tiny relative to the sample
    rate (such as 10 Hz at 16 kHz), which can produce a distorted or unstable
    filter.

    Args:
        data: Array of samples; for a 2-D array each row is filtered
            independently along the last axis.
        lowcut: Lower band edge, in the same unit as ``fs``.
        highcut: Upper band edge, in the same unit as ``fs``.
        fs: Sampling rate of ``data``.
        order: Butterworth order.

    Returns:
        The filtered array, same shape as ``data``.
    """
    # Imported here so the function does not rely on the cell's import list
    from scipy.signal import butter, sosfilt

    nyq = 0.5 * fs
    # Band edges are expressed as fractions of the Nyquist frequency
    sos = butter(order, [lowcut / nyq, highcut / nyq], btype='band', output='sos')
    return sosfilt(sos, data)

def bandpass_filter(buffer):
    """Band-pass ``buffer`` using the module-level filter settings.

    The band edges and sample rate are read from the globals ``lowcut``,
    ``highcut`` and ``FRAME_RATE``; the filter order is fixed at 6.
    """
    filtered = butter_bandpass_filter(
        buffer, lowcut, highcut, FRAME_RATE, order=6
    )
    return filtered

def process_audio(audio_path, save_dir):
    """Band-pass one audio file and save the result as ``<stem>_reduced.wav``.

    The band edges come from the module-level ``lowcut`` and ``highcut``; the
    filter is designed for the sample rate read from the file itself, so the
    cut-offs are correct even for files that are not at 16 kHz. (scipy rejects
    a band edge at or above half the sample rate, so such files raise.)

    Args:
        audio_path: Path of the file to filter.
        save_dir: Directory for the filtered file; created if missing.

    Returns:
        Path of the filtered file. If that file already exists it is returned
        immediately and nothing is recomputed.
    """
    # create new filename
    base_filename = os.path.basename(audio_path)
    new_filename = os.path.splitext(base_filename)[0] + '_reduced.wav'
    new_path = os.path.join(save_dir, new_filename)

    # Skip if file already processed
    if os.path.exists(new_path):
        return new_path

    if save_dir:
        os.makedirs(save_dir, exist_ok=True)

    # load your audio file; slicing elsewhere in this file treats axis 1 as time
    waveform, sample_rate = torchaudio.load(audio_path)

    # Convert tensor to numpy array for bandpass filter
    waveform_np = waveform.numpy()

    # Filter the whole array in one call. The filter runs along the last
    # (time) axis; applying it along axis 0 would filter across channels,
    # one call per sample, which is both wrong and extremely slow.
    waveform_np = butter_bandpass_filter(
        waveform_np, lowcut, highcut, sample_rate, order=6
    )

    # Rescale only if filtering pushed the signal outside [-1, 1]
    peak = np.max(np.abs(waveform_np))
    if peak > 1.0:
        waveform_np = waveform_np / peak

    # Convert back to tensor and ensure the type is float32
    waveform = torch.from_numpy(np.ascontiguousarray(waveform_np)).float()

    # save the filtered audio at its original sample rate
    torchaudio.save(new_path, waveform, sample_rate)

    return new_path

# Band-pass settings read by bandpass_filter
lowcut, highcut = 10.0, 3000.0
FRAME_RATE = 16000

# Folders holding the split positive and negative .wav files
positive_audio_path = 'SplitPositive'
negative_audio_path = 'SplitNegative'
# Folders that receive the band-limited copies
positive_reduced_audio_dir = 'ReducedPositive'
negative_reduced_audio_dir = 'ReducedNegative'


def _list_jobs(source_dir, target_dir):
    """Pair every entry of source_dir with the folder its output goes to."""
    return [(os.path.join(source_dir, entry), target_dir)
            for entry in os.listdir(source_dir)]


# One (audio_path, save_dir) tuple per directory entry, positives first;
# process_file itself skips entries that are not .wav/.WAV
positive_args = _list_jobs(positive_audio_path, positive_reduced_audio_dir)
negative_args = _list_jobs(negative_audio_path, negative_reduced_audio_dir)
args = positive_args + negative_args

# Two workers; adjust based on number of CPU cores
with Pool(processes=2) as pool:
    pool.map(process_file, args)


**Resample to correct frequency, apply labels, and create label.csv and json files**



**Before Running:**


1.   Specify the paths to the positive and negative samples
2.   Create a folder for the resampled audio and specify its path

**Prepare JSON files and labels.csv for datasets:**

This script creates one JSON file for each of the train, validation, and test sets, named train_data.json, val_data.json, and test_data.json, respectively.

Each entry in the JSON file will contain the path to the audio file, and the corresponding label.

The script will also resample the audio files and convert any stereo files to mono.

This could be run in the greater Rana_Draytonii folder, then these files and the ResampledAudio folder should be moved into the Rana7 folder.

**Delete all files in ResampledAudio**

In [None]:
import os
import shutil

# Directory to clear
resampled_audio_dir = 'ResampledAudio'

# Create the folder on first use so that listing it cannot fail and the
# resampling cell always has somewhere to write
os.makedirs(resampled_audio_dir, exist_ok=True)

# Delete every entry in the directory: files and symlinks are unlinked,
# sub-directories are removed recursively. A failure on one entry is
# reported and the loop carries on with the rest.
for filename in os.listdir(resampled_audio_dir):
    file_path = os.path.join(resampled_audio_dir, filename)
    try:
        if os.path.isfile(file_path) or os.path.islink(file_path):
            os.unlink(file_path)
        elif os.path.isdir(file_path):
            shutil.rmtree(file_path)
    except OSError as e:
        print(f'Failed to delete {file_path}. Reason: {e}')


In [None]:
import os
import torch
import torchaudio
import pandas as pd
from sklearn.model_selection import train_test_split
import json
import csv
!pip install pydub
from pydub import AudioSegment


# Changes sampling frequency of audio file to 16kHz required by the AST model
def resampler(audio_path, save_dir, target_sample_rate=16000):
    """Resample one audio file and save it as ``<stem>_resampled.wav``.

    Files from different folders that share a base name map to the same
    output path inside ``save_dir``; the later one overwrites the earlier.

    Args:
        audio_path: Path of the file to resample.
        save_dir: Directory for the resampled file; created if missing.
        target_sample_rate: Output sampling rate in Hz (default 16000).

    Returns:
        Path of the resampled file.
    """
    # load your audio file
    waveform, sample_rate = torchaudio.load(audio_path)

    # Named `resample` rather than `resampler` so the local does not shadow
    # this function's own name
    resample = torchaudio.transforms.Resample(
        orig_freq=sample_rate, new_freq=target_sample_rate
    )
    waveform_resampled = resample(waveform)

    # create new filename
    base_filename = os.path.basename(audio_path)
    new_filename = os.path.splitext(base_filename)[0] + '_resampled.wav'
    if save_dir:
        os.makedirs(save_dir, exist_ok=True)
    new_path = os.path.join(save_dir, new_filename)

    # save the resampled audio
    torchaudio.save(new_path, waveform_resampled, sample_rate=target_sample_rate)

    return new_path

# Function to create an index dictionary file per model specifications
def make_index_dict(label_csv):
    """Build a ``display_name -> mid`` lookup from a labels CSV.

    Args:
        label_csv: Path to a CSV file with at least the columns
            ``display_name`` and ``mid``.

    Returns:
        Dict mapping each row's ``display_name`` to its ``mid``. If a
        display name occurs more than once, the last row wins.

    Raises:
        KeyError: If a row lacks the ``display_name`` or ``mid`` column.
    """
    # newline='' lets the csv module do its own newline handling, as its
    # documentation requires; the encoding is pinned so reading does not
    # depend on the platform default
    with open(label_csv, 'r', newline='', encoding='utf-8') as f:
        return {row['display_name']: row['mid'] for row in csv.DictReader(f)}

import json

# Function to create .json file with filenames and labels per model specifications
def create_data_json(dataset, labels, filename, index_dict):
    """Write a ``{"data": [...]}`` JSON file pairing audio paths with labels.

    Args:
        dataset: Iterable of audio file paths.
        labels: Iterable of label names, parallel to ``dataset``.
        filename: Path of the JSON file to write.
        index_dict: Mapping from label name to the value stored under
            ``"labels"`` in each entry.

    Raises:
        KeyError: If a label is missing from ``index_dict``.
    """
    # Entries are built before the file is opened, so an unknown label
    # raises without touching the output file
    entries = [
        {"wav": wav_path, "labels": index_dict[label]}
        for wav_path, label in zip(dataset, labels)
    ]
    payload = {"data": entries}
    with open(filename, 'w') as f:
        json.dump(payload, f, indent=4)

# Function to convert stereo files to mono
def stereo_to_mono(directory_path):
    """Down-mix every stereo WAV file in a directory to mono, in place.

    Both ``.wav`` and ``.WAV`` files are examined, matching the extensions
    the rest of this notebook processes. Files that are not stereo are left
    untouched; stereo files are overwritten with their mono version.

    Args:
        directory_path: Directory whose WAV files should be checked.
    """
    # Loop through all files in the directory
    for filename in os.listdir(directory_path):
        # Check if the file is a .wav file (either case)
        if not filename.endswith(('.wav', '.WAV')):
            continue

        # Get the full path of the file
        file_path = os.path.join(directory_path, filename)

        # Load audio file
        audio = AudioSegment.from_wav(file_path)

        # If the audio file is stereo
        if audio.channels == 2:
            print(f"Converting stereo file: {filename}")

            # Convert to mono
            mono_audio = audio.set_channels(1)

            # Replace the original file with the mono version
            mono_audio.export(file_path, format='wav')


# Define the path where your positive and negative .wav files are stored
positive_audio_path = 'SplitPositive'
negative_audio_path = 'SplitNegative'
# Define a directory to save resampled audio files (16kHz)
resampled_audio_dir = 'ResampledAudio'

# Define the target length for your spectrograms (only used for mel spectrogram)
# NOTE(review): target_length and mel_bins are not read anywhere in this cell;
# presumably they are used by a later cell -- confirm before removing.
target_length = 1000
mel_bins = 128  # Number of bins in Mel spectrogram

# Define labels (numeric class ids used for the stratified split)
positive_label = 0
negative_label = 1

# Prepare dataset: three parallel lists, one entry per audio clip
dataset = []  # Paths of the resampled audio files
numeric_labels = []  # For train_test_split and torch.Tensor
string_labels = []  # For JSON file

# Convert to mono (stereo files in both folders are overwritten in place)
stereo_to_mono(positive_audio_path)
stereo_to_mono(negative_audio_path)

# Resample every positive clip first, then every negative clip. For each clip,
# record the resampled file's path together with its numeric and string label.
for source_dir, numeric_label, string_label in (
    (positive_audio_path, positive_label, 'Positive'),
    (negative_audio_path, negative_label, 'Negative'),
):
    for filename in os.listdir(source_dir):
        if not filename.endswith(('.wav', '.WAV')):
            continue
        # Resample and keep the new file path instead of a spectrogram
        filepath = resampler(os.path.join(source_dir, filename), resampled_audio_dir)
        dataset.append(filepath)
        numeric_labels.append(numeric_label)
        string_labels.append(string_label)

# Turn the list of numeric class ids into a tensor
numeric_labels = torch.Tensor(numeric_labels)

# train_test_split
# First hold out 15% of all clips as the test set, then take 15% of what is left
# as the validation set. Paths, numeric labels and string labels are split
# together so they stay aligned; both splits are stratified on the numeric label
# and use a fixed random_state so they are reproducible.
dataset_trainval, dataset_test, labels_trainval, labels_test, string_labels_trainval, string_labels_test = train_test_split(dataset, numeric_labels, string_labels, test_size=0.15, random_state=42, stratify=numeric_labels)
dataset_train, dataset_val, labels_train, labels_val, string_labels_train, string_labels_val = train_test_split(dataset_trainval, labels_trainval, string_labels_trainval, test_size=0.15, random_state=42, stratify=labels_trainval)

# Create labels.csv
# One row per class; 'display_name' must match the strings stored in string_labels
labels = {
    'index': [0, 1],  # Modify the index values as per your labels
    'mid': ['/m/positive', '/m/negative'],  # Modify the MID values as per your labels
    'display_name': ['Positive', 'Negative']  # Modify the display names as per your labels
}

df = pd.DataFrame(labels)
df.to_csv('labels.csv', index=False)

# Read the file back to build the display_name -> mid lookup used for the JSON files
index_dict = make_index_dict('labels.csv')

# Check if the keys you'll use exist in the dictionary (testing)
expected_keys = ['Positive', 'Negative']
for key in expected_keys:
    if key not in index_dict:
        print(f"Key '{key}' not found in index_dict")
    else:
        print(f"Key '{key}' found in index_dict. Corresponding value is {index_dict[key]}")
print("index_dict:", index_dict)


# Create json files (one per split, written to the current working directory)
create_data_json(dataset_train, string_labels_train, 'train_data.json', index_dict)
create_data_json(dataset_val, string_labels_val, 'val_data.json', index_dict)
create_data_json(dataset_test, string_labels_test, 'test_data.json', index_dict)




Key 'Positive' found in index_dict. Corresponding value is /m/positive
Key 'Negative' found in index_dict. Corresponding value is /m/negative
index_dict: {'Positive': '/m/positive', 'Negative': '/m/negative'}


In [None]:
# Move into the Rana7 folder (path is relative to the current working directory)
%cd Rana7/

/content/gdrive/MyDrive/Rana_Draytonii/Rana7
