Skip to content
This repository has been archived by the owner on Aug 25, 2024. It is now read-only.

Commit

Permalink
Improved streaming chat output (LangStream#438)
Browse files Browse the repository at this point in the history
  • Loading branch information
cdbartholomew committed Sep 19, 2023
1 parent f315836 commit aca0085
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 8 deletions.
7 changes: 7 additions & 0 deletions examples/applications/openai-completions/gateways.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,13 @@ gateways:
- id: consume-history
type: consume
topic: history-topic
parameters:
- sessionId
consume-options:
filters:
headers:
- key: langstream-client-session-id
value-from-parameters: sessionId

- id: produce-input-auth
type: produce
Expand Down
6 changes: 3 additions & 3 deletions examples/applications/openai-completions/pipeline.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,11 @@ pipeline:
# on the streaming answer we send the answer as a whole message
# the 'value' syntax is used to refer to the whole value of the message
stream-response-completion-field: "value"
# we want to stream the answer as soon as we have 20 chunks
# we want to stream the answer as soon as we have 10 chunks
# in order to reduce latency for the first message, the agent sends the first message
# with 1 chunk, then with 2 chunks... up to the min-chunks-per-message value
# eventually we want to send bigger messages to reduce the overhead of each message on the topic
min-chunks-per-message: 20
min-chunks-per-message: 10
messages:
- role: user
content: "You are an helpful assistant. Below you can fine a question from the user. Please try to help them the best way you can.\n\n{{% value.question}}"
content: "You are a helpful assistant. Below you can find a question from the user. Please try to help them the best way you can.\n\n{{% value.question}}"
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ public void run() {

AtomicBoolean waitingProduceResponse = new AtomicBoolean(false);
AtomicBoolean waitingConsumeMessage = new AtomicBoolean(false);
AtomicBoolean isStreamingOutput = new AtomicBoolean(false);

final AtomicReference<CompletableFuture<Void>> loop = new AtomicReference<>();

Expand All @@ -102,27 +103,27 @@ public void run() {
map -> {
final Map<String, Object> record = (Map<String, Object>) map.get("record");
Map<String, String> headers = (Map<String, String>) record.get("headers");
boolean isStreamingOutput = false;
boolean isLastMessage = false;
int streamIndex = -1;
if (headers != null) {
String streamLastMessage = headers.get("stream-last-message");
if (streamLastMessage != null) {
isStreamingOutput = true;
isStreamingOutput.set(true);
isLastMessage = Boolean.parseBoolean(streamLastMessage + "");
streamIndex =
Integer.parseInt(headers.getOrDefault("stream-index", "-1"));
}
}
if (isStreamingOutput) {
if (isStreamingOutput.get()) {
if (streamIndex == 1) {
logServer("Server:");
}
logNoNewline(String.valueOf(record.get("value")));
if (isLastMessage) {
logServer("\n");
logServer(".");
waitingConsumeMessage.set(false);
isStreamingOutput.set(false);
logServer(".");
}
} else {
logServer("\n");
Expand Down Expand Up @@ -164,6 +165,7 @@ public void run() {
null, line, Map.of())));
waitingProduceResponse.set(true);
waitingConsumeMessage.set(true);
isStreamingOutput.set(false);
while (waitingProduceResponse.get()) {
Thread.sleep(500);
if (waitingProduceResponse.get()) {
Expand All @@ -173,7 +175,13 @@ public void run() {
while (waitingConsumeMessage.get()) {
Thread.sleep(500);
if (waitingConsumeMessage.get()) {
logUserNoNewLine(".");
// If streaming, just flush the line, otherwise
// print a dot
if (isStreamingOutput.get()) {
logUserFlush();
} else {
logUserNoNewLine(".");
}
}
}
}
Expand Down Expand Up @@ -265,6 +273,10 @@ private void logUserNoNewLine(String message) {
command.commandLine().getOut().flush();
}

/**
 * Flushes the command's output writer without writing any text.
 *
 * <p>Called from the wait loop while a streaming answer is in progress, in place of
 * printing a progress dot, so that chunks already written without a trailing newline
 * are flushed out.
 */
private void logUserFlush() {
command.commandLine().getOut().flush();
}

private void logServer(String message) {
log("\u001B[32m" + message + "\u001B[0m");
command.commandLine().getOut().flush();
Expand Down

0 comments on commit aca0085

Please sign in to comment.