From b3fdec14cc3031b8e2e72445c4affac520937a59 Mon Sep 17 00:00:00 2001 From: Piotr Rygielski <114479+vikin91@users.noreply.github.com> Date: Wed, 14 Jan 2026 11:00:02 +0100 Subject: [PATCH 1/3] Empty From 9e5fd99f76d2214fdee9ec2a47f1ca6bacdc3e4c Mon Sep 17 00:00:00 2001 From: Piotr Rygielski <114479+vikin91@users.noreply.github.com> Date: Wed, 14 Jan 2026 11:20:56 +0100 Subject: [PATCH 2/3] Send new ACK messages in parallel to the old ones --- .../service/pipeline/nodeindex/pipeline.go | 29 +++--- .../pipeline/nodeindex/pipeline_test.go | 91 +++++++++++++++++++ .../pipeline/nodeinventory/pipeline.go | 38 ++++++-- .../pipeline/nodeinventory/pipeline_test.go | 90 +++++++++++++++++- 4 files changed, 226 insertions(+), 22 deletions(-) diff --git a/central/sensor/service/pipeline/nodeindex/pipeline.go b/central/sensor/service/pipeline/nodeindex/pipeline.go index 17b83ccf1807a..4ca548f435c05 100644 --- a/central/sensor/service/pipeline/nodeindex/pipeline.go +++ b/central/sensor/service/pipeline/nodeindex/pipeline.go @@ -130,23 +130,30 @@ func sendComplianceAck(ctx context.Context, node *storage.Node, injector common. if injector == nil { return } - reply := replyCompliance(node.GetClusterId(), node.GetName(), central.NodeInventoryACK_ACK) - if err := injector.InjectMessage(ctx, reply); err != nil { - log.Warnf("Failed sending node-indexing-ACK to Sensor for %s: %v", nodeDatastore.NodeString(node), err) - } else { - log.Debugf("Sent node-indexing-ACK for %s", nodeDatastore.NodeString(node)) + // Always send SensorACK (new path). 
+ if err := injector.InjectMessage(ctx, ¢ral.MsgToSensor{ + Msg: ¢ral.MsgToSensor_SensorAck{ + SensorAck: ¢ral.SensorACK{ + Action: central.SensorACK_ACK, + MessageType: central.SensorACK_NODE_INDEX_REPORT, + ResourceId: node.GetName(), + }, + }, + }); err != nil { + log.Warnf("Failed injecting SensorACK for node index report (node=%s): %v", node.GetName(), err) } -} -func replyCompliance(clusterID, nodeName string, t central.NodeInventoryACK_Action) *central.MsgToSensor { - return ¢ral.MsgToSensor{ + // Always send legacy NodeInventoryACK for backward compatibility. + if err := injector.InjectMessage(ctx, ¢ral.MsgToSensor{ Msg: ¢ral.MsgToSensor_NodeInventoryAck{ NodeInventoryAck: ¢ral.NodeInventoryACK{ - ClusterId: clusterID, - NodeName: nodeName, - Action: t, + ClusterId: node.GetClusterId(), + NodeName: node.GetName(), + Action: central.NodeInventoryACK_ACK, MessageType: central.NodeInventoryACK_NodeIndexer, }, }, + }); err != nil { + log.Warnf("Failed injecting legacy NodeInventoryACK for node index report (node=%s): %v", node.GetName(), err) } } diff --git a/central/sensor/service/pipeline/nodeindex/pipeline_test.go b/central/sensor/service/pipeline/nodeindex/pipeline_test.go index 7a1a78a34f8b3..63fb654bfb325 100644 --- a/central/sensor/service/pipeline/nodeindex/pipeline_test.go +++ b/central/sensor/service/pipeline/nodeindex/pipeline_test.go @@ -9,8 +9,10 @@ import ( "github.com/stackrox/rox/generated/internalapi/central" v4 "github.com/stackrox/rox/generated/internalapi/scanner/v4" "github.com/stackrox/rox/generated/storage" + "github.com/stackrox/rox/pkg/concurrency" "github.com/stackrox/rox/pkg/features" nodesEnricherMocks "github.com/stackrox/rox/pkg/nodes/enricher/mocks" + "github.com/stackrox/rox/pkg/protoassert" "github.com/stretchr/testify/assert" "go.uber.org/mock/gomock" ) @@ -77,6 +79,58 @@ func TestPipelineEnrichesAndUpserts(t *testing.T) { } } +func TestPipelineSendsSensorAndLegacyACKs(t *testing.T) { + 
t.Setenv(features.NodeIndexEnabled.EnvVar(), "true") + t.Setenv(features.ScannerV4.EnvVar(), "true") + + ctrl := gomock.NewController(t) + clusterStore := clusterDatastoreMocks.NewMockDataStore(ctrl) + nodeDatastore := nodeDatastoreMocks.NewMockDataStore(ctrl) + riskManager := riskManagerMocks.NewMockManager(ctrl) + enricher := nodesEnricherMocks.NewMockNodeEnricher(ctrl) + + node := storage.Node{ + Id: "1", + Name: "node-name", + ClusterId: "cluster-id", + } + msg := createMsg(mockIndexReport) + + gomock.InOrder( + nodeDatastore.EXPECT().GetNode(gomock.Any(), gomock.Eq(node.GetId())).Times(1).Return(&node, true, nil), + enricher.EXPECT().EnrichNodeWithVulnerabilities(gomock.Any(), nil, gomock.Any()).Times(1).Return(nil), + riskManager.EXPECT().CalculateRiskAndUpsertNode(gomock.Any()).Times(1).Return(nil), + ) + + injector := &recordingInjector{} + p := &pipelineImpl{ + clusterStore: clusterStore, + nodeDatastore: nodeDatastore, + riskManager: riskManager, + enricher: enricher, + } + + err := p.Run(t.Context(), node.GetClusterId(), msg, injector) + assert.NoError(t, err) + + protoassert.SlicesEqual(t, []*central.SensorACK{ + { + Action: central.SensorACK_ACK, + MessageType: central.SensorACK_NODE_INDEX_REPORT, + ResourceId: node.GetName(), + }, + }, injector.getSentSensorACKs()) + + protoassert.SlicesEqual(t, []*central.NodeInventoryACK{ + { + ClusterId: node.GetClusterId(), + NodeName: node.GetName(), + Action: central.NodeInventoryACK_ACK, + MessageType: central.NodeInventoryACK_NodeIndexer, + }, + }, injector.getSentACKs()) +} + func createMsg(ir *v4.IndexReport) *central.MsgFromSensor { return ¢ral.MsgFromSensor{ Msg: ¢ral.MsgFromSensor_Event{ @@ -90,6 +144,43 @@ func createMsg(ir *v4.IndexReport) *central.MsgFromSensor { } } +type recordingInjector struct { + legacy []*central.NodeInventoryACK + sensor []*central.SensorACK +} + +func (r *recordingInjector) InjectMessage(_ concurrency.Waitable, msg *central.MsgToSensor) error { + if ack := 
msg.GetNodeInventoryAck(); ack != nil { + r.legacy = append(r.legacy, ack.CloneVT()) + } + if ack := msg.GetSensorAck(); ack != nil { + r.sensor = append(r.sensor, ack.CloneVT()) + } + return nil +} + +func (r *recordingInjector) InjectMessageIntoQueue(_ *central.MsgFromSensor) {} + +func (r *recordingInjector) getSentACKs() []*central.NodeInventoryACK { + out := make([]*central.NodeInventoryACK, 0, len(r.legacy)) + for _, ack := range r.legacy { + if ack != nil { + out = append(out, ack) + } + } + return out +} + +func (r *recordingInjector) getSentSensorACKs() []*central.SensorACK { + out := make([]*central.SensorACK, 0, len(r.sensor)) + for _, ack := range r.sensor { + if ack != nil { + out = append(out, ack) + } + } + return out +} + var ( mockIndexReport = &v4.IndexReport{ HashId: "", diff --git a/central/sensor/service/pipeline/nodeinventory/pipeline.go b/central/sensor/service/pipeline/nodeinventory/pipeline.go index 90b0ef3b02157..20e04a296e263 100644 --- a/central/sensor/service/pipeline/nodeinventory/pipeline.go +++ b/central/sensor/service/pipeline/nodeinventory/pipeline.go @@ -142,16 +142,29 @@ func sendComplianceAck(ctx context.Context, node *storage.Node, ninv *storage.No if injector == nil { return } - reply := replyCompliance(node.GetClusterId(), ninv.GetNodeName(), central.NodeInventoryACK_ACK) - if err := injector.InjectMessage(ctx, reply); err != nil { - log.Warnf("Failed sending node-inventory-ACK to Sensor for %s: %v", nodeDatastore.NodeString(node), err) - } else { - log.Debugf("Sent node-inventory-ACK for %s", nodeDatastore.NodeString(node)) - } + replyCompliance(ctx, node.GetClusterId(), ninv.GetNodeName(), central.NodeInventoryACK_ACK, injector) } -func replyCompliance(clusterID, nodeName string, t central.NodeInventoryACK_Action) *central.MsgToSensor { - return ¢ral.MsgToSensor{ +func replyCompliance(ctx context.Context, clusterID, nodeName string, t central.NodeInventoryACK_Action, injector common.MessageInjector) { + if injector == nil { 
+ return + } + + // Always send SensorACK (new path). + if err := injector.InjectMessage(ctx, ¢ral.MsgToSensor{ + Msg: ¢ral.MsgToSensor_SensorAck{ + SensorAck: ¢ral.SensorACK{ + Action: convertLegacyActionToSensor(t), + MessageType: central.SensorACK_NODE_INVENTORY, + ResourceId: nodeName, + }, + }, + }); err != nil { + log.Warnf("Failed injecting SensorACK for node inventory (clusterID=%s, nodeName=%s): %v", clusterID, nodeName, err) + } + + // Always send legacy NodeInventoryACK for backward compatibility. + if err := injector.InjectMessage(ctx, ¢ral.MsgToSensor{ Msg: ¢ral.MsgToSensor_NodeInventoryAck{ NodeInventoryAck: ¢ral.NodeInventoryACK{ ClusterId: clusterID, @@ -160,7 +173,16 @@ func replyCompliance(clusterID, nodeName string, t central.NodeInventoryACK_Acti MessageType: central.NodeInventoryACK_NodeInventory, }, }, + }); err != nil { + log.Warnf("Failed injecting legacy NodeInventoryACK for node inventory (clusterID=%s, nodeName=%s): %v", clusterID, nodeName, err) } } func (p *pipelineImpl) OnFinish(_ string) {} + +func convertLegacyActionToSensor(action central.NodeInventoryACK_Action) central.SensorACK_Action { + if action == central.NodeInventoryACK_ACK { + return central.SensorACK_ACK + } + return central.SensorACK_NACK +} diff --git a/central/sensor/service/pipeline/nodeinventory/pipeline_test.go b/central/sensor/service/pipeline/nodeinventory/pipeline_test.go index eb6777d7740e2..3763043c8e7a6 100644 --- a/central/sensor/service/pipeline/nodeinventory/pipeline_test.go +++ b/central/sensor/service/pipeline/nodeinventory/pipeline_test.go @@ -167,24 +167,92 @@ func Test_pipelineImpl_Run(t *testing.T) { if len(tt.wantInjectorContain) == 0 { assert.Len(t, inj.getSentACKs(), 0) } else { - protoassert.SlicesEqual(t, tt.wantInjectorContain, inj.getSentACKs()) + protoassert.SlicesEqual(t, tt.wantInjectorContain, inj.getSentACKs(), "sent ACKs: %v", inj.getSentACKs()) } } }) } } +func Test_pipelineImpl_Run_SendsSensorAndLegacyACKs(t *testing.T) { + ctrl := 
gomock.NewController(t) + clusterStore := clusterDatastoreMocks.NewMockDataStore(ctrl) + nodeDatastore := nodeDatastoreMocks.NewMockDataStore(ctrl) + riskManager := riskManagerMocks.NewMockManager(ctrl) + enricher := nodesEnricherMocks.NewMockNodeEnricher(ctrl) + + clusterID := "cluster-1" + nodeName := "node-name" + node := storage.Node{ + Id: "node-id", + ClusterId: clusterID, + } + msg := ¢ral.MsgFromSensor{ + Msg: ¢ral.MsgFromSensor_Event{ + Event: ¢ral.SensorEvent{ + Action: central.ResourceAction_CREATE_RESOURCE, + Resource: ¢ral.SensorEvent_NodeInventory{ + NodeInventory: &storage.NodeInventory{ + NodeId: node.GetId(), + NodeName: nodeName, + }, + }, + }, + }, + } + injector := &recordingInjector{} + + gomock.InOrder( + nodeDatastore.EXPECT().GetNode(gomock.Any(), gomock.Eq(node.GetId())).Times(1).Return(&node, true, nil), + enricher.EXPECT().EnrichNodeWithVulnerabilities(gomock.Any(), gomock.Any(), nil).Times(1).Return(nil), + riskManager.EXPECT().CalculateRiskAndUpsertNode(gomock.Any()).Times(1).Return(nil), + ) + + p := &pipelineImpl{ + clusterStore: clusterStore, + nodeDatastore: nodeDatastore, + enricher: enricher, + riskManager: riskManager, + } + + err := p.Run(context.Background(), clusterID, msg, injector) + assert.NoError(t, err) + + protoassert.SlicesEqual(t, []*central.NodeInventoryACK{ + { + ClusterId: clusterID, + NodeName: nodeName, + Action: central.NodeInventoryACK_ACK, + MessageType: central.NodeInventoryACK_NodeInventory, + }, + }, injector.getSentACKs(), "legacy ACKs") + + protoassert.SlicesEqual(t, []*central.SensorACK{ + { + Action: central.SensorACK_ACK, + MessageType: central.SensorACK_NODE_INVENTORY, + ResourceId: nodeName, + }, + }, injector.getSentSensorACKs(), "sensor ACKs") +} + var _ common.MessageInjector = (*recordingInjector)(nil) type recordingInjector struct { lock sync.Mutex messages []*central.NodeInventoryACK + sensor []*central.SensorACK } func (r *recordingInjector) InjectMessage(_ concurrency.Waitable, msg 
*central.MsgToSensor) error { r.lock.Lock() defer r.lock.Unlock() - r.messages = append(r.messages, msg.GetNodeInventoryAck().CloneVT()) + if ack := msg.GetNodeInventoryAck(); ack != nil { + r.messages = append(r.messages, ack.CloneVT()) + } + if ack := msg.GetSensorAck(); ack != nil { + r.sensor = append(r.sensor, ack.CloneVT()) + } return nil } @@ -194,6 +262,22 @@ func (r *recordingInjector) getSentACKs() []*central.NodeInventoryACK { r.lock.Lock() defer r.lock.Unlock() copied := make([]*central.NodeInventoryACK, 0, len(r.messages)) - copied = append(copied, r.messages...) + for _, m := range r.messages { + if m != nil { + copied = append(copied, m) + } + } + return copied +} + +func (r *recordingInjector) getSentSensorACKs() []*central.SensorACK { + r.lock.Lock() + defer r.lock.Unlock() + copied := make([]*central.SensorACK, 0, len(r.sensor)) + for _, m := range r.sensor { + if m != nil { + copied = append(copied, m) + } + } return copied } From 5db0e0684fbdf22cbd4ccdafbc8881958bc89378 Mon Sep 17 00:00:00 2001 From: Piotr Rygielski <114479+vikin91@users.noreply.github.com> Date: Wed, 4 Mar 2026 12:37:26 +0100 Subject: [PATCH 3/3] Self-review: Unify ack behavior among the pipelines --- .../sensor/service/pipeline/nodeindex/pipeline.go | 2 ++ .../service/pipeline/nodeindex/pipeline_test.go | 8 ++++++++ .../service/pipeline/nodeinventory/pipeline.go | 14 +++++--------- 3 files changed, 15 insertions(+), 9 deletions(-) diff --git a/central/sensor/service/pipeline/nodeindex/pipeline.go b/central/sensor/service/pipeline/nodeindex/pipeline.go index 4ca548f435c05..77e867b4fd49c 100644 --- a/central/sensor/service/pipeline/nodeindex/pipeline.go +++ b/central/sensor/service/pipeline/nodeindex/pipeline.go @@ -156,4 +156,6 @@ func sendComplianceAck(ctx context.Context, node *storage.Node, injector common. 
}); err != nil { log.Warnf("Failed injecting legacy NodeInventoryACK for node index report (node=%s): %v", node.GetName(), err) } + + log.Debugf("Sent node-indexing ACKs for node %s in cluster %s", node.GetName(), node.GetClusterId()) } diff --git a/central/sensor/service/pipeline/nodeindex/pipeline_test.go b/central/sensor/service/pipeline/nodeindex/pipeline_test.go index 63fb654bfb325..b62b4342a49e8 100644 --- a/central/sensor/service/pipeline/nodeindex/pipeline_test.go +++ b/central/sensor/service/pipeline/nodeindex/pipeline_test.go @@ -13,6 +13,7 @@ import ( "github.com/stackrox/rox/pkg/features" nodesEnricherMocks "github.com/stackrox/rox/pkg/nodes/enricher/mocks" "github.com/stackrox/rox/pkg/protoassert" + "github.com/stackrox/rox/pkg/sync" "github.com/stretchr/testify/assert" "go.uber.org/mock/gomock" ) @@ -145,11 +146,14 @@ func createMsg(ir *v4.IndexReport) *central.MsgFromSensor { } type recordingInjector struct { + lock sync.Mutex legacy []*central.NodeInventoryACK sensor []*central.SensorACK } func (r *recordingInjector) InjectMessage(_ concurrency.Waitable, msg *central.MsgToSensor) error { + r.lock.Lock() + defer r.lock.Unlock() if ack := msg.GetNodeInventoryAck(); ack != nil { r.legacy = append(r.legacy, ack.CloneVT()) } @@ -162,6 +166,8 @@ func (r *recordingInjector) InjectMessage(_ concurrency.Waitable, msg *central.M func (r *recordingInjector) InjectMessageIntoQueue(_ *central.MsgFromSensor) {} func (r *recordingInjector) getSentACKs() []*central.NodeInventoryACK { + r.lock.Lock() + defer r.lock.Unlock() out := make([]*central.NodeInventoryACK, 0, len(r.legacy)) for _, ack := range r.legacy { if ack != nil { @@ -172,6 +178,8 @@ func (r *recordingInjector) getSentACKs() []*central.NodeInventoryACK { } func (r *recordingInjector) getSentSensorACKs() []*central.SensorACK { + r.lock.Lock() + defer r.lock.Unlock() out := make([]*central.SensorACK, 0, len(r.sensor)) for _, ack := range r.sensor { if ack != nil { diff --git 
a/central/sensor/service/pipeline/nodeinventory/pipeline.go b/central/sensor/service/pipeline/nodeinventory/pipeline.go index 20e04a296e263..965c1f6a1802f 100644 --- a/central/sensor/service/pipeline/nodeinventory/pipeline.go +++ b/central/sensor/service/pipeline/nodeinventory/pipeline.go @@ -94,7 +94,7 @@ func (p *pipelineImpl) Run(ctx context.Context, _ string, msg *central.MsgFromSe if shouldDiscardMsg(node) { // To prevent resending the inventory, still acknowledge receipt of it - sendComplianceAck(ctx, node, ninv, injector) + replyCompliance(ctx, node.GetClusterId(), ninv.GetNodeName(), central.NodeInventoryACK_ACK, injector) log.Debug("Discarding v2 NodeScan in favor of v4 NodeScan") return nil } @@ -115,7 +115,7 @@ func (p *pipelineImpl) Run(ctx context.Context, _ string, msg *central.MsgFromSe return err } - sendComplianceAck(ctx, node, ninv, injector) + replyCompliance(ctx, node.GetClusterId(), ninv.GetNodeName(), central.NodeInventoryACK_ACK, injector) return nil } @@ -138,13 +138,7 @@ func shouldDiscardMsg(node *storage.Node) bool { return false } -func sendComplianceAck(ctx context.Context, node *storage.Node, ninv *storage.NodeInventory, injector common.MessageInjector) { - if injector == nil { - return - } - replyCompliance(ctx, node.GetClusterId(), ninv.GetNodeName(), central.NodeInventoryACK_ACK, injector) -} - +// replyCompliance uses injector to send a SensorACK and NodeInventoryACK to Compliance. func replyCompliance(ctx context.Context, clusterID, nodeName string, t central.NodeInventoryACK_Action, injector common.MessageInjector) { if injector == nil { return @@ -176,6 +170,8 @@ func replyCompliance(ctx context.Context, clusterID, nodeName string, t central. }); err != nil { log.Warnf("Failed injecting legacy NodeInventoryACK for node inventory (clusterID=%s, nodeName=%s): %v", clusterID, nodeName, err) } + + log.Debugf("Sent node-inventory ACKs for node %s in cluster %s", nodeName, clusterID) } func (p *pipelineImpl) OnFinish(_ string) {}