Skip to content

Commit

Permalink
wire context in streams processing integration test (#1611)
Browse files Browse the repository at this point in the history
  • Loading branch information
s-urbaniak committed May 27, 2024
1 parent c04eab8 commit 4b40957
Showing 1 changed file with 44 additions and 44 deletions.
88 changes: 44 additions & 44 deletions test/int/atlas_streams_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ YJZC5C0=
-----END CERTIFICATE-----
`

BeforeEach(func() {
BeforeEach(func(ctx context.Context) {
By("Starting the operator", func() {
testNamespace, stopManager = prepareControllers(false)
Expect(testNamespace).ToNot(BeNil())
Expand All @@ -64,7 +64,7 @@ YJZC5C0=

By("Creating project connection secret", func() {
connectionSecret = buildConnectionSecret(fmt.Sprintf("%s-atlas-key", testNamespace.Name))
Expect(k8sClient.Create(context.Background(), &connectionSecret)).To(Succeed())
Expect(k8sClient.Create(ctx, &connectionSecret)).To(Succeed())
})

By("Creating a project", func() {
Expand All @@ -73,7 +73,7 @@ YJZC5C0=

testProject = akov2.NewProject(testNamespace.Name, projectName, projectName).
WithConnectionSecret(connectionSecret.Name)
Expect(k8sClient.Create(context.Background(), testProject)).To(Succeed())
Expect(k8sClient.Create(ctx, testProject)).To(Succeed())

Eventually(func() bool {
return resources.CheckCondition(k8sClient, testProject, api.TrueCondition(api.ReadyType))
Expand All @@ -82,7 +82,7 @@ YJZC5C0=
})

Describe("When creating a stream instance with 2 connections", func() {
It("Should successfully manage instance and connections", func() {
It("Should successfully manage instance and connections", func(ctx context.Context) {
var sampleConnection *akov2.AtlasStreamConnection
var kafkaConnection *akov2.AtlasStreamConnection

Expand All @@ -98,7 +98,7 @@ YJZC5C0=
},
}

Expect(k8sClient.Create(context.Background(), sampleConnection)).To(Succeed())
Expect(k8sClient.Create(ctx, sampleConnection)).To(Succeed())
Expect(sampleConnection.GetFinalizers()).To(BeEmpty())
})

Expand All @@ -116,7 +116,7 @@ YJZC5C0=
"password": "kafka_pass",
},
}
Expect(k8sClient.Create(context.Background(), kafkaUserPassSecret)).To(Succeed())
Expect(k8sClient.Create(ctx, kafkaUserPassSecret)).To(Succeed())

kafkaCertificateSecret := &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Expand All @@ -130,7 +130,7 @@ YJZC5C0=
"certificate": certificate,
},
}
Expect(k8sClient.Create(context.Background(), kafkaCertificateSecret)).To(Succeed())
Expect(k8sClient.Create(ctx, kafkaCertificateSecret)).To(Succeed())

kafkaConnection = &akov2.AtlasStreamConnection{
ObjectMeta: metav1.ObjectMeta{
Expand Down Expand Up @@ -159,7 +159,7 @@ YJZC5C0=
},
},
}
Expect(k8sClient.Create(context.Background(), kafkaConnection)).To(Succeed())
Expect(k8sClient.Create(ctx, kafkaConnection)).To(Succeed())
Expect(kafkaConnection.GetFinalizers()).To(BeEmpty())
})

Expand Down Expand Up @@ -192,106 +192,106 @@ YJZC5C0=
},
},
}
Expect(k8sClient.Create(context.Background(), streamInstance)).To(Succeed())
Expect(k8sClient.Create(ctx, streamInstance)).To(Succeed())

checkInstanceIsReady(client.ObjectKeyFromObject(streamInstance))
checkInstanceIsReady(ctx, client.ObjectKeyFromObject(streamInstance))
})

By("Updating the instance", func() {
Eventually(func(g Gomega) {
streamInstance := &akov2.AtlasStreamInstance{}
g.Expect(k8sClient.Get(context.Background(), client.ObjectKey{Name: resourceName, Namespace: testNamespace.Name}, streamInstance)).To(Succeed())
g.Expect(k8sClient.Get(ctx, client.ObjectKey{Name: resourceName, Namespace: testNamespace.Name}, streamInstance)).To(Succeed())

streamInstance.Spec.Config.Region = "DUBLIN_IRL"
g.Expect(k8sClient.Update(context.Background(), streamInstance)).To(Succeed())
g.Expect(k8sClient.Update(ctx, streamInstance)).To(Succeed())
}).WithTimeout(time.Minute).WithPolling(PollingInterval)

checkInstanceIsReady(client.ObjectKey{Name: resourceName, Namespace: testNamespace.Name})
checkInstanceIsReady(ctx, client.ObjectKey{Name: resourceName, Namespace: testNamespace.Name})
})

By("Updating a connection", func() {
Eventually(func(g Gomega) {
g.Expect(k8sClient.Get(context.Background(), client.ObjectKeyFromObject(kafkaConnection), kafkaConnection)).To(Succeed())
g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(kafkaConnection), kafkaConnection)).To(Succeed())
kafkaConnection.Spec.KafkaConfig.BootstrapServers = "kafka.server1:9001,kafka.server2:9002,kafka.server3:9003"
g.Expect(k8sClient.Update(context.Background(), kafkaConnection)).To(Succeed())
g.Expect(k8sClient.Update(ctx, kafkaConnection)).To(Succeed())
}).WithTimeout(time.Minute).WithPolling(PollingInterval)

checkInstanceIsReady(client.ObjectKey{Name: resourceName, Namespace: testNamespace.Name})
checkInstanceIsReady(ctx, client.ObjectKey{Name: resourceName, Namespace: testNamespace.Name})
})

By("Updating a secret", func() {
Eventually(func(g Gomega) {
s := corev1.Secret{}
g.Expect(k8sClient.Get(context.Background(), client.ObjectKey{Name: kafkaUserPassSecretName, Namespace: testNamespace.Name}, &s)).To(Succeed())
g.Expect(k8sClient.Get(ctx, client.ObjectKey{Name: kafkaUserPassSecretName, Namespace: testNamespace.Name}, &s)).To(Succeed())
s.Data["username"] = []byte("kafka_user_changed")
g.Expect(k8sClient.Update(context.Background(), &s)).To(Succeed())
g.Expect(k8sClient.Update(ctx, &s)).To(Succeed())
}).WithTimeout(time.Minute).WithPolling(PollingInterval)

checkInstanceIsReady(client.ObjectKey{Name: resourceName, Namespace: testNamespace.Name})
checkInstanceIsReady(ctx, client.ObjectKey{Name: resourceName, Namespace: testNamespace.Name})
})

By("Releasing a connection when removed from instance", func() {
streamInstance := &akov2.AtlasStreamInstance{}
Expect(k8sClient.Get(context.Background(), client.ObjectKey{Name: resourceName, Namespace: testNamespace.Name}, streamInstance)).To(Succeed())
Expect(k8sClient.Get(ctx, client.ObjectKey{Name: resourceName, Namespace: testNamespace.Name}, streamInstance)).To(Succeed())

streamInstance.Spec.ConnectionRegistry = []common.ResourceRefNamespaced{
{
Name: sampleConnection.Name,
Namespace: sampleConnection.Namespace,
},
}
Expect(k8sClient.Update(context.Background(), streamInstance)).To(Succeed())
Expect(k8sClient.Update(ctx, streamInstance)).To(Succeed())

checkInstanceIsReady(client.ObjectKeyFromObject(streamInstance))
checkInstanceIsReady(ctx, client.ObjectKeyFromObject(streamInstance))

Eventually(func(g Gomega) {
g.Expect(k8sClient.Get(context.Background(), client.ObjectKeyFromObject(kafkaConnection), kafkaConnection)).To(Succeed())
g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(kafkaConnection), kafkaConnection)).To(Succeed())
g.Expect(kafkaConnection.GetFinalizers()).To(BeEmpty())
}).WithTimeout(2 * time.Minute).WithPolling(PollingInterval).Should(Succeed())
})

By("Deleting instance and connections", func() {
Expect(k8sClient.Delete(context.Background(), kafkaConnection)).To(Succeed())
Expect(k8sClient.Get(context.Background(), client.ObjectKeyFromObject(kafkaConnection), kafkaConnection)).ToNot(Succeed())
Expect(k8sClient.Delete(ctx, kafkaConnection)).To(Succeed())
Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(kafkaConnection), kafkaConnection)).ToNot(Succeed())

Expect(k8sClient.Delete(context.Background(), sampleConnection)).To(Succeed())
Expect(k8sClient.Get(context.Background(), client.ObjectKeyFromObject(sampleConnection), sampleConnection)).To(Succeed())
Expect(k8sClient.Delete(ctx, sampleConnection)).To(Succeed())
Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(sampleConnection), sampleConnection)).To(Succeed())
Expect(sampleConnection.DeletionTimestamp).ShouldNot(BeNil())

streamInstance := &akov2.AtlasStreamInstance{}
Expect(k8sClient.Get(context.Background(), client.ObjectKey{Name: resourceName, Namespace: testNamespace.Name}, streamInstance)).To(Succeed())
Expect(k8sClient.Delete(context.Background(), streamInstance))
Expect(k8sClient.Get(ctx, client.ObjectKey{Name: resourceName, Namespace: testNamespace.Name}, streamInstance)).To(Succeed())
Expect(k8sClient.Delete(ctx, streamInstance))

Eventually(func(g Gomega) {
g.Expect(k8sClient.Get(context.Background(), client.ObjectKeyFromObject(sampleConnection), sampleConnection)).ToNot(Succeed())
g.Expect(k8sClient.Get(context.Background(), client.ObjectKeyFromObject(streamInstance), streamInstance)).ToNot(Succeed())
g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(sampleConnection), sampleConnection)).ToNot(Succeed())
g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(streamInstance), streamInstance)).ToNot(Succeed())
}).WithTimeout(5 * time.Minute).WithPolling(PollingInterval).Should(Succeed())
})
})
})

AfterEach(func() {
AfterEach(func(ctx context.Context) {
By("Deleting stream connection secrets", func() {
Eventually(func(g Gomega) {
secret := &corev1.Secret{}
g.Expect(k8sClient.Get(context.Background(), client.ObjectKey{Name: kafkaUserPassSecretName, Namespace: testNamespace.Name}, secret)).To(Succeed())
g.Expect(k8sClient.Delete(context.Background(), secret)).To(Succeed())
g.Expect(k8sClient.Get(ctx, client.ObjectKey{Name: kafkaUserPassSecretName, Namespace: testNamespace.Name}, secret)).To(Succeed())
g.Expect(k8sClient.Delete(ctx, secret)).To(Succeed())
}).WithTimeout(1 * time.Minute).WithPolling(PollingInterval).Should(Succeed())

Eventually(func(g Gomega) {
secret := &corev1.Secret{}
g.Expect(k8sClient.Get(context.Background(), client.ObjectKey{Name: kafkaCertificateSecretName, Namespace: testNamespace.Name}, secret)).To(Succeed())
g.Expect(k8sClient.Delete(context.Background(), secret)).To(Succeed())
g.Expect(k8sClient.Get(ctx, client.ObjectKey{Name: kafkaCertificateSecretName, Namespace: testNamespace.Name}, secret)).To(Succeed())
g.Expect(k8sClient.Delete(ctx, secret)).To(Succeed())
}).WithTimeout(1 * time.Minute).WithPolling(PollingInterval).Should(Succeed())
})

By("Deleting project", func() {
if testProject != nil {
projectID := testProject.ID()
Expect(k8sClient.Delete(context.Background(), testProject)).To(Succeed())
Expect(k8sClient.Delete(ctx, testProject)).To(Succeed())

Eventually(func(g Gomega) {
_, r, err := atlasClient.ProjectsApi.GetProject(context.Background(), projectID).Execute()
_, r, err := atlasClient.ProjectsApi.GetProject(ctx, projectID).Execute()
g.Expect(err).ToNot(BeNil())
g.Expect(r).ToNot(BeNil())
g.Expect(r.StatusCode).To(Equal(http.StatusNotFound))
Expand All @@ -300,36 +300,36 @@ YJZC5C0=
})

By("Deleting project connection secret", func() {
Expect(k8sClient.Delete(context.Background(), &connectionSecret)).To(Succeed())
Expect(k8sClient.Delete(ctx, &connectionSecret)).To(Succeed())
})

By("Stopping the operator", func() {
stopManager()
err := k8sClient.Delete(context.Background(), testNamespace)
err := k8sClient.Delete(ctx, testNamespace)
Expect(err).ToNot(HaveOccurred())
})
})
})

func checkInstanceIsReady(instanceObjKey client.ObjectKey) {
func checkInstanceIsReady(ctx context.Context, instanceObjKey client.ObjectKey) {
readyConditions := conditions.MatchConditions(
api.TrueCondition(api.ReadyType),
api.TrueCondition(api.ResourceVersionStatus),
api.TrueCondition(api.StreamInstanceReadyType),
)
Eventually(func(g Gomega) {
streamInstance := &akov2.AtlasStreamInstance{}
g.Expect(k8sClient.Get(context.Background(), instanceObjKey, streamInstance)).To(Succeed())
g.Expect(k8sClient.Get(ctx, instanceObjKey, streamInstance)).To(Succeed())
g.Expect(streamInstance.Status.Conditions).To(ContainElements(readyConditions))
}).WithTimeout(5 * time.Minute).WithPolling(PollingInterval).Should(Succeed())

Eventually(func(g Gomega) {
streamInstance := &akov2.AtlasStreamInstance{}
g.Expect(k8sClient.Get(context.Background(), instanceObjKey, streamInstance)).To(Succeed())
g.Expect(k8sClient.Get(ctx, instanceObjKey, streamInstance)).To(Succeed())

for _, connectionRef := range streamInstance.Spec.ConnectionRegistry {
connection := &akov2.AtlasStreamConnection{}
g.Expect(k8sClient.Get(context.Background(), *connectionRef.GetObject(streamInstance.Namespace), connection)).To(Succeed())
g.Expect(k8sClient.Get(ctx, *connectionRef.GetObject(streamInstance.Namespace), connection)).To(Succeed())
g.Expect(connection.GetFinalizers()).ToNot(BeEmpty())
}
}).WithTimeout(2 * time.Minute).WithPolling(PollingInterval).Should(Succeed())
Expand Down

0 comments on commit 4b40957

Please sign in to comment.