# Stream chat with async flex flow

**Learning Objectives** - Upon completing this tutorial, you should be able to:

- Write an LLM application using a class-based flex flow.
- Use `OpenAIModelConfiguration` as a class init parameter.
- Use prompty to stream completions.
- Convert the application into an async flow and batch run it against multiple lines of data.
- Use a class-based flow to evaluate the main flow and learn how to do aggregation.

## 0. Install dependent packages

In [8]:
%%capture --no-stderr
%pip install -r ./requirements.txt

## 1. Trace your application with promptflow

Assume we already have a Python program that leverages prompty.

In [9]:
with open("flow.py") as fin:
    print(fin.read())

import asyncio
import os
from pathlib import Path

from promptflow.tracing import trace
from promptflow.core import OpenAIModelConfiguration, Prompty

BASE_DIR = Path(__file__).absolute().parent


def log(message: str):
    verbose = os.environ.get("VERBOSE", "false")
    if verbose.lower() == "true":
        print(message, flush=True)


class ChatFlow:
    def __init__(
        self, model_config: OpenAIModelConfiguration, max_total_token=1100
    ):
        self.model_config = model_config
        self.max_total_token = max_total_token

    @trace
    async def __call__(
        self, question: str = "What is ChatGPT?", chat_history: list = None
    ) -> str:
        """Flow entry function."""

        prompty = Prompty.load(
            source=BASE_DIR / "chat.prompty",
            model={"configuration": self.model_config},
        )

        chat_history = chat_history or []
        # Try to render the prompt with token limit and reduce the history count if it fails
        while 
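The listing above is cut off at the `while` loop. According to its comment, the flow reduces the number of chat history items until the rendered prompt fits within `max_total_token`. The sketch below only illustrates that idea and is not the contents of `flow.py`: `estimate_tokens` is a hypothetical word-count stand-in for a real tokenizer, and dropping the oldest turn first is an assumption.

```python
from typing import Dict, List


def estimate_tokens(question: str, chat_history: List[Dict[str, str]]) -> int:
    """Hypothetical token estimator: counts whitespace-separated words.

    A real implementation would use the model's tokenizer instead.
    """
    words = len(question.split())
    for item in chat_history:
        words += len(item["content"].split())
    return words


def trim_history(
    question: str, chat_history: List[Dict[str, str]], max_total_token: int
) -> List[Dict[str, str]]:
    """Drop history items until the estimated prompt size fits the limit."""
    history = list(chat_history)  # copy so the caller's list is left untouched
    while history and estimate_tokens(question, history) > max_total_token:
        history = history[1:]  # assumption: the oldest turn is removed first
    return history


history = [
    {"role": "user", "content": "Tell me a joke."},
    {"role": "assistant", "content": "Why did the chicken cross the road?"},
]
print(len(trim_history("What did I ask?", history, max_total_token=12)))
```

Copying the list keeps the caller's history intact, so the same conversation can be retried with a different budget.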

When `stream: true` is set in the parameters of a prompty whose output format is text, the promptflow SDK returns a generator whose items are the content of each chunk.

See the OpenAI cookbook for how to stream completions with plain Python code: [how_to_stream_completions](https://cookbook.openai.com/examples/how_to_stream_completions)
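With streaming enabled, the caller receives text chunks as they arrive instead of one final string. The sketch below uses a hypothetical `fake_stream` generator in place of the prompty output (no API call is made) to show the two usual ways to consume such a stream: print each chunk immediately, and accumulate the chunks into the full answer.

```python
from typing import Iterator, List


def fake_stream(chunks: List[str]) -> Iterator[str]:
    """Hypothetical stand-in for a streamed completion: yields one text chunk at a time."""
    for chunk in chunks:
        yield chunk


def print_and_collect(stream: Iterator[str]) -> str:
    """Print each chunk as soon as it arrives and return the concatenated text."""
    parts = []
    for chunk in stream:
        print(chunk, end="", flush=True)  # show partial output immediately
        parts.append(chunk)
    print()  # end the line once the stream is exhausted
    return "".join(parts)


answer = print_and_collect(fake_stream(["Hello", ", ", "world", "!"]))
```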

In [10]:
with open("chat.prompty") as fin:
    print(fin.read())

---
name: Stream Chat
description: Chat with stream enabled.
model:
  api: chat
  configuration:
    type: azure_openai
    model: gpt-3.5-turbo
  parameters:
    temperature: 0.2
    stream: true
    max_tokens: 1024
inputs: 
  question:
    type: string
  chat_history:
    type: list
sample:
  question: "What is Prompt flow?"
---

system:
You are a helpful assistant.

{% for item in chat_history %}
{{item.role}}:
{{item.content}}
{% endfor %}

user:
{{question}}




### Create necessary connections
A connection securely stores and manages secret keys or other sensitive credentials required for interacting with LLMs and other external tools, for example Azure Content Safety.

The model configuration used in this notebook references the connection `open_ai_connection`, so we need to set it up if we haven't added it before. Once created, the connection is stored in a local database and can be used in any flow.

If you don't have an Azure OpenAI resource yet, prepare one by following these [instructions](https://learn.microsoft.com/en-us/azure/cognitive-services/openai/how-to/create-resource?pivots=web-portal) and get your `api_key`.

In [11]:
from promptflow.client import PFClient
from promptflow.connections import AzureOpenAIConnection, OpenAIConnection

# the client helps manage your runs and connections
pf = PFClient()
conn_name = "open_ai_connection"
try:
    conn = pf.connections.get(name=conn_name)
    print("using existing connection")
except Exception:  # the connection doesn't exist yet, so create it
    # Follow https://learn.microsoft.com/en-us/azure/ai-services/openai/how-to/create-resource?pivots=web-portal to create an Azure OpenAI resource.
    connection = AzureOpenAIConnection(
        name=conn_name,
        api_key="<your_AOAI_key>",
        api_base="<your_AOAI_endpoint>",
        api_type="azure",
    )

    # use this instead if you have an OpenAI account; note that the cells below use OpenAIModelConfiguration
    # connection = OpenAIConnection(
    #     name=conn_name,
    #     api_key="<user-input>",
    # )

    conn = pf.connections.create_or_update(connection)
    print("successfully created connection")

print(conn)

using existing connection
name: open_ai_connection
module: promptflow.connections
created_date: '2025-04-21T12:39:40.203530'
last_modified_date: '2025-04-21T14:34:50.329413'
type: open_ai
api_key: '******'



### Visualize trace by using start_trace

Note that `@trace` is applied to `ChatFlow.__call__` in `flow.py`, so running the cell below collects a trace that you can inspect in the trace UI.

In [12]:
from promptflow.core import OpenAIModelConfiguration

model_config = OpenAIModelConfiguration(
    connection="open_ai_connection",
    model="gpt-3.5-turbo"
)


In [13]:
from promptflow.tracing import start_trace
from promptflow.core import OpenAIModelConfiguration

from flow import ChatFlow

# Use a plain OpenAI model configuration instead of Azure OpenAI
config = OpenAIModelConfiguration(
    connection="open_ai_connection",  # must match the created connection name
    model="gpt-3.5-turbo",  # OpenAI model name
)
chat_flow = ChatFlow(config)

# start a trace session, and print a url for user to check trace
start_trace()

# run the flow as function, which will be recorded in the trace
result = chat_flow(question="What is ChatGPT? Please explain with detailed statement")
# note: the result is an async generator object because stream is enabled in the prompty
result

Prompt flow service has started...


<async_generator object ChatFlow.__call__ at 0x70de6c877340>
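Because `__call__` produces an async generator here, calling the flow only creates the generator object; consistent with that, the request log lines appear in the next cell's output, once the result is iterated with `async for`. The stdlib-only sketch below demonstrates this laziness with a hypothetical `fake_chat` function that records when its body starts running; it does not call promptflow or OpenAI.

```python
import asyncio
import inspect
from typing import AsyncIterator, List, Tuple


async def fake_chat(question: str, events: List[str]) -> AsyncIterator[str]:
    """Hypothetical async generator standing in for a streaming flow."""
    events.append("request sent")  # runs only once iteration starts
    for word in question.split():
        await asyncio.sleep(0)  # simulate waiting for the next chunk
        yield word + " "


async def main() -> Tuple[bool, List[str], str, List[str]]:
    events: List[str] = []
    result = fake_chat("streamed answer text", events)
    is_async_gen = inspect.isasyncgen(result)
    events_before = list(events)  # snapshot right after the call: still empty
    answer = ""
    async for chunk in result:  # iterating is what actually runs the body
        answer += chunk
    return is_async_gen, events_before, answer, events


is_async_gen, events_before, answer, events = asyncio.run(main())
print(is_async_gen, events_before, repr(answer), events)
```

In Jupyter an event loop is already running, so `asyncio.run()` raises a `RuntimeError` there; in a notebook cell, use `await main()` instead, just as the next cell uses `async for` directly.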

In [14]:
import asyncio

# print the result as it streams in
async for output in result:
    print(output, end="")
    await asyncio.sleep(0.01)

🛰️ Sending request to OpenAI with params:
{
  "temperature": 0.2,
  "stream": true,
  "max_tokens": 1024,
  "model": "gpt-3.5-turbo",
  "messages": [
    {
      "role": "system",
      "content": "You are a helpful assistant."
    },
    {
      "role": "user",
      "content": "What is ChatGPT? Please explain with detailed statement"
    }
  ]
}
🔐 Client type: <class 'openai.OpenAI'>

In [15]:
result = chat_flow(question="What is ChatGPT? Please explain with consise statement")

answer = ""
async for output in result:
    answer += output
answer

🛰️ Sending request to OpenAI with params:
{
  "temperature": 0.2,
  "stream": true,
  "max_tokens": 1024,
  "model": "gpt-3.5-turbo",
  "messages": [
    {
      "role": "system",
      "content": "You are a helpful assistant."
    },
    {
      "role": "user",
      "content": "What is ChatGPT? Please explain with consise statement"
    }
  ]
}
🔐 Client type: <class 'openai.OpenAI'>

'ChatGPT is an AI language model developed by OpenAI that can generate human-like text responses to prompts, enabling natural and engaging conversations with users.'
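Depending on whether a flow is sync or async, its streamed output may be a plain generator or an async generator, as in this notebook. When the same code has to consume both, a small helper that accepts either can be convenient. The sketch below is a hypothetical helper (`collect_text` is not a promptflow API): it checks for `__aiter__` and falls back to plain iteration.

```python
import asyncio
from typing import AsyncIterable, AsyncIterator, Iterable, Tuple, Union


async def collect_text(stream: Union[Iterable[str], AsyncIterable[str]]) -> str:
    """Join all chunks of a sync or async text stream into one string."""
    parts = []
    if hasattr(stream, "__aiter__"):
        async for chunk in stream:  # async generator, as returned by this async flow
            parts.append(chunk)
    else:
        for chunk in stream:  # plain iterable, e.g. a sync generator
            parts.append(chunk)
    return "".join(parts)


async def fake_async_stream() -> AsyncIterator[str]:
    """Hypothetical async chunk source used only for this demo."""
    for chunk in ["Chat", "GPT ", "answers."]:
        yield chunk


async def demo() -> Tuple[str, str]:
    from_sync = await collect_text(iter(["Chat", "GPT ", "answers."]))
    from_async = await collect_text(fake_async_stream())
    return from_sync, from_async


print(asyncio.run(demo()))
```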

### Evaluate the result

In [16]:
%load_ext autoreload
%autoreload 2

import paths  # local helper module that makes the check_list module importable
from check_list import EvalFlow

eval_flow = EvalFlow(config)
# evaluate the answer against a set of statements
eval_result = eval_flow(
    answer=answer,
    statements={
        "correctness": "It contains a detailed explanation of ChatGPT.",
        "consise": "It is a consise statement.",
    },
)
eval_result

🛰️ Sending request to OpenAI with params:
{
  "max_tokens": 256,
  "temperature": 0.7,
  "model": "gpt-3.5-turbo",
  "messages": [
    {
      "role": "system",
      "content": "You are an AI assistant. \nYou task is to evaluate a score based on how the statement applies for the answer.\nOnly accepts valid JSON format response without extra prefix or postfix."
    },
    {
      "role": "user",
      "content": "This score value should always be an integer between 1 and 5. So the score produced should be 1 or 2 or 3 or 4 or 5.\n\nHere are a few examples:\nanswer: ChatGPT is a conversational AI model developed by OpenAI.\nstatement: It contains a brief explanation of ChatGPT.\nOUTPUT:\n{\"score\": \"5\", \"explanation\":\"The statement is correct. The answer contains a brief explanation of ChatGPT.\"}\n\nFor a given answer, valuate the answer based on how the statement applies for the answer:\nanswer: ChatGPT is an AI language model developed by OpenAI that can generate human-like text

{'correctness': {'score': '3',
  'explanation': "The statement is partially correct. The answer provides a detailed explanation of ChatGPT, but it does not explicitly mention 'detailed explanation' in the statement."},
 'consise': {'score': '3'}}
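The evaluation prompt asks the model for JSON with a `score` and an `explanation`, but the `consise` result above came back without an `explanation`, and scores arrive as strings. If you post-process such results yourself, a tolerant parser helps. The sketch below is a hypothetical helper, not part of `EvalFlow`: it converts the score to an integer, rejects values outside the 1 to 5 range the prompt asks for, and defaults a missing explanation to an empty string.

```python
import json
from typing import Optional, TypedDict


class EvalScore(TypedDict):
    score: Optional[int]
    explanation: str


def parse_eval_output(raw: str) -> EvalScore:
    """Parse one LLM evaluation result, tolerating missing keys and string scores."""
    try:
        data = json.loads(raw)
    except json.JSONDecodeError:
        return {"score": None, "explanation": f"unparseable output: {raw!r}"}
    if not isinstance(data, dict):
        return {"score": None, "explanation": f"unexpected JSON value: {raw!r}"}
    score: Optional[int]
    try:
        score = int(data.get("score"))
    except (TypeError, ValueError):
        score = None  # missing or non-numeric score
    if score is not None and not 1 <= score <= 5:
        score = None  # outside the 1-5 range the prompt asks for
    return {"score": score, "explanation": str(data.get("explanation", ""))}


print(parse_eval_output('{"score": "3"}'))
print(parse_eval_output('{"score": "5", "explanation": "Concise and correct."}'))
print(parse_eval_output("Sure! Here is the score: 4"))
```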

## 2. Batch run the function as a flow with multi-line data


### Batch run with a data file (with multiple lines of test data)


In [17]:
from promptflow.client import PFClient

pf = PFClient()

In [18]:
data = "./data.jsonl"  # path to the data file
# create run with the flow function and data
base_run = pf.run(
    flow=chat_flow,
    data=data,
    column_mapping={
        "question": "${data.question}",
        "chat_history": "${data.chat_history}",
    },
    stream=True,
)



Prompt flow service has started...
You can view the traces in local from http://127.0.0.1:23333/v1.0/ui/traces/?#run=chat_async_stream_20250421_143559_228684


[2025-04-21 14:35:59 -0300][promptflow._sdk._orchestrator.run_submitter][INFO] - Submitting run chat_async_stream_20250421_143559_228684, log path: /home/matias/.promptflow/.runs/chat_async_stream_20250421_143559_228684/logs.txt


2025-04-21 14:36:08 -0300 1909783 execution.bulk     INFO     Process 1909956 terminated.
2025-04-21 14:35:59 -0300 1901803 execution.bulk     INFO     Current thread is not main thread, skip signal handler registration in BatchEngine.
2025-04-21 14:36:00 -0300 1901803 execution.bulk     INFO     Set process count to 3 by taking the minimum value among the factors of {'default_worker_count': 4, 'row_count': 3}.
2025-04-21 14:36:03 -0300 1901803 execution.bulk     INFO     Process name(ForkProcess-4:1)-Process id(1909956)-Line number(0) start execution.
2025-04-21 14:36:03 -0300 1901803 execution.bulk     INFO     Process name(ForkProcess-4:2)-Process id(1909964)-Line number(1) start execution.
2025-04-21 14:36:03 -0300 1901803 execution.bulk     INFO     Process name(ForkProcess-4:3)-Process id(1909972)-Line number(2) start execution.
2025-04-21 14:36:04 -0300 1901803 execution.bulk     INFO     Process name(ForkProcess-4:2)-Process id(1909964)-Line number(1) completed.
2025-04-21 14:3

In [19]:
details = pf.get_details(base_run)
details.head(10)

| | inputs.question | inputs.chat_history | inputs.line_number | outputs.output |
|---|---|---|---|---|
| 0 | What is Prompt flow? | [] | 0 | Prompt flow refers to the sequence of prompts ... |
| 1 | What is ChatGPT? Please explain with consise s... | [] | 1 | ChatGPT is a conversational AI model developed... |
| 2 | How many questions did user ask? | [{'role': 'user', 'content': 'where is the nea... | 2 | You have asked two questions so far. How can I... |
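In `column_mapping`, `${data.<column>}` selects a column of the current JSONL line, and in the evaluation run of the next section `${run.outputs.<name>}` selects an output of the referenced run for the same line. The stdlib-only sketch below mimics that substitution to make the semantics concrete. `resolve_mapping` is hypothetical and is not promptflow's implementation; the sample line is illustrative rather than the real `data.jsonl`, and passing non-placeholder values through unchanged is an assumption of this sketch.

```python
import json
import re
from typing import Any, Dict

# Matches "${data.<column>}" or "${run.outputs.<name>}".
PLACEHOLDER = re.compile(r"^\$\{(data|run\.outputs)\.([A-Za-z_][A-Za-z0-9_]*)\}$")


def resolve_mapping(
    mapping: Dict[str, str], row: Dict[str, Any], run_outputs: Dict[str, Any]
) -> Dict[str, Any]:
    """Build the flow inputs for one line from a column mapping (simplified)."""
    inputs: Dict[str, Any] = {}
    for name, expr in mapping.items():
        match = PLACEHOLDER.match(expr)
        if match is None:
            inputs[name] = expr  # assumption: treated as a literal value in this sketch
            continue
        source, key = match.groups()
        inputs[name] = row[key] if source == "data" else run_outputs[key]
    return inputs


# Illustrative JSONL line (not the actual contents of data.jsonl).
line = '{"question": "What is Prompt flow?", "chat_history": [], "statements": {"correctness": "example statement"}}'
row = json.loads(line)

print(resolve_mapping({"question": "${data.question}", "chat_history": "${data.chat_history}"}, row, {}))
print(resolve_mapping(
    {"answer": "${run.outputs.output}", "statements": "${data.statements}"},
    row,
    {"output": "An answer."},  # hypothetical output of the base run for this line
))
```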


## 3. Evaluate your flow
Next, you can use an evaluation method to evaluate your flow. Evaluation methods are also flows, which usually use an LLM to assert that the produced output matches certain expectations.

### Run evaluation on the previous batch run
The **base_run** is the batch run we completed in step 2 above, for the chat flow with `data.jsonl` as input.

In [20]:
eval_run = pf.run(
    flow=eval_flow,
    data="./data.jsonl",  # path to the data file
    run=base_run,  # specify base_run as the run you want to evaluate
    column_mapping={
        "answer": "${run.outputs.output}",
        "statements": "${data.statements}",
    },
    stream=True,
)



Prompt flow service has started...
You can view the traces in local from http://127.0.0.1:23333/v1.0/ui/traces/?#run=check_list_evalflow_b3voou12_20250421_143609_676623


[2025-04-21 14:36:09 -0300][promptflow._sdk._orchestrator.run_submitter][INFO] - Submitting run check_list_evalflow_b3voou12_20250421_143609_676623, log path: /home/matias/.promptflow/.runs/check_list_evalflow_b3voou12_20250421_143609_676623/logs.txt


2025-04-21 14:36:17 -0300 1910302 execution.bulk     INFO     Process 1910464 terminated.
2025-04-21 14:36:10 -0300 1901803 execution.bulk     INFO     Current thread is not main thread, skip signal handler registration in BatchEngine.
2025-04-21 14:36:10 -0300 1901803 execution.bulk     INFO     Set process count to 3 by taking the minimum value among the factors of {'default_worker_count': 4, 'row_count': 3}.
2025-04-21 14:36:13 -0300 1901803 execution.bulk     INFO     Process name(ForkProcess-8:3)-Process id(1910478)-Line number(0) start execution.
2025-04-21 14:36:13 -0300 1901803 execution.bulk     INFO     Process name(ForkProcess-8:1)-Process id(1910464)-Line number(1) start execution.
2025-04-21 14:36:13 -0300 1901803 execution.bulk     INFO     Process name(ForkProcess-8:2)-Process id(1910471)-Line number(2) start execution.
2025-04-21 14:36:15 -0300 1901803 execution.bulk     INFO     Process name(ForkProcess-8:1)-Process id(1910464)-Line number(1) completed.
2025-04-21 14:3

In [21]:
details = pf.get_details(eval_run)
details.head(10)

| | inputs.answer | inputs.statements | inputs.line_number | outputs.correctness | outputs.consise |
|---|---|---|---|---|---|
| 0 | Prompt flow refers to the sequence of prompts ... | {'correctness': 'result should be 1', 'consise... | 0 | {'score': '1', 'explanation': 'The statement i... | {'score': '3', 'explanation': 'The statement i... |
| 1 | ChatGPT is a conversational AI model developed... | {'correctness': 'result should be 1', 'consise... | 1 | {'score': '1', 'explanation': 'The statement i... | {'score': '3'} |
| 2 | You have asked two questions so far. How can I... | {'correctness': 'result should be 1', 'consise... | 2 | {'score': '1', 'explanation': 'The statement i... | {'score': '3', 'explanation': 'The statement i... |


In [22]:
import json

metrics = pf.get_metrics(eval_run)
print(json.dumps(metrics, indent=4))

{
    "average_correctness": 1.0,
    "total": 3
}
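The metrics `average_correctness` and `total` come from the evaluation flow's aggregation step over all line results. That code isn't shown in this notebook, so the following is only a sketch of such an aggregation, assuming each line result is shaped like the `outputs.*` columns of the details table (a `correctness` dict holding a string `score`). Fed the three correctness scores from that table (all `'1'`), it reproduces the reported values.

```python
from typing import Dict, List, Union


def aggregate(line_results: List[Dict[str, Dict[str, str]]]) -> Dict[str, Union[float, int]]:
    """Average the per-line correctness scores and count the lines (sketch)."""
    scores = [int(result["correctness"]["score"]) for result in line_results]
    average = sum(scores) / len(scores) if scores else 0.0
    return {"average_correctness": average, "total": len(line_results)}


# Correctness scores copied from the eval_run details table above.
line_results = [
    {"correctness": {"score": "1"}},
    {"correctness": {"score": "1"}},
    {"correctness": {"score": "1"}},
]
print(aggregate(line_results))  # {'average_correctness': 1.0, 'total': 3}
```

Dividing the sum by the count (rather than using `statistics.mean` on integers) keeps the average a float, which matches the `1.0` shown in the metrics.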


In [23]:
pf.visualize([base_run, eval_run])

Prompt flow service has started...
The HTML file is generated at '/tmp/pf-visualize-detail-fhz1fves.html'.
Trying to view the result in a web browser...
Successfully visualized from the web browser.


## Next steps

By now you've successfully run your chat flow and evaluated it. That's great!

You can check out more examples:
- [Stream Chat](https://github.com/microsoft/promptflow/tree/main/examples/flex-flows/chat-stream): demonstrates how to create a chatbot that runs in streaming mode.