Replies: 7 comments 8 replies
-
I'm having the same problem. I've tried following one of the examples with:

```ts
import { LangChainStream, StreamingTextResponse, experimental_StreamData } from "ai";
import { RemoteRunnable } from "langchain/runnables/remote";

const chain = new RemoteRunnable({
  url: `http://localhost:8000/langserve_end_point/`,
});
const data = new experimental_StreamData();
const { stream, handlers } = LangChainStream({
  onFinal: () => {
    data.close();
  },
  experimental_streamData: true,
});
await chain.stream(
  {
    input: currentMessageContent,
    config: {},
    kwargs: {},
  },
  { callbacks: [handlers] }
);
return new StreamingTextResponse(stream, {}, data);
```
-
Try the following and see if it works. It doesn't stream the output for me, but at least it works:

```ts
import { StreamingTextResponse } from "ai";
import { HttpResponseOutputParser } from "langchain/output_parsers";
import { RemoteRunnable } from "langchain/runnables/remote";

const outputParser = new HttpResponseOutputParser();
const chain = new RemoteRunnable({
  url: `http://localhost:8000/langserve_end_point/`,
}).pipe(outputParser);
const stream = await chain.stream(
  JSON.stringify({
    input: currentMessageContent,
    config: {},
    kwargs: {},
  }),
);
return new StreamingTextResponse(stream);
```
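(One knob worth noting, not from the original comment: in LangChain JS, `HttpResponseOutputParser` also accepts a `contentType` option controlling how chunks are byte-encoded; whether it changes the non-streaming behaviour seen above is untested.)

```ts
// Untested variation on the snippet above: contentType selects the encoding.
import { HttpResponseOutputParser } from "langchain/output_parsers";

const outputParser = new HttpResponseOutputParser({
  contentType: "text/event-stream", // the default is "text/plain"
});
```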
-
UPDATE: Got it to work by piping the response with:

```ts
import { StreamingTextResponse } from "ai";
import { RemoteRunnable } from "langchain/runnables/remote";

const chain = new RemoteRunnable({
  url: `http://localhost:8000/langserve_end_point/`,
  options: { timeout: 10000000 },
});
const stream = await chain.stream(
  JSON.stringify({
    input: currentMessageContent,
    config: {},
    kwargs: {},
  }),
);

let firstEntrySkipped = false;
const transformStream = new TransformStream({
  transform(chunk, controller) {
    // Drop the very first chunk, then pass everything else through as text.
    if (!firstEntrySkipped) {
      firstEntrySkipped = true;
    } else {
      controller.enqueue(chunk.toString());
    }
  },
});
return new StreamingTextResponse(stream.pipeThrough(transformStream));
```

The first chunk is skipped since it corresponds to the …
-
Did anyone get this to work with …

EDIT: I made something work. It feels like there should be a better way to do this, but here is one way, in case anyone else is trying to use RemoteRunnable and render agent steps/tool usage while streaming the text response as well.

```tsx
'use client';

import { Message, experimental_StreamingReactResponse, readableFromAsyncIterable } from 'ai';
import { RemoteRunnable } from 'langchain/runnables/remote';
import { applyPatch } from '@langchain/core/utils/json_patch';
import { Document } from '@langchain/core/documents';
import { Session } from 'next-auth';
import { Accordion, AccordionSummary, Card, CardContent, ListItem, Box, Typography, List, AccordionDetails } from '@mui/material';

const formatMessage = (message: Message) => {
  return `${message.role}: ${message.content}`;
};

export async function MyChatHandler(chatId: number, session: Session, messages: Message[]) {
  const formattedPreviousMessages = messages.slice(0, -1).map(formatMessage);
  const currentMessageContent = messages.length ? messages[messages.length - 1].content : "";

  const remoteChain = new RemoteRunnable({
    url: `${process.env.NEXT_PUBLIC_API_URI}`,
    options: {
      timeout: 5 * 60 * 1000,
      headers: {
        Authorization: session.idToken,
      },
    },
  });

  const logStream = remoteChain.streamLog(
    {
      input: currentMessageContent,
      chat_history: formattedPreviousMessages.join('\n'),
    },
    {
      configurable: { chat_id: chatId },
    }
  );

  const encoder = new TextEncoder();
  let aggregateResponseChain: Record<string, any> = {};

  const transformStream = new TransformStream({
    transform(chunk, controller) {
      // Fold each JSON Patch batch into the aggregate run log, and forward
      // only the model's streamed text to the client.
      aggregateResponseChain = applyPatch(aggregateResponseChain, chunk.ops).newDocument;
      for (const op of chunk.ops) {
        // TODO: Will there ever be other operations than add for the streamed_output_str?
        if (op.op == "add" &&
            op.path.startsWith("/logs/AzureChatOpenAI") &&
            op.path.endsWith("streamed_output_str/-")) {
          const streamedText = encoder.encode(op.value.toString());
          controller.enqueue(streamedText);
        }
      }
    },
  });

  const stream = readableFromAsyncIterable(logStream);

  // TODO: import the correct types from the langchain package
  interface StepInfo {
    action: { log: string };
    observation: Document[];
  }

  return new experimental_StreamingReactResponse(stream.pipeThrough(transformStream), {
    ui({ content, data }) {
      const steps = aggregateResponseChain?.final_output?.steps ? aggregateResponseChain.final_output.steps : [];
      return (
        <Box sx={{ width: "80%" }}>
          {steps.map((step: StepInfo, index: number) => ( // TODO: Custom rendering of each kind of step
            <Accordion sx={{ mb: 2 }} key={index}>
              <AccordionSummary>
                <Typography variant="subtitleMedium">{step.action.log}</Typography>
              </AccordionSummary>
              <AccordionDetails>
                <List>
                  {step.observation.map((doc, index) => {
                    // DocumentComponent is my own renderer for retrieved documents (not shown here).
                    return <DocumentComponent document={doc} key={index} />;
                  })}
                </List>
              </AccordionDetails>
            </Accordion>
          ))}
          <Card elevation={1}>
            <CardContent>
              <Typography variant="body">{content}</Typography>
            </CardContent>
          </Card>
        </Box>
      );
    },
  });
}
```

Then you can use useChat with something like this:

```tsx
import { useChat } from "ai/react";

type ChatHandler = (payload: {
  messages: Message[];
  data?: Record<string, string>;
}) => Promise<experimental_StreamingReactResponse>;

const handler: ChatHandler = (payload) => {
  const { messages, data } = payload;
  return MyChatHandler(chat.id, session, messages);
};

const chat = useChat({ api: handler });
```

Hope it helps someone out there. Would also like to hear if anyone made it work in a better way :)
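If the `applyPatch` plumbing above looks opaque: `streamLog` emits chunks of JSON Patch operations, and `applyPatch` folds each batch into a growing run-log document. A self-contained illustration (the op values are made-up examples of the shapes `streamLog` emits, not real output):

```ts
import { applyPatch } from "@langchain/core/utils/json_patch";

// streamLog chunks carry `ops` arrays like these; paths ending in `/-`
// append to an array in the aggregate document.
let runLog: Record<string, any> = {};
const ops = [
  { op: "add" as const, path: "/logs", value: {} },
  { op: "add" as const, path: "/streamed_output", value: [] },
  { op: "add" as const, path: "/streamed_output/-", value: "Hello" },
  { op: "add" as const, path: "/streamed_output/-", value: " world" },
];
runLog = applyPatch(runLog, ops).newDocument;
console.log(runLog); // { logs: {}, streamed_output: ["Hello", " world"] }
```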
-
I believe there's always room for improvement, but I've managed to successfully integrate https://github.com/dbonates/langserve-nextjs. Basically, I created an API that makes use of … Hope it helps!
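That comment lost its inline code, but a minimal sketch of an API route along those lines might look like this (the URL and the combination of `RemoteRunnable` with `HttpResponseOutputParser` are assumptions drawn from the other answers in this thread, not from that repo):

```ts
// app/api/chat/route.ts — hypothetical sketch, not the linked repo's code.
import { StreamingTextResponse } from "ai";
import { HttpResponseOutputParser } from "langchain/output_parsers";
import { RemoteRunnable } from "langchain/runnables/remote";

export async function POST(req: Request) {
  const { messages } = await req.json();
  const lastMessage = messages[messages.length - 1];

  // Assumed endpoint; point this at your own LangServe route.
  const chain = new RemoteRunnable({
    url: "http://localhost:8000/chat/",
  }).pipe(new HttpResponseOutputParser());

  const stream = await chain.stream({ input: lastMessage.content });
  return new StreamingTextResponse(stream);
}
```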
-
FWIW (and I am anything but a TS/JS expert, so this might be a daft approach), I've just used a type assertion … having invoked the LangServe …
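The inline code above was stripped, but a guess at what such a type assertion could look like (the assertion target and handler shape are assumptions, not the commenter's code):

```ts
import { StreamingTextResponse } from "ai";
import { RemoteRunnable } from "langchain/runnables/remote";

export async function POST(req: Request) {
  const { messages } = await req.json();
  const chain = new RemoteRunnable({ url: "http://localhost:8000/chat/" });

  // RemoteRunnable.stream() yields chunks of unknown type; assert the
  // resulting stream into the shape StreamingTextResponse expects.
  const stream = (await chain.stream({
    input: messages[messages.length - 1].content,
  })) as unknown as ReadableStream<Uint8Array>;

  return new StreamingTextResponse(stream);
}
```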
-
I've come to the following solution for using LangServe with the …

`api/chat/route.ts`:

```ts
import {
  AIStream,
  AIStreamCallbacksAndOptions,
  AIStreamParser,
  StreamingTextResponse,
  createStreamDataTransformer,
} from "ai";

export const dynamic = "force-dynamic";

// LangServe's /stream endpoint sends SSE events whose data payload is a
// serialized message chunk; pull out its `content` field.
function parseLangServeStream(): AIStreamParser {
  return (data) => {
    const json = JSON.parse(data) as { content: string };
    return json.content;
  };
}

export function LangServeStream(
  res: Response,
  cb?: AIStreamCallbacksAndOptions,
): ReadableStream {
  return AIStream(res, parseLangServeStream(), cb).pipeThrough(
    createStreamDataTransformer(),
  );
}

export async function POST(req: Request) {
  const { messages } = await req.json();
  const lastMessage = messages[messages.length - 1];
  const postData = { input: lastMessage.content };

  // NB: the port here must match wherever server.py below is actually served.
  const fetchResponse = await fetch("http://127.0.0.1:8100/chat/stream", {
    method: "POST",
    headers: {
      "Content-Type": "application/json",
    },
    body: JSON.stringify(postData),
  });

  const stream = LangServeStream(fetchResponse, {
    onStart: async () => {
      console.log("Stream started");
    },
    onCompletion: async (completion) => {
      console.log("Completion completed", completion);
    },
    onFinal: async (completion) => {
      console.log("Stream completed", completion);
    },
    onToken: async (token) => {
      console.log("Token received", token);
    },
  });
  return new StreamingTextResponse(stream);
}
```

`app/page.tsx`:

```tsx
"use client";

import { useChat } from "ai/react";

export default function Chat() {
  const { messages, input, handleInputChange, handleSubmit } = useChat({
    api: "/api/chat",
  });
  return (
    <div className="stretch mx-auto flex w-full max-w-md flex-col py-24">
      {messages.map((m) => (
        <div key={m.id} className="whitespace-pre-wrap">
          {m.role === "user" ? "User: " : "AI: "}
          {m.content}
        </div>
      ))}
      <form onSubmit={handleSubmit}>
        <input
          className="fixed bottom-0 mb-8 w-full max-w-md rounded border border-gray-300 p-2 shadow-xl"
          value={input}
          placeholder="Say something..."
          onChange={handleInputChange}
        />
      </form>
    </div>
  );
}
```

LangServe `server.py`:

```python
from dotenv import load_dotenv
from fastapi import FastAPI
from fastapi.responses import RedirectResponse
from langserve import add_routes
from langchain.chat_models import ChatOpenAI

load_dotenv()
app = FastAPI()


@app.get("/")
async def redirect_root_to_docs():
    return RedirectResponse("/docs")


# Edit this to add the chain you want to serve
add_routes(app, ChatOpenAI(), path="/chat")

if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=8000)
```
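For context on why the parser above works: LangServe's /stream endpoint responds with Server-Sent Events, and `AIStream` hands `parseLangServeStream` the `data:` payload of each event. To eyeball the raw frames yourself, a quick sketch (assumes `server.py` is running locally; adjust the port to match your setup):

```ts
// Dump the raw SSE frames from the LangServe /stream endpoint.
const res = await fetch("http://127.0.0.1:8000/chat/stream", {
  method: "POST",
  headers: { "Content-Type": "application/json" },
  body: JSON.stringify({ input: "Say hi" }),
});

const reader = res.body!.getReader();
const decoder = new TextDecoder();
for (;;) {
  const { done, value } = await reader.read();
  if (done) break;
  // Frames arrive as `event: ...` / `data: {...}` pairs; the JSON after
  // `data:` is what parseLangServeStream receives.
  console.log(decoder.decode(value));
}
```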
-
Is there an example of how to use `RemoteRunnable`? There's currently an example for `RunnableSequence`, but I haven't been able to get `RemoteRunnable` working. I get an error that doesn't make a lot of sense to me: