Skip to content

Commit

Permalink
feat:support rls to push xds client
Browse files Browse the repository at this point in the history
  • Loading branch information
chuntaojun committed May 26, 2023
1 parent 15f06fe commit cdaa103
Show file tree
Hide file tree
Showing 6 changed files with 108 additions and 86 deletions.
7 changes: 4 additions & 3 deletions admin/job/clean_deleted_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,14 @@ func (job *cleanDeletedClientsJob) init(raw map[string]interface{}) error {
log.Errorf("[Maintain][Job][CleanDeletedClients] new config decoder err: %v", err)
return err
}
err = decoder.Decode(raw)
if err != nil {
if err := decoder.Decode(raw); err != nil {
log.Errorf("[Maintain][Job][CleanDeletedClients] parse config err: %v", err)
return err
}
if cfg.ClientCleanTimeout < 2*time.Minute {
cfg.ClientCleanTimeout = 2 * time.Minute
}
job.cfg = cfg

return nil
}

Expand Down
7 changes: 4 additions & 3 deletions admin/job/clean_deleted_instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,14 @@ func (job *cleanDeletedInstancesJob) init(raw map[string]interface{}) error {
log.Errorf("[Maintain][Job][CleanDeletedInstances] new config decoder err: %v", err)
return err
}
err = decoder.Decode(raw)
if err != nil {
if err = decoder.Decode(raw); err != nil {
log.Errorf("[Maintain][Job][CleanDeletedInstances] parse config err: %v", err)
return err
}
if cfg.InstanceCleanTimeout < 2*time.Minute {
cfg.InstanceCleanTimeout = 2 * time.Minute
}
job.cfg = cfg

return nil
}

Expand Down
36 changes: 18 additions & 18 deletions admin/job/delete_empty_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,20 +31,20 @@ import (
"github.com/polarismesh/polaris/store"
)

type DeleteEmptyAutoCreatedServiceJobConfig struct {
type DeleteEmptyServiceJobConfig struct {
ServiceDeleteTimeout time.Duration `mapstructure:"serviceDeleteTimeout"`
}

type deleteEmptyAutoCreatedServiceJob struct {
cfg *DeleteEmptyAutoCreatedServiceJobConfig
type deleteEmptyServiceJob struct {
cfg *DeleteEmptyServiceJobConfig
namingServer service.DiscoverServer
cacheMgn *cache.CacheManager
storage store.Store
emptyServices map[string]time.Time
}

func (job *deleteEmptyAutoCreatedServiceJob) init(raw map[string]interface{}) error {
cfg := &DeleteEmptyAutoCreatedServiceJobConfig{
func (job *deleteEmptyServiceJob) init(raw map[string]interface{}) error {
cfg := &DeleteEmptyServiceJobConfig{
ServiceDeleteTimeout: 30 * time.Minute,
}
decodeConfig := &mapstructure.DecoderConfig{
Expand All @@ -53,40 +53,40 @@ func (job *deleteEmptyAutoCreatedServiceJob) init(raw map[string]interface{}) er
}
decoder, err := mapstructure.NewDecoder(decodeConfig)
if err != nil {
log.Errorf("[Maintain][Job][DeleteEmptyAutoCreatedService] new config decoder err: %v", err)
log.Errorf("[Maintain][Job][DeleteEmptyServiceJob] new config decoder err: %v", err)
return err
}
err = decoder.Decode(raw)
if err != nil {
log.Errorf("[Maintain][Job][DeleteEmptyAutoCreatedService] parse config err: %v", err)
log.Errorf("[Maintain][Job][DeleteEmptyServiceJob] parse config err: %v", err)
return err
}
job.cfg = cfg
job.emptyServices = map[string]time.Time{}
return nil
}

func (job *deleteEmptyAutoCreatedServiceJob) execute() {
err := job.deleteEmptyAutoCreatedServices()
func (job *deleteEmptyServiceJob) execute() {
err := job.deleteEmptyServices()
if err != nil {
log.Errorf("[Maintain][Job][DeleteEmptyAutoCreatedService] delete empty autocreated services, err: %v", err)
log.Errorf("[Maintain][Job][DeleteEmptyServiceJob] delete empty autocreated services, err: %v", err)
}
}

func (job *deleteEmptyAutoCreatedServiceJob) interval() time.Duration {
func (job *deleteEmptyServiceJob) interval() time.Duration {
return job.cfg.ServiceDeleteTimeout
}

func (job *deleteEmptyAutoCreatedServiceJob) clear() {
func (job *deleteEmptyServiceJob) clear() {
job.emptyServices = map[string]time.Time{}
}

func (job *deleteEmptyAutoCreatedServiceJob) getEmptyAutoCreatedServices() []*model.Service {
services := job.getAllEmptyAutoCreatedServices()
func (job *deleteEmptyServiceJob) getEmptyServices() []*model.Service {
services := job.getAllEmptyServices()
return job.filterToDeletedServices(services, time.Now(), job.cfg.ServiceDeleteTimeout)
}

func (job *deleteEmptyAutoCreatedServiceJob) getAllEmptyAutoCreatedServices() []*model.Service {
func (job *deleteEmptyServiceJob) getAllEmptyServices() []*model.Service {
var res []*model.Service
_ = job.cacheMgn.Service().IteratorServices(func(key string, svc *model.Service) (bool, error) {
if svc.IsAlias() {
Expand All @@ -101,7 +101,7 @@ func (job *deleteEmptyAutoCreatedServiceJob) getAllEmptyAutoCreatedServices() []
return res
}

func (job *deleteEmptyAutoCreatedServiceJob) filterToDeletedServices(services []*model.Service,
func (job *deleteEmptyServiceJob) filterToDeletedServices(services []*model.Service,
now time.Time, timeout time.Duration) []*model.Service {
var toDeleteServices []*model.Service
m := map[string]time.Time{}
Expand All @@ -122,8 +122,8 @@ func (job *deleteEmptyAutoCreatedServiceJob) filterToDeletedServices(services []
return toDeleteServices
}

func (job *deleteEmptyAutoCreatedServiceJob) deleteEmptyAutoCreatedServices() error {
emptyServices := job.getEmptyAutoCreatedServices()
func (job *deleteEmptyServiceJob) deleteEmptyServices() error {
emptyServices := job.getEmptyServices()

deleteBatchSize := 100
for i := 0; i < len(emptyServices); i += deleteBatchSize {
Expand Down
12 changes: 6 additions & 6 deletions admin/job/delete_empty_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,14 @@ func Test_DeleteEmptyAutoCreatedServiceJobConfigInit(t *testing.T) {
"serviceDeleteTimeout": "1m",
}

job := deleteEmptyAutoCreatedServiceJob{}
job := deleteEmptyServiceJob{}
err := job.init(raw)
if err != nil {
t.Errorf("init deleteEmptyAutoCreatedServiceJob config, err: %v", err)
t.Errorf("init deleteEmptyServiceJob config, err: %v", err)
}

if job.cfg.ServiceDeleteTimeout != expectValue {
t.Errorf("init deleteEmptyAutoCreatedServiceJob config. expect: %s, actual: %s",
t.Errorf("init deleteEmptyServiceJob config. expect: %s, actual: %s",
expectValue, job.cfg.ServiceDeleteTimeout)
}
}
Expand All @@ -47,15 +47,15 @@ func Test_DeleteEmptyAutoCreatedServiceJobConfigInitErr(t *testing.T) {
"serviceDeleteTimeout": "xx",
}

job := deleteEmptyAutoCreatedServiceJob{}
job := deleteEmptyServiceJob{}
err := job.init(raw)
if err == nil {
t.Errorf("init deleteEmptyAutoCreatedServiceJob config should err")
t.Errorf("init deleteEmptyServiceJob config should err")
}
}

func Test_FilterToDeletedServices(t *testing.T) {
job := deleteEmptyAutoCreatedServiceJob{}
job := deleteEmptyServiceJob{}
t1, _ := time.Parse("2006-01-02 15:04:05", "2023-03-20 12:01:00")
t2, _ := time.Parse("2006-01-02 15:04:05", "2023-03-20 12:02:00")
job.emptyServices = map[string]time.Time{
Expand Down
45 changes: 30 additions & 15 deletions admin/job/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func NewMaintainJobs(namingServer service.DiscoverServer, cacheMgn *cache.CacheM
jobs: map[string]maintainJob{
"DeleteUnHealthyInstance": &deleteUnHealthyInstanceJob{
namingServer: namingServer, storage: storage},
"DeleteEmptyAutoCreatedService": &deleteEmptyAutoCreatedServiceJob{
"DeleteEmptyService": &deleteEmptyServiceJob{
namingServer: namingServer, cacheMgn: cacheMgn, storage: storage},
"CleanDeletedInstances": &cleanDeletedInstancesJob{
storage: storage},
Expand All @@ -68,30 +68,45 @@ func (mj *MaintainJobs) StartMaintianJobs(configs []JobConfig) error {
log.Infof("[Maintain][Job] job (%s) not enable", cfg.Name)
continue
}
job, ok := mj.jobs[cfg.Name]
jobName := parseJobName(cfg.Name)
job, ok := mj.findAdminJob(jobName)
if !ok {
return fmt.Errorf("[Maintain][Job] job (%s) not exist", cfg.Name)
return fmt.Errorf("[Maintain][Job] job (%s) not exist", jobName)
}
_, ok = mj.startedJobs[cfg.Name]
if ok {
return fmt.Errorf("[Maintain][Job] job (%s) duplicated", cfg.Name)
if _, ok := mj.startedJobs[jobName]; ok {
return fmt.Errorf("[Maintain][Job] job (%s) duplicated", jobName)
}
err := job.init(cfg.Option)
if err != nil {
log.Errorf("[Maintain][Job] job (%s) fail to init, err: %v", cfg.Name, err)
return fmt.Errorf("[Maintain][Job] job (%s) fail to init", cfg.Name)
if err := job.init(cfg.Option); err != nil {
log.Errorf("[Maintain][Job] job (%s) fail to init, err: %v", jobName, err)
return fmt.Errorf("[Maintain][Job] job (%s) fail to init", jobName)
}
err = mj.storage.StartLeaderElection(store.ElectionKeyMaintainJobPrefix + cfg.Name)
if err != nil {
log.Errorf("[Maintain][Job][%s] start leader election err: %v", cfg.Name, err)
if err := mj.storage.StartLeaderElection(store.ElectionKeyMaintainJobPrefix + jobName); err != nil {
log.Errorf("[Maintain][Job][%s] start leader election err: %v", jobName, err)
return err
}
runAdminJob(ctx, cfg.Name, job.interval(), job, mj.storage)
mj.startedJobs[cfg.Name] = job
runAdminJob(ctx, jobName, job.interval(), job, mj.storage)
mj.startedJobs[jobName] = job
}
return nil
}

func parseJobName(name string) string {
// 兼容老配置
if name == "DeleteEmptyAutoCreatedService" {
name = "DeleteEmptyService"
}
return name
}

func (mj *MaintainJobs) findAdminJob(name string) (maintainJob, bool) {
job, ok := mj.jobs[name]
if !ok {
return nil, false
}

return job, true
}

// StopMaintainJobs
func (mj *MaintainJobs) StopMaintainJobs() {
if mj.cancel != nil {
Expand Down
87 changes: 46 additions & 41 deletions release/conf/polaris-server.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,7 @@ bootstrap:
key: sz # Global lock
# Register as Arctic Star Service
polaris_service:
# Obtain the IP of the VM or POD where Polaris is located by making a TCP connection with the probe_adreess address
# probe_address: ##DB_ADDR##
enable_register: true
isolated: false
Expand All @@ -217,11 +218,13 @@ apiservers:
whiteList: 127.0.0.1
purgeCounterInterval: 10s
purgeCounterExpired: 5s
- name: api-http # Agreement name, the only global situation
- name: api-http
option:
listenIP: "0.0.0.0"
listenPort: 8090
enablePprof: true # debug pprof
# debug pprof switch
enablePprof: true
# swagger docs switch
enableSwagger: true
connLimit:
openConnLimit: false
Expand Down Expand Up @@ -286,32 +289,21 @@ apiservers:
# clusterName: cl5.discover
# Core logic configuration
auth:
# Inspection plug -in
name: defaultAuth
option:
# Token encrypted SALT, you need to rely on this SALT to decrypt the information of the Token when analyzing the Token
# The length of SALT needs to satisfy the following one:len(salt) in [16, 24, 32]
salt: polarismesh@2021
# Console power switch, open default
consoleOpen: true
# Customer inspection ability switch, default shutdown
clientOpen: false
# auth:
# # auth's option has migrated to auth.user and auth.strategy
# # it's still available when filling auth.option, but you will receive warning log that auth.option has deprecated.
# user:
# name: defaultUser
# option:
# # Token encrypted SALT, you need to rely on this SALT to decrypt the information of the Token when analyzing the Token
# # The length of SALT needs to satisfy the following one:len(salt) in [16, 24, 32]
# salt: polarismesh@2021
# strategy:
# name: defaultStrategy
# option:
# # Console power switch, open default
# consoleOpen: true
# # Customer inspection ability switch, default close
# clientOpen: false
# auth's option has migrated to auth.user and auth.strategy
# it's still available when filling auth.option, but you will receive warning log that auth.option has deprecated.
user:
name: defaultUser
option:
# Token encrypted SALT, you need to rely on this SALT to decrypt the information of the Token when analyzing the Token
# The length of SALT needs to satisfy the following one:len(salt) in [16, 24, 32]
salt: polarismesh@2021
strategy:
name: defaultStrategy
option:
# Console power switch, open default
consoleOpen: true
# Customer inspection ability switch, default close
clientOpen: false
namespace:
# Whether to allow automatic creation of naming space
autoCreate: true
Expand Down Expand Up @@ -419,7 +411,8 @@ cache:
- name: users # Load user and user group data
- name: strategyRule # Loading the rules of appraisal
- name: namespace # Load the naming space data
- name: client # Load Client-SDK instance data
# Load Client-SDK instance data
- name: client
- name: configFile
option:
# Configuration file cache expires time, unit S
Expand Down Expand Up @@ -496,7 +489,8 @@ plugin:
discoverStatis:
name: discoverLocal
option:
interval: 60 # Statistical interval, the unit is second
# Statistical interval, the unit is second
interval: 60
statis:
entries:
- name: local
Expand All @@ -506,29 +500,40 @@ plugin:
ratelimit:
name: token-bucket
option:
remote-conf: false # Whether to use remote configuration
ip-limit: # IP -level current, global
open: false # Whether the system opens IP -level current limit
# Whether to use remote configuration
remote-conf: false
# IP -level current, global
ip-limit:
# Whether the system opens IP -level current limit
open: false
global:
open: false
bucket: 300 # Maximum peak
rate: 200 # The average number of requests per second of IP
resource-cache-amount: 1024 # Number of IP of the maximum cache
# Maximum peak
bucket: 300
# The average number of requests per second of IP
rate: 200
# Number of IP of the maximum cache
resource-cache-amount: 1024
white-list: [127.0.0.1]
instance-limit:
open: false
global:
bucket: 200
rate: 100
resource-cache-amount: 1024
api-limit: # Interface-level current limit
open: false # Whether to turn on the interface restriction and global switch, only for TRUE can it represent the flow restriction on the system.By default
# Interface-level ratelimit limit
api-limit:
# Whether to turn on the interface restriction and global switch, only for TRUE can it represent the flow restriction on the system.By default
open: false
rules:
- name: store-read
limit:
open: false # The global configuration of the interface, if in the API sub -item, is not configured, the interface will be limited according to Global
bucket: 2000 # The maximum value of token barrels
rate: 1000 # The number of token generated per second
# The global configuration of the interface, if in the API sub -item, is not configured, the interface will be limited according to Global
open: false
# The maximum value of token barrels
bucket: 2000
# The number of token generated per second
rate: 1000
- name: store-write
limit:
open: false
Expand Down

0 comments on commit cdaa103

Please sign in to comment.