From 07c22cd9229566d839180fc40f2f88f2777ac654 Mon Sep 17 00:00:00 2001 From: Su Yang Date: Tue, 10 Jun 2025 21:04:21 +0800 Subject: [PATCH 1/2] feat: can dump support --- api.go | 435 ++++++++++++++++++++++++++++++++++++++++++++++-- listener.go | 467 ++++++++++++++++++++++++++++++++++++++++++++++++++++ main.go | 164 +++++++++++++++++- 3 files changed, 1050 insertions(+), 16 deletions(-) create mode 100644 listener.go diff --git a/api.go b/api.go index f37eee4..d28d72d 100644 --- a/api.go +++ b/api.go @@ -3,35 +3,50 @@ package main import ( "fmt" "net/http" + "strconv" "github.com/gin-gonic/gin" ) // APIHandler handles HTTP API requests type APIHandler struct { - messageSender *MessageSender - monitor *Monitor - setupManager *InterfaceSetupManager - logger Logger + messageSender *MessageSender + monitor *Monitor + setupManager *InterfaceSetupManager + messageListener *CanMessageListener + logger Logger } // NewAPIHandler creates a new API handler (legacy, without setup manager) func NewAPIHandler(messageSender *MessageSender, monitor *Monitor, logger Logger) *APIHandler { return &APIHandler{ - messageSender: messageSender, - monitor: monitor, - setupManager: nil, - logger: logger, + messageSender: messageSender, + monitor: monitor, + setupManager: nil, + messageListener: nil, + logger: logger, } } // NewAPIHandlerWithSetup creates a new API handler with setup manager func NewAPIHandlerWithSetup(messageSender *MessageSender, monitor *Monitor, setupManager *InterfaceSetupManager, logger Logger) *APIHandler { return &APIHandler{ - messageSender: messageSender, - monitor: monitor, - setupManager: setupManager, - logger: logger, + messageSender: messageSender, + monitor: monitor, + setupManager: setupManager, + messageListener: nil, + logger: logger, + } +} + +// NewAPIHandlerWithSetupAndListener creates a new API handler with setup manager and message listener +func NewAPIHandlerWithSetupAndListener(messageSender *MessageSender, monitor *Monitor, setupManager *InterfaceSetupManager, messageListener *CanMessageListener, logger Logger) *APIHandler { + return &APIHandler{ + messageSender: messageSender, + monitor: monitor, + setupManager: setupManager, + messageListener: messageListener, + logger: logger, } } @@ -67,6 +82,29 @@ func (h *APIHandler) SetupRoutes(r *gin.Engine) { setup.POST("/interfaces/teardown-all", h.handleTeardownAllInterfaces) } } + + // Message listening endpoints (new) + if h.messageListener != nil { + messages := api.Group("/messages") + { + // Get messages from specific interface + messages.GET("/:interface", h.handleGetMessages) + messages.GET("/:interface/recent", h.handleGetRecentMessages) + messages.GET("/:interface/statistics", h.handleGetMessageStatistics) + messages.DELETE("/:interface", h.handleClearMessages) + + // Global message operations + messages.GET("/", h.handleGetAllMessages) + messages.GET("/statistics", h.handleGetAllMessageStatistics) + messages.DELETE("/", h.handleClearAllMessages) + + // Listener control + messages.POST("/:interface/listen/start", h.handleStartListening) + messages.POST("/:interface/listen/stop", h.handleStopListening) + messages.GET("/:interface/listen/status", h.handleGetListenStatus) + messages.GET("/listen/status", h.handleGetAllListenStatus) + } + } } } @@ -123,6 +161,11 @@ func (h *APIHandler) handleInterfacesList(c *gin.Context) { "activeCount": status.ActiveInterfaces, } + // Add listening status if message listener is available + if h.messageListener != nil { + data["listeningInterfaces"] = h.messageListener.GetListeningInterfaces() + } + h.respondSuccess(c, "", data) } @@ -140,7 +183,22 @@ func (h *APIHandler) handleInterfaceStatus(c *gin.Context) { return } - h.respondSuccess(c, "", status) + // Add listening status if message listener is available + if h.messageListener != nil { + statusMap := map[string]interface{}{ + "interfaceStatus": status, + "isListening": h.messageListener.IsListening(ifName), + } + + // Add message statistics if available + if stats, err := h.messageListener.GetInterfaceStatistics(ifName); err == nil { + statusMap["messageStatistics"] = stats + } + + h.respondSuccess(c, "", statusMap) + } else { + h.respondSuccess(c, "", status) + } } // handleHealthSummary returns system health summary @@ -176,13 +234,20 @@ func (h *APIHandler) handleMetrics(c *gin.Context) { "health_checks_passed": ifStatus.Health.ChecksPassed, "health_checks_failed": ifStatus.Health.ChecksFailed, } + + // Add message listening metrics if available + if h.messageListener != nil { + if stats, err := h.messageListener.GetInterfaceStatistics(name); err == nil { + interfaceMetrics[name].(map[string]interface{})["message_listening"] = stats + } + } } metrics["interfaces"] = interfaceMetrics h.respondSuccess(c, "", metrics) } -// ====== Interface Setup Handlers (New) ====== +// ====== Interface Setup Handlers (Existing) ====== // handleGetSetupConfig returns current setup configuration func (h *APIHandler) handleGetSetupConfig(c *gin.Context) { @@ -331,6 +396,13 @@ func (h *APIHandler) handleSetupInterface(c *gin.Context) { return } + // Start listening if message listener is available + if h.messageListener != nil { + if err := h.messageListener.StartListening(ifName); err != nil { + h.logger.Printf("Warning: failed to start listening on %s: %v", ifName, err) + } + } + // Get interface state state, err := h.setupManager.GetInterfaceState(ifName) if err != nil { @@ -354,6 +426,13 @@ func (h *APIHandler) handleTeardownInterface(c *gin.Context) { return } + // Stop listening if message listener is available + if h.messageListener != nil { + if err := h.messageListener.StopListening(ifName); err != nil { + h.logger.Printf("Warning: failed to stop listening on %s: %v", ifName, err) + } + } + if err := h.setupManager.TeardownInterface(ifName); err != nil { h.respondError(c, http.StatusInternalServerError, "Failed to teardown interface", err) return @@ -466,6 +545,13 @@ func (h *APIHandler) handleSetupAllInterfaces(c *gin.Context) { "error": err.Error(), } } else { + // Start listening if message listener is available + if h.messageListener != nil { + if err := h.messageListener.StartListening(ifName); err != nil { + h.logger.Printf("Warning: failed to start listening on %s: %v", ifName, err) + } + } + // Get interface state if state, err := h.setupManager.GetInterfaceState(ifName); err == nil { results[ifName] = map[string]interface{}{ @@ -511,6 +597,13 @@ func (h *APIHandler) handleTeardownAllInterfaces(c *gin.Context) { var teardownErrors []string for _, ifName := range interfaces { + // Stop listening if message listener is available + if h.messageListener != nil { + if err := h.messageListener.StopListening(ifName); err != nil { + h.logger.Printf("Warning: failed to stop listening on %s: %v", ifName, err) + } + } + if err := h.setupManager.TeardownInterface(ifName); err != nil { teardownErrors = append(teardownErrors, fmt.Sprintf("%s: %v", ifName, err)) results[ifName] = map[string]interface{}{ @@ -540,6 +633,230 @@ func (h *APIHandler) handleTeardownAllInterfaces(c *gin.Context) { } } +// ====== Message Listening Handlers (New) ====== + +// handleGetMessages returns all messages for a specific interface +func (h *APIHandler) handleGetMessages(c *gin.Context) { + if h.messageListener == nil { + h.respondError(c, http.StatusServiceUnavailable, "Message listener not available", nil) + return + } + + ifName := c.Param("interface") + if ifName == "" { + h.respondError(c, http.StatusBadRequest, "Interface name is required", nil) + return + } + + messages, err := h.messageListener.GetMessages(ifName) + if err != nil { + h.respondError(c, http.StatusNotFound, "Failed to get messages", err) + return + } + + data := map[string]interface{}{ + "interface": ifName, + "messages": messages, + "count": len(messages), + "isListening": h.messageListener.IsListening(ifName), + } + + h.respondSuccess(c, "", data) +} + +// handleGetRecentMessages returns recent messages for a specific interface +func (h *APIHandler) handleGetRecentMessages(c *gin.Context) { + if h.messageListener == nil { + h.respondError(c, http.StatusServiceUnavailable, "Message listener not available", nil) + return + } + + ifName := c.Param("interface") + if ifName == "" { + h.respondError(c, http.StatusBadRequest, "Interface name is required", nil) + return + } + + // Get count parameter (default: 10) + countStr := c.DefaultQuery("count", "10") + count, err := strconv.Atoi(countStr) + if err != nil || count <= 0 { + count = 10 + } + + messages, err := h.messageListener.GetRecentMessages(ifName, count) + if err != nil { + h.respondError(c, http.StatusNotFound, "Failed to get recent messages", err) + return + } + + data := map[string]interface{}{ + "interface": ifName, + "messages": messages, + "requestedCount": count, + "actualCount": len(messages), + "isListening": h.messageListener.IsListening(ifName), + } + + h.respondSuccess(c, "", data) +} + +// handleGetMessageStatistics returns message statistics for a specific interface +func (h *APIHandler) handleGetMessageStatistics(c *gin.Context) { + if h.messageListener == nil { + h.respondError(c, http.StatusServiceUnavailable, "Message listener not available", nil) + return + } + + ifName := c.Param("interface") + if ifName == "" { + h.respondError(c, http.StatusBadRequest, "Interface name is required", nil) + return + } + + stats, err := h.messageListener.GetInterfaceStatistics(ifName) + if err != nil { + h.respondError(c, http.StatusNotFound, "Failed to get message statistics", err) + return + } + + stats["isListening"] = h.messageListener.IsListening(ifName) + + h.respondSuccess(c, "", stats) +} + +// handleClearMessages clears message buffer for a specific interface +func (h *APIHandler) handleClearMessages(c *gin.Context) { + if h.messageListener == nil { + h.respondError(c, http.StatusServiceUnavailable, "Message listener not available", nil) + return + } + + ifName := c.Param("interface") + if ifName == "" { + h.respondError(c, http.StatusBadRequest, "Interface name is required", nil) + return + } + + if err := h.messageListener.ClearMessages(ifName); err != nil { + h.respondError(c, http.StatusNotFound, "Failed to clear messages", err) + return + } + + data := map[string]interface{}{ + "interface": ifName, + "status": "cleared", + } + + h.respondSuccess(c, fmt.Sprintf("Message buffer cleared for %s", ifName), data) +} + +// handleGetAllMessages returns messages for all interfaces +func (h *APIHandler) handleGetAllMessages(c *gin.Context) { + if h.messageListener == nil { + h.respondError(c, http.StatusServiceUnavailable, "Message listener not available", nil) + return + } + + allMessages := h.messageListener.GetAllMessages() + + data := map[string]interface{}{ + "interfaces": allMessages, + "interfaceCount": len(allMessages), + "listeningInterfaces": h.messageListener.GetListeningInterfaces(), + } + + h.respondSuccess(c, "", data) +} + +// handleGetAllMessageStatistics returns message statistics for all interfaces +func (h *APIHandler) handleGetAllMessageStatistics(c *gin.Context) { + if h.messageListener == nil { + h.respondError(c, http.StatusServiceUnavailable, "Message listener not available", nil) + return + } + + stats := h.messageListener.GetStatistics() + + data := map[string]interface{}{ + "statistics": stats, + "listeningInterfaces": h.messageListener.GetListeningInterfaces(), + } + + h.respondSuccess(c, "", data) +} + +// handleClearAllMessages clears message buffers for all interfaces +func (h *APIHandler) handleClearAllMessages(c *gin.Context) { + if h.messageListener == nil { + h.respondError(c, http.StatusServiceUnavailable, "Message listener not available", nil) + return + } + + h.messageListener.ClearAllMessages() + + data := map[string]interface{}{ + "status": "all_cleared", + } + + h.respondSuccess(c, "All message buffers cleared", data) +} + +// handleGetListenStatus returns listening status for a specific interface +func (h *APIHandler) handleGetListenStatus(c *gin.Context) { + if h.messageListener == nil { + h.respondError(c, http.StatusServiceUnavailable, "Message listener not available", nil) + return + } + + ifName := c.Param("interface") + if ifName == "" { + h.respondError(c, http.StatusBadRequest, "Interface name is required", nil) + return + } + + isListening := h.messageListener.IsListening(ifName) + + data := map[string]interface{}{ + "interface": ifName, + "isListening": isListening, + "status": func() string { + if isListening { + return "listening" + } + return "not_listening" + }(), + } + + // Add statistics if listening + if isListening { + if stats, err := h.messageListener.GetInterfaceStatistics(ifName); err == nil { + data["statistics"] = stats + } + } + + h.respondSuccess(c, "", data) +} + +// handleGetAllListenStatus returns listening status for all interfaces +func (h *APIHandler) handleGetAllListenStatus(c *gin.Context) { + if h.messageListener == nil { + h.respondError(c, http.StatusServiceUnavailable, "Message listener not available", nil) + return + } + + listeningInterfaces := h.messageListener.GetListeningInterfaces() + allStats := h.messageListener.GetStatistics() + + data := map[string]interface{}{ + "listeningInterfaces": listeningInterfaces, + "listeningCount": len(listeningInterfaces), + "allStatistics": allStats, + } + + h.respondSuccess(c, "", data) +} + // ====== Helper methods for consistent response formatting ====== // respondSuccess sends a successful JSON response @@ -627,3 +944,93 @@ func RecoveryMiddleware(logger Logger) gin.HandlerFunc { }) }) } + +// handleStopListening stops message listening on a specific interface +func (h *APIHandler) handleStopListening(c *gin.Context) { + if h.messageListener == nil { + h.respondError(c, http.StatusServiceUnavailable, "Message listener not available", nil) + return + } + + ifName := c.Param("interface") + if ifName == "" { + h.respondError(c, http.StatusBadRequest, "Interface name is required", nil) + return + } + + if err := h.messageListener.StopListening(ifName); err != nil { + h.respondError(c, http.StatusInternalServerError, "Failed to stop listening", err) + return + } + + data := map[string]interface{}{ + "interface": ifName, + "status": "stopped", + "isListening": false, + } + + h.respondSuccess(c, fmt.Sprintf("Stopped listening on %s", ifName), data) +} + +// handleGetListenStatus returns listening status for a specific interface +func (h *APIHandler) handleGetListenStatus(c *gin.Context) { + if h.messageListener == nil { + h.respondError(c, http.StatusServiceUnavailable, "Message listener not available", nil) + return + } + + ifName := c.Param("interface") + if ifName == "" { + h.respondError(c, http.StatusBadRequest, "Interface name is required", nil) + return + } + + isListening := h.messageListener.IsListening(ifName) + + data := map[string]interface{}{ + "interface": ifName, + "isListening": isListening, + "status": func() string { + if isListening { + return "listening" + } + return "not_listening" + }(), + } + + // Add statistics if listening + if isListening { + if stats, err := h.messageListener.GetInterfaceStatistics(ifName); err == nil { + data["statistics"] = stats + } + } + + h.respondSuccess(c, "", data) +} + +// handleStartListening starts message listening on a specific interface +func (h *APIHandler) handleStartListening(c *gin.Context) { + if h.messageListener == nil { + h.respondError(c, http.StatusServiceUnavailable, "Message listener not available", nil) + return + } + + ifName := c.Param("interface") + if ifName == "" { + h.respondError(c, http.StatusBadRequest, "Interface name is required", nil) + return + } + + if err := h.messageListener.StartListening(ifName); err != nil { + h.respondError(c, http.StatusInternalServerError, "Failed to start listening", err) + return + } + + data := map[string]interface{}{ + "interface": ifName, + "status": "listening", + "isListening": true, + } + + h.respondSuccess(c, fmt.Sprintf("Started listening on %s", ifName), data) +} diff --git a/listener.go b/listener.go new file mode 100644 index 0000000..d7e1c25 --- /dev/null +++ b/listener.go @@ -0,0 +1,467 @@ +package main + +import ( + "context" + "fmt" + "sync" + "time" + "unsafe" + + "golang.org/x/sys/unix" +) + +// CanMessageLog represents a logged CAN message +type CanMessageLog struct { + Interface string `json:"interface"` + ID uint32 `json:"id"` + Data []byte `json:"data"` + Length uint8 `json:"length"` + Timestamp time.Time `json:"timestamp"` + Direction string `json:"direction"` // "RX" for received messages +} + +// InterfaceMessageBuffer manages message history for a single interface +type InterfaceMessageBuffer struct { + interfaceName string + messages []CanMessageLog + maxSize int + mutex sync.RWMutex + totalReceived uint64 +} + +// NewInterfaceMessageBuffer creates a new message buffer for an interface +func NewInterfaceMessageBuffer(interfaceName string, maxSize int) *InterfaceMessageBuffer { + return &InterfaceMessageBuffer{ + interfaceName: interfaceName, + messages: make([]CanMessageLog, 0, maxSize), + maxSize: maxSize, + } +} + +// AddMessage adds a new message to the buffer +func (buf *InterfaceMessageBuffer) AddMessage(msg CanMessageLog) { + buf.mutex.Lock() + defer buf.mutex.Unlock() + + buf.totalReceived++ + + // Add message to buffer + buf.messages = append(buf.messages, msg) + + // Maintain buffer size limit + if len(buf.messages) > buf.maxSize { + // Remove oldest message + buf.messages = buf.messages[1:] + } +} + +// GetMessages returns a copy of all messages +func (buf *InterfaceMessageBuffer) GetMessages() []CanMessageLog { + buf.mutex.RLock() + defer buf.mutex.RUnlock() + + // Return a copy to avoid race conditions + result := make([]CanMessageLog, len(buf.messages)) + copy(result, buf.messages) + return result +} + +// GetRecentMessages returns the last N messages +func (buf *InterfaceMessageBuffer) GetRecentMessages(count int) []CanMessageLog { + buf.mutex.RLock() + defer buf.mutex.RUnlock() + + if count <= 0 { + return []CanMessageLog{} + } + + if count >= len(buf.messages) { + // Return all messages + result := make([]CanMessageLog, len(buf.messages)) + copy(result, buf.messages) + return result + } + + // Return last N messages + start := len(buf.messages) - count + result := make([]CanMessageLog, count) + copy(result, buf.messages[start:]) + return result +} + +// GetStatistics returns buffer statistics +func (buf *InterfaceMessageBuffer) GetStatistics() map[string]interface{} { + buf.mutex.RLock() + defer buf.mutex.RUnlock() + + return map[string]interface{}{ + "interface": buf.interfaceName, + "totalReceived": buf.totalReceived, + "bufferedCount": len(buf.messages), + "maxBufferSize": buf.maxSize, + "bufferUsage": float64(len(buf.messages)) / float64(buf.maxSize) * 100, + } +} + +// Clear clears all messages from the buffer +func (buf *InterfaceMessageBuffer) Clear() { + buf.mutex.Lock() + defer buf.mutex.Unlock() + + buf.messages = buf.messages[:0] // Clear slice but keep capacity + buf.totalReceived = 0 +} + +// CanMessageListener manages listening to CAN messages on multiple interfaces +type CanMessageListener struct { + buffers map[string]*InterfaceMessageBuffer + buffersMutex sync.RWMutex + listeners map[string]*interfaceListener + maxMessages int + logger Logger + ctx context.Context + cancel context.CancelFunc +} + +// interfaceListener manages listening for a single interface +type interfaceListener struct { + interfaceName string + socket int + isRunning bool + stopChan chan bool + buffer *InterfaceMessageBuffer + logger Logger +} + +// NewCanMessageListener creates a new CAN message listener +func NewCanMessageListener(maxMessages int, logger Logger) *CanMessageListener { + ctx, cancel := context.WithCancel(context.Background()) + return &CanMessageListener{ + buffers: make(map[string]*InterfaceMessageBuffer), + listeners: make(map[string]*interfaceListener), + maxMessages: maxMessages, + logger: logger, + ctx: ctx, + cancel: cancel, + } +} + +// StartListening starts listening on a specific CAN interface +func (cml *CanMessageListener) StartListening(interfaceName string) error { + cml.buffersMutex.Lock() + defer cml.buffersMutex.Unlock() + + // Check if already listening + if listener, exists := cml.listeners[interfaceName]; exists && listener.isRunning { + cml.logger.Printf("๐Ÿ“ก Already listening on %s", interfaceName) + return nil + } + + cml.logger.Printf("๐Ÿ“ก Starting CAN message listener for %s", interfaceName) + + // Create message buffer + buffer := NewInterfaceMessageBuffer(interfaceName, cml.maxMessages) + cml.buffers[interfaceName] = buffer + + // Create socket for listening + socket, err := unix.Socket(unix.AF_CAN, unix.SOCK_RAW, unix.CAN_RAW) + if err != nil { + return fmt.Errorf("failed to create listening socket: %w", err) + } + + // Get interface index + var ifr ifreq + copy(ifr.Name[:], interfaceName) + _, _, errno := unix.Syscall( + unix.SYS_IOCTL, + uintptr(socket), + uintptr(unix.SIOCGIFINDEX), + uintptr(unsafe.Pointer(&ifr)), + ) + if errno != 0 { + unix.Close(socket) + return fmt.Errorf("failed to get interface index: %v", errno) + } + + // Bind socket to interface + addr := &unix.SockaddrCAN{Ifindex: int(ifr.Index)} + if err := unix.Bind(socket, addr); err != nil { + unix.Close(socket) + return fmt.Errorf("failed to bind listening socket: %w", err) + } + + // Create listener + listener := &interfaceListener{ + interfaceName: interfaceName, + socket: socket, + isRunning: false, + stopChan: make(chan bool, 1), + buffer: buffer, + logger: cml.logger, + } + + cml.listeners[interfaceName] = listener + + // Start listening goroutine + go cml.listenOnInterface(listener) + + cml.logger.Printf("โœ… Started listening on %s", interfaceName) + return nil +} + +// StopListening stops listening on a specific interface +func (cml *CanMessageListener) StopListening(interfaceName string) error { + cml.buffersMutex.Lock() + defer cml.buffersMutex.Unlock() + + listener, exists := cml.listeners[interfaceName] + if !exists { + return fmt.Errorf("not listening on interface %s", interfaceName) + } + + cml.logger.Printf("๐Ÿ›‘ Stopping listener for %s", interfaceName) + + // Signal stop + if listener.isRunning { + listener.stopChan <- true + } + + // Close socket + if err := unix.Close(listener.socket); err != nil { + cml.logger.Printf("โš ๏ธ Warning: failed to close listening socket for %s: %v", interfaceName, err) + } + + // Remove from listeners map + delete(cml.listeners, interfaceName) + + cml.logger.Printf("โœ… Stopped listening on %s", interfaceName) + return nil +} + +// listenOnInterface performs the actual message listening for an interface +func (cml *CanMessageListener) listenOnInterface(listener *interfaceListener) { + listener.isRunning = true + defer func() { + listener.isRunning = false + }() + + cml.logger.Printf("๐Ÿ‘‚ Listening thread started for %s", listener.interfaceName) + + buffer := make([]byte, 16) // Size of CAN frame + + for { + select { + case <-listener.stopChan: + cml.logger.Printf("๐Ÿ›‘ Stop signal received for %s", listener.interfaceName) + return + case <-cml.ctx.Done(): + cml.logger.Printf("๐Ÿ›‘ Context cancelled for %s", listener.interfaceName) + return + default: + // Set read timeout to avoid blocking indefinitely + tv := unix.Timeval{Sec: 1, Usec: 0} + if err := unix.SetsockoptTimeval(listener.socket, unix.SOL_SOCKET, unix.SO_RCVTIMEO, &tv); err != nil { + cml.logger.Printf("โš ๏ธ Failed to set socket timeout for %s: %v", listener.interfaceName, err) + } + + // Try to read CAN frame + n, err := unix.Read(listener.socket, buffer) + if err != nil { + // Check if it's a timeout (expected) or real error + if errno, ok := err.(unix.Errno); ok && errno == unix.EAGAIN { + continue // Timeout, continue listening + } + cml.logger.Printf("โŒ Read error on %s: %v", listener.interfaceName, err) + continue + } + + if n >= 16 { // Minimum CAN frame size + // Parse CAN frame + frame := (*CanFrame)(unsafe.Pointer(&buffer[0])) + + // Create message log entry + data := make([]byte, frame.Length) + copy(data, frame.Data[:frame.Length]) + + msg := CanMessageLog{ + Interface: listener.interfaceName, + ID: frame.ID, + Data: data, + Length: frame.Length, + Timestamp: time.Now(), + Direction: "RX", + } + + // Add to buffer + listener.buffer.AddMessage(msg) + + // Log received message (with rate limiting to avoid spam) + if listener.buffer.totalReceived%100 == 1 || listener.buffer.totalReceived <= 10 { + cml.logger.Printf("๐Ÿ“จ %s RX: ID=0x%X, Data=[% X], Length=%d", + listener.interfaceName, msg.ID, msg.Data, msg.Length) + } + } + } + } +} + +// GetMessages returns messages for a specific interface +func (cml *CanMessageListener) GetMessages(interfaceName string) ([]CanMessageLog, error) { + cml.buffersMutex.RLock() + defer cml.buffersMutex.RUnlock() + + buffer, exists := cml.buffers[interfaceName] + if !exists { + return nil, fmt.Errorf("no message buffer for interface %s", interfaceName) + } + + return buffer.GetMessages(), nil +} + +// GetRecentMessages returns the last N messages for a specific interface +func (cml *CanMessageListener) GetRecentMessages(interfaceName string, count int) ([]CanMessageLog, error) { + cml.buffersMutex.RLock() + defer cml.buffersMutex.RUnlock() + + buffer, exists := cml.buffers[interfaceName] + if !exists { + return nil, fmt.Errorf("no message buffer for interface %s", interfaceName) + } + + return buffer.GetRecentMessages(count), nil +} + +// GetAllMessages returns messages for all interfaces +func (cml *CanMessageListener) GetAllMessages() map[string][]CanMessageLog { + cml.buffersMutex.RLock() + defer cml.buffersMutex.RUnlock() + + result := make(map[string][]CanMessageLog) + for ifName, buffer := range cml.buffers { + result[ifName] = buffer.GetMessages() + } + return result +} + +// GetStatistics returns statistics for all interfaces +func (cml *CanMessageListener) GetStatistics() map[string]interface{} { + cml.buffersMutex.RLock() + defer cml.buffersMutex.RUnlock() + + result := make(map[string]interface{}) + for ifName, buffer := range cml.buffers { + result[ifName] = buffer.GetStatistics() + } + return result +} + +// GetInterfaceStatistics returns statistics for a specific interface +func (cml *CanMessageListener) GetInterfaceStatistics(interfaceName string) (map[string]interface{}, error) { + cml.buffersMutex.RLock() + defer cml.buffersMutex.RUnlock() + + buffer, exists := cml.buffers[interfaceName] + if !exists { + return nil, fmt.Errorf("no message buffer for interface %s", interfaceName) + } + + return buffer.GetStatistics(), nil +} + +// ClearMessages clears message buffer for a specific interface +func (cml *CanMessageListener) ClearMessages(interfaceName string) error { + cml.buffersMutex.RLock() + defer cml.buffersMutex.RUnlock() + + buffer, exists := cml.buffers[interfaceName] + if !exists { + return fmt.Errorf("no message buffer for interface %s", interfaceName) + } + + buffer.Clear() + cml.logger.Printf("๐Ÿงน Cleared message buffer for %s", interfaceName) + return nil +} + +// ClearAllMessages clears message buffers for all interfaces +func (cml *CanMessageListener) ClearAllMessages() { + cml.buffersMutex.RLock() + defer cml.buffersMutex.RUnlock() + + for ifName, buffer := range cml.buffers { + buffer.Clear() + cml.logger.Printf("๐Ÿงน Cleared message buffer for %s", ifName) + } +} + +// IsListening checks if currently listening on an interface +func (cml *CanMessageListener) IsListening(interfaceName string) bool { + cml.buffersMutex.RLock() + defer cml.buffersMutex.RUnlock() + + listener, exists := cml.listeners[interfaceName] + return exists && listener.isRunning +} + +// GetListeningInterfaces returns list of interfaces currently being listened to +func (cml *CanMessageListener) GetListeningInterfaces() []string { + cml.buffersMutex.RLock() + defer cml.buffersMutex.RUnlock() + + var interfaces []string + for ifName, listener := range cml.listeners { + if listener.isRunning { + interfaces = append(interfaces, ifName) + } + } + return interfaces +} + +// Shutdown stops all listeners and cleans up resources +func (cml *CanMessageListener) Shutdown() error { + cml.logger.Printf("๐Ÿ›‘ Shutting down CAN message listener...") + + // Cancel context + cml.cancel() + + // Stop all listeners + cml.buffersMutex.Lock() + defer cml.buffersMutex.Unlock() + + var errors []string + for ifName := range cml.listeners { + if err := cml.stopListeningUnsafe(ifName); err != nil { + errors = append(errors, fmt.Sprintf("%s: %v", ifName, err)) + } + } + + if len(errors) > 0 { + return fmt.Errorf("errors during shutdown: %v", errors) + } + + cml.logger.Printf("โœ… CAN message listener shutdown complete") + return nil +} + +// stopListeningUnsafe stops listening without acquiring mutex (internal use) +func (cml *CanMessageListener) stopListeningUnsafe(interfaceName string) error { + listener, exists := cml.listeners[interfaceName] + if !exists { + return fmt.Errorf("not listening on interface %s", interfaceName) + } + + // Signal stop + if listener.isRunning { + listener.stopChan <- true + } + + // Close socket + if err := unix.Close(listener.socket); err != nil { + cml.logger.Printf("โš ๏ธ Warning: failed to close listening socket for %s: %v", interfaceName, err) + } + + // Remove from listeners map + delete(cml.listeners, interfaceName) + + return nil +} diff --git a/main.go b/main.go index 4ee87e4..ef96e56 100644 --- a/main.go +++ b/main.go @@ -20,6 +20,7 @@ type Service struct { setupManager *InterfaceSetupManager interfaceManager *InterfaceManager messageSender *MessageSender + messageListener *CanMessageListener watchdog *Watchdog monitor *Monitor apiHandler *APIHandler @@ -73,6 +74,12 @@ func (s *Service) Initialize() error { // We continue even if some interfaces failed } + // Start message listening for all active interfaces + if err := s.startMessageListening(); err != nil { + s.logger.Printf("Warning: message listening issues: %v", err) + // We continue even if some listeners failed to start + } + // Setup HTTP server s.setupHTTPServer() @@ -102,6 +109,10 @@ func (s *Service) initializeComponents() error { // Create message sender s.messageSender = NewMessageSender(s.interfaceManager, s.configProvider, socketProvider, s.logger) + // Create message listener (new component) + maxMessages := 100 // Configure maximum messages per interface + s.messageListener = NewCanMessageListener(maxMessages, s.logger) + // Create watchdog watchdogConfig := DefaultWatchdogConfig() s.watchdog = NewWatchdog(s.interfaceManager, watchdogConfig, s.logger) @@ -109,8 +120,14 @@ func (s *Service) initializeComponents() error { // Create monitor s.monitor = NewMonitor(s.interfaceManager, s.watchdog, s.configProvider) - // Create API handler with setup manager - s.apiHandler = NewAPIHandlerWithSetup(s.messageSender, s.monitor, s.setupManager, s.logger) + // Create API handler with setup manager and message listener + s.apiHandler = NewAPIHandlerWithSetupAndListener( + s.messageSender, + s.monitor, + s.setupManager, + s.messageListener, + s.logger, + ) return nil } @@ -162,6 +179,55 @@ func (s *Service) setupCanInterfaces() error { return nil } +// startMessageListening starts message listening for all active interfaces +func (s *Service) startMessageListening() error { + s.logger.Printf("๐Ÿ‘‚ Starting message listening for active interfaces...") + + var listeningErrors []string + successCount := 0 + + // Get all active interfaces from interface manager + activeInterfaces := s.interfaceManager.GetAllInterfaces() + + for ifName := range activeInterfaces { + s.logger.Printf("๐Ÿ‘‚ Starting listener for %s...", ifName) + + err := s.messageListener.StartListening(ifName) + if err != nil { + listeningErrors = append(listeningErrors, fmt.Sprintf("%s: %v", ifName, err)) + s.logger.Printf("โŒ Failed to start listening on %s: %v", ifName, err) + } else { + successCount++ + s.logger.Printf("โœ… Successfully started listening on %s", ifName) + } + } + + // Also try to start listening on configured ports that might become active later + for _, ifName := range s.config.CanPorts { + // Skip if already handled above + if _, exists := activeInterfaces[ifName]; exists { + continue + } + + s.logger.Printf("๐Ÿ‘‚ Attempting to start listener for configured interface %s...", ifName) + err := s.messageListener.StartListening(ifName) + if err != nil { + s.logger.Printf("โš ๏ธ Warning: could not start listening on %s (interface may not be ready): %v", ifName, err) + } else { + successCount++ + s.logger.Printf("โœ… Successfully started listening on %s", ifName) + } + } + + s.logger.Printf("๐ŸŽฏ Successfully started listening on %d interfaces", successCount) + + if len(listeningErrors) > 0 { + return fmt.Errorf("partial listening startup failure: %v", listeningErrors) + } + + return nil +} + // setupHTTPServer configures the HTTP server func (s *Service) setupHTTPServer() { // Set to production mode @@ -208,6 +274,7 @@ func (s *Service) Start(ctx context.Context) error { }() s.logger.Printf("โœ… CAN Communication Service started successfully") + s.logger.Printf("๐Ÿ“ก Message listening active on: %v", s.messageListener.GetListeningInterfaces()) return nil } @@ -215,6 +282,14 @@ func (s *Service) Start(ctx context.Context) error { func (s *Service) Stop(ctx context.Context) error { s.logger.Printf("๐Ÿ›‘ Stopping CAN Communication Service...") + // Stop message listening first + if s.messageListener != nil { + s.logger.Printf("๐Ÿ›‘ Stopping message listener...") + if err := s.messageListener.Shutdown(); err != nil { + s.logger.Printf("Warning: failed to stop message listener: %v", err) + } + } + // Stop watchdog if err := s.watchdog.Stop(); err != nil { s.logger.Printf("Warning: failed to stop watchdog: %v", err) @@ -283,12 +358,85 @@ func (s *Service) GetStatus() map[string]interface{} { setupStatus["interfaceStates"] = interfaceStates } + // Add message listener status + messageListenerStatus := make(map[string]interface{}) + if s.messageListener != nil { + messageListenerStatus["listeningInterfaces"] = s.messageListener.GetListeningInterfaces() + messageListenerStatus["statistics"] = s.messageListener.GetStatistics() + } + return map[string]interface{}{ "status": "running", "uptime": systemStatus.SystemUptime.String(), "activeInterfaces": systemStatus.ActiveInterfaces, "watchdogRunning": systemStatus.WatchdogStatus.Running, "setup": setupStatus, + "messageListener": messageListenerStatus, + } +} + +// RestartInterfaceWithListening restarts an interface and its message listening +func (s *Service) RestartInterfaceWithListening(ifName string) error { + s.logger.Printf("๐Ÿ”„ Restarting interface %s with message listening...", ifName) + + // Stop listening first + if s.messageListener != nil { + if err := s.messageListener.StopListening(ifName); err != nil { + s.logger.Printf("โš ๏ธ Warning: failed to stop listening on %s: %v", ifName, err) + } + } + + // Reset the interface + if err := s.setupManager.ResetInterface(ifName); err != nil { + return fmt.Errorf("failed to reset interface %s: %w", ifName, err) + } + + // Wait a moment for interface to stabilize + time.Sleep(1 * time.Second) + + // Restart listening + if s.messageListener != nil { + if err := s.messageListener.StartListening(ifName); err != nil { + s.logger.Printf("โš ๏ธ Warning: failed to restart listening on %s: %v", ifName, err) + return fmt.Errorf("interface reset successful but failed to restart listening: %w", err) + } + } + + s.logger.Printf("โœ… Successfully restarted interface %s with message listening", ifName) + return nil +} + +// GetMessageSummary returns a summary of message activity +func (s *Service) GetMessageSummary() map[string]interface{} { + if s.messageListener == nil { + return map[string]interface{}{ + "status": "message_listener_not_available", + } + } + + allStats := s.messageListener.GetStatistics() + listeningInterfaces := s.messageListener.GetListeningInterfaces() + + totalReceived := uint64(0) + totalBuffered := 0 + + for _, stats := range allStats { + if statsMap, ok := stats.(map[string]interface{}); ok { + if totalRx, ok := statsMap["totalReceived"].(uint64); ok { + totalReceived += totalRx + } + if buffered, ok := statsMap["bufferedCount"].(int); ok { + totalBuffered += buffered + } + } + } + + return map[string]interface{}{ + "listeningInterfaceCount": len(listeningInterfaces), + "listeningInterfaces": listeningInterfaces, + "totalMessagesReceived": totalReceived, + "totalMessagesBuffered": totalBuffered, + "interfaceStatistics": allStats, } } @@ -317,6 +465,18 @@ func main() { log.Fatalf("Failed to start service: %v", err) } + // Print startup summary + status := service.GetStatus() + log.Printf("๐ŸŽฏ Service startup summary:") + log.Printf(" - Active interfaces: %v", status["activeInterfaces"]) + log.Printf(" - Watchdog running: %v", status["watchdogRunning"]) + + if messageListener, ok := status["messageListener"].(map[string]interface{}); ok { + if listeningInterfaces, ok := messageListener["listeningInterfaces"].([]string); ok { + log.Printf(" - Listening on: %v", listeningInterfaces) + } + } + // Wait for interrupt signal for graceful shutdown sigChan := make(chan os.Signal, 1) signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) From 2a4084c3295667aece1d9064437299967d407b11 Mon Sep 17 00:00:00 2001 From: Su Yang Date: Tue, 10 Jun 2025 21:24:22 +0800 Subject: [PATCH 2/2] fix: rm already declared fn --- api.go | 36 ------------------------------------ 1 file changed, 36 deletions(-) diff --git a/api.go b/api.go index d28d72d..e4e2393 100644 --- a/api.go +++ b/api.go @@ -802,42 +802,6 @@ func (h *APIHandler) handleClearAllMessages(c *gin.Context) { h.respondSuccess(c, "All message buffers cleared", data) } -// handleGetListenStatus returns listening status for a specific interface -func (h *APIHandler) handleGetListenStatus(c *gin.Context) { - if h.messageListener == nil { - h.respondError(c, http.StatusServiceUnavailable, "Message listener not available", nil) - return - } - - ifName := c.Param("interface") - if ifName == "" { - h.respondError(c, http.StatusBadRequest, "Interface name is required", nil) - return - } - - isListening := h.messageListener.IsListening(ifName) - - data := map[string]interface{}{ - "interface": ifName, - "isListening": isListening, - "status": func() string { - if isListening { - return "listening" - } - return "not_listening" - }(), - } - - // Add statistics if listening - if isListening { - if stats, err := h.messageListener.GetInterfaceStatistics(ifName); err == nil { - data["statistics"] = stats - } - } - - h.respondSuccess(c, "", data) -} - // handleGetAllListenStatus returns listening status for all interfaces func (h *APIHandler) handleGetAllListenStatus(c *gin.Context) { if h.messageListener == nil {