diff --git a/helm-charts b/helm-charts index 86beffecd..4a5692642 160000 --- a/helm-charts +++ b/helm-charts @@ -1 +1 @@ -Subproject commit 86beffecd2233e69b9230b806c8be66da914cbb5 +Subproject commit 4a5692642773c8c17778065f3c3050636e80ee88 diff --git a/src/operator/controllers/intents_controller.go b/src/operator/controllers/intents_controller.go index 3d31bf468..4af03fc33 100644 --- a/src/operator/controllers/intents_controller.go +++ b/src/operator/controllers/intents_controller.go @@ -24,6 +24,7 @@ import ( "github.com/otterize/intents-operator/src/operator/controllers/intents_reconcilers/otterizecloud" "github.com/otterize/intents-operator/src/operator/controllers/kafkaacls" "github.com/otterize/intents-operator/src/shared/reconcilergroup" + "github.com/otterize/intents-operator/src/shared/serviceidresolver" "github.com/samber/lo" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/runtime" @@ -51,13 +52,15 @@ func NewIntentsReconciler( endpointsReconciler *external_traffic.EndpointsReconciler, restrictToNamespaces []string, enforcementConfig EnforcementConfig, - otterizeClient otterizecloud.CloudClient) *IntentsReconciler { + otterizeClient otterizecloud.CloudClient, + operatorPodName string, + operatorPodNamespace string) *IntentsReconciler { intentsReconciler := &IntentsReconciler{ group: reconcilergroup.NewGroup("intents-reconciler", client, scheme, intents_reconcilers.NewCRDValidatorReconciler(client, scheme), intents_reconcilers.NewPodLabelReconciler(client, scheme), intents_reconcilers.NewNetworkPolicyReconciler(client, scheme, endpointsReconciler, restrictToNamespaces, enforcementConfig.EnableNetworkPolicy, enforcementConfig.EnforcementEnabledGlobally), - intents_reconcilers.NewKafkaACLReconciler(client, scheme, kafkaServerStore, enforcementConfig.EnableKafkaACL, kafkaacls.NewKafkaIntentsAdmin, enforcementConfig.EnforcementEnabledGlobally), + intents_reconcilers.NewKafkaACLReconciler(client, scheme, kafkaServerStore, enforcementConfig.EnableKafkaACL, kafkaacls.NewKafkaIntentsAdmin, enforcementConfig.EnforcementEnabledGlobally, operatorPodName, operatorPodNamespace, serviceidresolver.NewResolver(client)), intents_reconcilers.NewIstioPolicyReconciler(client, scheme, restrictToNamespaces, enforcementConfig.EnableIstioPolicy, enforcementConfig.EnforcementEnabledGlobally), )} diff --git a/src/operator/controllers/intents_reconcilers/kafka_acls.go b/src/operator/controllers/intents_reconcilers/kafka_acls.go index 077095645..ff62ad6a5 100644 --- a/src/operator/controllers/intents_reconcilers/kafka_acls.go +++ b/src/operator/controllers/intents_reconcilers/kafka_acls.go @@ -6,6 +6,7 @@ import ( otterizev1alpha2 "github.com/otterize/intents-operator/src/operator/api/v1alpha2" "github.com/otterize/intents-operator/src/operator/controllers/kafkaacls" "github.com/otterize/intents-operator/src/shared/injectablerecorder" + "github.com/otterize/intents-operator/src/shared/serviceidresolver" "github.com/sirupsen/logrus" k8serrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/runtime" @@ -16,14 +17,15 @@ import ( ) const ( - KafkaACLsFinalizerName = "intents.otterize.com/kafka-finalizer" - ReasonCouldNotConnectToKafkaServer = "CouldNotConnectToKafkaServer" - ReasonCouldNotApplyIntentsOnKafkaServer = "CouldNotApplyIntentsOnKafkaServer" - ReasonKafkaACLCreationDisabled = "KafkaACLCreationDisabled" - ReasonKafkaServerNotConfigured = "KafkaServerNotConfigured" - ReasonRemovingKafkaACLsFailed = "RemovingKafkaACLsFailed" - ReasonApplyingKafkaACLsFailed = "ApplyingKafkaACLsFailed" - ReasonAppliedKafkaACLs = "AppliedKafkaACLs" + KafkaACLsFinalizerName = "intents.otterize.com/kafka-finalizer" + ReasonCouldNotConnectToKafkaServer = "CouldNotConnectToKafkaServer" + ReasonCouldNotApplyIntentsOnKafkaServer = "CouldNotApplyIntentsOnKafkaServer" + ReasonKafkaACLCreationDisabled = "KafkaACLCreationDisabled" + ReasonKafkaServerNotConfigured = "KafkaServerNotConfigured" + ReasonRemovingKafkaACLsFailed = "RemovingKafkaACLsFailed" + ReasonApplyingKafkaACLsFailed = "ApplyingKafkaACLsFailed" + ReasonAppliedKafkaACLs = "AppliedKafkaACLs" + ReasonIntentsOperatorIdentityResolveFailed = "IntentsOperatorIdentityResolveFailed" ) type KafkaACLReconciler struct { @@ -33,10 +35,23 @@ type KafkaACLReconciler struct { enforcementEnabledGlobally bool enableKafkaACLCreation bool getNewKafkaIntentsAdmin kafkaacls.IntentsAdminFactoryFunction + operatorPodName string + operatorPodNamespace string + serviceResolver serviceidresolver.ServiceResolver injectablerecorder.InjectableRecorder } -func NewKafkaACLReconciler(client client.Client, scheme *runtime.Scheme, serversStore kafkaacls.ServersStore, enableKafkaACLCreation bool, factoryFunc kafkaacls.IntentsAdminFactoryFunction, enforcementEnabledGlobally bool) *KafkaACLReconciler { +func NewKafkaACLReconciler( + client client.Client, + scheme *runtime.Scheme, + serversStore kafkaacls.ServersStore, + enableKafkaACLCreation bool, + factoryFunc kafkaacls.IntentsAdminFactoryFunction, + enforcementEnabledGlobally bool, + operatorPodName string, + operatorPodNamespace string, + serviceResolver serviceidresolver.ServiceResolver, +) *KafkaACLReconciler { return &KafkaACLReconciler{ client: client, scheme: scheme, @@ -44,6 +59,9 @@ func NewKafkaACLReconciler(client client.Client, scheme *runtime.Scheme, servers enableKafkaACLCreation: enableKafkaACLCreation, getNewKafkaIntentsAdmin: factoryFunc, enforcementEnabledGlobally: enforcementEnabledGlobally, + operatorPodName: operatorPodName, + operatorPodNamespace: operatorPodNamespace, + serviceResolver: serviceResolver, } } @@ -147,6 +165,16 @@ func (r *KafkaACLReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c } } + clientIsOperator, err := r.isIntentsForTheIntentsOperator(ctx, intents) + if err != nil { + return ctrl.Result{}, err + } + + if clientIsOperator { + logger.Info("Skipping ACLs creation for the intents operator") + return ctrl.Result{}, nil + } + var result ctrl.Result result, err = r.applyAcls(logger, intents) if err != nil { @@ -157,6 +185,24 @@ func (r *KafkaACLReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c } +func (r *KafkaACLReconciler) isIntentsForTheIntentsOperator(ctx context.Context, intents *otterizev1alpha2.ClientIntents) (bool, error) { + if r.operatorPodNamespace != intents.Namespace { + return false, nil + } + + name, ok, err := r.serviceResolver.GetPodAnnotatedName(ctx, r.operatorPodName, r.operatorPodNamespace) + if err != nil { + return false, fmt.Errorf("failed resolving intents operator identity - %w", err) + } + + if !ok { + r.RecordWarningEventf(intents, ReasonIntentsOperatorIdentityResolveFailed, "failed resolving intents operator identity - service name annotation required") + return false, fmt.Errorf("failed resolving intents operator identity - service name annotation required") + } + + return name == intents.Spec.Service.Name, nil +} + func (r *KafkaACLReconciler) applyAcls(logger *logrus.Entry, intents *otterizev1alpha2.ClientIntents) (ctrl.Result, error) { logger.Info("Applying new ACLs") serverCount, err := r.applyACLs(intents) diff --git a/src/operator/controllers/intents_reconcilers/kafka_acls_test.go b/src/operator/controllers/intents_reconcilers/kafka_acls_test.go index d9810c7c2..464c2bcc5 100644 --- a/src/operator/controllers/intents_reconcilers/kafka_acls_test.go +++ b/src/operator/controllers/intents_reconcilers/kafka_acls_test.go @@ -8,6 +8,7 @@ import ( otterizev1alpha2 "github.com/otterize/intents-operator/src/operator/api/v1alpha2" "github.com/otterize/intents-operator/src/operator/controllers/kafkaacls" kafkaaclsmocks "github.com/otterize/intents-operator/src/operator/controllers/kafkaacls/mocks" + serviceidresolvermocks "github.com/otterize/intents-operator/src/shared/serviceidresolver/mocks" "github.com/otterize/intents-operator/src/shared/testbase" "github.com/samber/lo" "github.com/stretchr/testify/assert" @@ -22,18 +23,23 @@ import ( ) const ( - kafkaServiceName string = "kafka" - kafkaTopicName string = "test-topic" - clientName string = "test-client" - intentsObjectName string = "test-client-intents" - usernameMapping string = "user-name-mapping-test" + kafkaServiceName string = "kafka" + kafkaTopicName string = "test-topic" + clientName string = "test-client" + intentsObjectName string = "test-client-intents" + usernameMapping string = "user-name-mapping-test" + operatorServiceName string = "intents-operator" + operatorPodName string = "operator-pod-name" + operatorPodNamespacePrefix string = "otterize-ns" ) type KafkaACLReconcilerTestSuite struct { testbase.ControllerManagerTestSuiteBase - Reconciler *KafkaACLReconciler - mockKafkaAdmin *kafkaaclsmocks.MockClusterAdmin - recorder *record.FakeRecorder + Reconciler *KafkaACLReconciler + mockKafkaAdmin *kafkaaclsmocks.MockClusterAdmin + recorder *record.FakeRecorder + mockServiceResolver *serviceidresolvermocks.MockServiceResolver + operatorNamespace string } func (s *KafkaACLReconcilerTestSuite) SetupSuite() { @@ -89,6 +95,7 @@ func (s *KafkaACLReconcilerTestSuite) BeforeTest(_, testName string) { controller := gomock.NewController(s.T()) s.mockKafkaAdmin = kafkaaclsmocks.NewMockClusterAdmin(controller) + s.mockServiceResolver = serviceidresolvermocks.NewMockServiceResolver(controller) s.initKafkaIntentsAdmin(true, true) } @@ -96,7 +103,17 @@ func (s *KafkaACLReconcilerTestSuite) BeforeTest(_, testName string) { func (s *KafkaACLReconcilerTestSuite) initKafkaIntentsAdmin(enableAclCreation bool, enforcementEnabledGlobally bool) { kafkaServersStore := s.setupServerStore(kafkaServiceName) newTestKafkaIntentsAdmin := getMockIntentsAdminFactory(s.mockKafkaAdmin, usernameMapping) - s.Reconciler = NewKafkaACLReconciler(s.Mgr.GetClient(), s.TestEnv.Scheme, kafkaServersStore, enableAclCreation, newTestKafkaIntentsAdmin, enforcementEnabledGlobally) + s.Reconciler = NewKafkaACLReconciler( + s.Mgr.GetClient(), + s.TestEnv.Scheme, + kafkaServersStore, + enableAclCreation, + newTestKafkaIntentsAdmin, + enforcementEnabledGlobally, + operatorPodName, + s.operatorNamespace, + s.mockServiceResolver, + ) s.recorder = record.NewFakeRecorder(100) s.Reconciler.InjectRecorder(s.recorder) } @@ -111,6 +128,42 @@ func getMockIntentsAdminFactory(clusterAdmin sarama.ClusterAdmin, usernameMappin } } +func (s *KafkaACLReconcilerTestSuite) TestNoACLCreatedForIntentsOperator() { + s.initOperatorNamespace() + + intentsName := "intents-operator-calls-to-kafka" + operatorIntents := []otterizev1alpha2.Intent{{ + Name: kafkaServiceName, + Type: otterizev1alpha2.IntentTypeKafka, + Topics: []otterizev1alpha2.KafkaTopic{{ + Name: "*", + Operations: []otterizev1alpha2.KafkaOperation{ + otterizev1alpha2.KafkaOperationAlter, + otterizev1alpha2.KafkaOperationDescribe, + }, + }}, + }} + + _, err := s.AddIntentsInNamespace(intentsName, operatorServiceName, s.operatorNamespace, operatorIntents) + s.Require().NoError(err) + + s.mockServiceResolver.EXPECT().GetPodAnnotatedName(gomock.Any(), operatorPodName, s.operatorNamespace).Return(operatorServiceName, true, nil) + + // Shouldn't creat ACLs for the operator intents therefore not expecting any call to mockKafkaAdmin + + operatorNamespacedName := types.NamespacedName{ + Name: intentsName, + Namespace: s.operatorNamespace, + } + s.reconcile(operatorNamespacedName) +} + +func (s *KafkaACLReconcilerTestSuite) initOperatorNamespace() { + s.operatorNamespace = operatorPodNamespacePrefix + "-" + s.TestNamespace + s.CreateNamespace(s.operatorNamespace) + s.Reconciler.operatorPodNamespace = s.operatorNamespace +} + func (s *KafkaACLReconcilerTestSuite) TestKafkaACLGetCreatedAndUpdatedBasedOnIntents() { // ACL Objects for produce-write and consume-read resource := sarama.Resource{ diff --git a/src/operator/controllers/kafkaserverconfig_controller.go b/src/operator/controllers/kafkaserverconfig_controller.go index 5f70a9aea..da7265b76 100644 --- a/src/operator/controllers/kafkaserverconfig_controller.go +++ b/src/operator/controllers/kafkaserverconfig_controller.go @@ -61,6 +61,7 @@ type KafkaServerConfigReconciler struct { operatorPodNamespace string otterizeClient otterizecloud.CloudClient injectablerecorder.InjectableRecorder + serviceResolver serviceidresolver.ServiceResolver } func NewKafkaServerConfigReconciler( @@ -70,6 +71,7 @@ func NewKafkaServerConfigReconciler( operatorPodName string, operatorPodNameSpace string, cloudClient otterizecloud.CloudClient, + serviceResolver serviceidresolver.ServiceResolver, ) *KafkaServerConfigReconciler { return &KafkaServerConfigReconciler{ Client: client, @@ -78,6 +80,7 @@ func NewKafkaServerConfigReconciler( operatorPodName: operatorPodName, operatorPodNamespace: operatorPodNameSpace, otterizeClient: cloudClient, + serviceResolver: serviceResolver, } } @@ -182,13 +185,11 @@ func (r *KafkaServerConfigReconciler) ensureFinalizerRegistered( } func (r *KafkaServerConfigReconciler) createIntentsFromOperatorToKafkaServer(ctx context.Context, config *otterizev1alpha2.KafkaServerConfig) error { - operatorPod := &v1.Pod{} - err := r.Get(ctx, types.NamespacedName{Name: r.operatorPodName, Namespace: r.operatorPodNamespace}, operatorPod) + annotatedServiceName, ok, err := r.serviceResolver.GetPodAnnotatedName(ctx, r.operatorPodName, r.operatorPodNamespace) if err != nil { return err } - annotatedServiceName, ok := serviceidresolver.ResolvePodToServiceIdentityUsingAnnotationOnly(operatorPod) if !ok { r.RecordWarningEventf(config, ReasonIntentsOperatorIdentityResolveFailed, "failed resolving intents operator identity - service name annotation required") return fmt.Errorf("failed resolving intents operator identity - service name annotation required") @@ -197,7 +198,7 @@ func (r *KafkaServerConfigReconciler) createIntentsFromOperatorToKafkaServer(ctx newIntents := &otterizev1alpha2.ClientIntents{ ObjectMeta: metav1.ObjectMeta{ Name: formatIntentsName(config), - Namespace: operatorPod.Namespace, + Namespace: r.operatorPodNamespace, }, Spec: &otterizev1alpha2.IntentsSpec{ Service: otterizev1alpha2.Service{ diff --git a/src/operator/controllers/kafkaserverconfig_controller_test.go b/src/operator/controllers/kafkaserverconfig_controller_test.go index df070e7df..a967e1d46 100644 --- a/src/operator/controllers/kafkaserverconfig_controller_test.go +++ b/src/operator/controllers/kafkaserverconfig_controller_test.go @@ -3,12 +3,14 @@ package controllers import ( "context" "errors" + "fmt" "github.com/golang/mock/gomock" otterizev1alpha2 "github.com/otterize/intents-operator/src/operator/api/v1alpha2" "github.com/otterize/intents-operator/src/operator/controllers/kafkaacls" kafkaaclsmocks "github.com/otterize/intents-operator/src/operator/controllers/kafkaacls/mocks" "github.com/otterize/intents-operator/src/shared/otterizecloud/graphqlclient" "github.com/otterize/intents-operator/src/shared/otterizecloud/mocks" + serviceidresolvermocks "github.com/otterize/intents-operator/src/shared/serviceidresolver/mocks" "github.com/otterize/intents-operator/src/shared/testbase" "github.com/stretchr/testify/suite" k8serrors "k8s.io/apimachinery/pkg/api/errors" @@ -22,19 +24,22 @@ import ( ) const ( - kafkaServiceName string = "kafka" - kafkaTopicName string = "test-topic" - clientName string = "test-client" - intentsObjectName string = "test-client-intents" - usernameMapping string = "user-name-mapping-test" - operatorPodName string = "operator-pod-name" + kafkaServiceName string = "kafka" + kafkaTopicName string = "test-topic" + clientName string = "test-client" + intentsObjectName string = "test-client-intents" + usernameMapping string = "user-name-mapping-test" + operatorPodName string = "operator-pod-name" + operatorPodNamespacePrefix string = "otterize" ) type KafkaServerConfigReconcilerTestSuite struct { testbase.ControllerManagerTestSuiteBase - reconciler *KafkaServerConfigReconciler - mockCloudClient *otterizecloudmocks.MockCloudClient - mockIntentsAdmin *kafkaaclsmocks.MockKafkaIntentsAdmin + reconciler *KafkaServerConfigReconciler + mockCloudClient *otterizecloudmocks.MockCloudClient + mockIntentsAdmin *kafkaaclsmocks.MockKafkaIntentsAdmin + mockServiceResolver *serviceidresolvermocks.MockServiceResolver + operatorNamespace string } func (s *KafkaServerConfigReconcilerTestSuite) SetupSuite() { @@ -93,12 +98,28 @@ func (s *KafkaServerConfigReconcilerTestSuite) BeforeTest(_, testName string) { kafkaServersStore := s.setupServerStore(kafkaServiceName, controller) s.mockCloudClient = otterizecloudmocks.NewMockCloudClient(controller) - s.reconciler = NewKafkaServerConfigReconciler(s.Mgr.GetClient(), s.TestEnv.Scheme, kafkaServersStore, operatorPodName, s.TestNamespace, s.mockCloudClient) + s.mockServiceResolver = serviceidresolvermocks.NewMockServiceResolver(controller) + + s.reconciler = NewKafkaServerConfigReconciler( + s.Mgr.GetClient(), + s.TestEnv.Scheme, + kafkaServersStore, + operatorPodName, + s.operatorNamespace, + s.mockCloudClient, + s.mockServiceResolver, + ) recorder := s.Mgr.GetEventRecorderFor("intents-operator") s.reconciler.InjectRecorder(recorder) } +func (s *KafkaServerConfigReconcilerTestSuite) initOperatorNamespace() { + s.operatorNamespace = operatorPodNamespacePrefix + "-" + s.TestNamespace + s.CreateNamespace(s.operatorNamespace) + s.reconciler.operatorPodNamespace = s.operatorNamespace +} + func getMockIntentsAdminFactory(mockIntentsAdmin *kafkaaclsmocks.MockKafkaIntentsAdmin) kafkaacls.IntentsAdminFactoryFunction { return func(kafkaServer otterizev1alpha2.KafkaServerConfig, _ otterizev1alpha2.TLSSource, enableKafkaACLCreation bool, enforcementEnabledGlobally bool) (kafkaacls.KafkaIntentsAdmin, error) { return mockIntentsAdmin, nil @@ -239,6 +260,72 @@ func (s *KafkaServerConfigReconcilerTestSuite) TestKafkaServerConfigDelete() { Namespace: s.TestNamespace, }) } + +func (s *KafkaServerConfigReconcilerTestSuite) TestIntentsGeneratedForOperator() { + s.initOperatorNamespace() + + operatorServiceName := "intents-operator-service" + s.mockServiceResolver.EXPECT().GetPodAnnotatedName(gomock.Any(), operatorPodName, s.operatorNamespace).Return(operatorServiceName, true, nil) + + kafkaServerConfig := s.generateKafkaServerConfig() + kafkaServerConfig.SetNamespace(s.TestNamespace) + kafkaServerConfig.Spec.NoAutoCreateIntentsForOperator = false + s.AddKafkaServerConfig(&kafkaServerConfig) + + // Set go mock expectations + expectedConfigs := s.getExpectedKafkaServerConfigs(kafkaServerConfig) + s.mockCloudClient.EXPECT().ReportKafkaServerConfig(gomock.Any(), s.TestNamespace, gomock.Eq(expectedConfigs)).Return(nil) + s.mockIntentsAdmin.EXPECT().ApplyServerTopicsConf(kafkaServerConfig.Spec.Topics).Return(nil) + s.mockIntentsAdmin.EXPECT().Close() + + s.reconcile(types.NamespacedName{ + Name: kafkaServiceName, + Namespace: s.TestNamespace, + }) + + operatorIntentsObjectName := fmt.Sprintf("operator-to-kafkaserverconfig-kafka-namespace-%s", s.TestNamespace) + intents := otterizev1alpha2.ClientIntents{ + ObjectMeta: metav1.ObjectMeta{ + Name: operatorIntentsObjectName, + Namespace: s.operatorNamespace, + }, + Spec: &otterizev1alpha2.IntentsSpec{ + Service: otterizev1alpha2.Service{ + Name: operatorServiceName, + }, + Calls: []otterizev1alpha2.Intent{ + { + Name: fmt.Sprintf("%s.%s", kafkaServiceName, s.TestNamespace), + Type: otterizev1alpha2.IntentTypeKafka, + Topics: []otterizev1alpha2.KafkaTopic{{ + Name: "*", + Operations: []otterizev1alpha2.KafkaOperation{ + otterizev1alpha2.KafkaOperationDescribe, + otterizev1alpha2.KafkaOperationAlter, + }, + }}, + }, + }, + }, + } + + var actualIntents otterizev1alpha2.ClientIntents + err := s.Mgr.GetClient().Get(context.Background(), types.NamespacedName{ + Name: operatorIntentsObjectName, + Namespace: s.operatorNamespace, + }, &actualIntents) + s.Require().NoError(err) + + s.Require().Equal(intents.ObjectMeta.Name, actualIntents.ObjectMeta.Name) + s.Require().Equal(intents.ObjectMeta.Namespace, actualIntents.ObjectMeta.Namespace) + s.Require().Equal(intents.Spec.Service.Name, actualIntents.Spec.Service.Name) + s.Require().Equal(intents.Spec.Calls[0].Name, actualIntents.Spec.Calls[0].Name) + s.Require().Equal(intents.Spec.Calls[0].Type, actualIntents.Spec.Calls[0].Type) + s.Require().Equal(intents.Spec.Calls[0].Topics[0].Name, actualIntents.Spec.Calls[0].Topics[0].Name) + s.Require().Equal(intents.Spec.Calls[0].Topics[0].Operations[0], actualIntents.Spec.Calls[0].Topics[0].Operations[0]) + s.Require().Equal(intents.Spec.Calls[0].Topics[0].Operations[1], actualIntents.Spec.Calls[0].Topics[0].Operations[1]) +} + func TestKafkaACLReconcilerTestSuite(t *testing.T) { suite.Run(t, new(KafkaServerConfigReconcilerTestSuite)) } diff --git a/src/operator/main.go b/src/operator/main.go index 4b57c714a..5b3dd33c9 100644 --- a/src/operator/main.go +++ b/src/operator/main.go @@ -30,6 +30,7 @@ import ( "github.com/otterize/intents-operator/src/shared/operatorconfig" "github.com/otterize/intents-operator/src/shared/otterizecloud/graphqlclient" "github.com/otterize/intents-operator/src/shared/otterizecloud/otterizecloudclient" + "github.com/otterize/intents-operator/src/shared/serviceidresolver" "github.com/otterize/intents-operator/src/shared/telemetries/telemetriesgql" "github.com/otterize/intents-operator/src/shared/telemetries/telemetrysender" "github.com/sirupsen/logrus" @@ -195,6 +196,8 @@ func main() { watchedNamespaces, enforcementConfig, otterizeCloudClient, + podName, + podNamespace, ) if err = intentsReconciler.InitIntentsServerIndices(mgr); err != nil { @@ -234,7 +237,15 @@ func main() { } } - kafkaServerConfigReconciler := controllers.NewKafkaServerConfigReconciler(mgr.GetClient(), mgr.GetScheme(), kafkaServersStore, podName, podNamespace, otterizeCloudClient) + kafkaServerConfigReconciler := controllers.NewKafkaServerConfigReconciler( + mgr.GetClient(), + mgr.GetScheme(), + kafkaServersStore, + podName, + podNamespace, + otterizeCloudClient, + serviceidresolver.NewResolver(mgr.GetClient()), + ) if err = kafkaServerConfigReconciler.SetupWithManager(mgr); err != nil { logrus.WithError(err).Fatal("unable to create controller", "controller", "KafkaServerConfig") diff --git a/src/shared/otterizecloud/graphqlclient/schema.graphql b/src/shared/otterizecloud/graphqlclient/schema.graphql index a48aad296..6e38e7e58 100644 --- a/src/shared/otterizecloud/graphqlclient/schema.graphql +++ b/src/shared/otterizecloud/graphqlclient/schema.graphql @@ -160,6 +160,7 @@ type EdgeAccessStatus { useKafkaPoliciesInAccessGraphStates: Boolean! verdict: EdgeAccessStatusVerdict! reason: EdgeAccessStatusReason! + reasons: [EdgeAccessStatusReason!]! } enum EdgeAccessStatusReason { @@ -175,6 +176,9 @@ enum EdgeAccessStatusReason { BLOCKED_BY_APPLIED_INTENTS_KAFKA_RESOURCE_MISMATCH BLOCKED_BY_KAFKA_ENFORCEMENT_CONFIG_MISSING_APPLIED_INTENTS BLOCKED_BY_DEFAULT_DENY + SHARED_SERVICE_ACCOUNT + CLIENT_ISTIO_SIDECAR_MISSING + SERVER_ISTIO_SIDECAR_MISSING INTENTS_OPERATOR_NOT_ENFORCING INTENTS_OPERATOR_NOT_ENFORCING_MISSING_APPLIED_INTENT INTENTS_OPERATOR_NOT_ENFORCING_KAFKA_INTENTS_NOT_REQUIRED_FOR_TOPIC @@ -741,12 +745,26 @@ type ServerProtectionStatus { reason: ServerProtectionStatusReason! } +type ServerProtectionStatuses { + networkPolicies: ServerProtectionStatus! + kafkaACLs: ServerProtectionStatus! + istioPolicies: ServerProtectionStatus! +} + enum ServerProtectionStatusReason { INTENTS_OPERATOR_NEVER_CONNECTED INTENTS_OPERATOR_NOT_ENFORCING SERVER_HAS_NO_NETWORK_POLICY + SERVER_HAS_NO_ISTIO_POLICY + SERVER_HAS_NO_ISTIO_SIDECAR PROTECTED_BY_DEFAULT_DENY PROTECTED_BY_SERVER_NETWORK_POLICY + PROTECTED_BY_SERVER_ISTIO_POLICY + PROTECTED_BY_KAFKA_IDENTITY_REQUIRED_NO_INTENTS_REQUIRED + PROTECTED_BY_KAFKA_INTENTS_REQUIRED + SERVER_HAS_KAFKASERVERCONFIG_NO_ENFORCEMENT + SERVER_HAS_NO_KAFKA_SERVER_CONFIG + IGNORED_IN_CALCULATION } enum ServerProtectionStatusVerdict { @@ -764,6 +782,7 @@ type Service { environment: Environment! """If service is Kafka, its KafkaServerConfig.""" kafkaServerConfig: KafkaServerConfig + serviceAccount: String } type ServiceAccessGraph { @@ -778,6 +797,7 @@ type ServiceAccessStatus { useKafkaACLsInAccessGraphStates: Boolean! useIstioPoliciesInAccessGraphStates: Boolean! protectionStatus: ServerProtectionStatus! + protectionStatuses: ServerProtectionStatuses! blockingStatus: ServerBlockingStatus! hasAppliedIntents: Boolean! } diff --git a/src/shared/serviceidresolver/generate.go b/src/shared/serviceidresolver/generate.go index 26a40346a..e40a78d85 100644 --- a/src/shared/serviceidresolver/generate.go +++ b/src/shared/serviceidresolver/generate.go @@ -1,3 +1,4 @@ package serviceidresolver //go:generate go run github.com/golang/mock/mockgen@v1.6.0 -destination=./mocks/mock_k8s_client.go -package=serviceidresolvermocks sigs.k8s.io/controller-runtime/pkg/client Client +//go:generate go run github.com/golang/mock/mockgen@v1.6.0 -destination=./mocks/mock_service_resolver.go -package=serviceidresolvermocks -source=serviceidresolver.go ServiceResolver diff --git a/src/shared/serviceidresolver/mocks/mock_service_resolver.go b/src/shared/serviceidresolver/mocks/mock_service_resolver.go new file mode 100644 index 000000000..0b0d80c7c --- /dev/null +++ b/src/shared/serviceidresolver/mocks/mock_service_resolver.go @@ -0,0 +1,51 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: serviceidresolver.go + +// Package serviceidresolvermocks is a generated GoMock package. +package serviceidresolvermocks + +import ( + context "context" + reflect "reflect" + + gomock "github.com/golang/mock/gomock" +) + +// MockServiceResolver is a mock of ServiceResolver interface. +type MockServiceResolver struct { + ctrl *gomock.Controller + recorder *MockServiceResolverMockRecorder +} + +// MockServiceResolverMockRecorder is the mock recorder for MockServiceResolver. +type MockServiceResolverMockRecorder struct { + mock *MockServiceResolver +} + +// NewMockServiceResolver creates a new mock instance. +func NewMockServiceResolver(ctrl *gomock.Controller) *MockServiceResolver { + mock := &MockServiceResolver{ctrl: ctrl} + mock.recorder = &MockServiceResolverMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockServiceResolver) EXPECT() *MockServiceResolverMockRecorder { + return m.recorder +} + +// GetPodAnnotatedName mocks base method. +func (m *MockServiceResolver) GetPodAnnotatedName(ctx context.Context, podName, podNamespace string) (string, bool, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetPodAnnotatedName", ctx, podName, podNamespace) + ret0, _ := ret[0].(string) + ret1, _ := ret[1].(bool) + ret2, _ := ret[2].(error) + return ret0, ret1, ret2 +} + +// GetPodAnnotatedName indicates an expected call of GetPodAnnotatedName. +func (mr *MockServiceResolverMockRecorder) GetPodAnnotatedName(ctx, podName, podNamespace interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetPodAnnotatedName", reflect.TypeOf((*MockServiceResolver)(nil).GetPodAnnotatedName), ctx, podName, podNamespace) +} diff --git a/src/shared/serviceidresolver/serviceidresolver.go b/src/shared/serviceidresolver/serviceidresolver.go index 21507fef2..d5b16bd9b 100644 --- a/src/shared/serviceidresolver/serviceidresolver.go +++ b/src/shared/serviceidresolver/serviceidresolver.go @@ -19,6 +19,10 @@ const ( var PodNotFound = errors.New("pod not found") +type ServiceResolver interface { + GetPodAnnotatedName(ctx context.Context, podName string, podNamespace string) (string, bool, error) +} + type Resolver struct { client client.Client } @@ -32,6 +36,17 @@ func ResolvePodToServiceIdentityUsingAnnotationOnly(pod *corev1.Pod) (string, bo return annotation, ok } +func (r *Resolver) GetPodAnnotatedName(ctx context.Context, podName string, podNamespace string) (string, bool, error) { + var pod corev1.Pod + err := r.client.Get(ctx, types.NamespacedName{Name: podName, Namespace: podNamespace}, &pod) + if err != nil { + return "", false, err + } + + annotation, ok := ResolvePodToServiceIdentityUsingAnnotationOnly(&pod) + return annotation, ok, nil +} + type ServiceIdentity struct { Name string // OwnerObject used to resolve the service name. May be nil if service name was resolved using annotation. diff --git a/src/shared/serviceidresolver/serviceidresolver_test.go b/src/shared/serviceidresolver/serviceidresolver_test.go index 0a3f8a473..6de93f89f 100644 --- a/src/shared/serviceidresolver/serviceidresolver_test.go +++ b/src/shared/serviceidresolver/serviceidresolver_test.go @@ -2,12 +2,14 @@ package serviceidresolver import ( "context" + "errors" "github.com/golang/mock/gomock" "github.com/otterize/intents-operator/src/operator/api/v1alpha2" serviceidresolvermocks "github.com/otterize/intents-operator/src/shared/serviceidresolver/mocks" "github.com/stretchr/testify/suite" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" "sigs.k8s.io/controller-runtime/pkg/client" "testing" ) @@ -87,6 +89,46 @@ func (s *ServiceIdResolverTestSuite) TestResolveClientIntentToPod_PodDoesntExist s.Require().Equal(corev1.Pod{}, pod) } +func (s *ServiceIdResolverTestSuite) TestGetPodAnnotatedName_PodExists() { + podName := "coolpod" + podNamespace := "coolnamespace" + serviceName := "coolservice" + + s.Client.EXPECT().Get(gomock.Any(), types.NamespacedName{Name: podName, Namespace: podNamespace}, gomock.AssignableToTypeOf(&corev1.Pod{})).Do( + func(_ context.Context, _ types.NamespacedName, pod *corev1.Pod, _ ...any) { + pod.Annotations = map[string]string{ServiceNameAnnotation: serviceName} + }).Return(nil) + + name, found, err := s.Resolver.GetPodAnnotatedName(context.Background(), podName, podNamespace) + s.Require().NoError(err) + s.Require().True(found) + s.Require().Equal(serviceName, name) +} + +func (s *ServiceIdResolverTestSuite) TestGetPodAnnotatedName_PodMissingAnnotation() { + podName := "coolpod" + podNamespace := "coolnamespace" + + s.Client.EXPECT().Get(gomock.Any(), types.NamespacedName{Name: podName, Namespace: podNamespace}, gomock.AssignableToTypeOf(&corev1.Pod{})).Return(nil) + + name, found, err := s.Resolver.GetPodAnnotatedName(context.Background(), podName, podNamespace) + s.Require().NoError(err) + s.Require().False(found) + s.Require().Equal("", name) +} + +func (s *ServiceIdResolverTestSuite) TestGetPodAnnotatedName_PodMCallFailed() { + podName := "coolpod" + podNamespace := "coolnamespace" + + s.Client.EXPECT().Get(gomock.Any(), types.NamespacedName{Name: podName, Namespace: podNamespace}, gomock.AssignableToTypeOf(&corev1.Pod{})).Return(errors.New("generic error")) + + name, found, err := s.Resolver.GetPodAnnotatedName(context.Background(), podName, podNamespace) + s.Require().Error(err) + s.Require().False(found) + s.Require().Equal("", name) +} + func TestServiceIdResolverTestSuite(t *testing.T) { suite.Run(t, new(ServiceIdResolverTestSuite)) } diff --git a/src/shared/telemetries/telemetriesgql/schema.graphql b/src/shared/telemetries/telemetriesgql/schema.graphql index 80c16eeed..ea64905ea 100644 --- a/src/shared/telemetries/telemetriesgql/schema.graphql +++ b/src/shared/telemetries/telemetriesgql/schema.graphql @@ -21,6 +21,23 @@ directive @specifiedBy( """The `Boolean` scalar type represents `true` or `false`.""" scalar Boolean +input CLICommand { + noun: String! + verb: String! + modifiers: [String!] +} + +input CLIIdentifier { + version: String! + contextId: String! + cloudClientId: String +} + +input CLITelemetry { + identifier: CLIIdentifier! + command: CLICommand! +} + input Component { componentType: ComponentType! componentInstanceId: ID! @@ -70,6 +87,9 @@ type Mutation { sendTelemetries( telemetries: [TelemetryInput!]! ): Boolean! + sendCLITelemetries( + telemetries: [CLITelemetry!]! + ): Boolean! } type Query { diff --git a/src/shared/testbase/testsuitebase.go b/src/shared/testbase/testsuitebase.go index 4e2b666b3..2590fd2bf 100644 --- a/src/shared/testbase/testsuitebase.go +++ b/src/shared/testbase/testsuitebase.go @@ -63,11 +63,7 @@ func (s *ControllerManagerTestSuiteBase) BeforeTest(_, testName string) { }() s.TestNamespace = strings.ToLower(fmt.Sprintf("%s-%s", testName, time.Now().Format("20060102150405"))) - testNamespaceObj := &corev1.Namespace{ - ObjectMeta: metav1.ObjectMeta{Name: s.TestNamespace}, - } - _, err := s.K8sDirectClient.CoreV1().Namespaces().Create(context.Background(), testNamespaceObj, metav1.CreateOptions{}) - s.Require().NoError(err) + s.CreateNamespace(s.TestNamespace) } func (s *ControllerManagerTestSuiteBase) TearDownTest() { @@ -118,6 +114,14 @@ func (s *ControllerManagerTestSuiteBase) WaitForDeletionToBeMarked(obj client.Ob })) } +func (s *ControllerManagerTestSuiteBase) CreateNamespace(namespace string) { + testNamespaceObj := &corev1.Namespace{ + ObjectMeta: metav1.ObjectMeta{Name: namespace}, + } + _, err := s.K8sDirectClient.CoreV1().Namespaces().Create(context.Background(), testNamespaceObj, metav1.CreateOptions{}) + s.Require().NoError(err) +} + func (s *ControllerManagerTestSuiteBase) AddPod(name string, podIp string, labels map[string]string, annotations map[string]string) { pod := &corev1.Pod{ ObjectMeta: metav1.ObjectMeta{Name: name, Namespace: s.TestNamespace, Labels: labels, Annotations: annotations}, @@ -408,9 +412,17 @@ func (s *ControllerManagerTestSuiteBase) AddIntents( objName, clientName string, callList []otterizev1alpha2.Intent) (*otterizev1alpha2.ClientIntents, error) { + return s.AddIntentsInNamespace(objName, clientName, s.TestNamespace, callList) +} + +func (s *ControllerManagerTestSuiteBase) AddIntentsInNamespace( + objName, + clientName string, + namespace string, + callList []otterizev1alpha2.Intent) (*otterizev1alpha2.ClientIntents, error) { intents := &otterizev1alpha2.ClientIntents{ - ObjectMeta: metav1.ObjectMeta{Name: objName, Namespace: s.TestNamespace}, + ObjectMeta: metav1.ObjectMeta{Name: objName, Namespace: namespace}, Spec: &otterizev1alpha2.IntentsSpec{ Service: otterizev1alpha2.Service{Name: clientName}, Calls: callList,