diff --git a/central/sensor/service/common/sensor_ack_test.go b/central/sensor/service/common/sensor_ack_test.go new file mode 100644 index 0000000000000..673d0aae1c719 --- /dev/null +++ b/central/sensor/service/common/sensor_ack_test.go @@ -0,0 +1,59 @@ +package common + +import ( + "testing" + + "github.com/stackrox/rox/generated/internalapi/central" + "github.com/stackrox/rox/pkg/centralsensor" + "github.com/stackrox/rox/pkg/concurrency" + "github.com/stretchr/testify/assert" +) + +func TestSendSensorACK_NACK(t *testing.T) { + injector := &mockInjector{ + capabilities: map[centralsensor.SensorCapability]bool{ + centralsensor.SensorACKSupport: true, + }, + } + + SendSensorACK(t.Context(), central.SensorACK_NACK, central.SensorACK_VM_INDEX_REPORT, "vm-nack", centralsensor.SensorACKReasonRateLimited, injector) + + assert.Len(t, injector.messages, 1) + ack := injector.messages[0].GetSensorAck() + assert.NotNil(t, ack) + assert.Equal(t, central.SensorACK_NACK, ack.GetAction()) + assert.Equal(t, central.SensorACK_VM_INDEX_REPORT, ack.GetMessageType()) + assert.Equal(t, "vm-nack", ack.GetResourceId()) + assert.Equal(t, centralsensor.SensorACKReasonRateLimited, ack.GetReason()) +} + +func TestSendSensorACK_NilInjector(t *testing.T) { + assert.NotPanics(t, func() { + SendSensorACK(t.Context(), central.SensorACK_ACK, central.SensorACK_VM_INDEX_REPORT, "vm-1", "", nil) + }) +} + +func TestSendSensorACK_InjectorWithoutCapabilitySupport(t *testing.T) { + injector := &mockInjector{} + + SendSensorACK(t.Context(), central.SensorACK_ACK, central.SensorACK_VM_INDEX_REPORT, "vm-1", "", injector) + + assert.Empty(t, injector.messages, "should not send when SensorACKSupport capability is not advertised") +} + +type mockInjector struct { + messages []*central.MsgToSensor + injectErr error + capabilities map[centralsensor.SensorCapability]bool +} + +func (m *mockInjector) InjectMessage(_ concurrency.Waitable, msg *central.MsgToSensor) error { + m.messages = append(m.messages, msg) + return m.injectErr +} + +func (m *mockInjector) InjectMessageIntoQueue(_ *central.MsgFromSensor) {} + +func (m *mockInjector) HasCapability(cap centralsensor.SensorCapability) bool { + return m.capabilities[cap] +} diff --git a/central/sensor/service/connection/connection_impl.go b/central/sensor/service/connection/connection_impl.go index f5103d60566a2..1fdf37376f31b 100644 --- a/central/sensor/service/connection/connection_impl.go +++ b/central/sensor/service/connection/connection_impl.go @@ -185,6 +185,9 @@ func (c *sensorConnection) multiplexedPush(ctx context.Context, msg *central.Msg reason, ) c.emitRateLimitedAdminEvent(c.clusterID, reason) + if vmReport := msg.GetEvent().GetVirtualMachineIndexReport(); vmReport != nil { + common.SendSensorACK(ctx, central.SensorACK_NACK, central.SensorACK_VM_INDEX_REPORT, vmReport.GetId(), centralsensor.SensorACKReasonRateLimited, c) + } return } diff --git a/central/sensor/service/pipeline/virtualmachineindex/pipeline.go b/central/sensor/service/pipeline/virtualmachineindex/pipeline.go index 7c91848821402..0e090230089fb 100644 --- a/central/sensor/service/pipeline/virtualmachineindex/pipeline.go +++ b/central/sensor/service/pipeline/virtualmachineindex/pipeline.go @@ -60,10 +60,22 @@ func (p *pipelineImpl) Match(msg *central.MsgFromSensor) bool { return msg.GetEvent().GetVirtualMachineIndexReport() != nil } +// sendVMIndexACK is a convenience wrapper around common.SendSensorACK for VM index reports. +func sendVMIndexACK(ctx context.Context, resourceID, reason string, injector common.MessageInjector) { + common.SendSensorACK(ctx, central.SensorACK_ACK, central.SensorACK_VM_INDEX_REPORT, resourceID, reason, injector) +} + +// sendVMIndexNACK is a convenience wrapper around common.SendSensorACK for VM index reports. +func sendVMIndexNACK(ctx context.Context, resourceID, reason string, injector common.MessageInjector) { + common.SendSensorACK(ctx, central.SensorACK_NACK, central.SensorACK_VM_INDEX_REPORT, resourceID, reason, injector) +} + func (p *pipelineImpl) Run(ctx context.Context, clusterID string, msg *central.MsgFromSensor, injector common.MessageInjector) error { defer countMetrics.IncrementResourceProcessedCounter(pipeline.ActionToOperation(msg.GetEvent().GetAction()), metrics.VirtualMachineIndex) if !features.VirtualMachines.Enabled() { + // ACK to prevent the sender from retrying when the feature is disabled on Central. + sendVMIndexACK(ctx, msg.GetEvent().GetVirtualMachineIndexReport().GetId(), centralsensor.SensorACKReasonFeatureDisabled, injector) return nil } event := msg.GetEvent() @@ -77,12 +89,14 @@ func (p *pipelineImpl) Run(ctx context.Context, clusterID string, msg *central.M event.GetAction().String(), central.ResourceAction_SYNC_RESOURCE.String(), ) + sendVMIndexNACK(ctx, index.GetId(), centralsensor.SensorACKReasonUnsupportedAction, injector) return nil } log.Debugf("Received virtual machine index report: %s", index.GetId()) if clusterID == "" { + sendVMIndexNACK(ctx, index.GetId(), centralsensor.SensorACKReasonMissingClusterID, injector) return errors.New("missing cluster ID in pipeline context") } @@ -92,22 +106,26 @@ func (p *pipelineImpl) Run(ctx context.Context, clusterID string, msg *central.M // Extract Scanner V4 index report from VM index report event indexV4 := index.GetIndex().GetIndexV4() if indexV4 == nil { + sendVMIndexNACK(ctx, index.GetId(), centralsensor.SensorACKReasonMissingScanData, injector) return errors.Errorf("VM index report %s missing Scanner V4 index data", index.GetId()) } // Enrich VM with vulnerabilities err := p.enricher.EnrichVirtualMachineWithVulnerabilities(vm, indexV4) if err != nil { + sendVMIndexNACK(ctx, index.GetId(), centralsensor.SensorACKReasonEnrichmentFailed, injector) return errors.Wrapf(err, "failed to enrich VM %s with vulnerabilities", index.GetId()) } // Store enriched VM if err := p.vmDatastore.UpdateVirtualMachineScan(ctx, vm.GetId(), vm.GetScan()); err != nil { + sendVMIndexNACK(ctx, index.GetId(), centralsensor.SensorACKReasonStorageFailed, injector) return errors.Wrapf(err, "failed to upsert VM %s to datastore", index.GetId()) } log.Debugf("Successfully enriched and stored VM %s with %d components", vm.GetId(), len(vm.GetScan().GetComponents())) + sendVMIndexACK(ctx, index.GetId(), "", injector) return nil } diff --git a/central/sensor/service/pipeline/virtualmachineindex/pipeline_test.go b/central/sensor/service/pipeline/virtualmachineindex/pipeline_test.go index 1635e2b45ec11..65e2185bf2b8a 100644 --- a/central/sensor/service/pipeline/virtualmachineindex/pipeline_test.go +++ b/central/sensor/service/pipeline/virtualmachineindex/pipeline_test.go @@ -12,6 +12,7 @@ import ( v1 "github.com/stackrox/rox/generated/internalapi/virtualmachine/v1" "github.com/stackrox/rox/generated/storage" "github.com/stackrox/rox/pkg/centralsensor" + "github.com/stackrox/rox/pkg/concurrency" "github.com/stackrox/rox/pkg/features" vmEnricherMocks "github.com/stackrox/rox/pkg/virtualmachine/enricher/mocks" "github.com/stretchr/testify/assert" @@ -323,6 +324,209 @@ func TestPipelineEdgeCases(t *testing.T) { }) } +// mockInjector records InjectMessage calls. +type mockInjector struct { + messages []*central.MsgToSensor + injectErr error + capabilities map[centralsensor.SensorCapability]bool +} + +func (m *mockInjector) InjectMessage(_ concurrency.Waitable, msg *central.MsgToSensor) error { + m.messages = append(m.messages, msg) + return m.injectErr +} + +func (m *mockInjector) InjectMessageIntoQueue(_ *central.MsgFromSensor) {} + +func (m *mockInjector) HasCapability(cap centralsensor.SensorCapability) bool { + return m.capabilities[cap] +} + +func (suite *PipelineTestSuite) TestRun_SendsACKOnSuccess() { + suite.T().Setenv(features.VirtualMachines.EnvVar(), "true") + vmID := "vm-ack-test" + msg := createVMIndexMessage(vmID, central.ResourceAction_SYNC_RESOURCE) + + suite.enricher.EXPECT(). + EnrichVirtualMachineWithVulnerabilities(gomock.Any(), gomock.Any()). + Return(nil) + suite.vmDatastore.EXPECT(). + UpdateVirtualMachineScan(ctx, vmID, gomock.Any()). + Return(nil) + + injector := &mockInjector{ + capabilities: map[centralsensor.SensorCapability]bool{ + centralsensor.SensorACKSupport: true, + }, + } + + err := suite.pipeline.Run(ctx, testClusterID, msg, injector) + suite.NoError(err) + + suite.Require().Len(injector.messages, 1) + ack := injector.messages[0].GetSensorAck() + suite.Require().NotNil(ack) + suite.Equal(central.SensorACK_ACK, ack.GetAction()) + suite.Equal(central.SensorACK_VM_INDEX_REPORT, ack.GetMessageType()) + suite.Equal(vmID, ack.GetResourceId()) + suite.Empty(ack.GetReason()) +} + +func (suite *PipelineTestSuite) TestRun_NoACKWhenCapabilityMissing() { + suite.T().Setenv(features.VirtualMachines.EnvVar(), "true") + vmID := "vm-no-cap" + msg := createVMIndexMessage(vmID, central.ResourceAction_SYNC_RESOURCE) + + suite.enricher.EXPECT(). + EnrichVirtualMachineWithVulnerabilities(gomock.Any(), gomock.Any()). + Return(nil) + suite.vmDatastore.EXPECT(). + UpdateVirtualMachineScan(ctx, vmID, gomock.Any()). + Return(nil) + + injector := &mockInjector{ + capabilities: map[centralsensor.SensorCapability]bool{}, + } + + err := suite.pipeline.Run(ctx, testClusterID, msg, injector) + suite.NoError(err) + suite.Empty(injector.messages, "should not send ACK when SensorACKSupport is missing") +} + +func (suite *PipelineTestSuite) TestRun_NACKOnDBError() { + suite.T().Setenv(features.VirtualMachines.EnvVar(), "true") + vmID := "vm-error" + msg := createVMIndexMessage(vmID, central.ResourceAction_SYNC_RESOURCE) + + suite.enricher.EXPECT(). + EnrichVirtualMachineWithVulnerabilities(gomock.Any(), gomock.Any()). + Return(nil) + suite.vmDatastore.EXPECT(). + UpdateVirtualMachineScan(ctx, vmID, gomock.Any()). + Return(errors.New("db error")) + + injector := &mockInjector{ + capabilities: map[centralsensor.SensorCapability]bool{ + centralsensor.SensorACKSupport: true, + }, + } + + err := suite.pipeline.Run(ctx, testClusterID, msg, injector) + suite.Error(err) + + suite.Require().Len(injector.messages, 1) + ack := injector.messages[0].GetSensorAck() + suite.Require().NotNil(ack) + suite.Equal(central.SensorACK_NACK, ack.GetAction()) + suite.Equal(central.SensorACK_VM_INDEX_REPORT, ack.GetMessageType()) + suite.Equal(vmID, ack.GetResourceId()) + suite.Equal(centralsensor.SensorACKReasonStorageFailed, ack.GetReason()) +} + +func (suite *PipelineTestSuite) TestRun_NACKOnEnrichmentError() { + suite.T().Setenv(features.VirtualMachines.EnvVar(), "true") + vmID := "vm-enrich-fail" + msg := createVMIndexMessage(vmID, central.ResourceAction_SYNC_RESOURCE) + + suite.enricher.EXPECT(). + EnrichVirtualMachineWithVulnerabilities(gomock.Any(), gomock.Any()). + Return(errors.New("scanner unavailable")) + + injector := &mockInjector{ + capabilities: map[centralsensor.SensorCapability]bool{ + centralsensor.SensorACKSupport: true, + }, + } + + err := suite.pipeline.Run(ctx, testClusterID, msg, injector) + suite.Error(err) + + suite.Require().Len(injector.messages, 1) + ack := injector.messages[0].GetSensorAck() + suite.Require().NotNil(ack) + suite.Equal(central.SensorACK_NACK, ack.GetAction()) + suite.Equal(central.SensorACK_VM_INDEX_REPORT, ack.GetMessageType()) + suite.Equal(vmID, ack.GetResourceId()) + suite.Equal(centralsensor.SensorACKReasonEnrichmentFailed, ack.GetReason()) +} + +func (suite *PipelineTestSuite) TestRun_NACKOnMissingClusterID() { + suite.T().Setenv(features.VirtualMachines.EnvVar(), "true") + vmID := "vm-no-cluster" + msg := createVMIndexMessage(vmID, central.ResourceAction_SYNC_RESOURCE) + + injector := &mockInjector{ + capabilities: map[centralsensor.SensorCapability]bool{ + centralsensor.SensorACKSupport: true, + }, + } + + err := suite.pipeline.Run(ctx, "", msg, injector) + suite.ErrorContains(err, "missing cluster ID") + + suite.Require().Len(injector.messages, 1) + ack := injector.messages[0].GetSensorAck() + suite.Require().NotNil(ack) + suite.Equal(central.SensorACK_NACK, ack.GetAction()) + suite.Equal(central.SensorACK_VM_INDEX_REPORT, ack.GetMessageType()) + suite.Equal(vmID, ack.GetResourceId()) + suite.Equal(centralsensor.SensorACKReasonMissingClusterID, ack.GetReason()) +} + +func (suite *PipelineTestSuite) TestRun_NACKOnMissingScannerIndexPayload() { + suite.T().Setenv(features.VirtualMachines.EnvVar(), "true") + tests := []struct { + name string + index *v1.IndexReport + }{ + { + name: "nil Index", + index: nil, + }, + { + name: "Index without Scanner V4 payload", + index: &v1.IndexReport{}, + }, + } + + for _, tt := range tests { + suite.Run(tt.name, func() { + vmID := "vm-missing-payload-" + tt.name + msg := ¢ral.MsgFromSensor{ + Msg: ¢ral.MsgFromSensor_Event{ + Event: ¢ral.SensorEvent{ + Id: vmID, + Action: central.ResourceAction_SYNC_RESOURCE, + Resource: ¢ral.SensorEvent_VirtualMachineIndexReport{ + VirtualMachineIndexReport: &v1.IndexReportEvent{ + Id: vmID, + Index: tt.index, + }, + }, + }, + }, + } + + injector := &mockInjector{ + capabilities: map[centralsensor.SensorCapability]bool{ + centralsensor.SensorACKSupport: true, + }, + } + + err := suite.pipeline.Run(ctx, testClusterID, msg, injector) + suite.ErrorContains(err, "missing Scanner V4 index data") + + suite.Require().Len(injector.messages, 1) + ack := injector.messages[0].GetSensorAck() + suite.Require().NotNil(ack) + suite.Equal(central.SensorACK_NACK, ack.GetAction()) + suite.Equal(central.SensorACK_VM_INDEX_REPORT, ack.GetMessageType()) + suite.Equal(vmID, ack.GetResourceId()) + suite.Equal(centralsensor.SensorACKReasonMissingScanData, ack.GetReason()) + }) + } +} + func TestPipelineRun_DisabledFeature(t *testing.T) { t.Setenv(features.VirtualMachines.EnvVar(), "false") ctrl := gomock.NewController(t) @@ -338,7 +542,20 @@ func TestPipelineRun_DisabledFeature(t *testing.T) { vmID := "vm-1" msg := createVMIndexMessage(vmID, central.ResourceAction_CREATE_RESOURCE) - err := pipeline.Run(ctx, testClusterID, msg, nil) + injector := &mockInjector{ + capabilities: map[centralsensor.SensorCapability]bool{ + centralsensor.SensorACKSupport: true, + }, + } + + err := pipeline.Run(ctx, testClusterID, msg, injector) assert.NoError(t, err) + assert.Len(t, injector.messages, 1, "should ACK to prevent retries when feature is disabled") + ack := injector.messages[0].GetSensorAck() + assert.NotNil(t, ack) + assert.Equal(t, central.SensorACK_ACK, ack.GetAction()) + assert.Equal(t, central.SensorACK_VM_INDEX_REPORT, ack.GetMessageType()) + assert.Equal(t, vmID, ack.GetResourceId()) + assert.Equal(t, centralsensor.SensorACKReasonFeatureDisabled, ack.GetReason()) } diff --git a/pkg/centralsensor/sensor_ack.go b/pkg/centralsensor/sensor_ack.go new file mode 100644 index 0000000000000..6ecb2563c1d24 --- /dev/null +++ b/pkg/centralsensor/sensor_ack.go @@ -0,0 +1,13 @@ +package centralsensor + +// Reason constants for SensorACK messages (used with both ACK and NACK actions). +// Shared between Central (sender) and Sensor (receiver) for consistent handling. +const ( + SensorACKReasonRateLimited = "central rate limit exceeded" + SensorACKReasonEnrichmentFailed = "enrichment failed" + SensorACKReasonStorageFailed = "storage failed" + SensorACKReasonMissingScanData = "missing scanner index data" + SensorACKReasonMissingClusterID = "missing cluster ID" + SensorACKReasonUnsupportedAction = "unsupported action" + SensorACKReasonFeatureDisabled = "feature disabled on central" +)