diff --git a/examples/applications/openai-completions/gateways.yaml b/examples/applications/openai-completions/gateways.yaml
index eda8d4ea8..dc3448828 100644
--- a/examples/applications/openai-completions/gateways.yaml
+++ b/examples/applications/openai-completions/gateways.yaml
@@ -40,6 +40,13 @@ gateways:
   - id: consume-history
     type: consume
     topic: history-topic
+    parameters:
+      - sessionId
+    consume-options:
+      filters:
+        headers:
+          - key: langstream-client-session-id
+            value-from-parameters: sessionId
 
   - id: produce-input-auth
     type: produce
diff --git a/examples/applications/openai-completions/pipeline.yaml b/examples/applications/openai-completions/pipeline.yaml
index db8e09877..e8cc896e5 100644
--- a/examples/applications/openai-completions/pipeline.yaml
+++ b/examples/applications/openai-completions/pipeline.yaml
@@ -42,11 +42,11 @@ pipeline:
       # on the streaming answer we send the answer as whole message
       # the 'value' syntax is used to refer to the whole value of the message
       stream-response-completion-field: "value"
-      # we want to stream the answer as soon as we have 20 chunks
+      # we want to stream the answer as soon as we have 10 chunks
       # in order to reduce latency for the first message the agent sends the first message
      # with 1 chunk, then with 2 chunks....up to the min-chunks-per-message value
       # eventually we want to send bigger messages to reduce the overhead of each message on the topic
-      min-chunks-per-message: 20
+      min-chunks-per-message: 10
       messages:
         - role: user
-          content: "You are an helpful assistant. Below you can fine a question from the user. Please try to help them the best way you can.\n\n{{% value.question}}"
+          content: "You are a helpful assistant. Below you can find a question from the user. Please try to help them the best way you can.\n\n{{% value.question}}"
diff --git a/langstream-cli/src/main/java/ai/langstream/cli/commands/gateway/ChatGatewayCmd.java b/langstream-cli/src/main/java/ai/langstream/cli/commands/gateway/ChatGatewayCmd.java
index e11615bc6..4ee0b00c5 100644
--- a/langstream-cli/src/main/java/ai/langstream/cli/commands/gateway/ChatGatewayCmd.java
+++ b/langstream-cli/src/main/java/ai/langstream/cli/commands/gateway/ChatGatewayCmd.java
@@ -84,6 +84,7 @@ public void run() {
         AtomicBoolean waitingProduceResponse = new AtomicBoolean(false);
         AtomicBoolean waitingConsumeMessage = new AtomicBoolean(false);
+        AtomicBoolean isStreamingOutput = new AtomicBoolean(false);
 
         final AtomicReference> loop = new AtomicReference<>();
@@ -102,27 +103,27 @@ public void run() {
                     map -> {
                         final Map record = (Map) map.get("record");
                         Map headers = (Map) record.get("headers");
-                        boolean isStreamingOutput = false;
                         boolean isLastMessage = false;
                         int streamIndex = -1;
                         if (headers != null) {
                             String streamLastMessage = headers.get("stream-last-message");
                             if (streamLastMessage != null) {
-                                isStreamingOutput = true;
+                                isStreamingOutput.set(true);
                                 isLastMessage = Boolean.parseBoolean(streamLastMessage + "");
                                 streamIndex = Integer.parseInt(headers.getOrDefault("stream-index", "-1"));
                             }
                         }
-                        if (isStreamingOutput) {
+                        if (isStreamingOutput.get()) {
                             if (streamIndex == 1) {
                                 logServer("Server:");
                             }
                             logNoNewline(String.valueOf(record.get("value")));
                             if (isLastMessage) {
                                 logServer("\n");
-                                logServer(".");
                                 waitingConsumeMessage.set(false);
+                                isStreamingOutput.set(false);
+                                logServer(".");
                             }
                         } else {
                             logServer("\n");
@@ -164,6 +165,7 @@ public void run() {
                                                 null, line, Map.of())));
                 waitingProduceResponse.set(true);
                 waitingConsumeMessage.set(true);
+                isStreamingOutput.set(false);
                 while (waitingProduceResponse.get()) {
                     Thread.sleep(500);
                     if (waitingProduceResponse.get()) {
@@ -173,7 +175,13 @@ public void run() {
                 while (waitingConsumeMessage.get()) {
                     Thread.sleep(500);
                     if (waitingConsumeMessage.get()) {
-                        logUserNoNewLine(".");
+                        // If streaming, just flush the line, otherwise
+                        // print a dot
+                        if (isStreamingOutput.get()) {
+                            logUserFlush();
+                        } else {
+                            logUserNoNewLine(".");
+                        }
                     }
                 }
             }
@@ -265,6 +273,10 @@ private void logUserNoNewLine(String message) {
         command.commandLine().getOut().flush();
     }
 
+    private void logUserFlush() {
+        command.commandLine().getOut().flush();
+    }
+
     private void logServer(String message) {
         log("\u001B[32m" + message + "\u001B[0m");
         command.commandLine().getOut().flush();
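
With this change the consume-history gateway declares a sessionId parameter and only delivers records whose langstream-client-session-id header matches it, so a chat client replays only its own session's history, while the CLI now flushes streamed chunks instead of printing progress dots during a streaming answer. A rough sketch of exercising both with the gateway chat command follows; the application name is a placeholder, the produce gateway id is an assumption based on this example, and the flags should be checked against "langstream gateway chat --help":

    # assumes the openai-completions example is deployed as "my-app" (hypothetical name)
    langstream gateway chat my-app \
      -pg produce-input \
      -cg consume-history \
      -p sessionId=$(uuidgen)

Reconnecting with the same sessionId should replay only the records tagged with that session's langstream-client-session-id header, per the filter added above.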