/
agent.ts
472 lines (425 loc) · 15.1 KB
/
agent.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
// Copyright 2024 Ahyve AI Inc.
// SPDX-License-Identifier: MIT
import chalk from "chalk";
import {
ChildOf,
Conclusible,
Constructor,
Identified,
Metadata,
ParentOf,
meta,
} from "./common";
import { ModelType, pricing, ModelPricing } from "./models";
import { ModelInvocationOptions, Session } from "./session";
import { Thread, ToolAssistantContent, ToolCall } from "./thread";
import { Tool, ToolSpec } from "./tool";
import { EventEmitter } from "events";
/**
 * Options accepted by every agent constructor.
 * All fields are optional; agent subclasses may extend this interface
 * with their own settings (see the `OptionsType` parameter of `BaseAgent`).
 */
export interface AgentOptions {
  /** Model the agent sends its threads to. */
  model?: ModelType;
  /** Topic of the agent; forwarded to the agent's metadata on construction. */
  topic?: string;
  /** Tools the model may call while the agent advances a thread. */
  tools?: Tool[];
  /** System prompt used by the agent, if it needs one. */
  systemPrompt?: string;
  /** When true, tool results are removed from the thread once the model has responded to them. */
  eatToolResults?: boolean;
  /** When true, the model is asked for JSON output (`json_object` response format). */
  expectsJSON?: boolean;
  /** Sampling temperature passed to the LLM. */
  temperature?: number;
}
/**
 * Type-erased agent: a `BaseAgent` whose options, state and result types are
 * left unconstrained. Used where agents of any kind are accepted, e.g. as the
 * constraint of `spawnAgent`.
 */
export type Agent = BaseAgent<
  any, // options
  any, // state
  any // result
>;
/**
 * Events emitted by an agent during `run`, keyed by event name.
 * Each entry is the listener signature for that event.
 */
export interface AgentEvents<StateType, ResultType> {
  /** Emitted once `initialize` has produced the initial state, before the first step. */
  start: (state: StateType) => void;
  /** Emitted after every `step`, with the state that step returned. */
  step: (state: StateType) => void;
  /** Emitted when the step loop has ended, before `finalize` runs. */
  stopping: (state: StateType) => void;
  /** Emitted with the final result once `finalize` has completed. */
  stop: (result: ResultType) => void;
  /** Emitted by `heartbeat` to signal that the agent is still working. */
  heartbeat: () => void;
}
/**
 * Typed signatures for the `EventEmitter` methods of `BaseAgent`.
 * This interface merges with the `BaseAgent` class declaration, so event names
 * and listener signatures are checked against `AgentEvents`.
 * `OptionsType` is unused here, but the type parameters must match the class
 * declaration exactly for declaration merging to work.
 */
export interface BaseAgent<
  OptionsType extends AgentOptions,
  StateType,
  ResultType,
> {
  /** Registers `listener` to be called every time `event` is emitted. */
  on<EventName extends keyof AgentEvents<StateType, ResultType>>(
    event: EventName,
    listener: AgentEvents<StateType, ResultType>[EventName]
  ): this;
  /** Emits `event` with the arguments its listener signature expects. */
  emit<EventName extends keyof AgentEvents<StateType, ResultType>>(
    event: EventName,
    ...args: Parameters<AgentEvents<StateType, ResultType>[EventName]>
  ): boolean;
  /** Removes a listener previously registered for `event`. */
  off<EventName extends keyof AgentEvents<StateType, ResultType>>(
    event: EventName,
    listener: AgentEvents<StateType, ResultType>[EventName]
  ): this;
  /** Registers `listener` to be called only the next time `event` is emitted. */
  once<EventName extends keyof AgentEvents<StateType, ResultType>>(
    event: EventName,
    listener: AgentEvents<StateType, ResultType>[EventName]
  ): this;
}
/**
 * BaseAgent is the base class for all agents.
 *
 * It implements the agent lifecycle driven by `run`:
 * - `initialize` produces the initial state;
 * - `step` is called repeatedly until `stop` is called (or the session is aborted);
 * - `finalize` turns the last state into the result.
 *
 * It also provides helpers for talking to the model (`advance`), handling tool
 * calls, spawning other agents and managing the threads the agent owns.
 *
 * This class is not directly usable: its lifecycle hooks throw "not implemented".
 * To implement a custom agent, extend this class and override `initialize`,
 * `step` and `finalize`.
 *
 * @typeParam OptionsType - options for the agent
 * @typeParam StateType - state of the agent
 * @typeParam ResultType - result of the agent
 */
export class BaseAgent<OptionsType extends AgentOptions, StateType, ResultType>
  extends EventEmitter
  implements Identified, Conclusible, ChildOf<Session>, ParentOf<Thread>
{
  /** Identity, topic, timing and parent information of this agent. */
  metadata: Metadata;
  /** Options for the agent, use to pass custom data in constructor. */
  options: OptionsType;
  /** System prompt, can be undefined if the agent does not need one. */
  systemPrompt?: string;
  /** Model to use for the agent, can be undefined if the agent does not need to talk to any model. */
  model?: ModelType;
  /** State of the agent, initial state is obtained by calling `initialize` */
  state?: StateType = undefined;
  /** Result of the agent, obtained by calling `finalize` */
  result?: ResultType = undefined;
  /** Flag to indicate if the agent is active or not. Agent is only active during `run` */
  isActive: boolean = false;
  /** Tools for the agent to use */
  tools: Tool[] = [];
  /** Flag to indicate whether the tool results should be removed from threads after they were read */
  eatToolResults: boolean = false;
  /** Flag to indicate whether the agent expects JSON output from the LLM */
  expectsJSON: boolean = false;
  /** Temperature to use with the LLM */
  temperature: number = 0.0;
  /** Maximum tokens to produce */
  maxTokens?: number = undefined;
  /** Used to track threads that the agent is managing */
  private threads: Thread[] = [];

  /** Session that owns this agent.
   * @throws Error when the agent has no parent.
   */
  get parent(): Session {
    const parent = this.metadata.parent;
    if (!parent) {
      throw new Error("Agent has no parent");
    }
    return parent as Session;
  }

  /** Session that the agent belongs to, alias to `parent` */
  get session(): Session {
    return this.parent;
  }

  /** Pricing details of the configured model, or undefined when no model is set. */
  get modelDetails(): ModelPricing | undefined {
    if (this.model) {
      return pricing[this.model];
    }
    return undefined;
  }

  /** Constructs a new agent.
   * @param session the session that the agent belongs to
   * @param options options for the agent
   */
  constructor(session: Session, options?: OptionsType) {
    super();
    this.metadata = meta(
      this.constructor as Constructor<Identified>,
      options?.topic
    );
    this.metadata.parent = session;
    this.options = options ?? ({} as OptionsType);
    // At this point the field initializers of this class have already run
    // (so e.g. `this.tools` is `[]`). Field initializers declared by a subclass
    // run only after this constructor returns and override what is set here.
    this.model = this.model ?? this.options.model;
    // `this.tools` is the (truthy) empty array from the field initializer, so it
    // must not short-circuit the option - otherwise `options.tools` is ignored.
    this.tools = this.options.tools ?? this.tools;
    this.systemPrompt = this.systemPrompt ?? this.options.systemPrompt;
    this.eatToolResults = this.options.eatToolResults ?? this.eatToolResults;
    this.expectsJSON = this.options.expectsJSON ?? this.expectsJSON;
    this.temperature = this.options.temperature ?? this.temperature;
  }

  /**
   * Starts the agent. Returns the final result when the agent is done.
   * Emits `start`, then `step` after every step, `stopping` once the loop ends
   * and `stop` with the result.
   * @returns the final result of the agent's work
   */
  async run(): Promise<ResultType> {
    console.log("Agent", chalk.yellow(this.metadata.ID), chalk.blue("start"));
    this.trace(`${this.metadata.ID} run started: ${this.metadata.topic}`);
    this.state = await this.initialize(this.options);
    this.trace("initialize finished", this.state);
    // FIXME: This check doesn't make sense when the state is of type void. We need to rethink this.
    // if (!this.state) {
    //   throw new Error("initialize() must return a state");
    // }
    this.isActive = true;
    try {
      this.emit("start", this.state);
      while (this.isActive && !this.session.isAborted) {
        this.heartbeat();
        this.state = await this.step(this.state!);
        this.emit("step", this.state);
      }
    } finally {
      // The agent is only active while `run` is executing the step loop;
      // reset the flag even when `step` (or a listener) throws.
      this.isActive = false;
    }
    this.emit("stopping", this.state);
    this.result = await this.finalize(this.state);
    this.emit("stop", this.result);
    this.conclude();
    console.log(
      "Agent",
      chalk.yellow(this.metadata.ID),
      chalk.green("done"),
      chalk.gray(
        `(took ${this.metadata.timing.elapsed.as("seconds").toFixed(2)}s)`
      )
    );
    return this.result;
  }

  /** Indicates that the agent should be stopped.
   * Call only from `step` to finish agent's work.
   * @throws Error when the agent is not active.
   */
  stop(): void {
    if (!this.isActive) {
      throw new Error("Agent is not active");
    }
    this.isActive = false;
  }

  /** Initializes the agent.
   * Used to set up the initial state of the agent.
   * Is called before first invocation of `step`.
   * Implement this when extending the class.
   * @param _options options for the agent
   * @returns the initial state of the agent
   */
  async initialize(_options: OptionsType): Promise<StateType> {
    throw new Error("initialize() not implemented");
  }

  /** Finalizes the agent.
   * Used to finalize the agent's work and obtain the final result.
   * Implement this when extending the class.
   * @param _finalState the final state of the agent
   * @returns the final result of the agent
   */
  async finalize(_finalState: StateType): Promise<ResultType> {
    throw new Error("finalize() not implemented");
  }

  /** Steps the agent.
   * Used to advance the agent's work.
   * Is called repeatedly until `stop` is called or the session is aborted.
   * Implement this when extending the class.
   * @param _state the current state of the agent
   * @returns the next state of the agent
   */
  async step(_state: StateType): Promise<StateType> {
    throw new Error("step() not implemented");
  }

  /** Creates a new thread for the agent and adopts it.
   * @returns the new thread
   */
  createThread(): Thread {
    const thread = new Thread(this);
    this.adopt(thread);
    return thread;
  }

  /** Advances the thread by sending it to the model.
   * This might take a while, as it waits for the model to respond.
   * A plain text answer is appended to the thread in place and the same thread
   * is returned. When the model answers with tool calls, the tools are invoked,
   * the model is asked again, and the resulting (adopted) thread is returned.
   * @param thread the thread to advance; must be adopted by this agent and not complete
   * @returns the advanced thread
   * @throws Error when no model is set, the thread is not owned/adopted or is
   * complete, or the model response has neither text nor tool calls.
   */
  async advance(thread: Thread): Promise<Thread> {
    if (!this.model) {
      throw new Error("Model not set");
    }
    if (thread.metadata.parent !== this) {
      throw new Error("Thread already has a different parent");
    }
    if (!this.threads.includes(thread)) {
      throw new Error("Thread not adopted");
    }
    if (thread.complete) {
      throw new Error("Thread is complete");
    }
    const response = await this.session.invokeModel(
      this,
      this.model,
      thread.messages,
      this.modelInvocationOptions
    );

    // Plain text answer: the thread is advanced mutably.
    if (response.content && typeof response.content === "string") {
      const nextThread = thread.appendAssistantMessage(response.content);
      if (nextThread !== thread) {
        throw new Error("Thread should have been mutably advanced");
      }
      return nextThread;
    }

    // Otherwise the model must have requested at least one tool call.
    if (!response.tool_calls || response.tool_calls.length === 0) {
      throw new Error("Invalid response");
    }
    const toolCallThread = thread.appendAssistantToolCalls(response.tool_calls);
    // Tool calls complete the thread, so answering them does not mutate it:
    // a new thread is created that we need to adopt in place of the old one.
    const toolResultThread = await this.handleToolCalls(toolCallThread);
    if (toolResultThread === toolCallThread) {
      throw new Error("Thread should have been immutably advanced");
    }
    this.abandon(toolCallThread);
    this.adopt(toolResultThread);
    // Invoke the model again to pass the tool responses to the assistant
    // and obtain its response.
    const assistantRespondedThread = await this.advance(toolResultThread);
    if (!this.eatToolResults) {
      return assistantRespondedThread;
    }

    // Drop the tool results from the conversation. NOTE(review): `rollup`
    // presumably collapses the interactions after `thread` into the given
    // note - confirm against Thread.rollup.
    const toolCalls: ToolCall[] | undefined = (
      assistantRespondedThread.interaction.previous?.assistant as
        | ToolAssistantContent
        | undefined
    )?.toolCalls;
    if (toolCalls === undefined) {
      throw new Error("Invalid tool calls when eating tool results");
    }
    const threadWithoutToolResults = assistantRespondedThread.rollup(
      thread,
      `You have called tools: ${JSON.stringify(
        toolCalls.map((t) => t.function)
      )} but the results have been removed from the conversation. See the following note.`
    );
    this.abandon(assistantRespondedThread);
    this.adopt(threadWithoutToolResults);
    return threadWithoutToolResults;
  }

  /** Describes the tools that the agent uses.
   * @returns the OpenAI schema for the tools that the agent can use
   */
  describeTools(): ToolSpec[] {
    return this.tools.map((t) => t.describe());
  }

  /** Model invocation options that the agent uses.
   * Contains tools to use and other options accepted by OpenAI.
   * Since the temperature is always included, this currently never returns
   * undefined; the `undefined` in the type is kept for compatibility with overrides.
   */
  get modelInvocationOptions(): ModelInvocationOptions | undefined {
    const options: ModelInvocationOptions = {};
    if (this.tools.length > 0) {
      options.tools = this.describeTools();
    }
    if (this.expectsJSON) {
      options.response_format = { type: "json_object" };
    }
    options.temperature = this.temperature;
    if (this.maxTokens !== undefined) {
      options.max_tokens = this.maxTokens;
    }
    return options;
  }

  /**
   * Handles tool calls in a given thread by invoking the corresponding tool and appending the result to the thread.
   * Resulting thread still needs to be completed using `advance` to obtain the assistant response.
   * Failures of individual tool calls (unknown tool, bad JSON arguments, tool throwing)
   * are reported back to the model as `TOOL ERROR: ...` results instead of being thrown.
   * @remarks This method is called by `advance` when the assistant returns tool calls.
   * @param thread The thread containing the tool calls to be handled.
   * @returns A promise that resolves to the updated thread with the tool results appended.
   * @throws Error when the thread's current interaction does not contain tool calls.
   */
  async handleToolCalls(thread: Thread): Promise<Thread> {
    if (thread.interaction.assistant?.type !== "tool_calls") {
      throw new Error("Thread does not contain tool calls");
    }
    const toolCalls: ToolCall[] = thread.interaction.assistant.toolCalls;
    for (const toolCall of toolCalls) {
      try {
        const tool: Tool | undefined = this.tools.find(
          (t) => t.name === toolCall.function.name
        );
        if (!tool) {
          throw new Error(`Tool ${toolCall.function.name} not found`);
        }
        const parsedArguments = JSON.parse(toolCall.function.arguments);
        const result = await tool.invoke(this, parsedArguments);
        // JSON.stringify(undefined) yields undefined rather than a string,
        // so a tool returning nothing is reported as JSON `null`.
        thread = thread.appendToolResult(
          toolCall.id,
          JSON.stringify(result ?? null)
        );
      } catch (err: unknown) {
        // Respond with an error to this particular tool call instead of
        // throwing to the caller; the thrown value may not be an Error.
        this.trace("tool error", toolCall.function.name, err);
        const message = err instanceof Error ? err.message : String(err);
        thread = thread.appendToolResult(toolCall.id, `TOOL ERROR: ${message}`);
      }
    }
    // at this stage we have appended all the tool results to the thread
    return thread;
  }

  /** Create another agent within the session.
   * It delegates to `Session.spawnAgent` in the session of this agent.
   * @param constructor constructor of the agent to create
   * @param options options for the agent
   * @returns the newly created agent
   */
  spawnAgent<T extends Agent>(
    constructor: Constructor<T>,
    options?: AgentOptions
  ): T {
    console.log(
      chalk.yellow(this.metadata.ID),
      chalk.blue("spawn"),
      constructor.name
    );
    return this.session.spawnAgent(constructor, options);
  }

  /** Starts tracking `child` as one of this agent's threads.
   * @throws Error when the thread belongs to another parent or is already adopted.
   */
  adopt(child: Thread): void {
    if (child.metadata.parent !== this) {
      throw new Error("Thread already has a different parent");
    }
    if (this.threads.includes(child)) {
      throw new Error("Thread already adopted");
    }
    this.threads.push(child);
  }

  /** Stops tracking `child`.
   * @throws Error when the thread is not adopted by this agent.
   */
  abandon(child: Thread): void {
    if (!this.threads.includes(child)) {
      throw new Error("Thread not adopted");
    }
    this.threads = this.threads.filter((t) => t !== child);
  }

  /** Concludes all adopted threads, stops the timer and detaches from the session.
   * @throws Error when the agent is still active.
   */
  conclude(): void {
    if (this.isActive) {
      throw new Error("Can't conclude an active Agent");
    }
    for (const thread of this.threads) {
      thread.conclude();
    }
    this.metadata.timing.finish();
    this.parent.abandon(this);
  }

  /** Notify session that the agent is still active. Use in long-running side-effects (e.g. tool invocations, long external requests, etc.) to prevent timeouts. This method is automatically called by `run` before every `step`, so you don't need to call it manually in most cases. */
  heartbeat(): void {
    this.emit("heartbeat");
  }

  /** Forwards a notification to the parent session, tagged with this agent's ID.
   * If the first argument is an object, the tag is added to it as `agent`
   * (mutating it); otherwise a `{ agent }` object is prepended.
   * @returns the session's return value, or undefined when the agent has no parent.
   */
  notify(...stuff: any[]): number | undefined {
    // Check the raw metadata: the `parent` getter throws when there is no parent.
    if (!this.metadata.parent) {
      return undefined;
    }
    const first = stuff[0];
    // `typeof null === "object"`, so null must be excluded explicitly.
    if (first !== null && typeof first === "object") {
      first.agent = this.metadata.ID;
    } else {
      stuff.unshift({ agent: this.metadata.ID });
    }
    return this.parent.notify(...stuff);
  }

  /** Forwards a trace message to the parent session.
   * @returns the session's return value, or undefined when the agent has no parent.
   */
  trace(...stuff: any[]): number | undefined {
    // Check the raw metadata: the `parent` getter throws when there is no parent.
    if (!this.metadata.parent) {
      return undefined;
    }
    return this.parent.trace(...stuff);
  }
}