diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index dcef602b9..dcd5b97dd 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -91,6 +91,7 @@ type Cluster struct { currentProcess Process processMu sync.RWMutex // protects the current operation for reporting, no need to hold the master mutex specMu sync.RWMutex // protects the spec for reporting, no need to hold the master mutex + streamApplications []string ConnectionPooler map[PostgresRole]*ConnectionPoolerObjects EBSVolumes map[string]volumes.VolumeProperties VolumeResizer volumes.VolumeResizer diff --git a/pkg/cluster/streams.go b/pkg/cluster/streams.go index 7325fa857..0236925ca 100644 --- a/pkg/cluster/streams.go +++ b/pkg/cluster/streams.go @@ -45,9 +45,17 @@ func (c *Cluster) deleteStreams() error { return nil } - err = c.KubeClient.FabricEventStreams(c.Namespace).Delete(context.TODO(), c.Name, metav1.DeleteOptions{}) - if err != nil { - return fmt.Errorf("could not delete event stream custom resource: %v", err) + errors := make([]string, 0) + for _, appId := range c.streamApplications { + fesName := fmt.Sprintf("%s-%s", c.Name, appId) + err = c.KubeClient.FabricEventStreams(c.Namespace).Delete(context.TODO(), fesName, metav1.DeleteOptions{}) + if err != nil { + errors = append(errors, fmt.Sprintf("could not delete event stream %q: %v", fesName, err)) + } + } + + if len(errors) > 0 { + return fmt.Errorf("could not delete all event stream custom resources: %v", strings.Join(errors, `', '`)) } return nil @@ -265,6 +273,11 @@ func (c *Cluster) syncStreams() error { return nil } + // fetch different application IDs from streams section + // there will be a separate event stream resource for each ID + appIds := gatherApplicationIds(c.Spec.Streams) + c.streamApplications = appIds + slots := make(map[string]map[string]string) publications := make(map[string]map[string]acidv1.StreamTable) @@ -329,9 +342,7 @@ func (c *Cluster) syncStreams() error { } func (c *Cluster) createOrUpdateStreams() error { - - appIds := gatherApplicationIds(c.Spec.Streams) - for _, appId := range appIds { + for _, appId := range c.streamApplications { fesName := fmt.Sprintf("%s-%s", c.Name, appId) effectiveStreams, err := c.KubeClient.FabricEventStreams(c.Namespace).Get(context.TODO(), fesName, metav1.GetOptions{}) if err != nil { diff --git a/pkg/cluster/streams_test.go b/pkg/cluster/streams_test.go index 7ebb1c89a..0094708a4 100644 --- a/pkg/cluster/streams_test.go +++ b/pkg/cluster/streams_test.go @@ -196,6 +196,9 @@ func TestGenerateFabricEventStream(t *testing.T) { _, err := cluster.createStatefulSet() assert.NoError(t, err) + // createOrUpdateStreams will loop over existing apps + cluster.streamApplications = []string{appId} + // create the streams err = cluster.createOrUpdateStreams() assert.NoError(t, err) @@ -327,6 +330,10 @@ func TestUpdateFabricEventStream(t *testing.T) { _, err := cluster.KubeClient.Postgresqls(namespace).Create( context.TODO(), &pg, metav1.CreateOptions{}) assert.NoError(t, err) + + // createOrUpdateStreams will loop over existing apps + cluster.streamApplications = []string{appId} + err = cluster.createOrUpdateStreams() assert.NoError(t, err)