Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ type Cluster struct {
currentProcess Process
processMu sync.RWMutex // protects the current operation for reporting, no need to hold the master mutex
specMu sync.RWMutex // protects the spec for reporting, no need to hold the master mutex
streamApplications []string
ConnectionPooler map[PostgresRole]*ConnectionPoolerObjects
EBSVolumes map[string]volumes.VolumeProperties
VolumeResizer volumes.VolumeResizer
Expand Down
23 changes: 17 additions & 6 deletions pkg/cluster/streams.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,17 @@ func (c *Cluster) deleteStreams() error {
return nil
}

err = c.KubeClient.FabricEventStreams(c.Namespace).Delete(context.TODO(), c.Name, metav1.DeleteOptions{})
if err != nil {
return fmt.Errorf("could not delete event stream custom resource: %v", err)
errors := make([]string, 0)
for _, appId := range c.streamApplications {
fesName := fmt.Sprintf("%s-%s", c.Name, appId)
err = c.KubeClient.FabricEventStreams(c.Namespace).Delete(context.TODO(), fesName, metav1.DeleteOptions{})
if err != nil {
errors = append(errors, fmt.Sprintf("could not delete event stream %q: %v", fesName, err))
}
}

if len(errors) > 0 {
return fmt.Errorf("could not delete all event stream custom resources: %v", strings.Join(errors, `', '`))
}

return nil
Expand Down Expand Up @@ -265,6 +273,11 @@ func (c *Cluster) syncStreams() error {
return nil
}

// fetch different application IDs from streams section
// there will be a separate event stream resource for each ID
appIds := gatherApplicationIds(c.Spec.Streams)
c.streamApplications = appIds

slots := make(map[string]map[string]string)
publications := make(map[string]map[string]acidv1.StreamTable)

Expand Down Expand Up @@ -329,9 +342,7 @@ func (c *Cluster) syncStreams() error {
}

func (c *Cluster) createOrUpdateStreams() error {

appIds := gatherApplicationIds(c.Spec.Streams)
for _, appId := range appIds {
for _, appId := range c.streamApplications {
fesName := fmt.Sprintf("%s-%s", c.Name, appId)
effectiveStreams, err := c.KubeClient.FabricEventStreams(c.Namespace).Get(context.TODO(), fesName, metav1.GetOptions{})
if err != nil {
Expand Down
7 changes: 7 additions & 0 deletions pkg/cluster/streams_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,9 @@ func TestGenerateFabricEventStream(t *testing.T) {
_, err := cluster.createStatefulSet()
assert.NoError(t, err)

// createOrUpdateStreams will loop over existing apps
cluster.streamApplications = []string{appId}

// create the streams
err = cluster.createOrUpdateStreams()
assert.NoError(t, err)
Expand Down Expand Up @@ -327,6 +330,10 @@ func TestUpdateFabricEventStream(t *testing.T) {
_, err := cluster.KubeClient.Postgresqls(namespace).Create(
context.TODO(), &pg, metav1.CreateOptions{})
assert.NoError(t, err)

// createOrUpdateStreams will loop over existing apps
cluster.streamApplications = []string{appId}

err = cluster.createOrUpdateStreams()
assert.NoError(t, err)

Expand Down