From 4b409570d86df160270b96a55478013f1494cd0b Mon Sep 17 00:00:00 2001 From: Sergiusz Urbaniak Date: Mon, 27 May 2024 15:32:47 +0200 Subject: [PATCH] wire context in streams processing integration test (#1611) --- test/int/atlas_streams_test.go | 88 +++++++++++++++++----------------- 1 file changed, 44 insertions(+), 44 deletions(-) diff --git a/test/int/atlas_streams_test.go b/test/int/atlas_streams_test.go index ff7205fbca..efa4bcdb8b 100644 --- a/test/int/atlas_streams_test.go +++ b/test/int/atlas_streams_test.go @@ -55,7 +55,7 @@ YJZC5C0= -----END CERTIFICATE----- ` - BeforeEach(func() { + BeforeEach(func(ctx context.Context) { By("Starting the operator", func() { testNamespace, stopManager = prepareControllers(false) Expect(testNamespace).ToNot(BeNil()) @@ -64,7 +64,7 @@ YJZC5C0= By("Creating project connection secret", func() { connectionSecret = buildConnectionSecret(fmt.Sprintf("%s-atlas-key", testNamespace.Name)) - Expect(k8sClient.Create(context.Background(), &connectionSecret)).To(Succeed()) + Expect(k8sClient.Create(ctx, &connectionSecret)).To(Succeed()) }) By("Creating a project", func() { @@ -73,7 +73,7 @@ YJZC5C0= testProject = akov2.NewProject(testNamespace.Name, projectName, projectName). WithConnectionSecret(connectionSecret.Name) - Expect(k8sClient.Create(context.Background(), testProject)).To(Succeed()) + Expect(k8sClient.Create(ctx, testProject)).To(Succeed()) Eventually(func() bool { return resources.CheckCondition(k8sClient, testProject, api.TrueCondition(api.ReadyType)) @@ -82,7 +82,7 @@ YJZC5C0= }) Describe("When creating a stream instance with 2 connections", func() { - It("Should successfully manage instance and connections", func() { + It("Should successfully manage instance and connections", func(ctx context.Context) { var sampleConnection *akov2.AtlasStreamConnection var kafkaConnection *akov2.AtlasStreamConnection @@ -98,7 +98,7 @@ YJZC5C0= }, } - Expect(k8sClient.Create(context.Background(), sampleConnection)).To(Succeed()) + Expect(k8sClient.Create(ctx, sampleConnection)).To(Succeed()) Expect(sampleConnection.GetFinalizers()).To(BeEmpty()) }) @@ -116,7 +116,7 @@ YJZC5C0= "password": "kafka_pass", }, } - Expect(k8sClient.Create(context.Background(), kafkaUserPassSecret)).To(Succeed()) + Expect(k8sClient.Create(ctx, kafkaUserPassSecret)).To(Succeed()) kafkaCertificateSecret := &corev1.Secret{ ObjectMeta: metav1.ObjectMeta{ @@ -130,7 +130,7 @@ YJZC5C0= "certificate": certificate, }, } - Expect(k8sClient.Create(context.Background(), kafkaCertificateSecret)).To(Succeed()) + Expect(k8sClient.Create(ctx, kafkaCertificateSecret)).To(Succeed()) kafkaConnection = &akov2.AtlasStreamConnection{ ObjectMeta: metav1.ObjectMeta{ @@ -159,7 +159,7 @@ YJZC5C0= }, }, } - Expect(k8sClient.Create(context.Background(), kafkaConnection)).To(Succeed()) + Expect(k8sClient.Create(ctx, kafkaConnection)).To(Succeed()) Expect(kafkaConnection.GetFinalizers()).To(BeEmpty()) }) @@ -192,47 +192,47 @@ YJZC5C0= }, }, } - Expect(k8sClient.Create(context.Background(), streamInstance)).To(Succeed()) + Expect(k8sClient.Create(ctx, streamInstance)).To(Succeed()) - checkInstanceIsReady(client.ObjectKeyFromObject(streamInstance)) + checkInstanceIsReady(ctx, client.ObjectKeyFromObject(streamInstance)) }) By("Updating the instance", func() { Eventually(func(g Gomega) { streamInstance := &akov2.AtlasStreamInstance{} - g.Expect(k8sClient.Get(context.Background(), client.ObjectKey{Name: resourceName, Namespace: testNamespace.Name}, streamInstance)).To(Succeed()) + g.Expect(k8sClient.Get(ctx, client.ObjectKey{Name: resourceName, Namespace: testNamespace.Name}, streamInstance)).To(Succeed()) streamInstance.Spec.Config.Region = "DUBLIN_IRL" - g.Expect(k8sClient.Update(context.Background(), streamInstance)).To(Succeed()) + g.Expect(k8sClient.Update(ctx, streamInstance)).To(Succeed()) }).WithTimeout(time.Minute).WithPolling(PollingInterval) - checkInstanceIsReady(client.ObjectKey{Name: resourceName, Namespace: testNamespace.Name}) + checkInstanceIsReady(ctx, client.ObjectKey{Name: resourceName, Namespace: testNamespace.Name}) }) By("Updating a connection", func() { Eventually(func(g Gomega) { - g.Expect(k8sClient.Get(context.Background(), client.ObjectKeyFromObject(kafkaConnection), kafkaConnection)).To(Succeed()) + g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(kafkaConnection), kafkaConnection)).To(Succeed()) kafkaConnection.Spec.KafkaConfig.BootstrapServers = "kafka.server1:9001,kafka.server2:9002,kafka.server3:9003" - g.Expect(k8sClient.Update(context.Background(), kafkaConnection)).To(Succeed()) + g.Expect(k8sClient.Update(ctx, kafkaConnection)).To(Succeed()) }).WithTimeout(time.Minute).WithPolling(PollingInterval) - checkInstanceIsReady(client.ObjectKey{Name: resourceName, Namespace: testNamespace.Name}) + checkInstanceIsReady(ctx, client.ObjectKey{Name: resourceName, Namespace: testNamespace.Name}) }) By("Updating a secret", func() { Eventually(func(g Gomega) { s := corev1.Secret{} - g.Expect(k8sClient.Get(context.Background(), client.ObjectKey{Name: kafkaUserPassSecretName, Namespace: testNamespace.Name}, &s)).To(Succeed()) + g.Expect(k8sClient.Get(ctx, client.ObjectKey{Name: kafkaUserPassSecretName, Namespace: testNamespace.Name}, &s)).To(Succeed()) s.Data["username"] = []byte("kafka_user_changed") - g.Expect(k8sClient.Update(context.Background(), &s)).To(Succeed()) + g.Expect(k8sClient.Update(ctx, &s)).To(Succeed()) }).WithTimeout(time.Minute).WithPolling(PollingInterval) - checkInstanceIsReady(client.ObjectKey{Name: resourceName, Namespace: testNamespace.Name}) + checkInstanceIsReady(ctx, client.ObjectKey{Name: resourceName, Namespace: testNamespace.Name}) }) By("Releasing a connection when removed from instance", func() { streamInstance := &akov2.AtlasStreamInstance{} - Expect(k8sClient.Get(context.Background(), client.ObjectKey{Name: resourceName, Namespace: testNamespace.Name}, streamInstance)).To(Succeed()) + Expect(k8sClient.Get(ctx, client.ObjectKey{Name: resourceName, Namespace: testNamespace.Name}, streamInstance)).To(Succeed()) streamInstance.Spec.ConnectionRegistry = []common.ResourceRefNamespaced{ { @@ -240,58 +240,58 @@ YJZC5C0= Namespace: sampleConnection.Namespace, }, } - Expect(k8sClient.Update(context.Background(), streamInstance)).To(Succeed()) + Expect(k8sClient.Update(ctx, streamInstance)).To(Succeed()) - checkInstanceIsReady(client.ObjectKeyFromObject(streamInstance)) + checkInstanceIsReady(ctx, client.ObjectKeyFromObject(streamInstance)) Eventually(func(g Gomega) { - g.Expect(k8sClient.Get(context.Background(), client.ObjectKeyFromObject(kafkaConnection), kafkaConnection)).To(Succeed()) + g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(kafkaConnection), kafkaConnection)).To(Succeed()) g.Expect(kafkaConnection.GetFinalizers()).To(BeEmpty()) }).WithTimeout(2 * time.Minute).WithPolling(PollingInterval).Should(Succeed()) }) By("Deleting instance and connections", func() { - Expect(k8sClient.Delete(context.Background(), kafkaConnection)).To(Succeed()) - Expect(k8sClient.Get(context.Background(), client.ObjectKeyFromObject(kafkaConnection), kafkaConnection)).ToNot(Succeed()) + Expect(k8sClient.Delete(ctx, kafkaConnection)).To(Succeed()) + Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(kafkaConnection), kafkaConnection)).ToNot(Succeed()) - Expect(k8sClient.Delete(context.Background(), sampleConnection)).To(Succeed()) - Expect(k8sClient.Get(context.Background(), client.ObjectKeyFromObject(sampleConnection), sampleConnection)).To(Succeed()) + Expect(k8sClient.Delete(ctx, sampleConnection)).To(Succeed()) + Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(sampleConnection), sampleConnection)).To(Succeed()) Expect(sampleConnection.DeletionTimestamp).ShouldNot(BeNil()) streamInstance := &akov2.AtlasStreamInstance{} - Expect(k8sClient.Get(context.Background(), client.ObjectKey{Name: resourceName, Namespace: testNamespace.Name}, streamInstance)).To(Succeed()) - Expect(k8sClient.Delete(context.Background(), streamInstance)) + Expect(k8sClient.Get(ctx, client.ObjectKey{Name: resourceName, Namespace: testNamespace.Name}, streamInstance)).To(Succeed()) + Expect(k8sClient.Delete(ctx, streamInstance)) Eventually(func(g Gomega) { - g.Expect(k8sClient.Get(context.Background(), client.ObjectKeyFromObject(sampleConnection), sampleConnection)).ToNot(Succeed()) - g.Expect(k8sClient.Get(context.Background(), client.ObjectKeyFromObject(streamInstance), streamInstance)).ToNot(Succeed()) + g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(sampleConnection), sampleConnection)).ToNot(Succeed()) + g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(streamInstance), streamInstance)).ToNot(Succeed()) }).WithTimeout(5 * time.Minute).WithPolling(PollingInterval).Should(Succeed()) }) }) }) - AfterEach(func() { + AfterEach(func(ctx context.Context) { By("Deleting stream connection secrets", func() { Eventually(func(g Gomega) { secret := &corev1.Secret{} - g.Expect(k8sClient.Get(context.Background(), client.ObjectKey{Name: kafkaUserPassSecretName, Namespace: testNamespace.Name}, secret)).To(Succeed()) - g.Expect(k8sClient.Delete(context.Background(), secret)).To(Succeed()) + g.Expect(k8sClient.Get(ctx, client.ObjectKey{Name: kafkaUserPassSecretName, Namespace: testNamespace.Name}, secret)).To(Succeed()) + g.Expect(k8sClient.Delete(ctx, secret)).To(Succeed()) }).WithTimeout(1 * time.Minute).WithPolling(PollingInterval).Should(Succeed()) Eventually(func(g Gomega) { secret := &corev1.Secret{} - g.Expect(k8sClient.Get(context.Background(), client.ObjectKey{Name: kafkaCertificateSecretName, Namespace: testNamespace.Name}, secret)).To(Succeed()) - g.Expect(k8sClient.Delete(context.Background(), secret)).To(Succeed()) + g.Expect(k8sClient.Get(ctx, client.ObjectKey{Name: kafkaCertificateSecretName, Namespace: testNamespace.Name}, secret)).To(Succeed()) + g.Expect(k8sClient.Delete(ctx, secret)).To(Succeed()) }).WithTimeout(1 * time.Minute).WithPolling(PollingInterval).Should(Succeed()) }) By("Deleting project", func() { if testProject != nil { projectID := testProject.ID() - Expect(k8sClient.Delete(context.Background(), testProject)).To(Succeed()) + Expect(k8sClient.Delete(ctx, testProject)).To(Succeed()) Eventually(func(g Gomega) { - _, r, err := atlasClient.ProjectsApi.GetProject(context.Background(), projectID).Execute() + _, r, err := atlasClient.ProjectsApi.GetProject(ctx, projectID).Execute() g.Expect(err).ToNot(BeNil()) g.Expect(r).ToNot(BeNil()) g.Expect(r.StatusCode).To(Equal(http.StatusNotFound)) @@ -300,18 +300,18 @@ YJZC5C0= }) By("Deleting project connection secret", func() { - Expect(k8sClient.Delete(context.Background(), &connectionSecret)).To(Succeed()) + Expect(k8sClient.Delete(ctx, &connectionSecret)).To(Succeed()) }) By("Stopping the operator", func() { stopManager() - err := k8sClient.Delete(context.Background(), testNamespace) + err := k8sClient.Delete(ctx, testNamespace) Expect(err).ToNot(HaveOccurred()) }) }) }) -func checkInstanceIsReady(instanceObjKey client.ObjectKey) { +func checkInstanceIsReady(ctx context.Context, instanceObjKey client.ObjectKey) { readyConditions := conditions.MatchConditions( api.TrueCondition(api.ReadyType), api.TrueCondition(api.ResourceVersionStatus), @@ -319,17 +319,17 @@ func checkInstanceIsReady(instanceObjKey client.ObjectKey) { ) Eventually(func(g Gomega) { streamInstance := &akov2.AtlasStreamInstance{} - g.Expect(k8sClient.Get(context.Background(), instanceObjKey, streamInstance)).To(Succeed()) + g.Expect(k8sClient.Get(ctx, instanceObjKey, streamInstance)).To(Succeed()) g.Expect(streamInstance.Status.Conditions).To(ContainElements(readyConditions)) }).WithTimeout(5 * time.Minute).WithPolling(PollingInterval).Should(Succeed()) Eventually(func(g Gomega) { streamInstance := &akov2.AtlasStreamInstance{} - g.Expect(k8sClient.Get(context.Background(), instanceObjKey, streamInstance)).To(Succeed()) + g.Expect(k8sClient.Get(ctx, instanceObjKey, streamInstance)).To(Succeed()) for _, connectionRef := range streamInstance.Spec.ConnectionRegistry { connection := &akov2.AtlasStreamConnection{} - g.Expect(k8sClient.Get(context.Background(), *connectionRef.GetObject(streamInstance.Namespace), connection)).To(Succeed()) + g.Expect(k8sClient.Get(ctx, *connectionRef.GetObject(streamInstance.Namespace), connection)).To(Succeed()) g.Expect(connection.GetFinalizers()).ToNot(BeEmpty()) } }).WithTimeout(2 * time.Minute).WithPolling(PollingInterval).Should(Succeed())