In [120]:
%load_ext autoreload
%autoreload 2


The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


In [121]:
import os 
DATA_DIR = "data/icra2025-disturbances/robot_empty_cup-human_full_cup/human_matches_robot_movements" # root dir of your dataset 

assert os.path.exists(DATA_DIR), f"Data directory {DATA_DIR} does not exist"
""" / 
        ├── cat1
        │   ├── <sensor>_<timestamp>.csv
        │   ├── <sensor>_<timestamp>.pkl
        │   ├── ...
        ├── cat2
        │   ├── <sensor>_<timestamp>.csv
        │   ├── <sensor>_<timestamp>.pkl
        │   ├── ...
        ├── ... 
"""

categories_to_drop = ["empty_disturbed", "legacy"]

sample_freq_to_encode= 16000 # 16kHz
store_categories_csv = False # create a csv that maps each category to an index, only do this once and make sure it is consistent across all datasets!
N_FOLDS = 0 # set to zero if you do not want to create train/val splits. 
val_ratio = 0.2

In [122]:
# remove all "wav" files from the data directory to avoid inconsistencies
import os
import glob
# glob cannot find files with a leading dot
files = glob.glob(os.path.join(DATA_DIR, "**", "*.wav"), recursive=True)

for f in files:
    os.remove(f)

In [123]:
# find all csv files in the data directory, create dataframe
categories = os.listdir(DATA_DIR)
categories = [c for c in categories if os.path.isdir(os.path.join(DATA_DIR, c))]
print(categories)


import pandas as pd

# create df with 3 columns: filepath, sensor_type, category
entries = []

# for each category, find all csv files
for category in categories:
    print(category)
    csv_files = os.listdir(os.path.join(DATA_DIR, category))
    csv_files = [f for f in csv_files if f.endswith(".csv")]
    print(csv_files)

    for file in csv_files:
        sensor_type = file.split("_")[0]
        filepath = os.path.join(DATA_DIR, category, file)
        # add to df
        entries.append({"filepath": filepath, "sensor_type": sensor_type, "category": category})    

df = pd.DataFrame(entries)

print(len(df))

['empty']
empty
['laser_2025-02-20[2].csv', 'mic_2025-02-20[7].csv', 'mic_2025-02-20[5].csv', 'laser_2025-02-20[1].csv', 'laser_2025-02-20[7].csv', 'mic_2025-02-20[1].csv', 'mic_2025-02-20[9].csv', 'mic_2025-02-20[10].csv', 'mic_2025-02-20[4].csv', 'mic_2025-02-20[3].csv', 'laser_2025-02-20[9].csv', 'laser_2025-02-20[4].csv', 'mic_2025-02-20[8].csv', 'laser_2025-02-20[8].csv', 'mic_2025-02-20[6].csv', 'laser_2025-02-20[3].csv', 'laser_2025-02-20[6].csv', 'laser_2025-02-20[10].csv', 'laser_2025-02-20[5].csv', 'mic_2025-02-20[2].csv']
20


In [124]:
# drop categories if needed

for cat in categories_to_drop:
    df = df[df["category"] != cat]
print(f"dataset contains {len(df)} entries")


# rename categories
#if "2_9Vbatteries" ,make "3_M6x14"

# df["category"] = df["category"].apply(lambda x: x.replace("2_9Vbatteries", "3_M6x14"))

dataset contains 20 entries


In [125]:
# for each entry, read the csv file save wav file
from converter import SpectrogramCalculator
import torch
import matplotlib.pyplot as plt
from tqdm import tqdm

for i, row in tqdm(df.iterrows(), total=len(df)):
 
    filepath = row["filepath"]
    par_dir = os.path.dirname(filepath)
    filename = os.path.basename(filepath)
    filename = filename.replace(".csv", "")


    calc = SpectrogramCalculator(par_dir, filename)
    calc.encode_as_wav(16000) # AST was pretrained on 16kHz audio

    # plot waveform
    # plot_waveform(torch.tensor(calc.data).unsqueeze(0), int(calc.fs))
    # plt.show()

    # save RAM
    del calc
    


100%|██████████| 20/20 [00:09<00:00,  2.15it/s]


In [126]:
if store_categories_csv:
    # create csv with categories
    input("are you sure you want to create a csv with categories? This should only be done once and should be consistent across all datasets. Press enter to continue")

    categories = df["category"].unique()
    categories = {cat: i for i, cat in enumerate(categories)}

    # create csv with entries
    # idx, midname, category_name
    print(categories)

    with open("data/robomic_categories.csv","w") as f:
        f.write("index,mid,display_name\n")
        for name, index in categories.items():
            midname = f"m/robomic{index:02}"
            f.write(f'{index},{midname},"{name}"\n')



categories = open("data/robomic_categories.csv").readlines()
categories = [c.strip().split(",") for c in categories]
categories = categories[1:]
cat_name_to_idx = {c[2].replace('"',''): int(c[0]) for c in categories}
cat_idx_to_midname = {int(c[0]): c[1] for c in categories}
print(f"found {len(cat_name_to_idx)} categories")


found 2 categories


In [127]:
# helper to create json file that defines dataset
import json
def create_json_from_df(df, filename):

    data = []
    for i, row in df.iterrows():
        wav_path = row["filepath"].replace(".csv", ".wav")
        # make relative to filename directory
        par_dir = os.path.dirname(filename)
        wav_path = os.path.relpath(wav_path, par_dir)
        category = row["category"]
        category_id = cat_name_to_idx[category]
        midname_label = cat_idx_to_midname[category_id]
        data.append({"wav": wav_path, "labels": midname_label})
    json_data = {"data": data}
    with open(filename, "w") as f:
        json.dump(json_data, f)



In [128]:
# split by sensor type
laser_df = df[df["sensor_type"] == "laser"]
mic_df = df[df["sensor_type"] == "mic"]

In [129]:
# store entire dataset 
create_json_from_df(laser_df, f"{DATA_DIR}/robomic_all_laser.json")
create_json_from_df(mic_df, f"{DATA_DIR}/robomic_all_mic.json")

In [130]:
import random
random.seed(42)

if N_FOLDS > 0:
    for k in range(N_FOLDS):
        # create train and test splits 
        train_laser_indices = random.sample(range(len(laser_df)), int(len(laser_df) * (1 - val_ratio)))
        val_laser_indices = [i for i in range(len(laser_df)) if i not in train_laser_indices]

        train_mic_indices = random.sample(range(len(mic_df)), int(len(mic_df) * (1 - val_ratio)))
        val_mic_indices = [i for i in range(len(mic_df)) if i not in train_mic_indices]

        train_laser_df = laser_df.iloc[train_laser_indices]
        val_laser_df = laser_df.iloc[val_laser_indices]

        train_mic_df = mic_df.iloc[train_mic_indices]
        val_mic_df = mic_df.iloc[val_mic_indices]
        print(f"fold {k}: train laser {len(train_laser_df)}, val laser {len(val_laser_df)}, train mic {len(train_mic_df)}, val mic {len(val_mic_df)}")

        create_json_from_df(train_laser_df, f"{DATA_DIR}/robomic_train_laser_fold_{k}.json")
        create_json_from_df(val_laser_df, f"{DATA_DIR}/robomic_val_laser_fold_{k}.json")
        create_json_from_df(train_mic_df, f"{DATA_DIR}/robomic_train_mic_fold_{k}.json")
        create_json_from_df(val_mic_df, f"{DATA_DIR}/robomic_val_mic_fold_{k}.json")
