Skip to content

sirrobot01/mcpulse-go

Repository files navigation

MCPulse Go SDK

Official Go SDK for MCPulse - the analytics and observability platform for MCP servers.

Features

  • gRPC and REST transport - Choose between high-performance gRPC and simple HTTP REST
  • Automatic buffering - Metrics are buffered and flushed periodically
  • Parameter sanitization - Automatically redacts sensitive keys like passwords and tokens
  • Retry logic - Built-in exponential backoff for failed requests
  • Sampling support - Control metric collection with configurable sample rates
  • Session tracking - Built-in session context management
  • Middleware pattern - Wrap tool handlers with automatic tracking
  • Zero-allocation hot path - Optimized for performance

Installation

go get github.com/sirrobot01/mcpulse-go

Quick Start

Authentication

If your MCPulse server requires authentication, you need an API key:

  1. Log in to your MCPulse dashboard
  2. Navigate to the API Keys page
  3. Click Create API Key and give it a name
  4. Copy the generated key (shown only once!)

Basic Usage with Middleware

package main

import (
    "context"
    "fmt"

    mcpulse "github.com/sirrobot01/mcpulse-go"
)

// main demonstrates the basic middleware workflow: build a configuration,
// create the middleware, and run a single tool handler through TrackToolCall
// so that the call is tracked.
func main() {
    ctx := context.Background()

    // Create configuration
    config := mcpulse.DefaultConfig("my-mcp-server")
    config.GRPCEndpoint = "localhost:9090"
    config.APIKey = "mcpulse_..." // API key from MCPulse UI (required if auth enabled)

    // Create middleware
    middleware, err := mcpulse.NewMiddleware(ctx, config)
    if err != nil {
        panic(err)
    }
    // Close flushes any remaining buffered metrics before the program exits.
    defer middleware.Close(ctx)

    // Track a tool call
    result, err := middleware.TrackToolCall(ctx, "calculate", map[string]interface{}{
        "operation": "add",
        "a": 5,
        "b": 3,
    }, func(ctx context.Context, params map[string]interface{}) (interface{}, error) {
        // Your tool implementation.
        // Use checked type assertions: a missing or mistyped parameter is
        // reported as an error instead of panicking inside the handler.
        a, ok := params["a"].(int)
        if !ok {
            return nil, fmt.Errorf("parameter %q must be an int, got %T", "a", params["a"])
        }
        b, ok := params["b"].(int)
        if !ok {
            return nil, fmt.Errorf("parameter %q must be an int, got %T", "b", params["b"])
        }
        return a + b, nil
    })
    if err != nil {
        fmt.Println("Error:", err)
        return
    }
    fmt.Println("Result:", result)
}

With Session Tracking

// Start a session. StartSession returns a context carrying the session, so
// keep using the returned ctx for every call that belongs to it.
sessionMgr := mcpulse.NewSessionManager(middleware)
ctx, session := sessionMgr.StartSession(ctx)

fmt.Println("Session ID:", session.ID)

// All tracked calls within this context will be associated with this session.
// params and handler stand for your own tool parameters and tool handler.
result, err := middleware.TrackToolCall(ctx, "search", params, handler)
if err != nil {
    // Do not discard the handler/tracking error; report it like any other call.
    fmt.Println("Error:", err)
}

Direct Client Usage

// Create a low-level client for direct control
client, err := mcpulse.New(ctx, mcpulse.ClientOptions{
    Transport:    mcpulse.TransportGRPC,
    GRPCEndpoint: "localhost:9090",
})
if err != nil {
    panic(err)
}
defer client.Close()

// Manually create and send metrics
metrics := []mcpulse.ToolCallMetric{
    {
        ID:         "metric-1",
        Timestamp:  time.Now(),
        ServerID:   "my-server",
        ToolName:   "calculate",
        DurationMS: 42,
        Status:     "success",
        Parameters: map[string]interface{}{
            "operation": "add",
            "a": 5,
            "b": 3,
        },
    },
}

result, err := client.IngestMetrics(ctx, metrics)
if err != nil {
    // Check the error before touching result: by Go convention the result is
    // not meaningful (and may be nil) when the call failed.
    panic(err)
}
fmt.Printf("Accepted: %d, Rejected: %d\n", result.Accepted, result.Rejected)

Streaming Ingestion (gRPC only)

// Stream batches
batches := make(chan []mcpulse.ToolCallMetric)

go func() {
    // Close the channel once all batches have been sent (or sending is abandoned).
    defer close(batches)
    for i := 0; i < 10; i++ {
        batch := []mcpulse.ToolCallMetric{
            // ... metrics
        }
        select {
        case batches <- batch:
        case <-ctx.Done():
            // batches is unbuffered: without this case the goroutine would
            // block forever on the send if nothing is receiving any more.
            return
        }
    }
}()

result, err := client.IngestMetricsStream(ctx, batches)
if err != nil {
    panic(err)
}
fmt.Printf("Result: %+v\n", result)

Real-time Metric Streaming (gRPC only)

// Subscribe to real-time updates.
// The error channel is named errs so it does not shadow the standard
// library "errors" package.
updates, errs, err := client.StreamMetrics(ctx, "my-server", []string{"metrics", "errors"})
if err != nil {
    panic(err)
}

for {
    select {
    case update, ok := <-updates:
        if !ok {
            // A closed channel yields zero values immediately on every
            // receive; stop here instead of spinning on empty updates.
            return
        }
        fmt.Printf("Update: %+v\n", update)
    case err, ok := <-errs:
        if !ok {
            // Closed without an error: a nil channel is never selected, so
            // this disables the case while updates keep being consumed.
            errs = nil
            continue
        }
        fmt.Printf("Error: %v\n", err)
        return
    case <-ctx.Done():
        return
    }
}

Configuration

// Full configuration example with every field set explicitly.
// Note that APIKey is not set in this literal; the Quick Start sets
// config.APIKey for servers that require authentication.
config := mcpulse.Config{
    // Server identifier and transport selection (this example uses gRPC).
    ServerID:   "my-mcp-server",
    Transport:  mcpulse.TransportGRPC,

    // Endpoints
    GRPCEndpoint: "localhost:9090",

    // Collection settings: metrics are buffered and flushed periodically.
    // NOTE(review): units of BufferSize and MaxBatchSize are presumably
    // numbers of metrics - confirm against the SDK source.
    EnableParamCollection: true,
    AsyncCollection:       true,
    BufferSize:            100,
    FlushInterval:         5 * time.Second,
    MaxBatchSize:          1000,

    // Privacy: sensitive keys such as passwords and tokens are redacted
    // from collected parameters.
    SanitizeParams: true,
    SensitiveKeys:  []string{"password", "token", "api_key"},

    // Sampling
    SampleRate: 1.0, // 1.0 = 100%, 0.1 = 10%

    // Retry: failed requests are retried with exponential backoff.
    // NOTE(review): RetryBackoff is presumably the initial delay - confirm.
    MaxRetries:   3,
    RetryBackoff: 1 * time.Second,

    // Timeout
    Timeout: 10 * time.Second,

    // Protocol metadata
    ProtocolVersion: "2024-11-05",
    ClientName:      "mcpulse-go",
    ClientVersion:   "1.0.0",
}

API Reference

Middleware

  • NewMiddleware(ctx, config) - Create a new middleware instance
  • TrackToolCall(ctx, toolName, params, handler) - Track a tool call with automatic metrics
  • TrackToolCallWithOptions(ctx, opts, handler) - Track with custom options
  • Flush(ctx) - Manually flush buffered metrics
  • Close(ctx) - Close and flush remaining metrics

Client

  • New(ctx, opts) - Create a new low-level client
  • IngestMetrics(ctx, metrics) - Send a batch of metrics
  • IngestMetricsStream(ctx, batches) - Stream batches (gRPC only)
  • StreamMetrics(ctx, serverID, topics) - Subscribe to real-time updates (gRPC only)
  • Close() - Close the client connection

Session

  • NewSession() - Create a new session with generated ID
  • NewSessionWithID(id) - Create a session with specific ID
  • WithSession(ctx, session) - Add session to context
  • GetSession(ctx) - Get session from context
  • GetSessionID(ctx) - Get session ID from context

Examples

See the examples directory for more complete examples.

License

Same as the parent MCPulse project.

About

MCPulse Go SDK

Resources

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published

Languages