# Voice Mode Conversation Compiler

This notebook helps you explore, preview, and compile voice conversations saved by voice-mode.

Audio files are stored in `~/.voicemode/audio/` with timestamped filenames (the trailing `SSS` is milliseconds), like:
- `YYYYMMDD_HHMMSS_SSS-tts.wav` (Text-to-Speech output)
- `YYYYMMDD_HHMMSS_SSS-stt.wav` (Speech-to-Text input)

In [None]:
# Install required packages if needed
# !pip install gradio pydub scipy numpy pandas

In [None]:
import os
import re
from pathlib import Path
from datetime import datetime
import wave
import json

import gradio as gr
import pandas as pd
import numpy as np
from scipy.io import wavfile
from pydub import AudioSegment
from pydub.silence import split_on_silence

# Configuration
# All voice-mode data lives under ~/.voicemode: recordings, their transcriptions,
# and the compilations this notebook writes.
AUDIO_DIR = Path.home() / ".voicemode" / "audio"
TRANSCRIPTIONS_DIR = Path.home() / ".voicemode" / "transcriptions"
OUTPUT_DIR = Path.home() / ".voicemode" / "compilations"
# parents=True: ~/.voicemode itself may not exist yet, in which case a plain
# mkdir() would raise FileNotFoundError before anything else in the notebook runs.
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)

print(f"Audio directory: {AUDIO_DIR}")
print(f"Transcriptions directory: {TRANSCRIPTIONS_DIR}")
print(f"Output directory: {OUTPUT_DIR}")

In [None]:
def parse_audio_filename(filename):
    """Parse the timestamp and audio type out of a voice-mode audio filename.

    The expected form is ``YYYYMMDD_HHMMSS_SSS-{tts|stt}.wav`` where ``SSS``
    is milliseconds.

    Args:
        filename: Bare file name (no directory component).

    Returns:
        A dict with keys ``filename``, ``timestamp`` (datetime with
        millisecond precision), ``type`` ('tts' or 'stt'), ``date``
        (``YYYYMMDD`` string) and ``time`` (``HHMMSS`` string), or ``None``
        when the name does not match the pattern or its digits do not form a
        valid date/time.
    """
    # fullmatch, not match: a name such as "...-tts.wav.bak" must be rejected.
    match = re.fullmatch(r'(\d{8})_(\d{6})_(\d{3})-(tts|stt)\.wav', filename)
    if match is None:
        return None

    date_str, time_str, ms_str, audio_type = match.groups()
    try:
        timestamp = datetime.strptime(f"{date_str}_{time_str}", "%Y%m%d_%H%M%S")
    except ValueError:
        # Right shape but impossible values (e.g. month 13): treat the file
        # as unparsable rather than aborting the caller's whole scan.
        return None

    # strptime only saw whole seconds; fold the millisecond field back in.
    timestamp = timestamp.replace(microsecond=int(ms_str) * 1000)
    return {
        'filename': filename,
        'timestamp': timestamp,
        'type': audio_type,
        'date': date_str,
        'time': time_str,
    }

def load_audio_files():
    """Collect metadata for every recognised recording in AUDIO_DIR.

    Returns:
        A list of dicts as produced by ``parse_audio_filename``, each extended
        with ``path`` (the WAV file's Path) and ``transcription`` (text of the
        same-named ``.txt`` file in TRANSCRIPTIONS_DIR, or ``None`` when that
        file does not exist). Files whose names cannot be parsed are skipped.
        The list is empty when AUDIO_DIR does not exist.
    """
    if not AUDIO_DIR.exists():
        return []

    entries = []
    for wav_path in sorted(AUDIO_DIR.glob("*.wav")):
        info = parse_audio_filename(wav_path.name)
        if not info:
            continue

        info['path'] = wav_path
        # The transcription shares the recording's name, with a .txt extension
        transcript_path = TRANSCRIPTIONS_DIR / wav_path.name.replace('.wav', '.txt')
        info['transcription'] = (
            transcript_path.read_text() if transcript_path.exists() else None
        )
        entries.append(info)
    return entries

# Scan the audio directory and report how many recordings were recognised
audio_files = load_audio_files()
print(f"Found {len(audio_files)} audio files")

# Tabulate the metadata so later cells can sort and filter it
df = pd.DataFrame(audio_files)
if not df.empty:
    df = df.sort_values('timestamp')
    timestamps = df['timestamp']
    print(f"\nDate range: {timestamps.min()} to {timestamps.max()}")
    # Summing a boolean mask counts the rows of each type
    print(f"TTS files: {(df['type'] == 'tts').sum()}")
    print(f"STT files: {(df['type'] == 'stt').sum()}")

In [None]:
def match_conversations(df, time_window_seconds=120):
    """Match TTS and STT files into conversation pairs.

    Files are visited in timestamp order. Each file that has not already been
    claimed as a partner is paired with the first later, unclaimed file of the
    opposite type that starts within ``time_window_seconds``. A file with no
    such partner becomes a segment on its own, so every input file appears in
    exactly one output row.

    Args:
        df: DataFrame with ``filename``, ``timestamp``, ``type`` ('tts' or
            'stt') and ``path`` columns; a ``transcription`` column is used
            when present.
        time_window_seconds: Maximum start-to-start distance, in seconds,
            between the two files of a pair.

    Returns:
        A DataFrame with one row per segment and the columns ``timestamp``
        (of the earlier file), ``tts_file``, ``stt_file``, ``tts_path``,
        ``stt_path``, ``tts_text``, ``stt_text`` and ``gap_seconds`` (seconds
        between the two files; missing for unpaired segments). An empty
        DataFrame is returned for empty input.
    """
    if df.empty:
        return pd.DataFrame()

    # Plain dicts avoid the ambiguous truth value of a pandas Series and are
    # cheaper to index inside the loops than repeated .iloc lookups.
    records = df.sort_values('timestamp').to_dict('records')
    claimed = set()  # positions already used as the second half of a pair
    conversations = []

    for i, current in enumerate(records):
        if i in claimed:
            continue

        partner = None
        gap_seconds = None
        for j in range(i + 1, len(records)):
            candidate = records[j]
            elapsed = (candidate['timestamp'] - current['timestamp']).total_seconds()

            # Records are sorted, so nothing further on can be in range either
            if elapsed > time_window_seconds:
                break

            if j not in claimed and candidate['type'] != current['type']:
                partner, gap_seconds = candidate, elapsed
                # Claim only the partner: same-type files lying between the
                # two stay available and get their own turn in the outer loop.
                claimed.add(j)
                break

        if current['type'] == 'tts':
            tts, stt = current, partner
        else:
            stt, tts = current, partner

        conversations.append({
            # current is always the earlier file of the pair
            'timestamp': current['timestamp'],
            'tts_file': tts['filename'] if tts is not None else None,
            'stt_file': stt['filename'] if stt is not None else None,
            'tts_path': tts['path'] if tts is not None else None,
            'stt_path': stt['path'] if stt is not None else None,
            'tts_text': tts.get('transcription') if tts is not None else None,
            'stt_text': stt.get('transcription') if stt is not None else None,
            'gap_seconds': gap_seconds,
        })

    return pd.DataFrame(conversations)

# Pair the loaded files into conversation segments and report the breakdown
if df.empty:
    conversations_df = pd.DataFrame()
else:
    conversations_df = match_conversations(df)
    has_tts = conversations_df['tts_file'].notna()
    has_stt = conversations_df['stt_file'].notna()
    print(f"\nMatched {len(conversations_df)} conversation segments")
    # Summing a boolean mask counts the rows that satisfy it
    print(f"Paired conversations: {(has_tts & has_stt).sum()}")
    print(f"Unpaired TTS: {(has_tts & ~has_stt).sum()}")
    print(f"Unpaired STT: {(~has_tts & has_stt).sum()}")

In [None]:
def compile_conversation(selected_indices, gap_duration_ms=500, normalize_volume=True):
    """Compile selected conversation segments into a single audio file.

    For each selected segment the user's recording (STT) is placed before the
    assistant's recording (TTS). Clips are separated by ``gap_duration_ms`` of
    silence, with no silence after the last clip. The result is written as a
    timestamped WAV file in OUTPUT_DIR.

    Args:
        selected_indices: Positions of rows in ``conversations_df``. Positions
            at or beyond the end of the frame are ignored.
        gap_duration_ms: Silence inserted between consecutive clips, in
            milliseconds. Zero means the clips are joined back to back.
        normalize_volume: When true, each clip is normalized before joining.

    Returns:
        ``(output_path, status_message)``. ``output_path`` is ``None`` when
        nothing was selected or none of the selected segments has an audio
        file on disk; no file is written in that case.
    """
    if not selected_indices:
        return None, "No segments selected"

    silence = AudioSegment.silent(duration=gap_duration_ms)

    # Gather the clips first so the gap can go strictly *between* them.
    # Appending a gap after every clip and slicing it off afterwards breaks for
    # a zero gap: compiled[:-0] is compiled[:0], which discards all the audio.
    clips = []
    for idx in selected_indices:
        if idx >= len(conversations_df):
            continue

        row = conversations_df.iloc[idx]

        # User speech (STT) first, then the assistant response (TTS)
        for column in ('stt_path', 'tts_path'):
            clip_path = row[column]
            if clip_path and clip_path.exists():
                clip = AudioSegment.from_wav(clip_path)
                if normalize_volume:
                    clip = clip.normalize()
                clips.append(clip)

    if not clips:
        # Do not leave an empty WAV file behind when there is nothing to join
        return None, "No audio found for the selected segments"

    compiled = AudioSegment.empty()
    for position, clip in enumerate(clips):
        if position:
            compiled += silence
        compiled += clip

    # Export to file
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    output_path = OUTPUT_DIR / f"conversation_{timestamp}.wav"
    compiled.export(output_path, format="wav")

    duration = len(compiled) / 1000.0  # len() of a segment is in milliseconds
    return str(output_path), f"Compiled {len(selected_indices)} segments into {duration:.1f} seconds of audio\nSaved to: {output_path}"

def create_transcript(selected_indices):
    """Render the selected conversation segments as a plain-text transcript.

    Every segment contributes a ``[YYYY-MM-DD HH:MM:SS]`` header followed by a
    ``User:`` line and an ``Assistant:`` line. A side shows its transcription
    when one is available, otherwise a reference to its audio file; a side
    with neither is left out.

    Args:
        selected_indices: Positions of rows in ``conversations_df``. Positions
            at or beyond the end of the frame are ignored.

    Returns:
        The transcript text, or "No segments selected" for an empty selection.
    """
    if not selected_indices:
        return "No segments selected"

    # (speaker label, transcription column, audio filename column)
    sides = (
        ("User", 'stt_text', 'stt_file'),
        ("Assistant", 'tts_text', 'tts_file'),
    )

    lines = []
    for idx in selected_indices:
        if idx >= len(conversations_df):
            continue

        entry = conversations_df.iloc[idx]
        stamp = entry['timestamp'].strftime("%Y-%m-%d %H:%M:%S")
        lines.append(f"\n[{stamp}]")

        for speaker, text_column, file_column in sides:
            if entry[text_column]:
                lines.append(f"{speaker}: {entry[text_column]}")
            elif entry[file_column]:
                lines.append(f"{speaker}: [Audio: {entry[file_column]}]")

    return "\n".join(lines)

In [None]:
# Create Gradio interface
with gr.Blocks(title="Voice Mode Conversation Compiler") as demo:
    gr.Markdown("# Voice Mode Conversation Compiler")
    gr.Markdown("Browse, preview, and compile voice conversations into audio files.")

    # Columns of the conversations table. The populated and the empty table
    # both use this list so the table keeps one schema across refreshes/filters.
    TABLE_COLUMNS = ['Index', 'Time', 'User Input', 'Assistant Response', 'Gap (s)']

    def _filter_by_date(date_filter):
        """Return a copy of conversations_df limited to one YYYYMMDD date.

        "All" (or a cleared dropdown, i.e. None) keeps every row.
        """
        filtered_df = conversations_df.copy()
        if date_filter not in (None, "All") and not filtered_df.empty:
            filtered_df = filtered_df[filtered_df['timestamp'].dt.strftime('%Y%m%d') == date_filter]
        return filtered_df

    def _summarize(text, filename, limit=50):
        """Build the table cell for one side of a segment.

        Long transcriptions are cut to ``limit`` characters plus '...'. A side
        with an audio file but no transcription shows '[Audio]'; a side with
        no audio file shows an empty cell. Missing values (None/NaN) count as
        absent.
        """
        has_text = isinstance(text, str) and bool(text)
        if has_text and len(text) > limit:
            return text[:limit] + '...'
        if not isinstance(filename, str) or not filename:
            return ''
        return text if has_text else '[Audio]'

    with gr.Tab("Browse Conversations"):
        # Date filter
        with gr.Row():
            # NOTE(review): min_date/max_date are not used by any widget in
            # this cell; kept in case a later cell reads them.
            if not conversations_df.empty:
                min_date = conversations_df['timestamp'].min().date()
                max_date = conversations_df['timestamp'].max().date()
            else:
                min_date = max_date = datetime.now().date()

            # NOTE: the choices are computed once, when the UI is built;
            # "Refresh Files" reloads the table but not this list.
            date_filter = gr.Dropdown(
                choices=["All"] + sorted(df['date'].unique().tolist() if not df.empty else []),
                value="All",
                label="Filter by Date"
            )
            refresh_btn = gr.Button("Refresh Files", variant="secondary")

        # Conversation table
        def format_conversations_table(date_filter="All"):
            """Build the display table for the segments matching date_filter."""
            filtered_df = _filter_by_date(date_filter)

            if filtered_df.empty:
                return pd.DataFrame(columns=TABLE_COLUMNS)

            return pd.DataFrame({
                'Index': filtered_df.index,
                'Time': filtered_df['timestamp'].dt.strftime('%H:%M:%S'),
                'User Input': filtered_df.apply(
                    lambda row: _summarize(row['stt_text'], row['stt_file']), axis=1),
                'Assistant Response': filtered_df.apply(
                    lambda row: _summarize(row['tts_text'], row['tts_file']), axis=1),
                # Test for missing rather than truthiness: an unpaired row's
                # gap is NaN (truthy, would print "nan") and a genuine 0.0 s
                # gap is falsy (would print nothing).
                'Gap (s)': filtered_df['gap_seconds'].apply(
                    lambda gap: f"{gap:.1f}" if pd.notna(gap) else ""),
            })

        conversations_table = gr.Dataframe(
            value=format_conversations_table(),
            label="Conversations",
            interactive=False
        )

        # Selection
        selected_indices = gr.State([])
        selection_display = gr.Textbox(label="Selected Segments", value="None selected")

        with gr.Row():
            select_all_btn = gr.Button("Select All", variant="secondary")
            clear_selection_btn = gr.Button("Clear Selection", variant="secondary")

        # Preview section
        with gr.Row():
            with gr.Column():
                preview_index = gr.Number(label="Preview Index", value=0, precision=0)
                preview_audio_user = gr.Audio(label="User Audio", type="filepath")
                preview_text_user = gr.Textbox(label="User Transcription", lines=3)

            with gr.Column():
                preview_btn = gr.Button("Preview", variant="primary")
                preview_audio_assistant = gr.Audio(label="Assistant Audio", type="filepath")
                preview_text_assistant = gr.Textbox(label="Assistant Transcription", lines=3)

    with gr.Tab("Compile Audio"):
        gr.Markdown("### Compilation Settings")

        gap_duration = gr.Slider(
            minimum=0,
            maximum=2000,
            value=500,
            step=100,
            label="Gap Between Segments (ms)"
        )

        normalize_volume = gr.Checkbox(label="Normalize Volume", value=True)

        compile_btn = gr.Button("Compile Selected Segments", variant="primary")

        compile_output = gr.Audio(label="Compiled Audio", type="filepath")
        compile_status = gr.Textbox(label="Status")

        gr.Markdown("### Transcript")
        transcript_output = gr.Textbox(label="Conversation Transcript", lines=20)
        export_transcript_btn = gr.Button("Export Transcript", variant="secondary")

    # Event handlers
    def refresh_files_handler(date_filter="All"):
        """Rescan the audio directory and rebuild the table.

        The currently selected date filter is re-applied so the table and the
        dropdown stay in agreement after a refresh.
        """
        global audio_files, df, conversations_df
        audio_files = load_audio_files()
        df = pd.DataFrame(audio_files)
        if not df.empty:
            df = df.sort_values('timestamp')
            conversations_df = match_conversations(df)
        else:
            conversations_df = pd.DataFrame()
        return format_conversations_table(date_filter)

    def filter_conversations(date_filter):
        """Rebuild the table for the newly chosen date."""
        return format_conversations_table(date_filter)

    def preview_conversation(index):
        """Return (user audio, user text, assistant audio, assistant text) for one segment.

        Audio entries are file paths, or None when the file is missing. A
        cleared or out-of-range index yields an all-empty preview.
        """
        nothing = (None, "", None, "")
        # A cleared number box arrives as None
        if index is None or conversations_df.empty:
            return nothing

        position = int(index)
        if not 0 <= position < len(conversations_df):
            return nothing

        row = conversations_df.iloc[position]

        def existing_path(value):
            return str(value) if isinstance(value, Path) and value.exists() else None

        def text_or_blank(value):
            return value if isinstance(value, str) else ""

        return (
            existing_path(row['stt_path']),
            text_or_blank(row['stt_text']),
            existing_path(row['tts_path']),
            text_or_blank(row['tts_text']),
        )

    def select_all_segments(date_filter):
        """Select every segment visible under the current date filter."""
        indices = _filter_by_date(date_filter).index.tolist()
        return indices, f"Selected {len(indices)} segments"

    def clear_selection():
        """Reset the selection state and its display text."""
        return [], "None selected"

    def compile_handler(indices, gap_ms, normalize):
        """Compile the selected segments and produce the matching transcript."""
        audio_path, status = compile_conversation(indices, gap_ms, normalize)
        transcript = create_transcript(indices)
        return audio_path, status, transcript

    def export_transcript(transcript):
        """Write the transcript text to a timestamped file in OUTPUT_DIR."""
        if not transcript:
            return "No transcript to export"

        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        output_path = OUTPUT_DIR / f"transcript_{timestamp}.txt"
        # Explicit UTF-8: the platform default encoding may be unable to
        # represent characters that occur in the transcriptions.
        output_path.write_text(transcript, encoding="utf-8")
        return f"Transcript exported to: {output_path}"

    # Connect events
    refresh_btn.click(refresh_files_handler, inputs=[date_filter], outputs=[conversations_table])
    date_filter.change(filter_conversations, inputs=[date_filter], outputs=[conversations_table])

    preview_btn.click(
        preview_conversation,
        inputs=[preview_index],
        outputs=[preview_audio_user, preview_text_user, preview_audio_assistant, preview_text_assistant]
    )

    select_all_btn.click(
        select_all_segments,
        inputs=[date_filter],
        outputs=[selected_indices, selection_display]
    )

    clear_selection_btn.click(
        clear_selection,
        outputs=[selected_indices, selection_display]
    )

    compile_btn.click(
        compile_handler,
        inputs=[selected_indices, gap_duration, normalize_volume],
        outputs=[compile_output, compile_status, transcript_output]
    )

    export_transcript_btn.click(
        export_transcript,
        inputs=[transcript_output],
        outputs=[compile_status]
    )

# Launch the interface
if __name__ == "__main__":
    demo.launch(share=False)

## Advanced Features

The cells below provide additional functionality for more advanced use cases.

In [None]:
# Group conversations by session (large time gaps indicate new sessions)
def identify_sessions(conversations_df, session_gap_minutes=30):
    """Number the conversation segments by session.

    Rows are walked in their existing order. A new session starts whenever a
    row's timestamp is more than ``session_gap_minutes`` after the previous
    row's timestamp. Session numbers start at 0.

    Args:
        conversations_df: DataFrame with a ``timestamp`` column. It is
            modified in place: a ``session`` column is added or overwritten.
        session_gap_minutes: Gap, in minutes, above which a new session begins.

    Returns:
        The same DataFrame object that was passed in.
    """
    if conversations_df.empty:
        return conversations_df

    session_ids = []
    session_no = 0
    previous = None
    for stamp in conversations_df['timestamp']:
        if previous is not None:
            minutes_apart = (stamp - previous).total_seconds() / 60
            if minutes_apart > session_gap_minutes:
                session_no += 1
        session_ids.append(session_no)
        previous = stamp

    conversations_df['session'] = session_ids
    return conversations_df

# Label every segment with its session, then summarise each session
if not conversations_df.empty:
    conversations_df = identify_sessions(conversations_df)
    session_count = conversations_df['session'].nunique()
    print(f"\nIdentified {session_count} conversation sessions")

    # One row per session: first/last segment time and number of segments
    session_summary = conversations_df.groupby('session')['timestamp'].agg(
        Start='min',
        End='max',
        Segments='count',
    )
    session_summary['Duration'] = session_summary['End'] - session_summary['Start']
    print("\nSession Summary:")
    print(session_summary)

In [None]:
# Audio analysis functions
def analyze_audio_file(audio_path):
    """Report basic properties of a WAV file.

    Args:
        audio_path: Location of the WAV file, as a ``Path`` or a string.

    Returns:
        A dict with ``duration_seconds``, ``channels``, ``frame_rate``,
        ``sample_width``, ``max_dBFS``, ``rms`` (all read from the loaded
        pydub segment) and ``file_size_kb`` (size on disk divided by 1024).
    """
    audio = AudioSegment.from_wav(audio_path)

    return {
        'duration_seconds': len(audio) / 1000.0,  # len() of a segment is in ms
        'channels': audio.channels,
        'frame_rate': audio.frame_rate,
        'sample_width': audio.sample_width,
        'max_dBFS': audio.max_dBFS,
        'rms': audio.rms,
        # Path(...) lets callers pass a plain string path as well as a Path
        'file_size_kb': Path(audio_path).stat().st_size / 1024,
    }

def remove_silence(audio_segment, min_silence_len=500, silence_thresh=-40, keep_silence=100):
    """Strip silent stretches out of an audio segment.

    The segment is split with pydub's ``split_on_silence`` and the resulting
    chunks are joined back together. ``min_silence_len``, ``silence_thresh``
    and ``keep_silence`` are forwarded to ``split_on_silence`` unchanged.

    Returns:
        The re-joined audio, or the original segment untouched when the split
        yields no chunks.
    """
    voiced_chunks = split_on_silence(
        audio_segment,
        min_silence_len=min_silence_len,
        silence_thresh=silence_thresh,
        keep_silence=keep_silence,
    )

    if not voiced_chunks:
        return audio_segment

    # Concatenate the chunks in order
    return sum(voiced_chunks)

# Example: inspect the first recording in the table
if not df.empty:
    sample_file = df['path'].iloc[0]
    print(f"\nAnalyzing {sample_file.name}:")
    analysis = analyze_audio_file(sample_file)
    for metric, reading in analysis.items():
        print(f"  {metric}: {reading}")

In [None]:
# Export functions for different formats
def export_to_mp3(audio_segment, output_path, bitrate="192k"):
    """Write ``audio_segment`` to ``output_path`` as MP3.

    Args:
        audio_segment: Object whose ``export`` method performs the encoding.
        output_path: Destination handed to ``export``.
        bitrate: Bitrate string handed to ``export``.

    Returns:
        ``output_path``, unchanged.
    """
    audio_segment.export(
        output_path,
        format="mp3",
        bitrate=bitrate,
    )
    return output_path

def export_session_to_podcast(session_id, intro_text=None, outro_text=None):
    """Export a complete session as a podcast-style MP3 file.

    Every segment of the session contributes the user's recording (normalized,
    then lowered by 3 dB) followed by the assistant's recording (normalized),
    each followed by 800 ms of silence. The file is written to OUTPUT_DIR.

    Args:
        session_id: Value of the ``session`` column selecting the segments.
        intro_text: Placeholder; currently unused.
        outro_text: Placeholder; currently unused.

    Returns:
        ``(output_path, status_message)``. ``output_path`` is ``None`` when
        there is no session information, the session has no segments, or none
        of its segments has an audio file on disk; no file is written then.
    """
    # Session labels exist only after identify_sessions() has been applied
    if 'session' not in conversations_df.columns:
        return None, "No conversations in session"

    session_df = conversations_df[conversations_df['session'] == session_id]

    if session_df.empty:
        return None, "No conversations in session"

    # Create the main content
    compiled = AudioSegment.empty()
    silence = AudioSegment.silent(duration=800)  # Longer pauses for podcast style

    # Walk the filtered rows directly. Feeding their index *labels* to the
    # positional .iloc is only correct while the frame has a default index.
    for _, row in session_df.iterrows():
        if row['stt_path'] and row['stt_path'].exists():
            user_audio = AudioSegment.from_wav(row['stt_path'])
            user_audio = user_audio.normalize()
            # Reduce user audio volume slightly
            user_audio = user_audio - 3
            compiled += user_audio + silence

        if row['tts_path'] and row['tts_path'].exists():
            assistant_audio = AudioSegment.from_wav(row['tts_path'])
            assistant_audio = assistant_audio.normalize()
            compiled += assistant_audio + silence

    if len(compiled) == 0:
        # Nothing was added: do not write an empty MP3
        return None, f"No audio files found for session {session_id}"

    # Add intro/outro if provided (would need TTS to generate these)
    # This is a placeholder for the concept

    # Export as MP3
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    output_path = OUTPUT_DIR / f"podcast_session_{session_id}_{timestamp}.mp3"

    compiled.export(output_path, format="mp3", bitrate="192k")

    duration = len(compiled) / 1000.0 / 60.0  # milliseconds -> minutes
    return str(output_path), f"Exported session {session_id} as podcast ({duration:.1f} minutes)"

# Example: export the session of the first segment as a podcast
if 'session' in conversations_df.columns and not conversations_df.empty:
    first_session = conversations_df['session'].iloc[0]
    print(f"\nExporting session {first_session} as podcast...")
    # The export itself is left disabled; uncomment to actually export:
    # output, status = export_session_to_podcast(first_session)
    # print(status)