In [None]:
import requests, json

data = None
url = "http://127.0.0.1:2512/stream"

generated_text = ""
with requests.post(url, data=data, stream=True) as response:
    if response.status_code == 200:
        for chunk in response.iter_lines():
            if not chunk:
                continue
            print(chunk)
            stem = "data: "
            chunk = chunk[len(stem) :]
            if chunk == b"[DONE]":
                continue
            # tokens_num += 1
            data = json.loads(chunk)
            delta = data["choices"][0]["delta"]
            if delta.get("content", None):
                generated_text += delta["content"]
                print(generated_text)
    else:
        print(f"Request failed with status code: {response.status_code}")

In [None]:
from pydantic import BaseModel, ConfigDict
from typing import Dict, List
import uuid

class ResponseChoice(BaseModel):
    index: int = 0
    delta: Dict[str, str]

class ResponseChunk(BaseModel):
    id: str
    choices: List[ResponseChoice]


sstr = "hello world"
chunk = ResponseChunk(
    id=str(uuid.uuid4()),
    choices=[ResponseChoice(
        index=0,
        delta={'content':f'{sstr}'}
    )]
)
data = chunk.model_dump_json(exclude_unset=True)
print(data)

In [None]:
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse
from pydantic import BaseModel, ConfigDict
from typing import Dict, List
import time
import uvicorn
import asyncio
import threading
import uuid

class AsyncStreamResponse:
    def __init__(self):
        self.data = []
        self.stop_signal = False
        self.lock = threading.Lock()
        self.data_event = asyncio.Event()

    def add_data(self, value):
        with self.lock:
            self.data.append(value)
        self.data_event.set()

    def send_stop_signal(self):
        with self.lock:
            self.stop_signal = True
        self.data_event.set()

    def __aiter__(self):
        self.index = 0
        return self

    async def __anext__(self):
        while True:
            with self.lock:
                if self.stop_signal and self.index >= len(self.data):
                    raise StopAsyncIteration
                if self.index < len(self.data):
                    result = self.data[self.index]
                    self.index += 1
                    return result
            self.data_event.clear()
            await self.data_event.wait()

async def producer(asresponse):
    for c in ['你','好','我','是','s','u','p','e','r',' ','o','u','t',' ','man']:
        asresponse.add_data(c)
        await asyncio.sleep(1)
    asresponse.send_stop_signal()

astream = AsyncStreamResponse()

loop = asyncio.get_event_loop()
loop.create_task(producer(astream))

async for data in astream:
    print(data)


In [None]:
import requests, json

url = "http://127.0.0.1:21002/v1/chat/completions"

headers = {
    "Content-Type": "application/json"
}
message = [
    {
        "role": "system",
        "content": "You are a poetic assistant, skilled in explaining complex programming concepts with creative flair."
    },
    {
        "role": "user",
        "content": "Compose a poem that explains the concept of recursion in programming."
    }
]

body = {
    'message': message
}

generated_text = ""
with requests.post(url, json=body, stream=True) as response:
    if response.status_code == 200:
        for chunk in response.iter_lines():
            if not chunk:
                continue
            print(chunk)
            stem = "data: "
            chunk = chunk[len(stem) :]
            if chunk == b"[DONE]":
                continue
            # tokens_num += 1
            data = json.loads(chunk)
            delta = data["choices"][0]["delta"]
            if delta.get("content", None):
                generated_text += delta["content"]
        print(generated_text)
    else:
        print(f"Request failed with status code: {response.status_code}")