In [2]:
import os
import random
from   tqdm            import tqdm
import numpy           as np
import pandas          as pd
from   tqdm            import tqdm
from   IPython.display import Audio
import plotly.express as px
from sklearn.model_selection import train_test_split
from google.colab import drive
# Mount Google Drive at /content/drive (the data folder used below lives under this mount point)
drive.mount('/content/drive')

Mounted at /content/drive


<h1 style="background-color:#4CAF50;"> <center> Create Validation groups for the first and second challenges  </center> </h1>

In [58]:
# Reproducibility: seed both Python's `random` module and NumPy's global RNG with the same value
seed = 41
random.seed(seed)
np.random.seed(seed)

<h1 style="background-color:#4CAF50;"> <center> Load Train Dataset </center> </h1>

In [3]:
# Locate the dataset folders; each print reports whether the path exists.
data_folder = '/content/drive/My Drive/speakathon_data_subset'
print(os.path.exists(data_folder))
audio_files_path  = os.path.join(data_folder, "wav_files_subset")
print(os.path.exists(audio_files_path))
challenge_folder = os.path.join(data_folder, "challenge")
# exist_ok=True keeps the cell idempotent and avoids the check-then-create race
os.makedirs(challenge_folder, exist_ok=True)
# TODO: Place the file: groups_challenge.csv in challenge_folder

True
True


In [67]:
TRAIN_CSV = os.path.join(data_folder,"hackathon_train_subset.csv")
# During the hackathon, remember to uncomment the line below to read hackathon_train.csv instead
# TRAIN_CSV = os.path.join(data_folder,"hackathon_train.csv")
train_df = pd.read_csv(TRAIN_CSV)
print(f"Number of utterances: {train_df.shape[0]}")
train_df.head()

Number of utterances: 74


Unnamed: 0,language,file,speaker,noise_type
0,russian,2006849.wav,3191,comm
1,russian,3018610.wav,3191,clean
2,russian,7608580.wav,3191,background
3,russian,4426066.wav,3191,comm
4,russian,8924221.wav,3191,background


In [68]:
# @title
# Let's define some helper methods
def get_speaker_lang(df, speaker):
    """Return the 'language' value of the first row of `df` whose 'speaker' equals `speaker`."""
    languages_of_speaker = df.loc[df['speaker'] == speaker, 'language']
    return languages_of_speaker.iloc[0]

def get_utterance_lang(df, file):
    """Return the 'language' value of the first row of `df` whose 'file' equals `file`."""
    languages_of_file = df.loc[df['file'] == file, 'language']
    return languages_of_file.iloc[0]

def get_utterance_noise_type(df, file):
    """Return the 'noise_type' value of the first row of `df` whose 'file' equals `file`."""
    noise_types_of_file = df.loc[df['file'] == file, 'noise_type']
    return noise_types_of_file.iloc[0]

def get_spk_to_utt(df, audio_dir=None):
    """
    Create a dictionary mapping each speaker to the list of their utterance paths.

    Parameters:
    - df: DataFrame with a 'file' column (audio file names) and a 'speaker' column.
    - audio_dir: Folder onto which every file name is joined. When omitted, the
                 notebook-level `audio_files_path` is used.

    Returns:
    - A dict of speaker -> list of file paths, filled in the row order of `df`.

    A path that does not exist on disk is reported with a printed message, but it is
    still added to the mapping.
    """
    if audio_dir is None:
        # Fall back to the notebook-level folder so existing one-argument calls keep working
        audio_dir = audio_files_path

    spk_to_utts = dict()

    # Iterate over the two needed columns directly instead of building a Series per row
    for file_name, spk in zip(df['file'], df['speaker']):
        file_path = os.path.join(audio_dir, file_name)
        if not os.path.exists(file_path):
            print(f"Found invalid file: {file_path}")

        spk_to_utts.setdefault(spk, []).append(file_path)
    return spk_to_utts

<h1 style="background-color:#4CAF50;"> <center> Split to train and validation </center> </h1>

In [69]:
# TODO: You should split the train set into train/validation and run the rest of this notebook on the validation set only
# We split here by speaker (so no speaker appears in both sets), but there could be better ways to split...

# Get unique speakers
unique_speakers = train_df['speaker'].unique()

# Split the unique speakers into train and validation sets (test_size=0.2 is the validation share; seeded for reproducibility)
train_speakers, validation_speakers = train_test_split(unique_speakers, test_size=0.2, random_state=seed)

# Filter dataframe based on the split speakers
train_df_new = train_df[train_df['speaker'].isin(train_speakers)]
validation_df = train_df[train_df['speaker'].isin(validation_speakers)]

train_df.shape, train_df_new.shape, validation_df.shape

((74, 4), (59, 4), (15, 4))

In [70]:
# Map every validation speaker to the list of their utterance file paths
spk_to_utts = get_spk_to_utt(validation_df)

# The distinct speakers present in the validation set
speakers  = set(spk_to_utts.keys())

print(f"Amount of speakers in validation set: {len(speakers)}")

Amount of speakers in validation set: 1


<h1 style="background-color:#4CAF50;"> <center> Challenge Validation: Create Random Anchors and Groups </center> </h1>

Creating the dataset:
1. Choose the anchor speakers:<br>
    Choose them randomly, as long as each has at least 2 utterances.<br>

2. Create the groups. Each group creation gets an anchor speaker as a parameter:<br>
    2.1 Select a file as the anchor utterance<br>
    2.2 Get another utterance from the anchor speaker and add it to the group utterances<br>
    2.3 Randomly decide the number of utterances in the group<br>
    2.4 Select the other utterances in the group (from other speakers of the same language)<br>
    2.5 Shuffle the group utterances before returning them

In [71]:
# Number of groups to build: one per speaker in the validation set
NUM_GROUPS_TO_CREATE = len(speakers)

<span style="font-size: larger;"> Define some helper methods: </span>

In [72]:
# @title
def choose_anchor_utt(speaker, spk_to_utts):
    """
    Selects an anchor utterance and another random utterance for a given speaker.

    The speaker's utterance list inside `spk_to_utts` is left untouched, so the
    function can be called repeatedly (or the cell re-run) without draining it.

    Parameters:
    - speaker: The identifier for the speaker of interest.
    - spk_to_utts: A dictionary mapping each speaker to a list of their utterances (file paths).

    Returns:
    - A tuple containing:
        - speaker: The identifier of the speaker.
        - anchor_audio_basename: The base name of the anchor audio file (without folder path).
        - same_speaker_utt_basename: The base name of another utterance from the same speaker.

    Raises:
    - KeyError: if `speaker` is not a key of `spk_to_utts`.
    - IndexError: if the speaker has fewer than two utterances.
    """
    speaker_utts = spk_to_utts[speaker]
    if len(speaker_utts) < 2:
        # Same exception type that indexing the empty remainder would raise, with a clearer message
        raise IndexError(f"Speaker {speaker!r} needs at least two utterances, got {len(speaker_utts)}.")

    anchor_audio = random.choice(speaker_utts)

    # Work on a copy: removing/shuffling the caller's list would silently change spk_to_utts
    other_utts = list(speaker_utts)
    other_utts.remove(anchor_audio)

    random.shuffle(other_utts)
    same_speaker_utt = other_utts[0]

    return speaker, os.path.basename(anchor_audio), os.path.basename(same_speaker_utt)

In [73]:
# @title
def create_group(df, anchor_speaker, spk_to_utts):
    """
    Creates a group of audio files based on a given anchor speaker. The group is formed by selecting
    an anchor utterance from the anchor speaker and additional utterances from different speakers
    that share the same language as the anchor.

    Parameters:
    - df: DataFrame containing metadata about the audio files, including speaker, file, language, and noise type.
    - anchor_speaker: The identifier for the anchor speaker.
    - spk_to_utts: A dictionary mapping each speaker to their utterances (audio file paths).

    Returns:
    - A tuple containing information about the created group, including:
        - group_audio_files: A tuple of the base names of the audio files in the group.
        - group_audio_speaker: A tuple of speaker identifiers corresponding to each file in the group.
        - anchor_speaker: The identifier of the anchor speaker.
        - anchor_audio: The base name of the anchor audio file.
        - anchor_type: The noise type of the anchor audio file.
        - same_speaker_utt: The base name of another utterance from the anchor speaker.
        - group_audio_type: A list of noise types corresponding to each audio file in the group.

    group_audio_files, group_audio_speaker and group_audio_type are shuffled together, so
    position i of all three always describes the same file.
    """

    # Choose an anchor audio file for the anchor speaker, plus a second utterance of that speaker
    anchor_speaker, anchor_audio, same_speaker_utt = choose_anchor_utt(anchor_speaker, spk_to_utts)

    # Get the anchor's language and the noise type of the anchor utterance
    anchor_lang = get_speaker_lang(df, anchor_speaker)
    anchor_type = get_utterance_noise_type(df, anchor_audio)

    # Randomly decide the number of files in the group
    num_files_in_group = random.randint(5, 20)

    # Each member is a (file base name, speaker, noise type) triple; the first one is the
    # utterance that shares its speaker with the anchor
    members = [(same_speaker_utt, anchor_speaker, get_utterance_noise_type(df, same_speaker_utt))]

    num_files_in_group -= 1  # Decrement since we added one from the anchor speaker

    # Randomly select other speakers of the same language (at most one utterance per speaker)
    same_lang_other_speaker = (df.language == anchor_lang) & (df.speaker != anchor_speaker)
    curr_speakers = set(df[same_lang_other_speaker]['speaker'].values.tolist())

    selected_speakers = random.sample(list(curr_speakers), min(num_files_in_group, len(curr_speakers)))

    for speaker in selected_speakers:
        group_utt = os.path.basename(random.choice(spk_to_utts[speaker]))
        members.append((group_utt, speaker, get_utterance_noise_type(df, group_utt)))

    # Shuffle whole triples so files, speakers and noise types stay aligned
    random.shuffle(members)
    group_audio_files, group_audio_speaker, group_audio_type = zip(*members)
    return group_audio_files, group_audio_speaker, anchor_speaker, anchor_audio, anchor_type, same_speaker_utt, list(group_audio_type)

In [74]:
# @title
def create_dataset(df, spk_to_utts, num_groups=NUM_GROUPS_TO_CREATE):
    """
    Generates dataset of groups where each group contains an anchor audio file
    and additional audio files.

    Parameters:
    - df: DataFrame containing audio file metadata, including speaker IDs, files, languages, and noise types.
    - spk_to_utts: Dictionary mapping speaker IDs to their utterances (list of audio file paths).
    - num_groups: The desired number of groups to create in the dataset.

    Returns:
    - groups_df: A pandas DataFrame with one row per (group, group file) and these columns:
        - group_id:            The ID of the group
        - anchor_file:         The file which is the anchor
        - anchor_speaker:      The speaker id of the person speaking in the anchor_file
        - anchor_type:         The noise type of the anchor_file
        - group_file:          A file in the group
        - group_audio_speaker: The speaker that is speaking in the group_file
        - group_audio_type:    The noise type of this group file
        - group_label:         The label we want to predict, the name of the group_file in which
                               the speaker is the same as the anchor_file
      The frame is empty (but still has these columns) when no rows are produced.

    Raises:
    - ValueError: if fewer than `num_groups` speakers have at least two utterances.
    """
    column_order = ["group_id", "anchor_file", "anchor_speaker", "anchor_type",
                    "group_file", "group_audio_speaker", "group_audio_type", "group_label"]

    # Filter speakers to only those with at least two audio files.
    # The candidates come from the spk_to_utts argument, not from a notebook-level variable.
    speakers_with_at_least_two_utts = [speaker for speaker, utts in spk_to_utts.items() if len(utts) >= 2]

    # Ensure there are enough speakers to meet the num_groups requirement
    if len(speakers_with_at_least_two_utts) < num_groups:
        raise ValueError(f"Not enough speakers with at least two utterances to form {num_groups} groups.")

    # Directly sample from the filtered list of speakers
    anchor_speakers = random.sample(speakers_with_at_least_two_utts, num_groups)

    rows = []
    for group_index, anchor in enumerate(anchor_speakers):
        # create the group for this anchor
        group, group_audio_speaker, anchor_speaker_id, anchor_utt, anchor_type, label, group_audio_type = create_group(df, anchor, spk_to_utts)

        anchor_file = os.path.basename(anchor_utt)
        group_label = os.path.basename(label)

        # One output row per file of the group; the three sequences are index-aligned
        for group_file, file_speaker, file_type in zip(group, group_audio_speaker, group_audio_type):
            rows.append({
                'group_id'           : group_index,
                'anchor_file'        : anchor_file,
                'anchor_speaker'     : anchor_speaker_id,
                'anchor_type'        : anchor_type,
                'group_file'         : os.path.basename(group_file),
                'group_audio_speaker': file_speaker,
                'group_audio_type'   : file_type,
                'group_label'        : group_label,
            })

    # Passing `columns` fixes the column order and keeps the columns even when `rows` is empty
    groups_df = pd.DataFrame(rows, columns=column_order)

    return groups_df

<span style="font-size: larger;"> Create the validation dataset: </span>

In [75]:
# Build the validation groups (NUM_GROUPS_TO_CREATE of them) and display the resulting table
groups_df = create_dataset(validation_df, spk_to_utts, NUM_GROUPS_TO_CREATE)
groups_df

Unnamed: 0,group_id,anchor_file,anchor_speaker,anchor_type,group_file,group_audio_speaker,group_audio_type,group_label
0,0,5656834.wav,4177,clean,7890459.wav,4177,clean,7890459.wav


In [32]:
# Save the validation groups into the challenge folder, without the DataFrame index column
groups_df.to_csv(os.path.join(challenge_folder, 'groups_challenge_validation.csv'), index=False)