Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions cmd/daemon/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,17 @@ func main() {
}
}

// Start heartbeat resync service if codeTracking is enabled
if cfg.CodeTracking != nil && cfg.CodeTracking.Enabled != nil && *cfg.CodeTracking.Enabled {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The condition cfg.CodeTracking != nil && cfg.CodeTracking.Enabled != nil && *cfg.CodeTracking.Enabled is verbose and is also duplicated in daemon/socket.go (line 118). To improve readability and reduce duplication, consider adding a helper method to the CodeTracking struct.

For example, in model/types.go:

func (c *CodeTracking) IsEnabled() bool {
    return c != nil && c.Enabled != nil && *c.Enabled
}

Then the check in both places becomes a much cleaner if cfg.CodeTracking.IsEnabled().

heartbeatResyncService := daemon.NewHeartbeatResyncService(cfg)
if err := heartbeatResyncService.Start(ctx); err != nil {
slog.Error("Failed to start heartbeat resync service", slog.Any("err", err))
} else {
slog.Info("Heartbeat resync service started")
defer heartbeatResyncService.Stop()
}
}

// Create processor instance
processor := daemon.NewSocketHandler(&cfg, pubsub)

Expand Down
26 changes: 18 additions & 8 deletions daemon/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,26 @@ func SocketTopicProccessor(messages <-chan *message.Message) {
if err := json.Unmarshal(msg.Payload, &socketMsg); err != nil {
slog.ErrorContext(ctx, "failed to parse socket message", slog.Any("err", err))
msg.Nack()
continue
}

if socketMsg.Type == SocketMessageTypeSync {
err := handlePubSubSync(ctx, socketMsg.Payload)
if err != nil {
slog.ErrorContext(ctx, "failed to parse socket message", slog.Any("err", err))
msg.Nack()
} else {
msg.Ack()
}
var err error
switch socketMsg.Type {
case SocketMessageTypeSync:
err = handlePubSubSync(ctx, socketMsg.Payload)
case SocketMessageTypeHeartbeat:
err = handlePubSubHeartbeat(ctx, socketMsg.Payload)
default:
slog.ErrorContext(ctx, "unknown socket message type", slog.String("type", string(socketMsg.Type)))
msg.Nack()
continue
}

if err != nil {
slog.ErrorContext(ctx, "failed to handle socket message", slog.Any("err", err), slog.String("type", string(socketMsg.Type)))
msg.Nack()
} else {
msg.Ack()
}
}
}
80 changes: 80 additions & 0 deletions daemon/handlers.heartbeat.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package daemon

import (
"context"
"encoding/json"
"fmt"
"log/slog"
"os"

"github.com/malamtime/cli/model"
)

func handlePubSubHeartbeat(ctx context.Context, socketMsgPayload interface{}) error {
pb, err := json.Marshal(socketMsgPayload)
if err != nil {
slog.Error("Failed to marshal the heartbeat payload again for unmarshal", slog.Any("payload", socketMsgPayload))
return err
}

var heartbeatPayload model.HeartbeatPayload
err = json.Unmarshal(pb, &heartbeatPayload)
if err != nil {
slog.Error("Failed to parse heartbeat payload", slog.Any("payload", socketMsgPayload))
return err
}
Comment on lines +14 to +25
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The current implementation marshals socketMsgPayload (which is a map[string]interface{}) to a JSON byte slice, only to immediately unmarshal it back into a model.HeartbeatPayload struct. This marshal/unmarshal cycle is inefficient.

A more performant approach is to use a library like mapstructure to directly decode the map into your struct.

Example:

import "github.com/mitchellh/mapstructure"

// ...

func handlePubSubHeartbeat(ctx context.Context, socketMsgPayload interface{}) error {
	var heartbeatPayload model.HeartbeatPayload
	err := mapstructure.Decode(socketMsgPayload, &heartbeatPayload)
	if err != nil {
		slog.Error("Failed to parse heartbeat payload", slog.Any("payload", socketMsgPayload), slog.Any("err", err))
		return err
	}

	// ... rest of the function
}

This avoids the overhead of the unnecessary JSON marshaling step.


if len(heartbeatPayload.Heartbeats) == 0 {
slog.Debug("Empty heartbeat payload, skipping")
return nil
}

cfg, err := stConfig.ReadConfigFile(ctx)
if err != nil {
slog.Error("Failed to read config file", slog.Any("err", err))
return err
}
Comment on lines +32 to +36
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

This function calls stConfig.ReadConfigFile(ctx) on every invocation. Since ReadConfigFile reads and parses the configuration from disk each time, this is inefficient and will degrade performance under load from frequent heartbeats.

The configuration is already loaded in main.go. It should be passed down to the message processor and its handlers instead of being re-read from disk. Consider refactoring SocketTopicProccessor to accept the model.ShellTimeConfig object and pass it to this handler.


// Try to send to server
err = model.SendHeartbeatsToServer(ctx, cfg, heartbeatPayload)
if err != nil {
slog.Warn("Failed to send heartbeats to server, saving to local file", slog.Any("err", err))
// On failure, save to local file
if saveErr := saveHeartbeatToFile(heartbeatPayload); saveErr != nil {
slog.Error("Failed to save heartbeat to local file", slog.Any("err", saveErr))
return saveErr
}
// Return nil because we saved the data locally - don't nack the message
return nil
}

slog.Info("Successfully sent heartbeats to server", slog.Int("count", len(heartbeatPayload.Heartbeats)))
return nil
}

// saveHeartbeatToFile appends a heartbeat payload as a single JSON line to the log file
func saveHeartbeatToFile(payload model.HeartbeatPayload) error {
logFilePath := os.ExpandEnv(fmt.Sprintf("%s/%s", "$HOME", model.HEARTBEAT_LOG_FILE))
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

For better platform compatibility (e.g., on Windows), it's recommended to use filepath.Join to construct file paths instead of string formatting with /. Additionally, os.UserHomeDir() is a more robust way to get the user's home directory than relying on the $HOME environment variable.

Suggested change:

homeDir, err := os.UserHomeDir()
if err != nil {
    return fmt.Errorf("failed to get user home dir: %w", err)
}
logFilePath := filepath.Join(homeDir, model.HEARTBEAT_LOG_FILE)

This change should also be applied in daemon/heartbeat_resync.go.


// Open file for appending, create if not exists
file, err := os.OpenFile(logFilePath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {
return fmt.Errorf("failed to open heartbeat log file: %w", err)
}
defer file.Close()

// Marshal payload to JSON
data, err := json.Marshal(payload)
if err != nil {
return fmt.Errorf("failed to marshal heartbeat payload: %w", err)
}

// Write as single line with newline
_, err = file.Write(append(data, '\n'))
if err != nil {
return fmt.Errorf("failed to write heartbeat to file: %w", err)
}

slog.Debug("Saved heartbeat to local file", slog.String("path", logFilePath), slog.Int("count", len(payload.Heartbeats)))
return nil
}
181 changes: 181 additions & 0 deletions daemon/heartbeat_resync.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
package daemon

import (
"bufio"
"context"
"encoding/json"
"fmt"
"log/slog"
"os"
"sync"
"time"

"github.com/malamtime/cli/model"
)

const (
// HeartbeatResyncInterval is the interval for retrying failed heartbeats
HeartbeatResyncInterval = 30 * time.Minute
)

// HeartbeatResyncService handles periodic resync of failed heartbeats
type HeartbeatResyncService struct {
config model.ShellTimeConfig
ticker *time.Ticker
stopChan chan struct{}
wg sync.WaitGroup
}

// NewHeartbeatResyncService creates a new heartbeat resync service
func NewHeartbeatResyncService(config model.ShellTimeConfig) *HeartbeatResyncService {
return &HeartbeatResyncService{
config: config,
stopChan: make(chan struct{}),
}
}

// Start begins the periodic resync job
func (s *HeartbeatResyncService) Start(ctx context.Context) error {
s.ticker = time.NewTicker(HeartbeatResyncInterval)
s.wg.Add(1)

go func() {
defer s.wg.Done()

// Run once at startup
s.resync(ctx)

for {
select {
case <-s.ticker.C:
s.resync(ctx)
case <-s.stopChan:
return
case <-ctx.Done():
return
}
}
}()

slog.Info("Heartbeat resync service started", slog.Duration("interval", HeartbeatResyncInterval))
return nil
}

// Stop stops the resync service
func (s *HeartbeatResyncService) Stop() {
if s.ticker != nil {
s.ticker.Stop()
}
close(s.stopChan)
s.wg.Wait()
slog.Info("Heartbeat resync service stopped")
}

// resync reads failed heartbeats from the log file and attempts to send them
func (s *HeartbeatResyncService) resync(ctx context.Context) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

There is a critical race condition in the resync logic that can lead to data loss. The current implementation reads the entire log file, processes it in memory, and then overwrites the original file. If a new failed heartbeat is written to the log file while processing is underway, it will be deleted when the file is overwritten.

To fix this, you should make the file handling atomic. A common pattern is:

  1. Atomically rename the log file (e.g., heartbeat.log to heartbeat.log.processing).
  2. New failed heartbeats can now be safely written to a new (and empty) heartbeat.log.
  3. Process the contents of heartbeat.log.processing.
  4. If any heartbeats from the processing file fail again, append them to the new heartbeat.log file.
  5. Delete heartbeat.log.processing once you're done with it.

logFilePath := os.ExpandEnv(fmt.Sprintf("%s/%s", "$HOME", model.HEARTBEAT_LOG_FILE))
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

For better platform compatibility, it's recommended to use filepath.Join to construct file paths instead of string formatting with /. Additionally, os.UserHomeDir() is a more robust way to get the user's home directory than relying on the $HOME environment variable.

Suggested change:

homeDir, err := os.UserHomeDir()
if err != nil {
    slog.Error("could not get user home directory", slog.Any("err", err))
    return
}
logFilePath := filepath.Join(homeDir, model.HEARTBEAT_LOG_FILE)


// Check if file exists
if _, err := os.Stat(logFilePath); os.IsNotExist(err) {
slog.Debug("No heartbeat log file found, nothing to resync")
return
}

// Read the file
file, err := os.Open(logFilePath)
if err != nil {
slog.Error("Failed to open heartbeat log file for resync", slog.Any("err", err))
return
}

var lines []string
scanner := bufio.NewScanner(file)
for scanner.Scan() {
line := scanner.Text()
if line != "" {
lines = append(lines, line)
}
}
Comment on lines +91 to +98
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The resync function reads the entire contents of the heartbeat log file into memory. If a user is offline for an extended period, this file could become very large, leading to high memory consumption.

A more memory-efficient approach would be to process the file line-by-line. When combined with the fix for the race condition, you could read from the renamed file (.processing) one line at a time, and write any lines that fail to resync to the new log file directly, without buffering all lines in memory.

file.Close()

if err := scanner.Err(); err != nil {
slog.Error("Error reading heartbeat log file", slog.Any("err", err))
return
}

if len(lines) == 0 {
slog.Debug("No failed heartbeats to resync")
return
}

slog.Info("Starting heartbeat resync", slog.Int("pendingCount", len(lines)))

// Process each line
var failedLines []string
successCount := 0

for _, line := range lines {
var payload model.HeartbeatPayload
if err := json.Unmarshal([]byte(line), &payload); err != nil {
slog.Error("Failed to parse heartbeat line, discarding", slog.Any("err", err), slog.String("line", line))
continue
}

// Try to send to server
if err := model.SendHeartbeatsToServer(ctx, s.config, payload); err != nil {
slog.Warn("Failed to resync heartbeat, keeping for next retry", slog.Any("err", err))
failedLines = append(failedLines, line)
} else {
successCount++
}
}

// Rewrite the file with only failed lines
if err := s.rewriteLogFile(logFilePath, failedLines); err != nil {
slog.Error("Failed to update heartbeat log file", slog.Any("err", err))
return
}

slog.Info("Heartbeat resync completed",
slog.Int("success", successCount),
slog.Int("remaining", len(failedLines)))
}

// rewriteLogFile atomically rewrites the log file with the given lines
func (s *HeartbeatResyncService) rewriteLogFile(logFilePath string, lines []string) error {
// If no lines remaining, remove the file
if len(lines) == 0 {
if err := os.Remove(logFilePath); err != nil && !os.IsNotExist(err) {
return fmt.Errorf("failed to remove empty log file: %w", err)
}
return nil
}

// Write to temp file first
tempFile := logFilePath + ".tmp"
file, err := os.OpenFile(tempFile, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0644)
if err != nil {
return fmt.Errorf("failed to create temp file: %w", err)
}

for _, line := range lines {
if _, err := file.WriteString(line + "\n"); err != nil {
file.Close()
os.Remove(tempFile)
return fmt.Errorf("failed to write to temp file: %w", err)
}
}

if err := file.Close(); err != nil {
os.Remove(tempFile)
return fmt.Errorf("failed to close temp file: %w", err)
}

// Atomic rename
if err := os.Rename(tempFile, logFilePath); err != nil {
os.Remove(tempFile)
return fmt.Errorf("failed to rename temp file: %w", err)
}

return nil
}
19 changes: 18 additions & 1 deletion daemon/socket.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ import (
type SocketMessageType string

const (
SocketMessageTypeSync SocketMessageType = "sync"
SocketMessageTypeSync SocketMessageType = "sync"
SocketMessageTypeHeartbeat SocketMessageType = "heartbeat"
)

type SocketMessage struct {
Expand Down Expand Up @@ -112,6 +113,22 @@ func (p *SocketHandler) handleConnection(conn net.Conn) {
if err := p.channel.Publish(PubSubTopic, chMsg); err != nil {
slog.Error("Error to publish topic", slog.Any("err", err))
}
case SocketMessageTypeHeartbeat:
// Only process heartbeat if codeTracking is enabled
if p.config.CodeTracking == nil || p.config.CodeTracking.Enabled == nil || !*p.config.CodeTracking.Enabled {
slog.Debug("Heartbeat message received but codeTracking is disabled, ignoring")
return
}
buf, err := json.Marshal(msg)
if err != nil {
slog.Error("Error encoding heartbeat message", slog.Any("err", err))
return
}

chMsg := message.NewMessage(watermill.NewUUID(), buf)
if err := p.channel.Publish(PubSubTopic, chMsg); err != nil {
slog.Error("Error publishing heartbeat topic", slog.Any("err", err))
}
default:
slog.Error("Unknown message type:", slog.String("messageType", string(msg.Type)))
}
Expand Down
35 changes: 35 additions & 0 deletions model/api_heartbeat.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package model

import (
"context"
"net/http"
"time"
)

// SendHeartbeatsToServer sends heartbeat data to the server
func SendHeartbeatsToServer(ctx context.Context, cfg ShellTimeConfig, payload HeartbeatPayload) error {
ctx, span := modelTracer.Start(ctx, "api.sendHeartbeats")
defer span.End()

endpoint := Endpoint{
Token: cfg.Token,
APIEndpoint: cfg.APIEndpoint,
}

var response HeartbeatResponse
err := SendHTTPRequestJSON(HTTPRequestOptions[HeartbeatPayload, HeartbeatResponse]{
Context: ctx,
Endpoint: endpoint,
Method: http.MethodPost,
Path: "/api/v1/heartbeats",
Payload: payload,
Response: &response,
Timeout: 10 * time.Second,
})

if err != nil {
return err
}

return nil
Comment on lines +30 to +34
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The function assumes a successful API call if the HTTP request itself doesn't error. However, the server can return a 200 OK status but indicate a logical failure in the response body (e.g., success: false). These failures are not being handled, which could lead to data loss as the client would incorrectly assume the heartbeats were processed and not save them for retry.

You should inspect the response struct and return an error if the server-side operation was not successful.

// You will need to add "fmt" and "errors" to your imports.
if err != nil {
    return err
}

if !response.Success {
    msg := "heartbeat API request failed"
    if response.Message != "" {
        msg = fmt.Sprintf("%s: %s", msg, response.Message)
    }
    return errors.New(msg)
}

return nil

}
Loading