Skip to content

Commit

Permalink
handle rate limit config update event (apache#196)
Browse files Browse the repository at this point in the history
* init

* update dubbo-go to v1.5.6 & fmt sth.

* update

* fix fmt

* add gin

* baseinfo and resources

* Update go.mod

* update dubbo-go to v1.5.6 & fmt sth.

* api complete

* import

* import

* update dubbo-go to v1.5.6 & fmt sth.

* fix fmt

* update

* update

* update

* update

* replace path with id

* update

* update

* rateLimit filter

* add licence header

* fix imports

* add license header

* add license header

* fix ci

* update

* update

* comment

* use go 1.15 run ci to fix ci problem

* fix matcher var inline

* fix ci

* fix sync.RWMutex

* errors.warp

* test table

* use gost zk

* add lock

* remove unreachable statement

* Pattern pattern,omitempty

* update develop

* modify import and chinese comment

* update the docs for ratelimit

* update develop

* ratelimit filter

* ratelimit filter

* ratelimit filter

* ratelimit filter

* ratelimit filter

* ratelimit filter

* ratelimit filter

* ratelimit filter

* ratelimit filter

* ratelimit filter

* ratelimit filter

* ratelimit filter

* split func and bugfix

* fix go.mod go.sum

* fix go.mod go.sum

* fix go.mod go.sum

* ratelimit config change

* ratelimit config change

* ratelimit config change

* ratelimit config change

* ratelimit config change

* ratelimit config change

* ratelimit config change

* ratelimit config change

* update go.sum

* fix logger fmt style

* fix logger fmt style

* time delay when lazy load GenericService

* time delay when lazy load GenericService

* fix ci

* fix logger fmt style

* err.Error() is not a good style

* fix ci

Co-authored-by: randy <ztelur@gmail.com>
Co-authored-by: Zhenxu Ke <kezhenxu94@apache.org>
  • Loading branch information
3 people committed Jul 1, 2021
1 parent da865b0 commit fa5681d
Show file tree
Hide file tree
Showing 8 changed files with 233 additions and 54 deletions.
9 changes: 0 additions & 9 deletions cmd/pixiu/pixiu.go
Expand Up @@ -30,15 +30,6 @@ import (
"github.com/urfave/cli"
)

import (
_ "github.com/apache/dubbo-go-pixiu/pkg/filter/accesslog"
_ "github.com/apache/dubbo-go-pixiu/pkg/filter/logger"
_ "github.com/apache/dubbo-go-pixiu/pkg/filter/recovery"
_ "github.com/apache/dubbo-go-pixiu/pkg/filter/remote"
_ "github.com/apache/dubbo-go-pixiu/pkg/filter/response"
_ "github.com/apache/dubbo-go-pixiu/pkg/filter/timeout"
)

// Version pixiu version
var Version = "0.3.0"

Expand Down
4 changes: 3 additions & 1 deletion pkg/client/dubbo/dubbo.go
Expand Up @@ -260,7 +260,9 @@ func (dc *Client) create(key string, irequest fc.IntegrationRequest) *dg.Generic
dc.lock.Lock()
defer dc.lock.Unlock()
referenceConfig.GenericLoad(key)
time.Sleep(200 * time.Millisecond) // sleep to wait invoker create
//TODO: fix it later
// sleep to wait invoker create
time.Sleep(500 * time.Millisecond)
clientService := referenceConfig.GetRPCService().(*dg.GenericService)

dc.GenericServicePool[key] = clientService
Expand Down
190 changes: 175 additions & 15 deletions pkg/config/api_config.go
Expand Up @@ -27,6 +27,7 @@ import (

import (
fc "github.com/dubbogo/dubbo-go-pixiu-filter/pkg/api/config"
fr "github.com/dubbogo/dubbo-go-pixiu-filter/pkg/api/config/ratelimit"
etcdv3 "github.com/dubbogo/gost/database/kv/etcd/v3"
perrors "github.com/pkg/errors"
"go.etcd.io/etcd/clientv3"
Expand All @@ -47,6 +48,12 @@ var (
lock sync.RWMutex
)

var (
BASE_INFO_NAME = "name"
BASE_INFO_DESC = "description"
BASE_INFO_PFP = "pluginFilePath"
)

// APIConfigResourceListener defines api resource and method config listener interface
type APIConfigResourceListener interface {
// ResourceChange handle modify resource event
Expand All @@ -61,6 +68,12 @@ type APIConfigResourceListener interface {
MethodAdd(res fc.Resource, method fc.Method) bool
// MethodDelete handle delete method event
MethodDelete(res fc.Resource, method fc.Method) bool

PluginPathChange(filePath string)

PluginGroupChange(group []fc.PluginsGroup)

RateLimitChange(*fr.Config)
}

// LoadAPIConfigFromFile load the api config from file
Expand All @@ -72,7 +85,7 @@ func LoadAPIConfigFromFile(path string) (*fc.APIConfig, error) {
apiConf := &fc.APIConfig{}
err := yaml.UnmarshalYMLConfig(path, apiConf)
if err != nil {
return nil, perrors.Errorf("unmarshalYmlConfig error %v", perrors.WithStack(err))
return nil, perrors.Errorf("unmarshalYmlConfig error %s", perrors.WithStack(err))
}
apiConfig = apiConf
return apiConf, nil
Expand All @@ -86,13 +99,13 @@ func LoadAPIConfig(metaConfig *model.APIMetaConfig) (*fc.APIConfig, error) {
etcdv3.WithEndpoints(strings.Split(metaConfig.Address, ",")...),
)
if err != nil {
return nil, perrors.Errorf("Init etcd client fail error %v", err)
return nil, perrors.Errorf("Init etcd client fail error %s", err)
}

client = tmpClient
kList, vList, err := client.GetChildren(metaConfig.APIConfigPath)
if err != nil {
return nil, perrors.Errorf("Get remote config fail error %v", err)
return nil, perrors.Errorf("Get remote config fail error %s", err)
}
if err = initAPIConfigFromKVList(kList, vList); err != nil {
return nil, err
Expand All @@ -105,11 +118,21 @@ func LoadAPIConfig(metaConfig *model.APIMetaConfig) (*fc.APIConfig, error) {

func initAPIConfigFromKVList(kList, vList []string) error {
var skList, svList, mkList, mvList []string
var baseInfo string
var pluginGroup []string
var rateLimit string

for i, k := range kList {
v := vList[i]
//handle base info
re := getCheckBaseInfoRegexp()
if m := re.Match([]byte(k)); m {
baseInfo = v
continue
}

// handle resource
re := getCheckResourceRegexp()
re = getCheckResourceRegexp()
if m := re.Match([]byte(k)); m {
skList = append(skList, k)
svList = append(svList, v)
Expand All @@ -122,32 +145,100 @@ func initAPIConfigFromKVList(kList, vList []string) error {
mvList = append(mvList, v)
continue
}

//handle plugin group
re = getCheckPluginsGroupRegexp()
if m := re.Match([]byte(k)); m {
pluginGroup = append(pluginGroup, v)
continue
}

//handle rate limit config
re = getCheckRatelimitRegexp()
if m := re.Match([]byte(k)); m {
rateLimit = v
continue
}
}

lock.Lock()
defer lock.Unlock()

tmpApiConf := &fc.APIConfig{}
if err := initBaseInfoFromString(tmpApiConf, baseInfo); err != nil {
logger.Errorf("initBaseInfoFromString error %s", err)
return err
}
if err := initAPIConfigServiceFromKvList(tmpApiConf, skList, svList); err != nil {
logger.Error("initAPIConfigServiceFromKvList error %v", err.Error())
logger.Errorf("initAPIConfigServiceFromKvList error %s", err)
return err
}
if err := initAPIConfigMethodFromKvList(tmpApiConf, mkList, mvList); err != nil {
logger.Error("initAPIConfigMethodFromKvList error %v", err.Error())
logger.Errorf("initAPIConfigMethodFromKvList error %s", err)
return err
}
if err := initAPIConfigPluginsFromStringList(tmpApiConf, pluginGroup); err != nil {
logger.Errorf("initAPIConfigPluginsFromStringList error %s", err)
return err
}
if err := initAPIConfigRatelimitFromString(tmpApiConf, rateLimit); err != nil {
logger.Errorf("initAPIConfigRatelimitFromString error %s", err)
return err
}

apiConfig = tmpApiConf
return nil
}

func initBaseInfoFromString(conf *fc.APIConfig, str string) error {
properties := make(map[string]string, 8)
if err := yaml.UnmarshalYML([]byte(str), properties); err != nil {
logger.Errorf("unmarshalYmlConfig error %s", err)
return err
}
if v, ok := properties[BASE_INFO_NAME]; ok {
conf.Name = v
}
if v, ok := properties[BASE_INFO_DESC]; ok {
conf.Description = v
}
if v, ok := properties[BASE_INFO_PFP]; ok {
conf.PluginFilePath = v
}
return nil
}

func initAPIConfigRatelimitFromString(conf *fc.APIConfig, str string) error {
c := fr.Config{}
if err := yaml.UnmarshalYML([]byte(str), &c); err != nil {
logger.Errorf("unmarshalYmlConfig error %s", err)
return err
}
conf.RateLimit = c
return nil
}

func initAPIConfigPluginsFromStringList(conf *fc.APIConfig, plugins []string) error {
var groups []fc.PluginsGroup
for _, v := range plugins {
g := fc.PluginsGroup{}
if err := yaml.UnmarshalYML([]byte(v), &g); err != nil {
logger.Errorf("unmarshalYmlConfig error %s", err)
return err
}
groups = append(groups, g)
}
conf.PluginsGroup = groups
return nil
}

func initAPIConfigMethodFromKvList(config *fc.APIConfig, kList, vList []string) error {
for i := range kList {
v := vList[i]
method := &fc.Method{}
err := yaml.UnmarshalYML([]byte(v), method)
if err != nil {
logger.Error("unmarshalYmlConfig error %v", err.Error())
logger.Errorf("unmarshalYmlConfig error %s", err)
return err
}

Expand Down Expand Up @@ -188,7 +279,7 @@ func initAPIConfigServiceFromKvList(config *fc.APIConfig, kList, vList []string)
resource := &fc.Resource{}
err := yaml.UnmarshalYML([]byte(v), resource)
if err != nil {
logger.Error("unmarshalYmlConfig error %v", err.Error())
logger.Errorf("unmarshalYmlConfig error %s", err)
return err
}

Expand Down Expand Up @@ -219,7 +310,7 @@ func listenResourceAndMethodEvent(key string) bool {
for {
wc, err := client.WatchWithOption(key, clientv3.WithPrefix())
if err != nil {
logger.Warnf("Watch api config {key:%s} = error{%v}", key, err)
logger.Warnf("Watch api config {key:%s} = error{%s}", key, err)
return false
}

Expand Down Expand Up @@ -275,7 +366,7 @@ func handleDeleteEvent(key, val []byte) {
resourceIdStr := pathArray[len(pathArray)-1]
ID, err := strconv.Atoi(resourceIdStr)
if err != nil {
logger.Error("handleDeleteEvent ID is not int error %v", err)
logger.Errorf("handleDeleteEvent ID is not int error %s", err)
return
}
deleteApiConfigResource(ID)
Expand All @@ -292,18 +383,24 @@ func handleDeleteEvent(key, val []byte) {
resourceIdStr := pathArray[len(pathArray)-3]
resourceId, err := strconv.Atoi(resourceIdStr)
if err != nil {
logger.Error("handleDeleteEvent ID is not int error %v", err)
logger.Errorf("handleDeleteEvent ID is not int error %s", err)
return
}

methodIdStr := pathArray[len(pathArray)-1]
methodId, err := strconv.Atoi(methodIdStr)
if err != nil {
logger.Error("handleDeleteEvent ID is not int error %v", err)
logger.Errorf("handleDeleteEvent ID is not int error %s", err)
return
}
deleteApiConfigMethod(resourceId, methodId)
}

re = getCheckRatelimitRegexp()
if m := re.Match(key); m {
empty := &fr.Config{}
listener.RateLimitChange(empty)
}
}

func handlePutEvent(key, val []byte) {
Expand All @@ -315,7 +412,7 @@ func handlePutEvent(key, val []byte) {
res := &fc.Resource{}
err := yaml.UnmarshalYML(val, res)
if err != nil {
logger.Error("handlePutEvent UnmarshalYML error %v", err)
logger.Errorf("handlePutEvent UnmarshalYML error %s", err)
return
}
mergeApiConfigResource(*res)
Expand All @@ -327,11 +424,32 @@ func handlePutEvent(key, val []byte) {
res := &fc.Method{}
err := yaml.UnmarshalYML(val, res)
if err != nil {
logger.Error("handlePutEvent UnmarshalYML error %v", err)
logger.Errorf("handlePutEvent UnmarshalYML error %s", err)
return
}
mergeApiConfigMethod(res.ResourcePath, *res)
}

//handle base info
re = getCheckBaseInfoRegexp()
if m := re.Match(key); m {
mergeBaseInfo(val)
return
}

//handle plugins group
re = getCheckPluginsGroupRegexp()
if m := re.Match(key); m {
mergePluginGroup(val)
return
}

//handle ratelimit
re = getCheckRatelimitRegexp()
if m := re.Match(key); m {
mergeRatelimit(val)
return
}
}

func deleteApiConfigResource(resourceId int) {
Expand Down Expand Up @@ -361,6 +479,36 @@ func mergeApiConfigResource(val fc.Resource) {
listener.ResourceAdd(val)
}

func mergeRatelimit(val []byte) {
c := &fr.Config{}
if err := yaml.UnmarshalYML(val, c); err != nil {
logger.Errorf("unmarshalYmlConfig error %s", err)
return
}
apiConfig.RateLimit = *c
listener.RateLimitChange(c)
}

func mergePluginGroup(val []byte) {
g := &fc.PluginsGroup{}
if err := yaml.UnmarshalYML(val, g); err != nil {
logger.Errorf("unmarshalYmlConfig error %s", err)
return
}
for i, v := range apiConfig.PluginsGroup {
if v.GroupName == g.GroupName {
apiConfig.PluginsGroup[i] = *g
}
}
listener.PluginGroupChange(apiConfig.PluginsGroup)
}

func mergeBaseInfo(val []byte) {
_ = initBaseInfoFromString(apiConfig, string(val))

listener.PluginPathChange(apiConfig.PluginFilePath)
}

func deleteApiConfigMethod(resourceId, methodId int) {
for _, resource := range apiConfig.Resources {
if resource.ID != resourceId {
Expand Down Expand Up @@ -401,12 +549,24 @@ func mergeApiConfigMethod(path string, val fc.Method) {
}
}

func getCheckBaseInfoRegexp() *regexp.Regexp {
return regexp.MustCompile(".+/base$")
}

func getCheckResourceRegexp() *regexp.Regexp {
return regexp.MustCompile(".+/resources/[^/]+/?$")
}

func getExtractMethodRegexp() *regexp.Regexp {
return regexp.MustCompile("resources/([^/]+)/method/[^/]+/?$")
return regexp.MustCompile(".+/resources/([^/]+)/method/[^/]+/?$")
}

func getCheckPluginsGroupRegexp() *regexp.Regexp {
return regexp.MustCompile(".+/filter/pluginGroup/[^/]+/?$")
}

func getCheckRatelimitRegexp() *regexp.Regexp {
return regexp.MustCompile(".+/filter/ratelimit/[^/]+/?$")
}

// RegisterConfigListener register APIConfigListener
Expand Down
2 changes: 1 addition & 1 deletion pkg/filter/accesslog/access_log.go
Expand Up @@ -40,7 +40,7 @@ import (

var accessLogWriter = &model.AccessLogWriter{AccessLogDataChan: make(chan model.AccessLogData, constant.LogDataBuffer)}

func init() {
func Init() {
extension.SetFilterFunc(constant.AccessLogFilter, accessLog())
accessLogWriter.Write()
}
Expand Down

0 comments on commit fa5681d

Please sign in to comment.