In [12]:
import json
import os
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
from typing import List

import dotenv
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI
from tqdm.auto import tqdm

from audio_processing import get_vad_slices
from models.audio import SlicedAudioFile
from models.events import ComedySession
from utils import get_sliced_audio_base64

dotenv.load_dotenv(dotenv_path=dotenv.find_dotenv())

DATASET_DIR = Path("./dataset").absolute()
SLICE_CACHE = DATASET_DIR / "slice_cache.jsonl"
AUDIO_DIR = (DATASET_DIR / "audio_outputs").absolute()
audio_files = list(AUDIO_DIR.glob('*.flac'))

print(f'Found {len(audio_files)} audio files in {AUDIO_DIR}')

Found 31 audio files in /Users/riverfog7/Workspace/SCA/data_prep/dataset/audio_outputs


  from .autonotebook import tqdm as notebook_tqdm


In [2]:
file_list: List[SlicedAudioFile] = []

if not SLICE_CACHE.exists():
    for audio_path in tqdm(audio_files):
        tqdm.write(f'Processing {audio_path.name}...')
        file_list.append(get_vad_slices(audio_path, slice_min=2))

    with open(SLICE_CACHE, 'w') as f:
        for file in file_list:
            f.write(file.model_dump_json() + '\n')

with open(SLICE_CACHE, 'r') as f:
    for line in f:
        file_list.append(SlicedAudioFile.model_validate_json(line))

chunk_lst = [slice for file in file_list for slice in file.slices]
print(f"Total {sum(len(file.slices) for file in file_list)} slices found in {len(file_list)} audio files.")

Total 660 slices found in 31 audio files.


In [8]:
template = ChatPromptTemplate.from_messages([
    ("system", """You are an advanced audio analysis AI specializing in Standup Comedy transcription.
Your task is to listen to the audio and generate a high-precision JSON timeline according to this schema:
{schema}

### CRITICAL TRANSCRIPTION PROTOCOLS:

1. **CONTEXT & SEGMENTATION (Chunk Processing):**
   - **Partial Input:** The provided audio is a specific **chunk** sliced from a longer performance. It is NOT the full video.
   - **Boundary Handling:** Do not hallucinate words that might be cut off at the very start or very end of the file. Transcribe exactly what is audible within this slice.
   - **Scope:** Your timeline must strictly reflect events occurring within this specific audio segment.

2. **SOURCE SEPARATION (Comedian vs. Audience vs. Environment):**
   - You must acoustically distinguish between the three distinct sound sources defined in the schema:
   - **ComedianEvent:** Speech, breathing, or self-laughter coming specifically from the microphone/performer.
   - **AudienceEvent:** Strictly crowd reactions (cheering, applause, collective laughter, heckling).
   - **EnvironmentEvent:** Non-human or background sounds (Music, technical noise, accidental noise).

3. **TEMPORAL DYNAMICS (Overlaps):**
   - **Allow Overlaps:** Real comedy is fluid. Audience laughter often begins *before* the comedian finishes their punchline.
   - **Timestamp Accuracy:** Capture these overlaps precisely. Do not artificially force events to happen sequentially if they occur simultaneously.

4. **CLASSIFICATION & 'OTHER' HANDLING:**
   - **Strict Categorization:** Attempt to categorize sounds using the specific Enums provided in the schema (e.g., 'laughter', 'applause', 'mic_feedback').
   - **The 'Other' Fallback:** If a sound occurs that does not strictly fit the provided categories, you MUST select the **'other'** option for the `type` or `category` field.
   - **Self-Explanation:** When you select **'other'**, you must explicitly describe the sound in the `content` field.
     - *Example:* If a baby cries in the crowd (and 'crying' isn't an option), set `reaction_type='other'` and `content='[baby crying]'`.
     - *Example:* If a chair squeaks on stage (and 'squeak' isn't an option), set `event_type='other'` and `content='[chair squeak]'`.
   - **Speech Content:** For standard speech, transcribe verbatim.
"""),
    ('user', [
        {"type": "audio_url", "audio_url": {"url": "data:audio/wav;base64,{audio_base64}"}},
        {"type": "text", "text": "Please transcribe this audio chunk and extract the relevant information according to the schema. This chunk belongs to the video: {video_id}. The chunk duration is {duration} seconds."},
    ]),
])

model = ChatOpenAI(
    api_key="EMPTY",
    model=os.getenv('VLLM_MODEL'),
    base_url=os.getenv('VLLM_URL'),
    max_tokens=8192,
    # extra_body={
    #     "structured_outputs": {"json": ImageAnalysis.model_json_schema()}
    # },
).with_structured_output(ComedySession).with_retry()

chain = template | model

In [9]:
chunk_try = chunk_lst[1]
chain.invoke({
    "audio_base64": get_sliced_audio_base64(AUDIO_DIR, chunk_try),
    "video_id": chunk_try.file,
    "schema": ComedySession.model_json_schema(),
    "duration": chunk_try.duration,
})

ComedySession(video_id='Judge Me If You Can! Ep01 ft. @ComicKaustubhAgarwal & @yuvrajdua4094.flac', timeline=[AudienceEvent(start=0.0, end=0.8, role='audience', content='Audience laughing', reaction_type='laughter'), ComedianEvent(start=0.8, end=5.0, role='comedian', content='Isse lal patth ka pata hai na? Chali to karaya. Bilkul hai.', event_type='speech', delivery_tag='energetic'), ComedianEvent(start=5.0, end=7.5, role='comedian', content='Lal patth wale isko bolte hain ki aapke full body mein do mahine lage hain, bhai.', event_type='speech', delivery_tag='energetic'), AudienceEvent(start=7.5, end=9.5, role='audience', content='Audience laughing', reaction_type='laughter'), ComedianEvent(start=9.5, end=13.9, role='comedian', content='Kya hum contestant ko bulane se pehle hi pooch len? Inlo ka stake pe kya laga hua hai?', event_type='speech', delivery_tag='energetic'), ComedianEvent(start=13.9, end=16.1, role='comedian', content='Woh bada bura laga raha kartehu, lekin yeh perfume mei

In [13]:
OUTPUT_FILE = DATASET_DIR / "inference_results.jsonl"
file_lock = threading.Lock()
with open(OUTPUT_FILE, 'w') as f:
    pass
def process_and_save_chunk(chunk):
    try:
        inference_result = chain.invoke({
            "audio_base64": get_sliced_audio_base64(AUDIO_DIR, chunk),
            "video_id": "placeholder",
            "schema": ComedySession.model_json_schema(),
            "duration": chunk.duration,
        })
        chunk_data = chunk.model_dump() if hasattr(chunk, 'model_dump') else vars(chunk)

        combined_record = {
            "chunk_metadata": chunk_data,
            "inference": inference_result.model_dump()
        }

        with file_lock:
            with open(OUTPUT_FILE, 'a') as f:
                f.write(json.dumps(combined_record) + "\n")
        return combined_record

    except Exception as e:
        raise RuntimeError(f"Failed on chunk {chunk}: {str(e)}")

results = []
num_workers = 32

print(f"Starting processing with {num_workers} workers. Saving to {OUTPUT_FILE}...")
with ThreadPoolExecutor(max_workers=num_workers) as executor:
    future_to_chunk = {
        executor.submit(process_and_save_chunk, chunk): chunk
        for chunk in chunk_lst
    }

    for future in tqdm(as_completed(future_to_chunk), total=len(chunk_lst)):
        chunk = future_to_chunk[future]
        try:
            result = future.result()
            results.append(result)
        except Exception as exc:
            tqdm.write(f"Error processing {chunk.file} [{chunk.start}-{chunk.end}]: {exc}")

print(f"\nProcessing complete. {len(results)} chunks successfully processed.")

Starting processing with 32 workers. Saving to /Users/riverfog7/Workspace/SCA/data_prep/dataset/inference_results.jsonl...


 45%|████▌     | 297/660 [49:55<1:01:01, 10.09s/it]


AttributeError: 'AudioSlice' object has no attribute 'start'