From f51475596f06b97171af7372ca2e0a323595f3c3 Mon Sep 17 00:00:00 2001 From: Jeeva Kandasamy Date: Mon, 15 Sep 2025 11:15:23 +0530 Subject: [PATCH] enhance queue Signed-off-by: Jeeva Kandasamy --- pkg/service/deletion/service.go | 15 +- pkg/service/forward_payload/service.go | 9 +- pkg/service/gateway/listener.go | 5 +- pkg/service/gateway_msg_processor/service.go | 7 +- pkg/service/handler/message_listerner.go | 13 +- pkg/service/handler/service_listener.go | 6 +- pkg/service/resource/service.go | 3 +- pkg/service/scheduler/listener.go | 3 +- pkg/service/system_jobs/service_listener.go | 7 +- pkg/service/task/listener_events.go | 16 +- pkg/service/task/listener_tasks.go | 3 +- pkg/service/virtual_assistant/listener.go | 5 +- pkg/service/websocket/events_listener.go | 11 +- pkg/utils/queue/bonded_queue.go | 173 +++- pkg/utils/queue/bonded_queue_test.go | 767 ++++++++++++++++++ pkg/utils/queue/utils.go | 18 +- pkg/utils/queue/utils_test.go | 756 +++++++++++++++++ .../provider/mysensors_v2/event_listener.go | 7 +- plugin/gateway/provider/service.go | 26 +- 19 files changed, 1772 insertions(+), 78 deletions(-) create mode 100644 pkg/utils/queue/bonded_queue_test.go create mode 100644 pkg/utils/queue/utils_test.go diff --git a/pkg/service/deletion/service.go b/pkg/service/deletion/service.go index 64ed24b..12656de 100644 --- a/pkg/service/deletion/service.go +++ b/pkg/service/deletion/service.go @@ -92,18 +92,18 @@ func (svc *DeletionService) onEventReceive(busData *busTY.BusData) { } } -func (svc *DeletionService) processEvent(item interface{}) { +func (svc *DeletionService) processEvent(item interface{}) error { busData := item.(*busTY.BusData) event := &eventTY.Event{} err := busData.LoadData(event) if err != nil { svc.logger.Warn("error on convert to target type", zap.Any("topic", busData.Topic), zap.Error(err)) - return + return nil } // if it is not a deletion event, return from here if event.Type != eventTY.TypeDeleted { - return + return nil } svc.logger.Debug("received an deletion event", zap.Any("event", event)) @@ -116,7 +116,7 @@ func (svc *DeletionService) processEvent(item interface{}) { err = event.LoadEntity(gateway) if err != nil { svc.logger.Warn("error on loading entity", zap.Any("event", event), zap.Error(err)) - return + return nil } svc.deleteNodes(gateway) @@ -125,7 +125,7 @@ func (svc *DeletionService) processEvent(item interface{}) { err = event.LoadEntity(node) if err != nil { svc.logger.Warn("error on loading entity", zap.Any("event", event), zap.Error(err)) - return + return nil } svc.deleteSources(node) @@ -134,14 +134,15 @@ func (svc *DeletionService) processEvent(item interface{}) { err = event.LoadEntity(source) if err != nil { svc.logger.Warn("error on loading entity", zap.Any("event", event), zap.Error(err)) - return + return nil } svc.deleteFields(source) default: // do not proceed further - return + return nil } + return nil } // deletes nodes diff --git a/pkg/service/forward_payload/service.go b/pkg/service/forward_payload/service.go index 8bf07ca..46985c4 100644 --- a/pkg/service/forward_payload/service.go +++ b/pkg/service/forward_payload/service.go @@ -127,13 +127,13 @@ func (svc *ForwardPayloadService) Close() error { } // processEvent from the queue -func (svc *ForwardPayloadService) processEvent(item interface{}) { +func (svc *ForwardPayloadService) processEvent(item interface{}) error { field := item.(*field.Field) quickID, err := quickIdUtils.GetQuickID(*field) if err != nil { svc.logger.Error("unable to get quick id", zap.Error(err), 
zap.String("gateway", field.GatewayID), zap.String("node", field.NodeID), zap.String("source", field.SourceID), zap.String("field", field.FieldID)) - return + return nil } // fetch mapped filed for this event @@ -145,11 +145,11 @@ func (svc *ForwardPayloadService) processEvent(item interface{}) { response, err := svc.api.ForwardPayload().List(filters, pagination) if err != nil { svc.logger.Error("error getting mapping data from database", zap.Error(err)) - return + return nil } if response.Count == 0 { - return + return nil } svc.logger.Debug("Starting data forwarding", zap.Any("data", field)) @@ -167,4 +167,5 @@ func (svc *ForwardPayloadService) processEvent(item interface{}) { } } } + return nil } diff --git a/pkg/service/gateway/listener.go b/pkg/service/gateway/listener.go index 079edc2..eaa7124 100644 --- a/pkg/service/gateway/listener.go +++ b/pkg/service/gateway/listener.go @@ -68,7 +68,7 @@ func (svc *GatewayService) onEvent(event *busTY.BusData) { } // processEvent from the queue -func (svc *GatewayService) processEvent(event interface{}) { +func (svc *GatewayService) processEvent(event interface{}) error { reqEvent := event.(*rsTY.ServiceEvent) svc.logger.Debug("Processing a request", zap.Any("event", reqEvent)) @@ -92,7 +92,7 @@ func (svc *GatewayService) processEvent(event interface{}) { if err != nil { svc.logger.Error("error on stopping a service", zap.Error(err), zap.String("id", reqEvent.ID)) } - return + return nil } gwCfg := svc.getGatewayConfig(reqEvent) if gwCfg != nil { @@ -129,6 +129,7 @@ func (svc *GatewayService) processEvent(event interface{}) { default: svc.logger.Warn("unsupported command", zap.Any("event", reqEvent)) } + return nil } func (svc *GatewayService) getGatewayConfig(reqEvent *rsTY.ServiceEvent) *gwTY.Config { diff --git a/pkg/service/gateway_msg_processor/service.go b/pkg/service/gateway_msg_processor/service.go index 3efb807..ed9a70e 100644 --- a/pkg/service/gateway_msg_processor/service.go +++ b/pkg/service/gateway_msg_processor/service.go @@ -131,7 +131,7 @@ func (svc *MessageProcessor) Close() error { } // processMessage from the queue -func (svc *MessageProcessor) processMessage(item interface{}) { +func (svc *MessageProcessor) processMessage(item interface{}) error { msg := item.(*msgTY.Message) svc.logger.Debug("Starting Message Processing", zap.Any("message", msg)) @@ -142,6 +142,7 @@ func (svc *MessageProcessor) processMessage(item interface{}) { err := svc.setFieldData(msg) if err != nil { svc.logger.Error("error on field data set", zap.Error(err)) + return err // Requeue on error } // update last seen svc.updateSourceLastSeen(msg.GatewayID, msg.NodeID, msg.SourceID, msg.Timestamp) @@ -151,12 +152,14 @@ func (svc *MessageProcessor) processMessage(item interface{}) { err := svc.requestFieldData(msg) if err != nil { svc.logger.Error("error on field data request", zap.Error(err)) + return err // Requeue on error } case msgTY.TypePresentation: // update source data, like name or other details err := svc.updateSourceDetail(msg) if err != nil { svc.logger.Error("error on source data update", zap.Error(err)) + return err // Requeue on error } // update last seen svc.updateSourceLastSeen(msg.GatewayID, msg.NodeID, msg.SourceID, msg.Timestamp) @@ -172,6 +175,7 @@ func (svc *MessageProcessor) processMessage(item interface{}) { err := svc.updateNodeData(msg) if err != nil { svc.logger.Error("error on node data update", zap.Error(err)) + return err // Requeue on error } // node last seen managed in updateNodeData @@ -194,6 +198,7 @@ func (svc 
*MessageProcessor) processMessage(item interface{}) { } svc.logger.Debug("message processed", zap.String("timeTaken", time.Since(msg.Timestamp).String()), zap.Any("message", msg)) + return nil } // update node detail diff --git a/pkg/service/handler/message_listerner.go b/pkg/service/handler/message_listerner.go index dcf499b..8c3626d 100644 --- a/pkg/service/handler/message_listerner.go +++ b/pkg/service/handler/message_listerner.go @@ -47,7 +47,7 @@ func (svc *HandlerService) closeMessageListener() { svc.messageQueue.Close() } -func (svc *HandlerService) processHandlerMessage(item interface{}) { +func (svc *HandlerService) processHandlerMessage(item interface{}) error { msg := item.(*handlerTY.MessageWrapper) start := time.Now() @@ -56,16 +56,18 @@ func (svc *HandlerService) processHandlerMessage(item interface{}) { handler := svc.store.Get(msg.ID) if handler == nil { svc.logger.Info("handler not available", zap.Any("handlerID", msg.ID), zap.Any("availableHandlers", svc.store.ListIDs())) - return + return nil // Don't requeue if handler not available } state := handler.State() err := handler.Post(msg.Data) if err != nil { - // if err == handlerTY.ErrReQueue { - // // TODO: requeue and try again - // } + if err == handlerTY.ErrReQueue { + // Requeue the message to try again + svc.logger.Info("requeuing message", zap.Any("handlerID", msg.ID)) + return err + } svc.logger.Warn("error from handler", zap.Any("handlerID", msg.ID), zap.Error(err)) state.Status = types.StatusError state.Message = err.Error() @@ -76,4 +78,5 @@ func (svc *HandlerService) processHandlerMessage(item interface{}) { state.Since = time.Now() busUtils.SetHandlerState(svc.logger, svc.bus, msg.ID, *state) + return nil } diff --git a/pkg/service/handler/service_listener.go b/pkg/service/handler/service_listener.go index 4ad44cf..bc855b0 100644 --- a/pkg/service/handler/service_listener.go +++ b/pkg/service/handler/service_listener.go @@ -138,12 +138,13 @@ func (svc *HandlerService) onServiceEvent(event *busTY.BusData) { } // postProcessServiceEvent from the queue -func (svc *HandlerService) postProcessServiceEvent(event interface{}) { +func (svc *HandlerService) postProcessServiceEvent(event interface{}) error { reqEvent := event.(*rsTY.ServiceEvent) svc.logger.Debug("processing a request", zap.Any("event", reqEvent)) if reqEvent.Type != rsTY.TypeHandler { svc.logger.Warn("unsupported event type", zap.Any("event", reqEvent)) + return nil } switch reqEvent.Command { @@ -162,7 +163,7 @@ func (svc *HandlerService) postProcessServiceEvent(event interface{}) { if err != nil { svc.logger.Error("error on stopping a service", zap.Error(err)) } - return + return nil } cfg := svc.getConfig(reqEvent) if cfg != nil { @@ -187,6 +188,7 @@ func (svc *HandlerService) postProcessServiceEvent(event interface{}) { default: svc.logger.Warn("unsupported command", zap.Any("event", reqEvent)) } + return nil } func (svc *HandlerService) getConfig(reqEvent *rsTY.ServiceEvent) *handlerTY.Config { diff --git a/pkg/service/resource/service.go b/pkg/service/resource/service.go index 93cf362..068a630 100644 --- a/pkg/service/resource/service.go +++ b/pkg/service/resource/service.go @@ -113,7 +113,7 @@ func (svc *ResourceService) onEvent(data *busTY.BusData) { } // processEvent from the queue -func (svc *ResourceService) processEvent(item interface{}) { +func (svc *ResourceService) processEvent(item interface{}) error { request := item.(*rsTY.ServiceEvent) svc.logger.Debug("processing an event", zap.Any("event", request)) start := time.Now() @@ -170,6 
+170,7 @@ func (svc *ResourceService) processEvent(item interface{}) { svc.logger.Warn("unknown event type", zap.Any("event", request)) } svc.logger.Debug("completed a resource service", zap.String("timeTaken", time.Since(start).String()), zap.Any("data", request)) + return nil } func (svc *ResourceService) postResponse(topic string, response *rsTY.ServiceEvent) error { diff --git a/pkg/service/scheduler/listener.go b/pkg/service/scheduler/listener.go index f783b37..9594728 100644 --- a/pkg/service/scheduler/listener.go +++ b/pkg/service/scheduler/listener.go @@ -70,7 +70,7 @@ func (svc *SchedulerService) onServiceEvent(event *busTY.BusData) { } // processServiceEvent from the queue -func (svc *SchedulerService) processServiceEvent(event interface{}) { +func (svc *SchedulerService) processServiceEvent(event interface{}) error { reqEvent := event.(*rsTY.ServiceEvent) svc.logger.Debug("processing a request", zap.Any("event", reqEvent)) @@ -105,6 +105,7 @@ func (svc *SchedulerService) processServiceEvent(event interface{}) { default: svc.logger.Warn("unsupported command", zap.Any("event", reqEvent)) } + return nil } func (svc *SchedulerService) getConfig(reqEvent *rsTY.ServiceEvent) *schedulerTY.Config { diff --git a/pkg/service/system_jobs/service_listener.go b/pkg/service/system_jobs/service_listener.go index 27e2530..4ecb51a 100644 --- a/pkg/service/system_jobs/service_listener.go +++ b/pkg/service/system_jobs/service_listener.go @@ -52,18 +52,18 @@ func (svc *SystemJobsService) onEvent(event *busTY.BusData) { } // processEvent from the queue -func (svc *SystemJobsService) processEvent(event interface{}) { +func (svc *SystemJobsService) processEvent(event interface{}) error { reqEvent := event.(*rsTY.ServiceEvent) svc.logger.Debug("processing a request", zap.Any("event", reqEvent)) if reqEvent.Type != rsTY.TypeSystemJobs { svc.logger.Warn("unsupported event type", zap.Any("event", reqEvent)) - return + return nil } if reqEvent.Command != rsTY.CommandReload { svc.logger.Warn("unsupported command", zap.Any("event", reqEvent)) - return + return nil } switch reqEvent.Data { @@ -77,4 +77,5 @@ func (svc *SystemJobsService) processEvent(event interface{}) { default: // NOOP } + return nil } diff --git a/pkg/service/task/listener_events.go b/pkg/service/task/listener_events.go index 52ed577..17be6fa 100644 --- a/pkg/service/task/listener_events.go +++ b/pkg/service/task/listener_events.go @@ -47,14 +47,14 @@ func (svc *TaskService) onEventReceive(busData *busTY.BusData) { } } -func (svc *TaskService) processPreEvent(item interface{}) { +func (svc *TaskService) processPreEvent(item interface{}) error { busData := item.(*busTY.BusData) event := &eventTY.Event{} err := busData.LoadData(event) if err != nil { svc.logger.Warn("error on convert to target type", zap.Any("topic", busData.Topic), zap.Error(err)) - return + return nil } var out interface{} @@ -78,13 +78,13 @@ func (svc *TaskService) processPreEvent(item interface{}) { default: // return do not proceed further - return + return nil } err = event.LoadEntity(out) if err != nil { svc.logger.Warn("error on loading entity", zap.Any("event", event), zap.Error(err)) - return + return nil } event.Entity = out @@ -92,7 +92,7 @@ func (svc *TaskService) processPreEvent(item interface{}) { err = svc.resourcePreProcessor(resourceWrapper) if err != nil { svc.logger.Error("error on executing a resource", zap.Any("resource", resourceWrapper), zap.Error(err)) - return + return err } if len(resourceWrapper.Tasks) > 0 { @@ -101,6 +101,7 @@ func (svc 
*TaskService) processPreEvent(item interface{}) { svc.logger.Error("failed to post selected tasks on post processor queue") } } + return nil } func (svc *TaskService) resourcePreProcessor(evntWrapper *eventWrapper) error { @@ -119,11 +120,11 @@ func (svc *TaskService) resourcePreProcessor(evntWrapper *eventWrapper) error { return nil } -func (svc *TaskService) resourcePostProcessor(item interface{}) { +func (svc *TaskService) resourcePostProcessor(item interface{}) error { evntWrapper, ok := item.(*eventWrapper) if !ok { svc.logger.Warn("supplied item is not resourceWrapper", zap.Any("item", item)) - return + return nil } svc.logger.Debug("resourceWrapper received", zap.String("entityType", evntWrapper.Event.EntityType)) @@ -132,4 +133,5 @@ func (svc *TaskService) resourcePostProcessor(item interface{}) { task := evntWrapper.Tasks[index] svc.executeTask(&task, evntWrapper) } + return nil } diff --git a/pkg/service/task/listener_tasks.go b/pkg/service/task/listener_tasks.go index a54338a..d1c5c26 100644 --- a/pkg/service/task/listener_tasks.go +++ b/pkg/service/task/listener_tasks.go @@ -76,7 +76,7 @@ func (svc *TaskService) onServiceEvent(busData *busTY.BusData) { } // processServiceEvent from the queue -func (svc *TaskService) processServiceEvent(event interface{}) { +func (svc *TaskService) processServiceEvent(event interface{}) error { reqEvent := event.(*rsTY.ServiceEvent) svc.logger.Debug("processing a request", zap.Any("event", reqEvent)) @@ -116,6 +116,7 @@ func (svc *TaskService) processServiceEvent(event interface{}) { default: svc.logger.Warn("unsupported command", zap.Any("event", reqEvent)) } + return nil } func (svc *TaskService) getConfig(reqEvent *rsTY.ServiceEvent) *taskTY.Config { diff --git a/pkg/service/virtual_assistant/listener.go b/pkg/service/virtual_assistant/listener.go index 608d6de..7b73080 100644 --- a/pkg/service/virtual_assistant/listener.go +++ b/pkg/service/virtual_assistant/listener.go @@ -27,7 +27,7 @@ func (svc *VirtualAssistantService) onEvent(event *busTY.BusData) { } // processEvent from the queue -func (svc *VirtualAssistantService) processEvent(event interface{}) { +func (svc *VirtualAssistantService) processEvent(event interface{}) error { reqEvent := event.(*rsTY.ServiceEvent) svc.logger.Debug("processing a request", zap.Any("event", reqEvent)) @@ -51,7 +51,7 @@ func (svc *VirtualAssistantService) processEvent(event interface{}) { if err != nil { svc.logger.Error("error on stopping a service", zap.Error(err), zap.String("id", reqEvent.ID)) } - return + return nil } gwCfg := svc.getConfig(reqEvent) if gwCfg != nil { @@ -82,6 +82,7 @@ func (svc *VirtualAssistantService) processEvent(event interface{}) { default: svc.logger.Warn("unsupported command", zap.Any("event", reqEvent)) } + return nil } func (svc *VirtualAssistantService) getConfig(reqEvent *rsTY.ServiceEvent) *vaTY.Config { diff --git a/pkg/service/websocket/events_listener.go b/pkg/service/websocket/events_listener.go index f87a9da..2e73240 100644 --- a/pkg/service/websocket/events_listener.go +++ b/pkg/service/websocket/events_listener.go @@ -38,10 +38,10 @@ func (svc *WebsocketService) onEventReceive(data *busTY.BusData) { } } -func (svc *WebsocketService) processEvent(item interface{}) { +func (svc *WebsocketService) processEvent(item interface{}) error { // if there is no clients, just ignore the event if svc.store.getSize() == 0 { - return + return nil } data := item.(*busTY.BusData) @@ -50,7 +50,7 @@ func (svc *WebsocketService) processEvent(item interface{}) { err := 
data.LoadData(event)
	if err != nil {
		svc.logger.Warn("failed to convert to target type", zap.Any("topic", data.Topic), zap.Error(err))
-		return
+		return nil
	}

	svc.logger.Debug("event received", zap.Any("event", event))
@@ -64,7 +64,7 @@ func (svc *WebsocketService) processEvent(item interface{}) {
	dataBytes, err := json.Marshal(response)
	if err != nil {
		svc.logger.Error("error on converting to json", zap.Error(err))
-		return
+		return nil
	}

	wsClients := svc.store.getClients()
@@ -76,7 +76,7 @@ func (svc *WebsocketService) processEvent(item interface{}) {
		if err != nil {
			svc.logger.Debug("error on setting write deadline", zap.Any("remoteAddress", client.RemoteAddr().String()), zap.Error(err))
			svc.store.unregister(client)
-			return
+			return nil
		}
		err = client.WriteMessage(ws.TextMessage, dataBytes)
		if err != nil {
@@ -84,4 +84,5 @@ func (svc *WebsocketService) processEvent(item interface{}) {
			svc.store.unregister(client)
		}
	}
+	return nil
}
diff --git a/pkg/utils/queue/bonded_queue.go b/pkg/utils/queue/bonded_queue.go
index b77121a..e006acb 100644
--- a/pkg/utils/queue/bonded_queue.go
+++ b/pkg/utils/queue/bonded_queue.go
@@ -21,12 +21,13 @@ package queue
import (
	"sync"
	"sync/atomic"
+	"time"
	"unsafe"
)

// Consumer consumes data from a bounded queue
type Consumer interface {
-	Consume(item interface{})
+	Consume(item interface{}) error
}

// BoundedQueue implements a producer-consumer exchange similar to a ring buffer queue,
@@ -44,6 +45,14 @@ type BoundedQueue struct {
	onDroppedItem func(item interface{})
	factory       func() Consumer
	stopCh        chan struct{}
+	retryConfig   RetryConfig
+}
+
+// RetryConfig holds configuration for retrying failed items
+type RetryConfig struct {
+	isEnabled bool
+	maxCount  uint32
+	delay     time.Duration
}

// NewBoundedQueue constructs the new queue of specified capacity, and with an optional
// callback for dropped items (e.g. useful to emit metrics).
@@ -59,6 +68,24 @@ func NewBoundedQueue(capacity int, onDroppedItem func(item interface{})) *Bounde
	return bq
}

+// NewBoundedQueueWithRetry constructs a new queue of the specified capacity with retry
+// enabled, and with an optional callback for dropped items (e.g. useful to emit metrics).
+func NewBoundedQueueWithRetry(capacity int, onDroppedItem func(item interface{}), retryMaxCount uint32, retryDelay time.Duration) *BoundedQueue {
+	queue := make(chan interface{}, capacity)
+	bq := &BoundedQueue{
+		onDroppedItem: onDroppedItem,
+		items:         &queue,
+		stopCh:        make(chan struct{}),
+		retryConfig: RetryConfig{
+			isEnabled: true,
+			maxCount:  retryMaxCount,
+			delay:     retryDelay,
+		},
+	}
+	bq.capacity.Store(uint32(capacity))
+	return bq
+}
+
// StartConsumersWithFactory creates a given number of consumers consuming items
// from the queue in separate goroutines.
func (q *BoundedQueue) StartConsumersWithFactory(num int, factory func() Consumer) { @@ -73,19 +100,117 @@ func (q *BoundedQueue) StartConsumersWithFactory(num int, factory func() Consume defer q.stopWG.Done() consumer := q.factory() queue := *q.items - for { - select { - case item, ok := <-queue: - if ok { - q.size.Add(-1) - consumer.Consume(item) + + if q.retryConfig.isEnabled { + // Create a local queue for retrying items at the front + var retryItem interface{} + hasRetry := false + retryDelay := time.Millisecond * 100 + retryAttemptNumber := uint32(0) + + for { + // If we have a retry item, process it first + if hasRetry { + if q.retryConfig.maxCount > 0 && retryAttemptNumber >= q.retryConfig.maxCount { + // Max retries exceeded, drop the item + q.size.Add(-1) + if q.onDroppedItem != nil { + q.onDroppedItem(retryItem) + } + hasRetry = false + retryItem = nil + retryDelay = time.Millisecond * 100 + retryAttemptNumber = 0 + continue + } + + select { + case <-time.After(retryDelay): + retryAttemptNumber++ + func() { + defer func() { + if r := recover(); r != nil { + // Panic occurred, treat as error by not clearing hasRetry + } + }() + err := consumer.Consume(retryItem) + if err != nil { + // Still failing, increase delay (exponential backoff) + retryDelay = retryDelay * 2 + if retryDelay > q.retryConfig.delay { + retryDelay = q.retryConfig.delay + } + } else { + // Success! Clear retry state and decrement size + hasRetry = false + retryItem = nil + retryDelay = time.Millisecond * 100 + retryAttemptNumber = 0 + q.size.Add(-1) + } + }() + case <-q.stopCh: + // Queue is closing + if hasRetry { + q.size.Add(-1) + } + return + } } else { - // channel closed, finish worker + // No retry item, get next from queue + select { + case item, ok := <-queue: + if ok { + func() { + defer func() { + if r := recover(); r != nil { + // Panic occurred, treat as error + retryItem = item + hasRetry = true + } + }() + err := consumer.Consume(item) + if err != nil { + // Failed, set as retry item + retryItem = item + hasRetry = true + } else { + // Success, decrement size + q.size.Add(-1) + } + }() + } else { + // channel closed, finish worker + return + } + case <-q.stopCh: + // the whole queue is closing, finish worker + return + } + } + } + } else { + for { + select { + case item, ok := <-queue: + if ok { + func() { + defer func() { + if r := recover(); r != nil { + // Panic occurred, but with retry disabled we still decrement size + } + }() + _ = consumer.Consume(item) + }() + q.size.Add(-1) + } else { + // channel closed, finish worker + return + } + case <-q.stopCh: + // the whole queue is closing, finish worker return } - case <-q.stopCh: - // the whole queue is closing, finish worker - return } } }() @@ -95,16 +220,16 @@ func (q *BoundedQueue) StartConsumersWithFactory(num int, factory func() Consume // ConsumerFunc is an adapter to allow the use of // a consume function callback as a Consumer. -type ConsumerFunc func(item interface{}) +type ConsumerFunc func(item interface{}) error // Consume calls c(item) -func (c ConsumerFunc) Consume(item interface{}) { - c(item) +func (c ConsumerFunc) Consume(item interface{}) error { + return c(item) } // StartConsumers starts a given number of goroutines consuming items from the queue // and passing them into the consumer callback. 
-func (q *BoundedQueue) StartConsumers(num int, callback func(item interface{})) { +func (q *BoundedQueue) StartConsumers(num int, callback func(item interface{}) error) { q.StartConsumersWithFactory(num, func() Consumer { return ConsumerFunc(callback) }) @@ -119,7 +244,9 @@ func (q *BoundedQueue) StopConsumers() { // Produce is used by the producer to submit new item to the queue. Returns false in case of queue overflow. func (q *BoundedQueue) Produce(item interface{}) bool { if q.stopped.Load() != 0 { - q.onDroppedItem(item) + if q.onDroppedItem != nil { + q.onDroppedItem(item) + } return false } @@ -128,7 +255,9 @@ func (q *BoundedQueue) Produce(item interface{}) bool { // should match the capacity of the new queue if q.Size() >= q.Capacity() { // note that all items will be dropped if the capacity is 0 - q.onDroppedItem(item) + if q.onDroppedItem != nil { + q.onDroppedItem(item) + } return false } @@ -149,10 +278,12 @@ func (q *BoundedQueue) Produce(item interface{}) bool { // Stop stops all consumers, as well as the length reporter if started, // and releases the items channel. It blocks until all consumers have stopped. func (q *BoundedQueue) Stop() { - q.stopped.Store(1) // disable producer - close(q.stopCh) - q.stopWG.Wait() - close(*q.items) + // Use atomic CAS to ensure Stop is only executed once + if q.stopped.CompareAndSwap(0, 1) { + close(q.stopCh) + q.stopWG.Wait() + close(*q.items) + } } // Size returns the current size of the queue diff --git a/pkg/utils/queue/bonded_queue_test.go b/pkg/utils/queue/bonded_queue_test.go new file mode 100644 index 0000000..31032b2 --- /dev/null +++ b/pkg/utils/queue/bonded_queue_test.go @@ -0,0 +1,767 @@ +package queue + +import ( + "errors" + "sync" + "sync/atomic" + "testing" + "time" + "unsafe" +) + +// TestNewBoundedQueue tests basic queue creation +func TestNewBoundedQueue(t *testing.T) { + tests := []struct { + name string + capacity int + wantCap int + }{ + {"small capacity", 10, 10}, + {"medium capacity", 100, 100}, + {"large capacity", 1000, 1000}, + {"zero capacity", 0, 0}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + var droppedItems []interface{} + onDropped := func(item interface{}) { + droppedItems = append(droppedItems, item) + } + + q := NewBoundedQueue(tt.capacity, onDropped) + if q == nil { + t.Fatal("NewBoundedQueue returned nil") + } + + if q.Capacity() != tt.wantCap { + t.Errorf("Capacity() = %d, want %d", q.Capacity(), tt.wantCap) + } + + if q.Size() != 0 { + t.Errorf("Size() = %d, want 0", q.Size()) + } + + q.Stop() + }) + } +} + +// TestNewBoundedQueueWithRetry tests queue creation with retry +func TestNewBoundedQueueWithRetry(t *testing.T) { + tests := []struct { + name string + capacity int + retryMaxCount uint32 + retryDelay time.Duration + wantCap int + }{ + {"with limited retries", 10, 3, 100 * time.Millisecond, 10}, + {"with unlimited retries", 50, 0, 50 * time.Millisecond, 50}, + {"with high retry count", 100, 100, 10 * time.Millisecond, 100}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + var droppedItems []interface{} + onDropped := func(item interface{}) { + droppedItems = append(droppedItems, item) + } + + q := NewBoundedQueueWithRetry(tt.capacity, onDropped, tt.retryMaxCount, tt.retryDelay) + if q == nil { + t.Fatal("NewBoundedQueueWithRetry returned nil") + } + + if q.Capacity() != tt.wantCap { + t.Errorf("Capacity() = %d, want %d", q.Capacity(), tt.wantCap) + } + + if q.Size() != 0 { + t.Errorf("Size() = %d, want 0", q.Size()) + } + + if 
!q.retryConfig.isEnabled { + t.Error("retryConfig.isEnabled should be true") + } + + if q.retryConfig.maxCount != tt.retryMaxCount { + t.Errorf("retryConfig.maxCount = %d, want %d", q.retryConfig.maxCount, tt.retryMaxCount) + } + + if q.retryConfig.delay != tt.retryDelay { + t.Errorf("retryConfig.delay = %v, want %v", q.retryConfig.delay, tt.retryDelay) + } + + q.Stop() + }) + } +} + +// TestBoundedQueueProduce tests item production +func TestBoundedQueueProduce(t *testing.T) { + tests := []struct { + name string + capacity int + items []interface{} + wantSize int + wantResult []bool + }{ + { + "within capacity", + 10, + []interface{}{1, 2, 3}, + 3, + []bool{true, true, true}, + }, + { + "at capacity", + 2, + []interface{}{1, 2}, + 2, + []bool{true, true}, + }, + { + "over capacity", + 2, + []interface{}{1, 2, 3, 4}, + 2, + []bool{true, true, false, false}, + }, + { + "zero capacity", + 0, + []interface{}{1, 2}, + 0, + []bool{false, false}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + var droppedItems []interface{} + onDropped := func(item interface{}) { + droppedItems = append(droppedItems, item) + } + + q := NewBoundedQueue(tt.capacity, onDropped) + defer q.Stop() + + results := make([]bool, len(tt.items)) + for i, item := range tt.items { + results[i] = q.Produce(item) + } + + if q.Size() != tt.wantSize { + t.Errorf("Size() = %d, want %d", q.Size(), tt.wantSize) + } + + for i, result := range results { + if result != tt.wantResult[i] { + t.Errorf("Produce(%v) = %v, want %v", tt.items[i], result, tt.wantResult[i]) + } + } + + expectedDropped := len(tt.items) - tt.wantSize + if len(droppedItems) != expectedDropped { + t.Errorf("dropped items count = %d, want %d", len(droppedItems), expectedDropped) + } + }) + } +} + +// TestBoundedQueueConsumers tests consumer functionality +func TestBoundedQueueConsumers(t *testing.T) { + tests := []struct { + name string + capacity int + workers int + items []interface{} + consumerErr error + wantConsumed int + }{ + { + "successful consumption", + 10, + 2, + []interface{}{1, 2, 3, 4, 5}, + nil, + 5, + }, + { + "single worker", + 5, + 1, + []interface{}{"a", "b", "c"}, + nil, + 3, + }, + { + "multiple workers", + 20, + 5, + []interface{}{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, + nil, + 10, + }, + { + "consumer with errors", + 10, + 1, + []interface{}{1, 2, 3}, + errors.New("consumer error"), + 3, // Items still processed even with errors when retry disabled + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + var consumed int32 + var consumedItems []interface{} + var mu sync.Mutex + + consumer := func(item interface{}) error { + atomic.AddInt32(&consumed, 1) + mu.Lock() + consumedItems = append(consumedItems, item) + mu.Unlock() + return tt.consumerErr + } + + q := NewBoundedQueue(tt.capacity, nil) + defer q.Stop() + + q.StartConsumers(tt.workers, consumer) + + // Give consumers time to start + time.Sleep(10 * time.Millisecond) + + for _, item := range tt.items { + q.Produce(item) + } + + // Wait for processing + time.Sleep(100 * time.Millisecond) + + finalConsumed := atomic.LoadInt32(&consumed) + if int(finalConsumed) != tt.wantConsumed { + t.Errorf("consumed items = %d, want %d", finalConsumed, tt.wantConsumed) + } + + mu.Lock() + if len(consumedItems) != tt.wantConsumed { + t.Errorf("consumedItems length = %d, want %d", len(consumedItems), tt.wantConsumed) + } + mu.Unlock() + }) + } +} + +// TestBoundedQueueWithRetryFunctionality tests retry behavior +func TestBoundedQueueWithRetryFunctionality(t 
*testing.T) { + tests := []struct { + name string + maxRetryCount uint32 + retryDelay time.Duration + failCount int + wantAttempts int + }{ + { + "retry until success", + 5, + 10 * time.Millisecond, + 2, // Fail first 2, succeed on 3rd + 3, + }, + { + "exhaust retries", + 2, + 10 * time.Millisecond, + 10, // Always fail + 3, // 1 initial + 2 retries + }, + { + "unlimited retries", + 0, + 5 * time.Millisecond, + 4, // Fail first 4, succeed on 5th + 5, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + var attempts int32 + var droppedItems []interface{} + + consumer := func(item interface{}) error { + attempt := atomic.AddInt32(&attempts, 1) + if int(attempt) <= tt.failCount { + return errors.New("simulated failure") + } + return nil + } + + onDropped := func(item interface{}) { + droppedItems = append(droppedItems, item) + } + + q := NewBoundedQueueWithRetry(10, onDropped, tt.maxRetryCount, tt.retryDelay) + defer q.Stop() + + q.StartConsumers(1, consumer) + time.Sleep(10 * time.Millisecond) + + q.Produce("test_item") + + // Wait for retries to complete + maxWait := time.Duration(tt.wantAttempts) * tt.retryDelay * 10 + if maxWait < 500*time.Millisecond { + maxWait = 500 * time.Millisecond + } + time.Sleep(maxWait) + + finalAttempts := atomic.LoadInt32(&attempts) + if int(finalAttempts) != tt.wantAttempts { + t.Errorf("attempts = %d, want %d", finalAttempts, tt.wantAttempts) + } + + // Check if item was dropped when retries exhausted + if tt.failCount >= tt.wantAttempts && tt.maxRetryCount > 0 { + if len(droppedItems) != 1 { + t.Errorf("expected 1 dropped item, got %d", len(droppedItems)) + } + } + }) + } +} + +// TestBoundedQueueResize tests queue resizing functionality +func TestBoundedQueueResize(t *testing.T) { + tests := []struct { + name string + initialCap int + newCap int + wantSuccess bool + }{ + {"increase capacity", 10, 20, true}, + {"decrease capacity", 20, 10, true}, + {"same capacity", 15, 15, false}, + {"zero to positive", 0, 10, true}, + {"positive to zero", 10, 0, true}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + consumer := func(item interface{}) error { + time.Sleep(10 * time.Millisecond) + return nil + } + + q := NewBoundedQueue(tt.initialCap, nil) + defer q.Stop() + + q.StartConsumers(1, consumer) + time.Sleep(10 * time.Millisecond) + + // Add some items + for i := 0; i < min(tt.initialCap, 5); i++ { + q.Produce(i) + } + + success := q.Resize(tt.newCap) + if success != tt.wantSuccess { + t.Errorf("Resize(%d) = %v, want %v", tt.newCap, success, tt.wantSuccess) + } + + if tt.wantSuccess { + if q.Capacity() != tt.newCap { + t.Errorf("Capacity() = %d, want %d", q.Capacity(), tt.newCap) + } + } else { + if q.Capacity() != tt.initialCap { + t.Errorf("Capacity() = %d, want %d (unchanged)", q.Capacity(), tt.initialCap) + } + } + }) + } +} + +// TestBoundedQueueStop tests queue stopping +func TestBoundedQueueStop(t *testing.T) { + t.Run("stop prevents production", func(t *testing.T) { + var droppedItems []interface{} + onDropped := func(item interface{}) { + droppedItems = append(droppedItems, item) + } + + q := NewBoundedQueue(10, onDropped) + q.StartConsumers(1, func(item interface{}) error { return nil }) + + // Produce before stop + if !q.Produce("before_stop") { + t.Error("Should be able to produce before stop") + } + + q.Stop() + + // Produce after stop should fail + if q.Produce("after_stop") { + t.Error("Should not be able to produce after stop") + } + + // Item should be dropped + if len(droppedItems) != 1 { + 
t.Errorf("Expected 1 dropped item, got %d", len(droppedItems)) + } + }) + + t.Run("multiple stops are safe", func(t *testing.T) { + q := NewBoundedQueue(10, nil) + q.StartConsumers(1, func(item interface{}) error { return nil }) + + // Multiple stops should not panic or block + q.Stop() + q.Stop() + q.Stop() + }) +} + +// TestBoundedQueueConcurrency tests concurrent operations +func TestBoundedQueueConcurrency(t *testing.T) { + t.Run("concurrent produce and consume", func(t *testing.T) { + const ( + capacity = 1000 // Increased capacity + producers = 5 + itemsEach = 50 // Reduced items to avoid queue overflow + consumers = 3 + ) + + var consumed int64 + var successful int64 + consumer := func(item interface{}) error { + atomic.AddInt64(&consumed, 1) + return nil + } + + q := NewBoundedQueue(capacity, nil) + defer q.Stop() + + q.StartConsumers(consumers, consumer) + time.Sleep(10 * time.Millisecond) + + var wg sync.WaitGroup + + // Start producers + for p := 0; p < producers; p++ { + wg.Add(1) + go func(id int) { + defer wg.Done() + for i := 0; i < itemsEach; i++ { + if q.Produce(id*itemsEach + i) { + atomic.AddInt64(&successful, 1) + } + } + }(p) + } + + wg.Wait() + + // Wait for all items to be consumed + time.Sleep(1 * time.Second) + + expectedTotal := atomic.LoadInt64(&successful) + actualConsumed := atomic.LoadInt64(&consumed) + + if actualConsumed != expectedTotal { + t.Errorf("consumed = %d, want %d (successful productions)", actualConsumed, expectedTotal) + } + }) + + t.Run("concurrent size and capacity checks", func(t *testing.T) { + q := NewBoundedQueue(50, nil) + defer q.Stop() + + consumer := func(item interface{}) error { + time.Sleep(1 * time.Millisecond) + return nil + } + q.StartConsumers(2, consumer) + + var wg sync.WaitGroup + + // Producer + wg.Add(1) + go func() { + defer wg.Done() + for i := 0; i < 100; i++ { + q.Produce(i) + } + }() + + // Size checker + wg.Add(1) + go func() { + defer wg.Done() + for i := 0; i < 50; i++ { + size := q.Size() + capacity := q.Capacity() + if size < 0 { + t.Errorf("Size should not be negative: %d", size) + } + if size > capacity { + t.Errorf("Size %d should not exceed capacity %d", size, capacity) + } + time.Sleep(2 * time.Millisecond) + } + }() + + wg.Wait() + }) +} + +// TestConsumerFunc tests the ConsumerFunc adapter +func TestConsumerFunc(t *testing.T) { + t.Run("consumer func adapter", func(t *testing.T) { + var processed []interface{} + var mu sync.Mutex + + callback := func(item interface{}) error { + mu.Lock() + processed = append(processed, item) + mu.Unlock() + if item == "error" { + return errors.New("test error") + } + return nil + } + + consumer := ConsumerFunc(callback) + + // Test successful consumption + err := consumer.Consume("success") + if err != nil { + t.Errorf("Expected no error, got %v", err) + } + + // Test error case + err = consumer.Consume("error") + if err == nil { + t.Error("Expected error, got nil") + } + + mu.Lock() + defer mu.Unlock() + if len(processed) != 2 { + t.Errorf("Expected 2 processed items, got %d", len(processed)) + } + }) +} + +// TestBoundedQueuePanicHandling tests panic recovery in consumers +func TestBoundedQueuePanicHandling(t *testing.T) { + tests := []struct { + name string + retryEnabled bool + expectedMin int // Minimum expected processed items + }{ + {"panic with retry disabled", false, 1}, + {"panic with retry enabled", true, 0}, // With retry, panic item may be retried and dropped + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + var processed int32 + 
consumer := func(item interface{}) error { + if item == "panic" { + panic("test panic") + } + atomic.AddInt32(&processed, 1) + return nil + } + + var q *BoundedQueue + if tt.retryEnabled { + q = NewBoundedQueueWithRetry(10, nil, 2, 10*time.Millisecond) + } else { + q = NewBoundedQueue(10, nil) + } + defer q.Stop() + + q.StartConsumers(1, consumer) + time.Sleep(10 * time.Millisecond) + + // This should not crash the test + q.Produce("panic") + time.Sleep(200 * time.Millisecond) // Longer wait for retry scenarios + + // Queue should still be functional + q.Produce("normal") + time.Sleep(200 * time.Millisecond) + + finalProcessed := atomic.LoadInt32(&processed) + if int(finalProcessed) < tt.expectedMin { + t.Errorf("Expected at least %d processed items, got %d", tt.expectedMin, finalProcessed) + } + }) + } +} + +// min helper function for Go versions that don't have it built-in +func min(a, b int) int { + if a < b { + return a + } + return b +} + +// BenchmarkBoundedQueueProduce benchmarks queue production +func BenchmarkBoundedQueueProduce(b *testing.B) { + q := NewBoundedQueue(10000, nil) + defer q.Stop() + + consumer := func(item interface{}) error { return nil } + q.StartConsumers(4, consumer) + + b.ResetTimer() + for i := 0; i < b.N; i++ { + q.Produce(i) + } +} + +// BenchmarkBoundedQueueProduceWithRetry benchmarks queue with retry +func BenchmarkBoundedQueueProduceWithRetry(b *testing.B) { + q := NewBoundedQueueWithRetry(10000, nil, 3, 100*time.Millisecond) + defer q.Stop() + + consumer := func(item interface{}) error { return nil } + q.StartConsumers(4, consumer) + + b.ResetTimer() + for i := 0; i < b.N; i++ { + q.Produce(i) + } +} + +// BenchmarkBoundedQueueConcurrentAccess benchmarks concurrent operations +func BenchmarkBoundedQueueConcurrentAccess(b *testing.B) { + q := NewBoundedQueue(1000, nil) + defer q.Stop() + + consumer := func(item interface{}) error { return nil } + q.StartConsumers(4, consumer) + + b.ResetTimer() + b.RunParallel(func(pb *testing.PB) { + i := 0 + for pb.Next() { + q.Produce(i) + i++ + } + }) +} + +// TestBoundedQueueAtomicOperations tests thread safety of atomic operations +func TestBoundedQueueAtomicOperations(t *testing.T) { + t.Run("atomic size updates", func(t *testing.T) { + q := NewBoundedQueue(1000, nil) + defer q.Stop() + + // Slow consumer to allow queue to fill up + consumer := func(item interface{}) error { + time.Sleep(1 * time.Millisecond) + return nil + } + q.StartConsumers(1, consumer) + + var wg sync.WaitGroup + numGoroutines := 10 + + // Multiple producers + for i := 0; i < numGoroutines; i++ { + wg.Add(1) + go func() { + defer wg.Done() + for j := 0; j < 50; j++ { + q.Produce(j) + } + }() + } + + // Size checker + wg.Add(1) + go func() { + defer wg.Done() + for i := 0; i < 100; i++ { + size := q.Size() + if size < 0 { + t.Errorf("Size should never be negative, got %d", size) + } + time.Sleep(1 * time.Millisecond) + } + }() + + wg.Wait() + }) +} + +// TestBoundedQueueMemoryManagement tests proper cleanup +func TestBoundedQueueMemoryManagement(t *testing.T) { + t.Run("cleanup after stop", func(t *testing.T) { + q := NewBoundedQueue(100, nil) + consumer := func(item interface{}) error { return nil } + q.StartConsumers(2, consumer) + + // Add items + for i := 0; i < 50; i++ { + q.Produce(i) + } + + // Stop should clean up properly + q.Stop() + + // Operations after stop should fail gracefully + if q.Produce("after_stop") { + t.Error("Produce should fail after stop") + } + + // Size should still be accessible + size := q.Size() + if size < 0 { + 
t.Errorf("Size should not be negative after stop, got %d", size) + } + }) +} + +// TestBoundedQueueUnsafePointerOperations tests unsafe pointer usage in Resize +func TestBoundedQueueUnsafePointerOperations(t *testing.T) { + t.Run("unsafe pointer in resize", func(t *testing.T) { + q := NewBoundedQueue(10, nil) + defer q.Stop() + + consumer := func(item interface{}) error { + time.Sleep(1 * time.Millisecond) + return nil + } + q.StartConsumers(1, consumer) + + // Get initial pointer + initialItems := q.items + + // Resize should change the pointer + success := q.Resize(20) + if !success { + t.Error("Resize should succeed") + } + + // Verify pointer changed + newItems := q.items + if unsafe.Pointer(initialItems) == unsafe.Pointer(newItems) { + t.Error("Items channel pointer should have changed after resize") + } + + if q.Capacity() != 20 { + t.Errorf("Capacity should be 20 after resize, got %d", q.Capacity()) + } + }) +} diff --git a/pkg/utils/queue/utils.go b/pkg/utils/queue/utils.go index 521537b..91da2d4 100644 --- a/pkg/utils/queue/utils.go +++ b/pkg/utils/queue/utils.go @@ -1,6 +1,8 @@ package queue import ( + "time" + "github.com/mycontroller-org/server/v2/pkg/utils/concurrency" "go.uber.org/zap" ) @@ -15,12 +17,24 @@ type Queue struct { } // New returns brandnew queue -func New(logger *zap.Logger, name string, limit int, consumer func(item interface{}), workers int) *Queue { +func New(logger *zap.Logger, name string, limit int, consumer func(item interface{}) error, workers int) *Queue { + // Enable retry by default with unlimited retries (0 means unlimited) and 5 second max delay + return NewWithRetry(logger, name, limit, consumer, workers, true, 0, 5*time.Second) +} + +// New returns brandnew queue with retry options +func NewWithRetry(logger *zap.Logger, name string, limit int, consumer func(item interface{}) error, workers int, isRetryEnabled bool, maxRetryCount uint32, retryDelay time.Duration) *Queue { droppedItemHandler := func(item interface{}) { logger.Error("queue full. 
dropping item", zap.String("queueName", name), zap.Any("item", item)) } - queue := NewBoundedQueue(limit, droppedItemHandler) + var queue *BoundedQueue + if isRetryEnabled { + queue = NewBoundedQueueWithRetry(limit, droppedItemHandler, maxRetryCount, retryDelay) + } else { + queue = NewBoundedQueue(limit, droppedItemHandler) + } + queue.StartConsumers(workers, consumer) return &Queue{ diff --git a/pkg/utils/queue/utils_test.go b/pkg/utils/queue/utils_test.go new file mode 100644 index 0000000..9f67a1c --- /dev/null +++ b/pkg/utils/queue/utils_test.go @@ -0,0 +1,756 @@ +package queue + +import ( + "context" + "errors" + "fmt" + "sync" + "sync/atomic" + "testing" + "time" + + "go.uber.org/zap" +) + +// TestNew tests the basic queue creation function +func TestNew(t *testing.T) { + tests := []struct { + name string + queueName string + limit int + workers int + wantLimit int + wantWorkers int + wantName string + }{ + { + name: "basic queue", + queueName: "test_queue", + limit: 100, + workers: 2, + wantLimit: 100, + wantWorkers: 2, + wantName: "test_queue", + }, + { + name: "single worker", + queueName: "single", + limit: 10, + workers: 1, + wantLimit: 10, + wantWorkers: 1, + wantName: "single", + }, + { + name: "high capacity", + queueName: "high_cap", + limit: 10000, + workers: 10, + wantLimit: 10000, + wantWorkers: 10, + wantName: "high_cap", + }, + { + name: "zero workers", + queueName: "zero_workers", + limit: 50, + workers: 0, + wantLimit: 50, + wantWorkers: 0, + wantName: "zero_workers", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + logger := zap.NewNop() + var processed []interface{} + var mu sync.Mutex + + consumer := func(item interface{}) error { + mu.Lock() + processed = append(processed, item) + mu.Unlock() + return nil + } + + q := New(logger, tt.queueName, tt.limit, consumer, tt.workers) + defer q.Close() + + if q.Name != tt.wantName { + t.Errorf("Name = %s, want %s", q.Name, tt.wantName) + } + if q.Limit != tt.wantLimit { + t.Errorf("Limit = %d, want %d", q.Limit, tt.wantLimit) + } + if q.Workers != tt.wantWorkers { + t.Errorf("Workers = %d, want %d", q.Workers, tt.wantWorkers) + } + if q.Queue == nil { + t.Error("Queue should not be nil") + } + + // Test basic functionality if workers > 0 + if tt.workers > 0 { + time.Sleep(10 * time.Millisecond) + q.Produce("test_item") + time.Sleep(100 * time.Millisecond) + + mu.Lock() + if len(processed) != 1 { + t.Errorf("Expected 1 processed item, got %d", len(processed)) + } + mu.Unlock() + } + }) + } +} + +// TestNewWithRetry tests queue creation with retry options +func TestNewWithRetry(t *testing.T) { + tests := []struct { + name string + queueName string + limit int + workers int + retryEnabled bool + maxRetryCount uint32 + retryDelay time.Duration + expectRetries bool + failureCount int + expectedFinalAttempts int + }{ + { + name: "retry enabled with limited retries", + queueName: "retry_limited", + limit: 10, + workers: 1, + retryEnabled: true, + maxRetryCount: 3, + retryDelay: 10 * time.Millisecond, + expectRetries: true, + failureCount: 2, // fail first 2, succeed on 3rd + expectedFinalAttempts: 3, + }, + { + name: "retry enabled unlimited", + queueName: "retry_unlimited", + limit: 10, + workers: 1, + retryEnabled: true, + maxRetryCount: 0, // unlimited + retryDelay: 10 * time.Millisecond, + expectRetries: true, + failureCount: 4, // fail first 4, succeed on 5th + expectedFinalAttempts: 5, + }, + { + name: "retry disabled", + queueName: "no_retry", + limit: 10, + workers: 1, + retryEnabled: false, 
+ maxRetryCount: 0, + retryDelay: 0, + expectRetries: false, + failureCount: 10, // always fail + expectedFinalAttempts: 1, + }, + { + name: "retry exhausted", + queueName: "retry_exhausted", + limit: 10, + workers: 1, + retryEnabled: true, + maxRetryCount: 2, + retryDelay: 10 * time.Millisecond, + expectRetries: true, + failureCount: 10, // always fail + expectedFinalAttempts: 3, // 1 initial + 2 retries + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + logger := zap.NewNop() + var attempts int32 + + consumer := func(item interface{}) error { + attempt := atomic.AddInt32(&attempts, 1) + if int(attempt) <= tt.failureCount { + return fmt.Errorf("simulated failure %d", attempt) + } + return nil + } + + q := NewWithRetry(logger, tt.queueName, tt.limit, consumer, tt.workers, + tt.retryEnabled, tt.maxRetryCount, tt.retryDelay) + defer q.Close() + + // Verify queue properties + if q.Name != tt.queueName { + t.Errorf("Name = %s, want %s", q.Name, tt.queueName) + } + if q.Limit != tt.limit { + t.Errorf("Limit = %d, want %d", q.Limit, tt.limit) + } + if q.Workers != tt.workers { + t.Errorf("Workers = %d, want %d", q.Workers, tt.workers) + } + + if tt.workers > 0 { + time.Sleep(10 * time.Millisecond) + q.Produce("test_retry") + + // Wait for retries to complete + maxWait := time.Duration(tt.expectedFinalAttempts) * tt.retryDelay * 10 + if maxWait < 500*time.Millisecond { + maxWait = 500 * time.Millisecond + } + time.Sleep(maxWait) + + finalAttempts := atomic.LoadInt32(&attempts) + if int(finalAttempts) != tt.expectedFinalAttempts { + t.Errorf("attempts = %d, want %d", finalAttempts, tt.expectedFinalAttempts) + } + } + }) + } +} + +// TestQueueOperations tests Queue wrapper methods +func TestQueueOperations(t *testing.T) { + tests := []struct { + name string + capacity int + items []interface{} + expectedProduced int + expectedSize int + }{ + { + name: "within capacity", + capacity: 10, + items: []interface{}{1, 2, 3, 4, 5}, + expectedProduced: 5, + expectedSize: 5, + }, + { + name: "at capacity", + capacity: 3, + items: []interface{}{"a", "b", "c"}, + expectedProduced: 3, + expectedSize: 3, + }, + { + name: "over capacity", + capacity: 2, + items: []interface{}{1, 2, 3, 4, 5}, + expectedProduced: 2, + expectedSize: 2, + }, + { + name: "zero capacity", + capacity: 0, + items: []interface{}{1, 2}, + expectedProduced: 0, + expectedSize: 0, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + logger := zap.NewNop() + blockChan := make(chan struct{}) + + consumer := func(item interface{}) error { + <-blockChan // Block processing to test queue filling + return nil + } + + q := New(logger, "test", tt.capacity, consumer, 1) + defer func() { + close(blockChan) // Unblock before closing + q.Close() + }() + + time.Sleep(10 * time.Millisecond) // Let consumers start + + produced := 0 + for _, item := range tt.items { + if q.Produce(item) { + produced++ + } + } + + if produced != tt.expectedProduced { + t.Errorf("produced = %d, want %d", produced, tt.expectedProduced) + } + + size := q.Size() + if size != tt.expectedSize { + t.Errorf("Size() = %d, want %d", size, tt.expectedSize) + } + }) + } +} + +// TestQueueClose tests queue closing behavior +func TestQueueClose(t *testing.T) { + tests := []struct { + name string + capacity int + workers int + closeFirst bool + }{ + {"normal close", 10, 2, false}, + {"close before produce", 10, 2, true}, + {"single worker close", 5, 1, false}, + {"multiple workers close", 20, 5, false}, + } + + for _, tt := range tests 
{ + t.Run(tt.name, func(t *testing.T) { + logger := zap.NewNop() + consumer := func(item interface{}) error { return nil } + + q := New(logger, "test", tt.capacity, consumer, tt.workers) + + if tt.closeFirst { + q.Close() + // Should not be able to produce after close + if q.Produce("after_close") { + t.Error("Should not be able to produce after close") + } + } else { + time.Sleep(10 * time.Millisecond) + // Should be able to produce before close + if !q.Produce("before_close") { + t.Error("Should be able to produce before close") + } + q.Close() + // Should not be able to produce after close + if q.Produce("after_close") { + t.Error("Should not be able to produce after close") + } + } + + // Multiple closes should be safe + q.Close() + q.Close() + }) + } +} + +// TestQueueSpec tests the QueueSpec wrapper +func TestQueueSpec(t *testing.T) { + tests := []struct { + name string + topic string + subscriptionId int64 + capacity int + items []interface{} + }{ + { + name: "basic queue spec", + topic: "test_topic", + subscriptionId: 12345, + capacity: 10, + items: []interface{}{"msg1", "msg2", "msg3"}, + }, + { + name: "different topic", + topic: "another_topic", + subscriptionId: 67890, + capacity: 5, + items: []interface{}{1, 2}, + }, + { + name: "empty topic", + topic: "", + subscriptionId: 0, + capacity: 1, + items: []interface{}{"single"}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + logger := zap.NewNop() + var processed []interface{} + var mu sync.Mutex + + consumer := func(item interface{}) error { + mu.Lock() + processed = append(processed, item) + mu.Unlock() + return nil + } + + q := New(logger, "test", tt.capacity, consumer, 1) + qs := &QueueSpec{ + Topic: tt.topic, + SubscriptionId: tt.subscriptionId, + Queue: q, + } + defer qs.Close() + + // Verify properties + if qs.Topic != tt.topic { + t.Errorf("Topic = %s, want %s", qs.Topic, tt.topic) + } + if qs.SubscriptionId != tt.subscriptionId { + t.Errorf("SubscriptionId = %d, want %d", qs.SubscriptionId, tt.subscriptionId) + } + + time.Sleep(10 * time.Millisecond) + + // Test operations through QueueSpec + for _, item := range tt.items { + if !qs.Produce(item) { + t.Errorf("Failed to produce item %v", item) + } + } + + time.Sleep(100 * time.Millisecond) + + // Verify processing + mu.Lock() + if len(processed) != len(tt.items) { + t.Errorf("processed count = %d, want %d", len(processed), len(tt.items)) + } + mu.Unlock() + + // Test size method + size := qs.Size() + if size < 0 { + t.Errorf("Size should not be negative, got %d", size) + } + }) + } +} + +// TestQueueConcurrency tests concurrent operations on Queue wrapper +func TestQueueConcurrency(t *testing.T) { + tests := []struct { + name string + capacity int + workers int + producers int + itemsEach int + consumerDelay time.Duration + }{ + { + name: "light concurrent load", + capacity: 100, + workers: 2, + producers: 3, + itemsEach: 20, + consumerDelay: time.Millisecond, + }, + { + name: "heavy concurrent load", + capacity: 500, + workers: 5, + producers: 10, + itemsEach: 50, + consumerDelay: 0, + }, + { + name: "single worker many producers", + capacity: 200, + workers: 1, + producers: 8, + itemsEach: 25, + consumerDelay: time.Microsecond * 100, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + logger := zap.NewNop() + var consumed int64 + var successful int64 + + consumer := func(item interface{}) error { + if tt.consumerDelay > 0 { + time.Sleep(tt.consumerDelay) + } + atomic.AddInt64(&consumed, 1) + return nil + } + + 
q := New(logger, "concurrent_test", tt.capacity, consumer, tt.workers) + defer q.Close() + + time.Sleep(10 * time.Millisecond) // Let consumers start + + var wg sync.WaitGroup + + // Start producers + for p := 0; p < tt.producers; p++ { + wg.Add(1) + go func(producerId int) { + defer wg.Done() + for i := 0; i < tt.itemsEach; i++ { + item := fmt.Sprintf("producer_%d_item_%d", producerId, i) + if q.Produce(item) { + atomic.AddInt64(&successful, 1) + } + } + }(p) + } + + // Monitor size concurrently + ctx, cancel := context.WithCancel(context.Background()) + wg.Add(1) + go func() { + defer wg.Done() + for { + select { + case <-ctx.Done(): + return + default: + size := q.Size() + if size < 0 { + t.Errorf("Size should not be negative: %d", size) + return + } + if size > tt.capacity { + t.Errorf("Size %d should not exceed capacity %d", size, tt.capacity) + return + } + time.Sleep(time.Millisecond) + } + } + }() + + wg.Wait() + cancel() + + // Wait for processing to complete + time.Sleep(time.Duration(tt.workers) * 100 * time.Millisecond) + + finalSuccessful := atomic.LoadInt64(&successful) + finalConsumed := atomic.LoadInt64(&consumed) + + if finalConsumed != finalSuccessful { + t.Errorf("consumed = %d, successful = %d, should be equal", finalConsumed, finalSuccessful) + } + + // Final size should be reasonable + finalSize := q.Size() + if finalSize < 0 { + t.Errorf("Final size should not be negative: %d", finalSize) + } + }) + } +} + +// TestQueueErrorScenarios tests various error conditions +func TestQueueErrorScenarios(t *testing.T) { + tests := []struct { + name string + capacity int + workers int + consumerFunc func(item interface{}) error + expectProcessing bool + testPanic bool + }{ + { + name: "nil consumer", + capacity: 10, + workers: 1, + consumerFunc: nil, + expectProcessing: false, + testPanic: false, + }, + { + name: "consumer with error", + capacity: 10, + workers: 1, + consumerFunc: func(item interface{}) error { + return errors.New("consumer error") + }, + expectProcessing: true, // Items processed but with errors + testPanic: false, + }, + { + name: "consumer with panic", + capacity: 10, + workers: 1, + consumerFunc: func(item interface{}) error { + if item == "panic_item" { + panic("test panic") + } + return nil + }, + expectProcessing: true, + testPanic: true, + }, + { + name: "zero workers", + capacity: 10, + workers: 0, + consumerFunc: func(item interface{}) error { + return nil + }, + expectProcessing: false, + testPanic: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + logger := zap.NewNop() + q := New(logger, "error_test", tt.capacity, tt.consumerFunc, tt.workers) + defer q.Close() + + time.Sleep(10 * time.Millisecond) + + if tt.testPanic { + // This should not crash the test + q.Produce("panic_item") + time.Sleep(100 * time.Millisecond) + + // Queue should still be functional + q.Produce("normal_item") + time.Sleep(100 * time.Millisecond) + } else { + q.Produce("test_item") + time.Sleep(100 * time.Millisecond) + } + + size := q.Size() + if !tt.expectProcessing && tt.workers > 0 { + // Items should remain in queue if not processing + if size <= 0 { + t.Errorf("Expected items to remain in queue, size = %d", size) + } + } + }) + } +} + +// TestQueueMemoryManagement tests proper cleanup and resource management +func TestQueueMemoryManagement(t *testing.T) { + t.Run("multiple queue creation and cleanup", func(t *testing.T) { + logger := zap.NewNop() + consumer := func(item interface{}) error { return nil } + + queues := make([]*Queue, 10) 
+
+// TestQueueMemoryManagement tests proper cleanup and resource management
+func TestQueueMemoryManagement(t *testing.T) {
+	t.Run("multiple queue creation and cleanup", func(t *testing.T) {
+		logger := zap.NewNop()
+		consumer := func(item interface{}) error { return nil }
+
+		queues := make([]*Queue, 10)
+
+		// Create multiple queues
+		for i := 0; i < 10; i++ {
+			queues[i] = New(logger, fmt.Sprintf("queue_%d", i), 10, consumer, 1)
+
+			// Add some items
+			queues[i].Produce(fmt.Sprintf("item_%d", i))
+		}
+
+		time.Sleep(50 * time.Millisecond)
+
+		// Close all queues
+		for _, q := range queues {
+			q.Close()
+
+			// Verify operations fail after close
+			if q.Produce("after_close") {
+				t.Error("Produce should fail after close")
+			}
+		}
+	})
+
+	t.Run("queue spec cleanup", func(t *testing.T) {
+		logger := zap.NewNop()
+		consumer := func(item interface{}) error { return nil }
+
+		queueSpecs := make([]*QueueSpec, 5)
+
+		for i := 0; i < 5; i++ {
+			q := New(logger, fmt.Sprintf("spec_queue_%d", i), 5, consumer, 1)
+			queueSpecs[i] = &QueueSpec{
+				Topic:          fmt.Sprintf("topic_%d", i),
+				SubscriptionId: int64(i + 100),
+				Queue:          q,
+			}
+
+			queueSpecs[i].Produce(fmt.Sprintf("spec_item_%d", i))
+		}
+
+		time.Sleep(50 * time.Millisecond)
+
+		for _, qs := range queueSpecs {
+			qs.Close()
+
+			if qs.Produce("after_spec_close") {
+				t.Error("QueueSpec Produce should fail after close")
+			}
+		}
+	})
+}
+
+// BenchmarkQueueNew benchmarks queue creation
+func BenchmarkQueueNew(b *testing.B) {
+	logger := zap.NewNop()
+	consumer := func(item interface{}) error { return nil }
+
+	b.ResetTimer()
+	for i := 0; i < b.N; i++ {
+		q := New(logger, "bench", 100, consumer, 2)
+		q.Close()
+	}
+}
+
+// BenchmarkQueueProduce benchmarks queue production through wrapper
+func BenchmarkQueueProduce(b *testing.B) {
+	logger := zap.NewNop()
+	consumer := func(item interface{}) error { return nil }
+	q := New(logger, "bench", 10000, consumer, 4)
+	defer q.Close()
+
+	time.Sleep(10 * time.Millisecond)
+
+	b.ResetTimer()
+	for i := 0; i < b.N; i++ {
+		q.Produce(i)
+	}
+}
+
+// BenchmarkQueueSpecProduce benchmarks production through QueueSpec
+func BenchmarkQueueSpecProduce(b *testing.B) {
+	logger := zap.NewNop()
+	consumer := func(item interface{}) error { return nil }
+	q := New(logger, "bench", 10000, consumer, 4)
+	qs := &QueueSpec{
+		Topic:          "benchmark",
+		SubscriptionId: 1,
+		Queue:          q,
+	}
+	defer qs.Close()
+
+	time.Sleep(10 * time.Millisecond)
+
+	b.ResetTimer()
+	for i := 0; i < b.N; i++ {
+		qs.Produce(i)
+	}
+}
+
+// BenchmarkConcurrentQueueAccess benchmarks concurrent access
+func BenchmarkConcurrentQueueAccess(b *testing.B) {
+	logger := zap.NewNop()
+	consumer := func(item interface{}) error { return nil }
+	q := New(logger, "bench", 1000, consumer, 4)
+	defer q.Close()
+
+	time.Sleep(10 * time.Millisecond)
+
+	b.ResetTimer()
+	b.RunParallel(func(pb *testing.PB) {
+		i := 0
+		for pb.Next() {
+			q.Produce(i)
+			i++
+		}
+	})
+}
\ No newline at end of file
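The wiring these tests and benchmarks exercise, collected in one place: New builds a bounded queue with a worker pool, and QueueSpec wraps it with bus metadata. A sketch assuming it lives in the same queue package with go.uber.org/zap imported; the topic, capacity, and worker count are arbitrary values, not from this patch.

func ExampleQueueSpec() {
	logger := zap.NewNop()
	consumer := func(item interface{}) error { return nil } // ack everything
	q := New(logger, "example", 100, consumer, 2)           // capacity 100, 2 workers
	qs := &QueueSpec{Topic: "topic/example", SubscriptionId: 1, Queue: q}
	defer qs.Close()

	qs.Produce("hello") // returns false once the queue is closed
}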
diff --git a/plugin/gateway/provider/mysensors_v2/event_listener.go b/plugin/gateway/provider/mysensors_v2/event_listener.go
index 3c7c740..9054e65 100644
--- a/plugin/gateway/provider/mysensors_v2/event_listener.go
+++ b/plugin/gateway/provider/mysensors_v2/event_listener.go
@@ -83,7 +83,7 @@ func (p *Provider) onEvent(data *busTY.BusData) {
 }
 
 // processServiceEvent from the queue
-func (p *Provider) processServiceEvent(item interface{}) {
+func (p *Provider) processServiceEvent(item interface{}) error {
 	event := item.(*eventTY.Event)
 	p.logger.Debug("Processing a request", zap.Any("event", event))
 
@@ -94,7 +94,7 @@
 		err := event.LoadEntity(&firmware)
 		if err != nil {
 			p.logger.Error("error on loading firmware entity", zap.String("eventQuickId", event.EntityQuickID), zap.Error(err))
-			return
+			return nil // Don't requeue invalid events
 		}
 		fwRawStore.Remove(firmware.ID)
 		fwStore.Remove(firmware.ID)
@@ -104,7 +104,7 @@
 		err := event.LoadEntity(&node)
 		if err != nil {
 			p.logger.Error("error on loading node entity", zap.String("eventQuickId", event.EntityQuickID), zap.Error(err))
-			return
+			return nil // Don't requeue invalid events
 		}
 		localID := p.getNodeStoreID(node.GatewayID, node.NodeID)
 		if nodeStore.IsAvailable(localID) {
@@ -114,4 +114,5 @@
 	default:
 		p.logger.Info("received unsupported event", zap.Any("event", event))
 	}
+	return nil
 }
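The listener change above settles the poison-message convention used across these diffs: type and decode failures return nil so a bad event is logged and dropped, while only retryable failures propagate an error for redelivery. A self-contained sketch of that split; the event type and apply helper are illustrative, not from this repository.

package main

import (
	"errors"
	"fmt"
)

// event is an illustrative stand-in for a bus event.
type event struct{ entity string }

// apply stands in for a side effect that can fail transiently.
func apply(e *event) error {
	if e.entity == "" {
		return errors.New("store unavailable")
	}
	fmt.Println("applied:", e.entity)
	return nil
}

// processServiceEvent mirrors the drop-vs-requeue split above.
func processServiceEvent(item interface{}) error {
	e, ok := item.(*event)
	if !ok {
		return nil // poison item: log-and-drop, never requeue
	}
	if err := apply(e); err != nil {
		return err // transient failure: requeue for another attempt
	}
	return nil
}

func main() {
	fmt.Println(processServiceEvent("garbage"))           // <nil>: dropped
	fmt.Println(processServiceEvent(&event{entity: "x"})) // <nil>: applied
	fmt.Println(processServiceEvent(&event{}))            // error: requeued
}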
diff --git a/plugin/gateway/provider/service.go b/plugin/gateway/provider/service.go
index faf8426..2404d33 100644
--- a/plugin/gateway/provider/service.go
+++ b/plugin/gateway/provider/service.go
@@ -173,11 +173,11 @@ func (s *Service) startMessageListener() {
 	}
 }
 
-func (s *Service) messageConsumer(item interface{}) {
+func (s *Service) messageConsumer(item interface{}) error {
 	msg, ok := item.(*msgTY.Message)
 	if !ok {
 		s.logger.Error("invalid message type", zap.Any("received", item))
-		return
+		return nil // Don't requeue invalid messages
 	}
 
 	// include timestamp, if not set
@@ -188,19 +188,19 @@
 	// if it is awake message send the sleeping queue messages
 	if msg.Type == msgTY.TypeAction && len(msg.Payloads) > 0 && msg.Payloads[0].Value == nodeTY.ActionAwake {
 		s.publishSleepingMessageQueue(msg.NodeID)
-		return
+		return nil
 	} else if msg.IsSleepNode {
 		// for sleeping node message to be added in to the sleeping queue
 		// when node sends awake signal, queue message will be published
 		s.addToSleepingMessageQueue(msg)
-		return
+		return nil
 	} else {
-		s.postMessage(msg, s.GatewayConfig.QueueFailedMessage)
+		return s.postMessage(msg, s.GatewayConfig.QueueFailedMessage)
 	}
 }
 
 // postMessage to the provider
-func (s *Service) postMessage(msg *msgTY.Message, queueFailedMessage bool) {
+func (s *Service) postMessage(msg *msgTY.Message, queueFailedMessage bool) error {
 	err := s.provider.Post(msg)
 	if err != nil {
 		s.logger.Warn("error on sending", zap.String("gatewayId", s.GatewayConfig.ID), zap.Any("message", msg), zap.Error(err))
@@ -209,23 +209,26 @@
 			isSleepingQueueDisabled := msg.Labels.IsExists(types.LabelNodeSleepQueueDisabled) && msg.Labels.GetBool(types.LabelNodeSleepQueueDisabled)
 			if !isSleepingQueueDisabled {
 				s.addToSleepingMessageQueue(msg)
+				return nil // Message was queued, no need to requeue
 			}
+			return err // Return error to requeue the message
 		}
 	}
+	return nil
 }
 
 // process received raw messages from protocol
-func (s *Service) rawMessageProcessor(data interface{}) {
+func (s *Service) rawMessageProcessor(data interface{}) error {
 	rawMsg := data.(*msgTY.RawMessage)
 	s.logger.Debug("received rawMessage", zap.String("gatewayId", s.GatewayConfig.ID), zap.Any("rawMessage", rawMsg))
 	messages, err := s.provider.ConvertToMessages(rawMsg)
 	if err != nil {
 		s.logger.Warn("failed to parse", zap.String("gatewayId", s.GatewayConfig.ID), zap.Any("rawMessage", rawMsg), zap.Error(err))
-		return
+		return nil // Don't requeue unparseable messages
	}
 	if len(messages) == 0 {
 		s.logger.Debug("messages not parsed", zap.String("gatewayId", s.GatewayConfig.ID), zap.Any("rawMessage", rawMsg))
-		return
+		return nil
 	}
 	// update gatewayID if not found
 	for index := 0; index < len(messages); index++ {
@@ -237,10 +240,11 @@
 			err = s.bus.Publish(s.rawMessageQueue.Topic, msg)
 			if err != nil {
 				s.logger.Debug("failed to post on topic", zap.String("topic", s.rawMessageQueue.Topic), zap.String("gatewayId", s.GatewayConfig.ID), zap.Any("message", msg), zap.Error(err))
-				return
+				return err // Return error to requeue the message
 			}
 		}
 	}
+	return nil
 }
 
 // add message in to sleeping queue
@@ -289,7 +293,7 @@ func (s *Service) publishSleepingMessageQueue(nodeID string) {
 
 	// post messages
 	for _, msg := range validMsgs {
-		s.postMessage(&msg, false)
+		_ = s.postMessage(&msg, false) // Ignore error for sleeping queue messages
 	}
 
 	// remove messages from the map
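Net effect of the provider changes: only a failed post that could not be parked in the sleeping queue propagates an error to the queue, and publishSleepingMessageQueue deliberately discards postMessage's result so one failed send cannot requeue an entire awake-time flush. An illustrative condensation of those paths; the names are descriptive, not code from the patch.

package main

import "fmt"

// outcome condenses postMessage's return paths after this patch.
func outcome(postFailed, queueFailedMessage, sleepQueueDisabled bool) string {
	if !postFailed {
		return "nil: posted to provider"
	}
	if !queueFailedMessage {
		return "nil: failure logged, message dropped"
	}
	if !sleepQueueDisabled {
		return "nil: parked in sleeping queue, no requeue"
	}
	return "err: requeued for retry"
}

func main() {
	fmt.Println(outcome(true, true, true)) // the only path that requeues
}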