Skip to content

Commit

Permalink
[Refactor] Optimize store layer error code return and instance query …
Browse files Browse the repository at this point in the history
…cache (#1137)
  • Loading branch information
chuntaojun committed Jun 1, 2023
1 parent 90b39c8 commit 258b853
Show file tree
Hide file tree
Showing 115 changed files with 1,888 additions and 1,187 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/integration-testing-mysql.yml
Original file line number Diff line number Diff line change
Expand Up @@ -131,11 +131,11 @@ jobs:
if [[ "$(uname)" == "Darwin" ]]; then
# Mac OS X 操作系统
echo "Run on MacOS"
sed -i '' 's/consoleOpen: true/consoleOpen: false/g' conf/polaris-server.yaml
# sed -i '' 's/consoleOpen: true/consoleOpen: false/g' conf/polaris-server.yaml
else
# GNU/Linux操作系统
echo "Run on Linux"
sed -i 's/consoleOpen: \(true\|false\)/consoleOpen: false/g' conf/polaris-server.yaml
# sed -i 's/consoleOpen: \(true\|false\)/consoleOpen: false/g' conf/polaris-server.yaml
fi
chmod +x ./tool/*.sh
Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/integration-testing.yml
Original file line number Diff line number Diff line change
Expand Up @@ -88,11 +88,11 @@ jobs:
if [[ "$(uname)" == "Darwin" ]]; then
# Mac OS X 操作系统
echo "Run on MacOS"
sed -i '' 's/consoleOpen: true/consoleOpen: false/g' conf/polaris-server.yaml
# sed -i '' 's/consoleOpen: true/consoleOpen: false/g' conf/polaris-server.yaml
else
# GNU/Linux操作系统
echo "Run on Linux"
sed -i 's/consoleOpen: \(true\|false\)/consoleOpen: false/g' conf/polaris-server.yaml
# sed -i 's/consoleOpen: \(true\|false\)/consoleOpen: false/g' conf/polaris-server.yaml
fi
chmod +x ./tool/*.sh
Expand Down
7 changes: 4 additions & 3 deletions admin/job/clean_deleted_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,14 @@ func (job *cleanDeletedClientsJob) init(raw map[string]interface{}) error {
log.Errorf("[Maintain][Job][CleanDeletedClients] new config decoder err: %v", err)
return err
}
err = decoder.Decode(raw)
if err != nil {
if err := decoder.Decode(raw); err != nil {
log.Errorf("[Maintain][Job][CleanDeletedClients] parse config err: %v", err)
return err
}
if cfg.ClientCleanTimeout < 2*time.Minute {
cfg.ClientCleanTimeout = 2 * time.Minute
}
job.cfg = cfg

return nil
}

Expand Down
7 changes: 4 additions & 3 deletions admin/job/clean_deleted_instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,14 @@ func (job *cleanDeletedInstancesJob) init(raw map[string]interface{}) error {
log.Errorf("[Maintain][Job][CleanDeletedInstances] new config decoder err: %v", err)
return err
}
err = decoder.Decode(raw)
if err != nil {
if err = decoder.Decode(raw); err != nil {
log.Errorf("[Maintain][Job][CleanDeletedInstances] parse config err: %v", err)
return err
}
if cfg.InstanceCleanTimeout < 2*time.Minute {
cfg.InstanceCleanTimeout = 2 * time.Minute
}
job.cfg = cfg

return nil
}

Expand Down
36 changes: 18 additions & 18 deletions admin/job/delete_empty_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,20 +31,20 @@ import (
"github.com/polarismesh/polaris/store"
)

type DeleteEmptyAutoCreatedServiceJobConfig struct {
type DeleteEmptyServiceJobConfig struct {
ServiceDeleteTimeout time.Duration `mapstructure:"serviceDeleteTimeout"`
}

type deleteEmptyAutoCreatedServiceJob struct {
cfg *DeleteEmptyAutoCreatedServiceJobConfig
type deleteEmptyServiceJob struct {
cfg *DeleteEmptyServiceJobConfig
namingServer service.DiscoverServer
cacheMgn *cache.CacheManager
storage store.Store
emptyServices map[string]time.Time
}

func (job *deleteEmptyAutoCreatedServiceJob) init(raw map[string]interface{}) error {
cfg := &DeleteEmptyAutoCreatedServiceJobConfig{
func (job *deleteEmptyServiceJob) init(raw map[string]interface{}) error {
cfg := &DeleteEmptyServiceJobConfig{
ServiceDeleteTimeout: 30 * time.Minute,
}
decodeConfig := &mapstructure.DecoderConfig{
Expand All @@ -53,40 +53,40 @@ func (job *deleteEmptyAutoCreatedServiceJob) init(raw map[string]interface{}) er
}
decoder, err := mapstructure.NewDecoder(decodeConfig)
if err != nil {
log.Errorf("[Maintain][Job][DeleteEmptyAutoCreatedService] new config decoder err: %v", err)
log.Errorf("[Maintain][Job][DeleteEmptyServiceJob] new config decoder err: %v", err)
return err
}
err = decoder.Decode(raw)
if err != nil {
log.Errorf("[Maintain][Job][DeleteEmptyAutoCreatedService] parse config err: %v", err)
log.Errorf("[Maintain][Job][DeleteEmptyServiceJob] parse config err: %v", err)
return err
}
job.cfg = cfg
job.emptyServices = map[string]time.Time{}
return nil
}

func (job *deleteEmptyAutoCreatedServiceJob) execute() {
err := job.deleteEmptyAutoCreatedServices()
func (job *deleteEmptyServiceJob) execute() {
err := job.deleteEmptyServices()
if err != nil {
log.Errorf("[Maintain][Job][DeleteEmptyAutoCreatedService] delete empty autocreated services, err: %v", err)
log.Errorf("[Maintain][Job][DeleteEmptyServiceJob] delete empty autocreated services, err: %v", err)
}
}

func (job *deleteEmptyAutoCreatedServiceJob) interval() time.Duration {
func (job *deleteEmptyServiceJob) interval() time.Duration {
return job.cfg.ServiceDeleteTimeout
}

func (job *deleteEmptyAutoCreatedServiceJob) clear() {
func (job *deleteEmptyServiceJob) clear() {
job.emptyServices = map[string]time.Time{}
}

func (job *deleteEmptyAutoCreatedServiceJob) getEmptyAutoCreatedServices() []*model.Service {
services := job.getAllEmptyAutoCreatedServices()
func (job *deleteEmptyServiceJob) getEmptyServices() []*model.Service {
services := job.getAllEmptyServices()
return job.filterToDeletedServices(services, time.Now(), job.cfg.ServiceDeleteTimeout)
}

func (job *deleteEmptyAutoCreatedServiceJob) getAllEmptyAutoCreatedServices() []*model.Service {
func (job *deleteEmptyServiceJob) getAllEmptyServices() []*model.Service {
var res []*model.Service
_ = job.cacheMgn.Service().IteratorServices(func(key string, svc *model.Service) (bool, error) {
if svc.IsAlias() {
Expand All @@ -101,7 +101,7 @@ func (job *deleteEmptyAutoCreatedServiceJob) getAllEmptyAutoCreatedServices() []
return res
}

func (job *deleteEmptyAutoCreatedServiceJob) filterToDeletedServices(services []*model.Service,
func (job *deleteEmptyServiceJob) filterToDeletedServices(services []*model.Service,
now time.Time, timeout time.Duration) []*model.Service {
var toDeleteServices []*model.Service
m := map[string]time.Time{}
Expand All @@ -122,8 +122,8 @@ func (job *deleteEmptyAutoCreatedServiceJob) filterToDeletedServices(services []
return toDeleteServices
}

func (job *deleteEmptyAutoCreatedServiceJob) deleteEmptyAutoCreatedServices() error {
emptyServices := job.getEmptyAutoCreatedServices()
func (job *deleteEmptyServiceJob) deleteEmptyServices() error {
emptyServices := job.getEmptyServices()

deleteBatchSize := 100
for i := 0; i < len(emptyServices); i += deleteBatchSize {
Expand Down
12 changes: 6 additions & 6 deletions admin/job/delete_empty_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,14 @@ func Test_DeleteEmptyAutoCreatedServiceJobConfigInit(t *testing.T) {
"serviceDeleteTimeout": "1m",
}

job := deleteEmptyAutoCreatedServiceJob{}
job := deleteEmptyServiceJob{}
err := job.init(raw)
if err != nil {
t.Errorf("init deleteEmptyAutoCreatedServiceJob config, err: %v", err)
t.Errorf("init deleteEmptyServiceJob config, err: %v", err)
}

if job.cfg.ServiceDeleteTimeout != expectValue {
t.Errorf("init deleteEmptyAutoCreatedServiceJob config. expect: %s, actual: %s",
t.Errorf("init deleteEmptyServiceJob config. expect: %s, actual: %s",
expectValue, job.cfg.ServiceDeleteTimeout)
}
}
Expand All @@ -47,15 +47,15 @@ func Test_DeleteEmptyAutoCreatedServiceJobConfigInitErr(t *testing.T) {
"serviceDeleteTimeout": "xx",
}

job := deleteEmptyAutoCreatedServiceJob{}
job := deleteEmptyServiceJob{}
err := job.init(raw)
if err == nil {
t.Errorf("init deleteEmptyAutoCreatedServiceJob config should err")
t.Errorf("init deleteEmptyServiceJob config should err")
}
}

func Test_FilterToDeletedServices(t *testing.T) {
job := deleteEmptyAutoCreatedServiceJob{}
job := deleteEmptyServiceJob{}
t1, _ := time.Parse("2006-01-02 15:04:05", "2023-03-20 12:01:00")
t2, _ := time.Parse("2006-01-02 15:04:05", "2023-03-20 12:02:00")
job.emptyServices = map[string]time.Time{
Expand Down
45 changes: 30 additions & 15 deletions admin/job/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func NewMaintainJobs(namingServer service.DiscoverServer, cacheMgn *cache.CacheM
jobs: map[string]maintainJob{
"DeleteUnHealthyInstance": &deleteUnHealthyInstanceJob{
namingServer: namingServer, storage: storage},
"DeleteEmptyAutoCreatedService": &deleteEmptyAutoCreatedServiceJob{
"DeleteEmptyService": &deleteEmptyServiceJob{
namingServer: namingServer, cacheMgn: cacheMgn, storage: storage},
"CleanDeletedInstances": &cleanDeletedInstancesJob{
storage: storage},
Expand All @@ -68,30 +68,45 @@ func (mj *MaintainJobs) StartMaintianJobs(configs []JobConfig) error {
log.Infof("[Maintain][Job] job (%s) not enable", cfg.Name)
continue
}
job, ok := mj.jobs[cfg.Name]
jobName := parseJobName(cfg.Name)
job, ok := mj.findAdminJob(jobName)
if !ok {
return fmt.Errorf("[Maintain][Job] job (%s) not exist", cfg.Name)
return fmt.Errorf("[Maintain][Job] job (%s) not exist", jobName)
}
_, ok = mj.startedJobs[cfg.Name]
if ok {
return fmt.Errorf("[Maintain][Job] job (%s) duplicated", cfg.Name)
if _, ok := mj.startedJobs[jobName]; ok {
return fmt.Errorf("[Maintain][Job] job (%s) duplicated", jobName)
}
err := job.init(cfg.Option)
if err != nil {
log.Errorf("[Maintain][Job] job (%s) fail to init, err: %v", cfg.Name, err)
return fmt.Errorf("[Maintain][Job] job (%s) fail to init", cfg.Name)
if err := job.init(cfg.Option); err != nil {
log.Errorf("[Maintain][Job] job (%s) fail to init, err: %v", jobName, err)
return fmt.Errorf("[Maintain][Job] job (%s) fail to init", jobName)
}
err = mj.storage.StartLeaderElection(store.ElectionKeyMaintainJobPrefix + cfg.Name)
if err != nil {
log.Errorf("[Maintain][Job][%s] start leader election err: %v", cfg.Name, err)
if err := mj.storage.StartLeaderElection(store.ElectionKeyMaintainJobPrefix + jobName); err != nil {
log.Errorf("[Maintain][Job][%s] start leader election err: %v", jobName, err)
return err
}
runAdminJob(ctx, cfg.Name, job.interval(), job, mj.storage)
mj.startedJobs[cfg.Name] = job
runAdminJob(ctx, jobName, job.interval(), job, mj.storage)
mj.startedJobs[jobName] = job
}
return nil
}

func parseJobName(name string) string {
// 兼容老配置
if name == "DeleteEmptyAutoCreatedService" {
name = "DeleteEmptyService"
}
return name
}

func (mj *MaintainJobs) findAdminJob(name string) (maintainJob, bool) {
job, ok := mj.jobs[name]
if !ok {
return nil, false
}

return job, true
}

// StopMaintainJobs
func (mj *MaintainJobs) StopMaintainJobs() {
if mj.cancel != nil {
Expand Down
3 changes: 2 additions & 1 deletion admin/maintain.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
connlimit "github.com/polarismesh/polaris/common/conn/limit"
commonlog "github.com/polarismesh/polaris/common/log"
"github.com/polarismesh/polaris/common/model"
commonstore "github.com/polarismesh/polaris/common/store"
"github.com/polarismesh/polaris/common/utils"
"github.com/polarismesh/polaris/plugin"
)
Expand Down Expand Up @@ -156,7 +157,7 @@ func (s *Server) CleanInstance(ctx context.Context, req *apiservice.Instance) *a
if err := s.storage.CleanInstance(instanceID); err != nil {
log.Error("Clean instance",
zap.String("err", err.Error()), utils.ZapRequestID(utils.ParseRequestID(ctx)))
return api.NewInstanceResponse(apimodel.Code_StoreLayerException, req)
return api.NewInstanceResponse(commonstore.StoreCode2APICode(err), req)
}

log.Info("Clean instance", utils.ZapRequestID(utils.ParseRequestID(ctx)), utils.ZapInstanceID(instanceID))
Expand Down
1 change: 1 addition & 0 deletions apiserver/grpcserver/discover/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ func (g *GRPCServer) Run(errCh chan error) {
if config.Enable {
// 注册 v1 版本的 spec discover server
apiservice.RegisterPolarisGRPCServer(server, g.v1server)
apiservice.RegisterPolarisHeartbeatGRPCServer(server, g.v1server)
openMethod, getErr := apiserver.GetClientOpenMethod(config.Include, g.GetProtocol())
if getErr != nil {
return getErr
Expand Down
2 changes: 1 addition & 1 deletion apiserver/grpcserver/discover/v1/client_access.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ func (g *DiscoverServer) Heartbeat(ctx context.Context, in *apiservice.Instance)
}

// BatchHeartbeat 批量上报心跳
func (g *DiscoverServer) BatchHeartbeat(svr apiservice.PolarisGRPC_BatchHeartbeatServer) error {
func (g *DiscoverServer) BatchHeartbeat(svr apiservice.PolarisHeartbeatGRPC_BatchHeartbeatServer) error {
ctx := grpcserver.ConvertContext(svr.Context())

for {
Expand Down
5 changes: 3 additions & 2 deletions apiserver/xdsserverv3/callback.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package xdsserverv3
import (
"context"

corev3 "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
"github.com/gogo/protobuf/jsonpb"

Expand All @@ -42,7 +43,7 @@ func (cb *Callbacks) OnStreamOpen(_ context.Context, id int64, typ string) error
return nil
}

func (cb *Callbacks) OnStreamClosed(id int64) {
func (cb *Callbacks) OnStreamClosed(id int64, node *corev3.Node) {
if cb.log.DebugEnabled() {
cb.log.Debugf("stream %d closed", id)
}
Expand All @@ -56,7 +57,7 @@ func (cb *Callbacks) OnDeltaStreamOpen(_ context.Context, id int64, typ string)
return nil
}

func (cb *Callbacks) OnDeltaStreamClosed(id int64) {
func (cb *Callbacks) OnDeltaStreamClosed(id int64, node *corev3.Node) {
if cb.log.DebugEnabled() {
cb.log.Debugf("delta stream %d closed", id)
}
Expand Down

0 comments on commit 258b853

Please sign in to comment.