diff --git a/.env.example b/.env.example index 4d2795d6..3dd69150 100644 --- a/.env.example +++ b/.env.example @@ -99,6 +99,9 @@ EXECUTION_MONITOR_ENABLED= EXECUTION_MONITOR_SAME_TOOL_LIMIT= EXECUTION_MONITOR_TOTAL_TOOL_LIMIT= +## Evidence receipt hash chain +EVIDENCE_RECEIPTS_ENABLED=false + ## Agent execution tool calls limit MAX_GENERAL_AGENT_TOOL_CALLS= MAX_LIMITED_AGENT_TOOL_CALLS= diff --git a/backend/docs/config.md b/backend/docs/config.md index 26b2c342..de0e0840 100644 --- a/backend/docs/config.md +++ b/backend/docs/config.md @@ -105,14 +105,15 @@ This function automatically loads environment variables from a `.env` file if pr These settings control basic application behavior and are foundational for the system's operation. -| Option | Environment Variable | Default Value | Description | -| -------------- | -------------------- | ---------------------------------------------------------------------------- | ------------------------------------------------------------------------ | -| DatabaseURL | `DATABASE_URL` | `postgres://pentagiuser:pentagipass@pgvector:5432/pentagidb?sslmode=disable` | Connection string for the PostgreSQL database with pgvector extension | -| Debug | `DEBUG` | `false` | Enables debug mode with additional logging | -| DataDir | `DATA_DIR` | `./data` | Directory for storing persistent data | -| AskUser | `ASK_USER` | `false` | When enabled, requires explicit user confirmation for certain operations | -| InstallationID | `INSTALLATION_ID` | *(none)* | Unique installation identifier for PentAGI Cloud API communication | -| LicenseKey | `LICENSE_KEY` | *(none)* | License key for PentAGI Cloud API authentication and feature activation | +| Option | Environment Variable | Default Value | Description | +| ------------------------ | ----------------------------- | ---------------------------------------------------------------------------- | ------------------------------------------------------------------------ | +| DatabaseURL | `DATABASE_URL` | `postgres://pentagiuser:pentagipass@pgvector:5432/pentagidb?sslmode=disable` | Connection string for the PostgreSQL database with pgvector extension | +| Debug | `DEBUG` | `false` | Enables debug mode with additional logging | +| DataDir | `DATA_DIR` | `./data` | Directory for storing persistent data | +| AskUser | `ASK_USER` | `false` | When enabled, requires explicit user confirmation for certain operations | +| EvidenceReceiptsEnabled | `EVIDENCE_RECEIPTS_ENABLED` | `false` | Enables export-only toolcall evidence receipts | +| InstallationID | `INSTALLATION_ID` | *(none)* | Unique installation identifier for PentAGI Cloud API communication | +| LicenseKey | `LICENSE_KEY` | *(none)* | License key for PentAGI Cloud API authentication and feature activation | ### Usage Details @@ -1749,6 +1750,10 @@ The supervision settings work together as a comprehensive system: ``` Disabled supervision for debugging to observe natural agent behavior. +## Evidence Receipt Settings + +When `EVIDENCE_RECEIPTS_ENABLED=true`, PentAGI writes hash-chain-only JSONL receipts for finished and failed tool calls to `/flow-/evidence/receipts.jsonl`. Receipts include toolcall provenance metadata plus hashes of arguments and results, not raw argument or result content. Ed25519 signing and report bundle export are deferred to a later evidence-chain milestone. + ## Observability Settings These settings control the observability and monitoring capabilities, including telemetry and trace collection for system performance and debugging. diff --git a/backend/pkg/config/config.go b/backend/pkg/config/config.go index 0f88c97c..469bc228 100644 --- a/backend/pkg/config/config.go +++ b/backend/pkg/config/config.go @@ -22,6 +22,9 @@ type Config struct { DataDir string `env:"DATA_DIR" envDefault:"./data"` AskUser bool `env:"ASK_USER" envDefault:"false"` + // === Evidence Receipt Prototype === + EvidenceReceiptsEnabled bool `env:"EVIDENCE_RECEIPTS_ENABLED" envDefault:"false"` + // === PentAGI Cloud Service Integration === InstallationID string `env:"INSTALLATION_ID"` LicenseKey string `env:"LICENSE_KEY"` diff --git a/backend/pkg/config/config_test.go b/backend/pkg/config/config_test.go index 10f7c787..97dcee0d 100644 --- a/backend/pkg/config/config_test.go +++ b/backend/pkg/config/config_test.go @@ -269,7 +269,7 @@ func clearConfigEnv(t *testing.T) { t.Helper() envVars := []string{ - "DATABASE_URL", "DEBUG", "DATA_DIR", "ASK_USER", "INSTALLATION_ID", "LICENSE_KEY", + "DATABASE_URL", "DEBUG", "DATA_DIR", "ASK_USER", "EVIDENCE_RECEIPTS_ENABLED", "INSTALLATION_ID", "LICENSE_KEY", "DOCKER_INSIDE", "DOCKER_NET_ADMIN", "DOCKER_SOCKET", "DOCKER_NETWORK", "DOCKER_PUBLIC_IP", "DOCKER_WORK_DIR", "DOCKER_DEFAULT_IMAGE", "DOCKER_DEFAULT_IMAGE_FOR_PENTEST", "SERVER_PORT", "SERVER_HOST", "SERVER_USE_SSL", "SERVER_SSL_KEY", "SERVER_SSL_CRT", @@ -331,6 +331,7 @@ func TestNewConfig_Defaults(t *testing.T) { assert.Equal(t, "0.0.0.0", config.ServerHost) assert.Equal(t, false, config.Debug) assert.Equal(t, "./data", config.DataDir) + assert.Equal(t, false, config.EvidenceReceiptsEnabled) assert.Equal(t, false, config.ServerUseSSL) assert.Equal(t, "openai", config.EmbeddingProvider) assert.Equal(t, 512, config.EmbeddingBatchSize) @@ -511,6 +512,20 @@ func TestNewConfig_CorsOrigins(t *testing.T) { assert.Equal(t, []string{"*"}, config.CorsOrigins) } +func TestNewConfig_EvidenceReceipts(t *testing.T) { + clearConfigEnv(t) + t.Chdir(t.TempDir()) + + config, err := NewConfig() + require.NoError(t, err) + assert.Equal(t, false, config.EvidenceReceiptsEnabled) + + t.Setenv("EVIDENCE_RECEIPTS_ENABLED", "true") + config, err = NewConfig() + require.NoError(t, err) + assert.Equal(t, true, config.EvidenceReceiptsEnabled) +} + func TestNewConfig_OllamaDefaults(t *testing.T) { clearConfigEnv(t) t.Chdir(t.TempDir()) diff --git a/backend/pkg/tools/evidence_receipts.go b/backend/pkg/tools/evidence_receipts.go new file mode 100644 index 00000000..145ba4a9 --- /dev/null +++ b/backend/pkg/tools/evidence_receipts.go @@ -0,0 +1,316 @@ +package tools + +import ( + "bufio" + "bytes" + "context" + "crypto/sha256" + "encoding/hex" + "encoding/json" + "fmt" + "io" + "os" + "path/filepath" + "strings" + "sync" + "time" + + "github.com/google/uuid" +) + +const ( + evidenceReceiptSchema = "pentagi.evidence_receipt" + evidenceReceiptVersion = 1 + evidenceReceiptFile = "receipts.jsonl" + + evidenceReceiptStatusFinished = "finished" + evidenceReceiptStatusFailed = "failed" +) + +type evidenceReceiptRecorder interface { + RecordFinished(context.Context, evidenceReceiptEvent) error + RecordFailed(context.Context, evidenceReceiptEvent) error +} + +type evidenceReceiptEvent struct { + FlowID int64 + TaskID *int64 + SubtaskID *int64 + ToolcallID int64 + CallID string + ToolName string + Args json.RawMessage + Result string +} + +type evidenceReceipt struct { + Schema string `json:"schema"` + Version int `json:"version"` + ReceiptID string `json:"receipt_id"` + ReceiptHash string `json:"receipt_hash"` + PreviousReceiptHash string `json:"previous_receipt_hash"` + FlowID int64 `json:"flow_id"` + TaskID *int64 `json:"task_id,omitempty"` + SubtaskID *int64 `json:"subtask_id,omitempty"` + ToolcallID int64 `json:"toolcall_id"` + CallID string `json:"call_id"` + ToolName string `json:"tool_name"` + Status string `json:"status"` + ArgsHash string `json:"args_hash"` + ResultHash string `json:"result_hash"` + CreatedAt time.Time `json:"created_at"` +} + +type evidenceReceiptPayload struct { + Schema string `json:"schema"` + Version int `json:"version"` + ReceiptID string `json:"receipt_id"` + PreviousReceiptHash string `json:"previous_receipt_hash"` + FlowID int64 `json:"flow_id"` + TaskID *int64 `json:"task_id,omitempty"` + SubtaskID *int64 `json:"subtask_id,omitempty"` + ToolcallID int64 `json:"toolcall_id"` + CallID string `json:"call_id"` + ToolName string `json:"tool_name"` + Status string `json:"status"` + ArgsHash string `json:"args_hash"` + ResultHash string `json:"result_hash"` + CreatedAt time.Time `json:"created_at"` +} + +type noopEvidenceReceiptRecorder struct{} + +func (noopEvidenceReceiptRecorder) RecordFinished(context.Context, evidenceReceiptEvent) error { + return nil +} + +func (noopEvidenceReceiptRecorder) RecordFailed(context.Context, evidenceReceiptEvent) error { + return nil +} + +type fileEvidenceReceiptRecorder struct { + dataDir string + flowID int64 + now func() time.Time + newID func() string +} + +var evidenceReceiptLocks sync.Map + +func newEvidenceReceiptRecorder(dataDir string, flowID int64, enabled bool) evidenceReceiptRecorder { + if !enabled { + return noopEvidenceReceiptRecorder{} + } + + return &fileEvidenceReceiptRecorder{ + dataDir: dataDir, + flowID: flowID, + now: time.Now, + newID: newEvidenceReceiptID, + } +} + +func newEvidenceReceiptID() string { + return "receipt_" + uuid.NewString() +} + +func (r *fileEvidenceReceiptRecorder) RecordFinished(ctx context.Context, event evidenceReceiptEvent) error { + return r.record(ctx, event, evidenceReceiptStatusFinished) +} + +func (r *fileEvidenceReceiptRecorder) RecordFailed(ctx context.Context, event evidenceReceiptEvent) error { + return r.record(ctx, event, evidenceReceiptStatusFailed) +} + +func (r *fileEvidenceReceiptRecorder) record(ctx context.Context, event evidenceReceiptEvent, status string) error { + if err := ctx.Err(); err != nil { + return err + } + + event.FlowID = r.flowID + path, err := evidenceReceiptsPath(r.dataDir, event.FlowID) + if err != nil { + return err + } + + lock := evidenceReceiptPathLock(path) + lock.Lock() + defer lock.Unlock() + + previousHash, err := readLastEvidenceReceiptHash(path) + if err != nil { + return err + } + + argsHash, err := hashCanonicalJSON(event.Args) + if err != nil { + return fmt.Errorf("failed to hash evidence receipt arguments: %w", err) + } + + receipt := evidenceReceipt{ + Schema: evidenceReceiptSchema, + Version: evidenceReceiptVersion, + ReceiptID: r.newID(), + PreviousReceiptHash: previousHash, + FlowID: event.FlowID, + TaskID: event.TaskID, + SubtaskID: event.SubtaskID, + ToolcallID: event.ToolcallID, + CallID: event.CallID, + ToolName: event.ToolName, + Status: status, + ArgsHash: argsHash, + ResultHash: hashBytes([]byte(event.Result)), + CreatedAt: r.now().UTC(), + } + + receiptHash, err := computeEvidenceReceiptHash(receipt) + if err != nil { + return err + } + receipt.ReceiptHash = receiptHash + + line, err := json.Marshal(receipt) + if err != nil { + return fmt.Errorf("failed to marshal evidence receipt: %w", err) + } + + if err := os.MkdirAll(filepath.Dir(path), 0755); err != nil { + return fmt.Errorf("failed to prepare evidence receipt directory: %w", err) + } + + file, err := os.OpenFile(path, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644) + if err != nil { + return fmt.Errorf("failed to open evidence receipt file: %w", err) + } + defer file.Close() + + if _, err := file.Write(append(line, '\n')); err != nil { + return fmt.Errorf("failed to write evidence receipt: %w", err) + } + + return nil +} + +func evidenceReceiptsPath(dataDir string, flowID int64) (string, error) { + baseDir, err := filepath.Abs(dataDir) + if err != nil { + return "", fmt.Errorf("failed to resolve evidence receipt data directory: %w", err) + } + + path := filepath.Join(baseDir, fmt.Sprintf("flow-%d", flowID), "evidence", evidenceReceiptFile) + cleanPath := filepath.Clean(path) + rel, err := filepath.Rel(baseDir, cleanPath) + if err != nil { + return "", fmt.Errorf("failed to validate evidence receipt path: %w", err) + } + if rel == "." || strings.HasPrefix(rel, ".."+string(os.PathSeparator)) || rel == ".." || filepath.IsAbs(rel) { + return "", fmt.Errorf("evidence receipt path escapes data directory") + } + + return cleanPath, nil +} + +func evidenceReceiptPathLock(path string) *sync.Mutex { + lock, _ := evidenceReceiptLocks.LoadOrStore(path, &sync.Mutex{}) + return lock.(*sync.Mutex) +} + +func readLastEvidenceReceiptHash(path string) (string, error) { + file, err := os.Open(path) + if os.IsNotExist(err) { + return "", nil + } + if err != nil { + return "", fmt.Errorf("failed to read evidence receipt file: %w", err) + } + defer file.Close() + + scanner := bufio.NewScanner(file) + scanner.Buffer(make([]byte, 0, 64*1024), 1024*1024) + + previousHash := "" + lineNumber := 0 + for scanner.Scan() { + line := bytes.TrimSpace(scanner.Bytes()) + if len(line) == 0 { + continue + } + lineNumber++ + + var receipt evidenceReceipt + if err := json.Unmarshal(line, &receipt); err != nil { + return "", fmt.Errorf("failed to parse evidence receipt line %d: %w", lineNumber, err) + } + if receipt.Schema != evidenceReceiptSchema || receipt.Version != evidenceReceiptVersion { + return "", fmt.Errorf("unsupported evidence receipt schema or version on line %d", lineNumber) + } + if receipt.PreviousReceiptHash != previousHash { + return "", fmt.Errorf("evidence receipt chain mismatch on line %d", lineNumber) + } + + hash, err := computeEvidenceReceiptHash(receipt) + if err != nil { + return "", fmt.Errorf("failed to hash evidence receipt line %d: %w", lineNumber, err) + } + if receipt.ReceiptHash != hash { + return "", fmt.Errorf("evidence receipt hash mismatch on line %d", lineNumber) + } + + previousHash = receipt.ReceiptHash + } + if err := scanner.Err(); err != nil { + return "", fmt.Errorf("failed to scan evidence receipt file: %w", err) + } + + return previousHash, nil +} + +func computeEvidenceReceiptHash(receipt evidenceReceipt) (string, error) { + payload, err := json.Marshal(evidenceReceiptPayload{ + Schema: receipt.Schema, + Version: receipt.Version, + ReceiptID: receipt.ReceiptID, + PreviousReceiptHash: receipt.PreviousReceiptHash, + FlowID: receipt.FlowID, + TaskID: receipt.TaskID, + SubtaskID: receipt.SubtaskID, + ToolcallID: receipt.ToolcallID, + CallID: receipt.CallID, + ToolName: receipt.ToolName, + Status: receipt.Status, + ArgsHash: receipt.ArgsHash, + ResultHash: receipt.ResultHash, + CreatedAt: receipt.CreatedAt, + }) + if err != nil { + return "", fmt.Errorf("failed to marshal evidence receipt payload: %w", err) + } + + return hashBytes(payload), nil +} + +func hashCanonicalJSON(raw json.RawMessage) (string, error) { + decoder := json.NewDecoder(bytes.NewReader(raw)) + decoder.UseNumber() + + var value any + if err := decoder.Decode(&value); err != nil { + return "", err + } + if err := decoder.Decode(&struct{}{}); err != io.EOF { + return "", fmt.Errorf("unexpected extra JSON data") + } + + canonical, err := json.Marshal(value) + if err != nil { + return "", err + } + + return hashBytes(canonical), nil +} + +func hashBytes(data []byte) string { + sum := sha256.Sum256(data) + return "sha256:" + hex.EncodeToString(sum[:]) +} diff --git a/backend/pkg/tools/evidence_receipts_test.go b/backend/pkg/tools/evidence_receipts_test.go new file mode 100644 index 00000000..ee47eba0 --- /dev/null +++ b/backend/pkg/tools/evidence_receipts_test.go @@ -0,0 +1,249 @@ +package tools + +import ( + "encoding/json" + "fmt" + "os" + "path/filepath" + "strings" + "testing" + "time" +) + +func TestEvidenceReceiptHashDeterministic(t *testing.T) { + t.Parallel() + + argsHashA, err := hashCanonicalJSON(json.RawMessage(`{"b":2,"a":{"d":4,"c":3}}`)) + if err != nil { + t.Fatalf("hashCanonicalJSON() unexpected error: %v", err) + } + argsHashB, err := hashCanonicalJSON(json.RawMessage(`{"a":{"c":3,"d":4},"b":2}`)) + if err != nil { + t.Fatalf("hashCanonicalJSON() unexpected error: %v", err) + } + if argsHashA != argsHashB { + t.Fatalf("canonical argument hashes differ: %s != %s", argsHashA, argsHashB) + } + + receipt := testEvidenceReceipt(argsHashA, hashBytes([]byte("result"))) + hashA, err := computeEvidenceReceiptHash(receipt) + if err != nil { + t.Fatalf("computeEvidenceReceiptHash() unexpected error: %v", err) + } + hashB, err := computeEvidenceReceiptHash(receipt) + if err != nil { + t.Fatalf("computeEvidenceReceiptHash() unexpected error: %v", err) + } + if hashA != hashB { + t.Fatalf("receipt hash should be deterministic: %s != %s", hashA, hashB) + } +} + +func TestEvidenceReceiptHashChangesWithContentHashes(t *testing.T) { + t.Parallel() + + receipt := testEvidenceReceipt(hashBytes([]byte("args")), hashBytes([]byte("result"))) + baseHash, err := computeEvidenceReceiptHash(receipt) + if err != nil { + t.Fatalf("computeEvidenceReceiptHash() unexpected error: %v", err) + } + + receipt.ArgsHash = hashBytes([]byte("different args")) + argsHash, err := computeEvidenceReceiptHash(receipt) + if err != nil { + t.Fatalf("computeEvidenceReceiptHash() unexpected error: %v", err) + } + if argsHash == baseHash { + t.Fatal("receipt hash should change when args_hash changes") + } + + receipt = testEvidenceReceipt(hashBytes([]byte("args")), hashBytes([]byte("different result"))) + resultHash, err := computeEvidenceReceiptHash(receipt) + if err != nil { + t.Fatalf("computeEvidenceReceiptHash() unexpected error: %v", err) + } + if resultHash == baseHash { + t.Fatal("receipt hash should change when result_hash changes") + } +} + +func TestNoopEvidenceReceiptRecorder(t *testing.T) { + t.Parallel() + + dir := t.TempDir() + recorder := newEvidenceReceiptRecorder(dir, 10, false) + err := recorder.RecordFinished(t.Context(), testEvidenceReceiptEvent()) + if err != nil { + t.Fatalf("RecordFinished() unexpected error: %v", err) + } + + path, err := evidenceReceiptsPath(dir, 10) + if err != nil { + t.Fatalf("evidenceReceiptsPath() unexpected error: %v", err) + } + if _, err := os.Stat(path); !os.IsNotExist(err) { + t.Fatalf("disabled recorder should not create %s, stat err: %v", path, err) + } +} + +func TestFileEvidenceReceiptRecorderWritesJSONLAndLinksReceipts(t *testing.T) { + t.Parallel() + + dir := t.TempDir() + recorder := newTestEvidenceReceiptRecorder(dir, 42) + + if err := recorder.RecordFinished(t.Context(), testEvidenceReceiptEvent()); err != nil { + t.Fatalf("RecordFinished() unexpected error: %v", err) + } + if err := recorder.RecordFinished(t.Context(), evidenceReceiptEvent{ + FlowID: 42, + ToolcallID: 101, + CallID: "call-101", + ToolName: "terminal", + Args: json.RawMessage(`{"cmd":"whoami"}`), + Result: "root", + }); err != nil { + t.Fatalf("RecordFinished() second receipt unexpected error: %v", err) + } + + path, err := evidenceReceiptsPath(dir, 42) + if err != nil { + t.Fatalf("evidenceReceiptsPath() unexpected error: %v", err) + } + data, err := os.ReadFile(path) + if err != nil { + t.Fatalf("failed to read evidence receipts: %v", err) + } + if strings.Contains(string(data), "sensitive output") { + t.Fatal("receipt file should not store raw result content") + } + receipts := readEvidenceReceiptLines(t, path) + if len(receipts) != 2 { + t.Fatalf("got %d receipts, want 2", len(receipts)) + } + + if receipts[0].Status != evidenceReceiptStatusFinished { + t.Fatalf("first receipt status = %q, want %q", receipts[0].Status, evidenceReceiptStatusFinished) + } + if receipts[0].PreviousReceiptHash != "" { + t.Fatalf("first receipt previous hash = %q, want empty", receipts[0].PreviousReceiptHash) + } + if receipts[1].PreviousReceiptHash != receipts[0].ReceiptHash { + t.Fatalf("second receipt previous hash = %q, want %q", receipts[1].PreviousReceiptHash, receipts[0].ReceiptHash) + } +} + +func TestFileEvidenceReceiptRecorderRecordsFailedStatus(t *testing.T) { + t.Parallel() + + dir := t.TempDir() + recorder := newTestEvidenceReceiptRecorder(dir, 77) + + if err := recorder.RecordFailed(t.Context(), testEvidenceReceiptEvent()); err != nil { + t.Fatalf("RecordFailed() unexpected error: %v", err) + } + + path, err := evidenceReceiptsPath(dir, 77) + if err != nil { + t.Fatalf("evidenceReceiptsPath() unexpected error: %v", err) + } + receipts := readEvidenceReceiptLines(t, path) + if len(receipts) != 1 { + t.Fatalf("got %d receipts, want 1", len(receipts)) + } + if receipts[0].Status != evidenceReceiptStatusFailed { + t.Fatalf("receipt status = %q, want %q", receipts[0].Status, evidenceReceiptStatusFailed) + } +} + +func TestFileEvidenceReceiptRecorderFailsClosedOnUnreadablePreviousReceipt(t *testing.T) { + t.Parallel() + + dir := t.TempDir() + path, err := evidenceReceiptsPath(dir, 88) + if err != nil { + t.Fatalf("evidenceReceiptsPath() unexpected error: %v", err) + } + if err := os.MkdirAll(filepath.Dir(path), 0755); err != nil { + t.Fatalf("failed to create evidence dir: %v", err) + } + if err := os.WriteFile(path, []byte(`{"schema":"wrong"}`+"\n"), 0644); err != nil { + t.Fatalf("failed to seed corrupt receipt: %v", err) + } + + recorder := newTestEvidenceReceiptRecorder(dir, 88) + err = recorder.RecordFinished(t.Context(), testEvidenceReceiptEvent()) + if err == nil { + t.Fatal("RecordFinished() should fail when previous receipt cannot be verified") + } +} + +func testEvidenceReceipt(argsHash, resultHash string) evidenceReceipt { + return evidenceReceipt{ + Schema: evidenceReceiptSchema, + Version: evidenceReceiptVersion, + ReceiptID: "receipt_test", + PreviousReceiptHash: "sha256:previous", + FlowID: 42, + ToolcallID: 100, + CallID: "call-100", + ToolName: "terminal", + Status: evidenceReceiptStatusFinished, + ArgsHash: argsHash, + ResultHash: resultHash, + CreatedAt: time.Date(2026, 4, 22, 12, 0, 0, 0, time.UTC), + } +} + +func testEvidenceReceiptEvent() evidenceReceiptEvent { + taskID := int64(20) + subtaskID := int64(30) + + return evidenceReceiptEvent{ + FlowID: 10, + TaskID: &taskID, + SubtaskID: &subtaskID, + ToolcallID: 100, + CallID: "call-100", + ToolName: "terminal", + Args: json.RawMessage(`{"cmd":"id"}`), + Result: "sensitive output", + } +} + +func newTestEvidenceReceiptRecorder(dir string, flowID int64) *fileEvidenceReceiptRecorder { + var count int + + return &fileEvidenceReceiptRecorder{ + dataDir: dir, + flowID: flowID, + now: func() time.Time { + count++ + return time.Date(2026, 4, 22, 12, 0, count, 0, time.UTC) + }, + newID: func() string { + return fmt.Sprintf("receipt_%03d", count+1) + }, + } +} + +func readEvidenceReceiptLines(t *testing.T, path string) []evidenceReceipt { + t.Helper() + + data, err := os.ReadFile(path) + if err != nil { + t.Fatalf("failed to read evidence receipts: %v", err) + } + + lines := strings.Split(strings.TrimSpace(string(data)), "\n") + receipts := make([]evidenceReceipt, 0, len(lines)) + for _, line := range lines { + var receipt evidenceReceipt + if err := json.Unmarshal([]byte(line), &receipt); err != nil { + t.Fatalf("failed to parse receipt line %q: %v", line, err) + } + receipts = append(receipts, receipt) + } + + return receipts +} diff --git a/backend/pkg/tools/executor.go b/backend/pkg/tools/executor.go index c885b401..fa6b1652 100644 --- a/backend/pkg/tools/executor.go +++ b/backend/pkg/tools/executor.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "encoding/json" + "errors" "fmt" "slices" "strings" @@ -134,10 +135,11 @@ type customExecutor struct { taskID *int64 subtaskID *int64 - db database.Querier - mlp MsgLogProvider - store *pgvector.Store - vslp VectorStoreLogProvider + db database.Querier + mlp MsgLogProvider + store *pgvector.Store + vslp VectorStoreLogProvider + evidence evidenceReceiptRecorder definitions []llms.FunctionDefinition handlers map[string]ExecutorHandler @@ -307,12 +309,21 @@ func (ce *customExecutor) Execute( result, err := handler(ctx, name, args) if err != nil { durationDelta := time.Since(startTime).Seconds() - _, _ = ce.db.UpdateToolcallFailedResult(ctx, database.UpdateToolcallFailedResultParams{ - Result: fmt.Sprintf("failed to execute handler: %s", err.Error()), + failureResult := fmt.Sprintf("failed to execute handler: %s", err.Error()) + updatedToolcall, updateErr := ce.db.UpdateToolcallFailedResult(ctx, database.UpdateToolcallFailedResultParams{ + Result: failureResult, DurationSeconds: durationDelta, ID: tc.ID, }) - return "", resultFormat, fmt.Errorf("failed to execute handler: %w", err) + if updateErr != nil { + return "", resultFormat, fmt.Errorf("failed to update failed toolcall result: %w", updateErr) + } + + handlerErr := fmt.Errorf("failed to execute handler: %w", err) + if err := ce.recordEvidenceReceipt(ctx, updatedToolcall, evidenceReceiptStatusFailed, failureResult); err != nil { + return "", resultFormat, fmt.Errorf("failed to record evidence receipt: %w", errors.Join(handlerErr, err)) + } + return "", resultFormat, handlerErr } result = database.SanitizeUTF8(result) @@ -325,12 +336,21 @@ func (ce *customExecutor) Execute( result, err = ce.summarizer(ctx, summarizePrompt) if err != nil { durationDelta := time.Since(startTime).Seconds() - _, _ = ce.db.UpdateToolcallFailedResult(ctx, database.UpdateToolcallFailedResultParams{ - Result: fmt.Sprintf("failed to summarize result: %s", err.Error()), + failureResult := fmt.Sprintf("failed to summarize result: %s", err.Error()) + updatedToolcall, updateErr := ce.db.UpdateToolcallFailedResult(ctx, database.UpdateToolcallFailedResultParams{ + Result: failureResult, DurationSeconds: durationDelta, ID: tc.ID, }) - return "", resultFormat, fmt.Errorf("failed to summarize result: %w", err) + if updateErr != nil { + return "", resultFormat, fmt.Errorf("failed to update failed toolcall result: %w", updateErr) + } + + summarizeErr := fmt.Errorf("failed to summarize result: %w", err) + if err := ce.recordEvidenceReceipt(ctx, updatedToolcall, evidenceReceiptStatusFailed, failureResult); err != nil { + return "", resultFormat, fmt.Errorf("failed to record evidence receipt: %w", errors.Join(summarizeErr, err)) + } + return "", resultFormat, summarizeErr } resultFormat = database.MsglogResultFormatMarkdown } else if allowSummarize && len(result) > DefaultResultSizeLimit*2 { @@ -344,7 +364,7 @@ func (ce *customExecutor) Execute( } durationDelta := time.Since(startTime).Seconds() - _, err = ce.db.UpdateToolcallFinishedResult(ctx, database.UpdateToolcallFinishedResultParams{ + updatedToolcall, err := ce.db.UpdateToolcallFinishedResult(ctx, database.UpdateToolcallFinishedResultParams{ Result: result, DurationSeconds: durationDelta, ID: tc.ID, @@ -352,6 +372,9 @@ func (ce *customExecutor) Execute( if err != nil { return "", resultFormat, fmt.Errorf("failed to update toolcall result: %w", err) } + if err := ce.recordEvidenceReceipt(ctx, updatedToolcall, evidenceReceiptStatusFinished, result); err != nil { + return "", resultFormat, fmt.Errorf("failed to record evidence receipt: %w", err) + } return result, resultFormat, nil } @@ -385,6 +408,49 @@ func (ce *customExecutor) Execute( return result, nil } +func (ce *customExecutor) recordEvidenceReceipt( + ctx context.Context, + tc database.Toolcall, + status string, + result string, +) error { + if ce.evidence == nil { + return nil + } + + var taskID *int64 + if tc.TaskID.Valid { + value := tc.TaskID.Int64 + taskID = &value + } + + var subtaskID *int64 + if tc.SubtaskID.Valid { + value := tc.SubtaskID.Int64 + subtaskID = &value + } + + event := evidenceReceiptEvent{ + FlowID: tc.FlowID, + TaskID: taskID, + SubtaskID: subtaskID, + ToolcallID: tc.ID, + CallID: tc.CallID, + ToolName: tc.Name, + Args: tc.Args, + Result: result, + } + + switch status { + case evidenceReceiptStatusFinished: + return ce.evidence.RecordFinished(ctx, event) + case evidenceReceiptStatusFailed: + return ce.evidence.RecordFailed(ctx, event) + default: + return fmt.Errorf("unknown evidence receipt status %q", status) + } +} + func (ce *customExecutor) IsBarrierFunction(name string) bool { _, ok := ce.barriers[name] return ok diff --git a/backend/pkg/tools/tools.go b/backend/pkg/tools/tools.go index 531e296c..87b1a561 100644 --- a/backend/pkg/tools/tools.go +++ b/backend/pkg/tools/tools.go @@ -142,6 +142,7 @@ type flowToolsExecutor struct { db database.Querier cfg *config.Config + evidence evidenceReceiptRecorder store *pgvector.Store graphitiClient *graphiti.Client image string @@ -329,6 +330,7 @@ func NewFlowToolsExecutor( functions: functions, replacer: replacer, cfg: cfg, + evidence: newEvidenceReceiptRecorder(cfg.DataDir, flowID, cfg.EvidenceReceiptsEnabled), flowID: flowID, definitions: make(map[string]llms.FunctionDefinition), handlers: make(map[string]ExecutorHandler), @@ -337,6 +339,7 @@ func NewFlowToolsExecutor( func (fte *flowToolsExecutor) SetFlowID(flowID int64) { fte.flowID = flowID + fte.evidence = newEvidenceReceiptRecorder(fte.cfg.DataDir, flowID, fte.cfg.EvidenceReceiptsEnabled) } func (fte *flowToolsExecutor) SetImage(image string) { @@ -485,6 +488,7 @@ func (fte *flowToolsExecutor) GetCustomExecutor(cfg CustomExecutorConfig) (Conte vslp: fte.vslp, db: fte.db, store: fte.store, + evidence: fte.evidence, definitions: cfg.Definitions, handlers: cfg.Handlers, barriers: barriers, @@ -690,6 +694,7 @@ func (fte *flowToolsExecutor) GetAssistantExecutor(cfg AssistantExecutorConfig) vslp: fte.vslp, db: fte.db, store: fte.store, + evidence: fte.evidence, definitions: definitions, handlers: handlers, barriers: map[string]struct{}{}, @@ -736,6 +741,7 @@ func (fte *flowToolsExecutor) GetPrimaryExecutor(cfg PrimaryExecutorConfig) (Con vslp: fte.vslp, db: fte.db, store: fte.store, + evidence: fte.evidence, definitions: []llms.FunctionDefinition{ registryDefinitions[FinalyToolName], registryDefinitions[AdviceToolName], @@ -809,6 +815,7 @@ func (fte *flowToolsExecutor) GetInstallerExecutor(cfg InstallerExecutorConfig) vslp: fte.vslp, db: fte.db, store: fte.store, + evidence: fte.evidence, definitions: []llms.FunctionDefinition{ registryDefinitions[MaintenanceResultToolName], registryDefinitions[AdviceToolName], @@ -892,6 +899,7 @@ func (fte *flowToolsExecutor) GetCoderExecutor(cfg CoderExecutorConfig) (Context vslp: fte.vslp, db: fte.db, store: fte.store, + evidence: fte.evidence, definitions: []llms.FunctionDefinition{ registryDefinitions[CodeResultToolName], registryDefinitions[AdviceToolName], @@ -1003,6 +1011,7 @@ func (fte *flowToolsExecutor) GetPentesterExecutor(cfg PentesterExecutorConfig) vslp: fte.vslp, db: fte.db, store: fte.store, + evidence: fte.evidence, definitions: []llms.FunctionDefinition{ registryDefinitions[HackResultToolName], registryDefinitions[AdviceToolName], @@ -1101,6 +1110,7 @@ func (fte *flowToolsExecutor) GetSearcherExecutor(cfg SearcherExecutorConfig) (C vslp: fte.vslp, db: fte.db, store: fte.store, + evidence: fte.evidence, definitions: []llms.FunctionDefinition{ registryDefinitions[SearchResultToolName], registryDefinitions[MemoristToolName], @@ -1259,12 +1269,13 @@ func (fte *flowToolsExecutor) GetGeneratorExecutor(cfg GeneratorExecutorConfig) ) ce := &customExecutor{ - flowID: fte.flowID, - taskID: &cfg.TaskID, - mlp: fte.mlp, - vslp: fte.vslp, - db: fte.db, - store: fte.store, + flowID: fte.flowID, + taskID: &cfg.TaskID, + mlp: fte.mlp, + vslp: fte.vslp, + db: fte.db, + store: fte.store, + evidence: fte.evidence, definitions: []llms.FunctionDefinition{ registryDefinitions[MemoristToolName], registryDefinitions[SearchToolName], @@ -1324,12 +1335,13 @@ func (fte *flowToolsExecutor) GetRefinerExecutor(cfg RefinerExecutorConfig) (Con ) ce := &customExecutor{ - flowID: fte.flowID, - taskID: &cfg.TaskID, - mlp: fte.mlp, - vslp: fte.vslp, - db: fte.db, - store: fte.store, + flowID: fte.flowID, + taskID: &cfg.TaskID, + mlp: fte.mlp, + vslp: fte.vslp, + db: fte.db, + store: fte.store, + evidence: fte.evidence, definitions: []llms.FunctionDefinition{ registryDefinitions[MemoristToolName], registryDefinitions[SearchToolName], @@ -1392,6 +1404,7 @@ func (fte *flowToolsExecutor) GetMemoristExecutor(cfg MemoristExecutorConfig) (C vslp: fte.vslp, db: fte.db, store: fte.store, + evidence: fte.evidence, definitions: []llms.FunctionDefinition{ registryDefinitions[MemoristResultToolName], registryDefinitions[TerminalToolName], @@ -1460,6 +1473,7 @@ func (fte *flowToolsExecutor) GetEnricherExecutor(cfg EnricherExecutorConfig) (C vslp: fte.vslp, db: fte.db, store: fte.store, + evidence: fte.evidence, definitions: []llms.FunctionDefinition{ registryDefinitions[EnricherResultToolName], registryDefinitions[TerminalToolName], @@ -1527,6 +1541,7 @@ func (fte *flowToolsExecutor) GetReporterExecutor(cfg ReporterExecutorConfig) (C vslp: fte.vslp, db: fte.db, store: fte.store, + evidence: fte.evidence, definitions: []llms.FunctionDefinition{registryDefinitions[ReportResultToolName]}, handlers: map[string]ExecutorHandler{ReportResultToolName: cfg.ReportResult}, barriers: map[string]struct{}{ReportResultToolName: {}}, diff --git a/docker-compose.yml b/docker-compose.yml index 6a78a7ef..e672d440 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -105,6 +105,7 @@ services: - EXECUTION_MONITOR_ENABLED=${EXECUTION_MONITOR_ENABLED:-} - EXECUTION_MONITOR_SAME_TOOL_LIMIT=${EXECUTION_MONITOR_SAME_TOOL_LIMIT:-} - EXECUTION_MONITOR_TOTAL_TOOL_LIMIT=${EXECUTION_MONITOR_TOTAL_TOOL_LIMIT:-} + - EVIDENCE_RECEIPTS_ENABLED=${EVIDENCE_RECEIPTS_ENABLED:-} - MAX_GENERAL_AGENT_TOOL_CALLS=${MAX_GENERAL_AGENT_TOOL_CALLS:-} - MAX_LIMITED_AGENT_TOOL_CALLS=${MAX_LIMITED_AGENT_TOOL_CALLS:-} - AGENT_PLANNING_STEP_ENABLED=${AGENT_PLANNING_STEP_ENABLED:-}