In [None]:
#Convert audio to mono
import os
import torchaudio
import torch
from tqdm import tqdm
from pathlib import Path

def convert_to_mono(filepath, output_path):
    """Write a single-channel version of the audio at ``filepath`` to ``output_path``.

    Multi-channel audio is down-mixed by averaging over the first tensor
    dimension of what ``torchaudio.load`` returns. Audio that already has one
    channel is written unchanged, so the output folder ends up with one file
    per input file.

    Args:
        filepath: Path of the audio file to read.
        output_path: Path the mono audio is written to.
    """
    audio, sample_rate = torchaudio.load(filepath)
    if audio.shape[0] > 1:
        # keepdim=True keeps the leading dimension, so the saved tensor has
        # the same rank as the loaded one.
        mono = torch.mean(audio, dim=0, keepdim=True)
    else:
        # Previously such files were only reported and never written, which
        # left them missing from the mono folder used by later steps.
        print(f"The audio at {filepath} is not stereo.")
        mono = audio
    torchaudio.save(output_path, mono, sample_rate)

def convert_folder_to_mono(input_dir, output_dir):
    """Run ``convert_to_mono`` on every ``.wav`` file directly inside ``input_dir``.

    Output files keep their original file names and are placed in
    ``output_dir``, which is created when it does not exist yet.

    Args:
        input_dir: Folder scanned (non-recursively) for names ending in ``.wav``.
        output_dir: Folder that receives the converted files.
    """
    # The destination folder may not exist on a fresh checkout.
    os.makedirs(output_dir, exist_ok=True)

    # Sorted so the processing order (and progress bar) is the same on every run.
    wav_files = sorted(f for f in os.listdir(input_dir) if f.endswith('.wav'))
    for wav_file in tqdm(wav_files, desc="Converting files"):
        input_path = os.path.join(input_dir, wav_file)
        output_path = os.path.join(output_dir, wav_file)
        convert_to_mono(input_path, output_path)


# Source and destination folders both sit in Audio_data, under the third
# ancestor (parents[2]) of the current working directory.
input_dir, output_dir = (
    str(Path.cwd().parents[2] / 'Audio_data' / folder_name)
    for folder_name in ('finetune_Audio', 'finetune_Audio_mono')
)
convert_folder_to_mono(input_dir, output_dir)

In [38]:
## Generate the label-index CSV (index, mid, display_name) from the species list
import csv
from pathlib import Path

# -------------------------------------------------------------------------------------- #

input_csv_path = str(Path.cwd().parents[2].joinpath('Audio_data', 'setting', 'species.csv'))
output_csv_path = str(Path.cwd().parents[2].joinpath('Audio_data', 'finetune_labels_indices.csv'))
add_dummy_label = False
add_nota = True

# -------------------------------------------------------------------------------------- #

# Keep only the species rows flagged as targets. newline="" is what the csv
# module expects for file objects it reads from or writes to.
with open(input_csv_path, "r", newline="") as input_file:
    reader = csv.DictReader(input_file)
    rows = [row for row in reader if row["target"] == "TRUE"]

with open(output_csv_path, "w", newline="") as output_file:
    fieldnames = ["index", "mid", "display_name"]
    writer = csv.DictWriter(output_file, fieldnames=fieldnames)
    writer.writeheader()

    # next_index is always the index of the next row to be written, so the
    # optional trailing "NoneOfTheAbove" row gets a valid index even when no
    # species row was selected.
    next_index = 0
    if add_dummy_label:
        writer.writerow({"index": 0, "mid": "/m/dummy", "display_name": "dummy"})
        next_index = 1

    for row in rows:
        writer.writerow({
            "index": next_index,
            "mid": f"/m/bs{str(next_index).zfill(2)}",
            "display_name": row["code"]
        })
        next_index += 1

    if add_nota:
        writer.writerow({
            "index": next_index,
            "mid": f"/m/bs{str(next_index).zfill(2)}",
            "display_name": "NoneOfTheAbove"
        })

In [None]:
import os
import numpy as np
import torchaudio
from tqdm import tqdm
import json
import pandas as pd

def generate_segments(audio_path, audio_duration, segment_duration=1, gap=0.25):
    """Build the list of sliding windows covering one recording.

    Window ``k`` starts at ``k * gap`` and ends ``segment_duration`` later.
    The number of windows is ``ceil((audio_duration - segment_duration) / gap)``;
    a non-positive count yields an empty list.

    Args:
        audio_path: Value stored under the ``"wav"`` key of every window.
        audio_duration: Length of the recording, in the same unit as
            ``segment_duration`` and ``gap``.
        segment_duration: Length of each window.
        gap: Offset between the starts of consecutive windows.

    Returns:
        A list of dicts with keys ``"wav"``, ``"start_time"``, ``"end_time"``.
    """
    # NOTE(review): a recording exactly segment_duration long gives ceil(0) == 0
    # windows — confirm that dropping such clips is intended.
    window_count = int(np.ceil((audio_duration - segment_duration) / gap))
    return [
        {
            "wav": audio_path,
            "start_time": step * gap,
            "end_time": step * gap + segment_duration,
        }
        for step in range(window_count)
    ]

def buil_json_data(audio_directories=None,
                   label_directories=None,
                   out_put_path=None,
                   segment_duration=1,
                   gap=0.25,
                   compare_gap=0.4,
                   min_time_threshold=0.1,
                   finetune_labels_csv=None,
                   enable_deletion=True,
                   trans_label_neme=True,
                   AS_switch=True):
    """Cut labelled recordings into sliding windows and write them to one JSON file.

    Every ``.wav`` found in ``audio_directories`` that has a same-named
    ``.txt`` file in ``label_directories`` is split with ``generate_segments``.
    A window is kept when at least one label interval overlaps it by
    ``compare_gap`` or more; with ``enable_deletion`` it is dropped if any
    interval overlaps it by at least ``min_time_threshold`` but less than
    ``compare_gap``. The result is written to ``out_put_path`` as
    ``{"data": [...]}``, each item holding ``wav``, ``start_time``,
    ``end_time`` and a comma-joined ``labels`` string.

    With ``trans_label_neme`` the raw labels are translated before writing:
    only labels of the form ``<name>-<sound_type>`` are considered, where
    ``<name>`` is a ``display_name`` in ``finetune_labels_csv`` and
    ``<sound_type>`` starts with ``S`` (or with ``AS`` when ``AS_switch`` is
    set). Each such label is replaced by the matching ``mid``; duplicates are
    removed, the rest sorted, and windows left without any label are dropped.

    Args:
        audio_directories: Iterable of sequences; element ``[0]`` of each is a
            folder scanned for ``.wav`` files.
        label_directories: Iterable of folders searched for ``<name>.txt``.
        out_put_path: Destination JSON path.
        segment_duration: Window length passed to ``generate_segments``.
        gap: Window hop passed to ``generate_segments``.
        compare_gap: Minimum overlap for a label to be attached to a window.
        min_time_threshold: Lower bound of the "ambiguous" overlap range.
        finetune_labels_csv: CSV with ``mid`` and ``display_name`` columns;
            only read when ``trans_label_neme`` is true.
        enable_deletion: Drop windows that contain an ambiguous overlap.
        trans_label_neme: Translate raw labels to ``mid`` values.
        AS_switch: Also accept sound types starting with ``AS``.
    """
    audio_folders = [directory_list[0] for directory_list in audio_directories]

    audio_files = []
    for audio_directory in audio_folders:
        for filename in os.listdir(audio_directory):
            if filename.endswith(".wav"):
                audio_files.append(filename[:-4])
    # A base name present in several folders is resolved to the first folder
    # below, so process it once instead of emitting its windows repeatedly.
    audio_files = list(dict.fromkeys(audio_files))

    filtered_segments = []
    for base_filename in tqdm(audio_files):
        audio_path = _first_existing_file(audio_folders, base_filename + ".wav")
        txt_path = _first_existing_file(label_directories, base_filename + ".txt")

        if audio_path is None or txt_path is None:
            print(f"Skipped {base_filename} as corresponding file not found.")
            continue

        waveform, sample_rate = torchaudio.load(audio_path)
        # Second tensor dimension divided by the sample rate gives the duration.
        audio_duration = waveform.shape[1] / sample_rate

        segments = generate_segments(audio_path, audio_duration, segment_duration, gap)
        intervals_and_labels = _read_label_intervals(txt_path)

        for segment in segments:
            label_names = []
            delete_flag = False
            for interval_and_label in intervals_and_labels:
                if (segment["start_time"] < interval_and_label["end"] and segment["end_time"] > interval_and_label["start"]):
                    overlap_start = max(segment["start_time"], interval_and_label["start"])
                    overlap_end = min(segment["end_time"], interval_and_label["end"])
                    overlap_time = overlap_end - overlap_start
                    if overlap_time >= compare_gap:
                        label_names.append(interval_and_label["label"])
                    elif min_time_threshold <= overlap_time < compare_gap:
                        if enable_deletion:
                            # One ambiguous overlap is enough to discard the window.
                            delete_flag = True
                            break
            if label_names and not delete_flag:
                filtered_segments.append({
                    "wav": audio_path,
                    "start_time": segment["start_time"],
                    "end_time": segment["end_time"],
                    "labels": ",".join(label_names)
                })

    if trans_label_neme:
        df = pd.read_csv(finetune_labels_csv)
        name_to_mid = pd.Series(df.mid.values, index=df.display_name).to_dict()
        processed_data = []
        for entry in filtered_segments:
            valid_labels = []
            for label in entry['labels'].split(','):
                if '-' in label:
                    # NOTE(review): a label with more than one '-' raises
                    # ValueError here, as it did before.
                    species, sound_type = label.split('-')
                    if species in name_to_mid and (sound_type.startswith('S') or (AS_switch and sound_type.startswith('AS'))):
                        valid_labels.append(name_to_mid[species])
            if valid_labels:
                entry['labels'] = ','.join(sorted(set(valid_labels)))
                processed_data.append(entry)
        filtered_segments = processed_data

    # Written exactly once, after all files are processed: the file always
    # exists afterwards (even when every recording was skipped) and is not
    # rewritten in full for each recording.
    with open(out_put_path, 'w') as json_file:
        json.dump({"data": filtered_segments}, json_file, indent=2)


def _first_existing_file(directories, filename):
    """Return the path of ``filename`` in the first directory containing it, else None."""
    for directory in directories:
        candidate = os.path.join(directory, filename)
        if os.path.isfile(candidate):
            return candidate
    return None


def _read_label_intervals(txt_path):
    """Parse a label file whose lines are whitespace-separated ``start end label``."""
    intervals = []
    with open(txt_path, "r") as file:
        for line in file:
            if not line.strip():
                # Blank lines (e.g. a trailing empty line) carry no label.
                continue
            start, end, label = line.split()
            intervals.append({
                "start": float(start),
                "end": float(end),
                "label": label
            })
    return intervals

# Windowing and overlap settings shared by both dataset runs below.
segment_duration = 1
gap = 0.25
compare_gap = 0.4
min_time_threshold = 0.1
enable_deletion = True

# Label translation settings.
trans_label_neme = True
finetune_labels_csv = str(Path.cwd().parents[2].joinpath('Audio_data', 'finetune_labels_indices.csv'))
AS_switch = True

# (audio folder, label folder, output JSON name) per dataset:
# the finetune recordings first, then the open source ones.
for audio_folder, label_folder, json_name in (
    ('finetune_Audio_mono/', 'finetune_Label_txt', 'segment.json'),
    ('opensource_Audio_mono/', 'opensource_Label_txt', 'opensource_segment.json'),
):
    audio_directories = [[str(Path.cwd().parents[2].joinpath('Audio_data', audio_folder))]]
    label_directories = [str(Path.cwd().parents[2].joinpath('Audio_data', label_folder))]
    out_put_path = str(Path.cwd().parents[2].joinpath('temporary_file', json_name))
    buil_json_data(
        audio_directories=audio_directories,
        label_directories=label_directories,
        out_put_path=out_put_path,
        segment_duration=segment_duration,
        gap=gap,
        compare_gap=compare_gap,
        min_time_threshold=min_time_threshold,
        finetune_labels_csv=finetune_labels_csv,
        enable_deletion=enable_deletion,
        trans_label_neme=trans_label_neme,
        AS_switch=AS_switch,
    )


In [None]:
## Select about 10% of the files for the validation list and another 10% for the test list; all remaining files go to the training list
import os
import re
import random
import shutil
from collections import defaultdict
from pathlib import Path

wav_directory = str(Path.cwd().parents[2] / 'Audio_data' / 'finetune_Audio_mono')

# Output list files, one base name per line.
val_txt_path = str(Path.cwd().parents[2] / 'Audio_data' / 'val_list.txt')
test_txt_path = str(Path.cwd().parents[2] / 'Audio_data' / 'test_list.txt')
train_txt_path = str(Path.cwd().parents[2] / 'Audio_data' / 'train_list.txt')

# Fraction of files taken for the first (val) and second (test) list.
pick_probability_1 = 0.10
pick_probability_2 = 0.10

wav_files = os.listdir(wav_directory)
# <STATION>_<YYYYMMDD>_<HHMMSS>.wav — group 1 is the station, group 2 the date.
regex = re.compile(r"([A-Z0-9]+)_(\d{8})_\d{6}\.wav")

# Per-station, per-date bookkeeping.
counts = defaultdict(lambda: defaultdict(int))
filenames = defaultdict(lambda: defaultdict(list))
selected_counts_1 = defaultdict(lambda: defaultdict(int))
selected_counts_2 = defaultdict(lambda: defaultdict(int))
original_satisfying_dates = defaultdict(set)
skipped_files = []

copied_files_1 = []
copied_files_2 = []
remaining_files = []

total = 0
for file in wav_files:
    match = regex.match(file)
    if match is None:
        # Name does not follow the station/date/time pattern.
        skipped_files.append(file)
        continue

    station, date = match.groups()
    counts[station][date] += 1
    filenames[station][date].append(file)
    # Recorded at the moment a date reaches its tenth file.
    if counts[station][date] == 10:
        original_satisfying_dates[station].add(date)
    total += 1

def process_files(sampled_files, selected_counts, station, date, copied_files):
    """Register the chosen files for one station/date.

    For every file name in ``sampled_files`` the counter
    ``selected_counts[station][date]`` is incremented and the name without
    its extension is appended to ``copied_files``. Both containers are
    modified in place; nothing is returned.
    """
    for stem, _extension in map(os.path.splitext, sampled_files):
        selected_counts[station][date] += 1
        copied_files.append(stem)

# A dedicated, seeded generator makes the split reproducible without touching
# the global `random` state. Change SPLIT_SEED to draw a different split.
SPLIT_SEED = 0
split_rng = random.Random(SPLIT_SEED)

for station in sorted(counts.keys()):
    # Dates with at least 10 recordings: sample the given fractions per date.
    for date, files in sorted(filenames[station].items()):
        if len(files) >= 10:
            total_files = len(files)
            num_to_sample_1 = int(total_files * pick_probability_1)
            num_to_sample_2 = int(total_files * pick_probability_2)

            # One draw without replacement, then split, so the two lists are
            # disjoint. Sorting first removes the dependence on listing order.
            sampled_files = split_rng.sample(sorted(files), min(total_files, num_to_sample_1 + num_to_sample_2))
            sampled_files_1 = sampled_files[:num_to_sample_1]
            sampled_files_2 = sampled_files[num_to_sample_1:num_to_sample_1 + num_to_sample_2]

            if sampled_files_1:
                process_files(sampled_files_1, selected_counts_1, station, date, copied_files_1)

            if sampled_files_2:
                process_files(sampled_files_2, selected_counts_2, station, date, copied_files_2)

    # Dates with fewer than 10 recordings are pooled per station; one file is
    # taken from each sampled date.
    not_satisfying_files = [(date, files) for date, files in sorted(filenames[station].items()) if len(files) < 10]
    total_files = sum(len(files) for _, files in not_satisfying_files)
    num_to_sample_1 = int(total_files * pick_probability_1)
    num_to_sample_2 = int(total_files * pick_probability_2)

    if not_satisfying_files:
        # Draw the dates for both lists in a single sample and split it.
        # Two independent draws could pick the same date (and the same file)
        # for both lists. If there are fewer dates than requested, the first
        # list is filled first.
        dates_sampled = split_rng.sample(
            not_satisfying_files,
            min(len(not_satisfying_files), num_to_sample_1 + num_to_sample_2),
        )
        dates_sampled_1 = dates_sampled[:num_to_sample_1]
        dates_sampled_2 = dates_sampled[num_to_sample_1:num_to_sample_1 + num_to_sample_2]

        for date, files in dates_sampled_1:
            file_selected = split_rng.choice(sorted(files))
            process_files([file_selected], selected_counts_1, station, date, copied_files_1)

        for date, files in dates_sampled_2:
            file_selected = split_rng.choice(sorted(files))
            process_files([file_selected], selected_counts_2, station, date, copied_files_2)

# Everything in the directory that was not picked for the first or second list
# goes to the training list (this includes names that did not match the
# pattern).
all_files = [os.path.splitext(file)[0] for file in wav_files]
# Sorted because set iteration order is not stable between runs, which made
# the contents of train_list.txt come out in a different order each time.
remaining_files = sorted(set(all_files) - set(copied_files_1) - set(copied_files_2))

with open(val_txt_path, "w") as file:
    file.write("\n".join(copied_files_1))

with open(test_txt_path, "w") as file:
    file.write("\n".join(copied_files_2))

with open(train_txt_path, "w") as file:
    file.write("\n".join(remaining_files))


In [None]:
import json
from pathlib import Path

def process_files(json_filename, txt_filenames, output_paths):
    """Split one segment JSON into several JSON files according to list files.

    ``json_filename`` must contain ``{"data": [...]}`` where each item has a
    ``"wav"`` path. For each ``(txt_filename, output_path)`` pair, the items
    whose wav name appears in the list file are written to ``output_path`` as
    ``{"data": [...]}``. Names are compared after removing any directory part
    and everything from the first ``.`` onward.

    Args:
        json_filename: Path of the source JSON file.
        txt_filenames: List files, one name (or path) per line.
        output_paths: Destination JSON paths, paired by position with
            ``txt_filenames``.
    """
    def clip_id(path):
        # Accept both '/' and '\\' as separators: the wav paths are built with
        # os.path.join, which uses backslashes on Windows.
        return path.replace('\\', '/').split('/')[-1].split('.')[0]

    with open(json_filename, 'r') as json_file:
        json_data = json.load(json_file)['data']

    for txt_filename, output_path in zip(txt_filenames, output_paths):
        with open(txt_filename, 'r') as txt_file:
            # A set gives constant-time membership tests for the filter below.
            wanted_ids = {clip_id(line.strip()) for line in txt_file}
        matched_data = [entry for entry in json_data if clip_id(entry['wav']) in wanted_ids]

        with open(output_path, 'w') as output_file:
            json.dump({"data": matched_data}, output_file, indent=4)

# Segment JSON to be split.
json_filename = str(Path.cwd().parents[2] / 'temporary_file' / 'segment.json')

# One list file per split, paired by position with the JSON file that
# receives the segments of that split.
txt_filenames = [
    str(Path.cwd().parents[2] / 'Audio_data' / list_name)
    for list_name in ('train_list.txt', 'val_list.txt', 'test_list.txt')
]
output_paths = [
    str(Path.cwd().parents[2] / 'temporary_file' / split_json)
    for split_json in ('train.json', 'val.json', 'test.json')
]

process_files(json_filename, txt_filenames, output_paths)


In [None]:
import json
import random
from pathlib import Path

def merge_shuffle_write_json(file_path1, file_path2, output_file, *more_paths):
    """Merge the ``"data"`` lists of several JSON files, shuffle, and write them.

    Two call forms are accepted::

        merge_shuffle_write_json(in1, in2, out)
        merge_shuffle_write_json(in1, in2, in3, ..., out)

    In the second form the LAST positional argument is the destination and
    every argument before it is an input, so the value bound to
    ``output_file`` is then read as the third input. This matches how the
    function is called with four paths elsewhere in this notebook.

    Args:
        file_path1: First input JSON (``{"data": [...]}``).
        file_path2: Second input JSON.
        output_file: Destination in the three-argument form; third input when
            ``more_paths`` is given.
        *more_paths: Optional further inputs followed by the destination.
    """
    if more_paths:
        input_paths = [file_path1, file_path2, output_file, *more_paths[:-1]]
        destination = more_paths[-1]
    else:
        input_paths = [file_path1, file_path2]
        destination = output_file

    merged_data = []
    for input_path in input_paths:
        with open(input_path, 'r') as f:
            merged_data.extend(json.load(f)['data'])

    # All inputs are fully read before the destination is opened, so the
    # destination may safely be one of the inputs.
    random.shuffle(merged_data)

    with open(destination, 'w') as f:
        json.dump({'data': merged_data}, f, indent=2)


# Training set: segments from the train split, the NoneOfTheAbove train set
# and the open source recordings.
input_file1 = str(Path.cwd().parents[2].joinpath('temporary_file', 'train.json'))
input_file2 = str(Path.cwd().parents[2].joinpath('temporary_file', 'NoneOfTheAbove_train.json'))
input_file3 = str(Path.cwd().parents[2].joinpath('temporary_file', 'opensource_segment.json'))
output_file = str(Path.cwd().parents[2].joinpath('Audio_data', 'finetune_train.json'))
# The three sources are merged in two passes of the (input, input, output)
# call form. The second pass reads the intermediate result back and
# overwrites it with the final, reshuffled merge.
merge_shuffle_write_json(input_file1, input_file2, output_file)
merge_shuffle_write_json(output_file, input_file3, output_file)

# Validation set.
input_file1 = str(Path.cwd().parents[2].joinpath('temporary_file', 'val.json'))
input_file2 = str(Path.cwd().parents[2].joinpath('temporary_file', 'NoneOfTheAbove_val.json'))
output_file = str(Path.cwd().parents[2].joinpath('Audio_data', 'finetune_val.json'))
merge_shuffle_write_json(input_file1, input_file2, output_file)

# Test set.
input_file1 = str(Path.cwd().parents[2].joinpath('temporary_file', 'test.json'))
input_file2 = str(Path.cwd().parents[2].joinpath('temporary_file', 'NoneOfTheAbove_test.json'))
output_file = str(Path.cwd().parents[2].joinpath('Audio_data', 'finetune_test.json'))
merge_shuffle_write_json(input_file1, input_file2, output_file)