From de4102c745f0220f8b4e0f0bc23b08a0556674f0 Mon Sep 17 00:00:00 2001 From: Damjan Becirovic Date: Wed, 12 Nov 2025 11:29:43 +0100 Subject: [PATCH 1/4] feat(mcp_server): add metrics for tools usage --- mcp_server/cmd/mcp_server/main.go | 16 ++- .../pkg/tools/internal/shared/metrics.go | 114 ++++++++++++++++++ .../pkg/tools/internal/shared/metrics_test.go | 46 +++++++ mcp_server/pkg/tools/jobs/describe.go | 21 ++++ mcp_server/pkg/tools/jobs/logs.go | 37 +++++- .../pkg/tools/organizations/organizations.go | 22 ++++ mcp_server/pkg/tools/pipelines/pipelines.go | 41 +++++++ mcp_server/pkg/tools/projects/projects.go | 42 +++++++ mcp_server/pkg/tools/workflows/workflows.go | 21 ++++ mcp_server/pkg/watchman/configuration.go | 20 +++ 10 files changed, 374 insertions(+), 6 deletions(-) create mode 100644 mcp_server/pkg/tools/internal/shared/metrics.go create mode 100644 mcp_server/pkg/tools/internal/shared/metrics_test.go create mode 100644 mcp_server/pkg/watchman/configuration.go diff --git a/mcp_server/cmd/mcp_server/main.go b/mcp_server/cmd/mcp_server/main.go index af63e6aca..78b61f9e7 100644 --- a/mcp_server/cmd/mcp_server/main.go +++ b/mcp_server/cmd/mcp_server/main.go @@ -24,17 +24,25 @@ import ( "github.com/semaphoreio/semaphore/mcp_server/pkg/tools/pipelines" "github.com/semaphoreio/semaphore/mcp_server/pkg/tools/projects" "github.com/semaphoreio/semaphore/mcp_server/pkg/tools/workflows" + "github.com/semaphoreio/semaphore/mcp_server/pkg/watchman" support "github.com/semaphoreio/semaphore/mcp_server/test/support" ) var ( - versionFlag = flag.Bool("version", false, "print the server version and exit") - nameFlag = flag.String("name", "semaphore-mcp-server", "implementation name advertised to MCP clients") - httpAddr = flag.String("http", ":3001", "address to serve the streamable MCP transport") - version = "0.1.0" + versionFlag = flag.Bool("version", false, "print the server version and exit") + nameFlag = flag.String("name", "semaphore-mcp-server", "implementation name advertised to MCP clients") + httpAddr = flag.String("http", ":3001", "address to serve the streamable MCP transport") + version = "0.1.0" + metricsNamespace = os.Getenv("METRICS_NAMESPACE") +) + +const ( + metricService = "mcp-server" ) func main() { + watchman.Configure(fmt.Sprintf("%s.%s", metricService, metricsNamespace)) + flag.Parse() if *versionFlag { diff --git a/mcp_server/pkg/tools/internal/shared/metrics.go b/mcp_server/pkg/tools/internal/shared/metrics.go new file mode 100644 index 000000000..176a20b0d --- /dev/null +++ b/mcp_server/pkg/tools/internal/shared/metrics.go @@ -0,0 +1,114 @@ +package shared + +import ( + "strings" + "time" + + watchman "github.com/renderedtext/go-watchman" + + "github.com/semaphoreio/semaphore/mcp_server/pkg/logging" +) + +// ToolMetrics emits Watchman metrics for a specific tool invocation. +type ToolMetrics struct { + base string + tags []string +} + +// NewToolMetrics prepares a metrics emitter scoped to a tool and optional organization ID. +func NewToolMetrics(baseMetricName, toolName, orgID string) *ToolMetrics { + base := strings.TrimSpace(baseMetricName) + if base == "" { + return nil + } + + tags := make([]string, 0, 3) + if tag := sanitizeMetricTag("tool_" + strings.TrimSpace(toolName)); tag != "" { + tags = append(tags, tag) + } + + if normalizedOrg := strings.TrimSpace(strings.ToLower(orgID)); normalizedOrg != "" { + if tag := sanitizeMetricTag("org_" + normalizedOrg); tag != "" { + tags = append(tags, tag) + } + } + + if len(tags) == 0 { + tags = append(tags, "tool_unknown") + } + + if len(tags) > 3 { + tags = tags[:3] + } + + return &ToolMetrics{ + base: base, + tags: tags, + } +} + +// IncrementTotal bumps the total execution counter. +func (tm *ToolMetrics) IncrementTotal() { + tm.increment("executions_total") +} + +// IncrementSuccess bumps the successful execution counter. +func (tm *ToolMetrics) IncrementSuccess() { + tm.increment("executions_succeeded") +} + +// IncrementFailure bumps the failed execution counter. +func (tm *ToolMetrics) IncrementFailure() { + tm.increment("executions_failed") +} + +// TrackDuration submits the elapsed duration since start. +func (tm *ToolMetrics) TrackDuration(start time.Time) { + if tm == nil { + return + } + + name := tm.metricName("duration_ms") + if err := watchman.BenchmarkWithTags(start, name, tm.tags); err != nil { + logMetricError(name, err) + } +} + +func (tm *ToolMetrics) increment(suffix string) { + if tm == nil { + return + } + name := tm.metricName(suffix) + if err := watchman.IncrementWithTags(name, tm.tags); err != nil { + logMetricError(name, err) + } +} + +func (tm *ToolMetrics) metricName(suffix string) string { + if tm == nil { + return suffix + } + if suffix == "" { + return tm.base + } + return tm.base + "." + suffix +} + +func sanitizeMetricTag(value string) string { + value = strings.TrimSpace(strings.ToLower(value)) + if value == "" { + return "" + } + value = strings.ReplaceAll(value, " ", "_") + return value +} + +func logMetricError(metric string, err error) { + if err == nil { + return + } + logging.ForComponent("metrics"). + WithError(err). + WithField("metric", metric). + Debug("failed to submit Watchman metric") +} diff --git a/mcp_server/pkg/tools/internal/shared/metrics_test.go b/mcp_server/pkg/tools/internal/shared/metrics_test.go new file mode 100644 index 000000000..e955b323e --- /dev/null +++ b/mcp_server/pkg/tools/internal/shared/metrics_test.go @@ -0,0 +1,46 @@ +package shared + +import "testing" + +func TestNewToolMetricsWithOrg(t *testing.T) { + t.Parallel() + + tm := NewToolMetrics("tools.test_tool", "Test Tool", "ABC-123 ") + if tm == nil { + t.Fatalf("expected metrics emitter") + } + + if got, want := len(tm.tags), 2; got != want { + t.Fatalf("expected %d tags, got %d", want, got) + } + if tm.tags[0] != "tool_test_tool" { + t.Fatalf("expected tool tag to be %q, got %q", "tool_test_tool", tm.tags[0]) + } + if tm.tags[1] != "org_abc-123" { + t.Fatalf("expected org tag to be %q, got %q", "org_abc-123", tm.tags[1]) + } +} + +func TestNewToolMetricsWithoutOrg(t *testing.T) { + t.Parallel() + + tm := NewToolMetrics("tools.test_tool", "Test Tool", "") + if tm == nil { + t.Fatalf("expected metrics emitter") + } + + if got := tm.tags[0]; got != "tool_test_tool" { + t.Fatalf("expected tool tag %q, got %q", "tool_test_tool", got) + } + if len(tm.tags) != 1 { + t.Fatalf("expected only tool tag, got %d tags", len(tm.tags)) + } +} + +func TestNewToolMetricsWithoutBase(t *testing.T) { + t.Parallel() + + if tm := NewToolMetrics("", "any", ""); tm != nil { + t.Fatalf("expected nil metrics when base metric name is empty") + } +} diff --git a/mcp_server/pkg/tools/jobs/describe.go b/mcp_server/pkg/tools/jobs/describe.go index d3f63d540..7213cb071 100644 --- a/mcp_server/pkg/tools/jobs/describe.go +++ b/mcp_server/pkg/tools/jobs/describe.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "strings" + "time" "github.com/mark3labs/mcp-go/mcp" "github.com/mark3labs/mcp-go/server" @@ -15,6 +16,7 @@ import ( const ( describeToolName = "jobs_describe" + describeMetricBase = "tools.jobs_describe" projectViewPermission = "project.view" ) @@ -86,6 +88,24 @@ func describeHandler(api internalapi.Provider) server.ToolHandlerFunc { return mcp.NewToolResultError(err.Error()), nil } + metrics := shared.NewToolMetrics(describeMetricBase, describeToolName, orgID) + if metrics != nil { + metrics.IncrementTotal() + } + start := time.Now() + success := false + defer func() { + if metrics == nil { + return + } + metrics.TrackDuration(start) + if success { + metrics.IncrementSuccess() + } else { + metrics.IncrementFailure() + } + }() + if err := shared.EnsureReadToolsFeature(ctx, api, orgID); err != nil { return mcp.NewToolResultError(err.Error()), nil } @@ -170,6 +190,7 @@ Troubleshooting: markdown := formatJobMarkdown(summary, mode) markdown = shared.TruncateResponse(markdown, shared.MaxResponseChars) + success = true return &mcp.CallToolResult{ Content: []mcp.Content{ mcp.NewTextContent(markdown), diff --git a/mcp_server/pkg/tools/jobs/logs.go b/mcp_server/pkg/tools/jobs/logs.go index dbbed5396..c23eb20ec 100644 --- a/mcp_server/pkg/tools/jobs/logs.go +++ b/mcp_server/pkg/tools/jobs/logs.go @@ -5,6 +5,7 @@ import ( "fmt" "strconv" "strings" + "time" "github.com/mark3labs/mcp-go/mcp" "github.com/mark3labs/mcp-go/server" @@ -21,6 +22,7 @@ import ( const ( logsToolName = "jobs_logs" + logsMetricBase = "tools.jobs_logs" loghubSource = "loghub" loghub2Source = "loghub2" loghub2TokenDuration = 300 @@ -106,6 +108,24 @@ func logsHandler(api internalapi.Provider) server.ToolHandlerFunc { return mcp.NewToolResultError(err.Error()), nil } + metrics := shared.NewToolMetrics(logsMetricBase, logsToolName, orgID) + if metrics != nil { + metrics.IncrementTotal() + } + start := time.Now() + success := false + defer func() { + if metrics == nil { + return + } + metrics.TrackDuration(start) + if success { + metrics.IncrementSuccess() + } else { + metrics.IncrementFailure() + } + }() + if err := shared.EnsureReadToolsFeature(ctx, api, orgID); err != nil { return mcp.NewToolResultError(err.Error()), nil } @@ -174,10 +194,23 @@ Troubleshooting: return shared.ProjectAuthorizationError(err, orgID, jobProjectID, projectViewPermission), nil } + var ( + result *mcp.CallToolResult + callErr error + ) + if job.GetSelfHosted() { - return fetchSelfHostedLogs(ctx, api, jobID) + result, callErr = fetchSelfHostedLogs(ctx, api, jobID) + } else { + result, callErr = fetchHostedLogs(ctx, api, jobID, startingLine) + } + if callErr != nil { + return result, callErr + } + if result != nil && !result.IsError { + success = true } - return fetchHostedLogs(ctx, api, jobID, startingLine) + return result, nil } } diff --git a/mcp_server/pkg/tools/organizations/organizations.go b/mcp_server/pkg/tools/organizations/organizations.go index 18a78ce5f..62b5b3983 100644 --- a/mcp_server/pkg/tools/organizations/organizations.go +++ b/mcp_server/pkg/tools/organizations/organizations.go @@ -23,6 +23,7 @@ import ( const ( listToolName = "organizations_list" + listMetricBase = "tools.organizations_list" defaultPageSize = 20 maxPageSize = 100 ) @@ -136,6 +137,24 @@ type organizationDetails struct { func listHandler(api internalapi.Provider) server.ToolHandlerFunc { return func(ctx context.Context, req mcp.CallToolRequest) (*mcp.CallToolResult, error) { + metrics := shared.NewToolMetrics(listMetricBase, listToolName, "") + if metrics != nil { + metrics.IncrementTotal() + } + start := time.Now() + success := false + defer func() { + if metrics == nil { + return + } + metrics.TrackDuration(start) + if success { + metrics.IncrementSuccess() + } else { + metrics.IncrementFailure() + } + }() + client := api.Organizations() if client == nil { return mcp.NewToolResultError(`Organization gRPC endpoint is not configured. @@ -238,6 +257,7 @@ The RBAC service confirms which organizations the authenticated user can access. markdown := formatOrganizationsMarkdown(result.Organizations, mode, "") markdown = shared.TruncateResponse(markdown, shared.MaxResponseChars) + success = true return &mcp.CallToolResult{ Content: []mcp.Content{mcp.NewTextContent(markdown)}, StructuredContent: result, @@ -285,6 +305,7 @@ The organization service could not describe the permitted organizations. Retry i markdown := formatOrganizationsMarkdown(result.Organizations, mode, "") markdown = shared.TruncateResponse(markdown, shared.MaxResponseChars) + success = true return &mcp.CallToolResult{ Content: []mcp.Content{mcp.NewTextContent(markdown)}, StructuredContent: result, @@ -332,6 +353,7 @@ The organization service could not describe the permitted organizations. Retry i markdown := formatOrganizationsMarkdown(orgs, mode, result.NextCursor) markdown = shared.TruncateResponse(markdown, shared.MaxResponseChars) + success = true return &mcp.CallToolResult{ Content: []mcp.Content{ mcp.NewTextContent(markdown), diff --git a/mcp_server/pkg/tools/pipelines/pipelines.go b/mcp_server/pkg/tools/pipelines/pipelines.go index 23d24789b..b2d25c317 100644 --- a/mcp_server/pkg/tools/pipelines/pipelines.go +++ b/mcp_server/pkg/tools/pipelines/pipelines.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "strings" + "time" "github.com/mark3labs/mcp-go/mcp" "github.com/mark3labs/mcp-go/server" @@ -20,7 +21,9 @@ import ( const ( listToolName = "pipelines_list" + listMetricBase = "tools.pipelines_list" jobsToolName = "pipeline_jobs" + jobsMetricBase = "tools.pipeline_jobs" defaultLimit = 20 maxLimit = 100 errNoClient = "pipeline gRPC endpoint is not configured" @@ -241,6 +244,24 @@ func listHandler(api internalapi.Provider) server.ToolHandlerFunc { return mcp.NewToolResultError(err.Error()), nil } + metrics := shared.NewToolMetrics(listMetricBase, listToolName, orgID) + if metrics != nil { + metrics.IncrementTotal() + } + start := time.Now() + success := false + defer func() { + if metrics == nil { + return + } + metrics.TrackDuration(start) + if success { + metrics.IncrementSuccess() + } else { + metrics.IncrementFailure() + } + }() + if err := shared.EnsureReadToolsFeature(ctx, api, orgID); err != nil { return mcp.NewToolResultError(err.Error()), nil } @@ -433,6 +454,7 @@ Check that: markdown := formatPipelineListMarkdown(result, mode, workflowID, projectID, orgID, limit) markdown = shared.TruncateResponse(markdown, shared.MaxResponseChars) + success = true return &mcp.CallToolResult{ Content: []mcp.Content{ mcp.NewTextContent(markdown), @@ -458,6 +480,24 @@ func jobsHandler(api internalapi.Provider) server.ToolHandlerFunc { return mcp.NewToolResultError(err.Error()), nil } + metrics := shared.NewToolMetrics(jobsMetricBase, jobsToolName, orgID) + if metrics != nil { + metrics.IncrementTotal() + } + start := time.Now() + success := false + defer func() { + if metrics == nil { + return + } + metrics.TrackDuration(start) + if success { + metrics.IncrementSuccess() + } else { + metrics.IncrementFailure() + } + }() + if err := shared.EnsureReadToolsFeature(ctx, api, orgID); err != nil { return mcp.NewToolResultError(err.Error()), nil } @@ -595,6 +635,7 @@ Troubleshooting: markdown := formatPipelineJobsMarkdown(result, mode, orgID) markdown = shared.TruncateResponse(markdown, shared.MaxResponseChars) + success = true return &mcp.CallToolResult{ Content: []mcp.Content{ mcp.NewTextContent(markdown), diff --git a/mcp_server/pkg/tools/projects/projects.go b/mcp_server/pkg/tools/projects/projects.go index 260bea927..e06dd1105 100644 --- a/mcp_server/pkg/tools/projects/projects.go +++ b/mcp_server/pkg/tools/projects/projects.go @@ -5,6 +5,7 @@ import ( "fmt" "sort" "strings" + "time" "github.com/google/uuid" "github.com/mark3labs/mcp-go/mcp" @@ -22,7 +23,9 @@ import ( const ( listToolName = "projects_list" + listMetricBase = "tools.projects_list" searchToolName = "projects_search" + searchMetricBase = "tools.projects_search" defaultListLimit = 25 maxListLimit = 200 defaultSearchLimit = 20 @@ -470,6 +473,24 @@ You can discover organizations by calling organizations_list first.`), nil Example: projects_list(organization_id="aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee")`, err)), nil } + metrics := shared.NewToolMetrics(listMetricBase, listToolName, orgID) + if metrics != nil { + metrics.IncrementTotal() + } + start := time.Now() + success := false + defer func() { + if metrics == nil { + return + } + metrics.TrackDuration(start) + if success { + metrics.IncrementSuccess() + } else { + metrics.IncrementFailure() + } + }() + if err := shared.EnsureReadToolsFeature(ctx, api, orgID); err != nil { return mcp.NewToolResultError(err.Error()), nil } @@ -585,6 +606,7 @@ Try removing optional filters or verifying access permissions.`, err)), nil markdown := formatProjectListMarkdown(result, mode, orgID) markdown = shared.TruncateResponse(markdown, shared.MaxResponseChars) + success = true return &mcp.CallToolResult{ Content: []mcp.Content{ mcp.NewTextContent(markdown), @@ -612,6 +634,24 @@ Check INTERNAL_API_URL_PROJECT or MCP_PROJECT_GRPC_ENDPOINT and ensure ProjectHu return mcp.NewToolResultError(err.Error()), nil } + metrics := shared.NewToolMetrics(searchMetricBase, searchToolName, orgID) + if metrics != nil { + metrics.IncrementTotal() + } + start := time.Now() + success := false + defer func() { + if metrics == nil { + return + } + metrics.TrackDuration(start) + if success { + metrics.IncrementSuccess() + } else { + metrics.IncrementFailure() + } + }() + if err := shared.EnsureReadToolsFeature(ctx, api, orgID); err != nil { return mcp.NewToolResultError(err.Error()), nil } @@ -799,6 +839,7 @@ Ensure you have permission to list projects in organization %s.`, err, orgID)), markdown := formatProjectSearchMarkdown(result, mode, orgID, queryDisplay, repoDisplay, limit, maxPages) markdown = shared.TruncateResponse(markdown, shared.MaxResponseChars) + success = true return &mcp.CallToolResult{ Content: []mcp.Content{ mcp.NewTextContent(markdown), @@ -837,6 +878,7 @@ Ensure you have permission to list projects in organization %s.`, err, orgID)), markdown := formatProjectSearchMarkdown(result, mode, orgID, queryDisplay, repoDisplay, limit, maxPages) markdown = shared.TruncateResponse(markdown, shared.MaxResponseChars) + success = true return &mcp.CallToolResult{ Content: []mcp.Content{ mcp.NewTextContent(markdown), diff --git a/mcp_server/pkg/tools/workflows/workflows.go b/mcp_server/pkg/tools/workflows/workflows.go index 0216bf3a8..6b0ce0b6e 100644 --- a/mcp_server/pkg/tools/workflows/workflows.go +++ b/mcp_server/pkg/tools/workflows/workflows.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "strings" + "time" "github.com/mark3labs/mcp-go/mcp" "github.com/mark3labs/mcp-go/server" @@ -20,6 +21,7 @@ import ( const ( searchToolName = "workflows_search" + searchMetricBase = "tools.workflows_search" defaultLimit = 20 maxLimit = 100 missingWorkflowError = "workflow gRPC endpoint is not configured" @@ -157,6 +159,24 @@ func listHandler(api internalapi.Provider) server.ToolHandlerFunc { return mcp.NewToolResultError(err.Error()), nil } + metrics := shared.NewToolMetrics(searchMetricBase, searchToolName, orgID) + if metrics != nil { + metrics.IncrementTotal() + } + start := time.Now() + success := false + defer func() { + if metrics == nil { + return + } + metrics.TrackDuration(start) + if success { + metrics.IncrementSuccess() + } else { + metrics.IncrementFailure() + } + }() + if err := shared.EnsureReadToolsFeature(ctx, api, orgID); err != nil { return mcp.NewToolResultError(err.Error()), nil } @@ -342,6 +362,7 @@ Double-check that: markdown := formatWorkflowsMarkdown(result, mode, projectID, orgID, branch, requesterFilter, myWorkflowsOnly, userID, limit) markdown = shared.TruncateResponse(markdown, shared.MaxResponseChars) + success = true return &mcp.CallToolResult{ Content: []mcp.Content{ mcp.NewTextContent(markdown), diff --git a/mcp_server/pkg/watchman/configuration.go b/mcp_server/pkg/watchman/configuration.go new file mode 100644 index 000000000..d3e2d3332 --- /dev/null +++ b/mcp_server/pkg/watchman/configuration.go @@ -0,0 +1,20 @@ +// Package watchman holds a helper function for configuring watchman's client. +package watchman + +import ( + "log" + "time" + + "github.com/renderedtext/go-watchman" +) + +func Configure(metricNamespace string) { + // Give some time for the statsd sidecar to start up + time.Sleep(3000 * time.Millisecond) + + err := watchman.Configure("0.0.0.0", "8125", metricNamespace) + + if err != nil { + log.Printf("(err) Failed to configure watchman") + } +} From b753a261a1f2f55887b13fdc75586b0d059c0cbd Mon Sep 17 00:00:00 2001 From: Damjan Becirovic Date: Wed, 12 Nov 2025 11:38:05 +0100 Subject: [PATCH 2/4] feat(mcp_server): adjust metric names --- .../pkg/tools/internal/shared/metrics.go | 26 ++++++----------- .../pkg/tools/internal/shared/metrics_test.go | 28 +++++++++---------- mcp_server/pkg/tools/jobs/describe.go | 3 +- mcp_server/pkg/tools/jobs/logs.go | 3 +- .../pkg/tools/organizations/organizations.go | 3 +- mcp_server/pkg/tools/pipelines/pipelines.go | 6 ++-- mcp_server/pkg/tools/projects/projects.go | 6 ++-- mcp_server/pkg/tools/workflows/workflows.go | 3 +- 8 files changed, 30 insertions(+), 48 deletions(-) diff --git a/mcp_server/pkg/tools/internal/shared/metrics.go b/mcp_server/pkg/tools/internal/shared/metrics.go index 176a20b0d..ad4668aa2 100644 --- a/mcp_server/pkg/tools/internal/shared/metrics.go +++ b/mcp_server/pkg/tools/internal/shared/metrics.go @@ -16,16 +16,14 @@ type ToolMetrics struct { } // NewToolMetrics prepares a metrics emitter scoped to a tool and optional organization ID. -func NewToolMetrics(baseMetricName, toolName, orgID string) *ToolMetrics { - base := strings.TrimSpace(baseMetricName) - if base == "" { +func NewToolMetrics(toolName, orgID string) *ToolMetrics { + name := strings.TrimSpace(toolName) + if name == "" { return nil } - tags := make([]string, 0, 3) - if tag := sanitizeMetricTag("tool_" + strings.TrimSpace(toolName)); tag != "" { - tags = append(tags, tag) - } + base := "tools." + name + tags := make([]string, 0, 1) if normalizedOrg := strings.TrimSpace(strings.ToLower(orgID)); normalizedOrg != "" { if tag := sanitizeMetricTag("org_" + normalizedOrg); tag != "" { @@ -33,14 +31,6 @@ func NewToolMetrics(baseMetricName, toolName, orgID string) *ToolMetrics { } } - if len(tags) == 0 { - tags = append(tags, "tool_unknown") - } - - if len(tags) > 3 { - tags = tags[:3] - } - return &ToolMetrics{ base: base, tags: tags, @@ -49,17 +39,17 @@ func NewToolMetrics(baseMetricName, toolName, orgID string) *ToolMetrics { // IncrementTotal bumps the total execution counter. func (tm *ToolMetrics) IncrementTotal() { - tm.increment("executions_total") + tm.increment("count_total") } // IncrementSuccess bumps the successful execution counter. func (tm *ToolMetrics) IncrementSuccess() { - tm.increment("executions_succeeded") + tm.increment("count_passed") } // IncrementFailure bumps the failed execution counter. func (tm *ToolMetrics) IncrementFailure() { - tm.increment("executions_failed") + tm.increment("count_failed") } // TrackDuration submits the elapsed duration since start. diff --git a/mcp_server/pkg/tools/internal/shared/metrics_test.go b/mcp_server/pkg/tools/internal/shared/metrics_test.go index e955b323e..0ed03e04a 100644 --- a/mcp_server/pkg/tools/internal/shared/metrics_test.go +++ b/mcp_server/pkg/tools/internal/shared/metrics_test.go @@ -5,42 +5,42 @@ import "testing" func TestNewToolMetricsWithOrg(t *testing.T) { t.Parallel() - tm := NewToolMetrics("tools.test_tool", "Test Tool", "ABC-123 ") + tm := NewToolMetrics("test_tool", "ABC-123 ") if tm == nil { t.Fatalf("expected metrics emitter") } - if got, want := len(tm.tags), 2; got != want { + if got, want := len(tm.tags), 1; got != want { t.Fatalf("expected %d tags, got %d", want, got) } - if tm.tags[0] != "tool_test_tool" { - t.Fatalf("expected tool tag to be %q, got %q", "tool_test_tool", tm.tags[0]) + if tm.tags[0] != "org_abc-123" { + t.Fatalf("expected org tag to be %q, got %q", "org_abc-123", tm.tags[0]) } - if tm.tags[1] != "org_abc-123" { - t.Fatalf("expected org tag to be %q, got %q", "org_abc-123", tm.tags[1]) + if tm.base != "tools.test_tool" { + t.Fatalf("expected base metric %q, got %q", "tools.test_tool", tm.base) } } func TestNewToolMetricsWithoutOrg(t *testing.T) { t.Parallel() - tm := NewToolMetrics("tools.test_tool", "Test Tool", "") + tm := NewToolMetrics("test_tool", "") if tm == nil { t.Fatalf("expected metrics emitter") } - if got := tm.tags[0]; got != "tool_test_tool" { - t.Fatalf("expected tool tag %q, got %q", "tool_test_tool", got) + if len(tm.tags) != 0 { + t.Fatalf("expected no tags, got %d", len(tm.tags)) } - if len(tm.tags) != 1 { - t.Fatalf("expected only tool tag, got %d tags", len(tm.tags)) + if tm.base != "tools.test_tool" { + t.Fatalf("expected base metric %q, got %q", "tools.test_tool", tm.base) } } -func TestNewToolMetricsWithoutBase(t *testing.T) { +func TestNewToolMetricsWithoutName(t *testing.T) { t.Parallel() - if tm := NewToolMetrics("", "any", ""); tm != nil { - t.Fatalf("expected nil metrics when base metric name is empty") + if tm := NewToolMetrics("", ""); tm != nil { + t.Fatalf("expected nil metrics when tool name is empty") } } diff --git a/mcp_server/pkg/tools/jobs/describe.go b/mcp_server/pkg/tools/jobs/describe.go index 7213cb071..0dd2bd70a 100644 --- a/mcp_server/pkg/tools/jobs/describe.go +++ b/mcp_server/pkg/tools/jobs/describe.go @@ -16,7 +16,6 @@ import ( const ( describeToolName = "jobs_describe" - describeMetricBase = "tools.jobs_describe" projectViewPermission = "project.view" ) @@ -88,7 +87,7 @@ func describeHandler(api internalapi.Provider) server.ToolHandlerFunc { return mcp.NewToolResultError(err.Error()), nil } - metrics := shared.NewToolMetrics(describeMetricBase, describeToolName, orgID) + metrics := shared.NewToolMetrics(describeToolName, orgID) if metrics != nil { metrics.IncrementTotal() } diff --git a/mcp_server/pkg/tools/jobs/logs.go b/mcp_server/pkg/tools/jobs/logs.go index c23eb20ec..68ff307c4 100644 --- a/mcp_server/pkg/tools/jobs/logs.go +++ b/mcp_server/pkg/tools/jobs/logs.go @@ -22,7 +22,6 @@ import ( const ( logsToolName = "jobs_logs" - logsMetricBase = "tools.jobs_logs" loghubSource = "loghub" loghub2Source = "loghub2" loghub2TokenDuration = 300 @@ -108,7 +107,7 @@ func logsHandler(api internalapi.Provider) server.ToolHandlerFunc { return mcp.NewToolResultError(err.Error()), nil } - metrics := shared.NewToolMetrics(logsMetricBase, logsToolName, orgID) + metrics := shared.NewToolMetrics(logsToolName, orgID) if metrics != nil { metrics.IncrementTotal() } diff --git a/mcp_server/pkg/tools/organizations/organizations.go b/mcp_server/pkg/tools/organizations/organizations.go index 62b5b3983..0152e5f4c 100644 --- a/mcp_server/pkg/tools/organizations/organizations.go +++ b/mcp_server/pkg/tools/organizations/organizations.go @@ -23,7 +23,6 @@ import ( const ( listToolName = "organizations_list" - listMetricBase = "tools.organizations_list" defaultPageSize = 20 maxPageSize = 100 ) @@ -137,7 +136,7 @@ type organizationDetails struct { func listHandler(api internalapi.Provider) server.ToolHandlerFunc { return func(ctx context.Context, req mcp.CallToolRequest) (*mcp.CallToolResult, error) { - metrics := shared.NewToolMetrics(listMetricBase, listToolName, "") + metrics := shared.NewToolMetrics(listToolName, "") if metrics != nil { metrics.IncrementTotal() } diff --git a/mcp_server/pkg/tools/pipelines/pipelines.go b/mcp_server/pkg/tools/pipelines/pipelines.go index b2d25c317..241f93bf6 100644 --- a/mcp_server/pkg/tools/pipelines/pipelines.go +++ b/mcp_server/pkg/tools/pipelines/pipelines.go @@ -21,9 +21,7 @@ import ( const ( listToolName = "pipelines_list" - listMetricBase = "tools.pipelines_list" jobsToolName = "pipeline_jobs" - jobsMetricBase = "tools.pipeline_jobs" defaultLimit = 20 maxLimit = 100 errNoClient = "pipeline gRPC endpoint is not configured" @@ -244,7 +242,7 @@ func listHandler(api internalapi.Provider) server.ToolHandlerFunc { return mcp.NewToolResultError(err.Error()), nil } - metrics := shared.NewToolMetrics(listMetricBase, listToolName, orgID) + metrics := shared.NewToolMetrics(listToolName, orgID) if metrics != nil { metrics.IncrementTotal() } @@ -480,7 +478,7 @@ func jobsHandler(api internalapi.Provider) server.ToolHandlerFunc { return mcp.NewToolResultError(err.Error()), nil } - metrics := shared.NewToolMetrics(jobsMetricBase, jobsToolName, orgID) + metrics := shared.NewToolMetrics(jobsToolName, orgID) if metrics != nil { metrics.IncrementTotal() } diff --git a/mcp_server/pkg/tools/projects/projects.go b/mcp_server/pkg/tools/projects/projects.go index e06dd1105..03c12ac3a 100644 --- a/mcp_server/pkg/tools/projects/projects.go +++ b/mcp_server/pkg/tools/projects/projects.go @@ -23,9 +23,7 @@ import ( const ( listToolName = "projects_list" - listMetricBase = "tools.projects_list" searchToolName = "projects_search" - searchMetricBase = "tools.projects_search" defaultListLimit = 25 maxListLimit = 200 defaultSearchLimit = 20 @@ -473,7 +471,7 @@ You can discover organizations by calling organizations_list first.`), nil Example: projects_list(organization_id="aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee")`, err)), nil } - metrics := shared.NewToolMetrics(listMetricBase, listToolName, orgID) + metrics := shared.NewToolMetrics(listToolName, orgID) if metrics != nil { metrics.IncrementTotal() } @@ -634,7 +632,7 @@ Check INTERNAL_API_URL_PROJECT or MCP_PROJECT_GRPC_ENDPOINT and ensure ProjectHu return mcp.NewToolResultError(err.Error()), nil } - metrics := shared.NewToolMetrics(searchMetricBase, searchToolName, orgID) + metrics := shared.NewToolMetrics(searchToolName, orgID) if metrics != nil { metrics.IncrementTotal() } diff --git a/mcp_server/pkg/tools/workflows/workflows.go b/mcp_server/pkg/tools/workflows/workflows.go index 6b0ce0b6e..02d50dae1 100644 --- a/mcp_server/pkg/tools/workflows/workflows.go +++ b/mcp_server/pkg/tools/workflows/workflows.go @@ -21,7 +21,6 @@ import ( const ( searchToolName = "workflows_search" - searchMetricBase = "tools.workflows_search" defaultLimit = 20 maxLimit = 100 missingWorkflowError = "workflow gRPC endpoint is not configured" @@ -159,7 +158,7 @@ func listHandler(api internalapi.Provider) server.ToolHandlerFunc { return mcp.NewToolResultError(err.Error()), nil } - metrics := shared.NewToolMetrics(searchMetricBase, searchToolName, orgID) + metrics := shared.NewToolMetrics(searchToolName, orgID) if metrics != nil { metrics.IncrementTotal() } From b0d9baf1478d31a4d6c6bfed417618b4cba41099 Mon Sep 17 00:00:00 2001 From: Damjan Becirovic Date: Wed, 12 Nov 2025 11:46:23 +0100 Subject: [PATCH 3/4] feat(mcp_server): add tests for submiting metrics --- .../pkg/tools/internal/shared/metrics.go | 9 +- .../pkg/tools/internal/shared/metrics_test.go | 100 ++++++++++++++++-- 2 files changed, 100 insertions(+), 9 deletions(-) diff --git a/mcp_server/pkg/tools/internal/shared/metrics.go b/mcp_server/pkg/tools/internal/shared/metrics.go index ad4668aa2..f59f91169 100644 --- a/mcp_server/pkg/tools/internal/shared/metrics.go +++ b/mcp_server/pkg/tools/internal/shared/metrics.go @@ -9,6 +9,11 @@ import ( "github.com/semaphoreio/semaphore/mcp_server/pkg/logging" ) +var ( + watchmanBenchmarkWithTags = watchman.BenchmarkWithTags + watchmanIncrementWithTags = watchman.IncrementWithTags +) + // ToolMetrics emits Watchman metrics for a specific tool invocation. type ToolMetrics struct { base string @@ -59,7 +64,7 @@ func (tm *ToolMetrics) TrackDuration(start time.Time) { } name := tm.metricName("duration_ms") - if err := watchman.BenchmarkWithTags(start, name, tm.tags); err != nil { + if err := watchmanBenchmarkWithTags(start, name, tm.tags); err != nil { logMetricError(name, err) } } @@ -69,7 +74,7 @@ func (tm *ToolMetrics) increment(suffix string) { return } name := tm.metricName(suffix) - if err := watchman.IncrementWithTags(name, tm.tags); err != nil { + if err := watchmanIncrementWithTags(name, tm.tags); err != nil { logMetricError(name, err) } } diff --git a/mcp_server/pkg/tools/internal/shared/metrics_test.go b/mcp_server/pkg/tools/internal/shared/metrics_test.go index 0ed03e04a..9c8bd510f 100644 --- a/mcp_server/pkg/tools/internal/shared/metrics_test.go +++ b/mcp_server/pkg/tools/internal/shared/metrics_test.go @@ -1,10 +1,11 @@ package shared -import "testing" +import ( + "testing" + "time" +) func TestNewToolMetricsWithOrg(t *testing.T) { - t.Parallel() - tm := NewToolMetrics("test_tool", "ABC-123 ") if tm == nil { t.Fatalf("expected metrics emitter") @@ -22,8 +23,6 @@ func TestNewToolMetricsWithOrg(t *testing.T) { } func TestNewToolMetricsWithoutOrg(t *testing.T) { - t.Parallel() - tm := NewToolMetrics("test_tool", "") if tm == nil { t.Fatalf("expected metrics emitter") @@ -38,9 +37,96 @@ func TestNewToolMetricsWithoutOrg(t *testing.T) { } func TestNewToolMetricsWithoutName(t *testing.T) { - t.Parallel() - if tm := NewToolMetrics("", ""); tm != nil { t.Fatalf("expected nil metrics when tool name is empty") } } + +func TestToolMetricsIncrementCounters(t *testing.T) { + tm := NewToolMetrics("test_tool", "Org-123") + if tm == nil { + t.Fatalf("expected metrics emitter") + } + + orig := watchmanIncrementWithTags + defer func() { watchmanIncrementWithTags = orig }() + + tests := []struct { + name string + invoke func(*ToolMetrics) + wantMetric string + expectedTag string + }{ + { + name: "total", + invoke: (*ToolMetrics).IncrementTotal, + wantMetric: "tools.test_tool.count_total", + expectedTag: "org_org-123", + }, + { + name: "success", + invoke: (*ToolMetrics).IncrementSuccess, + wantMetric: "tools.test_tool.count_passed", + expectedTag: "org_org-123", + }, + { + name: "failure", + invoke: (*ToolMetrics).IncrementFailure, + wantMetric: "tools.test_tool.count_failed", + expectedTag: "org_org-123", + }, + } + + for _, tt := range tests { + called := 0 + watchmanIncrementWithTags = func(metric string, tags []string) error { + called++ + if metric != tt.wantMetric { + t.Fatalf("test %s: expected metric %q, got %q", tt.name, tt.wantMetric, metric) + } + if len(tags) != 1 || tags[0] != tt.expectedTag { + t.Fatalf("test %s: expected tags [%s], got %v", tt.name, tt.expectedTag, tags) + } + return nil + } + + tt.invoke(tm) + + if called != 1 { + t.Fatalf("test %s: expected watchman call once, got %d", tt.name, called) + } + } +} + +func TestToolMetricsTrackDuration(t *testing.T) { + tm := NewToolMetrics("test_tool", "Org123") + if tm == nil { + t.Fatalf("expected metrics emitter") + } + + start := time.Unix(0, 0) + + orig := watchmanBenchmarkWithTags + defer func() { watchmanBenchmarkWithTags = orig }() + + called := 0 + watchmanBenchmarkWithTags = func(actualStart time.Time, name string, tags []string) error { + called++ + if !actualStart.Equal(start) { + t.Fatalf("expected start time %v, got %v", start, actualStart) + } + if name != "tools.test_tool.duration_ms" { + t.Fatalf("expected metric name %q, got %q", "tools.test_tool.duration_ms", name) + } + if len(tags) != 1 || tags[0] != "org_org123" { + t.Fatalf("expected tags [org_org123], got %v", tags) + } + return nil + } + + tm.TrackDuration(start) + + if called != 1 { + t.Fatalf("expected watchman benchmark to be called once, got %d", called) + } +} From 4d532546b59e0c40050f28d92546679e2eec7194 Mon Sep 17 00:00:00 2001 From: Damjan Becirovic Date: Wed, 12 Nov 2025 13:57:03 +0100 Subject: [PATCH 4/4] feat(mcp_server): tag metrics with org name instead of id --- mcp_server/AGENTS.md | 6 + mcp_server/cmd/mcp_server/main.go | 5 + mcp_server/pkg/tools/config.go | 16 + .../pkg/tools/internal/shared/metrics.go | 91 ++++- .../pkg/tools/internal/shared/metrics_test.go | 326 +++++++++++++++++- .../pkg/tools/internal/shared/org_resolver.go | 155 +++++++++ .../internal/shared/org_resolver_test.go | 207 +++++++++++ mcp_server/pkg/tools/jobs/describe.go | 22 +- mcp_server/pkg/tools/jobs/logs.go | 29 +- .../pkg/tools/organizations/organizations.go | 25 +- mcp_server/pkg/tools/pipelines/pipelines.go | 43 +-- mcp_server/pkg/tools/projects/projects.go | 45 +-- mcp_server/pkg/tools/workflows/workflows.go | 22 +- mcp_server/test/support/stubs.go | 50 ++- 14 files changed, 856 insertions(+), 186 deletions(-) create mode 100644 mcp_server/pkg/tools/config.go create mode 100644 mcp_server/pkg/tools/internal/shared/org_resolver.go create mode 100644 mcp_server/pkg/tools/internal/shared/org_resolver_test.go diff --git a/mcp_server/AGENTS.md b/mcp_server/AGENTS.md index 9794d8188..02ff25dc8 100644 --- a/mcp_server/AGENTS.md +++ b/mcp_server/AGENTS.md @@ -17,3 +17,9 @@ Follow Conventional Commits (e.g., `feat(auth):`, `fix(front):`, `docs:`) and ke ## Security & Configuration Tips Surface dependency issues early with `make check.ex.deps`, `make check.go.deps`, and `make check.docker`. Store secrets in local `.env` files; never commit credentials. Runtime configuration reads internal gRPC endpoints from `INTERNAL_API_URL_PLUMBER`, `INTERNAL_API_URL_JOB`, `INTERNAL_API_URL_LOGHUB`, and `INTERNAL_API_URL_LOGHUB2`, falling back to legacy `MCP_*` variables. Export `DOCKER_BUILDKIT=1` to mirror CI Docker builds. + +## MCP Tool Metrics Quickstart +- Shared instrumentation lives in `pkg/tools/internal/shared/metrics.go`. Inside every MCP tool handler, create a tracker with `tracker := shared.TrackToolExecution(ctx, "", orgID)`, `defer tracker.Cleanup()`, and call `tracker.MarkSuccess()` right before you return a successful result. +- Organization tags resolve via `pkg/tools/internal/shared/org_resolver.go`. The resolver is configured once through `tools.ConfigureMetrics(provider)` during server bootstrap, so new tools only need to supply the org ID (or `""` when not applicable). +- For org-agnostic tools (e.g., `organizations_list`), pass an empty org ID so we still emit `count_*` and `duration_ms` metrics without tags. +- Following this pattern ensures every tool automatically publishes `tools..count_total|count_passed|count_failed` and `tools..duration_ms` metrics, with human-readable org tags whenever available, keeping dashboards consistent without extra boilerplate. diff --git a/mcp_server/cmd/mcp_server/main.go b/mcp_server/cmd/mcp_server/main.go index 78b61f9e7..5f846c2bc 100644 --- a/mcp_server/cmd/mcp_server/main.go +++ b/mcp_server/cmd/mcp_server/main.go @@ -19,6 +19,7 @@ import ( "github.com/semaphoreio/semaphore/mcp_server/pkg/internalapi" "github.com/semaphoreio/semaphore/mcp_server/pkg/logging" + "github.com/semaphoreio/semaphore/mcp_server/pkg/tools" "github.com/semaphoreio/semaphore/mcp_server/pkg/tools/jobs" "github.com/semaphoreio/semaphore/mcp_server/pkg/tools/organizations" "github.com/semaphoreio/semaphore/mcp_server/pkg/tools/pipelines" @@ -99,6 +100,10 @@ func main() { }() } + // Configure organization name resolver for metrics tagging. + // This must be called once before registering tools that emit metrics. + tools.ConfigureMetrics(provider) + organizations.Register(srv, provider) projects.Register(srv, provider) workflows.Register(srv, provider) diff --git a/mcp_server/pkg/tools/config.go b/mcp_server/pkg/tools/config.go new file mode 100644 index 000000000..6682af4fa --- /dev/null +++ b/mcp_server/pkg/tools/config.go @@ -0,0 +1,16 @@ +package tools + +import ( + "github.com/semaphoreio/semaphore/mcp_server/pkg/internalapi" + "github.com/semaphoreio/semaphore/mcp_server/pkg/tools/internal/shared" +) + +// ConfigureMetrics initializes global metrics configuration for all tools. +// This should be called once during server initialization before registering any tools. +// +// It configures the organization name resolver used for metrics tagging, +// allowing metrics to be tagged with human-readable organization names +// instead of UUIDs. +func ConfigureMetrics(provider internalapi.Provider) { + shared.ConfigureDefaultOrgResolver(provider) +} diff --git a/mcp_server/pkg/tools/internal/shared/metrics.go b/mcp_server/pkg/tools/internal/shared/metrics.go index f59f91169..104d10a69 100644 --- a/mcp_server/pkg/tools/internal/shared/metrics.go +++ b/mcp_server/pkg/tools/internal/shared/metrics.go @@ -1,6 +1,7 @@ package shared import ( + "context" "strings" "time" @@ -21,7 +22,12 @@ type ToolMetrics struct { } // NewToolMetrics prepares a metrics emitter scoped to a tool and optional organization ID. -func NewToolMetrics(toolName, orgID string) *ToolMetrics { +func NewToolMetrics(ctx context.Context, toolName, orgID string) *ToolMetrics { + resolver := getOrgNameResolver() + return newToolMetricsWithResolver(ctx, toolName, orgID, resolver) +} + +func newToolMetricsWithResolver(ctx context.Context, toolName, orgID string, resolver OrgNameResolver) *ToolMetrics { name := strings.TrimSpace(toolName) if name == "" { return nil @@ -30,10 +36,8 @@ func NewToolMetrics(toolName, orgID string) *ToolMetrics { base := "tools." + name tags := make([]string, 0, 1) - if normalizedOrg := strings.TrimSpace(strings.ToLower(orgID)); normalizedOrg != "" { - if tag := sanitizeMetricTag("org_" + normalizedOrg); tag != "" { - tags = append(tags, tag) - } + if tag := resolveOrgTag(ctx, orgID, resolver); tag != "" { + tags = append(tags, tag) } return &ToolMetrics{ @@ -89,6 +93,30 @@ func (tm *ToolMetrics) metricName(suffix string) string { return tm.base + "." + suffix } +func resolveOrgTag(ctx context.Context, orgID string, resolver OrgNameResolver) string { + orgID = strings.TrimSpace(orgID) + if orgID == "" { + return "" + } + + value := orgID + if resolver != nil { + if name, err := resolver.Resolve(ctx, orgID); err == nil { + name = strings.TrimSpace(name) + if name != "" { + value = name + } + } else { + logging.ForComponent("metrics"). + WithError(err). + WithField("orgId", orgID). + Debug("failed to resolve organization name for metrics") + } + } + + return sanitizeMetricTag("org_" + value) +} + func sanitizeMetricTag(value string) string { value = strings.TrimSpace(strings.ToLower(value)) if value == "" { @@ -107,3 +135,56 @@ func logMetricError(metric string, err error) { WithField("metric", metric). Debug("failed to submit Watchman metric") } + +// ToolExecutionTracker helps track tool execution metrics with a consistent pattern. +// It provides methods to mark success and automatically handles cleanup via defer. +type ToolExecutionTracker struct { + metrics *ToolMetrics + start time.Time + success *bool +} + +// TrackToolExecution creates a new tracker for monitoring tool execution metrics. +// It automatically increments the total counter and sets up cleanup logic. +// +// Usage: +// +// tracker := shared.TrackToolExecution(ctx, toolName, orgID) +// defer tracker.Cleanup() +// // ... tool logic ... +// tracker.MarkSuccess() // Call before successful return +func TrackToolExecution(ctx context.Context, toolName, orgID string) *ToolExecutionTracker { + metrics := NewToolMetrics(ctx, toolName, orgID) + if metrics != nil { + metrics.IncrementTotal() + } + + success := false + return &ToolExecutionTracker{ + metrics: metrics, + start: time.Now(), + success: &success, + } +} + +// MarkSuccess marks the tool execution as successful. +// This should be called just before returning a successful result. +func (t *ToolExecutionTracker) MarkSuccess() { + if t != nil && t.success != nil { + *t.success = true + } +} + +// Cleanup emits duration and success/failure metrics. +// This should be called via defer immediately after creating the tracker. +func (t *ToolExecutionTracker) Cleanup() { + if t == nil || t.metrics == nil { + return + } + t.metrics.TrackDuration(t.start) + if t.success != nil && *t.success { + t.metrics.IncrementSuccess() + } else { + t.metrics.IncrementFailure() + } +} diff --git a/mcp_server/pkg/tools/internal/shared/metrics_test.go b/mcp_server/pkg/tools/internal/shared/metrics_test.go index 9c8bd510f..968ccdbf5 100644 --- a/mcp_server/pkg/tools/internal/shared/metrics_test.go +++ b/mcp_server/pkg/tools/internal/shared/metrics_test.go @@ -1,12 +1,33 @@ package shared import ( + "context" + "fmt" + "strings" + "sync" "testing" "time" ) -func TestNewToolMetricsWithOrg(t *testing.T) { - tm := NewToolMetrics("test_tool", "ABC-123 ") +type orgResolverFunc func(ctx context.Context, orgID string) (string, error) + +func (f orgResolverFunc) Resolve(ctx context.Context, orgID string) (string, error) { + return f(ctx, orgID) +} + +func withTestOrgResolver(t *testing.T, resolver OrgNameResolver) { + SetOrgNameResolver(resolver) + t.Cleanup(func() { + SetOrgNameResolver(nil) + }) +} + +func TestNewToolMetricsWithOrgName(t *testing.T) { + withTestOrgResolver(t, orgResolverFunc(func(ctx context.Context, orgID string) (string, error) { + return "Acme Corp", nil + })) + + tm := NewToolMetrics(context.Background(), "test_tool", "ABC-123 ") if tm == nil { t.Fatalf("expected metrics emitter") } @@ -14,16 +35,31 @@ func TestNewToolMetricsWithOrg(t *testing.T) { if got, want := len(tm.tags), 1; got != want { t.Fatalf("expected %d tags, got %d", want, got) } - if tm.tags[0] != "org_abc-123" { - t.Fatalf("expected org tag to be %q, got %q", "org_abc-123", tm.tags[0]) + if tm.tags[0] != "org_acme_corp" { + t.Fatalf("expected org tag to be %q, got %q", "org_acme_corp", tm.tags[0]) } if tm.base != "tools.test_tool" { t.Fatalf("expected base metric %q, got %q", "tools.test_tool", tm.base) } } +func TestNewToolMetricsFallsBackToOrgID(t *testing.T) { + withTestOrgResolver(t, orgResolverFunc(func(ctx context.Context, orgID string) (string, error) { + return "", fmt.Errorf("resolver error") + })) + + tm := NewToolMetrics(context.Background(), "test_tool", "ABC-123 ") + if tm == nil { + t.Fatalf("expected metrics emitter") + } + + if len(tm.tags) != 1 || tm.tags[0] != "org_abc-123" { + t.Fatalf("expected fallback tag org_abc-123, got %v", tm.tags) + } +} + func TestNewToolMetricsWithoutOrg(t *testing.T) { - tm := NewToolMetrics("test_tool", "") + tm := NewToolMetrics(context.Background(), "test_tool", "") if tm == nil { t.Fatalf("expected metrics emitter") } @@ -37,13 +73,17 @@ func TestNewToolMetricsWithoutOrg(t *testing.T) { } func TestNewToolMetricsWithoutName(t *testing.T) { - if tm := NewToolMetrics("", ""); tm != nil { + if tm := NewToolMetrics(context.Background(), "", ""); tm != nil { t.Fatalf("expected nil metrics when tool name is empty") } } func TestToolMetricsIncrementCounters(t *testing.T) { - tm := NewToolMetrics("test_tool", "Org-123") + withTestOrgResolver(t, orgResolverFunc(func(ctx context.Context, orgID string) (string, error) { + return "Demo Org", nil + })) + + tm := NewToolMetrics(context.Background(), "test_tool", "Org-123") if tm == nil { t.Fatalf("expected metrics emitter") } @@ -61,19 +101,19 @@ func TestToolMetricsIncrementCounters(t *testing.T) { name: "total", invoke: (*ToolMetrics).IncrementTotal, wantMetric: "tools.test_tool.count_total", - expectedTag: "org_org-123", + expectedTag: "org_demo_org", }, { name: "success", invoke: (*ToolMetrics).IncrementSuccess, wantMetric: "tools.test_tool.count_passed", - expectedTag: "org_org-123", + expectedTag: "org_demo_org", }, { name: "failure", invoke: (*ToolMetrics).IncrementFailure, wantMetric: "tools.test_tool.count_failed", - expectedTag: "org_org-123", + expectedTag: "org_demo_org", }, } @@ -99,7 +139,11 @@ func TestToolMetricsIncrementCounters(t *testing.T) { } func TestToolMetricsTrackDuration(t *testing.T) { - tm := NewToolMetrics("test_tool", "Org123") + withTestOrgResolver(t, orgResolverFunc(func(ctx context.Context, orgID string) (string, error) { + return "Demo Org", nil + })) + + tm := NewToolMetrics(context.Background(), "test_tool", "Org123") if tm == nil { t.Fatalf("expected metrics emitter") } @@ -118,8 +162,8 @@ func TestToolMetricsTrackDuration(t *testing.T) { if name != "tools.test_tool.duration_ms" { t.Fatalf("expected metric name %q, got %q", "tools.test_tool.duration_ms", name) } - if len(tags) != 1 || tags[0] != "org_org123" { - t.Fatalf("expected tags [org_org123], got %v", tags) + if len(tags) != 1 || tags[0] != "org_demo_org" { + t.Fatalf("expected tags [org_demo_org], got %v", tags) } return nil } @@ -130,3 +174,259 @@ func TestToolMetricsTrackDuration(t *testing.T) { t.Fatalf("expected watchman benchmark to be called once, got %d", called) } } + +func TestToolExecutionTrackerSuccess(t *testing.T) { + withTestOrgResolver(t, orgResolverFunc(func(ctx context.Context, orgID string) (string, error) { + return "Test Org", nil + })) + + origInc := watchmanIncrementWithTags + origBench := watchmanBenchmarkWithTags + defer func() { + watchmanIncrementWithTags = origInc + watchmanBenchmarkWithTags = origBench + }() + + totalCalled := 0 + successCalled := 0 + durationCalled := 0 + + watchmanIncrementWithTags = func(metric string, tags []string) error { + if strings.HasSuffix(metric, "count_total") { + totalCalled++ + } else if strings.HasSuffix(metric, "count_passed") { + successCalled++ + } + return nil + } + + watchmanBenchmarkWithTags = func(start time.Time, name string, tags []string) error { + durationCalled++ + return nil + } + + tracker := TrackToolExecution(context.Background(), "test_tool", "org-123") + if tracker == nil { + t.Fatal("expected non-nil tracker") + } + + tracker.MarkSuccess() + tracker.Cleanup() + + if totalCalled != 1 { + t.Fatalf("expected total counter once, got %d", totalCalled) + } + if successCalled != 1 { + t.Fatalf("expected success counter once, got %d", successCalled) + } + if durationCalled != 1 { + t.Fatalf("expected duration tracking once, got %d", durationCalled) + } +} + +func TestToolExecutionTrackerFailure(t *testing.T) { + withTestOrgResolver(t, orgResolverFunc(func(ctx context.Context, orgID string) (string, error) { + return "Test Org", nil + })) + + origInc := watchmanIncrementWithTags + origBench := watchmanBenchmarkWithTags + defer func() { + watchmanIncrementWithTags = origInc + watchmanBenchmarkWithTags = origBench + }() + + totalCalled := 0 + failureCalled := 0 + successCalled := 0 + + watchmanIncrementWithTags = func(metric string, tags []string) error { + if strings.HasSuffix(metric, "count_total") { + totalCalled++ + } else if strings.HasSuffix(metric, "count_failed") { + failureCalled++ + } else if strings.HasSuffix(metric, "count_passed") { + successCalled++ + } + return nil + } + + watchmanBenchmarkWithTags = func(start time.Time, name string, tags []string) error { + return nil + } + + tracker := TrackToolExecution(context.Background(), "test_tool", "org-123") + // Don't call MarkSuccess - simulates failure + tracker.Cleanup() + + if totalCalled != 1 { + t.Fatalf("expected total counter once, got %d", totalCalled) + } + if failureCalled != 1 { + t.Fatalf("expected failure counter once, got %d", failureCalled) + } + if successCalled != 0 { + t.Fatalf("expected no success counter, got %d", successCalled) + } +} + +func TestToolExecutionTrackerNilSafety(t *testing.T) { + var tracker *ToolExecutionTracker + // Should not panic + tracker.MarkSuccess() + tracker.Cleanup() +} + +// TestMetricsIntegrationWithWatchman verifies that metrics are properly emitted +// to Watchman when tools are executed. This is an integration test that validates +// the full metrics pipeline. +func TestMetricsIntegrationWithWatchman(t *testing.T) { + withTestOrgResolver(t, orgResolverFunc(func(ctx context.Context, orgID string) (string, error) { + return "Integration Test Org", nil + })) + + origInc := watchmanIncrementWithTags + origBench := watchmanBenchmarkWithTags + defer func() { + watchmanIncrementWithTags = origInc + watchmanBenchmarkWithTags = origBench + }() + + // Track all metrics emitted + type metricCall struct { + name string + tags []string + } + var incrementCalls []metricCall + var benchmarkCalls []metricCall + var mu sync.Mutex + + watchmanIncrementWithTags = func(metric string, tags []string) error { + mu.Lock() + defer mu.Unlock() + incrementCalls = append(incrementCalls, metricCall{ + name: metric, + tags: append([]string(nil), tags...), + }) + return nil + } + + watchmanBenchmarkWithTags = func(start time.Time, name string, tags []string) error { + mu.Lock() + defer mu.Unlock() + benchmarkCalls = append(benchmarkCalls, metricCall{ + name: name, + tags: append([]string(nil), tags...), + }) + return nil + } + + // Simulate a successful tool execution + ctx := context.Background() + metrics := NewToolMetrics(ctx, "integration_test_tool", "org-abc123") + if metrics == nil { + t.Fatal("expected non-nil metrics") + } + + metrics.IncrementTotal() + time.Sleep(10 * time.Millisecond) // Simulate work + metrics.TrackDuration(time.Now().Add(-10 * time.Millisecond)) + metrics.IncrementSuccess() + + // Verify metrics were emitted + mu.Lock() + defer mu.Unlock() + + if len(incrementCalls) != 2 { + t.Fatalf("expected 2 increment calls (total, success), got %d", len(incrementCalls)) + } + + if len(benchmarkCalls) != 1 { + t.Fatalf("expected 1 benchmark call (duration), got %d", len(benchmarkCalls)) + } + + // Verify total counter + totalCall := incrementCalls[0] + if !strings.Contains(totalCall.name, "integration_test_tool") { + t.Errorf("expected tool name in metric, got %q", totalCall.name) + } + if !strings.HasSuffix(totalCall.name, "count_total") { + t.Errorf("expected count_total suffix, got %q", totalCall.name) + } + if len(totalCall.tags) != 1 || !strings.Contains(totalCall.tags[0], "integration_test_org") { + t.Errorf("expected org tag, got %v", totalCall.tags) + } + + // Verify success counter + successCall := incrementCalls[1] + if !strings.HasSuffix(successCall.name, "count_passed") { + t.Errorf("expected count_passed suffix, got %q", successCall.name) + } + + // Verify duration tracking + durationCall := benchmarkCalls[0] + if !strings.HasSuffix(durationCall.name, "duration_ms") { + t.Errorf("expected duration_ms suffix, got %q", durationCall.name) + } + if len(durationCall.tags) != 1 || !strings.Contains(durationCall.tags[0], "integration_test_org") { + t.Errorf("expected org tag in duration, got %v", durationCall.tags) + } +} + +// TestMetricsIntegrationFailureCase verifies that failure metrics are properly +// emitted when a tool execution fails. +func TestMetricsIntegrationFailureCase(t *testing.T) { + withTestOrgResolver(t, orgResolverFunc(func(ctx context.Context, orgID string) (string, error) { + return "Failure Test Org", nil + })) + + origInc := watchmanIncrementWithTags + origBench := watchmanBenchmarkWithTags + defer func() { + watchmanIncrementWithTags = origInc + watchmanBenchmarkWithTags = origBench + }() + + type metricCall struct { + name string + } + var incrementCalls []metricCall + var mu sync.Mutex + + watchmanIncrementWithTags = func(metric string, tags []string) error { + mu.Lock() + defer mu.Unlock() + incrementCalls = append(incrementCalls, metricCall{name: metric}) + return nil + } + + watchmanBenchmarkWithTags = func(start time.Time, name string, tags []string) error { + return nil + } + + // Simulate a failed tool execution + ctx := context.Background() + metrics := NewToolMetrics(ctx, "failure_test_tool", "org-xyz789") + if metrics == nil { + t.Fatal("expected non-nil metrics") + } + + metrics.IncrementTotal() + time.Sleep(5 * time.Millisecond) // Simulate work + metrics.TrackDuration(time.Now().Add(-5 * time.Millisecond)) + metrics.IncrementFailure() + + // Verify failure metrics were emitted + mu.Lock() + defer mu.Unlock() + + if len(incrementCalls) != 2 { + t.Fatalf("expected 2 increment calls (total, failure), got %d", len(incrementCalls)) + } + + // Verify failure counter + failureCall := incrementCalls[1] + if !strings.HasSuffix(failureCall.name, "count_failed") { + t.Errorf("expected count_failed suffix, got %q", failureCall.name) + } +} diff --git a/mcp_server/pkg/tools/internal/shared/org_resolver.go b/mcp_server/pkg/tools/internal/shared/org_resolver.go new file mode 100644 index 000000000..bef0e47ce --- /dev/null +++ b/mcp_server/pkg/tools/internal/shared/org_resolver.go @@ -0,0 +1,155 @@ +package shared + +import ( + "context" + "fmt" + "strings" + "sync" + "time" + + orgpb "github.com/semaphoreio/semaphore/mcp_server/pkg/internal_api/organization" + "github.com/semaphoreio/semaphore/mcp_server/pkg/internalapi" + "google.golang.org/grpc" +) + +// OrgNameResolver resolves organization IDs to human-readable names. +type OrgNameResolver interface { + Resolve(ctx context.Context, orgID string) (string, error) +} + +type orgDescriber interface { + Describe(ctx context.Context, in *orgpb.DescribeRequest, opts ...grpc.CallOption) (*orgpb.DescribeResponse, error) +} + +type cachedOrgResolver struct { + client orgDescriber + timeout time.Duration + ttl time.Duration + now func() time.Time + + mu sync.RWMutex + cache map[string]orgCacheEntry +} + +type orgCacheEntry struct { + name string + expiresAt time.Time +} + +var ( + orgResolver OrgNameResolver + orgResolverLock sync.RWMutex + orgResolverInit sync.Once +) + +// SetOrgNameResolver allows the application to provide a resolver used by metric helpers. +func SetOrgNameResolver(resolver OrgNameResolver) { + orgResolverLock.Lock() + defer orgResolverLock.Unlock() + orgResolver = resolver +} + +func getOrgNameResolver() OrgNameResolver { + orgResolverLock.RLock() + defer orgResolverLock.RUnlock() + return orgResolver +} + +// NewCachedOrgNameResolver returns a resolver backed by the organization gRPC client with basic caching. +func NewCachedOrgNameResolver(provider internalapi.Provider, ttl time.Duration) OrgNameResolver { + if provider == nil { + return nil + } + client := provider.Organizations() + if client == nil { + return nil + } + if ttl <= 0 { + ttl = 30 * time.Minute + } + return &cachedOrgResolver{ + client: client, + timeout: provider.CallTimeout(), + ttl: ttl, + now: time.Now, + cache: make(map[string]orgCacheEntry), + } +} + +// ConfigureDefaultOrgResolver installs a cached resolver backed by the provided internal API clients. +func ConfigureDefaultOrgResolver(provider internalapi.Provider) { + if provider == nil { + return + } + orgResolverInit.Do(func() { + if resolver := NewCachedOrgNameResolver(provider, 30*time.Minute); resolver != nil { + SetOrgNameResolver(resolver) + } + }) +} + +func (r *cachedOrgResolver) Resolve(ctx context.Context, orgID string) (string, error) { + if r == nil { + return "", fmt.Errorf("resolver is not configured") + } + + key := strings.TrimSpace(strings.ToLower(orgID)) + if key == "" { + return "", fmt.Errorf("organization id is required") + } + + if name, ok := r.lookup(key); ok { + return name, nil + } + + callCtx := ctx + var cancel context.CancelFunc + if r.timeout > 0 { + callCtx, cancel = context.WithTimeout(ctx, r.timeout) + defer cancel() + } + + resp, err := r.client.Describe(callCtx, &orgpb.DescribeRequest{OrgId: key}) + if err != nil { + return "", err + } + if err := CheckResponseStatus(resp.GetStatus()); err != nil { + return "", err + } + org := resp.GetOrganization() + if org == nil { + return "", fmt.Errorf("describe response missing organization payload") + } + name := strings.TrimSpace(org.GetName()) + if name == "" { + name = strings.TrimSpace(org.GetOrgUsername()) + } + if name == "" { + name = key + } + + r.store(key, name) + return name, nil +} + +func (r *cachedOrgResolver) lookup(key string) (string, bool) { + r.mu.RLock() + defer r.mu.RUnlock() + entry, ok := r.cache[key] + if !ok { + return "", false + } + if r.ttl > 0 && r.now().After(entry.expiresAt) { + return "", false + } + return entry.name, true +} + +func (r *cachedOrgResolver) store(key, name string) { + r.mu.Lock() + defer r.mu.Unlock() + r.cache[key] = orgCacheEntry{ + name: name, + expiresAt: r.now().Add(r.ttl), + } +} diff --git a/mcp_server/pkg/tools/internal/shared/org_resolver_test.go b/mcp_server/pkg/tools/internal/shared/org_resolver_test.go new file mode 100644 index 000000000..89aeb84f1 --- /dev/null +++ b/mcp_server/pkg/tools/internal/shared/org_resolver_test.go @@ -0,0 +1,207 @@ +package shared + +import ( + "context" + "errors" + "strings" + "testing" + "time" + + orgpb "github.com/semaphoreio/semaphore/mcp_server/pkg/internal_api/organization" + responsepb "github.com/semaphoreio/semaphore/mcp_server/pkg/internal_api/response_status" + "google.golang.org/grpc" +) + +type describeStub struct { + responses map[string]*orgpb.Organization + calls int + err error +} + +func (d *describeStub) Describe(ctx context.Context, in *orgpb.DescribeRequest, opts ...grpc.CallOption) (*orgpb.DescribeResponse, error) { + if d.err != nil { + return nil, d.err + } + d.calls++ + org := d.responses[strings.ToLower(strings.TrimSpace(in.GetOrgId()))] + if org == nil { + return &orgpb.DescribeResponse{ + Status: &responsepb.ResponseStatus{ + Code: responsepb.ResponseStatus_BAD_PARAM, + Message: "not found", + }, + }, nil + } + return &orgpb.DescribeResponse{ + Status: &responsepb.ResponseStatus{ + Code: responsepb.ResponseStatus_OK, + }, + Organization: org, + }, nil +} + +func TestCachedOrgResolverCachesResults(t *testing.T) { + stub := &describeStub{ + responses: map[string]*orgpb.Organization{ + "org-1": {OrgId: "org-1", Name: "Acme Org"}, + }, + } + now := time.Unix(0, 0) + resolver := &cachedOrgResolver{ + client: stub, + timeout: time.Second, + ttl: time.Minute, + now: func() time.Time { + return now + }, + cache: make(map[string]orgCacheEntry), + } + + name, err := resolver.Resolve(context.Background(), "ORG-1") + if err != nil { + t.Fatalf("resolve returned error: %v", err) + } + if name != "Acme Org" { + t.Fatalf("expected Acme Org, got %q", name) + } + + name, err = resolver.Resolve(context.Background(), "org-1") + if err != nil { + t.Fatalf("second resolve returned error: %v", err) + } + if name != "Acme Org" { + t.Fatalf("expected cached Acme Org, got %q", name) + } + if stub.calls != 1 { + t.Fatalf("expected single RPC, got %d", stub.calls) + } +} + +func TestCachedOrgResolverExpiresEntries(t *testing.T) { + stub := &describeStub{ + responses: map[string]*orgpb.Organization{ + "org-2": {OrgId: "org-2", Name: "Beta Org"}, + }, + } + now := time.Unix(0, 0) + resolver := &cachedOrgResolver{ + client: stub, + timeout: time.Second, + ttl: 30 * time.Second, + now: func() time.Time { + return now + }, + cache: make(map[string]orgCacheEntry), + } + + if _, err := resolver.Resolve(context.Background(), "org-2"); err != nil { + t.Fatalf("resolve failed: %v", err) + } + if stub.calls != 1 { + t.Fatalf("expected call count 1, got %d", stub.calls) + } + + now = now.Add(time.Minute) + if _, err := resolver.Resolve(context.Background(), "org-2"); err != nil { + t.Fatalf("resolve after expiry failed: %v", err) + } + if stub.calls != 2 { + t.Fatalf("expected cache miss after expiry (2 calls), got %d", stub.calls) + } +} + +func TestCachedOrgResolverPropagatesErrors(t *testing.T) { + stub := &describeStub{err: errors.New("boom")} + resolver := &cachedOrgResolver{ + client: stub, + timeout: time.Second, + ttl: time.Minute, + now: time.Now, + cache: make(map[string]orgCacheEntry), + } + + if _, err := resolver.Resolve(context.Background(), "org-3"); err == nil { + t.Fatalf("expected error") + } +} + +func TestCachedOrgResolverConcurrentAccess(t *testing.T) { + stub := &describeStub{ + responses: map[string]*orgpb.Organization{ + "org-1": {OrgId: "org-1", Name: "Org One"}, + "org-2": {OrgId: "org-2", Name: "Org Two"}, + "org-3": {OrgId: "org-3", Name: "Org Three"}, + }, + } + resolver := &cachedOrgResolver{ + client: stub, + timeout: time.Second, + ttl: time.Minute, + now: time.Now, + cache: make(map[string]orgCacheEntry), + } + + // Pre-populate cache with org-1 to test cached access pattern + _, err := resolver.Resolve(context.Background(), "org-1") + if err != nil { + t.Fatalf("failed to pre-populate cache: %v", err) + } + initialCalls := stub.calls // Should be 1 + + const goroutines = 50 + const iterations = 10 + + done := make(chan bool, goroutines) + errCh := make(chan error, goroutines*iterations) + + // Launch multiple goroutines that concurrently access the resolver + for i := 0; i < goroutines; i++ { + go func(id int) { + defer func() { done <- true }() + for j := 0; j < iterations; j++ { + // Each goroutine accesses all three orgs + // org-1 is cached, org-2 and org-3 need to be fetched + orgIDs := []string{"org-1", "org-2", "org-3"} + for _, orgID := range orgIDs { + name, err := resolver.Resolve(context.Background(), orgID) + if err != nil { + errCh <- err + return + } + // Verify we got a valid name + if name == "" { + errCh <- errors.New("got empty org name") + return + } + } + } + }(i) + } + + // Wait for all goroutines to complete + for i := 0; i < goroutines; i++ { + <-done + } + close(errCh) + + // Check for any errors + for err := range errCh { + t.Fatalf("concurrent access error: %v", err) + } + + // Verify caching worked: + // - org-1 was pre-cached: 0 additional calls + // - org-2 and org-3: at most a few calls each (due to concurrent first access) + // Total should be much less than goroutines * iterations * 2 (uncached orgs) + totalCalls := stub.calls + expectedMax := initialCalls + 20 // Allow some concurrent fetches for org-2 and org-3 + + if totalCalls > expectedMax { + t.Fatalf("too many RPC calls: expected at most %d, got %d (caching may not be working)", expectedMax, totalCalls) + } + + // Verify we made at least some calls for org-2 and org-3 + if totalCalls <= initialCalls { + t.Fatalf("expected additional calls for org-2 and org-3, got %d total (%d initial)", totalCalls, initialCalls) + } +} diff --git a/mcp_server/pkg/tools/jobs/describe.go b/mcp_server/pkg/tools/jobs/describe.go index 0dd2bd70a..9c00cef88 100644 --- a/mcp_server/pkg/tools/jobs/describe.go +++ b/mcp_server/pkg/tools/jobs/describe.go @@ -4,7 +4,6 @@ import ( "context" "fmt" "strings" - "time" "github.com/mark3labs/mcp-go/mcp" "github.com/mark3labs/mcp-go/server" @@ -87,23 +86,8 @@ func describeHandler(api internalapi.Provider) server.ToolHandlerFunc { return mcp.NewToolResultError(err.Error()), nil } - metrics := shared.NewToolMetrics(describeToolName, orgID) - if metrics != nil { - metrics.IncrementTotal() - } - start := time.Now() - success := false - defer func() { - if metrics == nil { - return - } - metrics.TrackDuration(start) - if success { - metrics.IncrementSuccess() - } else { - metrics.IncrementFailure() - } - }() + tracker := shared.TrackToolExecution(ctx, describeToolName, orgID) + defer tracker.Cleanup() if err := shared.EnsureReadToolsFeature(ctx, api, orgID); err != nil { return mcp.NewToolResultError(err.Error()), nil @@ -189,7 +173,7 @@ Troubleshooting: markdown := formatJobMarkdown(summary, mode) markdown = shared.TruncateResponse(markdown, shared.MaxResponseChars) - success = true + tracker.MarkSuccess() return &mcp.CallToolResult{ Content: []mcp.Content{ mcp.NewTextContent(markdown), diff --git a/mcp_server/pkg/tools/jobs/logs.go b/mcp_server/pkg/tools/jobs/logs.go index 68ff307c4..66fad5f90 100644 --- a/mcp_server/pkg/tools/jobs/logs.go +++ b/mcp_server/pkg/tools/jobs/logs.go @@ -5,7 +5,6 @@ import ( "fmt" "strconv" "strings" - "time" "github.com/mark3labs/mcp-go/mcp" "github.com/mark3labs/mcp-go/server" @@ -107,23 +106,8 @@ func logsHandler(api internalapi.Provider) server.ToolHandlerFunc { return mcp.NewToolResultError(err.Error()), nil } - metrics := shared.NewToolMetrics(logsToolName, orgID) - if metrics != nil { - metrics.IncrementTotal() - } - start := time.Now() - success := false - defer func() { - if metrics == nil { - return - } - metrics.TrackDuration(start) - if success { - metrics.IncrementSuccess() - } else { - metrics.IncrementFailure() - } - }() + tracker := shared.TrackToolExecution(ctx, logsToolName, orgID) + defer tracker.Cleanup() if err := shared.EnsureReadToolsFeature(ctx, api, orgID); err != nil { return mcp.NewToolResultError(err.Error()), nil @@ -207,7 +191,14 @@ Troubleshooting: return result, callErr } if result != nil && !result.IsError { - success = true + // For self-hosted jobs, also verify that a token was actually generated + if job.GetSelfHosted() { + if structured, ok := result.StructuredContent.(logsResult); ok && structured.Token != "" { + tracker.MarkSuccess() + } + } else { + tracker.MarkSuccess() + } } return result, nil } diff --git a/mcp_server/pkg/tools/organizations/organizations.go b/mcp_server/pkg/tools/organizations/organizations.go index 0152e5f4c..65df8ddf6 100644 --- a/mcp_server/pkg/tools/organizations/organizations.go +++ b/mcp_server/pkg/tools/organizations/organizations.go @@ -136,23 +136,8 @@ type organizationDetails struct { func listHandler(api internalapi.Provider) server.ToolHandlerFunc { return func(ctx context.Context, req mcp.CallToolRequest) (*mcp.CallToolResult, error) { - metrics := shared.NewToolMetrics(listToolName, "") - if metrics != nil { - metrics.IncrementTotal() - } - start := time.Now() - success := false - defer func() { - if metrics == nil { - return - } - metrics.TrackDuration(start) - if success { - metrics.IncrementSuccess() - } else { - metrics.IncrementFailure() - } - }() + tracker := shared.TrackToolExecution(ctx, listToolName, "") + defer tracker.Cleanup() client := api.Organizations() if client == nil { @@ -256,7 +241,7 @@ The RBAC service confirms which organizations the authenticated user can access. markdown := formatOrganizationsMarkdown(result.Organizations, mode, "") markdown = shared.TruncateResponse(markdown, shared.MaxResponseChars) - success = true + tracker.MarkSuccess() return &mcp.CallToolResult{ Content: []mcp.Content{mcp.NewTextContent(markdown)}, StructuredContent: result, @@ -304,7 +289,7 @@ The organization service could not describe the permitted organizations. Retry i markdown := formatOrganizationsMarkdown(result.Organizations, mode, "") markdown = shared.TruncateResponse(markdown, shared.MaxResponseChars) - success = true + tracker.MarkSuccess() return &mcp.CallToolResult{ Content: []mcp.Content{mcp.NewTextContent(markdown)}, StructuredContent: result, @@ -352,7 +337,7 @@ The organization service could not describe the permitted organizations. Retry i markdown := formatOrganizationsMarkdown(orgs, mode, result.NextCursor) markdown = shared.TruncateResponse(markdown, shared.MaxResponseChars) - success = true + tracker.MarkSuccess() return &mcp.CallToolResult{ Content: []mcp.Content{ mcp.NewTextContent(markdown), diff --git a/mcp_server/pkg/tools/pipelines/pipelines.go b/mcp_server/pkg/tools/pipelines/pipelines.go index 241f93bf6..9cb68d5a4 100644 --- a/mcp_server/pkg/tools/pipelines/pipelines.go +++ b/mcp_server/pkg/tools/pipelines/pipelines.go @@ -5,7 +5,6 @@ import ( "errors" "fmt" "strings" - "time" "github.com/mark3labs/mcp-go/mcp" "github.com/mark3labs/mcp-go/server" @@ -242,23 +241,8 @@ func listHandler(api internalapi.Provider) server.ToolHandlerFunc { return mcp.NewToolResultError(err.Error()), nil } - metrics := shared.NewToolMetrics(listToolName, orgID) - if metrics != nil { - metrics.IncrementTotal() - } - start := time.Now() - success := false - defer func() { - if metrics == nil { - return - } - metrics.TrackDuration(start) - if success { - metrics.IncrementSuccess() - } else { - metrics.IncrementFailure() - } - }() + tracker := shared.TrackToolExecution(ctx, listToolName, orgID) + defer tracker.Cleanup() if err := shared.EnsureReadToolsFeature(ctx, api, orgID); err != nil { return mcp.NewToolResultError(err.Error()), nil @@ -452,7 +436,7 @@ Check that: markdown := formatPipelineListMarkdown(result, mode, workflowID, projectID, orgID, limit) markdown = shared.TruncateResponse(markdown, shared.MaxResponseChars) - success = true + tracker.MarkSuccess() return &mcp.CallToolResult{ Content: []mcp.Content{ mcp.NewTextContent(markdown), @@ -478,23 +462,8 @@ func jobsHandler(api internalapi.Provider) server.ToolHandlerFunc { return mcp.NewToolResultError(err.Error()), nil } - metrics := shared.NewToolMetrics(jobsToolName, orgID) - if metrics != nil { - metrics.IncrementTotal() - } - start := time.Now() - success := false - defer func() { - if metrics == nil { - return - } - metrics.TrackDuration(start) - if success { - metrics.IncrementSuccess() - } else { - metrics.IncrementFailure() - } - }() + tracker := shared.TrackToolExecution(ctx, jobsToolName, orgID) + defer tracker.Cleanup() if err := shared.EnsureReadToolsFeature(ctx, api, orgID); err != nil { return mcp.NewToolResultError(err.Error()), nil @@ -633,7 +602,7 @@ Troubleshooting: markdown := formatPipelineJobsMarkdown(result, mode, orgID) markdown = shared.TruncateResponse(markdown, shared.MaxResponseChars) - success = true + tracker.MarkSuccess() return &mcp.CallToolResult{ Content: []mcp.Content{ mcp.NewTextContent(markdown), diff --git a/mcp_server/pkg/tools/projects/projects.go b/mcp_server/pkg/tools/projects/projects.go index 03c12ac3a..3323514f2 100644 --- a/mcp_server/pkg/tools/projects/projects.go +++ b/mcp_server/pkg/tools/projects/projects.go @@ -5,7 +5,6 @@ import ( "fmt" "sort" "strings" - "time" "github.com/google/uuid" "github.com/mark3labs/mcp-go/mcp" @@ -471,23 +470,8 @@ You can discover organizations by calling organizations_list first.`), nil Example: projects_list(organization_id="aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee")`, err)), nil } - metrics := shared.NewToolMetrics(listToolName, orgID) - if metrics != nil { - metrics.IncrementTotal() - } - start := time.Now() - success := false - defer func() { - if metrics == nil { - return - } - metrics.TrackDuration(start) - if success { - metrics.IncrementSuccess() - } else { - metrics.IncrementFailure() - } - }() + tracker := shared.TrackToolExecution(ctx, listToolName, orgID) + defer tracker.Cleanup() if err := shared.EnsureReadToolsFeature(ctx, api, orgID); err != nil { return mcp.NewToolResultError(err.Error()), nil @@ -604,7 +588,7 @@ Try removing optional filters or verifying access permissions.`, err)), nil markdown := formatProjectListMarkdown(result, mode, orgID) markdown = shared.TruncateResponse(markdown, shared.MaxResponseChars) - success = true + tracker.MarkSuccess() return &mcp.CallToolResult{ Content: []mcp.Content{ mcp.NewTextContent(markdown), @@ -632,23 +616,8 @@ Check INTERNAL_API_URL_PROJECT or MCP_PROJECT_GRPC_ENDPOINT and ensure ProjectHu return mcp.NewToolResultError(err.Error()), nil } - metrics := shared.NewToolMetrics(searchToolName, orgID) - if metrics != nil { - metrics.IncrementTotal() - } - start := time.Now() - success := false - defer func() { - if metrics == nil { - return - } - metrics.TrackDuration(start) - if success { - metrics.IncrementSuccess() - } else { - metrics.IncrementFailure() - } - }() + tracker := shared.TrackToolExecution(ctx, searchToolName, orgID) + defer tracker.Cleanup() if err := shared.EnsureReadToolsFeature(ctx, api, orgID); err != nil { return mcp.NewToolResultError(err.Error()), nil @@ -837,7 +806,7 @@ Ensure you have permission to list projects in organization %s.`, err, orgID)), markdown := formatProjectSearchMarkdown(result, mode, orgID, queryDisplay, repoDisplay, limit, maxPages) markdown = shared.TruncateResponse(markdown, shared.MaxResponseChars) - success = true + tracker.MarkSuccess() return &mcp.CallToolResult{ Content: []mcp.Content{ mcp.NewTextContent(markdown), @@ -876,7 +845,7 @@ Ensure you have permission to list projects in organization %s.`, err, orgID)), markdown := formatProjectSearchMarkdown(result, mode, orgID, queryDisplay, repoDisplay, limit, maxPages) markdown = shared.TruncateResponse(markdown, shared.MaxResponseChars) - success = true + tracker.MarkSuccess() return &mcp.CallToolResult{ Content: []mcp.Content{ mcp.NewTextContent(markdown), diff --git a/mcp_server/pkg/tools/workflows/workflows.go b/mcp_server/pkg/tools/workflows/workflows.go index 02d50dae1..2c7666a86 100644 --- a/mcp_server/pkg/tools/workflows/workflows.go +++ b/mcp_server/pkg/tools/workflows/workflows.go @@ -4,7 +4,6 @@ import ( "context" "fmt" "strings" - "time" "github.com/mark3labs/mcp-go/mcp" "github.com/mark3labs/mcp-go/server" @@ -158,23 +157,8 @@ func listHandler(api internalapi.Provider) server.ToolHandlerFunc { return mcp.NewToolResultError(err.Error()), nil } - metrics := shared.NewToolMetrics(searchToolName, orgID) - if metrics != nil { - metrics.IncrementTotal() - } - start := time.Now() - success := false - defer func() { - if metrics == nil { - return - } - metrics.TrackDuration(start) - if success { - metrics.IncrementSuccess() - } else { - metrics.IncrementFailure() - } - }() + tracker := shared.TrackToolExecution(ctx, searchToolName, orgID) + defer tracker.Cleanup() if err := shared.EnsureReadToolsFeature(ctx, api, orgID); err != nil { return mcp.NewToolResultError(err.Error()), nil @@ -361,7 +345,7 @@ Double-check that: markdown := formatWorkflowsMarkdown(result, mode, projectID, orgID, branch, requesterFilter, myWorkflowsOnly, userID, limit) markdown = shared.TruncateResponse(markdown, shared.MaxResponseChars) - success = true + tracker.MarkSuccess() return &mcp.CallToolResult{ Content: []mcp.Content{ mcp.NewTextContent(markdown), diff --git a/mcp_server/test/support/stubs.go b/mcp_server/test/support/stubs.go index 994badd9e..f53050069 100644 --- a/mcp_server/test/support/stubs.go +++ b/mcp_server/test/support/stubs.go @@ -116,18 +116,19 @@ func (p *pipelineStub) ListKeyset(ctx context.Context, in *pipelinepb.ListKeyset return &pipelinepb.ListKeysetResponse{ Pipelines: []*pipelinepb.Pipeline{ { - PplId: "ppl-local", - Name: "Build", - WfId: orDefault(in.GetWfId(), "wf-local"), - ProjectId: orDefault(in.GetProjectId(), "project-local"), - BranchName: "main", - CommitSha: "abcdef0", - State: pipelinepb.Pipeline_RUNNING, - Result: pipelinepb.Pipeline_PASSED, - ResultReason: pipelinepb.Pipeline_TEST, - CreatedAt: timestamppb.New(time.Unix(1_700_000_000, 0)), - Queue: &pipelinepb.Queue{QueueId: "queue-local", Name: "default", Type: pipelinepb.QueueType_IMPLICIT}, - Triggerer: &pipelinepb.Triggerer{PplTriggeredBy: pipelinepb.TriggeredBy_WORKFLOW}, + PplId: "ppl-local", + Name: "Build", + WfId: orDefault(in.GetWfId(), "wf-local"), + ProjectId: orDefault(in.GetProjectId(), "project-local"), + OrganizationId: "org-local", + BranchName: "main", + CommitSha: "abcdef0", + State: pipelinepb.Pipeline_RUNNING, + Result: pipelinepb.Pipeline_PASSED, + ResultReason: pipelinepb.Pipeline_TEST, + CreatedAt: timestamppb.New(time.Unix(1_700_000_000, 0)), + Queue: &pipelinepb.Queue{QueueId: "queue-local", Name: "default", Type: pipelinepb.QueueType_IMPLICIT}, + Triggerer: &pipelinepb.Triggerer{PplTriggeredBy: pipelinepb.TriggeredBy_WORKFLOW}, }, }, }, nil @@ -137,10 +138,11 @@ func (p *pipelineStub) Describe(ctx context.Context, in *pipelinepb.DescribeRequ return &pipelinepb.DescribeResponse{ ResponseStatus: &pipelinepb.ResponseStatus{Code: pipelinepb.ResponseStatus_OK}, Pipeline: &pipelinepb.Pipeline{ - PplId: orDefault(in.GetPplId(), "ppl-local"), - Name: "Build", - ProjectId: "project-local", - WfId: "wf-local", + PplId: orDefault(in.GetPplId(), "ppl-local"), + Name: "Build", + ProjectId: "project-local", + OrganizationId: "org-local", + WfId: "wf-local", }, Blocks: []*pipelinepb.Block{ { @@ -249,6 +251,22 @@ type organizationStub struct { orgpb.OrganizationServiceClient } +func (o *organizationStub) Describe(ctx context.Context, in *orgpb.DescribeRequest, opts ...grpc.CallOption) (*orgpb.DescribeResponse, error) { + orgID := in.GetOrgId() + if orgID == "" { + orgID = "org-local" + } + return &orgpb.DescribeResponse{ + Status: &responsepb.ResponseStatus{Code: responsepb.ResponseStatus_OK}, + Organization: &orgpb.Organization{ + OrgId: orgID, + Name: "Local Org", + OrgUsername: "local-org", + OwnerId: "user-local", + }, + }, nil +} + func (o *organizationStub) List(ctx context.Context, in *orgpb.ListRequest, opts ...grpc.CallOption) (*orgpb.ListResponse, error) { return &orgpb.ListResponse{ Status: &responsepb.ResponseStatus{Code: responsepb.ResponseStatus_OK},