In [16]:
import torch as t
import numpy as np
import pandas as pd
import os
import random
import re
import csv
import tqdm
import torchaudio

import librosa
import soundfile
from ast import literal_eval
from transformers import AutoProcessor, WhisperForConditionalGeneration
from datasets import load_dataset, DatasetDict, Audio
from pydub import AudioSegment

# Seed Python's `random` module so shuffles are repeatable.
random.seed(100)

# Checkpoint name and the audio sampling rate (Hz) used by the cells below.
model_id = "openai/whisper-small"
sr = 16000

# Pick the compute device: MPS if available, otherwise CUDA, otherwise CPU.
if t.backends.mps.is_available():
    device = t.device("mps")
elif t.cuda.is_available():
    device = t.device("cuda")
else:
    device = t.device("cpu")


In [17]:
# Load the pretrained processor and ask its feature extractor to return an
# attention mask alongside the input features.
processor = AutoProcessor.from_pretrained(model_id)
processor.feature_extractor.return_attention_mask = True

# Load the pretrained model and move it to the selected device.
model = WhisperForConditionalGeneration.from_pretrained(model_id)
model = model.to(device)

# Generation defaults: transcribe (not translate), in English.
model.generation_config.task = "transcribe"
model.generation_config.language = "english"

In [None]:

def audiosegment_to_array(seg, target_sr=sr):
    """Convert a pydub-style audio segment into a mono float32 waveform.

    Integer PCM samples are scaled by ``2 ** (8 * sample_width - 1)`` so the
    result lies in [-1.0, 1.0). Multi-channel segments are averaged down to a
    single channel, and the waveform is resampled with librosa when the
    segment's frame rate differs from ``target_sr``.

    Args:
        seg: Object exposing ``get_array_of_samples()``, ``sample_width``,
            ``frame_rate`` and, optionally, ``channels`` (treated as 1 when
            the attribute is missing).
        target_sr: Sampling rate in Hz of the returned waveform.

    Returns:
        1-D ``np.float32`` array sampled at ``target_sr``.
    """
    samples = np.array(seg.get_array_of_samples())
    max_val = float(1 << (8 * seg.sample_width - 1))
    y = samples.astype(np.float32) / max_val

    # pydub serialises multi-channel audio as interleaved samples
    # (L, R, L, R, ...). Without a downmix a stereo clip would be treated as
    # a mono signal of twice the length.
    channels = getattr(seg, "channels", 1)
    if channels > 1:
        y = y.reshape(-1, channels).mean(axis=1)

    if seg.frame_rate != target_sr:
        y = librosa.resample(y, orig_sr=seg.frame_rate, target_sr=target_sr)
    return y

def transcribe_batch(segments, processor, model, sr=16000, device=None, batch_size=8):
    """Transcribe audio segments with a speech model, ``batch_size`` at a time.

    Args:
        segments: Sequence of audio segments accepted by
            ``audiosegment_to_array``.
        processor: Callable that turns waveforms into model inputs; it must
            return both ``"input_features"`` and ``"attention_mask"``, and
            provide ``batch_decode``.
        model: Model exposing ``eval()`` and ``generate()``.
        sr: Sampling rate in Hz used for the waveforms and passed to the
            processor.
        device: Device the inputs are moved to. When ``None`` the model's own
            ``device`` attribute is used (if it has one), so inputs and
            weights end up on the same device.
        batch_size: Number of segments per ``generate`` call; must be >= 1.

    Returns:
        List of whitespace-stripped transcripts, one per segment, in input
        order.

    Raises:
        ValueError: If ``batch_size`` is smaller than 1.
    """
    if batch_size < 1:
        raise ValueError(f"batch_size must be >= 1, got {batch_size}")

    model.eval()
    if device is None:
        # Leaving inputs on the CPU while the model sits on an accelerator
        # fails inside generate(); follow the model instead.
        device = getattr(model, "device", None)

    texts = []
    for start in range(0, len(segments), batch_size):
        chunk = segments[start:start + batch_size]
        arrays = [audiosegment_to_array(seg, target_sr=sr) for seg in chunk]
        # NOTE(review): the Whisper feature extractor appears to truncate each
        # clip to 30 s by default -- confirm no clip is longer than that, or
        # pass truncation=False. TODO confirm.
        inputs = processor(arrays, sampling_rate=sr, return_tensors="pt", padding=True)
        input_features = inputs["input_features"].to(device)
        attention_mask = inputs["attention_mask"].to(device)
        gen_ids = model.generate(input_features=input_features, attention_mask=attention_mask, task="transcribe")
        texts.extend(processor.batch_decode(gen_ids, skip_special_tokens=True))
    return [text.strip() for text in texts]


def create_trial(condition, target, frames, talker, n_continuum):
    """Build the audio for one exposure trial.

    Frames whose file name contains either endpoint word of ``target`` are
    dropped, the rest are shuffled and concatenated (each followed by 500 ms
    of silence), and that shared context is then followed by each continuum
    step in turn.

    Args:
        condition: Sub-directory of ``audio/MP/{talker}/`` holding the frames.
        target: Continuum name; its first two ``_``-separated parts are the
            endpoint words.
        frames: File names of the candidate frames (not modified).
        talker: Talker directory name.
        n_continuum: Number of continuum steps; steps ``0 .. n_continuum - 1``
            are loaded from ``{target}_1_{step}.wav``.

    Returns:
        ``(trials, frames)``: one audio segment per continuum step, and the
        shuffled list of frame file names that make up the context.
    """
    endpoint_words = target.split("_")

    kept = []
    for name in frames:
        if endpoint_words[0] in name or endpoint_words[1] in name:
            continue
        kept.append(name)
    random.shuffle(kept)

    pause = AudioSegment.silent(duration=500)
    context = AudioSegment.silent(duration=0)
    for name in kept:
        clip = AudioSegment.from_wav(f'audio/MP/{talker}/{condition}/{name}')
        context = context + (clip + pause)

    trials = []
    for step in range(n_continuum):
        continuum_clip = AudioSegment.from_wav(f'audio/MP/{talker}/continuum/{target}_1_{step}.wav')
        trials.append(context + continuum_clip + pause)

    return trials, kept

def create_filler(target, frames, talker, n_continuum):
    """Build the audio for one control trial from filler recordings.

    The filler clips are concatenated in the order given (each followed by
    500 ms of silence), and that shared context is then followed by each
    continuum step in turn.

    Args:
        target: Continuum name used to locate ``{target}_1_{step}.wav``.
        frames: File names under ``audio/fillers/{talker}/`` to concatenate.
        talker: Talker directory name.
        n_continuum: Number of continuum steps; steps ``0 .. n_continuum - 1``
            are loaded.

    Returns:
        ``(trials, frames)``: one audio segment per continuum step, and the
        ``frames`` argument unchanged.
    """
    pause = AudioSegment.silent(duration=500)
    context = AudioSegment.silent(duration=0)
    for name in frames:
        clip = AudioSegment.from_wav(f'audio/fillers/{talker}/{name}')
        context = context + (clip + pause)

    trials = []
    for step in range(n_continuum):
        continuum_clip = AudioSegment.from_wav(f'audio/MP/{talker}/continuum/{target}_1_{step}.wav')
        trials.append(context + continuum_clip + pause)

    return trials, frames



In [19]:
def exp(output_path, talkers, n_continuum, n_trials):
    """Run the experiment and write one CSV row per trial, condition and continuum step.

    For every talker, targets are cycled through in sorted order. Each trial
    is built in three conditions ('voiced', 'voiceless', 'control'), every
    continuum step is transcribed, and a row
    ``[id, talker, condition, context, transcript, target, target_step]`` is
    written, where ``context`` is the list of frame file names used.

    Directory listings and targets are sorted so that, for a fixed random
    seed, the run does not depend on ``os.listdir`` or set iteration order.

    Args:
        output_path: Path of the CSV file to create (overwritten if present).
        talkers: Iterable of talker directory names.
        n_continuum: Number of continuum steps per target.
        n_trials: Number of trials per talker.

    Raises:
        ValueError: If a talker's continuum directory yields no targets while
            ``n_trials`` is positive.
    """
    def visible_files(directory):
        # Sorted listing without the macOS '.DS_Store' entry.
        return sorted(name for name in os.listdir(directory) if name != '.DS_Store')

    target_pattern = re.compile(r'([a-zA-Z]+_[a-zA-Z]+)_[0-9]')

    with open(output_path, "w", newline="", encoding="utf-8") as csv_file:
        # Prep csv
        writer = csv.writer(csv_file)
        writer.writerow(["id", "talker", "condition", "context", "transcript", "target", "target_step"])

        for talker in talkers:
            # Collect the distinct targets; files that do not look like
            # '<word>_<word>_<digit>...' (e.g. '.DS_Store') are skipped.
            continuum_dir = f'audio/MP/{talker}/continuum'
            matches = (target_pattern.search(name) for name in os.listdir(continuum_dir))
            targets = sorted({m.group(1) for m in matches if m is not None})
            if n_trials > 0 and not targets:
                raise ValueError(f"no continuum targets found in {continuum_dir}")

            # Get all the audio paths
            voiced = visible_files(f'audio/MP/{talker}/voiced')
            voiceless = visible_files(f'audio/MP/{talker}/voiceless')
            fillers = visible_files(f'audio/fillers/{talker}')
            exposure_frames = {'voiced': voiced, 'voiceless': voiceless}

            # A "trial" here represents every time a target is selected.
            # Iterate through the three conditions and every step of the
            # continuum for each.
            for trial_id in tqdm.tqdm(range(n_trials)):
                target = targets[trial_id % len(targets)]

                for condition in ['voiced', 'voiceless', 'control']:
                    if condition == 'control':
                        random.shuffle(fillers)
                        # NOTE(review): one fewer filler than there are voiced
                        # frames -- presumably mirrors create_trial dropping
                        # the frame that contains the target; TODO confirm.
                        control_frames = fillers[:len(voiced) - 1]
                        audio, frames = create_filler(target, control_frames, talker, n_continuum)
                    else:
                        audio, frames = create_trial(condition, target, exposure_frames[condition], talker, n_continuum)

                    transcripts = transcribe_batch(audio, processor, model, sr, device, batch_size=n_continuum)
                    for step in range(n_continuum):
                        writer.writerow([trial_id, talker, condition, frames, transcripts[step], target, step])

# Run the experiment for talker 'hope': 13 continuum steps, 300 trials,
# with results written to data/SelAd.csv.
exp(
    output_path='data/SelAd.csv',
    talkers=['hope'],
    n_continuum=13,
    n_trials=300,
)



100%|██████████| 300/300 [55:53<00:00, 11.18s/it] 
