Skip to content

Commit

Permalink
Skip ACL creation for intents operator (#183)
Browse files Browse the repository at this point in the history
  • Loading branch information
NetanelBollag committed May 21, 2023
1 parent 297aa53 commit 2c779f1
Show file tree
Hide file tree
Showing 14 changed files with 404 additions and 42 deletions.
7 changes: 5 additions & 2 deletions src/operator/controllers/intents_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/otterize/intents-operator/src/operator/controllers/intents_reconcilers/otterizecloud"
"github.com/otterize/intents-operator/src/operator/controllers/kafkaacls"
"github.com/otterize/intents-operator/src/shared/reconcilergroup"
"github.com/otterize/intents-operator/src/shared/serviceidresolver"
"github.com/samber/lo"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
Expand Down Expand Up @@ -51,13 +52,15 @@ func NewIntentsReconciler(
endpointsReconciler *external_traffic.EndpointsReconciler,
restrictToNamespaces []string,
enforcementConfig EnforcementConfig,
otterizeClient otterizecloud.CloudClient) *IntentsReconciler {
otterizeClient otterizecloud.CloudClient,
operatorPodName string,
operatorPodNamespace string) *IntentsReconciler {
intentsReconciler := &IntentsReconciler{
group: reconcilergroup.NewGroup("intents-reconciler", client, scheme,
intents_reconcilers.NewCRDValidatorReconciler(client, scheme),
intents_reconcilers.NewPodLabelReconciler(client, scheme),
intents_reconcilers.NewNetworkPolicyReconciler(client, scheme, endpointsReconciler, restrictToNamespaces, enforcementConfig.EnableNetworkPolicy, enforcementConfig.EnforcementEnabledGlobally),
intents_reconcilers.NewKafkaACLReconciler(client, scheme, kafkaServerStore, enforcementConfig.EnableKafkaACL, kafkaacls.NewKafkaIntentsAdmin, enforcementConfig.EnforcementEnabledGlobally),
intents_reconcilers.NewKafkaACLReconciler(client, scheme, kafkaServerStore, enforcementConfig.EnableKafkaACL, kafkaacls.NewKafkaIntentsAdmin, enforcementConfig.EnforcementEnabledGlobally, operatorPodName, operatorPodNamespace, serviceidresolver.NewResolver(client)),
intents_reconcilers.NewIstioPolicyReconciler(client, scheme, restrictToNamespaces, enforcementConfig.EnableIstioPolicy, enforcementConfig.EnforcementEnabledGlobally),
)}

Expand Down
64 changes: 55 additions & 9 deletions src/operator/controllers/intents_reconcilers/kafka_acls.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
otterizev1alpha2 "github.com/otterize/intents-operator/src/operator/api/v1alpha2"
"github.com/otterize/intents-operator/src/operator/controllers/kafkaacls"
"github.com/otterize/intents-operator/src/shared/injectablerecorder"
"github.com/otterize/intents-operator/src/shared/serviceidresolver"
"github.com/sirupsen/logrus"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
Expand All @@ -16,14 +17,15 @@ import (
)

const (
KafkaACLsFinalizerName = "intents.otterize.com/kafka-finalizer"
ReasonCouldNotConnectToKafkaServer = "CouldNotConnectToKafkaServer"
ReasonCouldNotApplyIntentsOnKafkaServer = "CouldNotApplyIntentsOnKafkaServer"
ReasonKafkaACLCreationDisabled = "KafkaACLCreationDisabled"
ReasonKafkaServerNotConfigured = "KafkaServerNotConfigured"
ReasonRemovingKafkaACLsFailed = "RemovingKafkaACLsFailed"
ReasonApplyingKafkaACLsFailed = "ApplyingKafkaACLsFailed"
ReasonAppliedKafkaACLs = "AppliedKafkaACLs"
KafkaACLsFinalizerName = "intents.otterize.com/kafka-finalizer"
ReasonCouldNotConnectToKafkaServer = "CouldNotConnectToKafkaServer"
ReasonCouldNotApplyIntentsOnKafkaServer = "CouldNotApplyIntentsOnKafkaServer"
ReasonKafkaACLCreationDisabled = "KafkaACLCreationDisabled"
ReasonKafkaServerNotConfigured = "KafkaServerNotConfigured"
ReasonRemovingKafkaACLsFailed = "RemovingKafkaACLsFailed"
ReasonApplyingKafkaACLsFailed = "ApplyingKafkaACLsFailed"
ReasonAppliedKafkaACLs = "AppliedKafkaACLs"
ReasonIntentsOperatorIdentityResolveFailed = "IntentsOperatorIdentityResolveFailed"
)

type KafkaACLReconciler struct {
Expand All @@ -33,17 +35,33 @@ type KafkaACLReconciler struct {
enforcementEnabledGlobally bool
enableKafkaACLCreation bool
getNewKafkaIntentsAdmin kafkaacls.IntentsAdminFactoryFunction
operatorPodName string
operatorPodNamespace string
serviceResolver serviceidresolver.ServiceResolver
injectablerecorder.InjectableRecorder
}

func NewKafkaACLReconciler(client client.Client, scheme *runtime.Scheme, serversStore kafkaacls.ServersStore, enableKafkaACLCreation bool, factoryFunc kafkaacls.IntentsAdminFactoryFunction, enforcementEnabledGlobally bool) *KafkaACLReconciler {
func NewKafkaACLReconciler(
client client.Client,
scheme *runtime.Scheme,
serversStore kafkaacls.ServersStore,
enableKafkaACLCreation bool,
factoryFunc kafkaacls.IntentsAdminFactoryFunction,
enforcementEnabledGlobally bool,
operatorPodName string,
operatorPodNamespace string,
serviceResolver serviceidresolver.ServiceResolver,
) *KafkaACLReconciler {
return &KafkaACLReconciler{
client: client,
scheme: scheme,
KafkaServersStore: serversStore,
enableKafkaACLCreation: enableKafkaACLCreation,
getNewKafkaIntentsAdmin: factoryFunc,
enforcementEnabledGlobally: enforcementEnabledGlobally,
operatorPodName: operatorPodName,
operatorPodNamespace: operatorPodNamespace,
serviceResolver: serviceResolver,
}
}

Expand Down Expand Up @@ -147,6 +165,16 @@ func (r *KafkaACLReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
}
}

clientIsOperator, err := r.isIntentsForTheIntentsOperator(ctx, intents)
if err != nil {
return ctrl.Result{}, err
}

if clientIsOperator {
logger.Info("Skipping ACLs creation for the intents operator")
return ctrl.Result{}, nil
}

var result ctrl.Result
result, err = r.applyAcls(logger, intents)
if err != nil {
Expand All @@ -157,6 +185,24 @@ func (r *KafkaACLReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c

}

func (r *KafkaACLReconciler) isIntentsForTheIntentsOperator(ctx context.Context, intents *otterizev1alpha2.ClientIntents) (bool, error) {
if r.operatorPodNamespace != intents.Namespace {
return false, nil
}

name, ok, err := r.serviceResolver.GetPodAnnotatedName(ctx, r.operatorPodName, r.operatorPodNamespace)
if err != nil {
return false, fmt.Errorf("failed resolving intents operator identity - %w", err)
}

if !ok {
r.RecordWarningEventf(intents, ReasonIntentsOperatorIdentityResolveFailed, "failed resolving intents operator identity - service name annotation required")
return false, fmt.Errorf("failed resolving intents operator identity - service name annotation required")
}

return name == intents.Spec.Service.Name, nil
}

func (r *KafkaACLReconciler) applyAcls(logger *logrus.Entry, intents *otterizev1alpha2.ClientIntents) (ctrl.Result, error) {
logger.Info("Applying new ACLs")
serverCount, err := r.applyACLs(intents)
Expand Down
71 changes: 62 additions & 9 deletions src/operator/controllers/intents_reconcilers/kafka_acls_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
otterizev1alpha2 "github.com/otterize/intents-operator/src/operator/api/v1alpha2"
"github.com/otterize/intents-operator/src/operator/controllers/kafkaacls"
kafkaaclsmocks "github.com/otterize/intents-operator/src/operator/controllers/kafkaacls/mocks"
serviceidresolvermocks "github.com/otterize/intents-operator/src/shared/serviceidresolver/mocks"
"github.com/otterize/intents-operator/src/shared/testbase"
"github.com/samber/lo"
"github.com/stretchr/testify/assert"
Expand All @@ -22,18 +23,23 @@ import (
)

const (
kafkaServiceName string = "kafka"
kafkaTopicName string = "test-topic"
clientName string = "test-client"
intentsObjectName string = "test-client-intents"
usernameMapping string = "user-name-mapping-test"
kafkaServiceName string = "kafka"
kafkaTopicName string = "test-topic"
clientName string = "test-client"
intentsObjectName string = "test-client-intents"
usernameMapping string = "user-name-mapping-test"
operatorServiceName string = "intents-operator"
operatorPodName string = "operator-pod-name"
operatorPodNamespacePrefix string = "otterize-ns"
)

type KafkaACLReconcilerTestSuite struct {
testbase.ControllerManagerTestSuiteBase
Reconciler *KafkaACLReconciler
mockKafkaAdmin *kafkaaclsmocks.MockClusterAdmin
recorder *record.FakeRecorder
Reconciler *KafkaACLReconciler
mockKafkaAdmin *kafkaaclsmocks.MockClusterAdmin
recorder *record.FakeRecorder
mockServiceResolver *serviceidresolvermocks.MockServiceResolver
operatorNamespace string
}

func (s *KafkaACLReconcilerTestSuite) SetupSuite() {
Expand Down Expand Up @@ -89,14 +95,25 @@ func (s *KafkaACLReconcilerTestSuite) BeforeTest(_, testName string) {

controller := gomock.NewController(s.T())
s.mockKafkaAdmin = kafkaaclsmocks.NewMockClusterAdmin(controller)
s.mockServiceResolver = serviceidresolvermocks.NewMockServiceResolver(controller)

s.initKafkaIntentsAdmin(true, true)
}

func (s *KafkaACLReconcilerTestSuite) initKafkaIntentsAdmin(enableAclCreation bool, enforcementEnabledGlobally bool) {
kafkaServersStore := s.setupServerStore(kafkaServiceName)
newTestKafkaIntentsAdmin := getMockIntentsAdminFactory(s.mockKafkaAdmin, usernameMapping)
s.Reconciler = NewKafkaACLReconciler(s.Mgr.GetClient(), s.TestEnv.Scheme, kafkaServersStore, enableAclCreation, newTestKafkaIntentsAdmin, enforcementEnabledGlobally)
s.Reconciler = NewKafkaACLReconciler(
s.Mgr.GetClient(),
s.TestEnv.Scheme,
kafkaServersStore,
enableAclCreation,
newTestKafkaIntentsAdmin,
enforcementEnabledGlobally,
operatorPodName,
s.operatorNamespace,
s.mockServiceResolver,
)
s.recorder = record.NewFakeRecorder(100)
s.Reconciler.InjectRecorder(s.recorder)
}
Expand All @@ -111,6 +128,42 @@ func getMockIntentsAdminFactory(clusterAdmin sarama.ClusterAdmin, usernameMappin
}
}

func (s *KafkaACLReconcilerTestSuite) TestNoACLCreatedForIntentsOperator() {
s.initOperatorNamespace()

intentsName := "intents-operator-calls-to-kafka"
operatorIntents := []otterizev1alpha2.Intent{{
Name: kafkaServiceName,
Type: otterizev1alpha2.IntentTypeKafka,
Topics: []otterizev1alpha2.KafkaTopic{{
Name: "*",
Operations: []otterizev1alpha2.KafkaOperation{
otterizev1alpha2.KafkaOperationAlter,
otterizev1alpha2.KafkaOperationDescribe,
},
}},
}}

_, err := s.AddIntentsInNamespace(intentsName, operatorServiceName, s.operatorNamespace, operatorIntents)
s.Require().NoError(err)

s.mockServiceResolver.EXPECT().GetPodAnnotatedName(gomock.Any(), operatorPodName, s.operatorNamespace).Return(operatorServiceName, true, nil)

// Shouldn't creat ACLs for the operator intents therefore not expecting any call to mockKafkaAdmin

operatorNamespacedName := types.NamespacedName{
Name: intentsName,
Namespace: s.operatorNamespace,
}
s.reconcile(operatorNamespacedName)
}

func (s *KafkaACLReconcilerTestSuite) initOperatorNamespace() {
s.operatorNamespace = operatorPodNamespacePrefix + "-" + s.TestNamespace
s.CreateNamespace(s.operatorNamespace)
s.Reconciler.operatorPodNamespace = s.operatorNamespace
}

func (s *KafkaACLReconcilerTestSuite) TestKafkaACLGetCreatedAndUpdatedBasedOnIntents() {
// ACL Objects for produce-write and consume-read
resource := sarama.Resource{
Expand Down
9 changes: 5 additions & 4 deletions src/operator/controllers/kafkaserverconfig_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ type KafkaServerConfigReconciler struct {
operatorPodNamespace string
otterizeClient otterizecloud.CloudClient
injectablerecorder.InjectableRecorder
serviceResolver serviceidresolver.ServiceResolver
}

func NewKafkaServerConfigReconciler(
Expand All @@ -70,6 +71,7 @@ func NewKafkaServerConfigReconciler(
operatorPodName string,
operatorPodNameSpace string,
cloudClient otterizecloud.CloudClient,
serviceResolver serviceidresolver.ServiceResolver,
) *KafkaServerConfigReconciler {
return &KafkaServerConfigReconciler{
Client: client,
Expand All @@ -78,6 +80,7 @@ func NewKafkaServerConfigReconciler(
operatorPodName: operatorPodName,
operatorPodNamespace: operatorPodNameSpace,
otterizeClient: cloudClient,
serviceResolver: serviceResolver,
}
}

Expand Down Expand Up @@ -182,13 +185,11 @@ func (r *KafkaServerConfigReconciler) ensureFinalizerRegistered(
}

func (r *KafkaServerConfigReconciler) createIntentsFromOperatorToKafkaServer(ctx context.Context, config *otterizev1alpha2.KafkaServerConfig) error {
operatorPod := &v1.Pod{}
err := r.Get(ctx, types.NamespacedName{Name: r.operatorPodName, Namespace: r.operatorPodNamespace}, operatorPod)
annotatedServiceName, ok, err := r.serviceResolver.GetPodAnnotatedName(ctx, r.operatorPodName, r.operatorPodNamespace)
if err != nil {
return err
}

annotatedServiceName, ok := serviceidresolver.ResolvePodToServiceIdentityUsingAnnotationOnly(operatorPod)
if !ok {
r.RecordWarningEventf(config, ReasonIntentsOperatorIdentityResolveFailed, "failed resolving intents operator identity - service name annotation required")
return fmt.Errorf("failed resolving intents operator identity - service name annotation required")
Expand All @@ -197,7 +198,7 @@ func (r *KafkaServerConfigReconciler) createIntentsFromOperatorToKafkaServer(ctx
newIntents := &otterizev1alpha2.ClientIntents{
ObjectMeta: metav1.ObjectMeta{
Name: formatIntentsName(config),
Namespace: operatorPod.Namespace,
Namespace: r.operatorPodNamespace,
},
Spec: &otterizev1alpha2.IntentsSpec{
Service: otterizev1alpha2.Service{
Expand Down
Loading

0 comments on commit 2c779f1

Please sign in to comment.