-
Notifications
You must be signed in to change notification settings - Fork 1.9k
/
router.ts
108 lines (96 loc) 路 3.44 KB
/
router.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
import { Runnable, type RunnableBatchOptions } from "./base.js";
import { IterableReadableStream } from "../utils/stream.js";
import { ensureConfig, type RunnableConfig } from "./config.js";
export type RouterInput = {
key: string;
// eslint-disable-next-line @typescript-eslint/no-explicit-any
input: any;
};
/**
* A runnable that routes to a set of runnables based on Input['key'].
* Returns the output of the selected runnable.
*/
export class RouterRunnable<
RunInput extends RouterInput,
RunnableInput,
RunOutput
> extends Runnable<RunInput, RunOutput> {
static lc_name() {
return "RouterRunnable";
}
lc_namespace = ["langchain_core", "runnables"];
lc_serializable = true;
runnables: Record<string, Runnable<RunnableInput, RunOutput>>;
constructor(fields: {
runnables: Record<string, Runnable<RunnableInput, RunOutput>>;
}) {
super(fields);
this.runnables = fields.runnables;
}
async invoke(
input: RunInput,
options?: Partial<RunnableConfig>
): Promise<RunOutput> {
const { key, input: actualInput } = input;
const runnable = this.runnables[key];
if (runnable === undefined) {
throw new Error(`No runnable associated with key "${key}".`);
}
return runnable.invoke(actualInput, ensureConfig(options));
}
async batch(
inputs: RunInput[],
options?: Partial<RunnableConfig> | Partial<RunnableConfig>[],
batchOptions?: RunnableBatchOptions & { returnExceptions?: false }
): Promise<RunOutput[]>;
async batch(
inputs: RunInput[],
options?: Partial<RunnableConfig> | Partial<RunnableConfig>[],
batchOptions?: RunnableBatchOptions & { returnExceptions: true }
): Promise<(RunOutput | Error)[]>;
async batch(
inputs: RunInput[],
options?: Partial<RunnableConfig> | Partial<RunnableConfig>[],
batchOptions?: RunnableBatchOptions
): Promise<(RunOutput | Error)[]>;
async batch(
inputs: RunInput[],
options?: Partial<RunnableConfig> | Partial<RunnableConfig>[],
batchOptions?: RunnableBatchOptions
): Promise<(RunOutput | Error)[]> {
const keys = inputs.map((input) => input.key);
const actualInputs = inputs.map((input) => input.input);
const missingKey = keys.find((key) => this.runnables[key] === undefined);
if (missingKey !== undefined) {
throw new Error(`One or more keys do not have a corresponding runnable.`);
}
const runnables = keys.map((key) => this.runnables[key]);
const optionsList = this._getOptionsList(options ?? {}, inputs.length);
const maxConcurrency =
optionsList[0]?.maxConcurrency ?? batchOptions?.maxConcurrency;
const batchSize =
maxConcurrency && maxConcurrency > 0 ? maxConcurrency : inputs.length;
const batchResults = [];
for (let i = 0; i < actualInputs.length; i += batchSize) {
const batchPromises = actualInputs
.slice(i, i + batchSize)
.map((actualInput, i) =>
runnables[i].invoke(actualInput, optionsList[i])
);
const batchResult = await Promise.all(batchPromises);
batchResults.push(batchResult);
}
return batchResults.flat();
}
async stream(
input: RunInput,
options?: Partial<RunnableConfig>
): Promise<IterableReadableStream<RunOutput>> {
const { key, input: actualInput } = input;
const runnable = this.runnables[key];
if (runnable === undefined) {
throw new Error(`No runnable associated with key "${key}".`);
}
return runnable.stream(actualInput, options);
}
}