Construction of the RatPup dataset

In [6]:
# 1. Imports
import os
import glob
import pandas as pd

# 2. Function to parse a time string "M:SS.mmm" or "H:MM:SS.mmm" into seconds (float)
def parse_time_to_seconds(time_str: str) -> float:
    """Convert a colon-separated clock string into a number of seconds.

    Args:
        time_str: Either "M:SS.mmm" (two fields) or "H:MM:SS.mmm" (three fields).

    Returns:
        The total time in seconds as a float.

    Raises:
        ValueError: If the string does not have two or three colon-separated
            fields, or if a field is not a valid number.
    """
    fields = time_str.split(':')

    # Reject anything that is not "M:S" or "H:M:S" up front.
    if len(fields) not in (2, 3):
        raise ValueError(f"Unexpected time format: {time_str}")

    if len(fields) == 3:
        hours, minutes, seconds = (float(field) for field in fields)
    else:
        # Two fields: the hour part was omitted.
        hours = 0.0
        minutes, seconds = (float(field) for field in fields)

    return hours * 3600 + minutes * 60 + seconds

# 3. Collect all durations, track empty and skipped files
root_dir = 'XXXX'
csv_pattern = os.path.join(root_dir, '**', '*.csv')
csv_files = glob.glob(csv_pattern, recursive=True)

all_durations = []  # every parsed event duration, in seconds
empty_files = []    # files with a 'Duration' column but no usable rows
skipped = []        # (path, reason) pairs for files that could not be used

for csv_file in csv_files:
    try:
        # Fields are split on runs of commas and/or tabs.
        # NOTE(review): the '+' in the separator collapses consecutive
        # delimiters, so an empty cell shifts the following columns left —
        # confirm the annotation files never contain empty cells.
        df = pd.read_csv(csv_file, sep=r'[,\t]+', engine='python', dtype=str)
    except Exception as e:
        skipped.append((csv_file, f'read error: {e}'))
        continue

    # Header names may carry stray whitespace around the delimiter.
    df.columns = df.columns.str.strip()

    if 'Duration' not in df.columns:
        skipped.append((csv_file, f"no 'Duration' column; got {df.columns.tolist()}"))
        continue

    # drop rows where Duration is NaN or empty
    df = df.dropna(subset=['Duration'])
    df = df[df['Duration'].str.strip() != '']

    # if no data left, record as empty
    if df.empty:
        empty_files.append(csv_file)
        continue

    # Parse the whole file first: one malformed row skips the entire file,
    # so no partial data from it ends up in the statistics.
    try:
        durations = df['Duration'].apply(parse_time_to_seconds)
    except Exception as e:
        skipped.append((csv_file, f'parse error: {e}'))
        continue
    all_durations.extend(durations.tolist())

# 4. Report skipped and empty files
if skipped:
    print("\nSkipped files:")
    for fname, reason in skipped:
        print(f"  - {fname}: {reason}")

if empty_files:
    print("\nEmpty files (no usable Duration rows):")
    for fname in empty_files:
        print(f"  - {fname}")

# 5. Compute statistics if any durations collected
# An if/else is used instead of exit(): inside a notebook, exit() shuts the
# kernel down rather than just ending the cell.
if not all_durations:
    print("\nNo valid Duration data found; skipping statistics.")
else:
    dur_series = pd.Series(all_durations, name='duration_sec')
    mean = dur_series.mean()
    median = dur_series.median()
    std = dur_series.std()
    q1 = dur_series.quantile(0.25)
    q3 = dur_series.quantile(0.75)
    iqr = q3 - q1

    # 6. Define outlier thresholds using the IQR method
    lower_fence = q1 - 1.5 * iqr
    upper_fence = q3 + 1.5 * iqr

    # 7. Identify outliers
    outliers = dur_series[(dur_series < lower_fence) | (dur_series > upper_fence)]

    # 8. Print results
    print("\n=== Duration Statistics (seconds) ===")
    print(f"Mean: {mean:.4f}")
    print(f"Median: {median:.4f}")
    print(f"Std Dev: {std:.4f}")
    print(f"Q1: {q1:.4f}, Q3: {q3:.4f}, IQR: {iqr:.4f}")

    print("\n=== Outlier Thresholds ===")
    print(f"Lower fence (Q1 - 1.5·IQR): {lower_fence:.4f} sec")
    print(f"Upper fence (Q3 + 1.5·IQR): {upper_fence:.4f} sec")

    print("\n=== Outliers Detected ===")
    print(f"Number of outlier events: {len(outliers)}")
    if not outliers.empty:
        print(f"Minimum outlier duration: {outliers.min():.4f} sec")
        print(f"Maximum outlier duration: {outliers.max():.4f} sec")
        print("\nList of outlier durations (first 20 shown):")
        print(outliers.sort_values().head(20).to_string(index=False))
    else:
        print("No outliers found.")



=== Duration Statistics (seconds) ===
Mean: 0.1066
Median: 0.1020
Std Dev: 0.0666
Q1: 0.0517, Q3: 0.1532, IQR: 0.1015

=== Outlier Thresholds ===
Lower fence (Q1 - 1.5·IQR): -0.1005 sec
Upper fence (Q3 + 1.5·IQR): 0.3055 sec

=== Outliers Detected ===
Number of outlier events: 12
Minimum outlier duration: 0.3060 sec
Maximum outlier duration: 0.5000 sec

List of outlier durations (first 20 shown):
0.306
0.311
0.319
0.332
0.342
0.343
0.347
0.368
0.388
0.433
0.460
0.500


In [7]:
# 1. Imports
import os
import glob
import pandas as pd
import soundfile as sf

# 2. Time parser (as before)
def parse_time_to_seconds(time_str: str) -> float:
    """Turn "M:SS.mmm" or "H:MM:SS.mmm" into a float number of seconds.

    Raises:
        ValueError: If the text has fewer than two or more than three
            colon-separated fields, or a field is not numeric.
    """
    pieces = time_str.split(':')
    if not 2 <= len(pieces) <= 3:
        raise ValueError(f"Unexpected time format: {time_str}")

    # `leading` holds the hour field when present and is empty otherwise.
    *leading, minutes_text, seconds_text = pieces
    hours = float(leading[0]) if leading else 0.0
    minutes = float(minutes_text)
    seconds = float(seconds_text)

    return hours * 3600 + minutes * 60 + seconds

# 3. Paths
root_dir = 'XXXX'
csv_pattern = os.path.join(root_dir, '**', '*.csv')
wav_pattern = os.path.join(root_dir, '**', '*.wav')

csv_files = glob.glob(csv_pattern, recursive=True)
wav_files = glob.glob(wav_pattern, recursive=True)

# (path, reason) pairs for inputs left out of the totals, reported below
# instead of being dropped silently.
skipped = []

# 4. Sum up USV durations from CSVs
total_usv_time = 0.0
for f in csv_files:
    try:
        df = pd.read_csv(f, sep=r'[,\t]+', engine='python', dtype=str)
    except Exception as e:
        skipped.append((f, f'read error: {e}'))
        continue
    df.columns = df.columns.str.strip()
    if 'Duration' not in df.columns:
        continue
    # Keep only rows that actually carry a duration value.
    df = df.dropna(subset=['Duration'])
    df = df[df['Duration'].str.strip() != '']
    if df.empty:
        continue
    # A malformed row excludes the whole file, so the total covers the same
    # files as the per-event duration statistics.
    try:
        file_usv_time = df['Duration'].apply(parse_time_to_seconds).sum()
    except Exception as e:
        skipped.append((f, f'parse error: {e}'))
        continue
    total_usv_time += file_usv_time

# 5. Sum up total audio time from WAVs
total_audio_time = 0.0
for wav in wav_files:
    try:
        # Only the header is needed for the length; sf.info avoids decoding
        # the whole recording into memory.
        info = sf.info(wav)
    except Exception as e:
        skipped.append((wav, f'audio error: {e}'))
        continue
    duration = info.frames / info.samplerate
    total_audio_time += duration

if skipped:
    print("\nSkipped files:")
    for fname, reason in skipped:
        print(f"  - {fname}: {reason}")

# 6. Compute non-USV time and ratio
total_no_usv_time = total_audio_time - total_usv_time
# With no USV time at all the ratio is undefined; report it as infinite.
ratio = total_no_usv_time / total_usv_time if total_usv_time > 0 else float('inf')

# 7. Print results
print("\n=== Summary ===")
print(f"Total audio duration:       {total_audio_time:.2f} sec")
print(f"Total USV event duration:   {total_usv_time:.2f} sec")
print(f"Total non-USV duration:     {total_no_usv_time:.2f} sec")
print(f"Non-USV is {ratio:.1f}× USV duration")



=== Summary ===
Total audio duration:       16024.57 sec
Total USV event duration:   142.87 sec
Total non-USV duration:     15881.70 sec
Non-USV is 111.2× USV duration


In [9]:
# Configuration

# --- Input / output locations ---
ROOT_DIR = 'XXXX'
USV_CSV_PATTERN = f'{ROOT_DIR}/**/*.csv'   # recursive glob for annotation CSVs
WAV_PATTERN = f'{ROOT_DIR}/**/*.wav'       # recursive glob for recordings
DATASET_CSV = 'RatPup_dataset.csv'
DATASET_NAME = 'RatPup'

# --- Windowing ---
WINDOW_LENGTH_SEC = 0.220   # 220 ms windows
STEP_SEC = 0.110            # hop between window starts: 50% overlap

# --- Split fractions (train / validation / test) ---
TRAIN_FRAC = 0.70
VAL_FRAC = 0.20
TEST_FRAC = 0.10

# --- Heuristic target class fractions ---
# The targets sit halfway between the observed class balance and a 50/50 balance.
ACTUAL_POS_FRAC = 0.0089   # ≈ 0.89%
ACTUAL_NEG_FRAC = 0.9911   # ≈ 99.11%
TARGET_POS_FRAC = (ACTUAL_POS_FRAC + 0.50) / 2   # ≈ 25.445%
TARGET_NEG_FRAC = (ACTUAL_NEG_FRAC + 0.50) / 2   # ≈ 74.555%

# Negative windows to keep per positive window (window-count-based sampling)
NEG_PER_POS = TARGET_NEG_FRAC / TARGET_POS_FRAC

# --- Reproducibility ---
RANDOM_SEED = 42

# 1. Imports
import glob
import os
import random
import pandas as pd
import soundfile as sf
from sklearn.model_selection import train_test_split

# Seed Python's built-in RNG. The sampling and splitting steps below pass
# random_state=RANDOM_SEED themselves and do not depend on this call.
random.seed(RANDOM_SEED)

# 2. Parse time string to seconds
def parse_time_to_seconds(ts: str) -> float:
    """Parse a "M:SS.mmm" or "H:MM:SS.mmm" time string into seconds.

    Args:
        ts: Colon-separated time text with two or three fields.

    Returns:
        Hours, minutes and seconds combined into a float number of seconds.

    Raises:
        ValueError: If `ts` does not have two or three fields, or a field is
            not a valid number.
    """
    fields = ts.split(':')
    field_count = len(fields)

    if field_count == 2:
        # No hour field given: treat it as zero hours.
        fields.insert(0, '0')
    elif field_count != 3:
        raise ValueError(f"Unexpected time format: {ts}")

    hours_text, minutes_text, seconds_text = fields
    return float(hours_text) * 3600 + float(minutes_text) * 60 + float(seconds_text)

# 3. Load USV events and filter out outliers
def _load_usv_table(csv_file):
    """Read one annotation CSV and return its usable events with times in seconds.

    Returns the rows that have non-empty 'Duration' and 'Start' values, with
    'dur_sec', 'start_sec' and 'end_sec' columns added. Returns None when the
    file lacks either column, or when it cannot be read or parsed (a message
    is printed in that case so the omission is visible).
    """
    try:
        table = pd.read_csv(csv_file, sep=r'[,\t]+', engine='python', dtype=str)
        table.columns = table.columns.str.strip()
        if 'Duration' not in table.columns or 'Start' not in table.columns:
            return None
        table = table.dropna(subset=['Duration', 'Start'])
        # Blank strings survive dropna and would make the time parser raise.
        has_values = (table['Duration'].str.strip() != '') & (table['Start'].str.strip() != '')
        table = table[has_values]
        dur_sec = table['Duration'].apply(parse_time_to_seconds)
        start_sec = table['Start'].apply(parse_time_to_seconds)
    except Exception as e:
        print(f"Skipping {csv_file}: {e}")
        return None
    return table.assign(dur_sec=dur_sec, start_sec=start_sec, end_sec=start_sec + dur_sec)


# Parse every annotation file exactly once; the tables are reused both for the
# outlier fence and for the per-recording event lists.
# glob returns paths in arbitrary order, so sort for a reproducible run.
usv_tables = {}
for csv_file in sorted(glob.glob(USV_CSV_PATTERN, recursive=True)):
    table = _load_usv_table(csv_file)
    if table is not None:
        usv_tables[csv_file] = table

durations = [d for table in usv_tables.values() for d in table['dur_sec'].tolist()]
if not durations:
    raise ValueError(f"No USV durations found under pattern {USV_CSV_PATTERN!r}")

# Upper IQR fence: events longer than this are treated as outliers and dropped.
q1 = pd.Series(durations).quantile(0.25)
q3 = pd.Series(durations).quantile(0.75)
upper_fence = q3 + 1.5*(q3 - q1)

# Map each recording path to its list of (start_sec, end_sec) events.
usv_events = {}
for csv_file, table in usv_tables.items():
    kept = table[table['dur_sec'] <= upper_fence]
    if kept.empty:
        continue
    # The recording is expected next to its CSV, same name with a .wav extension.
    wav_file = os.path.splitext(csv_file)[0] + '.wav'
    usv_events[wav_file] = list(zip(kept['start_sec'], kept['end_sec']))

# 4. Gather all WAV files
# Sorted so that row order — and therefore the seeded sampling and splitting
# below — does not depend on filesystem enumeration order.
all_wavs = sorted(glob.glob(WAV_PATTERN, recursive=True))

# 5. Generate fixed-length windows and label
rows = []
for wav in all_wavs:
    try:
        # Header metadata is enough for the length; no need to decode the audio.
        info = sf.info(wav)
    except Exception as e:
        print(f"Skipping {wav}: {e}")
        continue
    total_dur = info.frames / info.samplerate
    events = usv_events.get(wav, [])
    # NOTE(review): only the base name is stored, so recordings with the same
    # name in different sub-folders become indistinguishable in the CSV.
    file_name = os.path.basename(wav)
    window_idx = 0
    while True:
        # Derive the start from an integer index rather than repeatedly adding
        # STEP_SEC, which would accumulate floating-point error.
        start = window_idx * STEP_SEC
        end = start + WINDOW_LENGTH_SEC
        if end > total_dur:
            break
        # A window is positive when it overlaps any kept USV event.
        is_pos = any(end > ev_start and start < ev_end for ev_start, ev_end in events)
        rows.append({
            'file_name': file_name,
            'start'    : start,
            'duration' : WINDOW_LENGTH_SEC,
            'end'      : end,
            'usv'      : int(is_pos),
            'dataset'  : DATASET_NAME
        })
        window_idx += 1

# Explicit columns keep the frame well-formed even when no window was generated.
df = pd.DataFrame(rows, columns=['file_name', 'start', 'duration', 'end', 'usv', 'dataset'])
pos_df = df[df['usv']==1]
neg_df = df[df['usv']==0]
if pos_df.empty:
    raise ValueError("No USV-positive windows were generated; check the annotation CSVs and WAV paths.")

# 6. Window-count–based negative sampling
n_neg_needed = int(len(pos_df) * NEG_PER_POS)
n_neg_needed = min(n_neg_needed, len(neg_df))  # in case of shortage
neg_df = neg_df.sample(n=n_neg_needed, random_state=RANDOM_SEED)

# 7. Combine and shuffle
combined = pd.concat([pos_df, neg_df]).sample(frac=1, random_state=RANDOM_SEED).reset_index(drop=True)

# 8. Split train/val/test with stratification
# NOTE(review): windows are split at random, so overlapping windows from the
# same recording can land in different subsets — consider a split grouped by
# file if that leakage matters for evaluation.
train, temp = train_test_split(combined, train_size=TRAIN_FRAC, stratify=combined['usv'], random_state=RANDOM_SEED)
# Share of the held-out remainder that goes to validation; the rest is test.
val_size = VAL_FRAC / (VAL_FRAC + TEST_FRAC)
val, test = train_test_split(temp, train_size=val_size, stratify=temp['usv'], random_state=RANDOM_SEED)

# .assign returns new frames, avoiding in-place writes on the split results
# (which can trigger pandas' SettingWithCopyWarning).
train = train.assign(type='train')
val = val.assign(type='val')
test = test.assign(type='test')

# 9. Save dataset CSV
dataset = pd.concat([train, val, test], ignore_index=True)
dataset = dataset[['file_name','start','duration','end','usv','type','dataset']]
dataset.to_csv(DATASET_CSV, index=False)

print(f"Dataset saved to {DATASET_CSV}")
print(dataset['type'].value_counts(normalize=True))
print(dataset['usv'].value_counts(normalize=True))


Dataset saved to RatPup_dataset.csv
type
train    0.699974
val      0.200017
test     0.100009
Name: proportion, dtype: float64
usv
0    0.745535
1    0.254465
Name: proportion, dtype: float64
