From 4f8bf98b5a7e83267330f898df2a2ab94b3a1d31 Mon Sep 17 00:00:00 2001 From: tsuz <6927131+tsuz@users.noreply.github.com> Date: Mon, 25 May 2026 08:38:39 +0900 Subject: [PATCH 1/4] enable cache control --- .../think/service/ClaudeApiService.java | 35 +++++++++++++++---- 1 file changed, 28 insertions(+), 7 deletions(-) diff --git a/think/think-consumer/src/main/java/io/flightdeck/think/service/ClaudeApiService.java b/think/think-consumer/src/main/java/io/flightdeck/think/service/ClaudeApiService.java index fb5e29c..c52d3ba 100644 --- a/think/think-consumer/src/main/java/io/flightdeck/think/service/ClaudeApiService.java +++ b/think/think-consumer/src/main/java/io/flightdeck/think/service/ClaudeApiService.java @@ -130,7 +130,12 @@ public ThinkResponse callWithoutTools(String systemPrompt, Map body = new LinkedHashMap<>(); body.put("model", model); body.put("max_tokens", maxTokens); - body.put("system", systemPrompt); + // Cache the system prompt (see buildRequestBody). No tools to cache here. + body.put("system", List.of(Map.of( + "type", "text", + "text", systemPrompt, + "cache_control", Map.of("type", "ephemeral") + ))); body.put("messages", List.of(Map.of("role", "user", "content", conversationText.toString()))); // No tools @@ -167,7 +172,14 @@ private Map buildRequestBody(String systemPrompt, List body = new LinkedHashMap<>(); body.put("model", model); body.put("max_tokens", maxTokens); - body.put("system", systemPrompt); + // Send the system prompt as a content block with cache_control so the static + // prefix (tools + system) is cached. A single breakpoint here covers the tools + // below it in the tools -> system -> messages cache order. + body.put("system", List.of(Map.of( + "type", "text", + "text", systemPrompt, + "cache_control", Map.of("type", "ephemeral") + ))); body.put("messages", messages); List> tools = ToolDefinitions.getTools(); @@ -185,14 +197,22 @@ ThinkResponse parseResponse(String responseBody, String sessionId, String userId try { JsonNode root = mapper.readTree(responseBody); - // Extract token usage + // Extract token usage. With prompt caching enabled, cached tokens are reported + // separately and are NOT included in input_tokens, so they must be priced too. JsonNode usage = root.get("usage"); int inputTokens = usage != null ? usage.path("input_tokens").asInt(0) : 0; int outputTokens = usage != null ? usage.path("output_tokens").asInt(0) : 0; + int cacheCreationTokens = usage != null ? usage.path("cache_creation_input_tokens").asInt(0) : 0; + int cacheReadTokens = usage != null ? usage.path("cache_read_input_tokens").asInt(0) : 0; + boolean cacheHit = cacheReadTokens > 0; - // Calculate cost (null if pricing env vars not set) + // Calculate cost (null if pricing env vars not set). + // Cache writes are billed at ~1.25x and cache reads at ~0.1x of the base input price. Double cost = (INPUT_TOKEN_PRICE != null && OUTPUT_TOKEN_PRICE != null) - ? (inputTokens / 1_000_000.0 * INPUT_TOKEN_PRICE) + (outputTokens / 1_000_000.0 * OUTPUT_TOKEN_PRICE) + ? (inputTokens / 1_000_000.0 * INPUT_TOKEN_PRICE) + + (cacheCreationTokens / 1_000_000.0 * INPUT_TOKEN_PRICE * 1.25) + + (cacheReadTokens / 1_000_000.0 * INPUT_TOKEN_PRICE * 0.1) + + (outputTokens / 1_000_000.0 * OUTPUT_TOKEN_PRICE) : null; // Determine if this is an end turn @@ -270,8 +290,9 @@ ThinkResponse parseResponse(String responseBody, String sessionId, String userId } } - log.info("[{}] Claude response: input_tokens={} output_tokens={} tool_uses={} end_turn={} cost={}", - sessionId, inputTokens, outputTokens, toolUses.size(), endTurn, + log.info("[{}] Claude response: input_tokens={} output_tokens={} cache_write={} cache_read={} cache_hit={} tool_uses={} end_turn={} cost={}", + sessionId, inputTokens, outputTokens, cacheCreationTokens, cacheReadTokens, cacheHit, + toolUses.size(), endTurn, cost != null ? String.format("$%.6f", cost) : "null"); return new ThinkResponse( From 9fa0e908a45e9a11edef7eac2eb68a5d1f7cff9f Mon Sep 17 00:00:00 2001 From: tsuz <6927131+tsuz@users.noreply.github.com> Date: Mon, 25 May 2026 23:41:41 +0900 Subject: [PATCH 2/4] add prompt cache from env var --- README.md | 1 + .../flightdeck/memoir/config/AppConfig.java | 3 + .../memoir/service/ClaudeMemoirService.java | 12 +- .../flightdeck_sdk/think_consumer_runner.py | 16 +- sdk/typescript/src/think-consumer-runner.ts | 859 ++++++++++++++++++ .../io/flightdeck/think/config/AppConfig.java | 4 + .../think/service/ClaudeApiService.java | 39 +- 7 files changed, 918 insertions(+), 16 deletions(-) create mode 100644 sdk/typescript/src/think-consumer-runner.ts diff --git a/README.md b/README.md index 8e21c6d..5858797 100644 --- a/README.md +++ b/README.md @@ -90,6 +90,7 @@ All configuration is done via environment variables in the `.env` file. See [`.e | `CLAUDE_API_KEY` | *(required for Claude)* | Your Anthropic API key | | `CLAUDE_MODEL` | `claude-sonnet-4-20250514` | Claude model to use | | `CLAUDE_MAX_TOKENS` | `8096` | Max tokens per Claude response | +| `PROMPT_CACHING` | `false` | Enable Anthropic prompt caching (adds a `cache_control` breakpoint to the system prompt). Only takes effect when the cached prefix exceeds the model's minimum cacheable length (e.g. 4,096 tokens for Haiku 4.5, 1,024 for Sonnet). | | `GEMINI_API_KEY` | *(required for Gemini)* | Your Google Gemini API key | | `GEMINI_MODEL` | `gemini-2.5-flash` | Gemini model to use | | `GEMINI_MAX_TOKENS` | `4096` | Max tokens per Gemini response | diff --git a/memoir/update-memoir-consumer/src/main/java/io/flightdeck/memoir/config/AppConfig.java b/memoir/update-memoir-consumer/src/main/java/io/flightdeck/memoir/config/AppConfig.java index 2414ec1..422a54c 100644 --- a/memoir/update-memoir-consumer/src/main/java/io/flightdeck/memoir/config/AppConfig.java +++ b/memoir/update-memoir-consumer/src/main/java/io/flightdeck/memoir/config/AppConfig.java @@ -28,6 +28,9 @@ private AppConfig() {} env("CLAUDE_MODEL", "claude-sonnet-4-20250514"); public static final int CLAUDE_MAX_TOKENS = Integer.parseInt(env("CLAUDE_MAX_TOKENS", "4096")); + // Enable Anthropic prompt caching (cache_control breakpoints). Off by default. + public static final boolean PROMPT_CACHING = + Boolean.parseBoolean(env("PROMPT_CACHING", "false")); // ── Tuning ─────────────────────────────────────────────────────────────── public static final long POLL_TIMEOUT_MS = diff --git a/memoir/update-memoir-consumer/src/main/java/io/flightdeck/memoir/service/ClaudeMemoirService.java b/memoir/update-memoir-consumer/src/main/java/io/flightdeck/memoir/service/ClaudeMemoirService.java index 43fbd86..9add8d3 100644 --- a/memoir/update-memoir-consumer/src/main/java/io/flightdeck/memoir/service/ClaudeMemoirService.java +++ b/memoir/update-memoir-consumer/src/main/java/io/flightdeck/memoir/service/ClaudeMemoirService.java @@ -69,7 +69,17 @@ public String updateMemoir(String sessionId, MemoirSessionEnd snapshot) { Map requestBody = new LinkedHashMap<>(); requestBody.put("model", AppConfig.CLAUDE_MODEL); requestBody.put("max_tokens", AppConfig.CLAUDE_MAX_TOKENS); - requestBody.put("system", SYSTEM_PROMPT); + // When prompt caching is enabled, send the system prompt as a content block + // with a cache_control breakpoint; otherwise send it as a plain string. + if (AppConfig.PROMPT_CACHING) { + requestBody.put("system", List.of(Map.of( + "type", "text", + "text", SYSTEM_PROMPT, + "cache_control", Map.of("type", "ephemeral") + ))); + } else { + requestBody.put("system", SYSTEM_PROMPT); + } requestBody.put("messages", List.of( Map.of("role", "user", "content", userMessage) )); diff --git a/sdk/python/flightdeck_sdk/think_consumer_runner.py b/sdk/python/flightdeck_sdk/think_consumer_runner.py index 863d88e..b7f4343 100644 --- a/sdk/python/flightdeck_sdk/think_consumer_runner.py +++ b/sdk/python/flightdeck_sdk/think_consumer_runner.py @@ -25,6 +25,9 @@ class ThinkConsumerConfig: claude_model: str = "claude-haiku-4-5-20251001" claude_max_tokens: int = 4096 claude_api_url: str = "https://api.anthropic.com/v1/messages" + # Enable Anthropic prompt caching (cache_control breakpoints). Defaults from the + # PROMPT_CACHING env var (off unless set to "true"); can be overridden explicitly. + prompt_caching: bool = os.environ.get("PROMPT_CACHING", "false").lower() == "true" poll_timeout_s: float = 1.0 system_prompt_builder: Optional[Callable[[str, dict], str]] = None llm_provider: str = "claude" @@ -329,10 +332,21 @@ def _append_or_merge(self, messages: list[dict], role: str, text: str) -> None: messages.append({"role": role, "content": text}) def _call_claude(self, system_prompt: str, messages: list[dict], *, include_tools: bool = True) -> dict: + system: Any = system_prompt + if self._config.prompt_caching: + # Add a cache_control breakpoint so the static prefix can be cached. + system = [ + { + "type": "text", + "text": system_prompt, + "cache_control": {"type": "ephemeral"}, + } + ] + body: dict[str, Any] = { "model": self._config.claude_model, "max_tokens": self._config.claude_max_tokens, - "system": system_prompt, + "system": system, "messages": messages, } diff --git a/sdk/typescript/src/think-consumer-runner.ts b/sdk/typescript/src/think-consumer-runner.ts new file mode 100644 index 0000000..67534ef --- /dev/null +++ b/sdk/typescript/src/think-consumer-runner.ts @@ -0,0 +1,859 @@ +import { KafkaConsumer, Producer, Message, TopicPartition } from "@confluentinc/kafka-javascript"; + +export interface ThinkConsumerConfig { + agentName: string; + brokers: string; + claudeApiKey: string; + systemPrompt: string; + tools?: Record[]; + claudeModel?: string; + claudeMaxTokens?: number; + claudeApiUrl?: string; + /** Enable Anthropic prompt caching (cache_control breakpoints). Defaults to false. */ + promptCaching?: boolean; + pollTimeoutMs?: number; + systemPromptBuilder?: (basePrompt: string, context: Record) => string; + llmProvider?: string; + geminiApiKey?: string; + geminiModel?: string; + geminiMaxTokens?: number; + geminiApiUrl?: string; + compactionUserMessageTrigger?: number; + compactionUserMessageUntil?: number; + compactionPrompt?: string; +} + +function deriveThinkNames(agentName: string) { + return { + groupId: `${agentName}-think-consumer`, + input: `${agentName}-enriched-message-input`, + output: `${agentName}-think-request-response`, + dlq: `${agentName}-think-dlq`, + }; +} +// Token pricing from environment variables (per-token, not per-million) +const INPUT_TOKEN_PRICE = process.env.INPUT_TOKEN_PRICE + ? parseFloat(process.env.INPUT_TOKEN_PRICE) + : null; +const OUTPUT_TOKEN_PRICE = process.env.OUTPUT_TOKEN_PRICE + ? parseFloat(process.env.OUTPUT_TOKEN_PRICE) + : null; + +if (INPUT_TOKEN_PRICE == null || OUTPUT_TOKEN_PRICE == null) { + console.warn( + "WARN: INPUT_TOKEN_PRICE and/or OUTPUT_TOKEN_PRICE not set — cost will not be calculated" + ); +} + +export class ThinkConsumerRunner { + private consumer: KafkaConsumer; + private producer: Producer; + private config: ThinkConsumerConfig; + private names: ReturnType; + private running = false; + + constructor(config: ThinkConsumerConfig) { + this.config = { + claudeModel: "claude-haiku-4-5-20251001", + claudeMaxTokens: 4096, + claudeApiUrl: "https://api.anthropic.com/v1/messages", + promptCaching: process.env.PROMPT_CACHING?.toLowerCase() === "true", + tools: [], + llmProvider: "claude", + geminiApiKey: "", + geminiModel: "gemini-2.5-flash", + geminiMaxTokens: 4096, + geminiApiUrl: "https://generativelanguage.googleapis.com/v1beta", + compactionUserMessageTrigger: -1, + compactionUserMessageUntil: 2, + compactionPrompt: + "Summarize the following conversation concisely. " + + "If the conversation starts with a previous summary, incorporate and extend it " + + "rather than re-summarizing it. " + + "Preserve key facts, decisions, user preferences, and any context needed " + + "to continue the conversation naturally. Output only the summary.", + ...config, + }; + this.names = deriveThinkNames(config.agentName); + + const provider = (this.config.llmProvider || "claude").toLowerCase(); + if (provider === "gemini") { + if (!this.config.geminiApiKey) { + throw new Error("geminiApiKey is required when llmProvider='gemini'"); + } + console.log(`Using Gemini LLM provider (model=${this.config.geminiModel})`); + } else { + if (!this.config.claudeApiKey) { + throw new Error("claudeApiKey is required when llmProvider='claude'"); + } + console.log(`Using Claude LLM provider (model=${this.config.claudeModel})`); + } + + this.consumer = new KafkaConsumer( + { + "bootstrap.servers": config.brokers, + "group.id": this.names.groupId, + "auto.offset.reset": "earliest", + "enable.auto.commit": false, + }, + {} + ); + + this.producer = new Producer( + { + "bootstrap.servers": config.brokers, + acks: "all", + "enable.idempotence": true, + }, + {} + ); + } + + async start(): Promise { + await this.connectConsumer(); + await this.connectProducer(); + + this.consumer.subscribe([this.names.input]); + this.running = true; + + console.log(`ThinkConsumerRunner started — listening on [${this.names.input}]`); + + process.on("SIGINT", () => this.stop()); + process.on("SIGTERM", () => this.stop()); + + this.poll(); + } + + stop(): void { + console.log("Shutting down ThinkConsumerRunner..."); + this.running = false; + this.consumer.unsubscribe(); + this.consumer.disconnect(); + this.producer.disconnect(); + } + + private poll(): void { + if (!this.running) return; + + this.consumer.consume( + 1, + async (err: Error | null, messages: Message[]) => { + if (err) { + console.error("Consumer error:", err.message); + setTimeout(() => this.poll(), this.config.pollTimeoutMs ?? 1000); + return; + } + + for (const msg of messages) { + const key = msg.key ? msg.key.toString() : null; + const value = msg.value ? msg.value.toString() : null; + + try { + await this.processRecord(key, value, msg.topic, msg.partition, msg.offset); + } catch (e: unknown) { + const reason = e instanceof Error ? e.message : String(e); + console.error(`Error processing offset ${msg.offset}:`, reason); + await this.emitErrorResponse(key, value, reason, msg.topic, msg.partition, msg.offset); + await this.sendToDlq(key, value, reason, msg.topic, msg.partition, msg.offset); + } + } + + if (this.running) { + setImmediate(() => this.poll()); + } + } + ); + } + + private async processRecord( + key: string | null, + value: string | null, + topic: string, + partition: number, + offset: number + ): Promise { + const context = value ? JSON.parse(value) : {}; + + const sessionId = key || context.sessionId || ""; + const userId = context.userId || ""; + const history: Record[] = context.history || []; + const latestInput: Record = context.latestInput || {}; + const memoirContext: string = context.memoirContext || ""; + const cumulativeCost: number | null = context.cost != null ? Number(context.cost) : null; + + // Check session budget + const budgetStr = process.env.BUDGET_PRICE_PER_SESSION; + if (budgetStr && cumulativeCost != null) { + const budget = parseFloat(budgetStr); + if (cumulativeCost >= budget) { + console.warn( + `[${sessionId}] Session budget exceeded: $${cumulativeCost.toFixed(6)} >= $${budget.toFixed(2)}` + ); + const budgetResponse = { + sessionId, + userId, + cost: null, + prevSessionCost: cumulativeCost, + inputTokens: 0, + outputTokens: 0, + previousMessages: history, + lastInputMessage: latestInput, + lastInputResponse: [ + { + sessionId, + userId, + role: "assistant", + content: `You have used too many tokens. Session budget of $${budget.toFixed(2)} has been reached.`, + timestamp: new Date().toISOString(), + }, + ], + toolUses: [], + endTurn: true, + timestamp: new Date().toISOString(), + }; + await this.produce(this.names.output, key, JSON.stringify(budgetResponse)); + const tp: TopicPartition = { topic, partition, offset: offset + 1 }; + this.consumer.commitAsync([tp], (commitErr: Error | null) => { + if (commitErr) console.error("Commit failed:", commitErr.message); + }); + return; + } + } + + // Compact history if user message count exceeds trigger + let effectiveHistory = history; + let compactedHistory: Record[] | null = null; + const trigger = this.config.compactionUserMessageTrigger ?? -1; + const keepLast = this.config.compactionUserMessageUntil ?? 2; + + if (trigger > 0 && effectiveHistory.length > 0) { + const userMsgCount = effectiveHistory.filter((m) => m.role === "user").length; + if (userMsgCount >= trigger) { + const splitIdx = this.findCompactionSplitIndex(effectiveHistory, keepLast); + if (splitIdx > 0) { + const recentMessages = effectiveHistory.slice(splitIdx); + // Skip compaction if we're in an active tool loop + // (latestInput is a tool result) + const midToolLoop = latestInput && latestInput.role === "tool"; + + if (!midToolLoop) { + console.log( + `[${sessionId}] Compacting history: ${userMsgCount} user messages >= trigger ${trigger}, keeping from index ${splitIdx}` + ); + const oldMessages = effectiveHistory.slice(0, splitIdx); + + const prov = (this.config.llmProvider || "claude").toLowerCase(); + let summaryText: string; + if (prov === "gemini") { + const summaryInput = this.toGeminiMessages(oldMessages, {}); + const summaryResp = await this.callGemini(this.config.compactionPrompt!, summaryInput, false); + summaryText = this.extractGeminiText(summaryResp); + } else { + const summaryInput = this.toClaudeMessages(oldMessages, {}); + const summaryResp = await this.callClaude(this.config.compactionPrompt!, summaryInput, false); + summaryText = this.extractClaudeText(summaryResp); + } + + const summaryMsg: Record = { + sessionId, + userId, + role: "assistant", + content: `[Conversation Summary]\n${summaryText}`, + timestamp: new Date().toISOString(), + }; + compactedHistory = [summaryMsg, ...recentMessages]; + effectiveHistory = compactedHistory; + console.log( + `[${sessionId}] History compacted: ${history.length} messages → ${compactedHistory.length}` + ); + } + } + } + } + + // Build system prompt + const systemPrompt = this.buildSystemPrompt(memoirContext, context); + + const provider = (this.config.llmProvider || "claude").toLowerCase(); + let thinkResponse: Record; + + if (provider === "gemini") { + const geminiMessages = this.toGeminiMessages(effectiveHistory, latestInput); + const response = await this.callGemini(systemPrompt, geminiMessages); + thinkResponse = this.parseGeminiResponse(response, sessionId, userId, latestInput, effectiveHistory); + } else { + const messages = this.toClaudeMessages(effectiveHistory, latestInput); + const response = await this.callClaude(systemPrompt, messages); + thinkResponse = this.parseResponse(response, sessionId, userId, latestInput); + } + + thinkResponse.prevSessionCost = cumulativeCost; + thinkResponse.previousMessages = effectiveHistory; + thinkResponse.lastInputMessage = latestInput; + thinkResponse.lastInputResponse = thinkResponse.messages; + delete thinkResponse.messages; + + // Produce to output topic + await this.produce(this.names.output, key, JSON.stringify(thinkResponse)); + + // Commit offset + const tp: TopicPartition = { topic, partition, offset: offset + 1 }; + this.consumer.commitAsync([tp], (commitErr: Error | null) => { + if (commitErr) console.error("Commit failed:", commitErr.message); + }); + } + + private buildSystemPrompt(memoirContext: string, fullContext: Record): string { + let prompt: string; + + if (this.config.systemPromptBuilder) { + prompt = this.config.systemPromptBuilder(this.config.systemPrompt, fullContext); + } else { + prompt = this.config.systemPrompt; + } + + if (memoirContext) { + prompt += + "\n\nUser memoir (known facts about this user from previous sessions):\n" + + memoirContext + + "\nUse the memoir to personalize your responses."; + } + + return prompt; + } + + private toClaudeMessages( + history: Record[], + latestInput: Record + ): Record[] { + const messages: Record[] = []; + + for (const msg of history) { + const role = (msg.role as string) || "user"; + const content = msg.content; + + if (role === "tool") { + // Convert tool results to user role with tool_result blocks + const toolResults = Array.isArray(content) ? content : [content]; + const blocks: Record[] = []; + for (const result of toolResults) { + if (typeof result === "object" && result !== null && "tool_use_id" in result) { + blocks.push({ + type: "tool_result", + tool_use_id: (result as Record).tool_use_id, + content: (result as Record).content || "", + }); + } else { + blocks.push({ type: "text", text: String(result) }); + } + } + messages.push({ role: "user", content: blocks }); + } else if (role === "assistant") { + if (Array.isArray(content)) { + messages.push({ role: "assistant", content }); + } else { + this.appendOrMerge(messages, "assistant", String(content)); + } + } else { + this.appendOrMerge(messages, "user", String(content)); + } + } + + // Add latest input + if (latestInput && latestInput.content) { + this.appendOrMerge(messages, "user", String(latestInput.content)); + } + + return messages; + } + + private appendOrMerge(messages: Record[], role: string, text: string): void { + const last = messages[messages.length - 1]; + if (last && last.role === role && typeof last.content === "string") { + last.content = last.content + "\n" + text; + } else { + messages.push({ role, content: text }); + } + } + + private async callClaude( + systemPrompt: string, + messages: Record[], + includeTools = true + ): Promise> { + const body: Record = { + model: this.config.claudeModel, + max_tokens: this.config.claudeMaxTokens, + system: this.config.promptCaching + ? [ + { + type: "text", + text: systemPrompt, + cache_control: { type: "ephemeral" }, + }, + ] + : systemPrompt, + messages, + }; + + if (includeTools && this.config.tools && this.config.tools.length > 0) { + body.tools = this.config.tools; + } + + const resp = await fetch(this.config.claudeApiUrl!, { + method: "POST", + headers: { + "Content-Type": "application/json", + "x-api-key": this.config.claudeApiKey, + "anthropic-version": "2023-06-01", + }, + body: JSON.stringify(body), + signal: AbortSignal.timeout(120000), + }); + + if (!resp.ok) { + const errorBody = await resp.text(); + throw new Error(`Claude API error ${resp.status}: ${errorBody}`); + } + + return (await resp.json()) as Record; + } + + private parseResponse( + response: Record, + sessionId: string, + userId: string, + latestInput: Record + ): Record { + const usage = (response.usage as Record) || {}; + const inputTokens = usage.input_tokens || 0; + const outputTokens = usage.output_tokens || 0; + const cost = + INPUT_TOKEN_PRICE != null && OUTPUT_TOKEN_PRICE != null + ? (inputTokens / 1_000_000) * INPUT_TOKEN_PRICE + (outputTokens / 1_000_000) * OUTPUT_TOKEN_PRICE + : null; + + const stopReason = response.stop_reason as string; + const endTurn = stopReason !== "tool_use"; + + const contentBlocks = (response.content as Record[]) || []; + const messages: Record[] = []; + const toolUses: Record[] = []; + const now = new Date().toISOString(); + + const hasToolUse = contentBlocks.some((b) => b.type === "tool_use"); + + if (hasToolUse) { + messages.push({ + sessionId, + userId, + role: "assistant", + content: contentBlocks, + timestamp: now, + }); + } + + for (const block of contentBlocks) { + if (block.type === "text") { + if (!hasToolUse) { + messages.push({ + sessionId, + userId, + role: "assistant", + content: block.text, + timestamp: now, + }); + } + } else if (block.type === "tool_use") { + toolUses.push({ + toolUseId: block.id, + toolId: block.name, + name: block.name, + input: block.input, + sessionId, + totalTools: contentBlocks.filter((b) => b.type === "tool_use").length, + timestamp: now, + }); + } + } + + return { + sessionId, + userId, + cost: cost != null ? Math.round(cost * 1_000_000) / 1_000_000 : null, + inputTokens, + outputTokens, + messages, + toolUses, + endTurn, + timestamp: now, + }; + } + + // ── Gemini helpers ────────────────────────────────────────────────────── + + private toGeminiMessages( + history: Record[], + latestInput: Record + ): Record[] { + // Build tool_use_id → name mapping from assistant messages + const toolIdToName: Record = {}; + for (const msg of history) { + if (msg.role === "assistant" && Array.isArray(msg.content)) { + for (const block of msg.content as Record[]) { + if (block.type === "tool_use" && block.id && block.name) { + toolIdToName[block.id as string] = block.name as string; + } + } + } + } + + const contents: Record[] = []; + + for (const msg of history) { + const role = (msg.role as string) || "user"; + const content = msg.content; + + if (role === "tool") { + const parts = this.buildFunctionResponseParts(content, toolIdToName); + if (parts.length > 0) { + contents.push({ role: "user", parts }); + } + } else if (role === "assistant") { + if (Array.isArray(content)) { + const parts: Record[] = []; + for (const block of content as Record[]) { + if (block.type === "text") { + const text = block.text as string; + if (text) parts.push({ text }); + } else if (block.type === "tool_use") { + parts.push({ + functionCall: { name: block.name, args: block.input || {} }, + }); + } + } + if (parts.length > 0) { + contents.push({ role: "model", parts }); + } + } else { + this.appendOrMergeGemini(contents, "model", String(content)); + } + } else { + this.appendOrMergeGemini(contents, "user", String(content)); + } + } + + if (latestInput && latestInput.content) { + this.appendOrMergeGemini(contents, "user", String(latestInput.content)); + } + + return contents; + } + + private appendOrMergeGemini( + contents: Record[], + role: string, + text: string + ): void { + const last = contents[contents.length - 1]; + if (last && last.role === role) { + const parts = last.parts as Record[]; + if (parts.length > 0 && "text" in parts[parts.length - 1]) { + parts.push({ text }); + return; + } + } + contents.push({ role, parts: [{ text }] }); + } + + private buildFunctionResponseParts( + content: unknown, + toolIdToName: Record + ): Record[] { + const parts: Record[] = []; + const results = Array.isArray(content) ? content : [content]; + + for (const result of results) { + if (typeof result === "object" && result !== null && "tool_use_id" in result) { + const r = result as Record; + const name = toolIdToName[r.tool_use_id as string] || "unknown"; + let resData = r.result ?? r.content ?? "{}"; + if (typeof resData === "string") { + try { + resData = JSON.parse(resData); + } catch { + resData = { result: resData }; + } + } + parts.push({ functionResponse: { name, response: resData } }); + } + } + return parts; + } + + private async callGemini( + systemPrompt: string, + contents: Record[], + includeTools = true + ): Promise> { + const body: Record = { + system_instruction: { parts: [{ text: systemPrompt }] }, + contents, + generationConfig: { maxOutputTokens: this.config.geminiMaxTokens }, + }; + + if (includeTools && this.config.tools && this.config.tools.length > 0) { + const funcDecls = this.config.tools.map((tool) => ({ + name: tool.name, + description: tool.description || "", + ...(tool.input_schema ? { parameters: tool.input_schema } : {}), + })); + body.tools = [{ function_declarations: funcDecls }]; + } + + const url = `${this.config.geminiApiUrl}/models/${this.config.geminiModel}:generateContent?key=${this.config.geminiApiKey}`; + + const resp = await fetch(url, { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify(body), + signal: AbortSignal.timeout(120000), + }); + + if (!resp.ok) { + const errorBody = await resp.text(); + throw new Error(`Gemini API error ${resp.status}: ${errorBody}`); + } + + return (await resp.json()) as Record; + } + + private parseGeminiResponse( + response: Record, + sessionId: string, + userId: string, + latestInput: Record, + _history: Record[] + ): Record { + const usage = (response.usageMetadata as Record) || {}; + const inputTokens = usage.promptTokenCount || 0; + const outputTokens = usage.candidatesTokenCount || 0; + const cost = + INPUT_TOKEN_PRICE != null && OUTPUT_TOKEN_PRICE != null + ? (inputTokens / 1_000_000) * INPUT_TOKEN_PRICE + (outputTokens / 1_000_000) * OUTPUT_TOKEN_PRICE + : null; + + const candidates = (response.candidates as Record[]) || []; + if (candidates.length === 0) throw new Error("No candidates in Gemini response"); + + const candidate = candidates[0]; + const candidateContent = (candidate.content as Record) || {}; + const parts = (candidateContent.parts as Record[]) || []; + + const hasFunctionCall = parts.some((p) => "functionCall" in p); + const endTurn = !hasFunctionCall; + + const messages: Record[] = []; + const toolUses: Record[] = []; + const now = new Date().toISOString(); + + // Build content blocks in Claude-compatible format + const contentBlocks: Record[] = []; + + for (const part of parts) { + if ("text" in part) { + contentBlocks.push({ type: "text", text: part.text }); + } else if ("functionCall" in part) { + const fc = part.functionCall as Record; + const toolUseId = "toolu_" + crypto.randomUUID().replace(/-/g, "").substring(0, 20); + contentBlocks.push({ + type: "tool_use", + id: toolUseId, + name: fc.name, + input: fc.args || {}, + }); + toolUses.push({ + toolUseId, + toolId: fc.name, + name: fc.name, + input: fc.args || {}, + sessionId, + totalTools: parts.filter((p) => "functionCall" in p).length, + timestamp: now, + }); + } + } + + if (hasFunctionCall) { + messages.push({ sessionId, userId, role: "assistant", content: contentBlocks, timestamp: now }); + } else { + for (const block of contentBlocks) { + if (block.type === "text") { + messages.push({ sessionId, userId, role: "assistant", content: block.text, timestamp: now }); + } + } + } + + return { + sessionId, + userId, + cost: cost != null ? Math.round(cost * 1_000_000) / 1_000_000 : null, + inputTokens, + outputTokens, + messages, + toolUses, + endTurn, + timestamp: now, + }; + } + + private findCompactionSplitIndex(history: Record[], keepLast: number): number { + if (!history || keepLast <= 0) return -1; + const totalUser = history.filter((m) => m.role === "user").length; + if (totalUser <= keepLast) return -1; + const target = totalUser - keepLast; + let seen = 0; + for (let i = 0; i < history.length; i++) { + if (history[i].role === "user") { + seen++; + if (seen > target) return i; + } + } + return -1; + } + + private hasToolUseContent(msg: Record): boolean { + const content = msg.content; + if (Array.isArray(content)) { + return content.some( + (b) => typeof b === "object" && b !== null && (b as Record).type === "tool_use" + ); + } + return false; + } + + private extractClaudeText(response: Record): string { + const blocks = (response.content as Record[]) || []; + return blocks + .filter((b) => b.type === "text") + .map((b) => (b.text as string) || "") + .join("\n"); + } + + private extractGeminiText(response: Record): string { + const candidates = (response.candidates as Record[]) || []; + if (candidates.length === 0) return ""; + const content = (candidates[0].content as Record) || {}; + const parts = (content.parts as Record[]) || []; + return parts + .filter((p) => "text" in p) + .map((p) => (p.text as string) || "") + .join("\n"); + } + + private produce(topic: string, key: string | null, value: string): Promise { + return new Promise((resolve, reject) => { + this.producer.produce( + topic, + null, + Buffer.from(value), + key ?? undefined, + Date.now(), + undefined, + undefined, + (prodErr: Error | null) => { + if (prodErr) reject(prodErr); + else resolve(); + } + ); + this.producer.flush(5000, (flushErr: Error | null) => { + if (flushErr) console.error("Flush error:", flushErr.message); + }); + }); + } + + private async emitErrorResponse( + key: string | null, + value: string | null, + reason: string, + topic: string, + partition: number, + offset: number + ): Promise { + try { + const sessionId = key || ""; + let userId = ""; + if (value) { + try { + const ctx = JSON.parse(value); + userId = ctx.userId || ""; + } catch {} + } + + const now = new Date().toISOString(); + const errorResponse = { + sessionId, + userId, + cost: null, + prevSessionCost: null, + inputTokens: 0, + outputTokens: 0, + messages: [ + { + sessionId, + userId, + role: "assistant", + content: `Sorry, an error occurred while processing your request: ${reason}`, + timestamp: now, + }, + ], + toolUses: [], + endTurn: true, + timestamp: now, + }; + + await this.produce(this.names.output, key, JSON.stringify(errorResponse)); + console.log(`[${sessionId}] Emitted error response to ${this.names.output}`); + } catch (ex) { + console.error("Failed to emit error response:", ex); + } + } + + private async sendToDlq( + key: string | null, + value: string | null, + reason: string, + topic: string, + partition: number, + offset: number + ): Promise { + await this.produce( + this.names.dlq, + key, + JSON.stringify({ originalValue: value, error: reason }) + ); + + const tp: TopicPartition = { topic, partition, offset: offset + 1 }; + this.consumer.commitAsync([tp], (err: Error | null) => { + if (err) console.error("DLQ commit failed:", err.message); + }); + } + + private connectConsumer(): Promise { + return new Promise((resolve, reject) => { + this.consumer.connect({}, (err: Error | null) => { + if (err) reject(err); + else resolve(); + }); + }); + } + + private connectProducer(): Promise { + return new Promise((resolve, reject) => { + this.producer.connect({}, (err: Error | null) => { + if (err) reject(err); + else resolve(); + }); + }); + } +} diff --git a/think/think-consumer/src/main/java/io/flightdeck/think/config/AppConfig.java b/think/think-consumer/src/main/java/io/flightdeck/think/config/AppConfig.java index edb23ec..6dba0d3 100644 --- a/think/think-consumer/src/main/java/io/flightdeck/think/config/AppConfig.java +++ b/think/think-consumer/src/main/java/io/flightdeck/think/config/AppConfig.java @@ -40,6 +40,10 @@ private AppConfig() {} public static final int CLAUDE_MAX_TOKENS = Integer.parseInt(env("CLAUDE_MAX_TOKENS", "4096")); + // Enable Anthropic prompt caching (cache_control breakpoints). Off by default. + public static final boolean PROMPT_CACHING = + Boolean.parseBoolean(env("PROMPT_CACHING", "false")); + // ── Gemini API ────────────────────────────────────────────────────────── public static final String GEMINI_API_KEY = env("GEMINI_API_KEY", ""); diff --git a/think/think-consumer/src/main/java/io/flightdeck/think/service/ClaudeApiService.java b/think/think-consumer/src/main/java/io/flightdeck/think/service/ClaudeApiService.java index c52d3ba..4db4e39 100644 --- a/think/think-consumer/src/main/java/io/flightdeck/think/service/ClaudeApiService.java +++ b/think/think-consumer/src/main/java/io/flightdeck/think/service/ClaudeApiService.java @@ -130,12 +130,8 @@ public ThinkResponse callWithoutTools(String systemPrompt, Map body = new LinkedHashMap<>(); body.put("model", model); body.put("max_tokens", maxTokens); - // Cache the system prompt (see buildRequestBody). No tools to cache here. - body.put("system", List.of(Map.of( - "type", "text", - "text", systemPrompt, - "cache_control", Map.of("type", "ephemeral") - ))); + // Cache the system prompt when enabled (see buildRequestBody). No tools to cache here. + body.put("system", buildSystemBlocks(systemPrompt)); body.put("messages", List.of(Map.of("role", "user", "content", conversationText.toString()))); // No tools @@ -172,14 +168,11 @@ private Map buildRequestBody(String systemPrompt, List body = new LinkedHashMap<>(); body.put("model", model); body.put("max_tokens", maxTokens); - // Send the system prompt as a content block with cache_control so the static - // prefix (tools + system) is cached. A single breakpoint here covers the tools - // below it in the tools -> system -> messages cache order. - body.put("system", List.of(Map.of( - "type", "text", - "text", systemPrompt, - "cache_control", Map.of("type", "ephemeral") - ))); + // Send the system prompt as a content block. When PROMPT_CACHING is enabled a + // cache_control breakpoint is added so the static prefix (tools + system) is cached; + // a single breakpoint here covers the tools below it in the + // tools -> system -> messages cache order. + body.put("system", buildSystemBlocks(systemPrompt)); body.put("messages", messages); List> tools = ToolDefinitions.getTools(); @@ -190,6 +183,24 @@ private Map buildRequestBody(String systemPrompt, List> buildSystemBlocks(String systemPrompt) { + Map block = new LinkedHashMap<>(); + block.put("type", "text"); + block.put("text", systemPrompt); + if (AppConfig.PROMPT_CACHING) { + block.put("cache_control", Map.of("type", "ephemeral")); + } + return List.of(block); + } + /** * Parses the Claude API response JSON into a ThinkResponse. */ From d3ec1216830be71bb28ba200321f57c88ee4d03a Mon Sep 17 00:00:00 2001 From: tsuz <6927131+tsuz@users.noreply.github.com> Date: Mon, 25 May 2026 23:51:37 +0900 Subject: [PATCH 3/4] test: verify PROMPT_CACHING toggles cache_control in think-consumer --- .../think/service/ClaudeApiService.java | 19 +++--- .../think/service/ClaudeApiServiceTest.java | 61 +++++++++++++++++++ 2 files changed, 70 insertions(+), 10 deletions(-) create mode 100644 think/think-consumer/src/test/java/io/flightdeck/think/service/ClaudeApiServiceTest.java diff --git a/think/think-consumer/src/main/java/io/flightdeck/think/service/ClaudeApiService.java b/think/think-consumer/src/main/java/io/flightdeck/think/service/ClaudeApiService.java index 4db4e39..30d7acb 100644 --- a/think/think-consumer/src/main/java/io/flightdeck/think/service/ClaudeApiService.java +++ b/think/think-consumer/src/main/java/io/flightdeck/think/service/ClaudeApiService.java @@ -131,7 +131,7 @@ public ThinkResponse callWithoutTools(String systemPrompt, body.put("model", model); body.put("max_tokens", maxTokens); // Cache the system prompt when enabled (see buildRequestBody). No tools to cache here. - body.put("system", buildSystemBlocks(systemPrompt)); + body.put("system", buildSystemBlocks(systemPrompt, AppConfig.PROMPT_CACHING)); body.put("messages", List.of(Map.of("role", "user", "content", conversationText.toString()))); // No tools @@ -172,7 +172,7 @@ private Map buildRequestBody(String systemPrompt, List system -> messages cache order. - body.put("system", buildSystemBlocks(systemPrompt)); + body.put("system", buildSystemBlocks(systemPrompt, AppConfig.PROMPT_CACHING)); body.put("messages", messages); List> tools = ToolDefinitions.getTools(); @@ -184,18 +184,17 @@ private Map buildRequestBody(String systemPrompt, List> buildSystemBlocks(String systemPrompt) { + static List> buildSystemBlocks(String systemPrompt, boolean promptCaching) { Map block = new LinkedHashMap<>(); block.put("type", "text"); block.put("text", systemPrompt); - if (AppConfig.PROMPT_CACHING) { + if (promptCaching) { block.put("cache_control", Map.of("type", "ephemeral")); } return List.of(block); diff --git a/think/think-consumer/src/test/java/io/flightdeck/think/service/ClaudeApiServiceTest.java b/think/think-consumer/src/test/java/io/flightdeck/think/service/ClaudeApiServiceTest.java new file mode 100644 index 0000000..455f9ca --- /dev/null +++ b/think/think-consumer/src/test/java/io/flightdeck/think/service/ClaudeApiServiceTest.java @@ -0,0 +1,61 @@ +package io.flightdeck.think.service; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.junit.jupiter.api.*; + +import java.util.List; +import java.util.Map; + +import static org.assertj.core.api.Assertions.*; + +/** + * Tests for {@link ClaudeApiService#buildSystemBlocks} — verifies the PROMPT_CACHING + * flag controls whether a {@code cache_control} breakpoint is attached to the system prompt. + */ +class ClaudeApiServiceTest { + + private static final String SYSTEM_PROMPT = "You are a helpful assistant."; + + @Test + @DisplayName("PROMPT_CACHING=true attaches an ephemeral cache_control breakpoint") + void cachingEnabledAddsCacheControl() { + List> blocks = ClaudeApiService.buildSystemBlocks(SYSTEM_PROMPT, true); + + assertThat(blocks).hasSize(1); + Map block = blocks.get(0); + assertThat(block) + .containsEntry("type", "text") + .containsEntry("text", SYSTEM_PROMPT) + .containsKey("cache_control"); + assertThat(block.get("cache_control")).isEqualTo(Map.of("type", "ephemeral")); + } + + @Test + @DisplayName("PROMPT_CACHING=false omits cache_control entirely") + void cachingDisabledOmitsCacheControl() { + List> blocks = ClaudeApiService.buildSystemBlocks(SYSTEM_PROMPT, false); + + assertThat(blocks).hasSize(1); + Map block = blocks.get(0); + assertThat(block) + .containsEntry("type", "text") + .containsEntry("text", SYSTEM_PROMPT) + .doesNotContainKey("cache_control"); + } + + @Test + @DisplayName("Serialized JSON includes cache_control when enabled, excludes it when disabled") + void serializedJsonReflectsFlag() throws Exception { + ObjectMapper mapper = new ObjectMapper(); + + String enabled = mapper.writeValueAsString( + ClaudeApiService.buildSystemBlocks(SYSTEM_PROMPT, true)); + assertThat(enabled) + .contains("\"cache_control\"") + .contains("\"ephemeral\""); + + String disabled = mapper.writeValueAsString( + ClaudeApiService.buildSystemBlocks(SYSTEM_PROMPT, false)); + assertThat(disabled).doesNotContain("cache_control"); + } +} From 3388e018e5e64a47f275a3394d9083b9691c46e3 Mon Sep 17 00:00:00 2001 From: tsuz <6927131+tsuz@users.noreply.github.com> Date: Mon, 25 May 2026 23:57:11 +0900 Subject: [PATCH 4/4] ading architecgtur --- architecture/models.md | 664 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 664 insertions(+) create mode 100644 architecture/models.md diff --git a/architecture/models.md b/architecture/models.md new file mode 100644 index 0000000..51f0acf --- /dev/null +++ b/architecture/models.md @@ -0,0 +1,664 @@ +# Kafka Topic Schemas + +All topics are prefixed with the `AGENT_NAME` environment variable: `{AGENT_NAME}-{topic-name}` + +## Message Flow + +``` +message-input + | + | [join with think-request-response KTable, + | session-cost KTable, memoir-context KTable] + | +enriched-message-input + | + | [ThinkConsumer calls Claude/Gemini] + | [compaction runs here if triggered] + | +think-request-response + | + |--- [has tool_uses] --> tool-use (fanout: 1 message per tool) + | | + | [tool execution] + | | + | tool-use-result + | | + | tool-use-all-complete (when all results arrive) + | | + | message-input (re-enter pipeline as role="tool") + | + |--- [end_turn=true, no tools] --> message-output + | + |--- session-cost (aggregate token costs) + | + |--- [on inactivity] --> session-end + | + memoir-context-session-end + | + memoir-context (updated long-term memory) +``` + +--- + +## Environment Variables + +### Processing Service + +| Variable | Default | Description | +|----------|---------|-------------| +| `AGENT_NAME` | *(required)* | Unique name for the agent instance. Prefixes all topic names. | +| `KAFKA_BOOTSTRAP_SERVERS` | `localhost:9092` | Kafka broker address | +| `MEMOIR_ENABLED` | `true` | Enable per-user long-term memory across sessions | +| `MEMOIR_SESSION_INACTIVITY_THRESHOLD_SECONDS` | `20` | Seconds of inactivity before a session ends | +| `MEMOIR_SESSION_PUNCTUATE_INTERVAL_SECONDS` | `5` | How often to check for inactive sessions | + +### Think Consumer Service + +| Variable | Default | Description | +|----------|---------|-------------| +| `AGENT_NAME` | *(required)* | Unique name for the agent instance | +| `KAFKA_BOOTSTRAP_SERVERS` | `localhost:9092` | Kafka broker address | +| `KAFKA_CONSUMER_GROUP` | `think-consumer-group` | Kafka consumer group ID | +| `LLM_PROVIDER` | `claude` | LLM provider: `claude` or `gemini` | +| `CLAUDE_API_KEY` | `""` | Anthropic API key | +| `CLAUDE_API_URL` | `https://api.anthropic.com/v1/messages` | Claude API endpoint | +| `CLAUDE_MODEL` | `claude-haiku-4-5-20251001` | Claude model to use | +| `CLAUDE_MAX_TOKENS` | `4096` | Max tokens per Claude response | +| `PROMPT_CACHING` | `false` | Enable Anthropic prompt caching (adds a `cache_control` breakpoint to the system prompt). Only takes effect when the cached prefix exceeds the model's minimum cacheable length (e.g. 4,096 tokens for Haiku 4.5, 1,024 for Sonnet). | +| `GEMINI_API_KEY` | `""` | Google Gemini API key | +| `GEMINI_API_URL` | `https://generativelanguage.googleapis.com/v1beta` | Gemini API endpoint | +| `GEMINI_MODEL` | `gemini-2.5-flash` | Gemini model to use | +| `GEMINI_MAX_TOKENS` | `4096` | Max tokens per Gemini response | +| `SYSTEM_PROMPT_FILE` | `""` | Path to system prompt text file | +| `TOOLS_JSON_FILE` | `""` | Path to tool definitions JSON file | +| `INPUT_TOKEN_PRICE` | *(optional)* | Price per 1M input tokens (e.g. `3` for $3/MTok) | +| `OUTPUT_TOKEN_PRICE` | *(optional)* | Price per 1M output tokens (e.g. `15` for $15/MTok) | +| `BUDGET_PRICE_PER_SESSION` | *(optional)* | Max dollar cost per session. Agent stops on next Think call when exceeded. | +| `COMPACTION_USER_MESSAGE_TRIGGER` | `-1` (disabled) | Number of user messages before compaction runs. Must be >= 2 or -1. | +| `COMPACTION_USER_MESSAGE_UNTIL` | `2` | Number of recent user messages to keep uncompacted | +| `COMPACTION_PROMPT` | `"Summarize the following conversation concisely..."` | Prompt used for the compaction summary LLM call | +| `POLL_TIMEOUT_MS` | `1000` | Kafka consumer poll timeout in milliseconds | + +--- + +## Topics + +### message-input + +Raw user messages and tool results entering the pipeline. + +**Key:** `session_id` + +| Field | Type | Default | Description | +|-------|------|---------|-------------| +| `session_id` | String | | Unique session identifier | +| `user_id` | String | | User identifier | +| `role` | String | | `"user"`, `"assistant"`, or `"tool"` | +| `content` | Object | | Plain text (String) or structured data (List/Map for tool results) | +| `timestamp` | String | | ISO 8601 timestamp | +| `metadata` | Map\ | `null` | Optional metadata | + +```json +{ + "session_id": "sess-12345", + "user_id": "user-789", + "role": "user", + "content": "What's the invoice balance for customer 456?", + "timestamp": "2025-04-04T10:30:00Z", + "metadata": { + "source": "web_chat" + } +} +``` + +When re-entering the pipeline from tool execution (role="tool"): + +```json +{ + "session_id": "sess-12345", + "user_id": "user-789", + "role": "tool", + "content": [ + { + "tool_use_id": "tuid-abc123", + "name": "get_invoice_balance", + "result": { "balance_due": 1250.50 }, + "status": "success" + } + ], + "timestamp": "2025-04-04T10:30:06Z", + "metadata": { + "tool_count": 1, + "source": "tool-execution" + } +} +``` + +--- + +### enriched-message-input + +User message merged with full session history, cost, and memoir context. Consumed by the ThinkConsumer. + +**Key:** `session_id` + +| Field | Type | Default | Description | +|-------|------|---------|-------------| +| `session_id` | String | | Unique session identifier | +| `user_id` | String | | User identifier | +| `cost` | Double | `null` | Aggregated session cost (USD) | +| `history` | List\ | | Full chronological conversation history | +| `latest_input` | MessageInput | | The most recent user message | +| `memoir_context` | String | `null` | Long-term memoir summary (null if memoir disabled) | +| `timestamp` | String | | ISO 8601 timestamp | + +```json +{ + "session_id": "sess-12345", + "user_id": "user-789", + "cost": 0.0005, + "history": [ + { + "session_id": "sess-12345", + "user_id": "user-789", + "role": "user", + "content": "Hello", + "timestamp": "2025-04-04T10:28:00Z" + }, + { + "session_id": "sess-12345", + "user_id": "user-789", + "role": "assistant", + "content": "Hi! How can I help you today?", + "timestamp": "2025-04-04T10:28:02Z" + } + ], + "latest_input": { + "session_id": "sess-12345", + "user_id": "user-789", + "role": "user", + "content": "What's the invoice balance for customer 456?", + "timestamp": "2025-04-04T10:30:00Z" + }, + "memoir_context": "User frequently asks about invoice balances. Works in accounts receivable.", + "timestamp": "2025-04-04T10:30:00Z" +} +``` + +--- + +### think-request-response + +LLM response including tool-use blocks. Also stored as a KTable for enrichment and memoir joins. + +**Key:** `session_id` + +| Field | Type | Default | Description | +|-------|------|---------|-------------| +| `session_id` | String | | Unique session identifier | +| `user_id` | String | | User identifier | +| `total_session_cost` | Double | `null` | Total session cost: `previous_session_cost + think_cost + compaction_cost` | +| `previous_session_cost` | Double | `null` | Cumulative cost from prior turns | +| `think_cost` | Double | `null` | Cost of this LLM call's input and output tokens (USD) | +| `think_input_tokens` | int | `0` | Tokens consumed by the request | +| `think_output_tokens` | int | `0` | Tokens generated by the response | +| `previous_messages` | List\ | `[]` | Messages before this turn (may be compacted) | +| `last_input_message` | MessageInput | | The user message that triggered this turn | +| `last_input_response` | List\ | `[]` | The LLM's response messages | +| `tool_uses` | List\ | `null` | Tool invocation blocks (null/empty if no tools) | +| `end_turn` | boolean | `false` | true if LLM signaled end-of-turn | +| `compaction` | boolean | `false` | true if previous_messages were compacted this turn | +| `compaction_input_tokens` | int | `0` | Tokens used by the compaction summary LLM call | +| `compaction_output_tokens` | int | `0` | Tokens generated by the compaction summary | +| `compaction_cost` | double | `0.0` | Cost of the compaction LLM call | +| `timestamp` | String | | ISO 8601 timestamp | + +```json +{ + "session_id": "sess-12345", + "user_id": "user-789", + "total_session_cost": 0.0017, + "previous_session_cost": 0.0005, + "think_cost": 0.0012, + "think_input_tokens": 256, + "think_output_tokens": 128, + "previous_messages": [ + { + "session_id": "sess-12345", + "user_id": "user-789", + "role": "user", + "content": "Hello", + "timestamp": "2025-04-04T10:28:00Z" + }, + { + "session_id": "sess-12345", + "user_id": "user-789", + "role": "assistant", + "content": "Hi! How can I help you today?", + "timestamp": "2025-04-04T10:28:02Z" + } + ], + "last_input_message": { + "session_id": "sess-12345", + "user_id": "user-789", + "role": "user", + "content": "What's the invoice balance for customer 456?", + "timestamp": "2025-04-04T10:30:00Z" + }, + "last_input_response": [ + { + "session_id": "sess-12345", + "user_id": "user-789", + "role": "assistant", + "content": "I'll check the invoice balance for you.", + "timestamp": "2025-04-04T10:30:05Z" + } + ], + "tool_uses": [ + { + "tool_use_id": "tuid-abc123", + "tool_id": "billing-api", + "name": "get_invoice_balance", + "input": { "customer_id": "456" }, + "session_id": "sess-12345", + "total_tools": 1, + "timestamp": "2025-04-04T10:30:05Z" + } + ], + "end_turn": false, + "compaction": false, + "compaction_input_tokens": 0, + "compaction_output_tokens": 0, + "compaction_cost": 0.0, + "timestamp": "2025-04-04T10:30:05Z" +} +``` + +Example with compaction applied: + +```json +{ + "session_id": "sess-12345", + "user_id": "user-789", + "total_session_cost": 0.0071, + "previous_session_cost": 0.0050, + "think_cost": 0.0018, + "think_input_tokens": 180, + "think_output_tokens": 64, + "previous_messages": [ + { + "session_id": "sess-12345", + "user_id": "user-789", + "role": "assistant", + "content": "Summary: User asked about invoices for customers 100-456. Balances were retrieved and shared.", + "timestamp": "2025-04-04T10:35:00Z" + }, + { + "session_id": "sess-12345", + "user_id": "user-789", + "role": "user", + "content": "Now check customer 789", + "timestamp": "2025-04-04T10:36:00Z" + }, + { + "session_id": "sess-12345", + "user_id": "user-789", + "role": "assistant", + "content": "The balance for customer 789 is $500.00.", + "timestamp": "2025-04-04T10:36:02Z" + } + ], + "last_input_message": { + "session_id": "sess-12345", + "user_id": "user-789", + "role": "user", + "content": "And customer 999?", + "timestamp": "2025-04-04T10:37:00Z" + }, + "last_input_response": [ + { + "role": "assistant", + "content": "The balance for customer 999 is $320.00." + } + ], + "tool_uses": null, + "end_turn": true, + "compaction": true, + "compaction_input_tokens": 512, + "compaction_output_tokens": 48, + "compaction_cost": 0.0003, + "timestamp": "2025-04-04T10:37:02Z" +} +``` + +--- + +### tool-use + +One message per individual tool invocation, fanned out from the LLM response by ExtractToolUseItemsProcessor. + +**Key:** `session_id` + +| Field | Type | Default | Description | +|-------|------|---------|-------------| +| `tool_use_id` | String | | Unique identifier for this tool call | +| `tool_id` | String | | Identifier for the tool definition | +| `name` | String | | Tool function name | +| `input` | Map\ | | Tool function parameters | +| `session_id` | String | | Session identifier (message key) | +| `total_tools` | int | | Total parallel tool calls in this batch | +| `timestamp` | String | | ISO 8601 timestamp | + +```json +{ + "tool_use_id": "tuid-abc123", + "tool_id": "billing-api", + "name": "get_invoice_balance", + "input": { + "customer_id": "456" + }, + "session_id": "sess-12345", + "total_tools": 1, + "timestamp": "2025-04-04T10:30:05Z" +} +``` + +--- + +### tool-use-dlq + +Dead letter queue for tool-use items that failed validation (missing `tool_use_id` or `name`). + +**Key:** `session_id` + +Same schema as **tool-use**, but fields may be null/blank. + +```json +{ + "tool_use_id": null, + "tool_id": "billing-api", + "name": null, + "input": { "customer_id": "456" }, + "session_id": "sess-12345", + "total_tools": 1, + "timestamp": "2025-04-04T10:30:05Z" +} +``` + +--- + +### tool-use-result + +Results returned from tool execution. + +**Key:** `session_id` + +| Field | Type | Default | Description | +|-------|------|---------|-------------| +| `session_id` | String | | Session identifier (message key) | +| `tool_use_id` | String | | Correlates with the original tool-use request | +| `name` | String | | Tool function name | +| `result` | Map\ | | The tool's return value | +| `latency_ms` | long | `0` | Execution time in milliseconds | +| `status` | String | | `"success"` or `"error"` | +| `total_tools` | int | | Total parallel tool calls expected | +| `timestamp` | String | | ISO 8601 timestamp | + +```json +{ + "session_id": "sess-12345", + "tool_use_id": "tuid-abc123", + "name": "get_invoice_balance", + "result": { + "customer_id": "456", + "balance_due": 1250.50, + "currency": "USD", + "due_date": "2025-04-15" + }, + "latency_ms": 145, + "status": "success", + "total_tools": 1, + "timestamp": "2025-04-04T10:30:06Z" +} +``` + +--- + +### tool-use-all-complete + +Emitted once all expected tool results for a session have arrived. + +**Key:** `session_id` + +| Field | Type | Default | Description | +|-------|------|---------|-------------| +| `session_id` | String | | Session identifier | +| `user_id` | String | `null` | User identifier | +| `expected_count` | int | `0` | How many tool results were expected | +| `results` | List\ | `[]` | All accumulated tool results | +| `emitted` | boolean | `false` | Guard flag to prevent duplicate emissions | +| `timestamp` | String | `null` | ISO 8601 timestamp | + +```json +{ + "session_id": "sess-12345", + "user_id": "user-789", + "expected_count": 1, + "results": [ + { + "session_id": "sess-12345", + "tool_use_id": "tuid-abc123", + "name": "get_invoice_balance", + "result": { + "customer_id": "456", + "balance_due": 1250.50, + "currency": "USD", + "due_date": "2025-04-15" + }, + "latency_ms": 145, + "status": "success", + "total_tools": 1, + "timestamp": "2025-04-04T10:30:06Z" + } + ], + "emitted": true, + "timestamp": "2025-04-04T10:30:06Z" +} +``` + +--- + +### session-cost + +Aggregated cost per session. Compacted topic (KTable). A tombstone (null value) is emitted when the session closes (cost = -1.0 sentinel). + +**Key:** `session_id` + +| Field | Type | Default | Description | +|-------|------|---------|-------------| +| `session_id` | String | | Session identifier (message key) | +| `user_id` | String | `null` | User identifier | +| `llm_calls` | int | `0` | Total LLM invocations in session | +| `total_input_tokens` | int | `0` | Cumulative input tokens | +| `total_output_tokens` | int | `0` | Cumulative output tokens | +| `estimated_cost_usd` | Double | `null` | Total estimated cost in USD | +| `timestamp` | String | `null` | ISO 8601 timestamp | + +```json +{ + "session_id": "sess-12345", + "user_id": "user-789", + "llm_calls": 2, + "total_input_tokens": 640, + "total_output_tokens": 224, + "estimated_cost_usd": 0.0032, + "timestamp": "2025-04-04T10:30:07Z" +} +``` + +--- + +### tool-use-latency + +Per-tool execution latency metrics. Keyed by tool name. Topic and model are defined but the processor is not yet implemented. + +**Key:** `tool_name` + +| Field | Type | Default | Description | +|-------|------|---------|-------------| +| `tool_name` | String | | Tool function name (message key) | +| `latency_ms` | long | `0` | Execution latency in milliseconds | +| `session_id` | String | | Session identifier | +| `timestamp` | String | | ISO 8601 timestamp | + +```json +{ + "tool_name": "get_invoice_balance", + "latency_ms": 145, + "session_id": "sess-12345", + "timestamp": "2025-04-04T10:30:06Z" +} +``` + +--- + +### session-end + +Emitted when a session has been inactive for the configured threshold. Only active when `MEMOIR_ENABLED=true`. + +**Key:** `session_id` + +| Field | Type | Description | +|-------|------|-------------| +| key | String | session_id | +| value | String | Empty or null | + +--- + +### memoir-context + +Long-term user memoir/summary. Compacted topic keyed by user_id. Only active when `MEMOIR_ENABLED=true`. + +**Key:** `user_id` + +| Field | Type | Description | +|-------|------|-------------| +| key | String | user_id | +| value | String | Serialized memoir/summary text | + +``` +key: "user-789" +value: "User frequently asks about invoice balances. Prefers concise answers. Works in accounts receivable." +``` + +--- + +### memoir-context-session-end + +Joined snapshot emitted on session end: previous memoir + last LLM response. Only active when `MEMOIR_ENABLED=true`. + +**Key:** `session_id` + +| Field | Type | Default | Description | +|-------|------|---------|-------------| +| `session_id` | String | | Session identifier | +| `user_id` | String | | User identifier | +| `memoir_context` | String | `null` | Previous memoir/summary | +| `think_response` | ThinkResponse | | Last LLM response (contains full conversation) | +| `timestamp` | String | | ISO 8601 timestamp | + +```json +{ + "session_id": "sess-12345", + "user_id": "user-789", + "memoir_context": "User frequently asks about invoice balances.", + "think_response": { + "session_id": "sess-12345", + "user_id": "user-789", + "total_session_cost": 0.0017, + "previous_session_cost": 0.0005, + "think_cost": 0.0012, + "think_input_tokens": 256, + "think_output_tokens": 128, + "previous_messages": [], + "last_input_message": { + "role": "user", + "content": "What's the invoice balance for customer 456?" + }, + "last_input_response": [ + { + "role": "assistant", + "content": "The balance for customer 456 is $1,250.50, due April 15." + } + ], + "tool_uses": null, + "end_turn": true, + "compaction": false, + "compaction_input_tokens": 0, + "compaction_output_tokens": 0, + "compaction_cost": 0.0, + "timestamp": "2025-04-04T10:30:07Z" + }, + "timestamp": "2025-04-04T10:35:00Z" +} +``` + +--- + +### message-output + +Final response sent back to the user-facing layer. + +**Key:** `session_id` + +| Field | Type | Default | Description | +|-------|------|---------|-------------| +| `session_id` | String | | Session identifier | +| `user_id` | String | | User identifier | +| `content` | String | | The response text | +| `llm_calls` | int | `0` | Number of LLM calls in this turn | +| `input_tokens` | int | `0` | Input tokens for this call | +| `output_tokens` | int | `0` | Output tokens for this call | +| `cost` | Double | `null` | Total session cost (USD) | +| `source_agent` | String | | Which agent produced this response | +| `timestamp` | String | | ISO 8601 timestamp | + +```json +{ + "session_id": "sess-12345", + "user_id": "user-789", + "content": "The current balance due for customer 456 is $1,250.50, due on April 15, 2025.", + "llm_calls": 2, + "input_tokens": 384, + "output_tokens": 96, + "cost": 0.0032, + "source_agent": "billing-agent", + "timestamp": "2025-04-04T10:30:07Z" +} +``` + +--- + +## State Stores + +| Store | Keyed By | Purpose | +|-------|----------|---------| +| `think-response-store` | session_id | Caches think-request-response KTable | +| `session-cost-table-store` | session_id | Caches session-cost KTable | +| `session-cost-store` | session_id | Aggregates cost per session (processor-local) | +| `tool-result-accumulator-store` | session_id | Accumulates tool results until all arrive | +| `session-last-seen-store` | session_id | Tracks last activity timestamp for inactivity detection | +| `memoir-context-store` | user_id | Caches memoir-context KTable (memoir only) | + +## Processors + +| Processor | Consumes | Produces | +|-----------|----------|----------| +| EnrichInputMessageProcessor | `message-input` + KTable joins | `enriched-message-input` | +| ExtractToolUseItemsProcessor | `think-request-response` | `tool-use`, `tool-use-dlq` | +| AggregateToolExecutionResultProcessor | `tool-use-result` | `tool-use-all-complete` | +| TransformToolUseDoneProcessor | `tool-use-all-complete` | `message-input` | +| SessionCostAggregationProcessor | `think-request-response` | `session-cost` | +| EndTurnProcessor | `think-request-response` | `message-output` | +| SessionEndProcessor | `think-request-response` | `session-end` | +| MemoirSessionEndProcessor | `session-end` + KTable joins | `memoir-context-session-end` |