diff --git a/.mockery.yml b/.mockery.yml
index 7459677..82650e4 100644
--- a/.mockery.yml
+++ b/.mockery.yml
@@ -26,3 +26,7 @@ packages:
       config:
         dir: "{{.InterfaceDir}}"
         filename: "mock_{{.InterfaceName}}.go"
+    CircuitBreaker:
+      config:
+        dir: "{{.InterfaceDir}}"
+        filename: "mock_{{.InterfaceName}}.go"
diff --git a/cmd/daemon/main.go b/cmd/daemon/main.go
index cf4c02f..0eacc59 100644
--- a/cmd/daemon/main.go
+++ b/cmd/daemon/main.go
@@ -79,6 +79,18 @@ func main() {
 		return
 	}
 
+	// Start sync circuit breaker service. On Start failure the service is NOT
+	// registered as the global breaker (see SyncCircuitBreakerWrapper.Start),
+	// so the sync handler keeps syncing without circuit protection instead of
+	// consulting a breaker whose reset timer never runs.
+	syncCircuitBreakerService := daemon.NewSyncCircuitBreakerService(pubsub)
+	if err := syncCircuitBreakerService.Start(ctx); err != nil {
+		slog.Error("Failed to start sync circuit breaker service", slog.Any("err", err))
+	} else {
+		slog.Info("Sync circuit breaker service started")
+		defer syncCircuitBreakerService.Stop()
+	}
+
 	go daemon.SocketTopicProccessor(msg)
 
 	// Start CCUsage service if enabled (v1 - ccusage CLI based)
diff --git a/daemon/circuit_breaker.go b/daemon/circuit_breaker.go
new file mode 100644
index 0000000..2b19a99
--- /dev/null
+++ b/daemon/circuit_breaker.go
@@ -0,0 +1,76 @@
+package daemon
+
+import (
+	"context"
+	"encoding/json"
+
+	"github.com/ThreeDotsLabs/watermill"
+	"github.com/ThreeDotsLabs/watermill/message"
+	"github.com/malamtime/cli/model"
+)
+
+// DaemonCircuitBreaker defines the interface for daemon-specific circuit
+// breaker operations consumed by the sync handler.
+type DaemonCircuitBreaker interface {
+	IsOpen() bool
+	RecordSuccess()
+	RecordFailure()
+	SaveForRetry(ctx context.Context, payload interface{}) error
+}
+
+// syncCircuitBreaker is the process-wide breaker consulted by handlePubSubSync.
+// It is installed by Start (not by the constructor) so that a service whose
+// retry timer failed to start is never consulted: an open circuit without a
+// running timer would never close, and saved data would never be retried.
+// It must be registered before the pubsub processor goroutine starts reading it.
+var syncCircuitBreaker DaemonCircuitBreaker
+
+// SyncCircuitBreakerWrapper wraps model.CircuitBreakerService with
+// daemon-specific payload framing and global registration.
+type SyncCircuitBreakerWrapper struct {
+	*model.CircuitBreakerService
+}
+
+// NewSyncCircuitBreakerService creates a daemon-specific circuit breaker
+// service whose retry path republishes saved lines onto the daemon pubsub
+// topic as fresh watermill messages.
+func NewSyncCircuitBreakerService(publisher message.Publisher) *SyncCircuitBreakerWrapper {
+	republishFn := func(data []byte) error {
+		msg := message.NewMessage(watermill.NewUUID(), data)
+		return publisher.Publish(PubSubTopic, msg)
+	}
+
+	svc := model.NewCircuitBreakerService(model.CircuitBreakerConfig{}, republishFn)
+	return &SyncCircuitBreakerWrapper{
+		CircuitBreakerService: svc,
+	}
+}
+
+// Start launches the underlying reset/retry timer and, only on success,
+// registers this wrapper as the global breaker used by handlePubSubSync.
+func (w *SyncCircuitBreakerWrapper) Start(ctx context.Context) error {
+	if err := w.CircuitBreakerService.Start(ctx); err != nil {
+		return err
+	}
+	syncCircuitBreaker = w
+	return nil
+}
+
+// SaveForRetry wraps payload in a SocketMessage before saving so that a
+// republished line is indistinguishable from a freshly received socket message.
+func (w *SyncCircuitBreakerWrapper) SaveForRetry(ctx context.Context, payload interface{}) error {
+	socketMsg := SocketMessage{
+		Type:    SocketMessageTypeSync,
+		Payload: payload,
+	}
+	jsonData, err := json.Marshal(socketMsg)
+	if err != nil {
+		return err
+	}
+	return w.CircuitBreakerService.SaveForRetry(ctx, jsonData)
+}
diff --git a/daemon/handlers.sync.go b/daemon/handlers.sync.go
index b93866c..0d34034 100644
--- a/daemon/handlers.sync.go
+++ b/daemon/handlers.sync.go
@@ -11,6 +11,16 @@ import (
 )
 
 func handlePubSubSync(ctx context.Context, socketMsgPayload interface{}) error {
+	// Check circuit breaker first
+	if syncCircuitBreaker != nil && syncCircuitBreaker.IsOpen() {
+		slog.Error("Circuit breaker is open, saving sync data locally for later retry")
+		if err := syncCircuitBreaker.SaveForRetry(ctx, socketMsgPayload); err != nil {
+			slog.Error("Failed to save sync data for retry", slog.Any("err", err))
+			return err
+		}
+		return nil // Return nil to ack the message
+	}
+
 	pb, err := json.Marshal(socketMsgPayload)
 	if err != nil {
 		slog.Error("Failed to marshal the sync payload again for unmarshal", slog.Any("payload", socketMsgPayload))
@@ -98,8 +108,15 @@ func handlePubSubSync(ctx context.Context, socketMsgPayload interface{}) error {
 	)
 	if err != nil {
+		if syncCircuitBreaker != nil {
+			syncCircuitBreaker.RecordFailure()
+		}
 		slog.Error("Failed to sync data to server", slog.Any("err", err))
 		return err
 	}
+
+	if syncCircuitBreaker != nil {
+		syncCircuitBreaker.RecordSuccess()
+	}
 	return nil
 }
diff --git a/model/circuit_breaker.go b/model/circuit_breaker.go
new file
mode 100644
index 0000000..10e0df9
--- /dev/null
+++ b/model/circuit_breaker.go
@@ -0,0 +1,290 @@
+package model
+
+import (
+	"bufio"
+	"context"
+	"fmt"
+	"log/slog"
+	"os"
+	"sync"
+	"time"
+)
+
+const (
+	DefaultMaxConsecutiveFailures = 10
+	DefaultCircuitResetInterval   = 1 * time.Hour
+
+	// maxPendingLineSize bounds a single saved payload line. bufio.Scanner's
+	// default 64 KiB token limit would otherwise abort the retry scan with
+	// ErrTooLong on a larger sync payload, silently stranding pending data.
+	maxPendingLineSize = 10 * 1024 * 1024
+)
+
+// CircuitBreaker defines the interface for circuit breaker operations.
+type CircuitBreaker interface {
+	IsOpen() bool
+	RecordSuccess()
+	RecordFailure()
+	SaveForRetry(ctx context.Context, payload []byte) error
+}
+
+// CircuitBreakerConfig holds configuration for the circuit breaker service.
+// Zero values fall back to the package defaults in NewCircuitBreakerService.
+type CircuitBreakerConfig struct {
+	MaxConsecutiveFailures int
+	ResetInterval          time.Duration
+}
+
+// RepublishFunc is called once per saved line when retrying pending data.
+type RepublishFunc func(data []byte) error
+
+// CircuitBreakerService handles circuit breaker state plus file-backed
+// save-and-retry of payloads rejected while the circuit is open.
+type CircuitBreakerService struct {
+	mu                  sync.RWMutex // guards consecutiveFailures and isOpen
+	consecutiveFailures int
+	isOpen              bool
+	config              CircuitBreakerConfig
+	republishFn         RepublishFunc
+	ticker              *time.Ticker
+	stopChan            chan struct{}
+	stopOnce            sync.Once // Stop is idempotent; double close would panic
+	wg                  sync.WaitGroup
+
+	// fileMu serializes all access to the pending file. Without it, a
+	// SaveForRetry append landing between retryPendingData's read and the
+	// rewrite below would be silently lost when the file is rewritten.
+	fileMu sync.Mutex
+}
+
+// NewCircuitBreakerService creates a new circuit breaker service, applying
+// defaults for any non-positive config values.
+func NewCircuitBreakerService(config CircuitBreakerConfig, republishFn RepublishFunc) *CircuitBreakerService {
+	if config.MaxConsecutiveFailures <= 0 {
+		config.MaxConsecutiveFailures = DefaultMaxConsecutiveFailures
+	}
+	if config.ResetInterval <= 0 {
+		config.ResetInterval = DefaultCircuitResetInterval
+	}
+	return &CircuitBreakerService{
+		config:      config,
+		republishFn: republishFn,
+		stopChan:    make(chan struct{}),
+	}
+}
+
+// Start begins the periodic reset/retry timer. The goroutine exits on Stop
+// or context cancellation. Start is intended to be called once.
+func (s *CircuitBreakerService) Start(ctx context.Context) error {
+	s.ticker = time.NewTicker(s.config.ResetInterval)
+	s.wg.Add(1)
+
+	go func() {
+		defer s.wg.Done()
+
+		for {
+			select {
+			case <-s.ticker.C:
+				s.checkAndRetry(ctx)
+			case <-s.stopChan:
+				return
+			case <-ctx.Done():
+				return
+			}
+		}
+	}()
+
+	slog.Info("Circuit breaker service started",
+		slog.Duration("interval", s.config.ResetInterval))
+	return nil
+}
+
+// Stop stops the circuit breaker service and waits for the timer goroutine
+// to exit. Safe to call multiple times (only the first call acts).
+func (s *CircuitBreakerService) Stop() {
+	s.stopOnce.Do(func() {
+		if s.ticker != nil {
+			s.ticker.Stop()
+		}
+		close(s.stopChan)
+		s.wg.Wait()
+		slog.Info("Circuit breaker service stopped")
+	})
+}
+
+// IsOpen returns true if the circuit is open (callers should save instead of send).
+func (s *CircuitBreakerService) IsOpen() bool {
+	s.mu.RLock()
+	defer s.mu.RUnlock()
+	return s.isOpen
+}
+
+// RecordSuccess resets the failure counter and closes the circuit.
+func (s *CircuitBreakerService) RecordSuccess() {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	s.consecutiveFailures = 0
+	s.isOpen = false
+}
+
+// RecordFailure increments the failure counter and opens the circuit once
+// the configured threshold is reached. Logs only on the closed->open edge.
+func (s *CircuitBreakerService) RecordFailure() {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	s.consecutiveFailures++
+	if s.consecutiveFailures >= s.config.MaxConsecutiveFailures {
+		if !s.isOpen {
+			slog.Error("Circuit breaker opened due to consecutive failures - server may be experiencing issues",
+				slog.Int("failures", s.consecutiveFailures))
+		}
+		s.isOpen = true
+	}
+}
+
+// SaveForRetry appends payload as a single newline-terminated line to the
+// pending file for later retry.
+// NOTE(review): the path is built as "$HOME/" + SYNC_PENDING_FILE; this is
+// only correct if SYNC_PENDING_FILE is relative to HOME — confirm against
+// COMMAND_BASE_STORAGE_FOLDER's definition.
+func (s *CircuitBreakerService) SaveForRetry(ctx context.Context, payload []byte) error {
+	s.fileMu.Lock()
+	defer s.fileMu.Unlock()
+
+	filePath := os.ExpandEnv(fmt.Sprintf("%s/%s", "$HOME", SYNC_PENDING_FILE))
+
+	file, err := os.OpenFile(filePath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
+	if err != nil {
+		return err
+	}
+	defer file.Close()
+
+	if _, err := file.Write(payload); err != nil {
+		return err
+	}
+	if _, err := file.WriteString("\n"); err != nil {
+		return err
+	}
+
+	slog.Info("Saved data for later retry")
+	return nil
+}
+
+// GetConsecutiveFailures returns the current failure count (for testing).
+func (s *CircuitBreakerService) GetConsecutiveFailures() int {
+	s.mu.RLock()
+	defer s.mu.RUnlock()
+	return s.consecutiveFailures
+}
+
+// checkAndRetry runs on each timer tick: it half-resets an open circuit
+// (clearing state so new traffic can probe the server) and then replays any
+// pending saved data.
+func (s *CircuitBreakerService) checkAndRetry(ctx context.Context) {
+	s.mu.Lock()
+	if s.isOpen {
+		slog.Info("Circuit breaker reset by timer, attempting to retry saved data")
+		s.isOpen = false
+		s.consecutiveFailures = 0
+	}
+	s.mu.Unlock()
+
+	s.retryPendingData(ctx)
+}
+
+// retryPendingData replays every saved line through republishFn, keeping
+// lines that fail for the next cycle and deleting the file when empty.
+func (s *CircuitBreakerService) retryPendingData(ctx context.Context) {
+	// Hold fileMu for the whole read-republish-rewrite cycle so a concurrent
+	// SaveForRetry cannot append between our read and the rewrite (such an
+	// append would otherwise be lost when the file is replaced).
+	s.fileMu.Lock()
+	defer s.fileMu.Unlock()
+
+	filePath := os.ExpandEnv(fmt.Sprintf("%s/%s", "$HOME", SYNC_PENDING_FILE))
+
+	if _, err := os.Stat(filePath); os.IsNotExist(err) {
+		slog.Debug("No pending sync file found, nothing to retry")
+		return
+	}
+
+	file, err := os.Open(filePath)
+	if err != nil {
+		slog.Error("Failed to open pending sync file for retry", slog.Any("err", err))
+		return
+	}
+
+	var lines []string
+	scanner := bufio.NewScanner(file)
+	// Raise the token limit so one oversized payload cannot abort the scan.
+	scanner.Buffer(make([]byte, 64*1024), maxPendingLineSize)
+	for scanner.Scan() {
+		line := scanner.Text()
+		if line != "" {
+			lines = append(lines, line)
+		}
+	}
+	file.Close()
+
+	if err := scanner.Err(); err != nil {
+		slog.Error("Error reading pending sync file", slog.Any("err", err))
+		return
+	}
+
+	if len(lines) == 0 {
+		slog.Debug("No pending sync data to retry")
+		return
+	}
+
+	slog.Info("Starting sync data retry", slog.Int("pendingCount", len(lines)))
+
+	var failedLines []string
+	successCount := 0
+
+	for _, line := range lines {
+		if s.republishFn == nil {
+			slog.Error("No republish function configured")
+			failedLines = append(failedLines, line)
+			continue
+		}
+
+		if err := s.republishFn([]byte(line)); err != nil {
+			slog.Warn("Failed to republish sync data, keeping for next retry", slog.Any("err", err))
+			failedLines = append(failedLines, line)
+		} else {
+			successCount++
+		}
+	}
+
+	if err := s.rewriteLogFile(filePath, failedLines); err != nil {
+		slog.Error("Failed to update pending sync file", slog.Any("err", err))
+		return
+	}
+
+	slog.Info("Sync data retry completed",
+		slog.Int("republished", successCount),
+		slog.Int("remaining", len(failedLines)))
+}
+
+// rewriteLogFile atomically replaces the pending file with the given lines
+// (write temp file, then rename); removes the file when no lines remain.
+func (s *CircuitBreakerService) rewriteLogFile(logFilePath string, lines []string) error {
+	if len(lines) == 0 {
+		if err := os.Remove(logFilePath); err != nil && !os.IsNotExist(err) {
+			return fmt.Errorf("failed to remove empty log file: %w", err)
+		}
+		return nil
+	}
+
+	tempFile := logFilePath + ".tmp"
+	file, err := os.OpenFile(tempFile, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0644)
+	if err != nil {
+		return fmt.Errorf("failed to create temp file: %w", err)
+	}
+
+	for _, line := range lines {
+		if _, err := file.WriteString(line + "\n"); err != nil {
+			file.Close()
+			os.Remove(tempFile)
+			return fmt.Errorf("failed to write to temp file: %w", err)
+		}
+	}
+
+	if err := file.Close(); err != nil {
+		os.Remove(tempFile)
+		return fmt.Errorf("failed to close temp file: %w", err)
+	}
+
+	if err := os.Rename(tempFile, logFilePath); err != nil {
+		os.Remove(tempFile)
+		return fmt.Errorf("failed to rename temp file: %w", err)
+	}
+
+	return nil
+}
diff --git a/model/circuit_breaker_test.go b/model/circuit_breaker_test.go
new file mode 100644
index 0000000..43e1bc2
--- /dev/null
+++ b/model/circuit_breaker_test.go
@@ -0,0 +1,346 @@
+package model
+
+import (
+	"context"
+	"os"
+	"path/filepath"
+	"sync"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+)
+
+func TestCircuitBreakerService_NewWithDefaults(t *testing.T) {
+	svc := NewCircuitBreakerService(CircuitBreakerConfig{}, nil)
+
+	assert.Equal(t, DefaultMaxConsecutiveFailures, svc.config.MaxConsecutiveFailures)
+	assert.Equal(t, DefaultCircuitResetInterval, svc.config.ResetInterval)
+	assert.False(t, svc.IsOpen())
+}
+
+func TestCircuitBreakerService_NewWithCustomConfig(t *testing.T) {
+	config := CircuitBreakerConfig{
+		MaxConsecutiveFailures: 5,
+		ResetInterval:          30 * time.Minute,
+	}
+	svc := NewCircuitBreakerService(config, nil)
+
+	assert.Equal(t, 5, svc.config.MaxConsecutiveFailures)
+	assert.Equal(t, 30*time.Minute, svc.config.ResetInterval)
+}
+
+func TestCircuitBreakerService_IsOpen(t *testing.T) {
+	svc := NewCircuitBreakerService(CircuitBreakerConfig{}, nil)
+
+	assert.False(t, svc.IsOpen(), "circuit should start closed")
+}
+
+func TestCircuitBreakerService_RecordFailure(t *testing.T)
{
+	config := CircuitBreakerConfig{
+		MaxConsecutiveFailures: 3,
+	}
+	svc := NewCircuitBreakerService(config, nil)
+
+	// First two failures should not open circuit
+	svc.RecordFailure()
+	assert.False(t, svc.IsOpen())
+	assert.Equal(t, 1, svc.GetConsecutiveFailures())
+
+	svc.RecordFailure()
+	assert.False(t, svc.IsOpen())
+	assert.Equal(t, 2, svc.GetConsecutiveFailures())
+
+	// Third failure should open circuit
+	svc.RecordFailure()
+	assert.True(t, svc.IsOpen())
+	assert.Equal(t, 3, svc.GetConsecutiveFailures())
+
+	// Additional failures should keep circuit open
+	svc.RecordFailure()
+	assert.True(t, svc.IsOpen())
+	assert.Equal(t, 4, svc.GetConsecutiveFailures())
+}
+
+func TestCircuitBreakerService_RecordSuccess(t *testing.T) {
+	config := CircuitBreakerConfig{
+		MaxConsecutiveFailures: 3,
+	}
+	svc := NewCircuitBreakerService(config, nil)
+
+	// Open the circuit
+	svc.RecordFailure()
+	svc.RecordFailure()
+	svc.RecordFailure()
+	assert.True(t, svc.IsOpen())
+
+	// Success should close circuit and reset counter
+	svc.RecordSuccess()
+	assert.False(t, svc.IsOpen())
+	assert.Equal(t, 0, svc.GetConsecutiveFailures())
+}
+
+func TestCircuitBreakerService_RecordSuccess_ResetsCounter(t *testing.T) {
+	config := CircuitBreakerConfig{
+		MaxConsecutiveFailures: 5,
+	}
+	svc := NewCircuitBreakerService(config, nil)
+
+	// Record some failures
+	svc.RecordFailure()
+	svc.RecordFailure()
+	assert.Equal(t, 2, svc.GetConsecutiveFailures())
+
+	// Success resets counter
+	svc.RecordSuccess()
+	assert.Equal(t, 0, svc.GetConsecutiveFailures())
+
+	// Now it takes 5 more failures to open circuit
+	for i := 0; i < 4; i++ {
+		svc.RecordFailure()
+		assert.False(t, svc.IsOpen())
+	}
+	svc.RecordFailure()
+	assert.True(t, svc.IsOpen())
+}
+
+func TestCircuitBreakerService_SaveForRetry(t *testing.T) {
+	// Create temp directory
+	tempDir := t.TempDir()
+
+	// Override SYNC_PENDING_FILE for test (package var, restored manually)
+	originalFile := SYNC_PENDING_FILE
+	SYNC_PENDING_FILE = filepath.Join(tempDir, "test-pending.jsonl")
+	defer func() { SYNC_PENDING_FILE = originalFile }()
+
+	// Blank HOME so "$HOME/<abs path>" expands back to the absolute temp path;
+	// t.Setenv restores the original value automatically.
+	t.Setenv("HOME", "")
+
+	svc := NewCircuitBreakerService(CircuitBreakerConfig{}, nil)
+
+	ctx := context.Background()
+	payload := []byte(`{"type":"sync","payload":{"data":"test"}}`)
+
+	err := svc.SaveForRetry(ctx, payload)
+	require.NoError(t, err)
+
+	// Read the file and verify content
+	content, err := os.ReadFile(SYNC_PENDING_FILE)
+	require.NoError(t, err)
+	assert.Contains(t, string(content), `{"type":"sync","payload":{"data":"test"}}`)
+}
+
+func TestCircuitBreakerService_SaveForRetry_AppendsMultiple(t *testing.T) {
+	tempDir := t.TempDir()
+
+	originalFile := SYNC_PENDING_FILE
+	SYNC_PENDING_FILE = filepath.Join(tempDir, "test-pending.jsonl")
+	defer func() { SYNC_PENDING_FILE = originalFile }()
+
+	t.Setenv("HOME", "")
+
+	svc := NewCircuitBreakerService(CircuitBreakerConfig{}, nil)
+	ctx := context.Background()
+
+	// Save multiple payloads
+	err := svc.SaveForRetry(ctx, []byte(`{"id":1}`))
+	require.NoError(t, err)
+	err = svc.SaveForRetry(ctx, []byte(`{"id":2}`))
+	require.NoError(t, err)
+	err = svc.SaveForRetry(ctx, []byte(`{"id":3}`))
+	require.NoError(t, err)
+
+	content, err := os.ReadFile(SYNC_PENDING_FILE)
+	require.NoError(t, err)
+
+	lines := string(content)
+	assert.Contains(t, lines, `{"id":1}`)
+	assert.Contains(t, lines, `{"id":2}`)
+	assert.Contains(t, lines, `{"id":3}`)
+}
+
+func TestCircuitBreakerService_RetryPendingData(t *testing.T) {
+	tempDir := t.TempDir()
+
+	originalFile := SYNC_PENDING_FILE
+	SYNC_PENDING_FILE = filepath.Join(tempDir, "test-pending.jsonl")
+	defer func() { SYNC_PENDING_FILE = originalFile }()
+
+	t.Setenv("HOME", "")
+
+	// Create pending file with test data
+	testData := `{"id":1}
+{"id":2}
+{"id":3}
+`
+	err := os.WriteFile(SYNC_PENDING_FILE, []byte(testData), 0644)
+	require.NoError(t, err)
+
+	var republishedData []string
+	var mu sync.Mutex
+
+	republishFn := func(data []byte) error {
+		mu.Lock()
+		defer mu.Unlock()
+		republishedData = append(republishedData, string(data))
+		return nil
+	}
+
+	svc := NewCircuitBreakerService(CircuitBreakerConfig{}, republishFn)
+
+	// Trigger retry
+	ctx := context.Background()
+	svc.retryPendingData(ctx)
+
+	// Verify all data was republished
+	mu.Lock()
+	defer mu.Unlock()
+	assert.Len(t, republishedData, 3)
+	assert.Contains(t, republishedData, `{"id":1}`)
+	assert.Contains(t, republishedData, `{"id":2}`)
+	assert.Contains(t, republishedData, `{"id":3}`)
+
+	// File should be removed after successful retry
+	_, err = os.Stat(SYNC_PENDING_FILE)
+	assert.True(t, os.IsNotExist(err))
+}
+
+func TestCircuitBreakerService_RetryPendingData_PartialFailure(t *testing.T) {
+	tempDir := t.TempDir()
+
+	originalFile := SYNC_PENDING_FILE
+	SYNC_PENDING_FILE = filepath.Join(tempDir, "test-pending.jsonl")
+	defer func() { SYNC_PENDING_FILE = originalFile }()
+
+	t.Setenv("HOME", "")
+
+	// Create pending file with test data
+	testData := `{"id":1}
+{"id":2}
+{"id":3}
+`
+	err := os.WriteFile(SYNC_PENDING_FILE, []byte(testData), 0644)
+	require.NoError(t, err)
+
+	callCount := 0
+	republishFn := func(data []byte) error {
+		callCount++
+		// Fail on second item
+		if callCount == 2 {
+			return assert.AnError
+		}
+		return nil
+	}
+
+	svc := NewCircuitBreakerService(CircuitBreakerConfig{}, republishFn)
+
+	ctx := context.Background()
+	svc.retryPendingData(ctx)
+
+	// File should still exist with failed item
+	content, err := os.ReadFile(SYNC_PENDING_FILE)
+	require.NoError(t, err)
+	assert.Contains(t, string(content), `{"id":2}`)
+	assert.NotContains(t, string(content), `{"id":1}`)
+	assert.NotContains(t, string(content), `{"id":3}`)
+}
+
+func TestCircuitBreakerService_StartStop(t *testing.T) {
+	config := CircuitBreakerConfig{
+		ResetInterval: 100 * time.Millisecond, // Short interval for testing
+	}
+	svc := NewCircuitBreakerService(config, nil)
+
+	ctx := context.Background()
+	err := svc.Start(ctx)
+	require.NoError(t, err)
+
+	// Stop should not block
+	done := make(chan struct{})
+	go func() {
+		svc.Stop()
+		close(done)
+	}()
+
+	select {
+	case <-done:
+		// Success
+	case <-time.After(1 * time.Second):
+		t.Fatal("Stop blocked for too long")
+	}
+}
+
+func TestCircuitBreakerService_ThreadSafety(t *testing.T) {
+	config := CircuitBreakerConfig{
+		MaxConsecutiveFailures: 100,
+	}
+	svc := NewCircuitBreakerService(config, nil)
+
+	var wg sync.WaitGroup
+	iterations := 100
+
+	// Concurrent RecordFailure
+	wg.Add(iterations)
+	for i := 0; i < iterations; i++ {
+		go func() {
+			defer wg.Done()
+			svc.RecordFailure()
+		}()
+	}
+	wg.Wait()
+
+	assert.Equal(t, 100, svc.GetConsecutiveFailures())
+	assert.True(t, svc.IsOpen())
+
+	// Concurrent RecordSuccess should reset
+	wg.Add(iterations)
+	for i := 0; i < iterations; i++ {
+		go func() {
+			defer wg.Done()
+			svc.RecordSuccess()
+		}()
+	}
+	wg.Wait()
+
+	assert.Equal(t, 0, svc.GetConsecutiveFailures())
+	assert.False(t, svc.IsOpen())
+}
+
+func TestCircuitBreakerService_CheckAndRetry_ResetsCircuit(t *testing.T) {
+	tempDir := t.TempDir()
+
+	originalFile := SYNC_PENDING_FILE
+	SYNC_PENDING_FILE = filepath.Join(tempDir, "test-pending.jsonl")
+	defer func() { SYNC_PENDING_FILE = originalFile }()
+
+	t.Setenv("HOME", "")
+
+	config := CircuitBreakerConfig{
+		MaxConsecutiveFailures: 3,
+	}
+	svc := NewCircuitBreakerService(config, nil)
+
+	// Open the circuit
+	svc.RecordFailure()
+	svc.RecordFailure()
+	svc.RecordFailure()
+	assert.True(t, svc.IsOpen())
+
+	// checkAndRetry should reset the circuit
+	ctx := context.Background()
+	svc.checkAndRetry(ctx)
+
+	assert.False(t, svc.IsOpen())
+	assert.Equal(t, 0, svc.GetConsecutiveFailures())
+}
diff --git a/model/db.go b/model/db.go
index eedf778..43d5ca1 100644
--- a/model/db.go
+++ b/model/db.go
@@ -24,6 +24,7 @@ var (
 	COMMAND_POST_STORAGE_FILE   = COMMAND_STORAGE_FOLDER + "/post.txt"
 	COMMAND_CURSOR_STORAGE_FILE = COMMAND_STORAGE_FOLDER + "/cursor.txt"
 	HEARTBEAT_LOG_FILE          = COMMAND_BASE_STORAGE_FOLDER + "/coding-heartbeat.data.log"
+	SYNC_PENDING_FILE           = COMMAND_BASE_STORAGE_FOLDER + "/sync-pending.jsonl"
 )
 
 func InitFolder(baseFolder string) {
@@ -36,6 +37,7 @@ func InitFolder(baseFolder string) {
 	COMMAND_POST_STORAGE_FILE = COMMAND_STORAGE_FOLDER + "/post.txt"
 	COMMAND_CURSOR_STORAGE_FILE = COMMAND_STORAGE_FOLDER + "/cursor.txt"
 	HEARTBEAT_LOG_FILE = COMMAND_BASE_STORAGE_FOLDER + "/coding-heartbeat.data.log"
+	SYNC_PENDING_FILE = COMMAND_BASE_STORAGE_FOLDER + "/sync-pending.jsonl"
 }