Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Example using Streaming Response for FastAPI. #161

Open
mattzcarey opened this issue Jun 19, 2023 · 16 comments
Open

Example using Streaming Response for FastAPI. #161

mattzcarey opened this issue Jun 19, 2023 · 16 comments

Comments

@mattzcarey
Copy link

Lots of people write their Langchain apis in Python, not using RSC.

A common tech stack is using FastAPI on the backend with NextJS/React for the frontend. It would be great to show an example of this using FastAPI Streaming Response.

This would really help us building Quivr..

@jasan-s
Copy link

jasan-s commented Jun 22, 2023

@mattzcarey I'm thinking of using similar tech stack, but it seems that vercel doesn't support python runtime streaming. could you please share your stack in more detail.
I'm currently using langchain js b deployed to vercel edge function and streaming response back to client. But it is apparent that the python version is far more featured, thus my reason to switch.

@mattzcarey
Copy link
Author

mattzcarey commented Jun 24, 2023

@jasan-s I have managed to do this with langchain callbacks and Streaming Response from FastAPi. You can check out the 'stream' route in the Quivr codebase.

@jasan-s
Copy link

jasan-s commented Jun 24, 2023

@jasan-s I have managed to do this with langchain callbacks and Streaming Response from FastAPi. You can check out the 'stream' route in the Quivr codebase.

Did you deploy quiver to vercel?

@mattzcarey
Copy link
Author

@jasan-s I have managed to do this with langchain callbacks and Streaming Response from FastAPi. You can check out the 'stream' route in the Quivr codebase.

Did you deploy quiver to vercel?

Yes it can be.

@kallebysantos
Copy link

kallebysantos commented Oct 20, 2023

I Had create a gist example:

2023-10-20.22-57-29.mp4

@satyamdalai
Copy link

Having a native support for converting streaming responses from FastAPI/any other HTTP Server in Next.js API routes (with the help of SDK) will be helpful in my usecase. Since I don't want to directly call FastAPI endpoint using useChat hook, as I manage the authentication layer in Next.js.

@danielcorin
Copy link

danielcorin commented Feb 2, 2024

I came across this thread looking for the same thing but wanted to use the openai library (rather than langchain as in the gist above) and the useChat hook. Here's what I ended up doing:

server.py

from openai import AsyncOpenAI

from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import StreamingResponse

app = FastAPI()

# Added because the frontend and this backend run on separate ports, should change depending on your setup, not a good idea in prod
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

client = AsyncOpenAI()

@app.post("/ask")
async def ask(req: dict):
    stream = await client.chat.completions.create(
        messages=req["messages"],
        model="gpt-3.5-turbo",
        stream=True,
    )

    async def generator():
        async for chunk in stream:
            yield chunk.choices[0].delta.content or ""

    response_messages = generator()
    return StreamingResponse(response_messages, media_type="text/event-stream")

Run with

uvicorn server:app --reload

Example frontend src/app/page.tsx in a new Next.js app

"use client";

import { useChat } from "ai/react";

export default function Home() {
  const { messages, input, handleInputChange, handleSubmit } = useChat({
    api: "http://127.0.0.1:8000/ask"
  });

  return (
    <main className="flex min-h-screen flex-col items-center justify-between p-24">
      <div>
        {messages.map((m) => (
          <div key={m.id}>
            {m.role === "user" ? "User: " : "AI: "}
            {m.content}
          </div>
        ))}

        <form onSubmit={handleSubmit}>
          <label>
            Say something...
            <input value={input} onChange={handleInputChange} />
          </label>
          <button type="submit">Send</button>
        </form>
      </div>
    </main>
  );
}

@kallebysantos
Copy link

I think that Issue should be mark as complete.
We had provide useful examples that solves the question.

@DanLeininger
Copy link

DanLeininger commented Feb 5, 2024

Building off the above answers, here's an example using experimental_StreamData:

server.py

from openai import AsyncOpenAI

from utils import stream_chunk #formats chunks for use with experimental_StreamData

from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import StreamingResponse

app = FastAPI()

# Added because the frontend and this backend run on separate ports, should change depending on your setup, not a good idea in prod
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
    expose_headers=[ "X-Experimental-Stream-Data"],  # this is needed for streaming data header to be read by the client
)

client = AsyncOpenAI()

@app.post("/ask")
async def ask(req: dict):
    stream = await client.chat.completions.create(
        messages=req["messages"],
        model="gpt-3.5-turbo",
        stream=True,
    )

    async def generator():
        async for chunk in stream:
            yield stream_chunk(chunk.choices[0].delta.content or "", "text")
        yield stream_chunk([{"foo":"bar"}], "data") # send streaming data after 

    response_messages = generator()
    return StreamingResponse(response_messages, media_type="text/event-stream",  headers={"X-Experimental-Stream-Data": "true"})

Where stream_chunk is a util that looks like this:

utils.py

# transforms the chunk into a stream part compatible with the vercel/ai
def stream_chunk(chunk, type: str = "text"):
    code = get_stream_part_code(type)
    formatted_stream_part = f"{code}:{json.dumps(chunk, separators=(',', ':'))}\n"
    return formatted_stream_part

# given a type returns the code for the stream part
def get_stream_part_code(stream_part_type: str) -> str:
    stream_part_types = {
        "text": "0",
        "function_call": "1",
        "data": "2",
        "error": "3",
        "assistant_message": "4",
        "assistant_data_stream_part": "5",
        "data_stream_part": "6",
        "message_annotations_stream_part": "7",
    }
    return stream_part_types[stream_part_type]

@szymonzmyslony
Copy link

szymonzmyslony commented Feb 9, 2024

@DanLeininger
your setup works for me when using useChat(). I want to add some custom onCompletion handlers with AI stream in route handler. My server setup is exactly like yours (again works with useChat) but im getting no response with:


export async function POST(req: Request) {
const json = await req.json()
const { messages, previewToken } = json
const userId = (await auth())?.user.id

if (!userId) {
return new Response('Unauthorized', {
status: 401
})
}
const data = {
messages: [{ role: 'user', content: 'Hello' }]
}
const fetchResponse = await fetch('http://127.0.0.1:8000/ask', {
method: 'POST',
headers: {
'Content-Type': 'application/json'
},
body: JSON.stringify(data)
})
const reader = fetchResponse
console.log('Reader is', reader)
const myStream = AIStream(reader, undefined, {
onStart: async () => {
console.log('Stream started')
},
onCompletion: async (completion: string) => {
console.log('Completion completed', completion)
},
onFinal: async (completion: string) => {
console.log('Stream completed', completion)
}
})
return new StreamingTextResponse(myStream)
}

@Udbhav8
Copy link

Udbhav8 commented Feb 14, 2024

@DanLeininger your setup works for me when using useChat(). I want to add some custom onCompletion handlers with AI stream in route handler. My server setup is exactly like yours (again works with useChat) but im getting no response with:


export async function POST(req: Request) {
const json = await req.json()
const { messages, previewToken } = json
const userId = (await auth())?.user.id

if (!userId) {
return new Response('Unauthorized', {
status: 401
})
}
const data = {
messages: [{ role: 'user', content: 'Hello' }]
}
const fetchResponse = await fetch('http://127.0.0.1:8000/ask', {
method: 'POST',
headers: {
'Content-Type': 'application/json'
},
body: JSON.stringify(data)
})
const reader = fetchResponse
console.log('Reader is', reader)
const myStream = AIStream(reader, undefined, {
onStart: async () => {
console.log('Stream started')
},
onCompletion: async (completion: string) => {
console.log('Completion completed', completion)
},
onFinal: async (completion: string) => {
console.log('Stream completed', completion)
}
})
return new StreamingTextResponse(myStream)
}

Having the same issue @danielcorin @DanLeininger would be great to have some help

@ichitaka
Copy link

I think that Issue should be mark as complete. We had provide useful examples that solves the question.

We still need a useful example that include tool-calling and streaming data.

@DanLeininger
Copy link

@szymonzmyslony @Udbhav8 In our use case we're bypassing Next.js api routes / route handlers and streaming from Fast API directly to the client / useChat() and so haven't attempted passing anything through AIStream

@ErikDale
Copy link

ErikDale commented Apr 3, 2024

@szymonzmyslony @Udbhav8 @satyamdalai have you found out how to add some custom onCompletion handlers with AI stream in the route handler, maybe using the AIStream?

@lgrammel
Copy link
Collaborator

If your endpoint sends a chunked text stream, you can useCompletion and useChat with streamMode: "text"

@ashen007
Copy link

@DanLeininger your answer worked for me, my use case was that I had a fast API back end which used langgraph agent and had to do the streaming as you mentioned. it worked properly, thank you!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests