In [1]:
import base64
import io
import json
import os
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
from typing import List

import dotenv
import soundfile as sf
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.messages import HumanMessage
from langchain_openai import ChatOpenAI
from tqdm.auto import tqdm

from audio_processing import get_vad_slices
from models.audio import SlicedAudioFile
from models.events import ComedySession

dotenv.load_dotenv(dotenv_path=dotenv.find_dotenv())

DATASET_DIR = Path("./dataset").absolute()
SLICE_CACHE = DATASET_DIR / "slice_cache.jsonl"
AUDIO_DIR = (DATASET_DIR / "audio_outputs").absolute()
audio_files = list(AUDIO_DIR.glob('*.flac'))

print(f'Found {len(audio_files)} audio files in {AUDIO_DIR}')

  from .autonotebook import tqdm as notebook_tqdm


Found 31 audio files in /Users/riverfog7/Workspace/SCA/data_prep/dataset/audio_outputs


In [2]:
file_list: List[SlicedAudioFile] = []

if not SLICE_CACHE.exists():
    for audio_path in tqdm(audio_files):
        tqdm.write(f'Processing {audio_path.name}...')
        file_list.append(get_vad_slices(audio_path, slice_min=2))

    with open(SLICE_CACHE, 'w') as f:
        for file in file_list:
            f.write(file.model_dump_json() + '\n')

with open(SLICE_CACHE, 'r') as f:
    for line in f:
        file_list.append(SlicedAudioFile.model_validate_json(line))

chunk_lst = [slice for file in file_list for slice in file.slices]
print(f"Total {sum(len(file.slices) for file in file_list)} slices found in {len(file_list)} audio files.")

Total 660 slices found in 31 audio files.


In [3]:
def get_segmented_audio_with_timestamps(base_path: Path, slice, segment_duration: int = 6) -> List[HumanMessage]:
    """
    Split audio slice into segments with text timestamp markers.

    Args:
        segment_duration: Duration of each segment (default 6s, optimal for AuT encoder)

    Returns:
        List of HumanMessage objects with interleaved text/audio content
    """
    audio_path = base_path / slice.file
    info = sf.info(audio_path)
    sr = info.samplerate

    start_frame = int(slice.start_time * sr)
    stop_frame = int(slice.end_time * sr)

    total_frames = stop_frame - start_frame
    segment_frames = int(segment_duration * sr)

    content = [{"type": "text", "text": f"T={slice.start_time:.1f}s (CHUNK START)"}]

    current_time = slice.start_time
    for i in range(0, total_frames, segment_frames):
        segment_start = start_frame + i
        segment_stop = min(segment_start + segment_frames, stop_frame)

        # Read audio segment
        data, _ = sf.read(audio_path, start=segment_start, stop=segment_stop, dtype='int16')

        # Encode to base64
        buffered = io.BytesIO()
        sf.write(buffered, data, sr, format='WAV')
        audio_b64 = base64.b64encode(buffered.getvalue()).decode()

        # Add audio segment
        content.append({
            "type": "audio_url",
            "audio_url": {"url": f"data:audio/wav;base64,{audio_b64}"}
        })

        # Add timestamp marker after each segment
        current_time += segment_duration
        if current_time < slice.end_time:
            content.append({"type": "text", "text": f"T={min(current_time, slice.end_time):.1f}s"})

    content.append({"type": "text", "text": f"T={slice.end_time:.1f}s (CHUNK END)"})
    content.append({
        "type": "text",
        "text": f"Transcribe the above audio from video '{slice.file}'. Use T= markers for accurate timestamp prediction."
    })

    # Return as single HumanMessage with multipart content
    return [HumanMessage(content=content)]

In [4]:
res = get_segmented_audio_with_timestamps(AUDIO_DIR, chunk_lst[4], segment_duration=4)
[val for val in res[0].content if val['type'] == 'text']

[{'type': 'text', 'text': 'T=482.0s (CHUNK START)'},
 {'type': 'text', 'text': 'T=486.0s'},
 {'type': 'text', 'text': 'T=490.0s'},
 {'type': 'text', 'text': 'T=494.0s'},
 {'type': 'text', 'text': 'T=498.0s'},
 {'type': 'text', 'text': 'T=502.0s'},
 {'type': 'text', 'text': 'T=506.0s'},
 {'type': 'text', 'text': 'T=510.0s'},
 {'type': 'text', 'text': 'T=514.0s'},
 {'type': 'text', 'text': 'T=518.0s'},
 {'type': 'text', 'text': 'T=522.0s'},
 {'type': 'text', 'text': 'T=526.0s'},
 {'type': 'text', 'text': 'T=530.0s'},
 {'type': 'text', 'text': 'T=534.0s'},
 {'type': 'text', 'text': 'T=538.0s'},
 {'type': 'text', 'text': 'T=542.0s'},
 {'type': 'text', 'text': 'T=546.0s'},
 {'type': 'text', 'text': 'T=550.0s'},
 {'type': 'text', 'text': 'T=554.0s'},
 {'type': 'text', 'text': 'T=558.0s'},
 {'type': 'text', 'text': 'T=562.0s'},
 {'type': 'text', 'text': 'T=566.0s'},
 {'type': 'text', 'text': 'T=570.0s'},
 {'type': 'text', 'text': 'T=574.0s'},
 {'type': 'text', 'text': 'T=578.0s'},
 {'type': '

In [5]:
template = ChatPromptTemplate.from_messages([
    ("system", """You are an advanced audio analysis AI specializing in Standup Comedy transcription.
Your task is to listen to the audio and generate a high-precision JSON timeline according to this schema:
{schema}

### TIMESTAMP REFERENCE SYSTEM:
- The audio is divided into segments with text timestamp markers (e.g., "T=5.0s", "T=11.0s")
- Use these markers to accurately predict event timestamps
- **DO NOT transcribe the timestamp markers themselves** - they are timing references only
- All predicted timestamps must be between the CHUNK START and CHUNK END markers

**Example interpretation:**
- Text marker: "T=5.0s"
- Audio: *audience laughter*
- Text marker: "T=11.0s"
- Audio: "That's the problem with dating apps"
→ Laughter timestamp: ~5.3s (shortly after T=5.0s marker)
→ Speech start: ~11.2s (shortly after T=11.0s marker)

### CRITICAL TRANSCRIPTION PROTOCOLS:

1. **CONTEXT & SEGMENTATION (Chunk Processing):**
   - **Partial Input:** The provided audio is a specific **chunk** sliced from a longer performance. It is NOT the full video.
   - **Boundary Handling:** Do not hallucinate words that might be cut off at the very start or very end of the file. Transcribe exactly what is audible within this slice.
   - **Scope:** Your timeline must strictly reflect events occurring within this specific audio segment.

2. **SOURCE SEPARATION (Comedian vs. Audience vs. Environment):**
   - You must acoustically distinguish between the three distinct sound sources defined in the schema:
   - **ComedianEvent:** Speech, breathing, or self-laughter coming specifically from the microphone/performer.
   - **AudienceEvent:** Strictly crowd reactions (cheering, applause, collective laughter, heckling).
   - **EnvironmentEvent:** Non-human or background sounds (Music, technical noise, accidental noise).

3. **TEMPORAL DYNAMICS (Overlaps):**
   - **Allow Overlaps:** Real comedy is fluid. Audience laughter often begins *before* the comedian finishes their punchline.
   - **Timestamp Accuracy:** Capture these overlaps precisely using the T= markers as anchors. Do not artificially force events to happen sequentially if they occur simultaneously.

4. **CLASSIFICATION & 'OTHER' HANDLING:**
   - **Strict Categorization:** Attempt to categorize sounds using the specific Enums provided in the schema (e.g., 'laughter', 'applause', 'mic_feedback').
   - **The 'Other' Fallback:** If a sound occurs that does not strictly fit the provided categories, you MUST select the **'other'** option for the `type` or `category` field.
   - **Self-Explanation:** When you select **'other'**, you must explicitly describe the sound in the `content` field.
     - *Example:* If a baby cries in the crowd (and 'crying' isn't an option), set `reaction_type='other'` and `content='[baby crying]'`.
     - *Example:* If a chair squeaks on stage (and 'squeak' isn't an option), set `event_type='other'` and `content='[chair squeak]'`.
   - **Speech Content:** For standard speech, transcribe verbatim.
"""),
    MessagesPlaceholder(variable_name="user_messages"),
])

model = ChatOpenAI(
    api_key="EMPTY",
    model=os.getenv('VLLM_MODEL'),
    base_url=os.getenv('VLLM_URL'),
    max_tokens=8192,
).with_structured_output(ComedySession).with_retry()

chain = template | model

In [6]:
chunk_try = chunk_lst[1]
user_messages = get_segmented_audio_with_timestamps(AUDIO_DIR, chunk_try, segment_duration=6)

chain.invoke({
    "user_messages": user_messages,
    "schema": ComedySession.model_json_schema(),
})

ComedySession(video_id='Judge Me If You Can! Ep01 ft. @ComicKaustubhAgarwal & @yuvrajdua4094.flac', timeline=[ComedianEvent(start=122.2, end=124.8, role='comedian', content='इसे लाल पेस्ट का पता है ना?', event_type='speech', delivery_tag='deadpan'), ComedianEvent(start=125.3, end=126.4, role='comedian', content='चलिए तो कराएँ।', event_type='speech', delivery_tag='deadpan'), AudienceEvent(start=127.3, end=127.6, role='audience', content='[audience laughter]', reaction_type='laughter'), ComedianEvent(start=127.7, end=129.6, role='comedian', content='लाल पेस्ट वाले इसको बोलते हैं कि आपके...', event_type='speech', delivery_tag='deadpan'), ComedianEvent(start=129.7, end=131.0, role='comedian', content='full body में दो महीने लगे हैं भाई।', event_type='speech', delivery_tag='deadpan'), AudienceEvent(start=131.4, end=133.1, role='audience', content='[audience laughter]', reaction_type='laughter'), ComedianEvent(start=133.1, end=134.2, role='comedian', content='आपको...', event_type='speech', d

In [7]:
OUTPUT_FILE = DATASET_DIR / "inference_results.jsonl"
file_lock = threading.Lock()

with open(OUTPUT_FILE, 'w') as f:
    pass

def process_and_save_chunk(chunk):
    try:
        user_messages = get_segmented_audio_with_timestamps(
            AUDIO_DIR,
            chunk,
            segment_duration=4  # Adjust to 4-8s as needed
        )

        inference_result = chain.invoke({
            "user_messages": user_messages,
            "schema": ComedySession.model_json_schema(),
        })

        chunk_data = chunk.model_dump() if hasattr(chunk, 'model_dump') else vars(chunk)

        combined_record = {
            "chunk_metadata": chunk_data,
            "inference": inference_result.model_dump()
        }

        with file_lock:
            with open(OUTPUT_FILE, 'a') as f:
                f.write(json.dumps(combined_record) + "\n")
        return combined_record

    except Exception as e:
        raise RuntimeError(f"Failed on chunk {chunk}: {str(e)}")

results = []
num_workers = 128

print(f"Starting processing with {num_workers} workers. Saving to {OUTPUT_FILE}...")
with ThreadPoolExecutor(max_workers=num_workers) as executor:
    future_to_chunk = {
        executor.submit(process_and_save_chunk, chunk): chunk
        for chunk in chunk_lst
    }

    for future in tqdm(as_completed(future_to_chunk), total=len(chunk_lst)):
        chunk = future_to_chunk[future]
        try:
            result = future.result()
            results.append(result)
        except Exception as exc:
            tqdm.write(f"Error processing {chunk.file} [{chunk.start}-{chunk.end}]: {exc}")

print(f"\nProcessing complete. {len(results)} chunks successfully processed.")

Starting processing with 128 workers. Saving to /Users/riverfog7/Workspace/SCA/data_prep/dataset/inference_results.jsonl...


 67%|██████▋   | 445/660 [1:16:17<36:51, 10.29s/it]  


AttributeError: 'AudioSlice' object has no attribute 'start'