Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MGMT-16037: fix messaging errors on big clusters #5628

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
9 changes: 6 additions & 3 deletions internal/bminventory/inventory.go
Expand Up @@ -511,7 +511,8 @@ func (b *bareMetalInventory) RegisterClusterInternal(
defer func() {
if success {
if err == nil && cluster != nil {
err = b.stream.Notify(ctx, cluster)
notifiableCluster := stream.GetNotifiableCluster(cluster)
err = b.stream.Notify(ctx, notifiableCluster)
if err != nil {
log.WithError(err).Warning("failed to notify cluster registration event")
}
Expand Down Expand Up @@ -1657,7 +1658,8 @@ func (b *bareMetalInventory) UpdateClusterInstallConfigInternal(ctx context.Cont
}
txSuccess = true
if err == nil {
err = b.stream.Notify(ctx, cluster)
notifiableCluster := stream.GetNotifiableCluster(cluster)
err = b.stream.Notify(ctx, notifiableCluster)
if err != nil {
log.WithError(err).Warning("failed to notify cluster update event")
}
Expand Down Expand Up @@ -2085,7 +2087,8 @@ func (b *bareMetalInventory) v2UpdateClusterInternal(ctx context.Context, params
}

if cluster != nil {
err = b.stream.Notify(ctx, cluster)
notifiableCluster := stream.GetNotifiableCluster(cluster)
err = b.stream.Notify(ctx, notifiableCluster)
if err != nil {
log.WithError(err).Warning("failed to notify cluster update event")
}
Expand Down
4 changes: 3 additions & 1 deletion internal/cluster/common.go
Expand Up @@ -191,9 +191,11 @@ func UpdateCluster(ctx context.Context, log logrus.FieldLogger, db *gorm.DB, not

cluster, err := common.GetClusterFromDB(db, clusterId, common.UseEagerLoading)
if err == nil {
if err = notificationStream.Notify(ctx, cluster); err != nil {
notifiableCluster := stream.GetNotifiableCluster(cluster)
if err = notificationStream.Notify(ctx, notifiableCluster); err != nil {
log.WithError(err).Warning("failed to notify cluster update event")
}
return cluster, nil
}
return cluster, err
}
Expand Down
14 changes: 14 additions & 0 deletions internal/stream/utils.go
@@ -0,0 +1,14 @@
package stream

import (
"github.com/openshift/assisted-service/internal/common"
"github.com/openshift/assisted-service/models"
)

func GetNotifiableCluster(cluster *common.Cluster) *common.Cluster {
// notify smaller cluster object. We already notify hosts updates that could become
// problematic in bigger clusters, because the underlying max size of a message
notifiableCluster := *cluster
notifiableCluster.Hosts = []*models.Host{}
return &notifiableCluster
}
40 changes: 40 additions & 0 deletions internal/stream/utils_test.go
@@ -0,0 +1,40 @@
package stream_test

import (
"github.com/go-openapi/strfmt"
"github.com/google/uuid"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"github.com/openshift/assisted-service/internal/common"
"github.com/openshift/assisted-service/internal/stream"
"github.com/openshift/assisted-service/models"
)

var _ = Describe("Get notifiable cluster", func() {
It("should remove hosts field", func() {
id := strfmt.UUID(uuid.New().String())
clusterId := strfmt.UUID(uuid.New().String())
clusterModel := models.Cluster{
ID: &clusterId,
Hosts: []*models.Host{
{
ID: &id,
},
},
}
cluster := &common.Cluster{
Cluster: clusterModel,
}
notifiableCluster := stream.GetNotifiableCluster(cluster)
Expect(notifiableCluster).ShouldNot(Equal(cluster))

clusterModelWithEmptyHosts := models.Cluster{
ID: &clusterId,
Hosts: []*models.Host{},
}
clusterWithEmptyHosts := &common.Cluster{
Cluster: clusterModelWithEmptyHosts,
}
Expect(notifiableCluster).Should(Equal(clusterWithEmptyHosts))
})
})
3 changes: 2 additions & 1 deletion internal/usage/manager.go
Expand Up @@ -68,7 +68,8 @@ func (m *UsageManager) Save(db *gorm.DB, clusterId strfmt.UUID, usages FeatureUs
m.log.WithError(err).Warning("error retrieving updated cluster for notification")
}

err = m.stream.Notify(context.Background(), cluster)
notifiableCluster := stream.GetNotifiableCluster(cluster)
err = m.stream.Notify(context.Background(), notifiableCluster)
if err != nil {
m.log.WithError(err).Warning("failed to notify cluster event (feature usage)")
}
Expand Down