In [None]:
import os
import pickle
import random
import shutil
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path

In [None]:
import librosa
import librosa.display
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from PIL import Image
from librosa import feature
from sklearn.model_selection import train_test_split
from tensorflow.keras.utils import to_categorical
from sklearn.preprocessing import LabelEncoder

In [None]:
def split_data_and_create_folders(base_path, output_base_path):
    """Copy per-genre PNG files into train/test/validation folder trees.

    Every sub-directory of ``base_path`` is treated as a genre and every
    ``.png`` file inside it as one sample. The samples are split 80/20 into
    (train + validation) / test, and the first part is split 80/20 again into
    train / validation. Each file is then copied to
    ``output_base_path/<train|test|validation>/<genre>/``.

    Args:
        base_path: Directory containing one sub-directory per genre.
        output_base_path: Root directory that receives the three split
            folders; missing directories are created.

    Notes:
        The split is not stratified, so genre proportions may differ between
        the three sets.
    """
    samples = []

    # Directory listings are sorted so that the fixed random_state below
    # actually yields the same split on every machine/filesystem.
    for genre in sorted(os.listdir(base_path)):
        genre_path = os.path.join(base_path, genre)
        if not os.path.isdir(genre_path):
            # Stray files (e.g. .DS_Store) are not genres.
            continue
        for filename in sorted(os.listdir(genre_path)):
            if filename.endswith('.png'):
                samples.append((os.path.join(genre_path, filename), genre))

    train, test = train_test_split(samples, test_size=0.2, random_state=42)
    train, validation = train_test_split(train, test_size=0.2, random_state=42)

    def copy_files(file_list, set_name):
        """Copy every (path, genre) entry into ``<set_name>/<genre>/``."""
        for file_path, genre in file_list:
            dest_folder = os.path.join(output_base_path, set_name, genre)
            os.makedirs(dest_folder, exist_ok=True)
            shutil.copy(file_path, dest_folder)

    copy_files(train, 'train')
    copy_files(test, 'test')
    copy_files(validation, 'validation')

In [None]:
def build_classification_dataset(data_path, split_phase):
    """Build the phase-2 classification arrays from the ``test`` image folder.

    Greyscale versions of every ``.png`` under ``data_path/test/<genre>/`` are
    scaled to [0, 1] and stacked into ``X``; the labels are one-hot encoded
    using the alphabetical position of each genre folder. Only the tail of the
    data, starting at ``int(len(X) * split_phase)``, is written to
    ``data_path/phase_2/X_test.npy`` and ``data_path/phase_2/y_test.npy``.

    Args:
        data_path: Dataset root containing the ``test`` folder.
        split_phase: Fraction (0..1) of the samples to skip from the front.

    Notes:
        NOTE(review): samples are accumulated genre by genre, so the saved
        tail holds the alphabetically last genres rather than a random
        subset - confirm this is the intended phase split.
    """
    for data_type in ['test']:
        split_dir = os.path.join(data_path, data_type)
        # Only directories are genres; stray files would otherwise inflate
        # the class count and crash the listing below.
        genres = sorted(
            entry for entry in os.listdir(split_dir)
            if os.path.isdir(os.path.join(split_dir, entry))
        )
        genre_to_index = {genre: i for i, genre in enumerate(genres)}

        # Fresh accumulators per split: they are rebound to arrays further
        # down, so sharing them across iterations would break on a 2nd split.
        X = []
        y = []
        for genre in genres:
            print(f"Processing genre: {genre}")
            genre_path = os.path.join(split_dir, genre)
            # Sorted so the slice taken below is the same on every machine.
            for image_file in sorted(os.listdir(genre_path)):
                if not image_file.endswith('.png'):
                    continue
                with Image.open(os.path.join(genre_path, image_file)) as image:
                    X.append(np.array(image.convert('L')) / 255.0)
                y.append(genre_to_index[genre])

        X = np.array(X)
        y = to_categorical(np.array(y), num_classes=len(genres))
        start = int(len(X) * split_phase)
        X_phase_2 = X[start:]
        y_phase_2 = y[start:]

        phase_data = os.path.join(data_path, 'phase_2')
        os.makedirs(phase_data, exist_ok=True)
        print("Saving the datasets...")
        np.save(f"{phase_data}/X_{data_type}.npy", X_phase_2)
        np.save(f"{phase_data}/y_{data_type}.npy", y_phase_2)

In [None]:
def npy_to_spectrogram_png(target_path):
    """Render every saved ``.npy`` spectrogram under ``target_path`` as a PNG.

    For each ``<genre>/<name>.npy`` the array is drawn with
    ``librosa.display.specshow`` on a 3x3 inch figure without axes and saved
    next to it as ``<name>.png``; the ``.npy`` file is then deleted. If the
    PNG already exists the ``.npy`` file is skipped and left in place.

    Args:
        target_path: Directory containing one sub-directory per genre.
    """
    suffix = '.npy'
    for genre in os.listdir(target_path):
        genre_path = os.path.join(target_path, genre)
        if not os.path.isdir(genre_path):
            # Ignore stray files sitting next to the genre folders.
            continue
        for filename in os.listdir(genre_path):
            if not filename.endswith(suffix):
                continue
            source_file = os.path.join(genre_path, filename)
            # Swap only the trailing extension; str.replace would also
            # rewrite a '.npy' occurring in the middle of the name.
            target_file = os.path.join(genre_path, filename[:-len(suffix)] + '.png')
            if os.path.exists(target_file):
                continue
            S_DB = np.load(source_file)
            plt.figure(figsize=(3, 3))
            try:
                librosa.display.specshow(S_DB, x_axis='time', y_axis='mel')
                plt.axis('off')
                plt.savefig(target_file, bbox_inches='tight', pad_inches=0)
            finally:
                # Always release the figure, even when rendering fails, so a
                # long batch does not accumulate open figures.
                plt.close()
            os.remove(source_file)

In [None]:
def process_file(file_path, target_genre_path, genre, sr=None, data_length=30):
    """Convert one ``.wav`` file into dB-scaled mel-spectrogram ``.npy`` files.

    With ``data_length == 30`` a single spectrogram of the whole clip is saved
    as ``<basename without .wav>.npy``. With ``data_length == 3`` the signal is
    cut into consecutive, non-overlapping 3-second windows and one spectrogram
    per window is saved as ``<first two dot-separated name parts>.<i>.npy``.

    Args:
        file_path: Path of the audio file; anything not ending in ``.wav`` is
            ignored.
        target_genre_path: Existing directory that receives the ``.npy`` files.
        genre: Genre name; accepted for call-site symmetry but not used here.
        sr: Sampling rate forwarded to ``librosa.load``.
        data_length: Clip length in seconds, either 3 or 30.

    Raises:
        AssertionError: If ``data_length`` is neither 3 nor 30.
    """
    assert data_length in [3, 30], "Data length must be 3 or 30 seconds"
    if not file_path.endswith('.wav'):
        return
    try:
        x, sr = librosa.load(file_path, sr=sr)
    except Exception as e:
        # Best effort: one unreadable file must not abort the whole batch.
        print(f"Error processing {file_path}: {e}")
        return
    sr = int(sr)
    filename = os.path.basename(file_path)

    if data_length == 3:
        frame_length = 3 * sr
        if len(x) < frame_length:
            # Framing a signal shorter than one window raises; skip instead.
            print(f"Skipping {file_path}: shorter than {data_length} seconds")
            return
        frames = librosa.util.frame(x=x, frame_length=frame_length, hop_length=frame_length)
        stem = '.'.join(filename.split('.')[:2])
        for i, frame in enumerate(frames.T):
            S = librosa.feature.melspectrogram(y=frame, sr=sr)
            S_DB = librosa.power_to_db(S, ref=np.max)
            # These are NumPy arrays, so give them a '.npy' name. A name
            # ending in '.png' made np.save write '<name>.png.npy', which the
            # later PNG conversion turned into '<name>.png.png'.
            np.save(os.path.join(target_genre_path, f"{stem}.{i}.npy"), S_DB)
    else:
        S = librosa.feature.melspectrogram(y=x, sr=sr)
        S_DB = librosa.power_to_db(S, ref=np.max)
        # np.save appends '.npy' to the extension-less name.
        np.save(os.path.join(target_genre_path, filename.replace('.wav', '')), S_DB)

In [None]:
def wav_to_spectrogram(source_path, target_path, data_length):
    """Fan out ``process_file`` over every file of every genre folder.

    Each entry of ``source_path`` is treated as a genre directory. A matching
    directory is created under ``target_path`` and every file inside the
    genre directory is submitted to a thread pool as
    ``process_file(file, target_dir, genre, 22050, data_length)``.

    Args:
        source_path: Directory containing one sub-directory per genre.
        target_path: Root directory for the per-genre output folders.
        data_length: Clip length in seconds, forwarded to ``process_file``.
    """
    genre_names = os.listdir(source_path)
    with ThreadPoolExecutor() as pool:
        pending = []
        for genre in genre_names:
            print(f"Processing genre: {genre}")
            input_dir = os.path.join(source_path, genre)
            output_dir = os.path.join(target_path, genre)
            Path(output_dir).mkdir(parents=True, exist_ok=True)
            pending.extend(
                pool.submit(process_file, os.path.join(input_dir, entry),
                            output_dir, genre, 22050, data_length)
                for entry in os.listdir(input_dir)
            )

        # Block until every task is done; result() re-raises worker errors.
        for task in pending:
            task.result()

In [None]:
def create_pairs_dataset(base_dir="Data/3_sec_datasets/B_W_datasets", dist_matrix_path='Data/3_sec_datasets/dist_matrix_3_sec.pkl',
                         split_phase=None, convert_to_grey=False):
    """Build same-genre image pairs labelled with their matrix entry.

    For each of the ``train``, ``test`` and ``validation`` folders under
    ``base_dir``, every ``.png`` image is paired with one randomly chosen
    image of the same genre folder (possibly itself). The two images are
    scaled to [0, 1] and stacked, and the label is looked up in the pickled
    matrix under the two file names with ``png`` replaced by ``wav``. Pairs
    that cannot be stacked or looked up are reported and skipped.

    Output (``X_<name>_pairs.npy`` / ``y_<name>_pairs.npy``):
        * ``split_phase`` falsy: all pairs, written to
          ``base_dir/pairs_dataset`` with ``<name>`` equal to the folder name.
        * ``split_phase`` set: only the first ``int(n * split_phase)`` pairs,
          written to ``base_dir/phase_1/pairs_dataset``; the validation files
          use ``val`` as ``<name>``.

    Args:
        base_dir: Dataset root holding the three split folders.
        dist_matrix_path: Pickled pandas object indexed by ``.wav`` file names
            on both axes.
        split_phase: Optional fraction of the pairs to keep.
        convert_to_grey: Load images as greyscale (``L``) instead of ``RGB``.
    """
    mode = 'L' if convert_to_grey else 'RGB'

    def load_image(path):
        # The context manager releases the file handle once pixels are copied.
        with Image.open(path) as image:
            return np.array(image.convert(mode)) / 255.0

    # NOTE: unpickling executes code embedded in the file - only load
    # matrices from a trusted source.
    dist_matrix = pd.read_pickle(dist_matrix_path)

    for dataset in ['train', 'test', 'validation']:
        print(f'Creating pairs for {dataset} dataset...')
        X_pairs = []
        y_pairs = []
        dataset_dir = os.path.join(base_dir, dataset)
        for genre in os.listdir(dataset_dir):
            genre_dir = os.path.join(dataset_dir, genre)
            if not os.path.isdir(genre_dir):
                continue
            images = [f for f in os.listdir(genre_dir) if f.endswith('.png')]
            for img_filename in images:
                img1 = load_image(os.path.join(genre_dir, img_filename))
                pair_img_filename = random.choice(images)
                img2 = load_image(os.path.join(genre_dir, pair_img_filename))
                try:
                    stacked = np.stack((img1, img2))
                    dist = dist_matrix.loc[img_filename.replace('png', 'wav'),
                                           pair_img_filename.replace('png', 'wav')]
                except Exception as exc:
                    # Still best effort, but no longer a bare `except:` that
                    # would also swallow KeyboardInterrupt, and the reason
                    # for the skip is now visible.
                    print(f'Error with {img_filename} and {pair_img_filename}: {exc}')
                    continue
                X_pairs.append(stacked)
                y_pairs.append(dist)

        X_pairs = np.array(X_pairs)
        y_pairs = np.array(y_pairs)
        if split_phase:
            pairs_dataset_dir = os.path.join(base_dir, 'phase_1', 'pairs_dataset')
            limit = int(len(X_pairs) * split_phase)
            X_out, y_out = X_pairs[:limit], y_pairs[:limit]
            label = 'val' if dataset == 'validation' else dataset
        else:
            pairs_dataset_dir = os.path.join(base_dir, 'pairs_dataset')
            X_out, y_out = X_pairs, y_pairs
            label = dataset
        os.makedirs(pairs_dataset_dir, exist_ok=True)
        np.save(os.path.join(pairs_dataset_dir, f'X_{label}_pairs.npy'), X_out)
        np.save(os.path.join(pairs_dataset_dir, f'y_{label}_pairs.npy'), y_out)
        print(f'Created {len(X_pairs)} pairs for {label} dataset.')

In [None]:
def create_triplets_dataset_threshold(base_dir='Data/3_sec_datasets/B_W_datasets',
                            dist_matrix_path='Data/3_sec_datasets/dist_matrix_3_sec.pkl',
                            threshold=0.985, split_phase=0.5):
    """Build (anchor, positive, negative) image triplets from a score matrix.

    The pickled matrix is re-labelled from ``wav`` to ``png`` names and its
    diagonal is blanked. For every image of a split that appears in the
    matrix, one positive is drawn at random from the images whose entry is
    greater than ``threshold`` and one negative from the remaining images of
    the same split. Images lacking either kind of candidate are skipped.
    All images are loaded as greyscale and scaled to [0, 1].

    The first ``int(n * split_phase)`` triplets of each split are written to
    ``base_dir/phase_1/triplets_dataset`` as ``anchors_<split>.npy``,
    ``positives_<split>.npy`` and ``negatives_<split>.npy``.

    Args:
        base_dir: Dataset root holding ``train``, ``test`` and ``validation``.
        dist_matrix_path: Pickled square pandas DataFrame labelled with
            ``.wav`` file names. NOTE(review): despite the name, larger
            values are treated as "more similar" - confirm it holds
            similarities rather than distances.
        threshold: Entries strictly above this value count as positives.
        split_phase: Fraction of the triplets to keep.
    """
    def load_grey(path):
        # The context manager releases the file handle once pixels are copied.
        with Image.open(path) as image:
            return np.array(image.convert('L')) / 255.0

    # NOTE: unpickling executes code embedded in the file - only load
    # matrices from a trusted source.
    raw_matrix = pd.read_pickle(dist_matrix_path)

    # Work on an explicit float copy: writing through `DataFrame.values`
    # relies on it being a writable view, which pandas does not guarantee.
    values = raw_matrix.to_numpy(dtype=float, copy=True)
    np.fill_diagonal(values, np.nan)
    dist_matrix = pd.DataFrame(
        values,
        index=[f.replace('wav', 'png') for f in raw_matrix.index],
        columns=[f.replace('wav', 'png') for f in raw_matrix.columns],
    )

    # NaN compares False, so the blanked diagonal never counts as similar.
    similarity_mask = dist_matrix > threshold

    triplets_dataset_dir = os.path.join(base_dir, 'phase_1', 'triplets_dataset')
    os.makedirs(triplets_dataset_dir, exist_ok=True)

    for dataset in ['train', 'test', 'validation']:
        print(f'Creating triplets for {dataset} dataset...')
        anchors = []
        positives = []
        negatives = []
        dataset_dir = os.path.join(base_dir, dataset)
        dataset_images = []
        for genre in os.listdir(dataset_dir):
            genre_dir = os.path.join(dataset_dir, genre)
            if not os.path.isdir(genre_dir):
                continue
            dataset_images.extend(f for f in os.listdir(genre_dir) if f in dist_matrix.index)

        # The mask is already boolean. Comparing it with `threshold` again
        # (as the previous version did) is only correct for thresholds in
        # [0, 1) and silently empties or inverts the mask otherwise.
        dataset_similarity_mask = similarity_mask.loc[dataset_images, dataset_images]

        for anchor in dataset_images:
            row = dataset_similarity_mask.loc[anchor]
            positive_candidates = [name for name, similar in row.items()
                                   if similar and name != anchor]
            negative_candidates = [name for name, similar in row.items()
                                   if not similar and name != anchor]
            if not (positive_candidates and negative_candidates):
                continue
            positive = random.choice(positive_candidates)
            negative = random.choice(negative_candidates)
            # The genre folder is the first dot-separated part of the name.
            anchor_path = os.path.join(dataset_dir, anchor.split('.')[0], anchor)
            positive_path = os.path.join(dataset_dir, positive.split('.')[0], positive)
            negative_path = os.path.join(dataset_dir, negative.split('.')[0], negative)
            anchors.append(load_grey(anchor_path))
            positives.append(load_grey(positive_path))
            negatives.append(load_grey(negative_path))

        limit = int(len(anchors) * split_phase)
        anchors = np.array(anchors)[:limit]
        positives = np.array(positives)[:limit]
        negatives = np.array(negatives)[:limit]
        np.save(os.path.join(triplets_dataset_dir, f'anchors_{dataset}.npy'), anchors)
        np.save(os.path.join(triplets_dataset_dir, f'positives_{dataset}.npy'), positives)
        np.save(os.path.join(triplets_dataset_dir, f'negatives_{dataset}.npy'), negatives)
        print(f'Created {len(anchors)} triplets for {dataset} dataset.')

In [None]:
def create_triplets_dataset(base_dir='Data/3_sec_datasets/B_W_datasets',
                            dist_matrix_path='Data/3_sec_datasets/dist_matrix_3_sec.pkl',
                            split_phase=None, convert_to_grey=False):
    """Build genre-based (anchor, positive, negative) image triplets.

    For each of ``train``, ``test`` and ``validation`` under ``base_dir``,
    every ``.png`` image becomes an anchor. Its positive is a random other
    image of the same genre folder and its negative a random image of a
    random different genre folder. Anchors without a possible positive or
    negative are skipped. Images are scaled to [0, 1].

    Arrays are written to ``base_dir/triplets_dataset`` as
    ``anchors_<name>.npy``, ``positives_<name>.npy`` and
    ``negatives_<name>.npy``, where ``<name>`` is ``train``, ``test`` or
    ``val``. When ``split_phase`` is set only the first
    ``int(n * split_phase)`` triplets are kept.

    Args:
        base_dir: Dataset root holding the three split folders.
        dist_matrix_path: Unused. Triplets are chosen by genre folder only;
            the parameter is kept so existing calls keep working, and the
            matrix is no longer unpickled for nothing.
        split_phase: Optional fraction of the triplets to keep.
        convert_to_grey: Load images as greyscale (``L``) instead of ``RGB``.
    """
    mode = 'L' if convert_to_grey else 'RGB'

    def load_image(path):
        # The context manager releases the file handle once pixels are copied.
        with Image.open(path) as image:
            return np.array(image.convert(mode)) / 255.0

    triplets_dataset_dir = os.path.join(base_dir, 'triplets_dataset')
    os.makedirs(triplets_dataset_dir, exist_ok=True)

    for dataset in ['train', 'test', 'validation']:
        print(f'Creating triplets for {dataset} dataset...')
        anchors = []
        positives = []
        negatives = []
        dataset_dir = os.path.join(base_dir, dataset)

        genre_to_images = {}
        for genre in os.listdir(dataset_dir):
            genre_dir = os.path.join(dataset_dir, genre)
            if os.path.isdir(genre_dir):
                genre_to_images[genre] = [f for f in os.listdir(genre_dir) if f.endswith('.png')]

        for genre, images in genre_to_images.items():
            # Negatives may only come from other genres that actually hold
            # images; random.choice raises IndexError on an empty sequence.
            negative_genres = [g for g, files in genre_to_images.items()
                               if g != genre and files]
            if not negative_genres:
                if images:
                    print(f'Skipping genre {genre}: no other genre with images for negatives.')
                continue
            for anchor in images:
                positive_candidates = [img for img in images if img != anchor]
                if not positive_candidates:
                    continue
                positive = random.choice(positive_candidates)
                negative_genre = random.choice(negative_genres)
                negative = random.choice(genre_to_images[negative_genre])
                anchors.append(load_image(os.path.join(dataset_dir, genre, anchor)))
                positives.append(load_image(os.path.join(dataset_dir, genre, positive)))
                negatives.append(load_image(os.path.join(dataset_dir, negative_genre, negative)))

        anchors = np.array(anchors)
        positives = np.array(positives)
        negatives = np.array(negatives)
        if split_phase:
            limit = int(len(anchors) * split_phase)
            anchors = anchors[:limit]
            positives = positives[:limit]
            negatives = negatives[:limit]

        label = 'val' if dataset == 'validation' else dataset
        np.save(os.path.join(triplets_dataset_dir, f'anchors_{label}.npy'), anchors)
        np.save(os.path.join(triplets_dataset_dir, f'positives_{label}.npy'), positives)
        np.save(os.path.join(triplets_dataset_dir, f'negatives_{label}.npy'), negatives)
        print(f'Created {len(anchors)} triplets for {label} dataset.')

In [None]:
def convert_images_to_np_dataset(folder_path):
    """Convert the split image folders into NumPy classifier datasets.

    For each of ``train``, ``test`` and ``validation`` under ``folder_path``,
    every ``.png`` in every genre folder is loaded as RGB and scaled to
    [0, 1]; the genre names are one-hot encoded with ``pd.get_dummies``.
    The results are written to ``folder_path/classifier_dataset`` as
    ``X_<phase>.npy`` and ``y_<phase>.npy``.

    Args:
        folder_path: Dataset root holding the three split folders.

    Notes:
        The one-hot encoding is computed per phase, so its width equals the
        number of genres present in that phase. NOTE(review): a phase missing
        a genre would get narrower labels than the others - confirm every
        phase contains all genres.
    """
    output_dir = os.path.join(folder_path, 'classifier_dataset')
    # np.save does not create directories; make sure the target exists.
    os.makedirs(output_dir, exist_ok=True)

    for phase in ['train', 'test', 'validation']:
        print(f'Loading {phase} images...')
        images = []
        labels = []
        phase_dir = os.path.join(folder_path, phase)
        for genre in os.listdir(phase_dir):
            genre_path = os.path.join(phase_dir, genre)
            if not os.path.isdir(genre_path):
                continue
            print(f'Loading {genre} images...')
            for image_file in os.listdir(genre_path):
                if not image_file.endswith('.png'):
                    continue
                # The context manager releases the file handle right away.
                with Image.open(os.path.join(genre_path, image_file)) as image:
                    images.append(np.array(image.convert('RGB')))
                labels.append(genre)

        images = np.array(images) / 255.0
        labels = pd.get_dummies(labels).values
        np.save(os.path.join(output_dir, f'X_{phase}.npy'), images)
        np.save(os.path.join(output_dir, f'y_{phase}.npy'), labels)

In [None]:
def split_xgb_dataset(datasets_path, datasets):
    """Split feature CSVs into train/eval/test pickles for XGBoost.

    For every name in ``datasets`` the file
    ``<datasets_path>/<name>_datasets/features_<name>.csv`` is read (first
    column as index), its ``label`` column is integer-encoded, and the rows
    are split with stratification into 60% train, 20% eval and 20% test.
    Features and labels of each part are pickled separately into
    ``<datasets_path>/<name>_datasets/xgboost_datasets`` as
    ``X_<part>.pkl`` / ``y_<part>.pkl``.

    Args:
        datasets_path: Root directory holding the ``<name>_datasets`` folders.
        datasets: Iterable of dataset names, e.g. ``['30_sec', '3_sec']``.
    """
    label_encoder = LabelEncoder()
    for dataset in datasets:
        dataset_dir = f'{datasets_path}/{dataset}_datasets'
        df = pd.read_csv(f'{dataset_dir}/features_{dataset}.csv', index_col=0)

        # The encoder is re-fitted for every dataset.
        df['label'] = label_encoder.fit_transform(df['label'])

        # 80/20 first, then 75/25 of the remainder -> 60/20/20 overall.
        train, test = train_test_split(df, test_size=0.2, random_state=42, stratify=df['label'])
        train, validation = train_test_split(train, test_size=0.25, random_state=42,
                                             stratify=train['label'])

        pickle_path = f'{dataset_dir}/xgboost_datasets'
        # open(..., 'wb') does not create directories.
        os.makedirs(pickle_path, exist_ok=True)

        for part_name, part in (('train', train), ('eval', validation), ('test', test)):
            payloads = {
                f'X_{part_name}': part.drop('label', axis=1),
                f'y_{part_name}': part['label'],
            }
            for stem, payload in payloads.items():
                # `with` guarantees each file is flushed and closed.
                with open(f'{pickle_path}/{stem}.pkl', 'wb') as handle:
                    pickle.dump(payload, handle)

In [None]:
if __name__ == "__main__":
    # Earlier pipeline stages, kept for reference. They are disabled; note
    # that `split_phase` and `process_images_and_split` are not defined in
    # this part of the notebook, so these lines cannot be re-enabled as-is.
    # build_classification_dataset('Data/3_sec_datasets/B_W_datasets', split_phase)
    # split_data_and_create_folders('Data/3_sec_datasets/3_sec_spectograms_full', 'Data/3_sec_datasets/RGB_datasets')
    # process_images_and_split('Data/3_sec_datasets/3_sec_spectograms_full')
    # wav_to_spectrogram('Data/genres_original', 'Data/30_sec_datasets/30_sec_spectograms_full', 30)
    # npy_to_spectrogram_png('Data/30_sec_datasets/30_sec_spectograms_full')
    # for folder in ['train', 'test', 'validation'][::-1]:
    #     convert_images_to_np_dataset(f'Data/3_sec_datasets/{folder}')
    datasets = ['30_sec', '3_sec']
    datasets_path = 'Data'
    split_xgb_dataset(datasets_path, datasets)

    # The cell used to stop here by evaluating 1/0, which always ended the run
    # with a ZeroDivisionError traceback. An explicit switch keeps the same
    # effective behaviour (only the XGBoost split runs) without crashing.
    run_image_conversion = False
    if run_image_conversion:
        for data_type in [30, 3]:
            convert_images_to_np_dataset(f'Data/{data_type}_sec_datasets/RGB_datasets')
            # create_pairs_dataset(base_dir=f'Data/{data_type}_sec_datasets/RGB_datasets', dist_matrix_path=f'Data/{data_type}_sec_datasets/dist_matrix_{data_type}_sec.pkl', split_phase=split_phase)
            # create_triplets_dataset(base_dir=f'Data/{data_type}_sec_datasets/RGB_datasets')