In [3]:
import os
import base64
import requests
import io
import subprocess
from dotenv import load_dotenv
from pydub import AudioSegment



In [9]:
# Load environment variables
load_dotenv()

# Deepgram API config
DG_API_KEY = os.getenv("DEEPGRAM_API_KEY")
MODEL_NAME = "model=aura-2-callista-en"  

In [7]:
def play_audio_ffplay(wav_io: io.BytesIO):
    """Plays in-memory WAV audio using ffplay (must be installed)."""
    process = subprocess.Popen(
        ["ffplay", "-autoexit", "-", "-nodisp"],
        stdin=subprocess.PIPE,
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL
    )
    process.stdin.write(wav_io.read())
    process.stdin.close()
    process.wait()

In [11]:
DEEPGRAM_API_KEY

'054ed5b3845b5790677deb151dd4192b1fe183df'

In [29]:
import os
import subprocess
import shutil
from dotenv import load_dotenv
from deepgram import DeepgramClient, SpeakWSOptions, SpeakWebSocketEvents

# Load .env with API key
load_dotenv()
DEEPGRAM_API_KEY = os.getenv("DEEPGRAM_API_KEY")

def is_installed(command: str) -> bool:
    return shutil.which(command) is not None

def synthesize_response(text: str):
    if not is_installed("ffplay"):
        raise RuntimeError("ffplay (from ffmpeg) is required. Please install it and add to PATH.")

    try:
        # Initialize Deepgram client
        deepgram = DeepgramClient(DEEPGRAM_API_KEY)

        # Create WebSocket connection
        connection = deepgram.speak.websocket.v("1")

        # Start ffplay to play raw audio
        player = subprocess.Popen(
            ["ffplay", "-autoexit", "-nodisp", "-"],
            stdin=subprocess.PIPE,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
        )

        # Handle audio event (MUST take 1 positional arg: event object)
        def on_audio(event):
            if player.stdin and hasattr(event, "data") and event.data:
                player.stdin.write(event.data)
                player.stdin.flush()

        # Register the handler
        connection.on(SpeakWebSocketEvents.AudioData, on_audio)

        # Configure options
        options = SpeakWSOptions(
            model="aura-2-thalia-en",
            encoding="linear16",
            sample_rate=16000
        )

        # Start WebSocket connection
        connection.start(options)
        connection.send_text(text)
        connection.flush()
        connection.wait_for_complete()
        connection.finish()

        # Close ffplay
        if player.stdin:
            player.stdin.close()
        player.wait()

    except Exception as e:
        print(f"Error during TTS synthesis or playback: {e}")

if __name__ == "__main__":
    synthesize_response("This is a working Deepgram TTS integration.")

speaker is None. Return immediately


Error during TTS synthesis or playback: DeepgramError: Speaker is not initialized


Exception in AbstractSyncWebSocketClient._listening: synthesize_response.<locals>.on_audio() got an unexpected keyword argument 'data'


In [35]:
import asyncio
from deepgram import (
    DeepgramClient,
    DeepgramClientOptions,
    SpeakWebSocketEvents,
    SpeakWSOptions
)
from dotenv import load_dotenv
import os
import signal

# Load environment variables
load_dotenv()
DEEPGRAM_API_KEY = os.getenv("DEEPGRAM_API_KEY")

# Setup Deepgram client with speaker playback enabled
config = DeepgramClientOptions(
    options={
        "speaker_playback": "true"  # Enables real-time audio playback via system speaker
    }
)

deepgram = DeepgramClient(DEEPGRAM_API_KEY, config)

async def synthesize_response(text: str):
    try:
        # Create async WebSocket connection
        dg_connection = deepgram.speak.asyncwebsocket.v("1")

        # Event Handlers (Optional Debug Output)
        async def on_open(self, open, **kwargs):
            print("🔊 Connection opened.")

        async def on_error(self, error, **kwargs):
            print(f"❌ Error: {error}")

        async def on_close(self, close, **kwargs):
            print("✅ Connection closed.")

        # Register events
        dg_connection.on(SpeakWebSocketEvents.Open, on_open)
        dg_connection.on(SpeakWebSocketEvents.Error, on_error)
        dg_connection.on(SpeakWebSocketEvents.Close, on_close)

        # Setup WebSocket TTS options
        options = SpeakWSOptions(
            model="aura-2-thalia-en",
            encoding="linear16",
            sample_rate=16000
        )

        # Start the TTS WebSocket connection
        started = await dg_connection.start(options)
        if not started:
            print("❌ Failed to start the TTS connection.")
            return

        # Send text and flush to initiate playback
        await dg_connection.send_text(text)
        await dg_connection.flush()
        await dg_connection.wait_for_complete()
        await dg_connection.finish()

    except Exception as e:
        print(f"Error during TTS synthesis or playback: {e}")

# Example usage
if __name__ == "__main__":
    await synthesize_response("Hello! I am your voice assistant using Deepgram TTS.")



🔊 Connection opened.
✅ Connection closed.


In [None]:
import asyncio
from dotenv import load_dotenv
import os

from deepgram import (
    DeepgramClient,
    DeepgramClientOptions,
    LiveTranscriptionEvents,
    LiveOptions,
    Microphone,
)

load_dotenv()

class TranscriptCollector:
    def __init__(self):
        self.reset()

    def reset(self):
        self.transcript_parts = []

    def add_part(self, part):
        self.transcript_parts.append(part)

    def get_full_transcript(self):
        return ' '.join(self.transcript_parts)


transcript_collector = TranscriptCollector()

async def start_live_transcription():
    try:
        config = DeepgramClientOptions(options={"keepalive": "true"})
        deepgram = DeepgramClient(os.getenv("DEEPGRAM_API_KEY"), config)

        dg_connection = deepgram.listen.asynclive.v("1")

        queue = asyncio.Queue()

        async def on_message(self, result, **kwargs):
            sentence = result.channel.alternatives[0].transcript
            if not result.speech_final:
                transcript_collector.add_part(sentence)
            else:
                transcript_collector.add_part(sentence)
                full_sentence = transcript_collector.get_full_transcript()
                await queue.put(full_sentence)
                transcript_collector.reset()

        async def on_error(self, error, **kwargs):
            print(f"\n\n{error}\n\n")

        dg_connection.on(LiveTranscriptionEvents.Transcript, on_message)
        dg_connection.on(LiveTranscriptionEvents.Error, on_error)

        options = LiveOptions(
            model="nova-2",
            punctuate=True,
            language="en-US",
            encoding="linear16",
            channels=1,
            sample_rate=16000,
            endpointing=True
        )

        await dg_connection.start(options)

        microphone = Microphone(dg_connection.send)
        microphone.start()

        # Silence detection setup
        silence_timeout = 2  # seconds
        last_spoken_time = asyncio.get_event_loop().time()

        while True:
            if not microphone.is_active():
                break

            try:
                result = queue.get_nowait()
                yield result
                last_spoken_time = asyncio.get_event_loop().time()
            except asyncio.QueueEmpty:
                await asyncio.sleep(0.1)

            if asyncio.get_event_loop().time() - last_spoken_time > silence_timeout:
                break

        microphone.finish()
        await dg_connection.finish()
        print("Finished")

    except Exception as e:
        print(f"Could not open socket: {e}")
    finally:
        # Ensure all tasks are properly closed
        if 'microphone' in locals():
            microphone.finish()
        if 'dg_connection' in locals():
            await dg_connection.finish()


In [55]:
async for transcript in start_live_transcription():
    print(f"[Transcribed] {transcript}")


  dg_connection = deepgram.listen.asynclive.v("1")


[Transcribed] Hello. Hello. Hello.
[Transcribed] Hello.
[Transcribed] Hello.
[Transcribed] Hello.
[Transcribed] 


tasks cancelled error: 
tasks cancelled error: 


Finished
