Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Refactor] Optimize store layer error code return and instance query cache #1137

Merged
merged 37 commits into from
Jun 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
09f2023
fix issue #629 (#693)
chuntaojun Sep 23, 2022
704febb
docs:add error code desc
chuntaojun Sep 25, 2022
f5569d5
fix:调整license-checker的触发
chuntaojun Oct 19, 2022
fd23b1b
fix:调整license-checker的触发
chuntaojun Oct 19, 2022
949a358
feat:support rls to push xds client
chuntaojun May 23, 2023
8165e91
feat:support rls to push xds client
chuntaojun May 23, 2023
cdc045b
feat:support rls to push xds client
chuntaojun May 24, 2023
381e419
feat:support rls to push xds client
chuntaojun May 24, 2023
84de557
feat:support rls to push xds client
chuntaojun May 24, 2023
1e40505
feat:support rls to push xds client
chuntaojun May 25, 2023
bd5fa21
feat:support rls to push xds client
chuntaojun May 25, 2023
d1c0862
feat:support rls to push xds client
chuntaojun May 25, 2023
04d3635
feat:support rls to push xds client
chuntaojun May 26, 2023
bbd0d8b
feat:support rls to push xds client
chuntaojun May 26, 2023
b400f15
feat:support rls to push xds client
chuntaojun May 26, 2023
abe05be
feat:support rls to push xds client
chuntaojun May 26, 2023
94ef6ae
feat:support rls to push xds client
chuntaojun May 26, 2023
11d2a79
feat:support rls to push xds client
chuntaojun May 26, 2023
bf76e44
feat:support rls to push xds client
chuntaojun May 26, 2023
2c8165d
feat:support rls to push xds client
chuntaojun May 27, 2023
86a98c3
feat:support rls to push xds client
chuntaojun May 31, 2023
149e76f
feat:support rls to push xds client
chuntaojun May 31, 2023
abad7d1
feat:support rls to push xds client
chuntaojun May 31, 2023
4bf54c3
feat:support rls to push xds client
chuntaojun May 31, 2023
cea410f
feat:support rls to push xds client
chuntaojun May 31, 2023
ef420f4
feat:support rls to push xds client
chuntaojun May 31, 2023
d7ab931
feat:support rls to push xds client
chuntaojun May 31, 2023
4a840ce
feat:support rls to push xds client
chuntaojun May 31, 2023
524670c
feat:support rls to push xds client
chuntaojun May 31, 2023
f9680d9
feat:support rls to push xds client
chuntaojun May 31, 2023
1281efc
feat:support rls to push xds client
chuntaojun May 31, 2023
96d9fcd
feat:support rls to push xds client
chuntaojun Jun 1, 2023
c5f3857
feat:support rls to push xds client
chuntaojun Jun 1, 2023
49b91b1
feat:support rls to push xds client
chuntaojun Jun 1, 2023
77440d5
feat:support rls to push xds client
chuntaojun Jun 1, 2023
d78399b
feat:support rls to push xds client
chuntaojun Jun 1, 2023
b8699fd
feat:support rls to push xds client
chuntaojun Jun 1, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/integration-testing-mysql.yml
Original file line number Diff line number Diff line change
Expand Up @@ -131,11 +131,11 @@ jobs:
if [[ "$(uname)" == "Darwin" ]]; then
# Mac OS X 操作系统
echo "Run on MacOS"
sed -i '' 's/consoleOpen: true/consoleOpen: false/g' conf/polaris-server.yaml
# sed -i '' 's/consoleOpen: true/consoleOpen: false/g' conf/polaris-server.yaml
else
# GNU/Linux操作系统
echo "Run on Linux"
sed -i 's/consoleOpen: \(true\|false\)/consoleOpen: false/g' conf/polaris-server.yaml
# sed -i 's/consoleOpen: \(true\|false\)/consoleOpen: false/g' conf/polaris-server.yaml
fi

chmod +x ./tool/*.sh
Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/integration-testing.yml
Original file line number Diff line number Diff line change
Expand Up @@ -88,11 +88,11 @@ jobs:
if [[ "$(uname)" == "Darwin" ]]; then
# Mac OS X 操作系统
echo "Run on MacOS"
sed -i '' 's/consoleOpen: true/consoleOpen: false/g' conf/polaris-server.yaml
# sed -i '' 's/consoleOpen: true/consoleOpen: false/g' conf/polaris-server.yaml
else
# GNU/Linux操作系统
echo "Run on Linux"
sed -i 's/consoleOpen: \(true\|false\)/consoleOpen: false/g' conf/polaris-server.yaml
# sed -i 's/consoleOpen: \(true\|false\)/consoleOpen: false/g' conf/polaris-server.yaml
fi

chmod +x ./tool/*.sh
Expand Down
7 changes: 4 additions & 3 deletions admin/job/clean_deleted_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,14 @@ func (job *cleanDeletedClientsJob) init(raw map[string]interface{}) error {
log.Errorf("[Maintain][Job][CleanDeletedClients] new config decoder err: %v", err)
return err
}
err = decoder.Decode(raw)
if err != nil {
if err := decoder.Decode(raw); err != nil {
log.Errorf("[Maintain][Job][CleanDeletedClients] parse config err: %v", err)
return err
}
if cfg.ClientCleanTimeout < 2*time.Minute {
cfg.ClientCleanTimeout = 2 * time.Minute
}
job.cfg = cfg

return nil
}

Expand Down
7 changes: 4 additions & 3 deletions admin/job/clean_deleted_instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,14 @@ func (job *cleanDeletedInstancesJob) init(raw map[string]interface{}) error {
log.Errorf("[Maintain][Job][CleanDeletedInstances] new config decoder err: %v", err)
return err
}
err = decoder.Decode(raw)
if err != nil {
if err = decoder.Decode(raw); err != nil {
log.Errorf("[Maintain][Job][CleanDeletedInstances] parse config err: %v", err)
return err
}
if cfg.InstanceCleanTimeout < 2*time.Minute {
cfg.InstanceCleanTimeout = 2 * time.Minute
}
job.cfg = cfg

return nil
}

Expand Down
36 changes: 18 additions & 18 deletions admin/job/delete_empty_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,20 +31,20 @@ import (
"github.com/polarismesh/polaris/store"
)

type DeleteEmptyAutoCreatedServiceJobConfig struct {
type DeleteEmptyServiceJobConfig struct {
ServiceDeleteTimeout time.Duration `mapstructure:"serviceDeleteTimeout"`
}

type deleteEmptyAutoCreatedServiceJob struct {
cfg *DeleteEmptyAutoCreatedServiceJobConfig
type deleteEmptyServiceJob struct {
cfg *DeleteEmptyServiceJobConfig
namingServer service.DiscoverServer
cacheMgn *cache.CacheManager
storage store.Store
emptyServices map[string]time.Time
}

func (job *deleteEmptyAutoCreatedServiceJob) init(raw map[string]interface{}) error {
cfg := &DeleteEmptyAutoCreatedServiceJobConfig{
func (job *deleteEmptyServiceJob) init(raw map[string]interface{}) error {
cfg := &DeleteEmptyServiceJobConfig{
ServiceDeleteTimeout: 30 * time.Minute,
}
decodeConfig := &mapstructure.DecoderConfig{
Expand All @@ -53,40 +53,40 @@ func (job *deleteEmptyAutoCreatedServiceJob) init(raw map[string]interface{}) er
}
decoder, err := mapstructure.NewDecoder(decodeConfig)
if err != nil {
log.Errorf("[Maintain][Job][DeleteEmptyAutoCreatedService] new config decoder err: %v", err)
log.Errorf("[Maintain][Job][DeleteEmptyServiceJob] new config decoder err: %v", err)
return err
}
err = decoder.Decode(raw)
if err != nil {
log.Errorf("[Maintain][Job][DeleteEmptyAutoCreatedService] parse config err: %v", err)
log.Errorf("[Maintain][Job][DeleteEmptyServiceJob] parse config err: %v", err)
return err
}
job.cfg = cfg
job.emptyServices = map[string]time.Time{}
return nil
}

func (job *deleteEmptyAutoCreatedServiceJob) execute() {
err := job.deleteEmptyAutoCreatedServices()
func (job *deleteEmptyServiceJob) execute() {
err := job.deleteEmptyServices()
if err != nil {
log.Errorf("[Maintain][Job][DeleteEmptyAutoCreatedService] delete empty autocreated services, err: %v", err)
log.Errorf("[Maintain][Job][DeleteEmptyServiceJob] delete empty autocreated services, err: %v", err)
}
}

func (job *deleteEmptyAutoCreatedServiceJob) interval() time.Duration {
func (job *deleteEmptyServiceJob) interval() time.Duration {
return job.cfg.ServiceDeleteTimeout
}

func (job *deleteEmptyAutoCreatedServiceJob) clear() {
func (job *deleteEmptyServiceJob) clear() {
job.emptyServices = map[string]time.Time{}
}

func (job *deleteEmptyAutoCreatedServiceJob) getEmptyAutoCreatedServices() []*model.Service {
services := job.getAllEmptyAutoCreatedServices()
func (job *deleteEmptyServiceJob) getEmptyServices() []*model.Service {
services := job.getAllEmptyServices()
return job.filterToDeletedServices(services, time.Now(), job.cfg.ServiceDeleteTimeout)
}

func (job *deleteEmptyAutoCreatedServiceJob) getAllEmptyAutoCreatedServices() []*model.Service {
func (job *deleteEmptyServiceJob) getAllEmptyServices() []*model.Service {
var res []*model.Service
_ = job.cacheMgn.Service().IteratorServices(func(key string, svc *model.Service) (bool, error) {
if svc.IsAlias() {
Expand All @@ -101,7 +101,7 @@ func (job *deleteEmptyAutoCreatedServiceJob) getAllEmptyAutoCreatedServices() []
return res
}

func (job *deleteEmptyAutoCreatedServiceJob) filterToDeletedServices(services []*model.Service,
func (job *deleteEmptyServiceJob) filterToDeletedServices(services []*model.Service,
now time.Time, timeout time.Duration) []*model.Service {
var toDeleteServices []*model.Service
m := map[string]time.Time{}
Expand All @@ -122,8 +122,8 @@ func (job *deleteEmptyAutoCreatedServiceJob) filterToDeletedServices(services []
return toDeleteServices
}

func (job *deleteEmptyAutoCreatedServiceJob) deleteEmptyAutoCreatedServices() error {
emptyServices := job.getEmptyAutoCreatedServices()
func (job *deleteEmptyServiceJob) deleteEmptyServices() error {
emptyServices := job.getEmptyServices()

deleteBatchSize := 100
for i := 0; i < len(emptyServices); i += deleteBatchSize {
Expand Down
12 changes: 6 additions & 6 deletions admin/job/delete_empty_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,14 @@ func Test_DeleteEmptyAutoCreatedServiceJobConfigInit(t *testing.T) {
"serviceDeleteTimeout": "1m",
}

job := deleteEmptyAutoCreatedServiceJob{}
job := deleteEmptyServiceJob{}
err := job.init(raw)
if err != nil {
t.Errorf("init deleteEmptyAutoCreatedServiceJob config, err: %v", err)
t.Errorf("init deleteEmptyServiceJob config, err: %v", err)
}

if job.cfg.ServiceDeleteTimeout != expectValue {
t.Errorf("init deleteEmptyAutoCreatedServiceJob config. expect: %s, actual: %s",
t.Errorf("init deleteEmptyServiceJob config. expect: %s, actual: %s",
expectValue, job.cfg.ServiceDeleteTimeout)
}
}
Expand All @@ -47,15 +47,15 @@ func Test_DeleteEmptyAutoCreatedServiceJobConfigInitErr(t *testing.T) {
"serviceDeleteTimeout": "xx",
}

job := deleteEmptyAutoCreatedServiceJob{}
job := deleteEmptyServiceJob{}
err := job.init(raw)
if err == nil {
t.Errorf("init deleteEmptyAutoCreatedServiceJob config should err")
t.Errorf("init deleteEmptyServiceJob config should err")
}
}

func Test_FilterToDeletedServices(t *testing.T) {
job := deleteEmptyAutoCreatedServiceJob{}
job := deleteEmptyServiceJob{}
t1, _ := time.Parse("2006-01-02 15:04:05", "2023-03-20 12:01:00")
t2, _ := time.Parse("2006-01-02 15:04:05", "2023-03-20 12:02:00")
job.emptyServices = map[string]time.Time{
Expand Down
45 changes: 30 additions & 15 deletions admin/job/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func NewMaintainJobs(namingServer service.DiscoverServer, cacheMgn *cache.CacheM
jobs: map[string]maintainJob{
"DeleteUnHealthyInstance": &deleteUnHealthyInstanceJob{
namingServer: namingServer, storage: storage},
"DeleteEmptyAutoCreatedService": &deleteEmptyAutoCreatedServiceJob{
"DeleteEmptyService": &deleteEmptyServiceJob{
namingServer: namingServer, cacheMgn: cacheMgn, storage: storage},
"CleanDeletedInstances": &cleanDeletedInstancesJob{
storage: storage},
Expand All @@ -68,30 +68,45 @@ func (mj *MaintainJobs) StartMaintianJobs(configs []JobConfig) error {
log.Infof("[Maintain][Job] job (%s) not enable", cfg.Name)
continue
}
job, ok := mj.jobs[cfg.Name]
jobName := parseJobName(cfg.Name)
job, ok := mj.findAdminJob(jobName)
if !ok {
return fmt.Errorf("[Maintain][Job] job (%s) not exist", cfg.Name)
return fmt.Errorf("[Maintain][Job] job (%s) not exist", jobName)
}
_, ok = mj.startedJobs[cfg.Name]
if ok {
return fmt.Errorf("[Maintain][Job] job (%s) duplicated", cfg.Name)
if _, ok := mj.startedJobs[jobName]; ok {
return fmt.Errorf("[Maintain][Job] job (%s) duplicated", jobName)
}
err := job.init(cfg.Option)
if err != nil {
log.Errorf("[Maintain][Job] job (%s) fail to init, err: %v", cfg.Name, err)
return fmt.Errorf("[Maintain][Job] job (%s) fail to init", cfg.Name)
if err := job.init(cfg.Option); err != nil {
log.Errorf("[Maintain][Job] job (%s) fail to init, err: %v", jobName, err)
return fmt.Errorf("[Maintain][Job] job (%s) fail to init", jobName)
}
err = mj.storage.StartLeaderElection(store.ElectionKeyMaintainJobPrefix + cfg.Name)
if err != nil {
log.Errorf("[Maintain][Job][%s] start leader election err: %v", cfg.Name, err)
if err := mj.storage.StartLeaderElection(store.ElectionKeyMaintainJobPrefix + jobName); err != nil {
log.Errorf("[Maintain][Job][%s] start leader election err: %v", jobName, err)
return err
}
runAdminJob(ctx, cfg.Name, job.interval(), job, mj.storage)
mj.startedJobs[cfg.Name] = job
runAdminJob(ctx, jobName, job.interval(), job, mj.storage)
mj.startedJobs[jobName] = job
}
return nil
}

func parseJobName(name string) string {
// 兼容老配置
if name == "DeleteEmptyAutoCreatedService" {
name = "DeleteEmptyService"
}
return name
}

func (mj *MaintainJobs) findAdminJob(name string) (maintainJob, bool) {
job, ok := mj.jobs[name]
if !ok {
return nil, false
}

return job, true
}

// StopMaintainJobs
func (mj *MaintainJobs) StopMaintainJobs() {
if mj.cancel != nil {
Expand Down
3 changes: 2 additions & 1 deletion admin/maintain.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
connlimit "github.com/polarismesh/polaris/common/conn/limit"
commonlog "github.com/polarismesh/polaris/common/log"
"github.com/polarismesh/polaris/common/model"
commonstore "github.com/polarismesh/polaris/common/store"
"github.com/polarismesh/polaris/common/utils"
"github.com/polarismesh/polaris/plugin"
)
Expand Down Expand Up @@ -156,7 +157,7 @@ func (s *Server) CleanInstance(ctx context.Context, req *apiservice.Instance) *a
if err := s.storage.CleanInstance(instanceID); err != nil {
log.Error("Clean instance",
zap.String("err", err.Error()), utils.ZapRequestID(utils.ParseRequestID(ctx)))
return api.NewInstanceResponse(apimodel.Code_StoreLayerException, req)
return api.NewInstanceResponse(commonstore.StoreCode2APICode(err), req)
}

log.Info("Clean instance", utils.ZapRequestID(utils.ParseRequestID(ctx)), utils.ZapInstanceID(instanceID))
Expand Down
1 change: 1 addition & 0 deletions apiserver/grpcserver/discover/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ func (g *GRPCServer) Run(errCh chan error) {
if config.Enable {
// 注册 v1 版本的 spec discover server
apiservice.RegisterPolarisGRPCServer(server, g.v1server)
apiservice.RegisterPolarisHeartbeatGRPCServer(server, g.v1server)
openMethod, getErr := apiserver.GetClientOpenMethod(config.Include, g.GetProtocol())
if getErr != nil {
return getErr
Expand Down
2 changes: 1 addition & 1 deletion apiserver/grpcserver/discover/v1/client_access.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ func (g *DiscoverServer) Heartbeat(ctx context.Context, in *apiservice.Instance)
}

// BatchHeartbeat 批量上报心跳
func (g *DiscoverServer) BatchHeartbeat(svr apiservice.PolarisGRPC_BatchHeartbeatServer) error {
func (g *DiscoverServer) BatchHeartbeat(svr apiservice.PolarisHeartbeatGRPC_BatchHeartbeatServer) error {
ctx := grpcserver.ConvertContext(svr.Context())

for {
Expand Down
5 changes: 3 additions & 2 deletions apiserver/xdsserverv3/callback.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package xdsserverv3
import (
"context"

corev3 "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
"github.com/gogo/protobuf/jsonpb"

Expand All @@ -42,7 +43,7 @@ func (cb *Callbacks) OnStreamOpen(_ context.Context, id int64, typ string) error
return nil
}

func (cb *Callbacks) OnStreamClosed(id int64) {
func (cb *Callbacks) OnStreamClosed(id int64, node *corev3.Node) {
if cb.log.DebugEnabled() {
cb.log.Debugf("stream %d closed", id)
}
Expand All @@ -56,7 +57,7 @@ func (cb *Callbacks) OnDeltaStreamOpen(_ context.Context, id int64, typ string)
return nil
}

func (cb *Callbacks) OnDeltaStreamClosed(id int64) {
func (cb *Callbacks) OnDeltaStreamClosed(id int64, node *corev3.Node) {
if cb.log.DebugEnabled() {
cb.log.Debugf("delta stream %d closed", id)
}
Expand Down