From 30dd270da3da8b027c305e7413a61de7ea21f3c9 Mon Sep 17 00:00:00 2001
From: AnnatarHe
Date: Thu, 25 Dec 2025 20:14:38 +0800
Subject: [PATCH 1/2] feat(daemon): add circuit breaker for sync handler
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Add a circuit breaker pattern to prevent overwhelming the server when it's
experiencing errors:

- After 10 consecutive failures, circuit opens and saves data locally
- 1-hour timer resets circuit and republishes saved data to pub/sub
- Data is persisted to ~/.shelltime/sync-pending.jsonl when circuit open
- Uses interface pattern for testability
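A minimal sketch of the intended call pattern, for reviewers (illustrative
only, not part of the diff below; syncToServer and payload are hypothetical
stand-ins for the real sync call and its argument):

    // Sketch: how a daemon component is expected to drive the breaker.
    func syncWithBreaker(ctx context.Context, cb daemon.CircuitBreaker, payload interface{}) error {
        if cb.IsOpen() {
            // Server already looks unhealthy: park the payload on disk for the hourly retry.
            return cb.SaveForRetry(ctx, payload)
        }
        if err := syncToServer(ctx, payload); err != nil {
            cb.RecordFailure() // the 10th consecutive failure opens the circuit
            return err
        }
        cb.RecordSuccess() // any success resets the counter and closes the circuit
        return nil
    }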

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5
---
 cmd/daemon/main.go        |   9 ++
 daemon/circuit_breaker.go | 250 ++++++++++++++++++++++++++++++++++++++
 daemon/handlers.sync.go   |  17 +++
 model/db.go               |   2 +
 4 files changed, 278 insertions(+)
 create mode 100644 daemon/circuit_breaker.go

diff --git a/cmd/daemon/main.go b/cmd/daemon/main.go
index cf4c02f..0eacc59 100644
--- a/cmd/daemon/main.go
+++ b/cmd/daemon/main.go
@@ -79,6 +79,15 @@ func main() {
 		return
 	}
 
+	// Start sync circuit breaker service
+	syncCircuitBreakerService := daemon.NewSyncCircuitBreakerService(pubsub)
+	if err := syncCircuitBreakerService.Start(ctx); err != nil {
+		slog.Error("Failed to start sync circuit breaker service", slog.Any("err", err))
+	} else {
+		slog.Info("Sync circuit breaker service started")
+		defer syncCircuitBreakerService.Stop()
+	}
+
 	go daemon.SocketTopicProccessor(msg)
 
 	// Start CCUsage service if enabled (v1 - ccusage CLI based)
diff --git a/daemon/circuit_breaker.go b/daemon/circuit_breaker.go
new file mode 100644
index 0000000..b3c8173
--- /dev/null
+++ b/daemon/circuit_breaker.go
@@ -0,0 +1,250 @@
+package daemon
+
+import (
+	"bufio"
+	"context"
+	"encoding/json"
+	"fmt"
+	"log/slog"
+	"os"
+	"sync"
+	"time"
+
+	"github.com/ThreeDotsLabs/watermill"
+	"github.com/ThreeDotsLabs/watermill/message"
+	"github.com/malamtime/cli/model"
+)
+
+const (
+	maxConsecutiveFailures      = 10
+	CircuitBreakerResetInterval = 1 * time.Hour
+)
+
+// CircuitBreaker defines the interface for circuit breaker operations
+type CircuitBreaker interface {
+	IsOpen() bool
+	RecordSuccess()
+	RecordFailure()
+	SaveForRetry(ctx context.Context, payload interface{}) error
+}
+
+// SyncCircuitBreakerService handles circuit breaker with retry functionality
+type SyncCircuitBreakerService struct {
+	mu                  sync.RWMutex
+	consecutiveFailures int
+	isOpen              bool
+	publisher           message.Publisher
+	ticker              *time.Ticker
+	stopChan            chan struct{}
+	wg                  sync.WaitGroup
+}
+
+// Global instance
+var syncCircuitBreaker CircuitBreaker
+
+// NewSyncCircuitBreakerService creates a new circuit breaker service
+func NewSyncCircuitBreakerService(publisher message.Publisher) *SyncCircuitBreakerService {
+	svc := &SyncCircuitBreakerService{
+		publisher: publisher,
+		stopChan:  make(chan struct{}),
+	}
+	syncCircuitBreaker = svc
+	return svc
+}
+
+// Start begins the periodic reset/retry timer
+func (s *SyncCircuitBreakerService) Start(ctx context.Context) error {
+	s.ticker = time.NewTicker(CircuitBreakerResetInterval)
+	s.wg.Add(1)
+
+	go func() {
+		defer s.wg.Done()
+
+		for {
+			select {
+			case <-s.ticker.C:
+				s.checkAndRetry(ctx)
+			case <-s.stopChan:
+				return
+			case <-ctx.Done():
+				return
+			}
+		}
+	}()
+
+	slog.Info("Sync circuit breaker service started", slog.Duration("interval", CircuitBreakerResetInterval))
+	return nil
+}
+
+// Stop stops the circuit breaker service
+func (s *SyncCircuitBreakerService) Stop() {
+	if s.ticker != nil {
+		s.ticker.Stop()
+	}
+	close(s.stopChan)
+	s.wg.Wait()
+	slog.Info("Sync circuit breaker service stopped")
+}
+
+func (s *SyncCircuitBreakerService) IsOpen() bool {
+	s.mu.RLock()
+	defer s.mu.RUnlock()
+	return s.isOpen
+}
+
+func (s *SyncCircuitBreakerService) RecordSuccess() {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	s.consecutiveFailures = 0
+	s.isOpen = false
+}
+
+func (s *SyncCircuitBreakerService) RecordFailure() {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	s.consecutiveFailures++
+	if s.consecutiveFailures >= maxConsecutiveFailures {
+		if !s.isOpen {
+			slog.Error("Circuit breaker opened due to consecutive failures - server may be experiencing issues",
+				slog.Int("failures", s.consecutiveFailures))
+		}
+		s.isOpen = true
+	}
+}
+
+func (s *SyncCircuitBreakerService) SaveForRetry(ctx context.Context, payload interface{}) error {
+	filePath := os.ExpandEnv(fmt.Sprintf("%s/%s", "$HOME", model.SYNC_PENDING_FILE))
+
+	file, err := os.OpenFile(filePath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
+	if err != nil {
+		return err
+	}
+	defer file.Close()
+
+	// Save the original SocketMessage for republishing
+	socketMsg := SocketMessage{
+		Type:    SocketMessageTypeSync,
+		Payload: payload,
+	}
+
+	jsonData, err := json.Marshal(socketMsg)
+	if err != nil {
+		return err
+	}
+
+	_, err = file.WriteString(string(jsonData) + "\n")
+	if err != nil {
+		return err
+	}
+
+	slog.Info("Saved sync data for later retry")
+	return nil
+}
+
+func (s *SyncCircuitBreakerService) checkAndRetry(ctx context.Context) {
+	s.mu.Lock()
+	if s.isOpen {
+		slog.Info("Circuit breaker reset by timer, attempting to retry saved data")
+		s.isOpen = false
+		s.consecutiveFailures = 0
+	}
+	s.mu.Unlock()
+
+	s.retryPendingData(ctx)
+}
+
+func (s *SyncCircuitBreakerService) retryPendingData(ctx context.Context) {
+	filePath := os.ExpandEnv(fmt.Sprintf("%s/%s", "$HOME", model.SYNC_PENDING_FILE))
+
+	if _, err := os.Stat(filePath); os.IsNotExist(err) {
+		slog.Debug("No pending sync file found, nothing to retry")
+		return
+	}
+
+	file, err := os.Open(filePath)
+	if err != nil {
+		slog.Error("Failed to open pending sync file for retry", slog.Any("err", err))
+		return
+	}
+
+	var lines []string
+	scanner := bufio.NewScanner(file)
+	for scanner.Scan() {
+		line := scanner.Text()
+		if line != "" {
+			lines = append(lines, line)
+		}
+	}
+	file.Close()
+
+	if err := scanner.Err(); err != nil {
+		slog.Error("Error reading pending sync file", slog.Any("err", err))
+		return
+	}
+
+	if len(lines) == 0 {
+		slog.Debug("No pending sync data to retry")
+		return
+	}
+
+	slog.Info("Starting sync data retry", slog.Int("pendingCount", len(lines)))
+
+	var failedLines []string
+	successCount := 0
+
+	for _, line := range lines {
+		// Republish to pub/sub topic
+		msg := message.NewMessage(watermill.NewUUID(), []byte(line))
+		if err := s.publisher.Publish(PubSubTopic, msg); err != nil {
+			slog.Warn("Failed to republish sync data, keeping for next retry", slog.Any("err", err))
+			failedLines = append(failedLines, line)
+		} else {
+			successCount++
+		}
+	}
+
+	// Rewrite file with only failed lines
+	if err := s.rewriteLogFile(filePath, failedLines); err != nil {
+		slog.Error("Failed to update pending sync file", slog.Any("err", err))
+		return
+	}
+
+	slog.Info("Sync data retry completed",
+		slog.Int("republished", successCount),
+		slog.Int("remaining", len(failedLines)))
+}
+
+func (s *SyncCircuitBreakerService) rewriteLogFile(logFilePath string, lines []string) error {
+	if len(lines) == 0 {
+		if err := os.Remove(logFilePath); err != nil && !os.IsNotExist(err) {
+			return fmt.Errorf("failed to remove empty log file: %w", err)
+		}
+		return nil
+	}
+
+	tempFile := logFilePath + ".tmp"
+	file, err := os.OpenFile(tempFile, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0644)
+	if err != nil {
+		return fmt.Errorf("failed to create temp file: %w", err)
+	}
+
+	for _, line := range lines {
+		if _, err := file.WriteString(line + "\n"); err != nil {
+			file.Close()
+			os.Remove(tempFile)
+			return fmt.Errorf("failed to write to temp file: %w", err)
+		}
+	}
+
+	if err := file.Close(); err != nil {
+		os.Remove(tempFile)
+		return fmt.Errorf("failed to close temp file: %w", err)
+	}
+
+	if err := os.Rename(tempFile, logFilePath); err != nil {
+		os.Remove(tempFile)
+		return fmt.Errorf("failed to rename temp file: %w", err)
+	}
+
+	return nil
+}
diff --git a/daemon/handlers.sync.go b/daemon/handlers.sync.go
index b93866c..0d34034 100644
--- a/daemon/handlers.sync.go
+++ b/daemon/handlers.sync.go
@@ -11,6 +11,16 @@ import (
 )
 
 func handlePubSubSync(ctx context.Context, socketMsgPayload interface{}) error {
+	// Check circuit breaker first
+	if syncCircuitBreaker != nil && syncCircuitBreaker.IsOpen() {
+		slog.Error("Circuit breaker is open, saving sync data locally for later retry")
+		if err := syncCircuitBreaker.SaveForRetry(ctx, socketMsgPayload); err != nil {
+			slog.Error("Failed to save sync data for retry", slog.Any("err", err))
+			return err
+		}
+		return nil // Return nil to ack the message
+	}
+
 	pb, err := json.Marshal(socketMsgPayload)
 	if err != nil {
 		slog.Error("Failed to marshal the sync payload again for unmarshal", slog.Any("payload", socketMsgPayload))
@@ -98,8 +108,15 @@ func handlePubSubSync(ctx context.Context, socketMsgPayload interface{}) error {
 	)
 	if err != nil {
+		if syncCircuitBreaker != nil {
+			syncCircuitBreaker.RecordFailure()
+		}
 		slog.Error("Failed to sync data to server", slog.Any("err", err))
 		return err
 	}
+
+	if syncCircuitBreaker != nil {
+		syncCircuitBreaker.RecordSuccess()
+	}
 	return nil
 }
diff --git a/model/db.go b/model/db.go
index eedf778..43d5ca1 100644
--- a/model/db.go
+++ b/model/db.go
@@ -24,6 +24,7 @@ var (
 	COMMAND_POST_STORAGE_FILE   = COMMAND_STORAGE_FOLDER + "/post.txt"
 	COMMAND_CURSOR_STORAGE_FILE = COMMAND_STORAGE_FOLDER + "/cursor.txt"
 	HEARTBEAT_LOG_FILE          = COMMAND_BASE_STORAGE_FOLDER + "/coding-heartbeat.data.log"
+	SYNC_PENDING_FILE           = COMMAND_BASE_STORAGE_FOLDER + "/sync-pending.jsonl"
 )
 
 func InitFolder(baseFolder string) {
@@ -36,6 +37,7 @@ func InitFolder(baseFolder string) {
 	COMMAND_POST_STORAGE_FILE = COMMAND_STORAGE_FOLDER + "/post.txt"
 	COMMAND_CURSOR_STORAGE_FILE = COMMAND_STORAGE_FOLDER + "/cursor.txt"
 	HEARTBEAT_LOG_FILE = COMMAND_BASE_STORAGE_FOLDER + "/coding-heartbeat.data.log"
+	SYNC_PENDING_FILE = COMMAND_BASE_STORAGE_FOLDER + "/sync-pending.jsonl"
 }
 
 // key: ${shell}|${sessionID}|${command}|${username}

From bb37156f57a7ed516195d968138c0ca87246294e Mon Sep 17 00:00:00 2001
From: AnnatarHe
Date: Thu, 25 Dec 2025 20:32:12 +0800
Subject: [PATCH 2/2] refactor(model): move circuit breaker to model package with tests
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

- Move CircuitBreakerService from daemon to model package
- Add CircuitBreaker interface with generic []byte payload
- Add comprehensive tests for circuit breaker functionality
- Keep thin wrapper in daemon for SocketMessage handling
- Add CircuitBreaker to mockery config for mock generation
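A minimal usage sketch of the relocated API, for reviewers (illustrative only,
not part of the diff below; the config values and republishSomehow are
hypothetical stand-ins):

    // Sketch: reusing model.CircuitBreakerService outside the daemon wrapper.
    cb := model.NewCircuitBreakerService(model.CircuitBreakerConfig{
        MaxConsecutiveFailures: 5,                // zero values fall back to the 10 / 1h defaults
        ResetInterval:          30 * time.Minute,
    }, func(data []byte) error {
        // Invoked by the reset timer for each line saved in sync-pending.jsonl.
        return republishSomehow(data) // hypothetical stand-in for a watermill publish
    })
    _ = cb.Start(ctx)
    defer cb.Stop()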

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5
---
 .mockery.yml                  |   4 +
 daemon/circuit_breaker.go     | 234 ++---------------------
 model/circuit_breaker.go      | 264 ++++++++++++++++++++++++++
 model/circuit_breaker_test.go | 346 ++++++++++++++++++++++++++++++++++
 4 files changed, 633 insertions(+), 215 deletions(-)
 create mode 100644 model/circuit_breaker.go
 create mode 100644 model/circuit_breaker_test.go

diff --git a/.mockery.yml b/.mockery.yml
index 7459677..82650e4 100644
--- a/.mockery.yml
+++ b/.mockery.yml
@@ -26,3 +26,7 @@ packages:
     config:
       dir: "{{.InterfaceDir}}"
       filename: "mock_{{.InterfaceName}}.go"
+    CircuitBreaker:
+      config:
+        dir: "{{.InterfaceDir}}"
+        filename: "mock_{{.InterfaceName}}.go"
diff --git a/daemon/circuit_breaker.go b/daemon/circuit_breaker.go
index b3c8173..2b19a99 100644
--- a/daemon/circuit_breaker.go
+++ b/daemon/circuit_breaker.go
@@ -1,250 +1,54 @@
 package daemon
 
 import (
-	"bufio"
 	"context"
 	"encoding/json"
-	"fmt"
-	"log/slog"
-	"os"
-	"sync"
-	"time"
 
 	"github.com/ThreeDotsLabs/watermill"
 	"github.com/ThreeDotsLabs/watermill/message"
 	"github.com/malamtime/cli/model"
 )
 
-const (
-	maxConsecutiveFailures      = 10
-	CircuitBreakerResetInterval = 1 * time.Hour
-)
-
-// CircuitBreaker defines the interface for circuit breaker operations
-type CircuitBreaker interface {
+// DaemonCircuitBreaker defines the interface for daemon-specific circuit breaker operations
+type DaemonCircuitBreaker interface {
 	IsOpen() bool
 	RecordSuccess()
 	RecordFailure()
 	SaveForRetry(ctx context.Context, payload interface{}) error
 }
 
-// SyncCircuitBreakerService handles circuit breaker with retry functionality
-type SyncCircuitBreakerService struct {
-	mu                  sync.RWMutex
-	consecutiveFailures int
-	isOpen              bool
-	publisher           message.Publisher
-	ticker              *time.Ticker
-	stopChan            chan struct{}
-	wg                  sync.WaitGroup
-}
-
 // Global instance
-var syncCircuitBreaker CircuitBreaker
-
-// NewSyncCircuitBreakerService creates a new circuit breaker service
-func NewSyncCircuitBreakerService(publisher message.Publisher) *SyncCircuitBreakerService {
-	svc := &SyncCircuitBreakerService{
-		publisher: publisher,
-		stopChan:  make(chan struct{}),
-	}
-	syncCircuitBreaker = svc
-	return svc
-}
-
-// Start begins the periodic reset/retry timer
-func (s *SyncCircuitBreakerService) Start(ctx context.Context) error {
-	s.ticker = time.NewTicker(CircuitBreakerResetInterval)
-	s.wg.Add(1)
+var syncCircuitBreaker DaemonCircuitBreaker
 
-	go func() {
-		defer s.wg.Done()
-
-		for {
-			select {
-			case <-s.ticker.C:
-				s.checkAndRetry(ctx)
-			case <-s.stopChan:
-				return
-			case <-ctx.Done():
-				return
-			}
-		}
-	}()
-
-	slog.Info("Sync circuit breaker service started", slog.Duration("interval", CircuitBreakerResetInterval))
-	return nil
+// SyncCircuitBreakerWrapper wraps model.CircuitBreakerService with daemon-specific logic
+type SyncCircuitBreakerWrapper struct {
+	*model.CircuitBreakerService
 }
 
-// Stop stops the circuit breaker service
-func (s *SyncCircuitBreakerService) Stop() {
-	if s.ticker != nil {
-		s.ticker.Stop()
+// NewSyncCircuitBreakerService creates a new daemon-specific circuit breaker service
+func NewSyncCircuitBreakerService(publisher message.Publisher) *SyncCircuitBreakerWrapper {
+	republishFn := func(data []byte) error {
+		msg := message.NewMessage(watermill.NewUUID(), data)
+		return publisher.Publish(PubSubTopic, msg)
 	}
-	close(s.stopChan)
-	s.wg.Wait()
-	slog.Info("Sync circuit breaker service stopped")
-}
-
-func (s *SyncCircuitBreakerService) IsOpen() bool {
-	s.mu.RLock()
-	defer s.mu.RUnlock()
-	return s.isOpen
-}
-
-func (s *SyncCircuitBreakerService) RecordSuccess() {
-	s.mu.Lock()
-	defer s.mu.Unlock()
-	s.consecutiveFailures = 0
-	s.isOpen = false
-}
-func (s *SyncCircuitBreakerService) RecordFailure() {
-	s.mu.Lock()
-	defer s.mu.Unlock()
-	s.consecutiveFailures++
-	if s.consecutiveFailures >= maxConsecutiveFailures {
-		if !s.isOpen {
-			slog.Error("Circuit breaker opened due to consecutive failures - server may be experiencing issues",
-				slog.Int("failures", s.consecutiveFailures))
-		}
-		s.isOpen = true
+	svc := model.NewCircuitBreakerService(model.CircuitBreakerConfig{}, republishFn)
+	wrapper := &SyncCircuitBreakerWrapper{
+		CircuitBreakerService: svc,
 	}
+	syncCircuitBreaker = wrapper
+	return wrapper
 }
 
-func (s *SyncCircuitBreakerService) SaveForRetry(ctx context.Context, payload interface{}) error {
-	filePath := os.ExpandEnv(fmt.Sprintf("%s/%s", "$HOME", model.SYNC_PENDING_FILE))
-
-	file, err := os.OpenFile(filePath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
-	if err != nil {
-		return err
-	}
-	defer file.Close()
-
-	// Save the original SocketMessage for republishing
+// SaveForRetry wraps payload in SocketMessage before saving
+func (w *SyncCircuitBreakerWrapper) SaveForRetry(ctx context.Context, payload interface{}) error {
 	socketMsg := SocketMessage{
 		Type:    SocketMessageTypeSync,
 		Payload: payload,
 	}
-
 	jsonData, err := json.Marshal(socketMsg)
 	if err != nil {
 		return err
 	}
-
-	_, err = file.WriteString(string(jsonData) + "\n")
-	if err != nil {
-		return err
-	}
-
-	slog.Info("Saved sync data for later retry")
-	return nil
-}
-
-func (s *SyncCircuitBreakerService) checkAndRetry(ctx context.Context) {
-	s.mu.Lock()
-	if s.isOpen {
-		slog.Info("Circuit breaker reset by timer, attempting to retry saved data")
-		s.isOpen = false
-		s.consecutiveFailures = 0
-	}
-	s.mu.Unlock()
-
-	s.retryPendingData(ctx)
-}
-
-func (s *SyncCircuitBreakerService) retryPendingData(ctx context.Context) {
-	filePath := os.ExpandEnv(fmt.Sprintf("%s/%s", "$HOME", model.SYNC_PENDING_FILE))
-
-	if _, err := os.Stat(filePath); os.IsNotExist(err) {
-		slog.Debug("No pending sync file found, nothing to retry")
-		return
-	}
-
-	file, err := os.Open(filePath)
-	if err != nil {
-		slog.Error("Failed to open pending sync file for retry", slog.Any("err", err))
-		return
-	}
-
-	var lines []string
-	scanner := bufio.NewScanner(file)
-	for scanner.Scan() {
-		line := scanner.Text()
-		if line != "" {
-			lines = append(lines, line)
-		}
-	}
-	file.Close()
-
-	if err := scanner.Err(); err != nil {
-		slog.Error("Error reading pending sync file", slog.Any("err", err))
-		return
-	}
-
-	if len(lines) == 0 {
-		slog.Debug("No pending sync data to retry")
-		return
-	}
-
-	slog.Info("Starting sync data retry", slog.Int("pendingCount", len(lines)))
-
-	var failedLines []string
-	successCount := 0
-
-	for _, line := range lines {
-		// Republish to pub/sub topic
-		msg := message.NewMessage(watermill.NewUUID(), []byte(line))
-		if err := s.publisher.Publish(PubSubTopic, msg); err != nil {
-			slog.Warn("Failed to republish sync data, keeping for next retry", slog.Any("err", err))
-			failedLines = append(failedLines, line)
-		} else {
-			successCount++
-		}
-	}
-
-	// Rewrite file with only failed lines
-	if err := s.rewriteLogFile(filePath, failedLines); err != nil {
-		slog.Error("Failed to update pending sync file", slog.Any("err", err))
-		return
-	}
-
-	slog.Info("Sync data retry completed",
-		slog.Int("republished", successCount),
-		slog.Int("remaining", len(failedLines)))
-}
-
-func (s *SyncCircuitBreakerService) rewriteLogFile(logFilePath string, lines []string) error {
-	if len(lines) == 0 {
-		if err := os.Remove(logFilePath); err != nil && !os.IsNotExist(err) {
-			return fmt.Errorf("failed to remove empty log file: %w", err)
-		}
-		return nil
-	}
-
-	tempFile := logFilePath + ".tmp"
-	file, err := os.OpenFile(tempFile, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0644)
-	if err != nil {
-		return fmt.Errorf("failed to create temp file: %w", err)
-	}
-
-	for _, line := range lines {
-		if _, err := file.WriteString(line + "\n"); err != nil {
-			file.Close()
-			os.Remove(tempFile)
-			return fmt.Errorf("failed to write to temp file: %w", err)
-		}
-	}
-
-	if err := file.Close(); err != nil {
-		os.Remove(tempFile)
-		return fmt.Errorf("failed to close temp file: %w", err)
-	}
-
-	if err := os.Rename(tempFile, logFilePath); err != nil {
-		os.Remove(tempFile)
-		return fmt.Errorf("failed to rename temp file: %w", err)
-	}
-
-	return nil
+	return w.CircuitBreakerService.SaveForRetry(ctx, jsonData)
 }
diff --git a/model/circuit_breaker.go b/model/circuit_breaker.go
new file mode 100644
index 0000000..10e0df9
--- /dev/null
+++ b/model/circuit_breaker.go
@@ -0,0 +1,264 @@
+package model
+
+import (
+	"bufio"
+	"context"
+	"fmt"
+	"log/slog"
+	"os"
+	"sync"
+	"time"
+)
+
+const (
+	DefaultMaxConsecutiveFailures = 10
+	DefaultCircuitResetInterval   = 1 * time.Hour
+)
+
+// CircuitBreaker defines the interface for circuit breaker operations
+type CircuitBreaker interface {
+	IsOpen() bool
+	RecordSuccess()
+	RecordFailure()
+	SaveForRetry(ctx context.Context, payload []byte) error
+}
+
+// CircuitBreakerConfig holds configuration for the circuit breaker service
+type CircuitBreakerConfig struct {
+	MaxConsecutiveFailures int
+	ResetInterval          time.Duration
+}
+
+// RepublishFunc is called when retrying pending data
+type RepublishFunc func(data []byte) error
+
+// CircuitBreakerService handles circuit breaker with retry functionality
+type CircuitBreakerService struct {
+	mu                  sync.RWMutex
+	consecutiveFailures int
+	isOpen              bool
+	config              CircuitBreakerConfig
+	republishFn         RepublishFunc
+	ticker              *time.Ticker
+	stopChan            chan struct{}
+	wg                  sync.WaitGroup
+}
+
+// NewCircuitBreakerService creates a new circuit breaker service
+func NewCircuitBreakerService(config CircuitBreakerConfig, republishFn RepublishFunc) *CircuitBreakerService {
+	if config.MaxConsecutiveFailures <= 0 {
+		config.MaxConsecutiveFailures = DefaultMaxConsecutiveFailures
+	}
+	if config.ResetInterval <= 0 {
+		config.ResetInterval = DefaultCircuitResetInterval
+	}
+	return &CircuitBreakerService{
+		config:      config,
+		republishFn: republishFn,
+		stopChan:    make(chan struct{}),
+	}
+}
+
+// Start begins the periodic reset/retry timer
+func (s *CircuitBreakerService) Start(ctx context.Context) error {
+	s.ticker = time.NewTicker(s.config.ResetInterval)
+	s.wg.Add(1)
+
+	go func() {
+		defer s.wg.Done()
+
+		for {
+			select {
+			case <-s.ticker.C:
+				s.checkAndRetry(ctx)
+			case <-s.stopChan:
+				return
+			case <-ctx.Done():
+				return
+			}
+		}
+	}()
+
+	slog.Info("Circuit breaker service started", slog.Duration("interval", s.config.ResetInterval))
+	return nil
+}
+
+// Stop stops the circuit breaker service
+func (s *CircuitBreakerService) Stop() {
+	if s.ticker != nil {
+		s.ticker.Stop()
+	}
+	close(s.stopChan)
+	s.wg.Wait()
+	slog.Info("Circuit breaker service stopped")
+}
+
+// IsOpen returns true if circuit is open
+func (s *CircuitBreakerService) IsOpen() bool {
+	s.mu.RLock()
+	defer s.mu.RUnlock()
+	return s.isOpen
+}
+
+// RecordSuccess resets failure counter and closes circuit
+func (s *CircuitBreakerService) RecordSuccess() {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	s.consecutiveFailures = 0
+	s.isOpen = false
+}
+
+// RecordFailure increments failure counter, opens circuit at threshold
+func (s *CircuitBreakerService) RecordFailure() {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	s.consecutiveFailures++
+	if s.consecutiveFailures >= s.config.MaxConsecutiveFailures {
+		if !s.isOpen {
+			slog.Error("Circuit breaker opened due to consecutive failures - server may be experiencing issues",
+				slog.Int("failures", s.consecutiveFailures))
+		}
+		s.isOpen = true
+	}
+}
+
+// SaveForRetry saves payload to file for later retry
+func (s *CircuitBreakerService) SaveForRetry(ctx context.Context, payload []byte) error {
+	filePath := os.ExpandEnv(fmt.Sprintf("%s/%s", "$HOME", SYNC_PENDING_FILE))
+
+	file, err := os.OpenFile(filePath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
+	if err != nil {
+		return err
+	}
+	defer file.Close()
+
+	_, err = file.Write(payload)
+	if err != nil {
+		return err
+	}
+	_, err = file.WriteString("\n")
+	if err != nil {
+		return err
+	}
+
+	slog.Info("Saved data for later retry")
+	return nil
+}
+
+// GetConsecutiveFailures returns the current failure count (for testing)
+func (s *CircuitBreakerService) GetConsecutiveFailures() int {
+	s.mu.RLock()
+	defer s.mu.RUnlock()
+	return s.consecutiveFailures
+}
+
+func (s *CircuitBreakerService) checkAndRetry(ctx context.Context) {
+	s.mu.Lock()
+	if s.isOpen {
+		slog.Info("Circuit breaker reset by timer, attempting to retry saved data")
+		s.isOpen = false
+		s.consecutiveFailures = 0
+	}
+	s.mu.Unlock()
+
+	s.retryPendingData(ctx)
+}
+
+func (s *CircuitBreakerService) retryPendingData(ctx context.Context) {
+	filePath := os.ExpandEnv(fmt.Sprintf("%s/%s", "$HOME", SYNC_PENDING_FILE))
+
+	if _, err := os.Stat(filePath); os.IsNotExist(err) {
+		slog.Debug("No pending sync file found, nothing to retry")
+		return
+	}
+
+	file, err := os.Open(filePath)
+	if err != nil {
+		slog.Error("Failed to open pending sync file for retry", slog.Any("err", err))
+		return
+	}
+
+	var lines []string
+	scanner := bufio.NewScanner(file)
+	for scanner.Scan() {
+		line := scanner.Text()
+		if line != "" {
+			lines = append(lines, line)
+		}
+	}
+	file.Close()
+
+	if err := scanner.Err(); err != nil {
+		slog.Error("Error reading pending sync file", slog.Any("err", err))
+		return
+	}
+
+	if len(lines) == 0 {
+		slog.Debug("No pending sync data to retry")
+		return
+	}
+
+	slog.Info("Starting sync data retry", slog.Int("pendingCount", len(lines)))
+
+	var failedLines []string
+	successCount := 0
+
+	for _, line := range lines {
+		if s.republishFn == nil {
+			slog.Error("No republish function configured")
+			failedLines = append(failedLines, line)
+			continue
+		}
+
+		if err := s.republishFn([]byte(line)); err != nil {
+			slog.Warn("Failed to republish sync data, keeping for next retry", slog.Any("err", err))
+			failedLines = append(failedLines, line)
+		} else {
+			successCount++
+		}
+	}
+
+	if err := s.rewriteLogFile(filePath, failedLines); err != nil {
+		slog.Error("Failed to update pending sync file", slog.Any("err", err))
+		return
+	}
+
+	slog.Info("Sync data retry completed",
+		slog.Int("republished", successCount),
+		slog.Int("remaining", len(failedLines)))
+}
+
+func (s *CircuitBreakerService) rewriteLogFile(logFilePath string, lines []string) error {
+	if len(lines) == 0 {
+		if err := os.Remove(logFilePath); err != nil && !os.IsNotExist(err) {
+			return fmt.Errorf("failed to remove empty log file: %w", err)
+		}
+		return nil
+	}
+
+	tempFile := logFilePath + ".tmp"
+	file, err := os.OpenFile(tempFile, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0644)
+	if err != nil {
+		return fmt.Errorf("failed to create temp file: %w", err)
+	}
+
+	for _, line := range lines {
+		if _, err := file.WriteString(line + "\n"); err != nil {
+			file.Close()
+			os.Remove(tempFile)
+			return fmt.Errorf("failed to write to temp file: %w", err)
+		}
+	}
+
+	if err := file.Close(); err != nil {
+		os.Remove(tempFile)
+		return fmt.Errorf("failed to close temp file: %w", err)
+	}
+
+	if err := os.Rename(tempFile, logFilePath); err != nil {
+		os.Remove(tempFile)
+		return fmt.Errorf("failed to rename temp file: %w", err)
+	}
+
+	return nil
+}
diff --git a/model/circuit_breaker_test.go b/model/circuit_breaker_test.go
new file mode 100644
index 0000000..43e1bc2
--- /dev/null
+++ b/model/circuit_breaker_test.go
@@ -0,0 +1,346 @@
+package model
+
+import (
+	"context"
+	"os"
+	"path/filepath"
+	"sync"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+)
+
+func TestCircuitBreakerService_NewWithDefaults(t *testing.T) {
+	svc := NewCircuitBreakerService(CircuitBreakerConfig{}, nil)
+
+	assert.Equal(t, DefaultMaxConsecutiveFailures, svc.config.MaxConsecutiveFailures)
+	assert.Equal(t, DefaultCircuitResetInterval, svc.config.ResetInterval)
+	assert.False(t, svc.IsOpen())
+}
+
+func TestCircuitBreakerService_NewWithCustomConfig(t *testing.T) {
+	config := CircuitBreakerConfig{
+		MaxConsecutiveFailures: 5,
+		ResetInterval:          30 * time.Minute,
+	}
+	svc := NewCircuitBreakerService(config, nil)
+
+	assert.Equal(t, 5, svc.config.MaxConsecutiveFailures)
+	assert.Equal(t, 30*time.Minute, svc.config.ResetInterval)
+}
+
+func TestCircuitBreakerService_IsOpen(t *testing.T) {
+	svc := NewCircuitBreakerService(CircuitBreakerConfig{}, nil)
+
+	assert.False(t, svc.IsOpen(), "circuit should start closed")
+}
+
+func TestCircuitBreakerService_RecordFailure(t *testing.T) {
+	config := CircuitBreakerConfig{
+		MaxConsecutiveFailures: 3,
+	}
+	svc := NewCircuitBreakerService(config, nil)
+
+	// First two failures should not open circuit
+	svc.RecordFailure()
+	assert.False(t, svc.IsOpen())
+	assert.Equal(t, 1, svc.GetConsecutiveFailures())
+
+	svc.RecordFailure()
+	assert.False(t, svc.IsOpen())
+	assert.Equal(t, 2, svc.GetConsecutiveFailures())
+
+	// Third failure should open circuit
+	svc.RecordFailure()
+	assert.True(t, svc.IsOpen())
+	assert.Equal(t, 3, svc.GetConsecutiveFailures())
+
+	// Additional failures should keep circuit open
+	svc.RecordFailure()
+	assert.True(t, svc.IsOpen())
+	assert.Equal(t, 4, svc.GetConsecutiveFailures())
+}
+
+func TestCircuitBreakerService_RecordSuccess(t *testing.T) {
+	config := CircuitBreakerConfig{
+		MaxConsecutiveFailures: 3,
+	}
+	svc := NewCircuitBreakerService(config, nil)
+
+	// Open the circuit
+	svc.RecordFailure()
+	svc.RecordFailure()
+	svc.RecordFailure()
+	assert.True(t, svc.IsOpen())
+
+	// Success should close circuit and reset counter
+	svc.RecordSuccess()
+	assert.False(t, svc.IsOpen())
+	assert.Equal(t, 0, svc.GetConsecutiveFailures())
+}
+
+func TestCircuitBreakerService_RecordSuccess_ResetsCounter(t *testing.T) {
+	config := CircuitBreakerConfig{
+		MaxConsecutiveFailures: 5,
+	}
+	svc := NewCircuitBreakerService(config, nil)
+
+	// Record some failures
+	svc.RecordFailure()
+	svc.RecordFailure()
+	assert.Equal(t, 2, svc.GetConsecutiveFailures())
+
+	// Success resets counter
+	svc.RecordSuccess()
+	assert.Equal(t, 0, svc.GetConsecutiveFailures())
+
+	// Now it takes 5 more failures to open circuit
+	for i := 0; i < 4; i++ {
+		svc.RecordFailure()
+		assert.False(t, svc.IsOpen())
+	}
+	svc.RecordFailure()
+	assert.True(t, svc.IsOpen())
+}
+
+func TestCircuitBreakerService_SaveForRetry(t *testing.T) {
+	// Create temp directory
+	tempDir := t.TempDir()
+
+	// Override SYNC_PENDING_FILE for test
+	originalFile := SYNC_PENDING_FILE
+	SYNC_PENDING_FILE = filepath.Join(tempDir, "test-pending.jsonl")
+	defer func() { SYNC_PENDING_FILE = originalFile }()
+
+	// Override HOME for test
+	originalHome := os.Getenv("HOME")
+	os.Setenv("HOME", "")
+	defer os.Setenv("HOME", originalHome)
+
+	svc := NewCircuitBreakerService(CircuitBreakerConfig{}, nil)
+
+	ctx := context.Background()
+	payload := []byte(`{"type":"sync","payload":{"data":"test"}}`)
+
+	err := svc.SaveForRetry(ctx, payload)
+	require.NoError(t, err)
+
+	// Read the file and verify content
+	content, err := os.ReadFile(SYNC_PENDING_FILE)
+	require.NoError(t, err)
+	assert.Contains(t, string(content), `{"type":"sync","payload":{"data":"test"}}`)
+}
+
+func TestCircuitBreakerService_SaveForRetry_AppendsMultiple(t *testing.T) {
+	tempDir := t.TempDir()
+
+	originalFile := SYNC_PENDING_FILE
+	SYNC_PENDING_FILE = filepath.Join(tempDir, "test-pending.jsonl")
+	defer func() { SYNC_PENDING_FILE = originalFile }()
+
+	originalHome := os.Getenv("HOME")
+	os.Setenv("HOME", "")
+	defer os.Setenv("HOME", originalHome)
+
+	svc := NewCircuitBreakerService(CircuitBreakerConfig{}, nil)
+	ctx := context.Background()
+
+	// Save multiple payloads
+	err := svc.SaveForRetry(ctx, []byte(`{"id":1}`))
+	require.NoError(t, err)
+	err = svc.SaveForRetry(ctx, []byte(`{"id":2}`))
+	require.NoError(t, err)
+	err = svc.SaveForRetry(ctx, []byte(`{"id":3}`))
+	require.NoError(t, err)
+
+	content, err := os.ReadFile(SYNC_PENDING_FILE)
+	require.NoError(t, err)
+
+	lines := string(content)
+	assert.Contains(t, lines, `{"id":1}`)
+	assert.Contains(t, lines, `{"id":2}`)
+	assert.Contains(t, lines, `{"id":3}`)
+}
+
+func TestCircuitBreakerService_RetryPendingData(t *testing.T) {
+	tempDir := t.TempDir()
+
+	originalFile := SYNC_PENDING_FILE
+	SYNC_PENDING_FILE = filepath.Join(tempDir, "test-pending.jsonl")
+	defer func() { SYNC_PENDING_FILE = originalFile }()
+
+	originalHome := os.Getenv("HOME")
+	os.Setenv("HOME", "")
+	defer os.Setenv("HOME", originalHome)
+
+	// Create pending file with test data
+	testData := `{"id":1}
+{"id":2}
+{"id":3}
+`
+	err := os.WriteFile(SYNC_PENDING_FILE, []byte(testData), 0644)
+	require.NoError(t, err)
+
+	var republishedData []string
+	var mu sync.Mutex
+
+	republishFn := func(data []byte) error {
+		mu.Lock()
+		defer mu.Unlock()
+		republishedData = append(republishedData, string(data))
+		return nil
+	}
+
+	svc := NewCircuitBreakerService(CircuitBreakerConfig{}, republishFn)
+
+	// Trigger retry
+	ctx := context.Background()
+	svc.retryPendingData(ctx)
+
+	// Verify all data was republished
+	mu.Lock()
+	defer mu.Unlock()
+	assert.Len(t, republishedData, 3)
+	assert.Contains(t, republishedData, `{"id":1}`)
+	assert.Contains(t, republishedData, `{"id":2}`)
+	assert.Contains(t, republishedData, `{"id":3}`)
+
+	// File should be removed after successful retry
+	_, err = os.Stat(SYNC_PENDING_FILE)
+	assert.True(t, os.IsNotExist(err))
+}
+
+func TestCircuitBreakerService_RetryPendingData_PartialFailure(t *testing.T) {
+	tempDir := t.TempDir()
+
+	originalFile := SYNC_PENDING_FILE
+	SYNC_PENDING_FILE = filepath.Join(tempDir, "test-pending.jsonl")
+	defer func() { SYNC_PENDING_FILE = originalFile }()
+
+	originalHome := os.Getenv("HOME")
+	os.Setenv("HOME", "")
os.Setenv("HOME", originalHome) + + // Create pending file with test data + testData := `{"id":1} +{"id":2} +{"id":3} +` + err := os.WriteFile(SYNC_PENDING_FILE, []byte(testData), 0644) + require.NoError(t, err) + + callCount := 0 + republishFn := func(data []byte) error { + callCount++ + // Fail on second item + if callCount == 2 { + return assert.AnError + } + return nil + } + + svc := NewCircuitBreakerService(CircuitBreakerConfig{}, republishFn) + + ctx := context.Background() + svc.retryPendingData(ctx) + + // File should still exist with failed item + content, err := os.ReadFile(SYNC_PENDING_FILE) + require.NoError(t, err) + assert.Contains(t, string(content), `{"id":2}`) + assert.NotContains(t, string(content), `{"id":1}`) + assert.NotContains(t, string(content), `{"id":3}`) +} + +func TestCircuitBreakerService_StartStop(t *testing.T) { + config := CircuitBreakerConfig{ + ResetInterval: 100 * time.Millisecond, // Short interval for testing + } + svc := NewCircuitBreakerService(config, nil) + + ctx := context.Background() + err := svc.Start(ctx) + require.NoError(t, err) + + // Stop should not block + done := make(chan struct{}) + go func() { + svc.Stop() + close(done) + }() + + select { + case <-done: + // Success + case <-time.After(1 * time.Second): + t.Fatal("Stop blocked for too long") + } +} + +func TestCircuitBreakerService_ThreadSafety(t *testing.T) { + config := CircuitBreakerConfig{ + MaxConsecutiveFailures: 100, + } + svc := NewCircuitBreakerService(config, nil) + + var wg sync.WaitGroup + iterations := 100 + + // Concurrent RecordFailure + wg.Add(iterations) + for i := 0; i < iterations; i++ { + go func() { + defer wg.Done() + svc.RecordFailure() + }() + } + wg.Wait() + + assert.Equal(t, 100, svc.GetConsecutiveFailures()) + assert.True(t, svc.IsOpen()) + + // Concurrent RecordSuccess should reset + wg.Add(iterations) + for i := 0; i < iterations; i++ { + go func() { + defer wg.Done() + svc.RecordSuccess() + }() + } + wg.Wait() + + assert.Equal(t, 0, svc.GetConsecutiveFailures()) + assert.False(t, svc.IsOpen()) +} + +func TestCircuitBreakerService_CheckAndRetry_ResetsCircuit(t *testing.T) { + tempDir := t.TempDir() + + originalFile := SYNC_PENDING_FILE + SYNC_PENDING_FILE = filepath.Join(tempDir, "test-pending.jsonl") + defer func() { SYNC_PENDING_FILE = originalFile }() + + originalHome := os.Getenv("HOME") + os.Setenv("HOME", "") + defer os.Setenv("HOME", originalHome) + + config := CircuitBreakerConfig{ + MaxConsecutiveFailures: 3, + } + svc := NewCircuitBreakerService(config, nil) + + // Open the circuit + svc.RecordFailure() + svc.RecordFailure() + svc.RecordFailure() + assert.True(t, svc.IsOpen()) + + // checkAndRetry should reset the circuit + ctx := context.Background() + svc.checkAndRetry(ctx) + + assert.False(t, svc.IsOpen()) + assert.Equal(t, 0, svc.GetConsecutiveFailures()) +}