Skip to content

Commit

Permalink
Customize the client rate limiters of the discovery package
Browse files Browse the repository at this point in the history
  • Loading branch information
giorio94 committed Aug 16, 2021
1 parent 38cf37b commit 087a21a
Show file tree
Hide file tree
Showing 5 changed files with 125 additions and 1 deletion.
4 changes: 3 additions & 1 deletion cmd/discovery/main.go
Expand Up @@ -21,6 +21,7 @@ import (
searchdomainoperator "github.com/liqotech/liqo/internal/discovery/search-domain-operator"
"github.com/liqotech/liqo/pkg/clusterid"
"github.com/liqotech/liqo/pkg/mapperUtils"
"github.com/liqotech/liqo/pkg/utils/restcfg"
)

var (
Expand Down Expand Up @@ -53,13 +54,14 @@ func main() {
flag.Int64Var(&dialTCPTimeout,
"dialTcpTimeout", 500, "Time to wait for a TCP connection to a remote cluster before to consider it as not reachable (milliseconds)")

restcfg.InitFlags(nil)
klog.InitFlags(nil)
flag.Parse()

klog.Info("Namespace: ", namespace)
klog.Info("RequeueAfter: ", requeueAfter)

config := ctrl.GetConfigOrDie()
config := restcfg.SetRateLimiter(ctrl.GetConfigOrDie())
clientset, err := kubernetes.NewForConfig(config)
if err != nil {
klog.Errorf("Failed to create a new Kubernetes client: %w", err)
Expand Down
2 changes: 2 additions & 0 deletions pkg/utils/restcfg/doc.go
@@ -0,0 +1,2 @@
// Package restcfg contains utility functions to deal with rest configs.
package restcfg
42 changes: 42 additions & 0 deletions pkg/utils/restcfg/ratelimiting.go
@@ -0,0 +1,42 @@
package restcfg

import (
"flag"

"k8s.io/client-go/rest"
)

const (
// DefaultQPS -> The default QPS value assigned to client-go clients.
DefaultQPS = uint(100)
// DefaultBurst -> The default burst value assigned to client-go clients.
DefaultBurst = uint(100)
)

var (
qps = DefaultQPS
burst = DefaultBurst
)

// InitFlags initializes the flags to configure the rate limiter parameters.
func InitFlags(flagset *flag.FlagSet) {
if flagset == nil {
flagset = flag.CommandLine
}

flagset.UintVar(&qps, "client-qps", qps, "The maximum number of queries per second performed towards the API server.")
flagset.UintVar(&burst, "client-max-burst", qps, "The maximum burst of requests in excess of the rate limit towards the API server.")
}

// SetRateLimiter configures the rate limiting parameters of the given rest configuration
// to the values obtained from the command line parameters.
func SetRateLimiter(cfg *rest.Config) *rest.Config {
return SetRateLimiterWithCustomParamenters(cfg, float32(qps), int(burst))
}

// SetRateLimiterWithCustomParamenters configures the rate limiting parameters of the given rest configuration.
func SetRateLimiterWithCustomParamenters(cfg *rest.Config, qps float32, burst int) *rest.Config {
cfg.QPS = qps
cfg.Burst = burst
return cfg
}
65 changes: 65 additions & 0 deletions pkg/utils/restcfg/ratelimiting_test.go
@@ -0,0 +1,65 @@
package restcfg_test

import (
"flag"
"strconv"

. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/rest"

"github.com/liqotech/liqo/pkg/utils/restcfg"
)

var _ = Describe("The rate limiting utility functions", func() {

var (
cfg rest.Config
output *rest.Config
)

const (
qps = 67
burst = 89
)

Describe("the SetRateLimiter function", func() {
Context("configuring the rate limiting parameters", func() {
var fs flag.FlagSet

BeforeEach(func() {
fs = *flag.NewFlagSet("test-flags", flag.PanicOnError)
restcfg.InitFlags(&fs)
})
JustBeforeEach(func() { output = restcfg.SetRateLimiter(&cfg) })

When("using the default configuration", func() {
It("should return a pointer to the original object", func() { Expect(output).To(BeIdenticalTo(&cfg)) })
It("should set the default QPS value", func() { Expect(cfg.QPS).To(BeNumerically("==", restcfg.DefaultQPS)) })
It("should set the default burst value", func() { Expect(cfg.Burst).To(BeNumerically("==", restcfg.DefaultBurst)) })
})

When("specifying a custom configuration", func() {
BeforeEach(func() {
utilruntime.Must(fs.Set("client-qps", strconv.FormatInt(qps, 10)))
utilruntime.Must(fs.Set("client-max-burst", strconv.FormatInt(burst, 10)))
})

It("should return a pointer to the original object", func() { Expect(output).To(BeIdenticalTo(&cfg)) })
It("should set the desired QPS value", func() { Expect(cfg.QPS).To(BeNumerically("==", qps)) })
It("should set the desired burst value", func() { Expect(cfg.Burst).To(BeNumerically("==", burst)) })
})
})
})

Describe("the SetRateLimiterWithCustomParamenters function", func() {
Context("configuring the rate limiting parameters", func() {
JustBeforeEach(func() { output = restcfg.SetRateLimiterWithCustomParamenters(&cfg, qps, burst) })

It("should return a pointer to the original object", func() { Expect(output).To(BeIdenticalTo(&cfg)) })
It("should set the desired QPS value", func() { Expect(cfg.QPS).To(BeNumerically("==", qps)) })
It("should set the desired burst value", func() { Expect(cfg.Burst).To(BeNumerically("==", burst)) })
})
})
})
13 changes: 13 additions & 0 deletions pkg/utils/restcfg/restcfg_suite_test.go
@@ -0,0 +1,13 @@
package restcfg_test

import (
"testing"

. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
)

func TestRestcfg(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "RestConfig Suite")
}

0 comments on commit 087a21a

Please sign in to comment.