[WIP] vocode-core 0.1.0 #570

Draft · wants to merge 14 commits into base: main
6 changes: 3 additions & 3 deletions apps/telephony_app/speller_agent.py
@@ -31,7 +31,7 @@ async def respond(
human_input: str,
conversation_id: str,
is_interrupt: bool = False,
) -> Tuple[Optional[str], bool]:
) -> Optional[str]:
"""Generates a response from the SpellerAgent.

The response is generated by joining each character in the human input with a space.
@@ -43,9 +43,9 @@ async def respond(
is_interrupt (bool): A flag indicating whether the agent was interrupted.

Returns:
Tuple[Optional[str], bool]: The generated response and a flag indicating whether to stop.
Optional[str]: The generated response
"""
return "".join(c + " " for c in human_input), False
return "".join(c + " " for c in human_input)


class SpellerAgentFactory(AbstractAgentFactory):
10 changes: 8 additions & 2 deletions docs/open-source/create-your-own-agent.mdx
@@ -7,15 +7,18 @@ You can subclass a [`RespondAgent`](https://github.com/vocodedev/vocode-python/b
To do so, you must create an agent type, an agent config, and then your agent subclass. In the examples below, we will create an agent called `BrokenRecordAgent` that responds with the same message no matter what is said to it.

### Agent type

Each agent has a unique agent type string that is checked in various parts of Vocode, most notably in the factories that create agents. So, you must create a new type for your custom agent. See the `AgentType` enum in `vocode/streaming/models/agent.py` for examples.
For our `BrokenRecordAgent`, we will use "agent_broken_record" as our type.
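As a quick sketch (hypothetical; in practice the type is simply a unique string that mirrors the `AgentType` entries in `vocode/streaming/models/agent.py`):

```python
# Hypothetical sketch: the agent type is just a unique string constant,
# mirroring the entries of AgentType in vocode/streaming/models/agent.py.
BROKEN_RECORD_AGENT_TYPE = "agent_broken_record"
```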

### Agent config

Your agent must have a corresponding agent config that subclasses `AgentConfig` and is [JSON-serializable](https://docs.pydantic.dev/latest/concepts/serialization/#modelmodel_dump_json). Serialization is handled automatically by [Pydantic](https://docs.pydantic.dev/latest/).

The agent config should contain only the information you need to deterministically create the same agent each time: given the same config parameters, the corresponding agent should behave the same way every time you create it.

For our `BrokenRecordAgent`, we create a config like:

```python
from vocode.streaming.models.agent import AgentConfig

@@ -24,21 +27,24 @@ class BrokenRecordAgentConfig(AgentConfig, type="agent_broken_record"):
```

### Custom Agent

Now, you can create your custom agent subclass of `RespondAgent`. In your class header, pass in `RespondAgent` with your agent type as a type hint. This should look like `RespondAgent[Your_Agent_Type]`.

Each agent should override the `generate_response()` async method to support streaming and the `respond()` method to support turn-based conversations.

> If you only want to support turn-based conversations, you do not have to override `generate_response()`, but you MUST set `generate_response=False` in your agent config (see `ChatVertexAIAgentConfig` in `vocode/streaming/models/agent.py` for an example). Otherwise, you must ALWAYS implement the `generate_response()` async method.
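
For instance, a turn-based-only config might look like the following sketch (hypothetical; it reuses the flag name from the note above, so verify the exact field on `AgentConfig`):

```python
from vocode.streaming.models.agent import AgentConfig

# Hypothetical sketch of a turn-based-only agent config; the flag name
# follows the note above, so verify the exact field on AgentConfig.
class MyTurnBasedAgentConfig(AgentConfig, type="agent_my_turn_based"):
    generate_response: bool = False
```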

The `generate_response()` method returns an `AsyncGenerator` of tuples containing each message/sentence and a boolean for whether that message can be interrupted by the human speaking. You can automatically create this generator by yielding instead of returning (see example below).

We will now define our `BrokenRecordAgent`. Since we simply return the same message each time, we can return and yield that message in `respond()` and `generate_response()`, respectively:

```python
class BrokenRecordAgent(RespondAgent[BrokenRecordAgentConfig]):

# is_interrupt is True when the human has just interrupted the bot's last response
def respond(
self, human_input, is_interrupt: bool = False
) -> tuple[Optional[str], bool]:
) -> Optional[str]:
return self.agent_config.message

async def generate_response(
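For reference, a minimal `generate_response()` for `BrokenRecordAgent` might look like the following sketch (hypothetical, but consistent with the message/interruptibility tuple contract described above):

```python
from typing import AsyncGenerator, Tuple

async def generate_response(
    self, human_input, conversation_id: str, is_interrupt: bool = False
) -> AsyncGenerator[Tuple[str, bool], None]:
    # Yield the configured message once; True marks it as interruptible.
    yield self.agent_config.message, True
```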
8 changes: 4 additions & 4 deletions docs/open-source/telephony.mdx
@@ -146,8 +146,8 @@ class SpellerAgent(RespondAgent[SpellerAgentConfig]):
human_input: str,
conversation_id: str,
is_interrupt: bool = False,
) -> Tuple[Optional[str], bool]:
return "".join(c + " " for c in human_input), False
) -> Optional[str]:
return "".join(c + " " for c in human_input)


class SpellerAgentFactory(AbstractAgentFactory):
@@ -182,10 +182,10 @@ class SpellerAgent(BaseAgent):
human_input: str,
conversation_id: str,
is_interrupt: bool = False,
) -> Tuple[Optional[str], bool]:
) -> Optional[str]:
call_config = self.config_manager.get_config(conversation_id)
if call_config is not None:
from_phone = call_config.twilio_from
to_phone = call_config.twilio_to
return "".join(c + " " for c in human_input), False
return "".join(c + " " for c in human_input)
```
101 changes: 49 additions & 52 deletions playground/streaming/agent/chat.py
@@ -21,14 +21,14 @@
from vocode.streaming.models.message import BaseMessage
from vocode.streaming.models.transcript import Transcript
from vocode.streaming.utils.state_manager import AbstractConversationStateManager
from vocode.streaming.utils.worker import InterruptibleAgentResponseEvent
from vocode.streaming.utils.worker import InterruptibleAgentResponseEvent, QueueConsumer

load_dotenv()

from vocode.streaming.agent import ChatGPTAgent
from vocode.streaming.agent.base_agent import (
AgentResponseMessage,
AgentResponseType,
AgentResponse,
BaseAgent,
TranscriptionAgentInput,
)
@@ -96,6 +96,11 @@ async def run_agent(
):
ended = False
conversation_id = create_conversation_id()
agent_response_queue: asyncio.Queue[InterruptibleAgentResponseEvent[AgentResponse]] = (
asyncio.Queue()
)
agent_consumer = QueueConsumer(input_queue=agent_response_queue)
agent.agent_responses_consumer = agent_consumer

async def receiver():
nonlocal ended
@@ -106,52 +111,51 @@ async def receiver():

while not ended:
try:
event = await agent.get_output_queue().get()
response = event.payload
if response.type == AgentResponseType.FILLER_AUDIO:
print("Would have sent filler audio")
elif response.type == AgentResponseType.STOP:
print("Agent returned stop")
ended = True
break
elif response.type == AgentResponseType.MESSAGE:
agent_response = typing.cast(AgentResponseMessage, response)

if isinstance(agent_response.message, EndOfTurn):
ignore_until_end_of_turn = False
if random.random() < backchannel_probability:
backchannel = random.choice(BACKCHANNELS)
print("Human: " + f"[{backchannel}]")
agent.transcript.add_human_message(
backchannel,
conversation_id,
is_backchannel=True,
)
elif isinstance(agent_response.message, BaseMessage):
if ignore_until_end_of_turn:
continue

message_sent: str
is_final: bool
# TODO: consider allowing the user to interrupt the agent manually by responding fast
if random.random() < interruption_probability:
stop_idx = random.randint(0, len(agent_response.message.text))
message_sent = agent_response.message.text[:stop_idx]
ignore_until_end_of_turn = True
is_final = False
else:
message_sent = agent_response.message.text
is_final = True

agent.transcript.add_bot_message(
message_sent, conversation_id, is_final=is_final
event = await agent_response_queue.get()
agent_response = event.payload

if isinstance(agent_response.message, EndOfTurn):
ignore_until_end_of_turn = False
if random.random() < backchannel_probability:
backchannel = random.choice(BACKCHANNELS)
print("Human: " + f"[{backchannel}]")
agent.transcript.add_human_message(
backchannel,
conversation_id,
is_backchannel=True,
)
elif isinstance(agent_response.message, BaseMessage):
if ignore_until_end_of_turn:
continue

message_sent: str
is_final: bool
# TODO: consider allowing the user to interrupt the agent manually by responding fast
if random.random() < interruption_probability:
stop_idx = random.randint(0, len(agent_response.message.text))
message_sent = agent_response.message.text[:stop_idx]
ignore_until_end_of_turn = True
is_final = False
else:
message_sent = agent_response.message.text
is_final = True

agent.transcript.add_bot_message(
message_sent, conversation_id, is_final=is_final
)

print("AI: " + message_sent + ("-" if not is_final else ""))
print("AI: " + message_sent + ("-" if not is_final else ""))
except asyncio.CancelledError:
break

async def sender():
if agent.agent_config.initial_message is not None:
agent.agent_responses_consumer.consume_nonblocking(
InterruptibleAgentResponseEvent(
payload=AgentResponse(message=agent.agent_config.initial_message),
agent_response_tracker=asyncio.Event(),
)
)
while not ended:
try:
message = await asyncio.get_event_loop().run_in_executor(
@@ -175,10 +179,10 @@ async def sender():
actions_worker = None
if isinstance(agent, ChatGPTAgent):
actions_worker = ActionsWorker(
input_queue=agent.actions_queue,
output_queue=agent.get_input_queue(),
action_factory=agent.action_factory,
)
actions_worker.consumer = agent
agent.actions_consumer = actions_worker
actions_worker.attach_conversation_state_manager(agent.conversation_state_manager)
actions_worker.start()

@@ -215,13 +219,6 @@ async def agent_main():
)
agent.attach_conversation_state_manager(DummyConversationManager())
agent.attach_transcript(transcript)
if agent.agent_config.initial_message is not None:
agent.output_queue.put_nowait(
InterruptibleAgentResponseEvent(
payload=AgentResponseMessage(message=agent.agent_config.initial_message),
agent_response_tracker=asyncio.Event(),
)
)
agent.start()

try:
12 changes: 8 additions & 4 deletions playground/streaming/synthesizer/synthesize.py
@@ -2,11 +2,13 @@

from vocode.streaming.models.message import BaseMessage
from vocode.streaming.models.synthesizer import AzureSynthesizerConfig
from vocode.streaming.output_device.base_output_device import BaseOutputDevice
from vocode.streaming.output_device.abstract_output_device import AbstractOutputDevice
from vocode.streaming.output_device.audio_chunk import AudioChunk
from vocode.streaming.output_device.blocking_speaker_output import BlockingSpeakerOutput
from vocode.streaming.synthesizer.azure_synthesizer import AzureSynthesizer
from vocode.streaming.synthesizer.base_synthesizer import BaseSynthesizer
from vocode.streaming.utils import get_chunk_size_per_second
from vocode.streaming.utils.worker import InterruptibleEvent

if __name__ == "__main__":
import asyncio
@@ -19,7 +21,7 @@

async def speak(
synthesizer: BaseSynthesizer,
output_device: BaseOutputDevice,
output_device: AbstractOutputDevice,
message: BaseMessage,
):
message_sent = message.text
@@ -29,7 +31,7 @@ async def speak(
synthesizer.get_synthesizer_config().sampling_rate,
)
# ClientSession needs to be created within the async task
synthesis_result = await synthesizer.create_speech_uncached(
synthesis_result = await synthesizer.create_speech(
message=message,
chunk_size=int(chunk_size),
)
@@ -38,7 +40,9 @@ async def speak(
try:
start_time = time.time()
speech_length_seconds = seconds_per_chunk * (len(chunk_result.chunk) / chunk_size)
output_device.consume_nonblocking(chunk_result.chunk)
output_device.consume_nonblocking(
InterruptibleEvent(payload=AudioChunk(data=chunk_result.chunk))
)
end_time = time.time()
await asyncio.sleep(
max(
18 changes: 12 additions & 6 deletions playground/streaming/transcriber/transcribe.py
@@ -5,6 +5,15 @@
DeepgramEndpointingConfig,
DeepgramTranscriber,
)
from vocode.streaming.utils.worker import AsyncWorker


class TranscriptionPrinter(AsyncWorker[Transcription]):
async def _run_loop(self):
while True:
transcription: Transcription = await self.input_queue.get()
print(transcription)


if __name__ == "__main__":
import asyncio
@@ -13,11 +22,6 @@

load_dotenv()

async def print_output(transcriber: BaseTranscriber):
while True:
transcription: Transcription = await transcriber.output_queue.get()
print(transcription)

async def listen():
microphone_input = MicrophoneInput.from_default_device()

@@ -28,7 +32,9 @@ async def listen():
)
)
transcriber.start()
asyncio.create_task(print_output(transcriber))
transcription_printer = TranscriptionPrinter()
transcriber.consumer = transcription_printer
transcription_printer.start()
print("Start speaking...press Ctrl+C to end. ")
while True:
chunk = await microphone_input.get_audio()
54 changes: 51 additions & 3 deletions tests/fakedata/conversation.py
@@ -1,3 +1,5 @@
import asyncio
import time
from typing import Optional

from pytest_mock import MockerFixture
@@ -8,7 +10,8 @@
from vocode.streaming.models.message import BaseMessage
from vocode.streaming.models.synthesizer import PlayHtSynthesizerConfig, SynthesizerConfig
from vocode.streaming.models.transcriber import DeepgramTranscriberConfig, TranscriberConfig
from vocode.streaming.output_device.base_output_device import BaseOutputDevice
from vocode.streaming.output_device.abstract_output_device import AbstractOutputDevice
from vocode.streaming.output_device.audio_chunk import ChunkState
from vocode.streaming.streaming_conversation import StreamingConversation
from vocode.streaming.synthesizer.base_synthesizer import BaseSynthesizer
from vocode.streaming.telephony.constants import DEFAULT_CHUNK_SIZE, DEFAULT_SAMPLING_RATE
@@ -36,8 +39,53 @@
)


class DummyOutputDevice(BaseOutputDevice):
def consume_nonblocking(self, chunk: bytes):
class DummyOutputDevice(AbstractOutputDevice):

def __init__(
self,
sampling_rate: int,
audio_encoding: AudioEncoding,
wait_for_interrupt: bool = False,
chunks_before_interrupt: int = 1,
):
super().__init__(sampling_rate, audio_encoding)
self.wait_for_interrupt = wait_for_interrupt
self.chunks_before_interrupt = chunks_before_interrupt
self.interrupt_event = asyncio.Event()

async def process(self, item):
self.interruptible_event = item
audio_chunk = item.payload

if item.is_interrupted():
audio_chunk.on_interrupt()
audio_chunk.state = ChunkState.INTERRUPTED
else:
audio_chunk.on_play()
audio_chunk.state = ChunkState.PLAYED
self.interruptible_event.is_interruptible = False

async def _run_loop(self):
chunk_counter = 0
while True:
try:
item = await self.input_queue.get()
except asyncio.CancelledError:
return
if self.wait_for_interrupt and chunk_counter == self.chunks_before_interrupt:
await self.interrupt_event.wait()
await self.process(item)
chunk_counter += 1

    async def flush(self):
        while True:
            try:
                item = self.input_queue.get_nowait()
            except asyncio.QueueEmpty:
                break
            # process() is a coroutine, so it must be awaited
            await self.process(item)

def interrupt(self):
pass

