In [24]:
from pathlib import Path
import glob
import os
import random

from sklearn.model_selection import train_test_split
from pydub.exceptions import CouldntDecodeError
from pydub import AudioSegment

In [25]:
# Root directory that is searched recursively for .mp3 files.
DATASET = Path("./fma/")

In [26]:
# Recursively collect every .mp3 under DATASET. The listing is sorted because the
# filesystem yields directory entries in no guaranteed order, and the seeded
# train/test split is only reproducible when it sees the files in a fixed order.
all_files = sorted(DATASET.glob("**/*.mp3"))

In [16]:
# Probe every file with pydub. Files that cannot be decoded are reported, deleted
# from disk, and dropped from `all_files` so that later cells never receive a
# path that no longer exists.
invalid_files = []
for file in all_files:
    try:
        AudioSegment.from_file(file)
    except CouldntDecodeError:
        invalid_files.append(file)
        print(f"{file}")
for file in invalid_files:
    os.remove(file)

# Keep the in-memory listing consistent with what is left on disk.
removed_files = set(invalid_files)
all_files = [file for file in all_files if file not in removed_files]

fma/099/099134.mp3


In [27]:
# Hold out 10% of the tracks; random_state pins the shuffle used for the split.
train_files, test_files = train_test_split(
    all_files,
    test_size=0.1,
    random_state=0,
)

In [28]:
# Sanity check: number of tracks on each side of the split.
len(train_files), len(test_files)

(7197, 800)

In [16]:
def split_and_save_file(file, save_to, segment_ms=10000):
    """Cut the opening and closing segments out of an audio file and save them as MP3s.

    Args:
        file: Path (``str`` or ``pathlib.Path``) of the audio file to read.
        save_to: Directory the two segments are written into. It is not created here.
        segment_ms: Length of each segment in milliseconds (default 10000 = 10 s).

    Returns:
        A ``(first_name, last_name)`` tuple holding the bare file names
        ``<stem>_s1.mp3`` and ``<stem>_s2.mp3`` of the exported segments.

    Raises:
        Whatever ``AudioSegment.from_file`` raises for unreadable input
        (e.g. ``CouldntDecodeError``) is propagated unchanged.

    Note:
        For clips shorter than ``2 * segment_ms`` the two segments overlap; for
        clips shorter than ``segment_ms`` both segments contain the whole clip.
    """
    audio = AudioSegment.from_file(file)
    total_duration = len(audio)  # length in milliseconds

    first_segment = audio[:segment_ms]
    # Clamp the start at 0 so a clip shorter than one segment yields the whole
    # clip instead of being sliced with a negative index.
    last_segment = audio[max(0, total_duration - segment_ms):]

    # Path.stem strips only the final extension, so "a.b.mp3" and "a.c.mp3" do
    # not collapse onto the same output name (splitting on the first dot did).
    base_filename = Path(file).stem
    first_name = f"{base_filename}_s1.mp3"
    last_name = f"{base_filename}_s2.mp3"

    # Export segments
    first_segment.export(os.path.join(save_to, first_name), format="mp3")
    last_segment.export(os.path.join(save_to, last_name), format="mp3")
    return first_name, last_name

In [17]:
def split_and_save_file_triplet(file, save_to, segment_ms=10000):
    """Cut an opening, a middle and a closing segment out of an audio file and save them as MP3s.

    Args:
        file: Path (``str`` or ``pathlib.Path``) of the audio file to read.
        save_to: Directory the three segments are written into. It is not created here.
        segment_ms: Length of each segment in milliseconds (default 10000 = 10 s).

    Returns:
        A ``(first_name, middle_name, last_name)`` tuple holding the bare file
        names ``<stem>_s1.mp3``, ``<stem>_s2.mp3`` and ``<stem>_s3.mp3``.

    Raises:
        Whatever ``AudioSegment.from_file`` raises for unreadable input
        (e.g. ``CouldntDecodeError``) is propagated unchanged.

    Note:
        The middle segment starts right after the opening one, is at most
        ``segment_ms`` long and never reaches into the closing segment. It is
        therefore shorter than ``segment_ms`` for clips under ``3 * segment_ms``
        and empty for clips no longer than ``2 * segment_ms``.
    """
    audio = AudioSegment.from_file(file)
    total_duration = len(audio)  # length in milliseconds

    first_segment = audio[:segment_ms]
    # Cap the middle at one segment (it used to span everything between the
    # opening and closing segments) and clamp it so the slice never inverts or
    # goes negative on short clips.
    middle_end = max(segment_ms, min(2 * segment_ms, total_duration - segment_ms))
    middle_segment = audio[segment_ms:middle_end]
    last_segment = audio[max(0, total_duration - segment_ms):]

    # Path.stem strips only the final extension, so "a.b.mp3" and "a.c.mp3" do
    # not collapse onto the same output name (splitting on the first dot did).
    base_filename = Path(file).stem
    first_name = f"{base_filename}_s1.mp3"
    middle_name = f"{base_filename}_s2.mp3"
    last_name = f"{base_filename}_s3.mp3"

    # Export segments
    first_segment.export(os.path.join(save_to, first_name), format="mp3")
    middle_segment.export(os.path.join(save_to, middle_name), format="mp3")
    last_segment.export(os.path.join(save_to, last_name), format="mp3")
    return first_name, middle_name, last_name

In [7]:
# Output directories used by the two-segment (pair) export cells.
train_path = Path("./train_pairs")
val_path = Path("./val_pairs")

# mkdir(exist_ok=True) is a no-op when the directory is already present, so the
# cell can be re-run without a separate exists() check.
for directory in (train_path, val_path):
    directory.mkdir(parents=True, exist_ok=True)

In [8]:
# Export the two segments of every training track into train_path and keep the
# returned file-name pair for the index file.
pair_data = []
for idx, track in enumerate(train_files):
    first_name, last_name = split_and_save_file(track, train_path)
    pair_data += [(first_name, last_name)]
    # Progress line every 200 tracks.
    if not idx % 200:
        print(f"[{idx}/{len(train_files)}]")


[0/7197]
[200/7197]
[400/7197]
[600/7197]
[800/7197]
[1000/7197]
[1200/7197]
[1400/7197]
[1600/7197]
[1800/7197]
[2000/7197]
[2200/7197]
[2400/7197]
[2600/7197]
[2800/7197]
[3000/7197]
[3200/7197]
[3400/7197]
[3600/7197]
[3800/7197]
[4000/7197]
[4200/7197]
[4400/7197]
[4600/7197]
[4800/7197]
[5000/7197]
[5200/7197]
[5400/7197]
[5600/7197]
[5800/7197]
[6000/7197]
[6200/7197]
[6400/7197]
[6600/7197]
[6800/7197]
[7000/7197]


In [12]:
# Index file: one pair per line, the two segment file names separated by a tab.
train_info_lines = ["\t".join(pair) for pair in pair_data]
with open(train_path / "info.txt", "w") as info_file:
    info_file.write("\n".join(train_info_lines))

In [10]:
# Export the two segments of every held-out track into val_path and keep the
# returned file-name pair for the index file.
t_pair_data = []
for idx, track in enumerate(test_files):
    first_name, last_name = split_and_save_file(track, val_path)
    t_pair_data += [(first_name, last_name)]
    # Progress line every 200 tracks.
    if not idx % 200:
        print(f"[{idx}/{len(test_files)}]")

[0/800]
[200/800]
[400/800]
[600/800]


In [11]:
# Index file: one pair per line, the two segment file names separated by a tab.
val_info_lines = ["\t".join(pair) for pair in t_pair_data]
with open(val_path / "info.txt", "w") as info_file:
    info_file.write("\n".join(val_info_lines))

In [10]:
# Output directories used by the three-segment (triplet) export cells.
train_path_3 = Path("./train_pairs_trip")
val_path_3 = Path("./val_pairs_trip")

# mkdir(exist_ok=True) is a no-op when the directory is already present, so the
# cell can be re-run without a separate exists() check.
for directory in (train_path_3, val_path_3):
    directory.mkdir(parents=True, exist_ok=True)

In [21]:
# Export three segments per training track and record every cyclic pairing of
# them: (1, 2), (2, 3) and (3, 1).
pair_data_3 = []
for idx, track in enumerate(train_files):
    seg_a, seg_b, seg_c = split_and_save_file_triplet(track, train_path_3)
    pair_data_3 += [(seg_a, seg_b), (seg_b, seg_c), (seg_c, seg_a)]
    # Progress line every 200 tracks.
    if not idx % 200:
        print(f"[{idx}/{len(train_files)}]")

[0/7197]


KeyboardInterrupt: 

In [22]:
# Shuffle in place with a dedicated, seeded generator so that a fresh top-to-bottom
# run always writes the pairs in the same order; the global `random` state is
# left untouched.
random.Random(0).shuffle(pair_data_3)
# Index file: one pair per line, the two segment file names separated by a tab.
with open(train_path_3 / "info.txt", "w") as f:
    f.write("\n".join("\t".join(segments) for segments in pair_data_3))

In [30]:
# Export three segments per held-out track and record every cyclic pairing of
# them: (1, 2), (2, 3) and (3, 1).
pair_data_t_3 = []
for idx, track in enumerate(test_files):
    seg_a, seg_b, seg_c = split_and_save_file_triplet(track, val_path_3)
    pair_data_t_3 += [(seg_a, seg_b), (seg_b, seg_c), (seg_c, seg_a)]
    # Progress line every 100 tracks.
    if not idx % 100:
        print(f"[{idx}/{len(test_files)}]")

[0/800]
[100/800]
[200/800]
[300/800]
[400/800]
[500/800]
[600/800]
[700/800]


In [31]:
# Shuffle in place with a dedicated, seeded generator so that a fresh top-to-bottom
# run always writes the pairs in the same order; the global `random` state is
# left untouched.
random.Random(0).shuffle(pair_data_t_3)
# Index file: one pair per line, the two segment file names separated by a tab.
with open(val_path_3 / "info.txt", "w") as f:
    f.write("\n".join("\t".join(segments) for segments in pair_data_t_3))