diff --git a/agent-sdk/agent-session/Agent.ts b/agent-sdk/agent-session/Agent.ts
new file mode 100644
index 00000000..a1805ac2
--- /dev/null
+++ b/agent-sdk/agent-session/Agent.ts
@@ -0,0 +1,228 @@
+import type TypedEventEmitter from 'typed-emitter';
+import { EventEmitter } from "events";
+import { ConnectionState, ParticipantEvent, ParticipantKind, RemoteParticipant, Room, RoomEvent, Track } from 'livekit-client';
+import { getParticipantTrackRefs, participantTrackEvents, TrackReference } from '@/agent-sdk/external-deps/components-js';
+import { ParticipantEventCallbacks } from '@/agent-sdk/external-deps/client-sdk-js';
+import { ParticipantAttributes } from '@/agent-sdk/lib/participant-attributes';
+
+/** State representing the current connection status to the server-hosted agent */
+export type AgentConnectionState = 'disconnected' | 'connecting' | 'connected' | 'reconnecting' | 'signalReconnecting';
+
+/** State representing the current status of the agent, whether it is ready for speech, etc */
+export type AgentConversationalState = 'disconnected' | 'initializing' | 'idle' | 'listening' | 'thinking' | 'speaking';
+
+export enum AgentEvent {
+  VideoTrackChanged = 'videoTrackChanged',
+  AudioTrackChanged = 'audioTrackChanged',
+  AgentAttributesChanged = 'agentAttributesChanged',
+  AgentConnectionStateChanged = 'agentConnectionStateChanged',
+  AgentConversationalStateChanged = 'agentConversationalStateChanged',
+}
+
+export type AgentCallbacks = {
+  [AgentEvent.VideoTrackChanged]: (newTrack: TrackReference | null) => void;
+  [AgentEvent.AudioTrackChanged]: (newTrack: TrackReference | null) => void;
+  [AgentEvent.AgentAttributesChanged]: (newAttributes: Record<string, string>) => void;
+  [AgentEvent.AgentConnectionStateChanged]: (newAgentConnectionState: AgentConnectionState) => void;
+  [AgentEvent.AgentConversationalStateChanged]: (newAgentConversationalState: AgentConversationalState) => void;
+};
+
+/**
+ * Agent encapsulates all agent state, normalizing some quirks around how LiveKit Agents work.
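+ *
+ * A minimal usage sketch (assumes an already-constructed livekit `Room`; in practice AgentSession
+ * creates and owns the Agent for you):
+ * ```ts
+ * const agent = new Agent(room);
+ * agent.on(AgentEvent.AgentConversationalStateChanged, (state) => {
+ *   console.log('agent is now', state);
+ * });
+ * // ...later, when done with the room:
+ * agent.teardown();
+ * ```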
+ */ +export default class Agent extends (EventEmitter as new () => TypedEventEmitter) { + private room: Room; + + connectionState: AgentConnectionState = 'disconnected'; + conversationalState: AgentConversationalState = 'disconnected'; + + private agentParticipant: RemoteParticipant | null = null; + private workerParticipant: RemoteParticipant | null = null; // ref: https://docs.livekit.io/agents/integrations/avatar/#avatar-workers + audioTrack: TrackReference | null = null; + videoTrack: TrackReference | null = null; + + attributes: Record = {}; + + constructor(room: Room) { + super(); + this.room = room; + + this.room.on(RoomEvent.ParticipantConnected, this.handleParticipantConnected); + this.room.on(RoomEvent.ParticipantDisconnected, this.handleParticipantDisconnected); + this.room.on(RoomEvent.ConnectionStateChanged, this.handleConnectionStateChanged); + this.room.localParticipant.on(ParticipantEvent.TrackPublished, this.handleLocalParticipantTrackPublished) + + this.updateConnectionState(); + this.updateConversationalState(); + } + + teardown() { + this.room.off(RoomEvent.ParticipantConnected, this.handleParticipantConnected); + this.room.off(RoomEvent.ParticipantDisconnected, this.handleParticipantDisconnected); + this.room.off(RoomEvent.ConnectionStateChanged, this.handleConnectionStateChanged); + this.room.localParticipant.off(ParticipantEvent.TrackPublished, this.handleLocalParticipantTrackPublished) + } + + private handleParticipantConnected = () => { + this.updateParticipants(); + } + private handleParticipantDisconnected = () => { + this.updateParticipants(); + } + + private handleConnectionStateChanged = () => { + this.updateConnectionState(); + this.updateConversationalState(); + } + + private handleLocalParticipantTrackPublished = () => { + this.updateConversationalState(); + } + + private updateParticipants() { + const newAgentParticipant = this.roomRemoteParticipants.find( + (p) => p.kind === ParticipantKind.AGENT && !(ParticipantAttributes.publishOnBehalf in p.attributes), + ) ?? null; + const newWorkerParticipant = newAgentParticipant ? ( + this.roomRemoteParticipants.find( + (p) => + p.kind === ParticipantKind.AGENT && p.attributes[ParticipantAttributes.publishOnBehalf] === newAgentParticipant.identity, + ) ?? null + ) : null; + + const oldAgentParticipant = this.agentParticipant; + const oldWorkerParticipant = this.workerParticipant; + this.agentParticipant = newAgentParticipant; + this.workerParticipant = newWorkerParticipant; + + // 1. Listen for attribute changes + if (oldAgentParticipant !== this.agentParticipant) { + oldAgentParticipant?.off(ParticipantEvent.AttributesChanged, this.handleAttributesChanged); + + if (this.agentParticipant) { + this.agentParticipant.on(ParticipantEvent.AttributesChanged, this.handleAttributesChanged); + this.handleAttributesChanged(this.agentParticipant.attributes); + } + } + + // 2. 
Listen for track updates + for (const event of participantTrackEvents) { + if (oldAgentParticipant !== this.agentParticipant) { + oldAgentParticipant?.off(event as keyof ParticipantEventCallbacks, this.handleUpdateTracks); + if (this.agentParticipant) { + this.agentParticipant.on(event as keyof ParticipantEventCallbacks, this.handleUpdateTracks); + this.handleUpdateTracks(); + } + } + if (oldWorkerParticipant !== this.workerParticipant) { + oldWorkerParticipant?.off(event as keyof ParticipantEventCallbacks, this.handleUpdateTracks); + if (this.workerParticipant) { + this.workerParticipant.on(event as keyof ParticipantEventCallbacks, this.handleUpdateTracks); + this.handleUpdateTracks(); + } + } + } + } + + private handleUpdateTracks = () => { + const newVideoTrack = ( + this.agentTracks.find((t) => t.source === Track.Source.Camera) ?? + this.workerTracks.find((t) => t.source === Track.Source.Camera) ?? null + ); + if (this.videoTrack !== newVideoTrack) { + this.videoTrack = newVideoTrack; + this.emit(AgentEvent.VideoTrackChanged, newVideoTrack); + } + + const newAudioTrack = ( + this.agentTracks.find((t) => t.source === Track.Source.Microphone) ?? + this.workerTracks.find((t) => t.source === Track.Source.Microphone) ?? null + ); + if (this.audioTrack !== newAudioTrack) { + this.audioTrack = newAudioTrack; + this.emit(AgentEvent.AudioTrackChanged, newAudioTrack); + } + }; + + private handleAttributesChanged = (attributes: Record) => { + this.attributes = attributes; + this.emit(AgentEvent.AgentAttributesChanged, attributes); + this.updateConnectionState(); + this.updateConversationalState(); + }; + + private updateConnectionState() { + let newConnectionState: AgentConnectionState; + + const roomConnectionState = this.room.state; + if (roomConnectionState === ConnectionState.Disconnected) { + newConnectionState = 'disconnected'; + } else if ( + roomConnectionState === ConnectionState.Connecting || + !this.agentParticipant || + !this.attributes[ParticipantAttributes.state] + ) { + newConnectionState = 'connecting'; + } else { + newConnectionState = roomConnectionState; + } + console.log('!! CONNECTION STATE:', newConnectionState); + + if (this.connectionState !== newConnectionState) { + this.connectionState = newConnectionState; + this.emit(AgentEvent.AgentConnectionStateChanged, newConnectionState); + } + } + + private updateConversationalState() { + let newConversationalState: AgentConversationalState = 'disconnected'; + + if (this.room.state !== ConnectionState.Disconnected) { + newConversationalState = 'initializing'; + } + + // If the microphone preconnect buffer is active, then the state should be "listening" rather + // than "initializing" + const micTrack = this.room.localParticipant.getTrackPublication(Track.Source.Microphone); + if (micTrack) { + newConversationalState = 'listening'; + } + + if (this.agentParticipant && this.attributes[ParticipantAttributes.state]) { + // ref: https://github.com/livekit/agents/blob/65170238db197f62f479eb7aaef1c0e18bfad6e7/livekit-agents/livekit/agents/voice/events.py#L97 + const agentState = this.attributes[ParticipantAttributes.state] as 'initializing' | 'idle' | 'listening' | 'thinking' | 'speaking'; + newConversationalState = agentState; + } + + console.log('!! 
CONVERSATIONAL STATE:', newConversationalState); + + if (this.conversationalState !== newConversationalState) { + this.conversationalState = newConversationalState; + this.emit(AgentEvent.AgentConversationalStateChanged, newConversationalState); + } + } + + private get roomRemoteParticipants() { + return Array.from(this.room.remoteParticipants.values()); + } + + private get agentTracks() { + if (!this.agentParticipant) { + return []; + } + return getParticipantTrackRefs( + this.agentParticipant, + { sources: [Track.Source.Microphone, Track.Source.Camera] } + ); + } + + private get workerTracks() { + if (!this.workerParticipant) { + return []; + } + return getParticipantTrackRefs( + this.workerParticipant, + { sources: [Track.Source.Microphone, Track.Source.Camera] } + ); + } +} diff --git a/agent-sdk/agent-session/AgentSession.ts b/agent-sdk/agent-session/AgentSession.ts new file mode 100644 index 00000000..999d7856 --- /dev/null +++ b/agent-sdk/agent-session/AgentSession.ts @@ -0,0 +1,367 @@ +import type TypedEventEmitter from 'typed-emitter'; +import { EventEmitter } from "events"; +import { Room, RoomEvent, ConnectionState, TrackPublishOptions } from 'livekit-client'; + +import { + type ReceivedMessage, + type SentMessage, + MessageSender, + MessageReceiver, + ChatMessageSender, + CombinedMessageSender, + CombinedMessageReceiver, + TranscriptionMessageReceiver, + ReceivedMessageAggregator, + type ReceivedMessageAggregatorOptions, + ReceivedMessageAggregatorEvent, + SentMessageOptions, + SentChatMessageOptions, +} from "./message"; +import Agent, { AgentConnectionState, AgentConversationalState, AgentEvent } from './Agent'; +import { ConnectionCredentialsProvider } from './ConnectionCredentialsProvider'; + +export enum AgentSessionEvent { + AgentConnectionStateChanged = 'agentConnectionStateChanged', + AgentConversationalStateChanged = 'agentConversationalStateChanged', + AgentAttributesChanged = 'agentAttributesChanged', + MessageReceived = 'messageReceived', + Disconnected = 'disconnected', + AgentConnectionFailure = 'agentConnectionFailure', + AudioPlaybackStatusChanged = 'AudioPlaybackStatusChanged', +} + +export type AgentSessionCallbacks = { + [AgentSessionEvent.AgentConnectionStateChanged]: (newAgentConnectionState: AgentConnectionState) => void; + [AgentSessionEvent.AgentConversationalStateChanged]: (newAgentConversationalState: AgentConversationalState) => void; + [AgentSessionEvent.MessageReceived]: (newMessage: ReceivedMessage) => void; + [AgentSessionEvent.AgentConnectionFailure]: (reason: string) => void; + [AgentSessionEvent.AudioPlaybackStatusChanged]: (audioPlaybackPermitted: boolean) => void; + [AgentSessionEvent.Disconnected]: () => void; +}; + +export type AgentSessionOptions = { + /** Optional abort signal which if triggered will stop waiting for the room to be disconnected + * prior to connecting + * + * FIXME: is this a confusing property to expose? Maybe expose one `signal` that universally + * could apply across the whole agentSession.connect(...) call? + */ + waitForDisconnectSignal?: AbortSignal; + + /** + * Amount of time in milliseonds the system will wait for an agent to join the room, before + * emitting an AgentSessionEvent.AgentConnectionFailure event. + */ + agentConnectTimeoutMilliseconds?: number; + + // FIXME: not sure about this pattern, background thinking is that it would be good to be able to + // abstract away enabling relevant media tracks to the caller so they don't have to interface with + // the room. 
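+  //
+  // For example (sketch; 'my-sandbox-id' is a placeholder), a caller could then write:
+  //   const session = new AgentSession(new SandboxConnectionCredentialsProvider({ sandboxId: 'my-sandbox-id' }));
+  //   await session.connect({ tracks: { microphone: { enabled: true, publishOptions: { preConnectBuffer: true } } } });
+  // instead of enabling the microphone on the Room themselves.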
+ tracks?: { + microphone?: { + enabled?: boolean; + publishOptions?: TrackPublishOptions; + }; + }; +}; + +// FIXME: make this 10 seconds once room dispatch booting info is discoverable +const DEFAULT_AGENT_CONNECT_TIMEOUT_MILLISECONDS = 20_000; + + +/** + * AgentSession represents a connection to a LiveKit Agent, providing abstractions to make 1:1 + * agent/participant rooms easier to work with. + */ +export class AgentSession extends (EventEmitter as new () => TypedEventEmitter) { + room: Room; // FIXME: should this be private? + + agent: Agent | null = null; + messageSender: MessageSender | null = null; + messageReceiver: MessageReceiver | null = null; + protected agentConnectTimeoutMilliseconds: AgentSessionOptions["agentConnectTimeoutMilliseconds"] | null = null; + + protected connectionCredentialsProvider: ConnectionCredentialsProvider; + + constructor(provider: ConnectionCredentialsProvider) { + super(); + this.connectionCredentialsProvider = provider; + + this.room = new Room(); + this.room.on(RoomEvent.Connected, this.handleRoomConnected); + this.room.on(RoomEvent.Disconnected, this.handleRoomDisconnected); + this.room.on(RoomEvent.AudioPlaybackStatusChanged, this.handleAudioPlaybackStatusChanged); + + this.prepareConnection().catch(err => { + // FIXME: figure out a better logging solution? + console.warn('WARNING: Room.prepareConnection failed:', err); + }); + } + + async connect(options: AgentSessionOptions = {}) { + const { + waitForDisconnectSignal, + agentConnectTimeoutMilliseconds = DEFAULT_AGENT_CONNECT_TIMEOUT_MILLISECONDS, + tracks = { microphone: { enabled: true, publishOptions: { preConnectBuffer: true } } }, + } = options; + this.agentConnectTimeoutMilliseconds = agentConnectTimeoutMilliseconds; + + await this.waitUntilRoomDisconnected(waitForDisconnectSignal); + + await Promise.all([ + this.connectionCredentialsProvider.generate().then(connection => ( + this.room.connect(connection.serverUrl, connection.participantToken) + )), + + // Start microphone (with preconnect buffer) by default + tracks.microphone?.enabled ? ( + this.room.localParticipant.setMicrophoneEnabled(true, undefined, tracks.microphone?.publishOptions ?? {}) + ) : Promise.resolve(), + ]); + + await this.waitUntilAgentIsAvailable(); + } + async disconnect() { + await this.room.disconnect(); + } + + async prepareConnection() { + const credentials = await this.connectionCredentialsProvider.generate(); + await this.room.prepareConnection(credentials.serverUrl, credentials.participantToken); + } + + private handleRoomConnected = () => { + console.log('!! CONNECTED'); + this.agent = new Agent(this.room); + this.agent.on(AgentEvent.AgentConnectionStateChanged, this.handleAgentConnectionStateChanged); + this.agent.on(AgentEvent.AgentConversationalStateChanged, this.handleAgentConversationalStateChanged); + + const chatMessageSender = new ChatMessageSender(this.localParticipant); + this.messageSender = new CombinedMessageSender( + chatMessageSender, + // TODO: other types of messages that can be sent + ); + + this.messageReceiver = new CombinedMessageReceiver( + new TranscriptionMessageReceiver(this.room), + chatMessageSender.generateLoopbackMessageReceiver(), + // TODO: images? attachments? rpc? + ); + (async () => { + // FIXME: is this sort of pattern a better idea than just making MessageReceiver an EventEmitter? 
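+      // (One alternative sketch, not implemented here: make MessageReceiver an EventEmitter and
+      // subscribe via `this.messageReceiver.on('message', this.handleIncomingMessage)`, which would
+      // avoid this detached async iteration loop.)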
+ // FIXME: this probably doesn't handle errors properly right now + for await (const message of this.messageReceiver!.messages()) { + this.handleIncomingMessage(message); + } + })(); + + this.startAgentConnectedTimeout(); + } + + private handleRoomDisconnected = () => { + console.log('!! DISCONNECTED'); + this.agent?.off(AgentEvent.AgentConnectionStateChanged, this.handleAgentConnectionStateChanged); + this.agent?.off(AgentEvent.AgentConversationalStateChanged, this.handleAgentConversationalStateChanged); + this.agent?.teardown(); + this.agent = null; + + this.messageReceiver?.close(); + this.messageReceiver = null; + + if (this.agentConnectedTimeout) { + clearTimeout(this.agentConnectedTimeout); + this.agentConnectedTimeout = null; + } + + this.emit(AgentSessionEvent.Disconnected); + } + + private agentConnectedTimeout: NodeJS.Timeout | null = null; + private startAgentConnectedTimeout = () => { + this.agentConnectedTimeout = setTimeout(() => { + if (!this.isAvailable) { + const reason = + this.connectionState === 'connecting' + ? 'Agent did not join the room. ' + : 'Agent connected but did not complete initializing. '; + + this.emit(AgentSessionEvent.AgentConnectionFailure, reason); + this.disconnect(); + } + }, this.agentConnectTimeoutMilliseconds ?? DEFAULT_AGENT_CONNECT_TIMEOUT_MILLISECONDS); + } + + private handleAgentConnectionStateChanged = async (newConnectionState: AgentConnectionState) => { + this.emit(AgentSessionEvent.AgentConnectionStateChanged, newConnectionState); + }; + + private handleAgentConversationalStateChanged = async (newConversationalState: AgentConversationalState) => { + this.emit(AgentSessionEvent.AgentConversationalStateChanged, newConversationalState); + }; + + private handleAudioPlaybackStatusChanged = async () => { + this.emit(AgentSessionEvent.AudioPlaybackStatusChanged, this.room.canPlaybackAudio); + }; + + private handleIncomingMessage = (incomingMessage: ReceivedMessage) => { + this.emit(AgentSessionEvent.MessageReceived, incomingMessage); + } + + get connectionState() { + return this.agent?.connectionState ?? 'disconnected'; + } + get conversationalState() { + return this.agent?.conversationalState ?? 'disconnected'; + } + + /** Has the session successfully connected to the running agent? */ + get isConnected() { + return ( + this.connectionState === 'connected' || + this.connectionState === 'reconnecting' || + this.connectionState === 'signalReconnecting' + ); + } + + /** Is the agent ready for user interaction? 
*/ + get isAvailable() { + return ( + this.conversationalState === 'listening' || + this.conversationalState === 'thinking' || + this.conversationalState === 'speaking' + ); + } + + /** Returns a promise that resolves once the agent is available for interaction */ + private async waitUntilAgentIsAvailable(signal?: AbortSignal) { + return new Promise((resolve, reject) => { + const stateChangedHandler = () => { + if (!this.isAvailable) { + return; + } + cleanup(); + resolve(); + }; + const abortHandler = () => { + cleanup(); + reject(new Error('AgentSession.waitUntilAgentIsAvailable - signal aborted')); + }; + + const cleanup = () => { + this.off(AgentSessionEvent.AgentConnectionStateChanged, stateChangedHandler); + this.off(AgentSessionEvent.AgentConversationalStateChanged, stateChangedHandler); + signal?.removeEventListener('abort', abortHandler); + }; + + this.on(AgentSessionEvent.AgentConnectionStateChanged, stateChangedHandler); + this.on(AgentSessionEvent.AgentConversationalStateChanged, stateChangedHandler); + signal?.addEventListener('abort', abortHandler); + }); + } + + private async waitUntilRoomConnected(signal?: AbortSignal) { + return this.waitUntilRoomState( + ConnectionState.Connected, /* FIXME: should I check for other states too? */ + RoomEvent.Connected, + signal, + ); + } + + private async waitUntilRoomDisconnected(signal?: AbortSignal) { + return this.waitUntilRoomState( + ConnectionState.Disconnected, + RoomEvent.Disconnected, + signal, + ); + } + + private async waitUntilRoomState(state: ConnectionState, stateMonitoringEvent: RoomEvent, signal?: AbortSignal) { + if (this.room.state === state) { + return; + } + + return new Promise((resolve, reject) => { + const onceRoomEventOccurred = () => { + cleanup(); + resolve(); + }; + const abortHandler = () => { + cleanup(); + reject(new Error(`AgentSession.waitUntilRoomState(${state}, ...) - signal aborted`)); + }; + + const cleanup = () => { + this.room.off(stateMonitoringEvent, onceRoomEventOccurred); + signal?.removeEventListener('abort', abortHandler); + }; + + this.room.on(stateMonitoringEvent, onceRoomEventOccurred); + signal?.addEventListener('abort', abortHandler); + }); + } + + get localParticipant() { + return this.room?.localParticipant ?? null; + } + + /** + * Create a ReceivedMessageAggregator, which allows one to view a snapshot of all received + * messages at the current time. + */ + async createMessageAggregator(options: ReceivedMessageAggregatorOptions = {}) { + await this.waitUntilRoomConnected(); + + const aggregator = new ReceivedMessageAggregator(options); + this.on(AgentSessionEvent.MessageReceived, aggregator.upsert); + this.on(AgentSessionEvent.Disconnected, aggregator.close); + + const closeHandler = () => { + this.off(AgentSessionEvent.MessageReceived, aggregator.upsert); + this.off(AgentSessionEvent.Disconnected, aggregator.close); + aggregator.off(ReceivedMessageAggregatorEvent.Close, closeHandler); + }; + aggregator.on(ReceivedMessageAggregatorEvent.Close, closeHandler); + + return aggregator; + } + + async sendMessage( + message: Message, + options: Message extends SentMessage ? SentMessageOptions : SentChatMessageOptions, + ) { + if (!this.messageSender) { + throw new Error('AgentSession.sendMessage - cannot send message until room is connected and MessageSender initialized!'); + } + const constructedMessage: SentMessage = typeof message === 'string' ? 
{
+      id: `${Math.random()}`, /* FIXME: fix id generation */
+      direction: 'outbound',
+      timestamp: new Date(),
+      content: { type: 'chat', text: message },
+    } : message;
+    await this.messageSender.send(constructedMessage, options);
+  }
+  // onMessage?: (callback: (reader: TextStreamReader) => void) => void | undefined;
+
+  // TODO: RPC stuff
+  // registerRpcHandler: (
+  //   method: string,
+  //   handler: (data: RpcInvocationData) => Promise,
+  // ) => void;
+  // performRpc: (method: string, payload: string) => Promise;
+
+  // TODO: Client media controls
+  // setCameraEnabled: (enabled: boolean) => Promise;
+  // setMicrophoneEnabled: (enabled: boolean) => Promise;
+  // setScreenShareEnabled: (enabled: boolean) => Promise;
+  // setCameraInput: (deviceId: string) => Promise;
+  // setMicrophoneInput: (deviceId: string) => Promise;
+
+  // Media Playback
+  async startAudioPlayback() {
+    await this.room.startAudio();
+
+    // FIXME: add audio track to audio element / etc
+    // This probably needs to contain much of the logic in RoomAudioRenderer?
+    // And then make a similar type of component that then uses this function internally?
+  }
+}
diff --git a/agent-sdk/agent-session/ConnectionCredentialsProvider.ts b/agent-sdk/agent-session/ConnectionCredentialsProvider.ts
new file mode 100644
index 00000000..27716623
--- /dev/null
+++ b/agent-sdk/agent-session/ConnectionCredentialsProvider.ts
@@ -0,0 +1,99 @@
+import { decodeJwt } from 'jose';
+
+import { ConnectionDetails } from "@/app/api/connection-details/route";
+
+const ONE_MINUTE_IN_MILLISECONDS = 60 * 1000;
+
+/**
+ * The ConnectionCredentialsProvider handles getting credentials for connecting to a new Room, caching
+ * the last result and using it until it expires. */
+export abstract class ConnectionCredentialsProvider {
+  private cachedConnectionDetails: ConnectionDetails | null = null;
+
+  private isCachedConnectionDetailsExpired() {
+    const token = this.cachedConnectionDetails?.participantToken;
+    if (!token) {
+      return true;
+    }
+
+    const jwtPayload = decodeJwt(token);
+    if (!jwtPayload.exp) {
+      return true;
+    }
+    // `exp` is in seconds; treat the token as expired one minute before it actually expires
+    const expiresAt = new Date(jwtPayload.exp * 1000 - ONE_MINUTE_IN_MILLISECONDS);
+
+    const now = new Date();
+    return now >= expiresAt;
+  }
+
+  async generate() {
+    if (this.isCachedConnectionDetailsExpired()) {
+      await this.refresh();
+    }
+
+    return this.cachedConnectionDetails!;
+  }
+
+  async refresh() {
+    this.cachedConnectionDetails = await this.fetch();
+  }
+
+  protected abstract fetch(): Promise<ConnectionDetails>;
+};
+
+export class ManualConnectionCredentialsProvider extends ConnectionCredentialsProvider {
+  protected fetch: () => Promise<ConnectionDetails>;
+
+  constructor(handler: () => Promise<ConnectionDetails>) {
+    super();
+    this.fetch = handler;
+  }
+}
+
+
+type SandboxConnectionCredentialsProviderOptions = {
+  sandboxId: string;
+  baseUrl?: string;
+
+  /** The name of the room to join. If omitted, a random new room name will be generated instead. */
+  roomName?: string;
+
+  /** The identity of the participant the token should connect as. If omitted, a random
+   * identity will be used instead. */
+  participantName?: string;
+};
+
+export class SandboxConnectionCredentialsProvider extends ConnectionCredentialsProvider {
+  protected options: SandboxConnectionCredentialsProviderOptions;
+
+  constructor(options: SandboxConnectionCredentialsProviderOptions) {
+    super();
+    this.options = options;
+
+    if (process.env.NODE_ENV === 'production') {
+      // FIXME: figure out a better logging solution?
+ console.warn('WARNING: SandboxConnectionCredentialsProvider is meant for development, and is not security hardened. In production, implement your own token generation solution.'); + } + } + + async fetch() { + const baseUrl = this.options.baseUrl ?? "https://cloud-api.livekit.io"; + const response = await fetch(`${baseUrl}/api/sandbox/connection-details`, { + method: "POST", + headers: { + "X-Sandbox-ID": this.options.sandboxId, + "Content-Type": "application/json", + }, + body: JSON.stringify({ + roomName: this.options.roomName, + participantName: this.options.participantName, + }), + }); + + if (!response.ok) { + throw new Error(`Error generting token from sandbox token server: ${response.status} ${await response.text()}`); + } + + return response.json(); + } +} diff --git a/agent-sdk/agent-session/message/ReceivedMessageAggregator.ts b/agent-sdk/agent-session/message/ReceivedMessageAggregator.ts new file mode 100644 index 00000000..96b2dce7 --- /dev/null +++ b/agent-sdk/agent-session/message/ReceivedMessageAggregator.ts @@ -0,0 +1,120 @@ +import { EventEmitter } from "events"; +import TypedEventEmitter from "typed-emitter"; +import { ReceivedMessage } from "."; + +export type ReceivedMessageAggregatorOptions = { + /** + * Number of messages to buffer internally before old messages are discarded. If not set, the + * buffer size is unlimited. + */ + bufferSize?: number; + + // FIXME: other options? +}; + +export enum ReceivedMessageAggregatorEvent { + Updated = 'updated', + Close = 'close', +} + +type ReceivedMessageAggregatorCallbacks = { + [ReceivedMessageAggregatorEvent.Updated]: () => void; + [ReceivedMessageAggregatorEvent.Close]: () => void; +}; + +/** A container for storing an ordered list of messages that can be easily changed */ +export default class ReceivedMessageAggregator extends (EventEmitter as new () => TypedEventEmitter) { + private messageById: Map = new Map(); + private messageIds: Array = []; + + private options: ReceivedMessageAggregatorOptions; + private closed: boolean = false; + + constructor(options?: ReceivedMessageAggregatorOptions) { + super(); + this.options = options ?? {}; + } + + /** Create a new aggregator pre-populated with the included messages */ + static fromIterator(input: Iterable, options?: ReceivedMessageAggregatorOptions) { + const aggregator = new this(options); + aggregator.extend(input); + return aggregator; + } + + upsert = (message: Message) => { + this.internalBulkUpsert([message]); + this.emit(ReceivedMessageAggregatorEvent.Updated); + } + + delete = (message: Message) => { + this.internalBulkDelete([message.id]); + this.emit(ReceivedMessageAggregatorEvent.Updated); + } + + extend = (input: Iterable) => { + this.internalBulkUpsert(input); + this.emit(ReceivedMessageAggregatorEvent.Updated); + } + + clear = () => { + this.messageById.clear(); + this.messageIds = []; + } + + private internalBulkUpsert(messages: Iterable) { + if (this.closed) { + throw new Error('ReceivedMessageAggregator is closed and is now immutable, no more messages can be ingested!'); + } + + // FIXME: think through this scenario: + // 1. Message `a` is upserted + // 2. `options.bufferSize` messages are upserted, evicting message `a` + // 3. Another message `a` upsert happens, should this somehow get rejected (via bloom filter / etc?) + // or just end up in the list again as a seemingly brand new message? 
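+    // (One hedged option for the scenario above: remember recently evicted ids in a small Set and
+    // skip upserts for those ids, so an evicted message cannot silently reappear as a new one.)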
+ for (const message of messages) { + this.messageById.set(message.id, message); + if (!this.messageIds.includes(message.id)) { + this.messageIds.push(message.id); + } + + // Truncate message buffer if it is now too large + const numberOfMessagesToRemove = typeof this.options.bufferSize === 'number' ? ( + this.messageIds.length - this.options.bufferSize + ) : 0; + if (numberOfMessagesToRemove > 0) { + const idsToDelete = this.messageIds.slice(0, numberOfMessagesToRemove); + this.internalBulkDelete(idsToDelete); + } + } + } + private internalBulkDelete(messageIdsToDelete: Array) { + if (this.closed) { + throw new Error('ReceivedMessageAggregator is closed and is now immutable, no more messages can be deleted!'); + } + + for (const id of messageIdsToDelete) { + this.messageById.delete(id); + } + this.messageIds = this.messageIds.filter(id => !messageIdsToDelete.includes(id)); + } + + *[Symbol.iterator]() { + for (const id of this.messageIds) { + const message = this.messageById.get(id); + if (!message) { + continue; + } + yield message; + } + } + + toArray() { + return Array.from(this); + } + + close = () => { + this.closed = true; + this.emit(ReceivedMessageAggregatorEvent.Close); + } +} diff --git a/agent-sdk/agent-session/message/index.ts b/agent-sdk/agent-session/message/index.ts new file mode 100644 index 00000000..371d0d62 --- /dev/null +++ b/agent-sdk/agent-session/message/index.ts @@ -0,0 +1,47 @@ +import { SendTextOptions } from 'livekit-client'; +import { TextStreamInfo } from '@/agent-sdk/external-deps/client-sdk-js'; + +export type BaseMessageId = string; +export type BaseMessage = { + id: BaseMessageId; + direction: Direction; + timestamp: Date; + content: Content; +}; + +export type ReceivedTranscriptionMessage = BaseMessage<'inbound', { + type: 'transcription'; + text: string; + participantInfo: { identity: string }; + streamInfo: TextStreamInfo; +}>; + +export type ReceivedChatLoopbackMessage = BaseMessage<'inbound', { type: 'chat'; text: string }>; + +export type ReceivedMessage = + | ReceivedTranscriptionMessage + | ReceivedChatLoopbackMessage; + // TODO: images? attachments? rpc? + +export type SentChatMessage = BaseMessage<'outbound', | { type: 'chat', text: string }>; +export type SentChatMessageOptions = SendTextOptions | undefined; + +export type SentMessage = + | SentChatMessage; + +export type SentMessageOptions = + | (Message extends SentChatMessage ? SentChatMessageOptions : never); + +// FIXME: maybe update all these functions to not have default exports as to avoid the duplicate +// names being written here? 
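+// (A hedged alternative sketch: switch each module to a named export, e.g.
+//   export class ChatMessageSender { ... }
+// so this index could simply re-export with `export { ChatMessageSender } from './send/ChatMessageSender';`.)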
+export { default as MessageSender } from './send/MessageSender'; +export { default as ChatMessageSender } from './send/ChatMessageSender'; +export { default as CombinedMessageSender } from './send/CombinedMessageSender'; +export { default as MessageReceiver } from './receive/MessageReceiver'; +export { default as CombinedMessageReceiver } from './receive/CombinedMessageReceiver'; +export { default as TranscriptionMessageReceiver } from './receive/TranscriptionMessageReceiver'; +export { + default as ReceivedMessageAggregator, + type ReceivedMessageAggregatorOptions, + ReceivedMessageAggregatorEvent, +} from './ReceivedMessageAggregator'; diff --git a/agent-sdk/agent-session/message/receive/CombinedMessageReceiver.ts b/agent-sdk/agent-session/message/receive/CombinedMessageReceiver.ts new file mode 100644 index 00000000..0e338986 --- /dev/null +++ b/agent-sdk/agent-session/message/receive/CombinedMessageReceiver.ts @@ -0,0 +1,31 @@ +import { parallelMerge } from "streaming-iterables"; +import MessageReceiver from "./MessageReceiver"; + +/** + * A `MessageReceiver` that zips together multiple underlying `MessageReceiver`s into one unified source. + */ +export default class CombinedMessageReceiver extends MessageReceiver { + private messageReceivers: Array; + + constructor(...messageReceivers: Array) { + super(); + this.messageReceivers = messageReceivers; + } + + async start() { + const messagesAsyncIterators = this.messageReceivers.map(mr => mr.messages()); + (async () => { + for await (const inboundMessage of parallelMerge(...messagesAsyncIterators)) { + this.enqueue(inboundMessage); + } + })().catch(err => { + this.closeWithError(err); + }); + + return () => { + for (const messageReceiver of this.messageReceivers) { + messageReceiver.close(); + } + }; + } +} diff --git a/agent-sdk/agent-session/message/receive/MessageReceiver.ts b/agent-sdk/agent-session/message/receive/MessageReceiver.ts new file mode 100644 index 00000000..9265d5d4 --- /dev/null +++ b/agent-sdk/agent-session/message/receive/MessageReceiver.ts @@ -0,0 +1,57 @@ +import Future from "@/agent-sdk/lib/future"; +import { type ReceivedMessage } from ".."; + +/** Thrown to signal that a MessageReceiver.messages() generator invocation was terminated out of band */ +export class MessageReceiverTerminationError extends Error {} + +/** + * A MessageReceiver acts as a source for all messages in the system. + */ +export default abstract class MessageReceiver { + private signallingFuture = new Future(); + private queue: Array = []; + + // This returns a cleanup function like useEffect maybe? That could be a good pattern? + abstract start(): Promise void)>; + + /** Submit new IncomingMessages to be received by anybody reading from messages() */ + protected enqueue(...messages: Array) { + for (const message of messages) { + this.queue.push(message); + } + const oldSignallingFuture = this.signallingFuture; + this.signallingFuture = new Future(); + oldSignallingFuture.resolve?.(null); + } + + /** Terminate the messages() iteration from an external source */ + close() { + const name: string = (this as any).constructor.name ?? 
'MessageReceiver'; + this.signallingFuture.reject?.( + new MessageReceiverTerminationError(`${name} terminated messages() iteration`) + ); + } + + closeWithError(error: Error) { + this.signallingFuture.reject?.(error); + } + + /** A stream of newly generated `IncomingMessage`s */ + async *messages(): AsyncGenerator { + const cleanup = await this.start(); + try { + while (true) { + await this.signallingFuture.promise; + yield* this.queue; + this.queue = []; + } + } catch (err) { + if (err instanceof MessageReceiverTerminationError) { + cleanup?.(); + return; + } + } finally { + cleanup?.(); + } + } +} diff --git a/agent-sdk/agent-session/message/receive/TranscriptionMessageReceiver.ts b/agent-sdk/agent-session/message/receive/TranscriptionMessageReceiver.ts new file mode 100644 index 00000000..601e5478 --- /dev/null +++ b/agent-sdk/agent-session/message/receive/TranscriptionMessageReceiver.ts @@ -0,0 +1,134 @@ +import { Room, TextStreamReader } from "livekit-client"; +import { DataTopic } from "@/agent-sdk/external-deps/components-js"; +import { TextStreamInfo } from "@/agent-sdk/external-deps/client-sdk-js"; + +import { type ReceivedMessage, type ReceivedTranscriptionMessage } from ".."; +import MessageReceiver from "./MessageReceiver"; +import { ParticipantAttributes } from "@/agent-sdk/lib/participant-attributes"; + +/** + * Processes new `lk.transcription` data stream events generated by the agent for both user and + * LLM generated speach and generates corresponding `TranscriptionReceivedMessage`s. + * + * For agent messages, a new text stream is emitted for each message, and the stream is closed when the message is finalized. + * Each agent message is delivered in chunks which must be accumulated and published into the message stream. + * + * For user messages, the full transcription is sent each time, but may be updated until finalized. + * + * The `lk.segment_id` attribute is stable and unique across the lifetime of the message. + * + * Example agent generated transcriptions: + * ``` + * { segment_id: "1", content: "Hello" } + * { segment_id: "1", content: " world" } + * { segment_id: "1", content: "!" } + * { segment_id: "2", content: "Hello" } + * { segment_id: "2", content: " Apple" } + * { segment_id: "2", content: "!" } + * ``` + * + * Example user generated transcriptions: + * ``` + * { segment_id: "3", content: "Hello" } + * { segment_id: "3", content: "Hello world!" } + * { segment_id: "4", content: "Hello" } + * { segment_id: "4", content: "Hello Apple!" 
} + * ``` + */ +export default class TranscriptionMessageReceiver extends MessageReceiver { + room: Room; + inFlightMessages: Array = []; + + constructor(room: Room) { + super(); + this.room = room; + } + + async start() { + const textStreamHandler = async (reader: TextStreamReader, participantInfo: { identity: string }) => { + const transcriptionSegmentId = reader.info.attributes?.[ParticipantAttributes.TranscriptionSegmentId]; + const isTranscription = Boolean(transcriptionSegmentId); + const isFinal = reader.info.attributes?.[ParticipantAttributes.TranscriptionFinal] === 'true'; + + let currentStreamId = reader.info.id; + + // Find and update the stream in our array + let messageIndex = this.inFlightMessages.findIndex((message) => { + if (message.content.streamInfo.id === reader.info.id) { + return true; + } + if (isTranscription && transcriptionSegmentId === message.content.streamInfo.attributes?.[ParticipantAttributes.TranscriptionSegmentId]) { + return true; + } + return false; + }); + + // FIXME: I think there may need to be some error handling logic to ensure the below for await + // properly exposes errors via `this.closeWithError` + for await (const chunk of reader) { + const existingMessage = this.inFlightMessages[messageIndex]; + if (existingMessage) { + if (existingMessage.content.streamInfo.id === currentStreamId) { + // Stream hasn't changed, just append content + const updatedMessage = this.appendInFlightMessageText(messageIndex, chunk, reader.info); + this.enqueue(updatedMessage); + } else { + // Stream has changed, so fully replace content + const updatedMessage = this.replaceInFlightMessageText(messageIndex, chunk, reader.info); + this.enqueue(updatedMessage); + } + + } else { + // Handle case where stream ID wasn't found (new message) + const message: ReceivedMessage = { + id: reader.info.id, + direction: 'inbound', + timestamp: new Date(reader.info.timestamp), + content: { + type: 'transcription', + text: chunk, + participantInfo, + streamInfo: reader.info, + }, + }; + this.inFlightMessages.push(message); + messageIndex = this.inFlightMessages.length-1; + this.enqueue(message); + } + } + + if (isFinal) { + this.inFlightMessages.splice(messageIndex, 1); + console.log('!! 
MESSAGE DONE!', this.inFlightMessages); + } + }; + this.room.registerTextStreamHandler(DataTopic.TRANSCRIPTION, textStreamHandler); + + return () => { + this.room.unregisterTextStreamHandler(DataTopic.TRANSCRIPTION); + }; + } + + private replaceInFlightMessageText(messageIndex: number, text: string, streamInfo: TextStreamInfo) { + this.inFlightMessages[messageIndex] = { + ...this.inFlightMessages[messageIndex], + content: { + ...this.inFlightMessages[messageIndex].content, + text, + streamInfo, + }, + }; + return this.inFlightMessages[messageIndex]; + } + private appendInFlightMessageText(messageIndex: number, text: string, streamInfo: TextStreamInfo) { + this.inFlightMessages[messageIndex] = { + ...this.inFlightMessages[messageIndex], + content: { + ...this.inFlightMessages[messageIndex].content, + text: this.inFlightMessages[messageIndex].content.text + text, + streamInfo, + }, + }; + return this.inFlightMessages[messageIndex]; + } +} diff --git a/agent-sdk/agent-session/message/send/ChatMessageSender.ts b/agent-sdk/agent-session/message/send/ChatMessageSender.ts new file mode 100644 index 00000000..aad69b50 --- /dev/null +++ b/agent-sdk/agent-session/message/send/ChatMessageSender.ts @@ -0,0 +1,78 @@ +import { LocalParticipant } from "livekit-client"; + +import { SentMessage, SentMessageOptions, type ReceivedChatLoopbackMessage, type SentChatMessage } from ".."; +import MessageSender from "./MessageSender"; +import MessageReceiver from "../receive/MessageReceiver"; + + +/** A `MessageSender` for sending chat messages via the `lk.chat` datastream topic. */ +export default class ChatMessageSender extends MessageSender { + private localParticipant: LocalParticipant; + private loopbackReceiverCallbacks: Set<(incomingMessage: SentChatMessage) => void> = new Set(); + + constructor(localParticipant: LocalParticipant) { + super(); + this.localParticipant = localParticipant; + } + + isSentChatMessage(message: SentMessage): message is SentChatMessage { + return message.content.type === 'chat'; + } + + async send(message: SentChatMessage, options: SentMessageOptions) { + if (!this.isSentChatMessage(message)) { + return; + } + // FIXME: maybe there's a more elegant way of doing this, where it also + // gets checked as part of `isSentChatMessage`? + const chatMessageOptions = options as SentMessageOptions; + + for (const callback of this.loopbackReceiverCallbacks) { + callback(message); + } + + await this.localParticipant.sendText(message.content.text, chatMessageOptions); + + // FIXME: do I need to handle sending legacy chat messages too? + // const legacyChatMsg: LegacyChatMessage = { + // id: message.id, + // timestamp: message.timestamp.getTime(), + // message: message.content.text, + // }; + // const encodeLegacyMsg = (message: LegacyChatMessage) => new TextEncoder().encode(JSON.stringify(message)); + // await this.localParticipant.publishData(encodeLegacyMsg(legacyChatMsg), { + // topic: "lk-chat-topic",//LegacyDataTopic.CHAT, + // reliable: true, + // }); + } + + /** + * Generates a corresponding MessageReceiver which will emit "received" versions of each chat + * message, that can be correspondingly merged into the message list. + * + * FIXME: should this be on the MessageSender instead, so this can be done for any sender? 
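+   *
+   * Wiring sketch (mirrors what AgentSession.handleRoomConnected already does):
+   * ```ts
+   * const chatSender = new ChatMessageSender(room.localParticipant);
+   * const receiver = new CombinedMessageReceiver(
+   *   new TranscriptionMessageReceiver(room),
+   *   chatSender.generateLoopbackMessageReceiver(),
+   * );
+   * ```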
+ */ + generateLoopbackMessageReceiver() { + const chatMessageSender = this; + class ChatMessageLoopbackReceiver extends MessageReceiver { + async start() { + const callback = (incomingMessage: SentChatMessage) => { + const outgoingMessage: ReceivedChatLoopbackMessage = { + id: incomingMessage.id, + direction: 'inbound', + timestamp: incomingMessage.timestamp, + content: { type: 'chat', text: incomingMessage.content.text }, + }; + this.enqueue(outgoingMessage); + }; + + chatMessageSender.loopbackReceiverCallbacks.add(callback); + return () => { + chatMessageSender.loopbackReceiverCallbacks.delete(callback); + }; + } + } + + return new ChatMessageLoopbackReceiver(); + } +} diff --git a/agent-sdk/agent-session/message/send/CombinedMessageSender.ts b/agent-sdk/agent-session/message/send/CombinedMessageSender.ts new file mode 100644 index 00000000..5cba8316 --- /dev/null +++ b/agent-sdk/agent-session/message/send/CombinedMessageSender.ts @@ -0,0 +1,21 @@ +import { SentMessageOptions, type SentMessage } from ".."; +import MessageSender from "./MessageSender"; + +/** + * A `MessageSender` that routes any `SentMessage` to the first underlying `MessageSender` which + * can accept it. + */ +export default class CombinedMessageSender extends MessageSender { + private messageSenders: Array; + + constructor(...messageSenders: Array) { + super(); + this.messageSenders = messageSenders; + } + + async send(message: SentMessage, options: SentMessageOptions) { + await Promise.all(this.messageSenders.map(async (sender) => { + return sender.send(message, options); + })); + } +} diff --git a/agent-sdk/agent-session/message/send/MessageSender.ts b/agent-sdk/agent-session/message/send/MessageSender.ts new file mode 100644 index 00000000..6f8a874c --- /dev/null +++ b/agent-sdk/agent-session/message/send/MessageSender.ts @@ -0,0 +1,5 @@ +import { SentMessageOptions, type SentMessage } from ".."; + +export default abstract class MessageSender { + abstract send(message: SentMessage, options: SentMessageOptions): Promise; +} diff --git a/agent-sdk/external-deps/client-sdk-js.tsx b/agent-sdk/external-deps/client-sdk-js.tsx new file mode 100644 index 00000000..3f50b888 --- /dev/null +++ b/agent-sdk/external-deps/client-sdk-js.tsx @@ -0,0 +1,21 @@ +// This file contains pieces copied and pasted from the livekit-client package, largely internal +// things that aren't currently being exported. 
+// +// FIXME: export this stuff in livekit-client or explicitly vendor this stuff into the agents sdk + +export interface BaseStreamInfo { + id: string; + mimeType: string; + topic: string; + timestamp: number; + /** total size in bytes for finite streams and undefined for streams of unknown size */ + size?: number; + attributes?: Record; +} +export interface ByteStreamInfo extends BaseStreamInfo { + name: string; +} + +export interface TextStreamInfo extends BaseStreamInfo {} + +export { type ParticipantEventCallbacks } from "../../node_modules/livekit-client/src/room/participant/Participant"; diff --git a/agent-sdk/external-deps/components-js.tsx b/agent-sdk/external-deps/components-js.tsx new file mode 100644 index 00000000..5b3ae9a9 --- /dev/null +++ b/agent-sdk/external-deps/components-js.tsx @@ -0,0 +1,116 @@ +import { Participant, ParticipantEvent, Track, TrackPublication, TranscriptionSegment } from "livekit-client"; + +// This file contains pieces copied and pasted from the components-js repository +// Something is messed up with my local development environment and I can't figure out how to import +// these properly +// +// FIXME: figure out what is going on here or explicitly vendor this stuff into the agents sdk + +/** @public */ +export type TrackReference = { + participant: Participant; + publication: TrackPublication; + source: Track.Source; +}; + +export const participantTrackEvents = [ + ParticipantEvent.TrackPublished, + ParticipantEvent.TrackUnpublished, + ParticipantEvent.TrackMuted, + ParticipantEvent.TrackUnmuted, + ParticipantEvent.TrackStreamStateChanged, + ParticipantEvent.TrackSubscribed, + ParticipantEvent.TrackUnsubscribed, + ParticipantEvent.TrackSubscriptionPermissionChanged, + ParticipantEvent.TrackSubscriptionFailed, + ParticipantEvent.LocalTrackPublished, + ParticipantEvent.LocalTrackUnpublished, +]; + +export type ReceivedTranscriptionSegment = TranscriptionSegment & { + receivedAtMediaTimestamp: number; + receivedAt: number; +}; + +export function addMediaTimestampToTranscription( + segment: TranscriptionSegment, + timestamps: { timestamp: number; rtpTimestamp?: number }, +): ReceivedTranscriptionSegment { + return { + ...segment, + receivedAtMediaTimestamp: timestamps.rtpTimestamp ?? 0, + receivedAt: timestamps.timestamp, + }; +} + +/** + * @returns An array of unique (by id) `TranscriptionSegment`s. Latest wins. If the resulting array would be longer than `windowSize`, the array will be reduced to `windowSize` length + */ +export function dedupeSegments( + prevSegments: T[], + newSegments: T[], + windowSize: number, +) { + return [...prevSegments, ...newSegments] + .reduceRight((acc, segment) => { + if (!acc.find((val) => val.id === segment.id)) { + acc.unshift(segment); + } + return acc; + }, [] as Array) + .slice(0 - windowSize); +} + +/** + * Create `TrackReferences` for all tracks that are included in the sources property. 
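+ *
+ * @example
+ * // e.g. only the agent's microphone/camera tracks, as Agent.ts does:
+ * // getParticipantTrackRefs(participant, { sources: [Track.Source.Microphone, Track.Source.Camera] })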
+ * */ +export function getParticipantTrackRefs( + participant: Participant, + identifier: any/* ParticipantTrackIdentifier */, + onlySubscribedTracks = false, +): TrackReference[] { + const { sources, kind, name } = identifier; + const sourceReferences = Array.from(participant.trackPublications.values()) + .filter( + (pub) => + (!sources || sources.includes(pub.source)) && + (!kind || pub.kind === kind) && + (!name || pub.trackName === name) && + // either return all or only the ones that are subscribed + (!onlySubscribedTracks || pub.track), + ) + .map((track): TrackReference => { + return { + participant: participant, + publication: track, + source: track.source, + }; + }); + + return sourceReferences; +} + +export interface TextStreamData { + text: string; + participantInfo: { identity: string }; // Replace with the correct type from livekit-client + streamInfo: any /* TextStreamInfo */; +} + +export const DataTopic = { + CHAT: 'lk.chat', + TRANSCRIPTION: 'lk.transcription', +} as const; + +export const trackSourceToProtocol = (source: Track.Source) => { + // NOTE: this mapping avoids importing the protocol package as that leads to a significant bundle size increase + switch (source) { + case Track.Source.Camera: + return 1; + case Track.Source.Microphone: + return 2; + case Track.Source.ScreenShare: + return 3; + default: + return 0; + } +}; diff --git a/agent-sdk/index.tsx b/agent-sdk/index.tsx new file mode 100644 index 00000000..919acb6c --- /dev/null +++ b/agent-sdk/index.tsx @@ -0,0 +1,457 @@ +import * as React from "react"; +import { useContext, useEffect, useState, useCallback, useMemo } from "react"; +import { + AudioCaptureOptions, + Participant, + ParticipantEvent, + ScreenShareCaptureOptions, + Track, + TrackPublication, + TrackPublishOptions, + VideoCaptureOptions, +} from "livekit-client"; +import { TrackReference, trackSourceToProtocol } from "@/agent-sdk/external-deps/components-js"; +import { ParticipantEventCallbacks } from "../node_modules/livekit-client/src/room/participant/Participant"; +import { AgentSession, AgentSessionCallbacks, AgentSessionEvent } from "./agent-session/AgentSession"; +import { ReceivedMessage, ReceivedMessageAggregator, ReceivedMessageAggregatorEvent, SentChatMessageOptions, SentMessage, SentMessageOptions } from "./agent-session/message"; +import { AgentCallbacks, AgentEvent } from "./agent-session/Agent"; +import { ParticipantPermission } from "livekit-server-sdk"; +import { usePersistentUserChoices } from "@livekit/components-react"; + +// --------------------- +// REACT +// --------------------- + +const AgentSessionContext = React.createContext(null); +export const AgentSessionProvider: React.FunctionComponent> = ({ agentSession, children }) => ( + + {children} + +); + +export function useAgentSession() { + const agentSession = useContext(AgentSessionContext); + if (!agentSession) { + throw new Error('useAgentSession not used within AgentSessionContext!'); + } + return agentSession; +} + +export function useAgentMessages() { + const agentSession = useAgentSession(); + + const [messagesState, setMessagesState] = useState< + Array | null + >(null); + useEffect(() => { + let aggregator: ReceivedMessageAggregator | null = null; + + const handleUpdated = () => { + if (!aggregator) { + return; + } + setMessagesState(aggregator.toArray()); + }; + + agentSession.createMessageAggregator().then(agg => { + aggregator = agg; + setMessagesState(aggregator.toArray()); + aggregator.on(ReceivedMessageAggregatorEvent.Updated, handleUpdated); + 
}).catch(err => { + // FIXME: how should this error be handled? + console.error('Error creating message aggregator:', err); + }); + + return () => { + aggregator?.close(); + aggregator?.off(ReceivedMessageAggregatorEvent.Updated, handleUpdated); + setMessagesState(null); + }; + }, [agentSession, agentSession.isAvailable]); + + const send = useCallback(async ( + message: SentMessage | string, + options: Message extends SentMessage ? SentMessageOptions : SentChatMessageOptions, + ) => { + return agentSession.sendMessage(message, options); + }, [agentSession]); + + const { messages, ready } = useMemo(() => { + if (messagesState) { + return { messages: messagesState, ready: true }; + } else { + return { messages: [], ready: false }; + } + }, [messagesState]); + + return { ready, messages, send }; +} + +export function useAgentSessionEvent( + eventName: EventName, + callback: AgentSessionCallbacks[EventName], + dependencies: React.DependencyList, +) { + const agentSession = useAgentSession(); + + // FIXME: is doing this memoiztion here a good idea? Maybe useAgentSessionEvent(..., useCallback(...)) is preferrable? + const memoizedCallback = useCallback(callback, dependencies); + + useEffect(() => { + agentSession.on(eventName, memoizedCallback); + return () => { + agentSession.off(eventName, memoizedCallback); + }; + }, [eventName, memoizedCallback]); +} + +export function useAgentEvent( + eventName: EventName, + callback: AgentCallbacks[EventName], + dependencies: React.DependencyList, +) { + const agentSession = useAgentSession(); + + // FIXME: is doing this memoiztion here a good idea? Maybe useAgentSessionEvent(..., useCallback(...)) is preferrable? + const memoizedCallback = useCallback(callback, dependencies); + + useEffect(() => { + if (!agentSession.agent) { + return; + } + + const agent = agentSession.agent; + agent.on(eventName, memoizedCallback); + return () => { + agent.off(eventName, memoizedCallback); + }; + }, [agentSession.agent, eventName, memoizedCallback]); +} + +export function useAgentState() { + const agentSession = useAgentSession(); + const [connectionState, setConnectionState] = useState(agentSession.connectionState); + const [conversationalState, setConversationalState] = useState(agentSession.conversationalState); + const [isAvailable, setIsAvailable] = useState(agentSession.isAvailable); + const [isConnected, setIsConnected] = useState(agentSession.isConnected); + + useAgentSessionEvent(AgentSessionEvent.AgentConnectionStateChanged, (newState) => { + setConnectionState(newState); + setIsAvailable(agentSession.isAvailable); + setIsConnected(agentSession.isConnected); + }, []); + useAgentSessionEvent(AgentSessionEvent.AgentConversationalStateChanged, (newState) => { + setConversationalState(newState); + setIsAvailable(agentSession.isAvailable); + setIsConnected(agentSession.isConnected); + }, []); + + const legacyState = useMemo((): 'disconnected' | 'connecting' | 'initializing' | 'listening' | 'thinking' | 'speaking' => { + if (connectionState === 'disconnected' || connectionState === 'connecting') { + return connectionState; + } else { + switch (conversationalState) { + case 'initializing': + case 'idle': + return 'initializing'; + + default: + return conversationalState; + } + } + }, [connectionState, conversationalState]); + + return { + connectionState, + conversationalState, + /** @deprecated Use connectionState / conversationalState insread of legacyState */ + legacyState, + isAvailable, + isConnected + }; +} + +export function useAgentTracks() { + const 
agentSession = useAgentSession(); + + const [audioTrack, setAudioTrack] = useState(agentSession.agent?.audioTrack ?? null); + useAgentEvent(AgentEvent.AudioTrackChanged, setAudioTrack, []); + const [videoTrack, setVideoTrack] = useState(agentSession.agent?.videoTrack ?? null); + useAgentEvent(AgentEvent.VideoTrackChanged, setVideoTrack, []); + + return { audioTrack, videoTrack }; +} + +function useParticipantEvents

( + participant: P, + eventNames: Array, + callback: ParticipantEventCallbacks[EventName], + dependencies: React.DependencyList, +) { + // FIXME: is doing this memoiztion here a good idea? Maybe useAgentSessionEvent(..., useCallback(...)) is preferrable? + const memoizedCallback = useCallback(callback, dependencies); + + useEffect(() => { + for (const eventName of eventNames) { + participant.on(eventName, memoizedCallback); + } + return () => { + for (const eventName of eventNames) { + participant.off(eventName, memoizedCallback); + } + }; + }, [participant, eventNames, memoizedCallback]); +} + +export function useAgentLocalParticipant(options?: { + onDeviceError?: (error: Error, source: Track.Source) => void; + saveUserTrackEnabledChoices?: boolean; +}) { + const agentSession = useAgentSession(); + + const [localParticipant, setLocalParticipant] = React.useState(agentSession.localParticipant); + const [microphoneTrackPublication, setMicrophoneTrackPublication] = React.useState(null); + const [microphoneTrackEnabled, setMicrophoneTrackEnabled] = React.useState(false); + const [microphoneTrackPending, setMicrophoneTrackPending] = React.useState(false); + + const [cameraTrackPublication, setCameraTrackPublication] = React.useState(null); + const [cameraTrackEnabled, setCameraTrackEnabled] = React.useState(false); + const [cameraTrackPending, setCameraTrackPending] = React.useState(false); + + const [screenShareTrackPublication, setScreenShareTrackPublication] = React.useState(null); + const [screenShareTrackEnabled, setScreenShareTrackEnabled] = React.useState(false); + const [screenShareTrackPending, setScreenShareTrackPending] = React.useState(false); + + const [permissions, setPermissions] = React.useState(null); + + useParticipantEvents(agentSession.localParticipant, [ + ParticipantEvent.TrackMuted, + ParticipantEvent.TrackUnmuted, + ParticipantEvent.ParticipantPermissionsChanged, + // ParticipantEvent.IsSpeakingChanged, + ParticipantEvent.TrackPublished, + ParticipantEvent.TrackUnpublished, + ParticipantEvent.LocalTrackPublished, + ParticipantEvent.LocalTrackUnpublished, + ParticipantEvent.MediaDevicesError, + ParticipantEvent.TrackSubscriptionStatusChanged, + // ParticipantEvent.ConnectionQualityChanged, + ], () => { + setLocalParticipant(agentSession.localParticipant); + setPermissions(agentSession.localParticipant.permissions ?? null); + + // FIXME: is the rest of this stuff needed? + // const { isMicrophoneEnabled, isCameraEnabled, isScreenShareEnabled } = p; + const microphoneTrack = agentSession.localParticipant.getTrackPublication(Track.Source.Microphone); + setMicrophoneTrackPublication(microphoneTrack ?? null); + setMicrophoneTrackEnabled(localParticipant.isMicrophoneEnabled); + + const cameraTrack = agentSession.localParticipant.getTrackPublication(Track.Source.Camera); + setCameraTrackPublication(cameraTrack ?? null); + setCameraTrackEnabled(localParticipant.isCameraEnabled); + + const screenShareTrack = agentSession.localParticipant.getTrackPublication(Track.Source.ScreenShare); + setScreenShareTrackPublication(screenShareTrack ?? 
null); + setScreenShareTrackEnabled(localParticipant.isScreenShareEnabled); + }, []); + + const publishPermissions = useMemo(() => { + const canPublishSource = (source: Track.Source) => { + return ( + permissions?.canPublish && + (permissions.canPublishSources.length === 0 || + permissions.canPublishSources.includes(trackSourceToProtocol(source))) + ); + }; + + return { + camera: canPublishSource(Track.Source.Camera), + microphone: canPublishSource(Track.Source.Microphone), + screenShare: canPublishSource(Track.Source.ScreenShare), + data: permissions?.canPublishData ?? false, + }; + }, [permissions]); + + const microphoneTrack: TrackReference | null = React.useMemo(() => { + if (!microphoneTrackPublication) { + return null; + } + return { + participant: localParticipant, + source: Track.Source.Microphone, + publication: microphoneTrackPublication, + }; + }, [localParticipant, microphoneTrackPublication]); + + const cameraTrack: TrackReference | null = React.useMemo(() => { + if (!cameraTrackPublication) { + return null; + } + return { + participant: localParticipant, + source: Track.Source.Camera, + publication: cameraTrackPublication, + }; + }, [localParticipant, cameraTrackPublication]); + + const screenShareTrack: TrackReference | null = React.useMemo(() => { + if (!screenShareTrackPublication) { + return null; + } + return { + participant: localParticipant, + source: Track.Source.ScreenShare, + publication: screenShareTrackPublication, + }; + }, [localParticipant, screenShareTrackPublication]); + + const { + saveAudioInputEnabled, + saveAudioInputDeviceId, + saveVideoInputEnabled, + saveVideoInputDeviceId, + } = usePersistentUserChoices({ // FIXME: replace with agent alternative + preventSave: !options?.saveUserTrackEnabledChoices, + }); + + const setMicrophoneEnabled = useCallback(async ( + enabled: boolean, + captureOptions?: AudioCaptureOptions, + publishOptions?: TrackPublishOptions, + ) => { + setMicrophoneTrackPending(true); + try { + await localParticipant.setMicrophoneEnabled( + enabled, + captureOptions, + publishOptions, + ); + saveAudioInputEnabled(enabled); + setMicrophoneTrackEnabled(enabled); + return localParticipant.isMicrophoneEnabled; + } catch (e) { + if (options?.onDeviceError && e instanceof Error) { + options?.onDeviceError(e, Track.Source.Microphone); + return; + } else { + throw e; + } + } finally { + setMicrophoneTrackPending(false); + } + }, [options?.onDeviceError, setMicrophoneTrackPending, saveAudioInputEnabled, setMicrophoneTrackEnabled]); + + const setCameraEnabled = useCallback(async ( + enabled: boolean, + captureOptions?: VideoCaptureOptions, + publishOptions?: TrackPublishOptions, + ) => { + setCameraTrackPending(true); + try { + await localParticipant.setCameraEnabled( + enabled, + captureOptions, + publishOptions, + ); + saveVideoInputEnabled(enabled); + setCameraTrackEnabled(enabled); + return localParticipant.isCameraEnabled; + } catch (e) { + if (options?.onDeviceError && e instanceof Error) { + options?.onDeviceError(e, Track.Source.Camera); + return; + } else { + throw e; + } + } finally { + setCameraTrackPending(false); + } + }, [options?.onDeviceError, setCameraTrackPending, saveVideoInputEnabled, setCameraTrackEnabled]); + + const setScreenShareEnabled = useCallback(async ( + enabled: boolean, + captureOptions?: ScreenShareCaptureOptions, + publishOptions?: TrackPublishOptions, + ) => { + setScreenShareTrackPending(true); + try { + await localParticipant.setScreenShareEnabled( + enabled, + captureOptions, + publishOptions, + ); +
setScreenShareTrackEnabled(enabled); + return localParticipant.isScreenShareEnabled; + } catch (e) { + if (options?.onDeviceError && e instanceof Error) { + options?.onDeviceError(e, Track.Source.ScreenShare); + return; + } else { + throw e; + } + } finally { + setScreenShareTrackPending(false); + } + }, [options?.onDeviceError, setScreenShareTrackPending, setScreenShareTrackEnabled]); + + const changeAudioDevice = useCallback( + (deviceId: string) => { + saveAudioInputDeviceId(deviceId ?? 'default'); + }, + [saveAudioInputDeviceId] + ); + + const changeVideoDevice = useCallback( + (deviceId: string) => { + saveVideoInputDeviceId(deviceId ?? 'default'); + }, + [saveVideoInputDeviceId] + ); + + return { + localParticipant, + publishPermissions, + + microphone: { + track: microphoneTrack, + enabled: microphoneTrackEnabled, + pending: microphoneTrackPending, + set: setMicrophoneEnabled, + toggle: useCallback(( + captureOptions?: AudioCaptureOptions, + publishOptions?: TrackPublishOptions + ) => setMicrophoneEnabled(!microphoneTrackEnabled, captureOptions, publishOptions), [microphoneTrackEnabled, setMicrophoneEnabled]), + changeDevice: changeAudioDevice, + }, + camera: { + track: cameraTrack, + enabled: cameraTrackEnabled, + pending: cameraTrackPending, + set: setCameraEnabled, + toggle: useCallback(( + captureOptions?: VideoCaptureOptions, + publishOptions?: TrackPublishOptions + ) => setCameraEnabled(!cameraTrackEnabled, captureOptions, publishOptions), [cameraTrackEnabled, setCameraEnabled]), + changeDevice: changeVideoDevice, + }, + screenShare: { + track: screenShareTrack, + enabled: screenShareTrackEnabled, + pending: screenShareTrackPending, + set: setScreenShareEnabled, + toggle: useCallback(( + captureOptions?: ScreenShareCaptureOptions, + publishOptions?: TrackPublishOptions + ) => setScreenShareEnabled(!screenShareTrackEnabled, captureOptions, publishOptions), [screenShareTrackEnabled, setScreenShareEnabled]), + }, + }; +} + +// hook ideas: +// useAgentTracks? (video) +// useAgentControls? (control bar stuff) + +export { + AgentSession, + AgentSessionEvent, +}; diff --git a/agent-sdk/lib/future.ts b/agent-sdk/lib/future.ts new file mode 100644 index 00000000..992a5f1e --- /dev/null +++ b/agent-sdk/lib/future.ts @@ -0,0 +1,59 @@ +/** A Future wraps a new Promise(...) call, exposing the promise together with its + * corresponding resolve and reject functions, as a building block for managing async + * execution. + * + * @example + * ```ts + * const future = new Future<number>(); + * + * async function startA() { + * setTimeout(() => future.resolve(123), 5000); + * } + * + * async function waitForA() { + * return await future.promise; + * } + * + * async function main() { + * startA(); + * const result = await waitForA(); + * console.log(result); // logs 123 + * } + * ``` + * */ +export default class Future<T> { + promise: Promise<T>; + + // NOTE: these `throw`s shouldn't ever happen in practice, `new Promise` runs its callback + // synchronously.
+ resolve: (arg: T) => void = () => { throw new Error('Future not yet initialized!') }; + reject: (e: any) => void = () => { throw new Error('Future not yet initialized!') }; + + onFinally?: () => void; + + get isResolved(): boolean { + return this._isResolved; + } + + private _isResolved: boolean = false; + + constructor( + futureBase?: (resolve: (arg: T) => void, reject: (e: any) => void) => void | Promise, + onFinally?: () => void, + ) { + this.onFinally = onFinally; + this.promise = new Promise(async (resolve, reject) => { + this.resolve = resolve; + this.reject = reject; + if (futureBase) { + const futureBaseReturn = futureBase(resolve, reject); + if (futureBaseReturn instanceof Promise) { + await futureBaseReturn; + } + } + }).finally(() => { + this._isResolved = true; + this.onFinally?.(); + }); + } +} diff --git a/agent-sdk/lib/participant-attributes.ts b/agent-sdk/lib/participant-attributes.ts new file mode 100644 index 00000000..5fef0012 --- /dev/null +++ b/agent-sdk/lib/participant-attributes.ts @@ -0,0 +1,9 @@ +/** An enum of first party livekit attributes generated by the serverside agents sdk */ +export enum ParticipantAttributes { + state = 'lk.agent.state', + publishOnBehalf = 'lk.publish_on_behalf', + + TranscriptionFinal = "lk.transcription_final", + TranscriptionSegmentId = "lk.segment_id", + TranscribedTrackId = "lk.transcribed_track_id", +} diff --git a/components/app.tsx b/components/app.tsx index 725170b2..b63ff9d3 100644 --- a/components/app.tsx +++ b/components/app.tsx @@ -1,7 +1,7 @@ 'use client'; import { useEffect, useMemo, useState } from 'react'; -import { Room, RoomEvent } from 'livekit-client'; +import { RoomEvent } from 'livekit-client'; import { motion } from 'motion/react'; import { RoomAudioRenderer, RoomContext, StartAudio } from '@livekit/components-react'; import { toastAlert } from '@/components/alert-toast'; @@ -10,6 +10,7 @@ import { Toaster } from '@/components/ui/sonner'; import { Welcome } from '@/components/welcome'; import useConnectionDetails from '@/hooks/useConnectionDetails'; import type { AppConfig } from '@/lib/types'; +import { AgentSession, AgentSessionProvider } from '@/agent-sdk'; const MotionWelcome = motion.create(Welcome); const MotionSessionView = motion.create(SessionView); @@ -19,14 +20,14 @@ interface AppProps { } export function App({ appConfig }: AppProps) { - const room = useMemo(() => new Room(), []); + const { connectionDetailsProvider } = useConnectionDetails(); + const agentSession = useMemo(() => new AgentSession(connectionDetailsProvider), [connectionDetailsProvider]); const [sessionStarted, setSessionStarted] = useState(false); - const { connectionDetails, refreshConnectionDetails } = useConnectionDetails(); useEffect(() => { const onDisconnected = () => { setSessionStarted(false); - refreshConnectionDetails(); + connectionDetailsProvider.refresh(); }; const onMediaDevicesError = (error: Error) => { toastAlert({ @@ -34,23 +35,18 @@ export function App({ appConfig }: AppProps) { description: `${error.name}: ${error.message}`, }); }; - room.on(RoomEvent.MediaDevicesError, onMediaDevicesError); - room.on(RoomEvent.Disconnected, onDisconnected); + agentSession.room.on(RoomEvent.MediaDevicesError, onMediaDevicesError); + agentSession.room.on(RoomEvent.Disconnected, onDisconnected); return () => { - room.off(RoomEvent.Disconnected, onDisconnected); - room.off(RoomEvent.MediaDevicesError, onMediaDevicesError); + agentSession.room.off(RoomEvent.Disconnected, onDisconnected); + 
agentSession.room.off(RoomEvent.MediaDevicesError, onMediaDevicesError); }; - }, [room, refreshConnectionDetails]); + }, [agentSession, connectionDetailsProvider.refresh]); useEffect(() => { let aborted = false; - if (sessionStarted && room.state === 'disconnected' && connectionDetails) { - Promise.all([ - room.localParticipant.setMicrophoneEnabled(true, undefined, { - preConnectBuffer: appConfig.isPreConnectBufferEnabled, - }), - room.connect(connectionDetails.serverUrl, connectionDetails.participantToken), - ]).catch((error) => { + if (sessionStarted && agentSession.connectionState === 'disconnected') { + agentSession.connect().catch((error) => { if (aborted) { // Once the effect has cleaned up after itself, drop any errors // @@ -68,9 +64,9 @@ export function App({ appConfig }: AppProps) { } return () => { aborted = true; - room.disconnect(); + agentSession.disconnect(); }; - }, [room, sessionStarted, connectionDetails, appConfig.isPreConnectBufferEnabled]); + }, [agentSession, sessionStarted /* , appConfig.isPreConnectBufferEnabled */]); const { startButtonText } = appConfig; @@ -86,24 +82,26 @@ export function App({ appConfig }: AppProps) { transition={{ duration: 0.5, ease: 'linear', delay: sessionStarted ? 0 : 0.5 }} /> - - - - {/* --- */} - - + + + + + {/* --- */} + + + diff --git a/components/livekit/agent-control-bar/agent-control-bar.tsx b/components/livekit/agent-control-bar/agent-control-bar.tsx index 3f5051c5..f39bdcc1 100644 --- a/components/livekit/agent-control-bar/agent-control-bar.tsx +++ b/components/livekit/agent-control-bar/agent-control-bar.tsx @@ -3,7 +3,7 @@ import * as React from 'react'; import { useCallback } from 'react'; import { Track } from 'livekit-client'; -import { BarVisualizer, useRemoteParticipants } from '@livekit/components-react'; +import { BarVisualizer, /* useRemoteParticipants */ } from '@livekit/components-react'; import { ChatTextIcon, PhoneDisconnectIcon } from '@phosphor-icons/react/dist/ssr'; import { ChatInput } from '@/components/livekit/chat/chat-input'; import { Button } from '@/components/ui/button'; @@ -13,6 +13,7 @@ import { cn } from '@/lib/utils'; import { DeviceSelect } from '../device-select'; import { TrackToggle } from '../track-toggle'; import { UseAgentControlBarProps, useAgentControlBar } from './hooks/use-agent-control-bar'; +import { useAgentState } from '@/agent-sdk'; export interface AgentControlBarProps extends React.HTMLAttributes, @@ -38,11 +39,12 @@ export function AgentControlBar({ onDeviceError, ...props }: AgentControlBarProps) { - const participants = useRemoteParticipants(); const [chatOpen, setChatOpen] = React.useState(false); const [isSendingMessage, setIsSendingMessage] = React.useState(false); - const isAgentAvailable = participants.some((p) => p.isAgent); + // const participants = useRemoteParticipants(); + // const isAgentAvailable = participants.some((p) => p.isAgent); + const { isAvailable: isAgentAvailable } = useAgentState(); const isInputDisabled = !chatOpen || !isAgentAvailable || isSendingMessage; const [isDisconnecting, setIsDisconnecting] = React.useState(false); diff --git a/components/livekit/agent-control-bar/hooks/use-agent-control-bar.ts b/components/livekit/agent-control-bar/hooks/use-agent-control-bar.ts index 2c1495e8..f9d56522 100644 --- a/components/livekit/agent-control-bar/hooks/use-agent-control-bar.ts +++ b/components/livekit/agent-control-bar/hooks/use-agent-control-bar.ts @@ -1,4 +1,5 @@ import * as React from 'react'; +import { useCallback } from 'react'; import { Track } from 
'livekit-client'; import { type TrackReferenceOrPlaceholder, @@ -8,6 +9,7 @@ import { useTrackToggle, } from '@livekit/components-react'; import { usePublishPermissions } from './use-publish-permissions'; +import { useAgentLocalParticipant, useAgentSession } from '@/agent-sdk'; export interface ControlBarControls { microphone?: boolean; @@ -40,113 +42,119 @@ export function useAgentControlBar(props: UseAgentControlBarProps = {}): UseAgen leave: true, ...controls, }; - const { microphoneTrack, localParticipant } = useLocalParticipant(); - const publishPermissions = usePublishPermissions(); - const room = useRoomContext(); - - const microphoneToggle = useTrackToggle({ - source: Track.Source.Microphone, - onDeviceError: (error) => props.onDeviceError?.({ source: Track.Source.Microphone, error }), - }); - const cameraToggle = useTrackToggle({ - source: Track.Source.Camera, - onDeviceError: (error) => props.onDeviceError?.({ source: Track.Source.Camera, error }), - }); - const screenShareToggle = useTrackToggle({ - source: Track.Source.ScreenShare, - onDeviceError: (error) => props.onDeviceError?.({ source: Track.Source.ScreenShare, error }), + // const { microphoneTrack, /* localParticipant */ } = useLocalParticipant(); // FIXME: replace with agent alternative + const { + publishPermissions, + microphone, + camera, + screenShare, + } = useAgentLocalParticipant({ + onDeviceError: useCallback((error: Error, source: Track.Source) => props.onDeviceError?.({ source, error }), [props.onDeviceError]), + saveUserTrackEnabledChoices: saveUserChoices, }); + // const publishPermissions = usePublishPermissions(); // FIXME: replace with agent alternative + // const room = useRoomContext(); + const agentSession = useAgentSession(); - const micTrackRef = React.useMemo(() => { - return { - participant: localParticipant, - source: Track.Source.Microphone, - publication: microphoneTrack, - }; - }, [localParticipant, microphoneTrack]); + // const microphoneToggle = useTrackToggle({ // FIXME: replace with agent alternative + // source: Track.Source.Microphone, + // onDeviceError: (error) => props.onDeviceError?.({ source: Track.Source.Microphone, error }), + // }); + // const cameraToggle = useTrackToggle({ // FIXME: replace with agent alternative + // source: Track.Source.Camera, + // onDeviceError: (error) => props.onDeviceError?.({ source: Track.Source.Camera, error }), + // }); + // const screenShareToggle = useTrackToggle({ // FIXME: replace with agent alternative + // source: Track.Source.ScreenShare, + // onDeviceError: (error) => props.onDeviceError?.({ source: Track.Source.ScreenShare, error }), + // }); visibleControls.microphone ??= publishPermissions.microphone; visibleControls.screenShare ??= publishPermissions.screenShare; visibleControls.camera ??= publishPermissions.camera; visibleControls.chat ??= publishPermissions.data; - const { - saveAudioInputEnabled, - saveAudioInputDeviceId, - saveVideoInputEnabled, - saveVideoInputDeviceId, - } = usePersistentUserChoices({ - preventSave: !saveUserChoices, - }); + // const { + // saveAudioInputEnabled, + // saveAudioInputDeviceId, + // saveVideoInputEnabled, + // saveVideoInputDeviceId, + // } = usePersistentUserChoices({ // FIXME: replace with agent alternative + // preventSave: !saveUserChoices, + // }); const handleDisconnect = React.useCallback(async () => { - if (room) { - await room.disconnect(); - } - }, [room]); + // if (room) { + // await room.disconnect(); + // } + await agentSession?.disconnect() + }, [/* room */, agentSession]); - const 
handleAudioDeviceChange = React.useCallback( - (deviceId: string) => { - saveAudioInputDeviceId(deviceId ?? 'default'); - }, - [saveAudioInputDeviceId] - ); + // const handleAudioDeviceChange = React.useCallback( + // (deviceId: string) => { + // saveAudioInputDeviceId(deviceId ?? 'default'); + // }, + // [saveAudioInputDeviceId] + // ); - const handleVideoDeviceChange = React.useCallback( - (deviceId: string) => { - saveVideoInputDeviceId(deviceId ?? 'default'); - }, - [saveVideoInputDeviceId] - ); + // const handleVideoDeviceChange = React.useCallback( + // (deviceId: string) => { + // saveVideoInputDeviceId(deviceId ?? 'default'); + // }, + // [saveVideoInputDeviceId] + // ); - const handleToggleCamera = React.useCallback( - async (enabled?: boolean) => { - if (screenShareToggle.enabled) { - screenShareToggle.toggle(false); - } - await cameraToggle.toggle(enabled); - // persist video input enabled preference - saveVideoInputEnabled(!cameraToggle.enabled); - }, - [cameraToggle.enabled, screenShareToggle.enabled] - ); + // const handleToggleCamera = React.useCallback( + // async (enabled?: boolean) => { + // if (screenShareToggle.enabled) { + // screenShareToggle.toggle(false); + // } + // await cameraToggle.toggle(enabled); + // // persist video input enabled preference + // saveVideoInputEnabled(!cameraToggle.enabled); + // }, + // [cameraToggle.enabled, screenShareToggle.enabled] + // ); - const handleToggleMicrophone = React.useCallback( - async (enabled?: boolean) => { - await microphoneToggle.toggle(enabled); - // persist audio input enabled preference - saveAudioInputEnabled(!microphoneToggle.enabled); - }, - [microphoneToggle.enabled] - ); + // const handleToggleMicrophone = React.useCallback( + // async (enabled?: boolean) => { + // await microphoneToggle.toggle(enabled); + // // persist audio input enabled preference + // saveAudioInputEnabled(!microphoneToggle.enabled); + // }, + // [microphoneToggle.enabled] + // ); - const handleToggleScreenShare = React.useCallback( - async (enabled?: boolean) => { - if (cameraToggle.enabled) { - cameraToggle.toggle(false); - } - await screenShareToggle.toggle(enabled); - }, - [screenShareToggle.enabled, cameraToggle.enabled] - ); + // const handleToggleScreenShare = React.useCallback( + // async (enabled?: boolean) => { + // if (cameraToggle.enabled) { + // cameraToggle.toggle(false); + // } + // await screenShareToggle.toggle(enabled); + // }, + // [screenShareToggle.enabled, cameraToggle.enabled] + // ); return { - micTrackRef, + micTrackRef: microphone.track, visibleControls, - cameraToggle: { - ...cameraToggle, - toggle: handleToggleCamera, - }, - microphoneToggle: { - ...microphoneToggle, - toggle: handleToggleMicrophone, - }, - screenShareToggle: { - ...screenShareToggle, - toggle: handleToggleScreenShare, - }, + cameraToggle: { ...camera, buttonProps: {} }, + microphoneToggle: { ...microphone, buttonProps: {} }, + screenShareToggle: { ...screenShare, buttonProps: {} }, + // cameraToggle: { + // ...cameraToggle, + // toggle: handleToggleCamera, + // }, + // microphoneToggle: { + // ...microphoneToggle, + // toggle: handleToggleMicrophone, + // }, + // screenShareToggle: { + // ...screenShareToggle, + // toggle: handleToggleScreenShare, + // }, handleDisconnect, - handleAudioDeviceChange, - handleVideoDeviceChange, + handleAudioDeviceChange: microphone.changeDevice, + handleVideoDeviceChange: camera.changeDevice, }; } diff --git a/components/livekit/chat/chat-entry.tsx b/components/livekit/chat/chat-entry.tsx index 1ad1ab84..270e5581 
100644 --- a/components/livekit/chat/chat-entry.tsx +++ b/components/livekit/chat/chat-entry.tsx @@ -1,11 +1,11 @@ import * as React from 'react'; -import type { MessageFormatter, ReceivedChatMessage } from '@livekit/components-react'; +import type { MessageFormatter } from '@livekit/components-react'; import { cn } from '@/lib/utils'; -import { useChatMessage } from './hooks/utils'; +import { ReceivedMessage } from '@/agent-sdk/agent-session/message'; export interface ChatEntryProps extends React.HTMLAttributes { /** The chat massage object to display. */ - entry: ReceivedChatMessage; + entry: ReceivedMessage; /** Hide sender name. Useful when displaying multiple consecutive chat messages from the same person. */ hideName?: boolean; /** Hide message timestamp. */ @@ -22,9 +22,15 @@ export const ChatEntry = ({ className, ...props }: ChatEntryProps) => { - const { message, hasBeenEdited, time, locale, name } = useChatMessage(entry, messageFormatter); + // FIXME: Where would this kind of metadata come from for real? + // const { message, hasBeenEdited, time, locale, name } = useChatMessage(entry, messageFormatter); + const message = entry.content.text; + const hasBeenEdited = false; + const time = entry.timestamp; + const locale = typeof navigator !== 'undefined' ? navigator.language : 'en-US'; + const name = entry.direction === 'outbound' ? 'User' : 'Agent'; - const isUser = entry.from?.isLocal ?? false; + const isUser = entry.direction === 'outbound';//entry.from?.isLocal ?? false; const messageOrigin = isUser ? 'remote' : 'local'; return ( diff --git a/components/livekit/media-tiles.tsx b/components/livekit/media-tiles.tsx index 7b7cedd6..c3b7b99d 100644 --- a/components/livekit/media-tiles.tsx +++ b/components/livekit/media-tiles.tsx @@ -11,6 +11,7 @@ import { cn } from '@/lib/utils'; import { AgentTile } from './agent-tile'; import { AvatarTile } from './avatar-tile'; import { VideoTile } from './video-tile'; +import { useAgentLocalParticipant, useAgentState } from '@/agent-sdk'; const MotionVideoTile = motion.create(VideoTile); const MotionAgentTile = motion.create(AgentTile); @@ -91,13 +92,17 @@ interface MediaTilesProps { } export function MediaTiles({ chatOpen }: MediaTilesProps) { + const { legacyState: agentState } = useAgentState(); + // const { audioTrack: agentAudioTrack, videoTrack: agentVideoTrack } = useAgentTracks(); const { - state: agentState, + // state: agentState, audioTrack: agentAudioTrack, videoTrack: agentVideoTrack, } = useVoiceAssistant(); - const [screenShareTrack] = useTracks([Track.Source.ScreenShare]); - const cameraTrack: TrackReference | undefined = useLocalTrackRef(Track.Source.Camera); + // console.log('TRACKS:', agentAudioTrack, agentVideoTrack); + const [screenShareTrack] = useTracks([Track.Source.ScreenShare]); // FIXME: replace with agent alternative + // const cameraTrack: TrackReference | undefined = useLocalTrackRef(Track.Source.Camera); // FIXME: replace with agent alternative + const { camera: { track: cameraTrack } } = useAgentLocalParticipant(); const isCameraEnabled = cameraTrack && !cameraTrack.publication.isMuted; const isScreenShareEnabled = screenShareTrack && !screenShareTrack.publication.isMuted; @@ -119,7 +124,7 @@ export function MediaTiles({ chatOpen }: MediaTilesProps) { const agentLayoutTransition = transition; const avatarLayoutTransition = transition; - const isAvatar = agentVideoTrack !== undefined; + const isAvatar = Boolean(agentVideoTrack); return (

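Aside: a minimal sketch of how a component could consume the hooks this diff introduces (useAgentState and useAgentTracks). The component name and markup below are illustrative only, not part of the change; the hook return shapes are taken from their definitions above, and the '@/agent-sdk' export path is assumed to match the other imports in this diff.

import { useAgentState, useAgentTracks } from '@/agent-sdk';

// Hypothetical status readout: shows connection state until the agent is available,
// then its conversational state plus whether an audio track has been published.
export function AgentStatusBadge() {
  const { connectionState, conversationalState, isAvailable } = useAgentState();
  const { audioTrack, videoTrack } = useAgentTracks();

  if (!isAvailable) {
    return <span>Agent unavailable ({connectionState})</span>;
  }

  return (
    <span>
      {conversationalState}
      {audioTrack ? ' (audio live)' : ' (no audio yet)'}
      {videoTrack ? ' (avatar video)' : ''}
    </span>
  );
}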
diff --git a/components/session-view.tsx b/components/session-view.tsx index ae601993..b818e55c 100644 --- a/components/session-view.tsx +++ b/components/session-view.tsx @@ -1,26 +1,27 @@ 'use client'; -import React, { useEffect, useState } from 'react'; +import React, { useState } from 'react'; import { AnimatePresence, motion } from 'motion/react'; -import { - type AgentState, - type ReceivedChatMessage, - useRoomContext, - useVoiceAssistant, -} from '@livekit/components-react'; +// import { +// type AgentState, +// type ReceivedChatMessage, +// useRoomContext, +// useVoiceAssistant, +// } from '@livekit/components-react'; import { toastAlert } from '@/components/alert-toast'; import { AgentControlBar } from '@/components/livekit/agent-control-bar/agent-control-bar'; import { ChatEntry } from '@/components/livekit/chat/chat-entry'; import { ChatMessageView } from '@/components/livekit/chat/chat-message-view'; import { MediaTiles } from '@/components/livekit/media-tiles'; -import useChatAndTranscription from '@/hooks/useChatAndTranscription'; +// import useChatAndTranscription from '@/hooks/useChatAndTranscription'; import { useDebugMode } from '@/hooks/useDebug'; import type { AppConfig } from '@/lib/types'; import { cn } from '@/lib/utils'; +import { AgentSessionEvent, useAgentMessages, useAgentSessionEvent } from '@/agent-sdk'; -function isAgentAvailable(agentState: AgentState) { - return agentState == 'listening' || agentState == 'thinking' || agentState == 'speaking'; -} +// function isAgentAvailable(agentState: AgentState) { +// return agentState == 'listening' || agentState == 'thinking' || agentState == 'speaking'; +// } interface SessionViewProps { appConfig: AppConfig; @@ -34,50 +35,45 @@ export const SessionView = ({ sessionStarted, ref, }: React.ComponentProps<'div'> & SessionViewProps) => { - const { state: agentState } = useVoiceAssistant(); + const { messages, send } = useAgentMessages(); + + // const { state: agentState } = useVoiceAssistant(); const [chatOpen, setChatOpen] = useState(false); - const { messages, send } = useChatAndTranscription(); - const room = useRoomContext(); + // const { messages, send } = useChatAndTranscription(); + // const room = useRoomContext(); useDebugMode(); async function handleSendMessage(message: string) { - await send(message); + // FIXME: add some sort of builder for SentMessage here so it's not just a raw object? + await send({ + id: `${Math.random()}`, /* FIXME: fix id generation */ + direction: 'outbound', + timestamp: new Date(), + content: { type: 'chat', text: message }, + }); + // await send(message); } - useEffect(() => { - if (sessionStarted) { - const timeout = setTimeout(() => { - if (!isAgentAvailable(agentState)) { - const reason = - agentState === 'connecting' - ? 'Agent did not join the room. ' - : 'Agent connected but did not complete initializing. '; - - toastAlert({ - title: 'Session ended', - description: ( -

<p …> - {reason} - <a …> - See quickstart guide - </a> - . - </p>

- ), - }); - room.disconnect(); - } - }, 10_000); - - return () => clearTimeout(timeout); - } - }, [agentState, sessionStarted, room]); + useAgentSessionEvent(AgentSessionEvent.AgentConnectionFailure, (reason: string) => { + toastAlert({ + title: 'Session ended', + description: ( +

<p …> + {reason} + <a …> + See quickstart guide + </a> + . + </p>

+ ), + }); + }, []); const { supportsChatInput, supportsVideoInput, supportsScreenShare } = appConfig; const capabilities = { @@ -104,7 +100,7 @@ export const SessionView = ({ >
- {messages.map((message: ReceivedChatMessage) => ( + {messages.map((message) => ( (null); - - const fetchConnectionDetails = useCallback(() => { - setConnectionDetails(null); + const fetchConnectionDetails = useCallback(async () => { const url = new URL( process.env.NEXT_PUBLIC_CONN_DETAILS_ENDPOINT ?? '/api/connection-details', window.location.origin ); - fetch(url.toString()) - .then((res) => res.json()) - .then((data) => { - setConnectionDetails(data); - }) - .catch((error) => { - console.error('Error fetching connection details:', error); - }); + + let data: ConnectionDetails; + try { + const res = await fetch(url.toString()); + data = await res.json(); + } catch (error) { + console.error('Error fetching connection details:', error); + throw new Error('Error fetching connection details!'); + } + + return data; }, []); + const provider = useMemo( + () => new ManualConnectionCredentialsProvider(fetchConnectionDetails), + [fetchConnectionDetails], + ); + useEffect(() => { - fetchConnectionDetails(); - }, [fetchConnectionDetails]); + provider.refresh(); + }, [provider]); - return { connectionDetails, refreshConnectionDetails: fetchConnectionDetails }; + return { connectionDetailsProvider: provider }; } diff --git a/hooks/useDebug.ts b/hooks/useDebug.ts index 7e69dab9..149e5a61 100644 --- a/hooks/useDebug.ts +++ b/hooks/useDebug.ts @@ -1,9 +1,11 @@ import * as React from 'react'; import { LogLevel, setLogLevel } from 'livekit-client'; -import { useRoomContext } from '@livekit/components-react'; +// import { useRoomContext } from '@livekit/components-react'; +import { useAgentSession } from '@/agent-sdk'; export const useDebugMode = ({ logLevel }: { logLevel?: LogLevel } = {}) => { - const room = useRoomContext(); + // const room = useRoomContext(); + const room = useAgentSession().room; React.useEffect(() => { setLogLevel(logLevel ?? 
'debug'); diff --git a/package.json b/package.json index 5cd54b64..9cb2fbe6 100644 --- a/package.json +++ b/package.json @@ -22,6 +22,7 @@ "buffer-image-size": "^0.6.4", "class-variance-authority": "^0.7.1", "clsx": "^2.1.1", + "jose": "^6.0.12", "livekit-client": "^2.13.3", "livekit-server-sdk": "^2.13.0", "mime": "^4.0.7", @@ -31,7 +32,9 @@ "react": "^19.0.0", "react-dom": "^19.0.0", "sonner": "^2.0.3", - "tailwind-merge": "^3.3.0" + "streaming-iterables": "^8.0.1", + "tailwind-merge": "^3.3.0", + "typed-emitter": "^2.1.0" }, "devDependencies": { "@eslint/eslintrc": "^3", diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index e97a81c8..cc37b112 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -41,6 +41,9 @@ importers: clsx: specifier: ^2.1.1 version: 2.1.1 + jose: + specifier: ^6.0.12 + version: 6.0.12 livekit-client: specifier: ^2.13.3 version: 2.15.4(@types/dom-mediacapture-record@1.0.22) @@ -68,9 +71,15 @@ importers: sonner: specifier: ^2.0.3 version: 2.0.7(react-dom@19.1.1(react@19.1.1))(react@19.1.1) + streaming-iterables: + specifier: ^8.0.1 + version: 8.0.1 tailwind-merge: specifier: ^3.3.0 version: 3.3.1 + typed-emitter: + specifier: ^2.1.0 + version: 2.1.0 devDependencies: '@eslint/eslintrc': specifier: ^3 @@ -1853,6 +1862,9 @@ packages: jose@5.10.0: resolution: {integrity: sha512-s+3Al/p9g32Iq+oqXxkW//7jk2Vig6FF1CFqzVXoTUXt2qz89YWbL+OwS17NFYEvxC35n0FKeGO2LGYSxeM2Gg==} + jose@6.0.12: + resolution: {integrity: sha512-T8xypXs8CpmiIi78k0E+Lk7T2zlK4zDyg+o1CZ4AkOHgDg98ogdP2BeZ61lTFKFyoEwJ9RgAgN+SdM3iPgNonQ==} + js-tokens@4.0.0: resolution: {integrity: sha512-RdJUflcE3cUzKiMqQgsCu06FPu9UdIJO0beYbPhHN4k6apgJtifcoCtT9bcxOpYBtpD2kCM6Sbzg4CausW/PKQ==} @@ -2445,6 +2457,10 @@ packages: resolution: {integrity: sha512-eLoXW/DHyl62zxY4SCaIgnRhuMr6ri4juEYARS8E6sCEqzKpOiE521Ucofdx+KnDZl5xmvGYaaKCk5FEOxJCoQ==} engines: {node: '>= 0.4'} + streaming-iterables@8.0.1: + resolution: {integrity: sha512-yfQdmUB1b+rGLZkD/r6YisT/eNOjZxBAckXKlzYNmRJnwSzHaiScykD8gsQceFcShtK09qAbLhOqvzIpnBPoDQ==} + engines: {node: '>=18'} + string.prototype.includes@2.0.1: resolution: {integrity: sha512-o7+c9bW6zpAdJHTtujeePODAhkuicdAryFsfVKwA+wGw89wJ4GTY484WTucM9hLtDEOpOvI+aHnzqnC5lHp4Rg==} engines: {node: '>= 0.4'} @@ -4453,6 +4469,8 @@ snapshots: jose@5.10.0: {} + jose@6.0.12: {} + js-tokens@4.0.0: {} js-yaml@4.1.0: @@ -4998,6 +5016,8 @@ snapshots: es-errors: 1.3.0 internal-slot: 1.1.0 + streaming-iterables@8.0.1: {} + string.prototype.includes@2.0.1: dependencies: call-bind: 1.0.8
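Finally, a usage sketch for the message API that session-view.tsx now goes through. useSendChat is a hypothetical helper, not part of the diff; the SentMessage shape mirrors the handleSendMessage call above, and crypto.randomUUID() is one possible answer to the id-generation FIXME flagged there.

import { useAgentMessages } from '@/agent-sdk';

// Hypothetical wrapper around useAgentMessages().send for plain chat text.
export function useSendChat() {
  const { ready, send } = useAgentMessages();

  return async (text: string) => {
    if (!ready) return; // message aggregator has not finished initializing yet
    await send({
      id: crypto.randomUUID(), // sturdier than `${Math.random()}`
      direction: 'outbound',
      timestamp: new Date(),
      content: { type: 'chat', text },
    });
  };
}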