Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 8 additions & 7 deletions pkg/service/deletion/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,18 +92,18 @@ func (svc *DeletionService) onEventReceive(busData *busTY.BusData) {
}
}

func (svc *DeletionService) processEvent(item interface{}) {
func (svc *DeletionService) processEvent(item interface{}) error {
busData := item.(*busTY.BusData)
event := &eventTY.Event{}
err := busData.LoadData(event)
if err != nil {
svc.logger.Warn("error on convert to target type", zap.Any("topic", busData.Topic), zap.Error(err))
return
return nil
}

// if it is not a deletion event, return from here
if event.Type != eventTY.TypeDeleted {
return
return nil
}

svc.logger.Debug("received an deletion event", zap.Any("event", event))
Expand All @@ -116,7 +116,7 @@ func (svc *DeletionService) processEvent(item interface{}) {
err = event.LoadEntity(gateway)
if err != nil {
svc.logger.Warn("error on loading entity", zap.Any("event", event), zap.Error(err))
return
return nil
}
svc.deleteNodes(gateway)

Expand All @@ -125,7 +125,7 @@ func (svc *DeletionService) processEvent(item interface{}) {
err = event.LoadEntity(node)
if err != nil {
svc.logger.Warn("error on loading entity", zap.Any("event", event), zap.Error(err))
return
return nil
}
svc.deleteSources(node)

Expand All @@ -134,14 +134,15 @@ func (svc *DeletionService) processEvent(item interface{}) {
err = event.LoadEntity(source)
if err != nil {
svc.logger.Warn("error on loading entity", zap.Any("event", event), zap.Error(err))
return
return nil
}
svc.deleteFields(source)

default:
// do not proceed further
return
return nil
}
return nil
}

// deletes nodes
Expand Down
9 changes: 5 additions & 4 deletions pkg/service/forward_payload/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,13 +127,13 @@ func (svc *ForwardPayloadService) Close() error {
}

// processEvent from the queue
func (svc *ForwardPayloadService) processEvent(item interface{}) {
func (svc *ForwardPayloadService) processEvent(item interface{}) error {
field := item.(*field.Field)

quickID, err := quickIdUtils.GetQuickID(*field)
if err != nil {
svc.logger.Error("unable to get quick id", zap.Error(err), zap.String("gateway", field.GatewayID), zap.String("node", field.NodeID), zap.String("source", field.SourceID), zap.String("field", field.FieldID))
return
return nil
}

// fetch mapped filed for this event
Expand All @@ -145,11 +145,11 @@ func (svc *ForwardPayloadService) processEvent(item interface{}) {
response, err := svc.api.ForwardPayload().List(filters, pagination)
if err != nil {
svc.logger.Error("error getting mapping data from database", zap.Error(err))
return
return nil
}

if response.Count == 0 {
return
return nil
}

svc.logger.Debug("Starting data forwarding", zap.Any("data", field))
Expand All @@ -167,4 +167,5 @@ func (svc *ForwardPayloadService) processEvent(item interface{}) {
}
}
}
return nil
}
5 changes: 3 additions & 2 deletions pkg/service/gateway/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func (svc *GatewayService) onEvent(event *busTY.BusData) {
}

// processEvent from the queue
func (svc *GatewayService) processEvent(event interface{}) {
func (svc *GatewayService) processEvent(event interface{}) error {
reqEvent := event.(*rsTY.ServiceEvent)
svc.logger.Debug("Processing a request", zap.Any("event", reqEvent))

Expand All @@ -92,7 +92,7 @@ func (svc *GatewayService) processEvent(event interface{}) {
if err != nil {
svc.logger.Error("error on stopping a service", zap.Error(err), zap.String("id", reqEvent.ID))
}
return
return nil
}
gwCfg := svc.getGatewayConfig(reqEvent)
if gwCfg != nil {
Expand Down Expand Up @@ -129,6 +129,7 @@ func (svc *GatewayService) processEvent(event interface{}) {
default:
svc.logger.Warn("unsupported command", zap.Any("event", reqEvent))
}
return nil
}

func (svc *GatewayService) getGatewayConfig(reqEvent *rsTY.ServiceEvent) *gwTY.Config {
Expand Down
7 changes: 6 additions & 1 deletion pkg/service/gateway_msg_processor/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ func (svc *MessageProcessor) Close() error {
}

// processMessage from the queue
func (svc *MessageProcessor) processMessage(item interface{}) {
func (svc *MessageProcessor) processMessage(item interface{}) error {
msg := item.(*msgTY.Message)
svc.logger.Debug("Starting Message Processing", zap.Any("message", msg))

Expand All @@ -142,6 +142,7 @@ func (svc *MessageProcessor) processMessage(item interface{}) {
err := svc.setFieldData(msg)
if err != nil {
svc.logger.Error("error on field data set", zap.Error(err))
return err // Requeue on error
}
// update last seen
svc.updateSourceLastSeen(msg.GatewayID, msg.NodeID, msg.SourceID, msg.Timestamp)
Expand All @@ -151,12 +152,14 @@ func (svc *MessageProcessor) processMessage(item interface{}) {
err := svc.requestFieldData(msg)
if err != nil {
svc.logger.Error("error on field data request", zap.Error(err))
return err // Requeue on error
}

case msgTY.TypePresentation: // update source data, like name or other details
err := svc.updateSourceDetail(msg)
if err != nil {
svc.logger.Error("error on source data update", zap.Error(err))
return err // Requeue on error
}
// update last seen
svc.updateSourceLastSeen(msg.GatewayID, msg.NodeID, msg.SourceID, msg.Timestamp)
Expand All @@ -172,6 +175,7 @@ func (svc *MessageProcessor) processMessage(item interface{}) {
err := svc.updateNodeData(msg)
if err != nil {
svc.logger.Error("error on node data update", zap.Error(err))
return err // Requeue on error
}
// node last seen managed in updateNodeData

Expand All @@ -194,6 +198,7 @@ func (svc *MessageProcessor) processMessage(item interface{}) {
}

svc.logger.Debug("message processed", zap.String("timeTaken", time.Since(msg.Timestamp).String()), zap.Any("message", msg))
return nil
}

// update node detail
Expand Down
13 changes: 8 additions & 5 deletions pkg/service/handler/message_listerner.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func (svc *HandlerService) closeMessageListener() {
svc.messageQueue.Close()
}

func (svc *HandlerService) processHandlerMessage(item interface{}) {
func (svc *HandlerService) processHandlerMessage(item interface{}) error {
msg := item.(*handlerTY.MessageWrapper)
start := time.Now()

Expand All @@ -56,16 +56,18 @@ func (svc *HandlerService) processHandlerMessage(item interface{}) {
handler := svc.store.Get(msg.ID)
if handler == nil {
svc.logger.Info("handler not available", zap.Any("handlerID", msg.ID), zap.Any("availableHandlers", svc.store.ListIDs()))
return
return nil // Don't requeue if handler not available
}

state := handler.State()

err := handler.Post(msg.Data)
if err != nil {
// if err == handlerTY.ErrReQueue {
// // TODO: requeue and try again
// }
if err == handlerTY.ErrReQueue {
// Requeue the message to try again
svc.logger.Info("requeuing message", zap.Any("handlerID", msg.ID))
return err
}
svc.logger.Warn("error from handler", zap.Any("handlerID", msg.ID), zap.Error(err))
state.Status = types.StatusError
state.Message = err.Error()
Expand All @@ -76,4 +78,5 @@ func (svc *HandlerService) processHandlerMessage(item interface{}) {

state.Since = time.Now()
busUtils.SetHandlerState(svc.logger, svc.bus, msg.ID, *state)
return nil
}
6 changes: 4 additions & 2 deletions pkg/service/handler/service_listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,12 +138,13 @@ func (svc *HandlerService) onServiceEvent(event *busTY.BusData) {
}

// postProcessServiceEvent from the queue
func (svc *HandlerService) postProcessServiceEvent(event interface{}) {
func (svc *HandlerService) postProcessServiceEvent(event interface{}) error {
reqEvent := event.(*rsTY.ServiceEvent)
svc.logger.Debug("processing a request", zap.Any("event", reqEvent))

if reqEvent.Type != rsTY.TypeHandler {
svc.logger.Warn("unsupported event type", zap.Any("event", reqEvent))
return nil
}

switch reqEvent.Command {
Expand All @@ -162,7 +163,7 @@ func (svc *HandlerService) postProcessServiceEvent(event interface{}) {
if err != nil {
svc.logger.Error("error on stopping a service", zap.Error(err))
}
return
return nil
}
cfg := svc.getConfig(reqEvent)
if cfg != nil {
Expand All @@ -187,6 +188,7 @@ func (svc *HandlerService) postProcessServiceEvent(event interface{}) {
default:
svc.logger.Warn("unsupported command", zap.Any("event", reqEvent))
}
return nil
}

func (svc *HandlerService) getConfig(reqEvent *rsTY.ServiceEvent) *handlerTY.Config {
Expand Down
3 changes: 2 additions & 1 deletion pkg/service/resource/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func (svc *ResourceService) onEvent(data *busTY.BusData) {
}

// processEvent from the queue
func (svc *ResourceService) processEvent(item interface{}) {
func (svc *ResourceService) processEvent(item interface{}) error {
request := item.(*rsTY.ServiceEvent)
svc.logger.Debug("processing an event", zap.Any("event", request))
start := time.Now()
Expand Down Expand Up @@ -170,6 +170,7 @@ func (svc *ResourceService) processEvent(item interface{}) {
svc.logger.Warn("unknown event type", zap.Any("event", request))
}
svc.logger.Debug("completed a resource service", zap.String("timeTaken", time.Since(start).String()), zap.Any("data", request))
return nil
}

func (svc *ResourceService) postResponse(topic string, response *rsTY.ServiceEvent) error {
Expand Down
3 changes: 2 additions & 1 deletion pkg/service/scheduler/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func (svc *SchedulerService) onServiceEvent(event *busTY.BusData) {
}

// processServiceEvent from the queue
func (svc *SchedulerService) processServiceEvent(event interface{}) {
func (svc *SchedulerService) processServiceEvent(event interface{}) error {
reqEvent := event.(*rsTY.ServiceEvent)
svc.logger.Debug("processing a request", zap.Any("event", reqEvent))

Expand Down Expand Up @@ -105,6 +105,7 @@ func (svc *SchedulerService) processServiceEvent(event interface{}) {
default:
svc.logger.Warn("unsupported command", zap.Any("event", reqEvent))
}
return nil
}

func (svc *SchedulerService) getConfig(reqEvent *rsTY.ServiceEvent) *schedulerTY.Config {
Expand Down
7 changes: 4 additions & 3 deletions pkg/service/system_jobs/service_listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,18 +52,18 @@ func (svc *SystemJobsService) onEvent(event *busTY.BusData) {
}

// processEvent from the queue
func (svc *SystemJobsService) processEvent(event interface{}) {
func (svc *SystemJobsService) processEvent(event interface{}) error {
reqEvent := event.(*rsTY.ServiceEvent)
svc.logger.Debug("processing a request", zap.Any("event", reqEvent))

if reqEvent.Type != rsTY.TypeSystemJobs {
svc.logger.Warn("unsupported event type", zap.Any("event", reqEvent))
return
return nil
}

if reqEvent.Command != rsTY.CommandReload {
svc.logger.Warn("unsupported command", zap.Any("event", reqEvent))
return
return nil
}

switch reqEvent.Data {
Expand All @@ -77,4 +77,5 @@ func (svc *SystemJobsService) processEvent(event interface{}) {
default:
// NOOP
}
return nil
}
16 changes: 9 additions & 7 deletions pkg/service/task/listener_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,14 +47,14 @@ func (svc *TaskService) onEventReceive(busData *busTY.BusData) {
}
}

func (svc *TaskService) processPreEvent(item interface{}) {
func (svc *TaskService) processPreEvent(item interface{}) error {
busData := item.(*busTY.BusData)

event := &eventTY.Event{}
err := busData.LoadData(event)
if err != nil {
svc.logger.Warn("error on convert to target type", zap.Any("topic", busData.Topic), zap.Error(err))
return
return nil
}

var out interface{}
Expand All @@ -78,21 +78,21 @@ func (svc *TaskService) processPreEvent(item interface{}) {

default:
// return do not proceed further
return
return nil
}

err = event.LoadEntity(out)
if err != nil {
svc.logger.Warn("error on loading entity", zap.Any("event", event), zap.Error(err))
return
return nil
}
event.Entity = out

resourceWrapper := &eventWrapper{Event: event}
err = svc.resourcePreProcessor(resourceWrapper)
if err != nil {
svc.logger.Error("error on executing a resource", zap.Any("resource", resourceWrapper), zap.Error(err))
return
return err
}

if len(resourceWrapper.Tasks) > 0 {
Expand All @@ -101,6 +101,7 @@ func (svc *TaskService) processPreEvent(item interface{}) {
svc.logger.Error("failed to post selected tasks on post processor queue")
}
}
return nil
}

func (svc *TaskService) resourcePreProcessor(evntWrapper *eventWrapper) error {
Expand All @@ -119,11 +120,11 @@ func (svc *TaskService) resourcePreProcessor(evntWrapper *eventWrapper) error {
return nil
}

func (svc *TaskService) resourcePostProcessor(item interface{}) {
func (svc *TaskService) resourcePostProcessor(item interface{}) error {
evntWrapper, ok := item.(*eventWrapper)
if !ok {
svc.logger.Warn("supplied item is not resourceWrapper", zap.Any("item", item))
return
return nil
}

svc.logger.Debug("resourceWrapper received", zap.String("entityType", evntWrapper.Event.EntityType))
Expand All @@ -132,4 +133,5 @@ func (svc *TaskService) resourcePostProcessor(item interface{}) {
task := evntWrapper.Tasks[index]
svc.executeTask(&task, evntWrapper)
}
return nil
}
3 changes: 2 additions & 1 deletion pkg/service/task/listener_tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func (svc *TaskService) onServiceEvent(busData *busTY.BusData) {
}

// processServiceEvent from the queue
func (svc *TaskService) processServiceEvent(event interface{}) {
func (svc *TaskService) processServiceEvent(event interface{}) error {
reqEvent := event.(*rsTY.ServiceEvent)
svc.logger.Debug("processing a request", zap.Any("event", reqEvent))

Expand Down Expand Up @@ -116,6 +116,7 @@ func (svc *TaskService) processServiceEvent(event interface{}) {
default:
svc.logger.Warn("unsupported command", zap.Any("event", reqEvent))
}
return nil
}

func (svc *TaskService) getConfig(reqEvent *rsTY.ServiceEvent) *taskTY.Config {
Expand Down
Loading
Loading