Skip to content

Commit

Permalink
add unit tests for atlas streams (#1613)
Browse files Browse the repository at this point in the history
  • Loading branch information
s-urbaniak committed May 27, 2024
1 parent af4981e commit c04eab8
Show file tree
Hide file tree
Showing 2 changed files with 290 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"go.uber.org/zap/zaptest"
"go.uber.org/zap/zaptest/observer"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
Expand Down Expand Up @@ -95,6 +96,95 @@ func TestEnsureAtlasStreamConnection(t *testing.T) {
assert.Equal(t, ctrl.Result{}, result)
})

t.Run("should unset finalizer and delete if reconciliation is skipped", func(t *testing.T) {
now := metav1.Now()
streamConnection := &akov2.AtlasStreamConnection{
ObjectMeta: metav1.ObjectMeta{
Name: "my-stream-processing-connection",
Namespace: "default",
DeletionTimestamp: &now,
Finalizers: []string{"mongodbatlas/finalizer"},
Annotations: map[string]string{
customresource.ReconciliationPolicyAnnotation: customresource.ReconciliationPolicySkip,
},
},
Spec: akov2.AtlasStreamConnectionSpec{
Name: "sample-conn",
ConnectionType: "Sample",
},
}
testScheme := runtime.NewScheme()
assert.NoError(t, akov2.AddToScheme(testScheme))
k8sClient := fake.NewClientBuilder().
WithScheme(testScheme).
WithObjects(streamConnection).
Build()

reconciler := &AtlasStreamsConnectionReconciler{
Client: k8sClient,
Log: zaptest.NewLogger(t).Sugar(),
}

result, err := reconciler.Reconcile(
context.Background(),
ctrl.Request{
NamespacedName: types.NamespacedName{
Name: "my-stream-processing-connection",
Namespace: "default",
},
},
)
assert.NoError(t, err)
assert.Equal(t, ctrl.Result{}, result)
err = k8sClient.Delete(context.Background(), streamConnection)
assert.True(t, apierrors.IsNotFound(err))
})

t.Run("should terminate upon failed patching when unsetting finalizer", func(t *testing.T) {
now := metav1.Now()
streamConnection := &akov2.AtlasStreamConnection{
ObjectMeta: metav1.ObjectMeta{
Name: "my-stream-processing-connection",
Namespace: "default",
DeletionTimestamp: &now,
Finalizers: []string{"mongodbatlas/finalizer"},
Annotations: map[string]string{
customresource.ReconciliationPolicyAnnotation: customresource.ReconciliationPolicySkip,
},
},
Spec: akov2.AtlasStreamConnectionSpec{
Name: "sample-conn",
ConnectionType: "Sample",
},
}
testScheme := runtime.NewScheme()
assert.NoError(t, akov2.AddToScheme(testScheme))
k8sClient := fake.NewClientBuilder().
WithScheme(testScheme).
WithObjects(streamConnection).
WithInterceptorFuncs(interceptor.Funcs{Patch: func(ctx context.Context, client client.WithWatch, obj client.Object, patch client.Patch, opts ...client.PatchOption) error {
return errors.New("failed to patch")
}}).
Build()

reconciler := &AtlasStreamsConnectionReconciler{
Client: k8sClient,
Log: zaptest.NewLogger(t).Sugar(),
}

result, err := reconciler.Reconcile(
context.Background(),
ctrl.Request{
NamespacedName: types.NamespacedName{
Name: "my-stream-processing-connection",
Namespace: "default",
},
},
)
assert.NoError(t, err)
assert.Equal(t, ctrl.Result{RequeueAfter: workflow.DefaultRetry}, result)
})

t.Run("should transition to invalid state when resource version is invalid", func(t *testing.T) {
streamConnection := &akov2.AtlasStreamConnection{
ObjectMeta: metav1.ObjectMeta{
Expand Down
200 changes: 200 additions & 0 deletions pkg/controller/atlasstream/atlasstream_instance_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
"sigs.k8s.io/controller-runtime/pkg/client/interceptor"

atlasmock "github.com/mongodb/mongodb-atlas-kubernetes/v2/internal/mocks/atlas"
"github.com/mongodb/mongodb-atlas-kubernetes/v2/internal/pointer"
Expand Down Expand Up @@ -1069,6 +1070,78 @@ func TestFindStreamInstancesForStreamConnection(t *testing.T) {
requests,
)
})

t.Run("should return no keys if listing fails", func(t *testing.T) {
instance1 := &akov2.AtlasStreamInstance{
ObjectMeta: metav1.ObjectMeta{
Name: "instance1",
Namespace: "default",
},
Spec: akov2.AtlasStreamInstanceSpec{
Name: "instance1",
ConnectionRegistry: []common.ResourceRefNamespaced{
{
Name: "connection",
Namespace: "default",
},
},
},
}
instance2 := &akov2.AtlasStreamInstance{
ObjectMeta: metav1.ObjectMeta{
Name: "instance2",
Namespace: "other-ns",
},
Spec: akov2.AtlasStreamInstanceSpec{
Name: "instance2",
ConnectionRegistry: []common.ResourceRefNamespaced{
{
Name: "connection",
Namespace: "default",
},
},
},
}
connection := &akov2.AtlasStreamConnection{
ObjectMeta: metav1.ObjectMeta{
Name: "connection",
Namespace: "default",
},
Status: status.AtlasStreamConnectionStatus{
Instances: []common.ResourceRefNamespaced{
{
Namespace: "ns1",
Name: "instance1",
},
{
Namespace: "ns2",
Name: "instance2",
},
},
},
}
testScheme := runtime.NewScheme()
assert.NoError(t, akov2.AddToScheme(testScheme))
streamInstanceIndexer := indexer.NewAtlasStreamInstanceByConnectionIndexer(zaptest.NewLogger(t))
k8sClient := fake.NewClientBuilder().
WithScheme(testScheme).
WithObjects(connection, instance1, instance2).
WithInterceptorFuncs(interceptor.Funcs{List: func(ctx context.Context, client client.WithWatch, list client.ObjectList, opts ...client.ListOption) error {
return errors.New("failed to list instances")
}}).
WithIndex(
streamInstanceIndexer.Object(),
streamInstanceIndexer.Name(),
streamInstanceIndexer.Keys,
).
Build()
reconciler := &AtlasStreamsInstanceReconciler{
Client: k8sClient,
Log: zaptest.NewLogger(t).Sugar(),
}

assert.Empty(t, reconciler.findStreamInstancesForStreamConnection(context.Background(), connection))
})
}

func TestFindStreamInstancesForSecret(t *testing.T) {
Expand All @@ -1084,6 +1157,133 @@ func TestFindStreamInstancesForSecret(t *testing.T) {
assert.Equal(t, "watching Secret but got *v1.AtlasProject", logs.All()[0].Message)
})

t.Run("should return no keys if listing fails", func(t *testing.T) {
secret := &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: "connection-credentials",
Namespace: "default",
},
Data: map[string][]byte{
"username": []byte("my-user"),
"password": []byte("my-pass"),
},
}
connection := &akov2.AtlasStreamConnection{
ObjectMeta: metav1.ObjectMeta{
Name: "connection",
Namespace: "default",
},
Spec: akov2.AtlasStreamConnectionSpec{
Name: "connection1",
ConnectionType: "Kafka",
KafkaConfig: &akov2.StreamsKafkaConnection{
Authentication: akov2.StreamsKafkaAuthentication{
Credentials: common.ResourceRefNamespaced{
Name: "connection-credentials",
Namespace: "default",
},
},
},
},
}
instance := &akov2.AtlasStreamInstance{
ObjectMeta: metav1.ObjectMeta{
Name: "instance",
Namespace: "default",
},
Spec: akov2.AtlasStreamInstanceSpec{
Name: "instance1",
ConnectionRegistry: []common.ResourceRefNamespaced{
{
Name: "connection",
Namespace: "default",
},
},
},
}
testScheme := runtime.NewScheme()
assert.NoError(t, akov2.AddToScheme(testScheme))
assert.NoError(t, corev1.AddToScheme(testScheme))
connectionIndexer := indexer.NewAtlasStreamConnectionBySecretIndexer(zaptest.NewLogger(t))
streamInstanceIndexer := indexer.NewAtlasStreamInstanceByConnectionIndexer(zaptest.NewLogger(t))
k8sClient := fake.NewClientBuilder().
WithScheme(testScheme).
WithObjects(secret, connection, instance).
WithIndex(
streamInstanceIndexer.Object(),
streamInstanceIndexer.Name(),
streamInstanceIndexer.Keys,
).
WithIndex(
connectionIndexer.Object(),
connectionIndexer.Name(),
connectionIndexer.Keys,
).
WithInterceptorFuncs(interceptor.Funcs{List: func(ctx context.Context, client client.WithWatch, list client.ObjectList, opts ...client.ListOption) error {
return errors.New("failed to list instances")
}}).
Build()
reconciler := &AtlasStreamsInstanceReconciler{
Client: k8sClient,
Log: zaptest.NewLogger(t).Sugar(),
}

assert.Empty(t, reconciler.findStreamInstancesForSecret(context.Background(), secret))
})

t.Run("should return no keys if no connections have been found", func(t *testing.T) {
secret := &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: "connection-credentials",
Namespace: "default",
},
Data: map[string][]byte{
"username": []byte("my-user"),
"password": []byte("my-pass"),
},
}
instance := &akov2.AtlasStreamInstance{
ObjectMeta: metav1.ObjectMeta{
Name: "instance",
Namespace: "default",
},
Spec: akov2.AtlasStreamInstanceSpec{
Name: "instance1",
ConnectionRegistry: []common.ResourceRefNamespaced{
{
Name: "connection",
Namespace: "default",
},
},
},
}
testScheme := runtime.NewScheme()
assert.NoError(t, akov2.AddToScheme(testScheme))
assert.NoError(t, corev1.AddToScheme(testScheme))
connectionIndexer := indexer.NewAtlasStreamConnectionBySecretIndexer(zaptest.NewLogger(t))
streamInstanceIndexer := indexer.NewAtlasStreamInstanceByConnectionIndexer(zaptest.NewLogger(t))
k8sClient := fake.NewClientBuilder().
WithScheme(testScheme).
WithObjects(secret, instance).
WithIndex(
streamInstanceIndexer.Object(),
streamInstanceIndexer.Name(),
streamInstanceIndexer.Keys,
).
WithIndex(
connectionIndexer.Object(),
connectionIndexer.Name(),
connectionIndexer.Keys,
).
Build()
reconciler := &AtlasStreamsInstanceReconciler{
Client: k8sClient,
Log: zaptest.NewLogger(t).Sugar(),
}

assert.Empty(t, reconciler.findStreamInstancesForSecret(context.Background(), secret))
})

t.Run("should return slice of requests for instances for related credentials secret", func(t *testing.T) {
secret := &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Expand Down

0 comments on commit c04eab8

Please sign in to comment.