Skip to content

Commit

Permalink
feat(meshmultizoneservice): update status (#10648)
Browse files Browse the repository at this point in the history
Signed-off-by: Jakub Dyszkiewicz <jakub.dyszkiewicz@gmail.com>
  • Loading branch information
jakubdyszkiewicz committed Jun 28, 2024
1 parent a16b7dd commit 705f084
Show file tree
Hide file tree
Showing 16 changed files with 465 additions and 20 deletions.
5 changes: 5 additions & 0 deletions docs/generated/raw/kuma-cp.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -794,6 +794,11 @@ policies:
- meshtraces
- meshtrafficpermissions
coreResources:
status:
# How often we compute status of MeshMultiZoneService
meshMultiZoneServiceInterval: 5s # ENV: KUMA_CORE_RESOURCES_STATUS_MESH_MULTI_ZONE_SERVICE_INTERVAL
# How often we compute status of MeshService
meshServiceInterval: 5s # ENV: KUMA_CORE_RESOURCES_STATUS_MESH_SERVICE_INTERVAL
enabled: # ENV: KUMA_CORE_RESOURCES_ENABLED
- hostnamegenerators
- meshexternalservices
Expand Down
5 changes: 5 additions & 0 deletions pkg/config/app/kuma-cp/kuma-cp.defaults.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -794,6 +794,11 @@ policies:
- meshtraces
- meshtrafficpermissions
coreResources:
status:
# How often we compute status of MeshMultiZoneService
meshMultiZoneServiceInterval: 5s # ENV: KUMA_CORE_RESOURCES_STATUS_MESH_MULTI_ZONE_SERVICE_INTERVAL
# How often we compute status of MeshService
meshServiceInterval: 5s # ENV: KUMA_CORE_RESOURCES_STATUS_MESH_SERVICE_INTERVAL
enabled: # ENV: KUMA_CORE_RESOURCES_ENABLED
- hostnamegenerators
- meshexternalservices
Expand Down
26 changes: 26 additions & 0 deletions pkg/config/core/resources/apis/config.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,33 @@
package apis

import (
"time"

config_types "github.com/kumahq/kuma/pkg/config/types"
)

type Config struct {
// List of enabled core resources
Enabled []string `json:"enabled" envconfig:"KUMA_CORE_RESOURCES_ENABLED" default:""`
// Status of core resources
Status ConfigStatus `json:"status"`
}

type ConfigStatus struct {
// How often we compute status of MeshMultiZoneService
MeshMultiZoneServiceInterval config_types.Duration `json:"meshMultiZoneServiceInterval" envconfig:"KUMA_CORE_RESOURCES_STATUS_MESH_MULTI_ZONE_SERVICE_INTERVAL"`
// How often we compute status of MeshService
MeshServiceInterval config_types.Duration `json:"meshServiceInterval" envconfig:"KUMA_CORE_RESOURCES_STATUS_MESH_SERVICE_INTERVAL"`
}

func Default() *Config {
return &Config{
Enabled: DefaultEnabled,
Status: ConfigStatus{
MeshMultiZoneServiceInterval: config_types.Duration{Duration: 5 * time.Second},
MeshServiceInterval: config_types.Duration{Duration: 5 * time.Second},
},
}
}

func (c *Config) PostProcess() error {
Expand Down
6 changes: 0 additions & 6 deletions pkg/config/core/resources/apis/zz_generated.policies.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,3 @@ var DefaultEnabled = []string{
"meshmultizoneservices",
"meshservices",
}

func Default() *Config {
return &Config{
Enabled: DefaultEnabled,
}
}
9 changes: 9 additions & 0 deletions pkg/config/loader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -376,6 +376,10 @@ var _ = Describe("Config loader", func() {
Expect(cfg.IPAM.MeshService.CIDR).To(Equal("251.0.0.0/8"))
Expect(cfg.IPAM.MeshExternalService.CIDR).To(Equal("252.0.0.0/8"))
Expect(cfg.IPAM.AllocationInterval.Duration).To(Equal(7 * time.Second))

Expect(cfg.CoreResources.Enabled).To(Equal([]string{"meshservice"}))
Expect(cfg.CoreResources.Status.MeshServiceInterval.Duration).To(Equal(6 * time.Second))
Expect(cfg.CoreResources.Status.MeshMultiZoneServiceInterval.Duration).To(Equal(7 * time.Second))
},
Entry("from config file", testCase{
envVars: map[string]string{},
Expand Down Expand Up @@ -745,6 +749,9 @@ eventBus:
coreResources:
enabled:
- meshservice
status:
meshServiceInterval: 6s
meshMultiZoneServiceInterval: 7s
policies:
pluginPoliciesEnabled:
- meshaccesslog
Expand Down Expand Up @@ -1038,6 +1045,8 @@ ipam:
"KUMA_EVENT_BUS_BUFFER_SIZE": "30",
"KUMA_PLUGIN_POLICIES_ENABLED": "meshaccesslog,meshcircuitbreaker",
"KUMA_CORE_RESOURCES_ENABLED": "meshservice",
"KUMA_CORE_RESOURCES_STATUS_MESH_SERVICE_INTERVAL": "6s",
"KUMA_CORE_RESOURCES_STATUS_MESH_MULTI_ZONE_SERVICE_INTERVAL": "7s",
"KUMA_IPAM_MESH_SERVICE_CIDR": "251.0.0.0/8",
"KUMA_IPAM_MESH_EXTERNAL_SERVICE_CIDR": "252.0.0.0/8",
"KUMA_IPAM_ALLOCATION_INTERVAL": "7s",
Expand Down
6 changes: 6 additions & 0 deletions pkg/config/plugins/policies/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,9 @@ func (c *Config) Sanitize() {
func (c *Config) Validate() error {
return nil
}

func Default() *Config {
return &Config{
Enabled: DefaultEnabled,
}
}
6 changes: 0 additions & 6 deletions pkg/config/plugins/policies/zz_generated.policies.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,3 @@ var DefaultEnabled = []string{
"meshtraces",
"meshtrafficpermissions",
}

func Default() *Config {
return &Config{
Enabled: DefaultEnabled,
}
}
139 changes: 139 additions & 0 deletions pkg/core/resources/apis/meshmultizoneservice/status_updater.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
package meshmultizoneservice

import (
"context"
"reflect"
"sort"
"time"

"github.com/go-logr/logr"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"

meshmzservice_api "github.com/kumahq/kuma/pkg/core/resources/apis/meshmultizoneservice/api/v1alpha1"
meshservice_api "github.com/kumahq/kuma/pkg/core/resources/apis/meshservice/api/v1alpha1"
"github.com/kumahq/kuma/pkg/core/resources/manager"
"github.com/kumahq/kuma/pkg/core/runtime/component"
"github.com/kumahq/kuma/pkg/core/user"
core_metrics "github.com/kumahq/kuma/pkg/metrics"
util_maps "github.com/kumahq/kuma/pkg/util/maps"
)

type StatusUpdater struct {
roResManager manager.ReadOnlyResourceManager
resManager manager.ResourceManager
logger logr.Logger
metric prometheus.Summary
interval time.Duration
}

var _ component.Component = &StatusUpdater{}

func NewStatusUpdater(
logger logr.Logger,
roResManager manager.ReadOnlyResourceManager,
resManager manager.ResourceManager,
interval time.Duration,
metrics core_metrics.Metrics,
) (component.Component, error) {
metric := prometheus.NewSummary(prometheus.SummaryOpts{
Name: "component_mzms_status_updater",
Help: "Summary of MeshMultizoneService Updater component",
Objectives: core_metrics.DefaultObjectives,
})
if err := metrics.Register(metric); err != nil {
return nil, err
}
return &StatusUpdater{
roResManager: roResManager,
resManager: resManager,
logger: logger,
metric: metric,
interval: interval,
}, nil
}

func (s *StatusUpdater) Start(stop <-chan struct{}) error {
s.logger.Info("starting")
ticker := time.NewTicker(s.interval)
ctx := user.Ctx(context.Background(), user.ControlPlane)

for {
select {
case <-ticker.C:
start := time.Now()
if err := s.updateStatus(ctx); err != nil {
s.logger.Error(err, "could not update status of mesh multizone services")
}
s.metric.Observe(float64(time.Since(start).Milliseconds()))
case <-stop:
s.logger.Info("stopping")
return nil
}
}
}

func (s *StatusUpdater) updateStatus(ctx context.Context) error {
mzSvcList := &meshmzservice_api.MeshMultiZoneServiceResourceList{}
if err := s.roResManager.List(ctx, mzSvcList); err != nil {
return errors.Wrap(err, "could not list of MeshMultiZoneServices")
}
if len(mzSvcList.Items) == 0 {
// skip fetching other resources if MeshMultiZoneService is not used
return nil
}

msList := &meshservice_api.MeshServiceResourceList{}
if err := s.roResManager.List(ctx, msList); err != nil {
return errors.Wrap(err, "could not list of MeshServices")
}

for _, mzSvc := range mzSvcList.Items {
var matched []meshmzservice_api.MatchedMeshService
ports := map[uint32]meshservice_api.Port{}
for _, svc := range msList.Items {
if matchesService(mzSvc, svc) {
matched = append(matched, meshmzservice_api.MatchedMeshService{
Name: svc.Meta.GetName(),
})
for _, port := range svc.Spec.Ports {
ports[port.Port] = port
}
}
}

var sortedPorts []meshservice_api.Port
for _, port := range util_maps.SortedKeys(ports) {
sortedPorts = append(sortedPorts, ports[port])
}

sort.Slice(matched, func(i, j int) bool {
return matched[i].Name < matched[j].Name
})

if !reflect.DeepEqual(mzSvc.Status.Ports, sortedPorts) ||
!reflect.DeepEqual(mzSvc.Status.MeshServices, matched) {
log := s.logger.WithValues("meshmultizoneservice", mzSvc.Meta.GetName())
mzSvc.Status.Ports = sortedPorts
mzSvc.Status.MeshServices = matched
log.Info("updating ports and mesh services", "matchedMeshServices", matched, "ports", sortedPorts)
if err := s.resManager.Update(ctx, mzSvc); err != nil {
log.Error(err, "could not update ports and mesh services")
}
}
}
return nil
}

func matchesService(mzSvc *meshmzservice_api.MeshMultiZoneServiceResource, svc *meshservice_api.MeshServiceResource) bool {
for label, value := range mzSvc.Spec.Selector.MeshService.MatchLabels {
if svc.Meta.GetLabels()[label] != value {
return false
}
}
return true
}

func (s *StatusUpdater) NeedLeaderElection() bool {
return true
}
112 changes: 112 additions & 0 deletions pkg/core/resources/apis/meshmultizoneservice/status_updater_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
package meshmultizoneservice_test

import (
"context"
"time"

"github.com/go-logr/logr"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"

mesh_proto "github.com/kumahq/kuma/api/mesh/v1alpha1"
core_mesh "github.com/kumahq/kuma/pkg/core/resources/apis/mesh"
"github.com/kumahq/kuma/pkg/core/resources/apis/meshmultizoneservice"
meshmzservice_api "github.com/kumahq/kuma/pkg/core/resources/apis/meshmultizoneservice/api/v1alpha1"
meshservice_api "github.com/kumahq/kuma/pkg/core/resources/apis/meshservice/api/v1alpha1"
"github.com/kumahq/kuma/pkg/core/resources/manager"
"github.com/kumahq/kuma/pkg/core/resources/model"
"github.com/kumahq/kuma/pkg/core/resources/store"
core_metrics "github.com/kumahq/kuma/pkg/metrics"
"github.com/kumahq/kuma/pkg/plugins/resources/memory"
test_metrics "github.com/kumahq/kuma/pkg/test/metrics"
"github.com/kumahq/kuma/pkg/test/resources/samples"
)

var _ = Describe("Updater", func() {
var stopCh chan struct{}
var resManager manager.ResourceManager
var metrics core_metrics.Metrics

BeforeEach(func() {
m, err := core_metrics.NewMetrics("")
Expect(err).ToNot(HaveOccurred())
metrics = m
resManager = manager.NewResourceManager(memory.NewStore())

updater, err := meshmultizoneservice.NewStatusUpdater(logr.Discard(), resManager, resManager, 50*time.Millisecond, m)
Expect(err).ToNot(HaveOccurred())
stopCh = make(chan struct{})
go func() {
defer GinkgoRecover()
Expect(updater.Start(stopCh)).To(Succeed())
}()

Expect(samples.MeshDefaultBuilder().Create(resManager)).To(Succeed())
})

AfterEach(func() {
close(stopCh)
})

It("should add mesh services and port to the status of multizone service", func() {
// when
ms1Builder := samples.MeshServiceBackendBuilder().
WithName("backend").
WithDataplaneTagsSelector(map[string]string{
"app": "backend",
}).
WithLabels(map[string]string{
mesh_proto.DisplayName: "backend",
mesh_proto.ZoneTag: "east",
})
Expect(ms1Builder.Create(resManager)).To(Succeed())
Expect(samples.MeshServiceWebBuilder().Create(resManager)).To(Succeed()) // to check if we ignore it
Expect(samples.MeshMultiZoneServiceBackendBuilder().Create(resManager)).To(Succeed())

// then
Eventually(func(g Gomega) {
mzsvc := meshmzservice_api.NewMeshMultiZoneServiceResource()

err := resManager.Get(context.Background(), mzsvc, store.GetByKey("backend", model.DefaultMesh))
g.Expect(err).ToNot(HaveOccurred())
g.Expect(mzsvc.Status.MeshServices).To(Equal([]meshmzservice_api.MatchedMeshService{{Name: "backend"}}))
g.Expect(mzsvc.Status.Ports).To(Equal(ms1Builder.Build().Spec.Ports))
}, "10s", "100ms").Should(Succeed())

// when new service is added
ms2Builder := samples.MeshServiceBackendBuilder().
WithName("backend-syncedhash").
WithDataplaneTagsSelector(map[string]string{
"app": "backend",
}).
AddIntPort(71, 8081, core_mesh.ProtocolHTTP).
WithLabels(map[string]string{
mesh_proto.DisplayName: "backend",
mesh_proto.ZoneTag: "west",
})
Expect(ms2Builder.Create(resManager)).To(Succeed())

// then
Eventually(func(g Gomega) {
mzsvc := meshmzservice_api.NewMeshMultiZoneServiceResource()

err := resManager.Get(context.Background(), mzsvc, store.GetByKey("backend", model.DefaultMesh))
g.Expect(err).ToNot(HaveOccurred())
g.Expect(mzsvc.Status.MeshServices).To(Equal([]meshmzservice_api.MatchedMeshService{
{Name: "backend"},
{Name: "backend-syncedhash"},
}))
// ports are sorted
g.Expect(mzsvc.Status.Ports).To(Equal([]meshservice_api.Port{
ms2Builder.Build().Spec.Ports[1],
ms1Builder.Build().Spec.Ports[0],
}))
}, "10s", "100ms").Should(Succeed())
})

It("should emit metric", func() {
Eventually(func(g Gomega) {
g.Expect(test_metrics.FindMetric(metrics, "component_mzms_status_updater")).ToNot(BeNil())
}, "10s", "100ms").Should(Succeed())
})
})
11 changes: 11 additions & 0 deletions pkg/core/resources/apis/meshmultizoneservice/suite_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package meshmultizoneservice_test

import (
"testing"

"github.com/kumahq/kuma/pkg/test"
)

func TestMeshMultizoneService(t *testing.T) {
test.RunSpecs(t, "MeshMultiZoneService Suite")
}
Loading

0 comments on commit 705f084

Please sign in to comment.