Skip to content

Commit

Permalink
Merge pull request #3792 from langchain-ai/nc/dec26/runnable-stream
Browse files Browse the repository at this point in the history
core[minor]: Nc/dec26/runnable stream
  • Loading branch information
nfcampos committed Dec 26, 2023
2 parents d93f8b0 + 81d6bad commit 1eca176
Show file tree
Hide file tree
Showing 9 changed files with 411 additions and 70 deletions.
7 changes: 0 additions & 7 deletions langchain-core/src/callbacks/manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,13 +61,6 @@ export interface BaseCallbackConfig {
* Tags are passed to all callbacks, metadata is passed to handle*Start callbacks.
*/
callbacks?: Callbacks;

/**
* Runtime values for attributes previously made configurable on this Runnable,
* or sub-Runnables.
*/
// eslint-disable-next-line @typescript-eslint/no-explicit-any
configurable?: Record<string, any>;
}

export function parseCallbackConfigArg(
Expand Down
139 changes: 99 additions & 40 deletions langchain-core/src/runnables/base.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ import pRetry from "p-retry";
import {
CallbackManager,
CallbackManagerForChainRun,
BaseCallbackConfig,
} from "../callbacks/manager.js";
import {
LogStreamCallbackHandler,
Expand All @@ -13,9 +12,12 @@ import {
import { Serializable } from "../load/serializable.js";
import {
IterableReadableStream,
concat,
type IterableReadableStreamInterface,
atee,
} from "../utils/stream.js";
import {
DEFAULT_RECURSION_LIMIT,
RunnableConfig,
getCallbackMangerForConfig,
mergeConfigs,
Expand Down Expand Up @@ -59,12 +61,6 @@ export interface RunnableInterface<
batchOptions?: RunnableBatchOptions
): Promise<(RunOutput | Error)[]>;

batch(
inputs: RunInput[],
options?: Partial<CallOptions> | Partial<CallOptions>[],
batchOptions?: RunnableBatchOptions
): Promise<(RunOutput | Error)[]>;

stream(
input: RunInput,
options?: Partial<CallOptions>
Expand Down Expand Up @@ -433,30 +429,24 @@ export abstract class Runnable<
let finalOutputSupported = true;

const callbackManager_ = await getCallbackMangerForConfig(options);
let runManager: CallbackManagerForChainRun | undefined;
const serializedRepresentation = this.toJSON();
const runManager = await callbackManager_?.handleChainStart(
this.toJSON(),
{ input: "" },
undefined,
options?.runType,
undefined,
undefined,
options?.runName
);
async function* wrapInputForTracing() {
for await (const chunk of inputGenerator) {
if (!runManager) {
// Start the run manager AFTER the iterator starts to preserve
// tracing order
runManager = await callbackManager_?.handleChainStart(
serializedRepresentation,
{ input: "" },
undefined,
options?.runType,
undefined,
undefined,
options?.runName
);
}
if (finalInputSupported) {
if (finalInput === undefined) {
finalInput = chunk;
} else {
try {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
finalInput = (finalInput as any).concat(chunk);
finalInput = concat(finalInput, chunk as any);
} catch {
finalInput = undefined;
finalInputSupported = false;
Expand All @@ -482,7 +472,7 @@ export abstract class Runnable<
} else {
try {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
finalOutput = (finalOutput as any).concat(chunk);
finalOutput = concat(finalOutput, chunk as any);
} catch {
finalOutput = undefined;
finalOutputSupported = false;
Expand All @@ -507,7 +497,8 @@ export abstract class Runnable<

_patchConfig(
config: Partial<CallOptions> = {},
callbackManager: CallbackManager | undefined = undefined
callbackManager: CallbackManager | undefined = undefined,
recursionLimit: number | undefined = undefined
): Partial<CallOptions> {
const newConfig = { ...config };
if (callbackManager !== undefined) {
Expand All @@ -518,6 +509,9 @@ export abstract class Runnable<
delete newConfig.runName;
return { ...newConfig, callbacks: callbackManager };
}
if (recursionLimit !== undefined) {
newConfig.recursionLimit = recursionLimit;
}
return newConfig;
}

Expand Down Expand Up @@ -556,7 +550,7 @@ export abstract class Runnable<
// Make a best effort to gather, for any type that supports concat.
// This method should throw an error if gathering fails.
// eslint-disable-next-line @typescript-eslint/no-explicit-any
finalChunk = (finalChunk as any).concat(chunk);
finalChunk = concat(finalChunk, chunk as any);
}
}
yield* this._streamIterator(finalChunk, options);
Expand Down Expand Up @@ -670,7 +664,7 @@ export abstract class Runnable<
export type RunnableBindingArgs<
RunInput,
RunOutput,
CallOptions extends RunnableConfig
CallOptions extends RunnableConfig = RunnableConfig
> = {
bound: Runnable<RunInput, RunOutput, CallOptions>;
kwargs?: Partial<CallOptions>;
Expand All @@ -684,7 +678,7 @@ export type RunnableBindingArgs<
export class RunnableBinding<
RunInput,
RunOutput,
CallOptions extends RunnableConfig
CallOptions extends RunnableConfig = RunnableConfig
> extends Runnable<RunInput, RunOutput, CallOptions> {
static lc_name() {
return "RunnableBinding";
Expand Down Expand Up @@ -892,7 +886,7 @@ export class RunnableBinding<
export class RunnableEach<
RunInputItem,
RunOutputItem,
CallOptions extends BaseCallbackConfig
CallOptions extends RunnableConfig
> extends Runnable<RunInputItem[], RunOutputItem[], CallOptions> {
static lc_name() {
return "RunnableEach";
Expand Down Expand Up @@ -1360,7 +1354,7 @@ export class RunnableSequence<
} else {
try {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
finalOutput = (finalOutput as any).concat(chunk);
finalOutput = concat(finalOutput, chunk as any);
} catch (e) {
finalOutput = undefined;
concatSupported = false;
Expand Down Expand Up @@ -1473,7 +1467,7 @@ export class RunnableMap<

async invoke(
input: RunInput,
options?: Partial<BaseCallbackConfig>
options?: Partial<RunnableConfig>
): Promise<RunOutput> {
const callbackManager_ = await getCallbackMangerForConfig(options);
const runManager = await callbackManager_?.handleChainStart(
Expand All @@ -1494,7 +1488,7 @@ export class RunnableMap<
Object.entries(this.steps).map(async ([key, runnable]) => {
output[key] = await runnable.invoke(
input,
this._patchConfig(options, runManager?.getChild(key))
this._patchConfig(options, runManager?.getChild(`map:key:${key}`))
);
})
);
Expand All @@ -1505,6 +1499,64 @@ export class RunnableMap<
await runManager?.handleChainEnd(output);
return output as RunOutput;
}

async *_transform(
generator: AsyncGenerator<RunInput>,
runManager?: CallbackManagerForChainRun,
options?: Partial<RunnableConfig>
): AsyncGenerator<RunOutput> {
// shallow copy steps to ignore changes while iterating
const steps = { ...this.steps };
// each step gets a copy of the input iterator
const inputCopies = atee(generator, Object.keys(steps).length);
// start the first iteration of each output iterator
const tasks = new Map(
Object.entries(steps).map(([key, runnable], i) => {
const gen = runnable.transform(
inputCopies[i],
this._patchConfig(options, runManager?.getChild(`map:key:${key}`))
);
return [key, gen.next().then((result) => ({ key, gen, result }))];
})
);
// yield chunks as they become available,
// starting new iterations as needed,
// until all iterators are done
while (tasks.size) {
const { key, result, gen } = await Promise.race(tasks.values());
tasks.delete(key);
if (!result.done) {
yield { [key]: result.value } as unknown as RunOutput;
tasks.set(
key,
gen.next().then((result) => ({ key, gen, result }))
);
}
}
}

transform(
generator: AsyncGenerator<RunInput>,
options?: Partial<RunnableConfig>
): AsyncGenerator<RunOutput> {
return this._transformStreamWithConfig(
generator,
this._transform.bind(this),
options
);
}

async stream(
input: RunInput,
options?: Partial<RunnableConfig>
): Promise<IterableReadableStream<RunOutput>> {
async function* generator() {
yield input;
}
return IterableReadableStream.fromAsyncGenerator(
this.transform(generator(), options)
);
}
}

/**
Expand Down Expand Up @@ -1537,22 +1589,29 @@ export class RunnableLambda<RunInput, RunOutput> extends Runnable<

async _invoke(
input: RunInput,
config?: Partial<BaseCallbackConfig>,
config?: Partial<RunnableConfig>,
runManager?: CallbackManagerForChainRun
) {
let output = await this.func(input, { config });
if (output && Runnable.isRunnable(output)) {
if (config?.recursionLimit === 0) {
throw new Error("Recursion limit reached.");
}
output = await output.invoke(
input,
this._patchConfig(config, runManager?.getChild())
this._patchConfig(
config,
runManager?.getChild(),
(config?.recursionLimit ?? DEFAULT_RECURSION_LIMIT) - 1
)
);
}
return output;
}

async invoke(
input: RunInput,
options?: Partial<BaseCallbackConfig>
options?: Partial<RunnableConfig>
): Promise<RunOutput> {
return this._callWithConfig(this._invoke, input, options);
}
Expand Down Expand Up @@ -1597,7 +1656,7 @@ export class RunnableWithFallbacks<RunInput, RunOutput> extends Runnable<

async invoke(
input: RunInput,
options?: Partial<BaseCallbackConfig>
options?: Partial<RunnableConfig>
): Promise<RunOutput> {
const callbackManager_ = await CallbackManager.configure(
options?.callbacks,
Expand Down Expand Up @@ -1639,25 +1698,25 @@ export class RunnableWithFallbacks<RunInput, RunOutput> extends Runnable<

async batch(
inputs: RunInput[],
options?: Partial<BaseCallbackConfig> | Partial<BaseCallbackConfig>[],
options?: Partial<RunnableConfig> | Partial<RunnableConfig>[],
batchOptions?: RunnableBatchOptions & { returnExceptions?: false }
): Promise<RunOutput[]>;

async batch(
inputs: RunInput[],
options?: Partial<BaseCallbackConfig> | Partial<BaseCallbackConfig>[],
options?: Partial<RunnableConfig> | Partial<RunnableConfig>[],
batchOptions?: RunnableBatchOptions & { returnExceptions: true }
): Promise<(RunOutput | Error)[]>;

async batch(
inputs: RunInput[],
options?: Partial<BaseCallbackConfig> | Partial<BaseCallbackConfig>[],
options?: Partial<RunnableConfig> | Partial<RunnableConfig>[],
batchOptions?: RunnableBatchOptions
): Promise<(RunOutput | Error)[]>;

async batch(
inputs: RunInput[],
options?: Partial<BaseCallbackConfig> | Partial<BaseCallbackConfig>[],
options?: Partial<RunnableConfig> | Partial<RunnableConfig>[],
batchOptions?: RunnableBatchOptions
): Promise<(RunOutput | Error)[]> {
if (batchOptions?.returnExceptions) {
Expand Down
18 changes: 17 additions & 1 deletion langchain-core/src/runnables/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,21 @@ import {
CallbackManager,
} from "../callbacks/manager.js";

export type RunnableConfig = BaseCallbackConfig;
export const DEFAULT_RECURSION_LIMIT = 25;

export interface RunnableConfig extends BaseCallbackConfig {
/**
* Runtime values for attributes previously made configurable on this Runnable,
* or sub-Runnables.
*/
// eslint-disable-next-line @typescript-eslint/no-explicit-any
configurable?: Record<string, any>;

/**
* Maximum number of times a call can recurse. If not provided, defaults to 25.
*/
recursionLimit?: number;
}

export async function getCallbackMangerForConfig(config?: RunnableConfig) {
return CallbackManager.configure(
Expand All @@ -28,6 +42,8 @@ export function mergeConfigs<CallOptions extends RunnableConfig>(
copy[key] = { ...copy[key], ...options[key] };
} else if (key === "tags") {
copy[key] = (copy[key] ?? []).concat(options[key] ?? []);
} else if (key === "configurable") {
copy[key] = { ...copy[key], ...options[key] };
} else if (key === "callbacks") {
const baseCallbacks = copy.callbacks;
const providedCallbacks = options.callbacks ?? config.callbacks;
Expand Down
12 changes: 4 additions & 8 deletions langchain-core/src/runnables/history.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import { BaseCallbackConfig } from "../callbacks/manager.js";
import {
BaseChatMessageHistory,
BaseListChatMessageHistory,
Expand Down Expand Up @@ -28,10 +27,7 @@ type GetSessionHistoryCallable = (
| BaseListChatMessageHistory;

export interface RunnableWithMessageHistoryInputs<RunInput, RunOutput>
extends Omit<
RunnableBindingArgs<RunInput, RunOutput, BaseCallbackConfig>,
"bound" | "config"
> {
extends Omit<RunnableBindingArgs<RunInput, RunOutput>, "bound" | "config"> {
runnable: Runnable<RunInput, RunOutput>;
getMessageHistory: GetSessionHistoryCallable;
inputMessagesKey?: string;
Expand All @@ -43,7 +39,7 @@ export interface RunnableWithMessageHistoryInputs<RunInput, RunOutput>
export class RunnableWithMessageHistory<
RunInput,
RunOutput
> extends RunnableBinding<RunInput, RunOutput, BaseCallbackConfig> {
> extends RunnableBinding<RunInput, RunOutput> {
runnable: Runnable<RunInput, RunOutput>;

inputMessagesKey?: string;
Expand Down Expand Up @@ -151,7 +147,7 @@ export class RunnableWithMessageHistory<
return returnType;
}

async _exitHistory(run: Run, config: BaseCallbackConfig): Promise<void> {
async _exitHistory(run: Run, config: RunnableConfig): Promise<void> {
const history = config.configurable?.messageHistory;

// Get input messages
Expand All @@ -176,7 +172,7 @@ export class RunnableWithMessageHistory<
}
}

async _mergeConfig(...configs: Array<BaseCallbackConfig | undefined>) {
async _mergeConfig(...configs: Array<RunnableConfig | undefined>) {
const config = await super._mergeConfig(...configs);
// Extract sessionId
if (!config.configurable || !config.configurable.sessionId) {
Expand Down
Loading

2 comments on commit 1eca176

@vercel
Copy link

@vercel vercel bot commented on 1eca176 Dec 26, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Successfully deployed to the following URLs:

langchainjs-docs – ./docs/core_docs/

langchainjs-docs-git-main-langchain.vercel.app
langchainjs-docs-langchain.vercel.app
js.langchain.com
langchainjs-docs-ruddy.vercel.app

@vercel
Copy link

@vercel vercel bot commented on 1eca176 Dec 26, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please sign in to comment.