Skip to content

Commit

Permalink
fix(kuma-cp) subscription finalizer, rev 2 (#2526)
Browse files Browse the repository at this point in the history
Signed-off-by: Ilya Lobkov <lobkovilya@yandex.ru>
  • Loading branch information
lobkovilya committed Aug 16, 2021
1 parent 2013197 commit 1343424
Show file tree
Hide file tree
Showing 36 changed files with 1,036 additions and 200 deletions.
21 changes: 21 additions & 0 deletions api/generic/insights.go
@@ -0,0 +1,21 @@
package generic

import (
"time"

"github.com/golang/protobuf/proto"
)

type Insight interface {
proto.Message
IsOnline() bool
GetLastSubscription() Subscription
UpdateSubscription(Subscription) error
}

type Subscription interface {
proto.Message
GetId() string
GetGeneration() uint32
SetDisconnectTime(time time.Time)
}
16 changes: 14 additions & 2 deletions api/mesh/v1alpha1/dataplane_insight.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions api/mesh/v1alpha1/dataplane_insight.proto
Expand Up @@ -69,6 +69,10 @@ message DiscoverySubscription {

// Version of Envoy and Kuma dataplane
Version version = 6;

// Generation is an integer number which is periodically increased by the
// status sink
uint32 generation = 7;
}

// DiscoverySubscriptionStatus defines status of an ADS subscription.
Expand Down
74 changes: 48 additions & 26 deletions api/mesh/v1alpha1/dataplane_insight_helpers.go
Expand Up @@ -4,9 +4,14 @@ import (
"strings"
"time"

"github.com/pkg/errors"
"google.golang.org/protobuf/types/known/timestamppb"

"github.com/kumahq/kuma/api/generic"
)

var _ generic.Insight = &DataplaneInsight{}

func NewSubscriptionStatus() *DiscoverySubscriptionStatus {
return &DiscoverySubscriptionStatus{
Total: &DiscoveryServiceStats{},
Expand All @@ -32,74 +37,80 @@ func NewVersion() *Version {
}
}

func (ds *DataplaneInsight) IsOnline() bool {
for _, s := range ds.GetSubscriptions() {
if s.ConnectTime != nil && s.DisconnectTime == nil {
func (x *DataplaneInsight) IsOnline() bool {
for _, s := range x.GetSubscriptions() {
if s.GetConnectTime() != nil && s.GetDisconnectTime() == nil {
return true
}
}
return false
}

func (ds *DataplaneInsight) GetSubscription(id string) (int, *DiscoverySubscription) {
for i, s := range ds.GetSubscriptions() {
func (x *DataplaneInsight) GetSubscription(id string) (int, *DiscoverySubscription) {
for i, s := range x.GetSubscriptions() {
if s.Id == id {
return i, s
}
}
return -1, nil
}

func (ds *DataplaneInsight) UpdateCert(generation time.Time, expiration time.Time) error {
if ds.MTLS == nil {
ds.MTLS = &DataplaneInsight_MTLS{}
func (x *DataplaneInsight) UpdateCert(generation time.Time, expiration time.Time) error {
if x.MTLS == nil {
x.MTLS = &DataplaneInsight_MTLS{}
}
ts := timestamppb.New(expiration)
if err := ts.CheckValid(); err != nil {
return err
}
ds.MTLS.CertificateExpirationTime = ts
ds.MTLS.CertificateRegenerations++
x.MTLS.CertificateExpirationTime = ts
x.MTLS.CertificateRegenerations++
ts = timestamppb.New(generation)
if err := ts.CheckValid(); err != nil {
return err
}
ds.MTLS.LastCertificateRegeneration = ts
x.MTLS.LastCertificateRegeneration = ts
return nil
}

func (ds *DataplaneInsight) UpdateSubscription(s *DiscoverySubscription) {
if ds == nil {
return
func (x *DataplaneInsight) UpdateSubscription(s generic.Subscription) error {
if x == nil {
return nil
}
discoverySubscription, ok := s.(*DiscoverySubscription)
if !ok {
return errors.Errorf("invalid type %T for DataplaneInsight", s)
}
i, old := ds.GetSubscription(s.Id)
i, old := x.GetSubscription(discoverySubscription.Id)
if old != nil {
ds.Subscriptions[i] = s
x.Subscriptions[i] = discoverySubscription
} else {
ds.finalizeSubscriptions()
ds.Subscriptions = append(ds.Subscriptions, s)
x.finalizeSubscriptions()
x.Subscriptions = append(x.Subscriptions, discoverySubscription)
}
return nil
}

// If Kuma CP was killed ungracefully then we can get a subscription without a DisconnectTime.
// Because of the way we process subscriptions the lack of DisconnectTime on old subscription
// will cause wrong status.
func (ds *DataplaneInsight) finalizeSubscriptions() {
func (x *DataplaneInsight) finalizeSubscriptions() {
now := timestamppb.Now()
for _, subscription := range ds.GetSubscriptions() {
for _, subscription := range x.GetSubscriptions() {
if subscription.DisconnectTime == nil {
subscription.DisconnectTime = now
}
}
}

func (ds *DataplaneInsight) GetLatestSubscription() (*DiscoverySubscription, *time.Time) {
if len(ds.GetSubscriptions()) == 0 {
// todo(lobkovilya): delete GetLatestSubscription, use GetLastSubscription instead
func (x *DataplaneInsight) GetLatestSubscription() (*DiscoverySubscription, *time.Time) {
if len(x.GetSubscriptions()) == 0 {
return nil, nil
}
var idx int = 0
var latest *time.Time
for i, s := range ds.GetSubscriptions() {
for i, s := range x.GetSubscriptions() {
if err := s.ConnectTime.CheckValid(); err != nil {
continue
}
Expand All @@ -109,12 +120,23 @@ func (ds *DataplaneInsight) GetLatestSubscription() (*DiscoverySubscription, *ti
latest = &t
}
}
return ds.Subscriptions[idx], latest
return x.Subscriptions[idx], latest
}

func (x *DataplaneInsight) GetLastSubscription() generic.Subscription {
if len(x.GetSubscriptions()) == 0 {
return nil
}
return x.GetSubscriptions()[len(x.GetSubscriptions())-1]
}

func (x *DiscoverySubscription) SetDisconnectTime(t time.Time) {
x.DisconnectTime = timestamppb.New(t)
}

func (ds *DataplaneInsight) Sum(v func(*DiscoverySubscription) uint64) uint64 {
func (x *DataplaneInsight) Sum(v func(*DiscoverySubscription) uint64) uint64 {
var result uint64 = 0
for _, s := range ds.GetSubscriptions() {
for _, s := range x.GetSubscriptions() {
result += v(s)
}
return result
Expand Down
33 changes: 29 additions & 4 deletions api/mesh/v1alpha1/dataplane_insight_helpers_test.go
Expand Up @@ -9,6 +9,7 @@ import (

util_proto "github.com/kumahq/kuma/api/internal/util/proto"
. "github.com/kumahq/kuma/api/mesh/v1alpha1"
system_proto "github.com/kumahq/kuma/api/system/v1alpha1"
)

var _ = Describe("DataplaneHelpers", func() {
Expand Down Expand Up @@ -36,7 +37,7 @@ var _ = Describe("DataplaneHelpers", func() {
}

// when
status.UpdateSubscription(subscription)
Expect(status.UpdateSubscription(subscription)).To(Succeed())

// then
Expect(util_proto.ToYAML(status)).To(MatchYAML(`
Expand Down Expand Up @@ -75,7 +76,7 @@ var _ = Describe("DataplaneHelpers", func() {
}

// when
status.UpdateSubscription(subscription)
Expect(status.UpdateSubscription(subscription)).To(Succeed())

// then
Expect(util_proto.ToYAML(status)).To(MatchYAML(`
Expand Down Expand Up @@ -116,15 +117,39 @@ var _ = Describe("DataplaneHelpers", func() {
}

// when
dataplaneInsight.UpdateSubscription(&DiscoverySubscription{
Expect(dataplaneInsight.UpdateSubscription(&DiscoverySubscription{
Id: "3",
ConnectTime: util_proto.MustTimestampProto(t1.Add(3 * time.Hour)),
})
})).To(Succeed())

// then
_, subscription := dataplaneInsight.GetSubscription("2")
Expect(subscription.DisconnectTime).ToNot(BeNil())
})

It("should return error for wrong subscription type", func() {
// given
dataplaneInsight := &DataplaneInsight{
Subscriptions: []*DiscoverySubscription{
{
Id: "1",
ConnectTime: util_proto.MustTimestampProto(t1),
DisconnectTime: util_proto.MustTimestampProto(t1.Add(1 * time.Hour)),
},
{
Id: "2",
ConnectTime: util_proto.MustTimestampProto(t1.Add(2 * time.Hour)),
},
},
}

// when
err := dataplaneInsight.UpdateSubscription(&system_proto.KDSSubscription{})

// then
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(Equal("invalid type *v1alpha1.KDSSubscription for DataplaneInsight"))
})
})

Describe("GetLatestSubscription()", func() {
Expand Down
28 changes: 23 additions & 5 deletions api/mesh/v1alpha1/zone_ingress_insight_helpers.go
Expand Up @@ -3,9 +3,14 @@ package v1alpha1
import (
"time"

"github.com/pkg/errors"
"google.golang.org/protobuf/types/known/timestamppb"

"github.com/kumahq/kuma/api/generic"
)

var _ generic.Insight = &ZoneIngressInsight{}

func (x *ZoneIngressInsight) GetSubscription(id string) (int, *DiscoverySubscription) {
for i, s := range x.GetSubscriptions() {
if s.Id == id {
Expand All @@ -15,17 +20,22 @@ func (x *ZoneIngressInsight) GetSubscription(id string) (int, *DiscoverySubscrip
return -1, nil
}

func (x *ZoneIngressInsight) UpdateSubscription(s *DiscoverySubscription) {
func (x *ZoneIngressInsight) UpdateSubscription(s generic.Subscription) error {
if x == nil {
return
return nil
}
i, old := x.GetSubscription(s.Id)
discoverySubscription, ok := s.(*DiscoverySubscription)
if !ok {
return errors.Errorf("invalid type %T for ZoneIngressInsight", s)
}
i, old := x.GetSubscription(discoverySubscription.Id)
if old != nil {
x.Subscriptions[i] = s
x.Subscriptions[i] = discoverySubscription
} else {
x.finalizeSubscriptions()
x.Subscriptions = append(x.Subscriptions, s)
x.Subscriptions = append(x.Subscriptions, discoverySubscription)
}
return nil
}

// If Kuma CP was killed ungracefully then we can get a subscription without a DisconnectTime.
Expand All @@ -49,6 +59,14 @@ func (x *ZoneIngressInsight) IsOnline() bool {
return false
}

func (x *ZoneIngressInsight) GetLastSubscription() generic.Subscription {
if len(x.GetSubscriptions()) == 0 {
return nil
}
return x.GetSubscriptions()[len(x.GetSubscriptions())-1]
}

// todo(lobkovilya): delete GetLatestSubscription, use GetLastSubscription instead
func (x *ZoneIngressInsight) GetLatestSubscription() (*DiscoverySubscription, *time.Time) {
if len(x.GetSubscriptions()) == 0 {
return nil, nil
Expand Down
29 changes: 27 additions & 2 deletions api/mesh/v1alpha1/zone_ingress_insight_helpers_test.go
Expand Up @@ -8,6 +8,7 @@ import (

util_proto "github.com/kumahq/kuma/api/internal/util/proto"
mesh_proto "github.com/kumahq/kuma/api/mesh/v1alpha1"
system_proto "github.com/kumahq/kuma/api/system/v1alpha1"
)

var _ = Describe("Zone Ingress Insights", func() {
Expand All @@ -31,14 +32,38 @@ var _ = Describe("Zone Ingress Insights", func() {
}

// when
zoneInsight.UpdateSubscription(&mesh_proto.DiscoverySubscription{
Expect(zoneInsight.UpdateSubscription(&mesh_proto.DiscoverySubscription{
Id: "3",
ConnectTime: util_proto.MustTimestampProto(t1.Add(3 * time.Hour)),
})
})).To(Succeed())

// then
_, subscription := zoneInsight.GetSubscription("2")
Expect(subscription.DisconnectTime).ToNot(BeNil())
})

It("should return error for wrong subscription type", func() {
// given
zoneInsight := &mesh_proto.ZoneIngressInsight{
Subscriptions: []*mesh_proto.DiscoverySubscription{
{
Id: "1",
ConnectTime: util_proto.MustTimestampProto(t1),
DisconnectTime: util_proto.MustTimestampProto(t1.Add(1 * time.Hour)),
},
{
Id: "2",
ConnectTime: util_proto.MustTimestampProto(t1.Add(2 * time.Hour)),
},
},
}

// when
err := zoneInsight.UpdateSubscription(&system_proto.KDSSubscription{})

// then
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(Equal("invalid type *v1alpha1.KDSSubscription for ZoneIngressInsight"))
})
})
})

0 comments on commit 1343424

Please sign in to comment.