Skip to content

Commit

Permalink
[ISSUE #1205] 修复清理软删除实例时没有同步清理 instance_metadata 以及 health_check 表的数据 (
Browse files Browse the repository at this point in the history
  • Loading branch information
chuntaojun committed Aug 23, 2023
1 parent eb64f92 commit 2e59684
Show file tree
Hide file tree
Showing 56 changed files with 666 additions and 260 deletions.
1 change: 0 additions & 1 deletion admin/job/clean_deleted_instance.go
Expand Up @@ -68,7 +68,6 @@ func (job *cleanDeletedInstancesJob) execute() {
}

log.Infof("[Maintain][Job][CleanDeletedInstances] clean deleted instance count %d", count)

if count < batchSize {
break
}
Expand Down
1 change: 0 additions & 1 deletion apiserver/eurekaserver/eureka_suit_test.go
Expand Up @@ -28,7 +28,6 @@ import (
_ "github.com/polarismesh/polaris/plugin/ratelimit/token"
_ "github.com/polarismesh/polaris/plugin/statis/logger"
_ "github.com/polarismesh/polaris/plugin/statis/prometheus"

testsuit "github.com/polarismesh/polaris/test/suit"
)

Expand Down
10 changes: 8 additions & 2 deletions apiserver/eurekaserver/write_test.go
Expand Up @@ -95,14 +95,20 @@ func TestEurekaServer_renew(t *testing.T) {

ctrl := gomock.NewController(t)

mockTx := mock.NewMockTx(ctrl)
mockTx.EXPECT().Commit().Return(nil).AnyTimes()
mockTx.EXPECT().Rollback().Return(nil).AnyTimes()
mockTx.EXPECT().CreateReadView().Return(nil).AnyTimes()

mockStore := mock.NewMockStore(ctrl)
mockStore.EXPECT().
GetMoreInstances(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
GetMoreInstances(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
AnyTimes().
Return(map[string]*model.Instance{
insId: ins,
disableBeatInsId: disableBeatIns,
}, nil)
mockStore.EXPECT().StartReadTx().Return(mockTx, nil).AnyTimes()
mockStore.EXPECT().
GetMoreServices(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
AnyTimes().
Expand All @@ -114,7 +120,7 @@ func TestEurekaServer_renew(t *testing.T) {
},
}, nil)

mockStore.EXPECT().GetInstancesCount().AnyTimes().Return(uint32(1), nil)
mockStore.EXPECT().GetInstancesCountTx(gomock.Any()).AnyTimes().Return(uint32(1), nil)
mockStore.EXPECT().GetUnixSecond(gomock.Any()).AnyTimes().Return(time.Now().Unix(), nil)
mockStore.EXPECT().GetServicesCount().Return(uint32(1), nil).AnyTimes()
mockStore.EXPECT().StartLeaderElection(gomock.Any()).AnyTimes()
Expand Down
10 changes: 6 additions & 4 deletions apiserver/grpcserver/utils/help.go
Expand Up @@ -21,6 +21,8 @@ import (
"fmt"
"strings"

"go.uber.org/zap"

"github.com/polarismesh/polaris/apiserver"
"github.com/polarismesh/polaris/common/log"
)
Expand All @@ -43,17 +45,17 @@ func GetClientOpenMethod(include []string, protocol string) (map[string]bool, er
for _, item := range include {
if methods, ok := clientAccess[item]; ok {
for _, method := range methods {
method = "/v1.Polaris" + strings.ToUpper(protocol) + "/" + method
recordMethod := "/v1.Polaris" + strings.ToUpper(protocol) + "/" + method
if item == apiserver.HealthcheckAccess && method != "Heartbeat" {
method = "/v1.PolarisHeartbeat" + strings.ToUpper(protocol) + "/" + method
recordMethod = "/v1.PolarisHeartbeat" + strings.ToUpper(protocol) + "/" + method
}
openMethod[method] = true
openMethod[recordMethod] = true
}
} else {
log.Errorf("method %s does not exist in %sserver client access", item, protocol)
return nil, fmt.Errorf("method %s does not exist in %sserver client access", item, protocol)
}
}

log.Info("[APIServer] client open method info", zap.Any("openMethod", openMethod))
return openMethod, nil
}
4 changes: 4 additions & 0 deletions apiserver/httpserver/config/server.go
Expand Up @@ -67,7 +67,11 @@ func (h *HTTPServer) GetConsoleAccessServer(include []string) (*restful.WebServi
switch item {
case defaultReadAccess:
h.addDefaultReadAccess(ws)
// 仅为了兼容老的客户端发现路径
h.addDiscover(ws)
case configConsoleAccess, defaultAccess:
// 仅为了兼容老的客户端发现路径
h.addDiscover(ws)
h.addDefaultAccess(ws)
}
}
Expand Down
4 changes: 2 additions & 2 deletions apiserver/httpserver/utils/handler.go
Expand Up @@ -319,7 +319,7 @@ func (h *Handler) WriteHeaderAndProto(obj api.ResponseMessage) {
h.Response.AddHeader(utils.PolarisRequestID, requestID)
h.Response.WriteHeader(status)

m := jsonpb.Marshaler{Indent: " ", EmitDefaults: false}
m := jsonpb.Marshaler{Indent: " ", EmitDefaults: true}
err := m.Marshal(h.Response, h.i18nAction(obj))
if err != nil {
log.Error(err.Error(), utils.ZapRequestID(requestID))
Expand All @@ -343,7 +343,7 @@ func (h *Handler) WriteHeaderAndProtoV2(obj api.ResponseMessage) {
h.Response.AddHeader(utils.PolarisRequestID, requestID)
h.Response.WriteHeader(status)

m := jsonpb.Marshaler{Indent: " ", EmitDefaults: false}
m := jsonpb.Marshaler{Indent: " ", EmitDefaults: true}
err := m.Marshal(h.Response, obj)
if err != nil {
log.Error(err.Error(), utils.ZapRequestID(requestID))
Expand Down
10 changes: 8 additions & 2 deletions auth/defaultauth/common_test.go
Expand Up @@ -91,9 +91,15 @@ func initCache(ctrl *gomock.Controller) (*cache.Config, *storemock.MockStore) {

storage := storemock.NewMockStore(ctrl)

mockTx := storemock.NewMockTx(ctrl)
mockTx.EXPECT().Commit().Return(nil).AnyTimes()
mockTx.EXPECT().Rollback().Return(nil).AnyTimes()
mockTx.EXPECT().CreateReadView().Return(nil).AnyTimes()

storage.EXPECT().StartReadTx().Return(mockTx, nil).AnyTimes()
storage.EXPECT().GetServicesCount().AnyTimes().Return(uint32(1), nil)
storage.EXPECT().GetInstancesCount().AnyTimes().Return(uint32(1), nil)
storage.EXPECT().GetMoreInstances(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(map[string]*model.Instance{
storage.EXPECT().GetInstancesCountTx(gomock.Any()).AnyTimes().Return(uint32(1), nil)
storage.EXPECT().GetMoreInstances(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(map[string]*model.Instance{
"123": {
Proto: &service_manage.Instance{
Id: wrapperspb.String(uuid.NewString()),
Expand Down
3 changes: 1 addition & 2 deletions auth/defaultauth/strategy_test.go
Expand Up @@ -92,7 +92,6 @@ func newStrategyTest(t *testing.T) *StrategyTest {
storage.EXPECT().GetStrategyResources(gomock.Eq(users[1].ID), gomock.Any()).AnyTimes().Return(strategies[1].Resources, nil)
storage.EXPECT().GetStrategyResources(gomock.Eq(groups[1].ID), gomock.Any()).AnyTimes().Return(strategies[len(users)-1+2].Resources, nil)


ctx, cancel := context.WithCancel(context.Background())
cacheMgn, err := cache.TestCacheInitialize(ctx, cfg, storage)
if err != nil {
Expand Down Expand Up @@ -657,7 +656,7 @@ func Test_GetStrategy(t *testing.T) {

_ = strategyTest.cacheMgn.TestUpdate()
resp := strategyTest.svr.GetStrategy(valCtx, &apisecurity.AuthStrategy{
Id: &wrapperspb.StringValue{Value: strategyTest.strategies[0].ID},
Id: &wrapperspb.StringValue{Value: strategyTest.strategies[index].ID},
})

assert.Equal(t, api.NotAllowedAccess, resp.Code.GetValue(), resp.Info.GetValue())
Expand Down
3 changes: 2 additions & 1 deletion auth/defaultauth/token_test.go
Expand Up @@ -22,8 +22,9 @@ import (
"testing"
"time"

"github.com/polarismesh/polaris/auth/defaultauth"
"golang.org/x/crypto/bcrypt"

"github.com/polarismesh/polaris/auth/defaultauth"
)

// Test_CustomDesignSalt 主要用于有自定义salt需求的用户
Expand Down
1 change: 1 addition & 0 deletions auth/defaultauth/utils_test.go
Expand Up @@ -21,6 +21,7 @@ import (
"testing"

"github.com/golang/protobuf/ptypes/wrappers"

"github.com/polarismesh/polaris/auth/defaultauth"
"github.com/polarismesh/polaris/common/utils"
)
Expand Down
9 changes: 4 additions & 5 deletions cache/cache.go
Expand Up @@ -45,11 +45,10 @@ type CacheManager struct {
// Initialize 缓存对象初始化
func (nc *CacheManager) Initialize() error {
if config.DiffTime != 0 {
if config.DiffTime > 0 {
types.DefaultTimeDiff = config.DiffTime
} else {
types.DefaultTimeDiff = -1 * config.DiffTime
}
types.DefaultTimeDiff = -1 * (config.DiffTime.Abs())
}
if types.DefaultTimeDiff > 0 {
return fmt.Errorf("cache diff time to pull store must negative number: %+v", types.DefaultTimeDiff)
}

for _, obj := range nc.caches {
Expand Down
6 changes: 3 additions & 3 deletions cache/cache_test.go
Expand Up @@ -68,8 +68,8 @@ func TestCacheManager_Start(t *testing.T) {
assert.NotNil(t, c)
beg := time.Unix(0, 0).Add(types.DefaultTimeDiff)
storage.EXPECT().GetUnixSecond(gomock.Any()).AnyTimes().Return(time.Now().Unix(), nil)
storage.EXPECT().GetMoreInstances(beg, true, false, nil).Return(nil, nil).MaxTimes(1)
storage.EXPECT().GetMoreInstances(beg, false, false, nil).Return(nil, nil).MaxTimes(3)
storage.EXPECT().GetMoreInstances(gomock.Any(), beg, true, false, nil).Return(nil, nil).MaxTimes(1)
storage.EXPECT().GetMoreInstances(gomock.Any(), beg, false, false, nil).Return(nil, nil).MaxTimes(3)
storage.EXPECT().GetMoreServices(beg, true, false, false).Return(nil, nil).MaxTimes(1)
storage.EXPECT().GetMoreServices(beg, false, false, false).Return(nil, nil).MaxTimes(3)
storage.EXPECT().GetRoutingConfigsForCache(beg, true).Return(nil, nil).MaxTimes(3)
Expand All @@ -81,7 +81,7 @@ func TestCacheManager_Start(t *testing.T) {
storage.EXPECT().GetRateLimitsForCache(beg, true).Return(nil, nil).MaxTimes(1)
storage.EXPECT().GetRateLimitsForCache(beg, false).Return(nil, nil).MaxTimes(3)
storage.EXPECT().GetCircuitBreakerRulesForCache(beg, false).Return(nil, nil).MaxTimes(3)
storage.EXPECT().GetInstancesCount().Return(uint32(0), nil).MaxTimes(1)
storage.EXPECT().GetInstancesCountTx(gomock.Any()).Return(uint32(0), nil).MaxTimes(1)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand Down
7 changes: 5 additions & 2 deletions cache/config.go
Expand Up @@ -21,8 +21,11 @@ import "time"

// Config 缓存配置
type Config struct {
Open bool `yaml:"open"`
DiffTime time.Duration `yaml:"diffTime"`
// Open 是否启用缓存
Open bool `yaml:"open"`
// DiffTime 设置拉取时间范围, [T1 - abs(DiffTime), T1]
DiffTime time.Duration `yaml:"diffTime"`
// Resources 设置那些资源需要使用缓存
Resources []ConfigEntry `yaml:"resources"`
}

Expand Down
8 changes: 7 additions & 1 deletion cache/config/config_file.go
Expand Up @@ -179,6 +179,8 @@ func (fc *fileCache) setReleases(releases []*model.ConfigFileRelease) (map[strin
}

if item.Active {
configLog.Info("[Config][Release][Cache] notify config release change",
zap.Any("info", item.SimpleConfigFileRelease))
fc.sendEvent(item)
}
}
Expand All @@ -187,9 +189,13 @@ func (fc *fileCache) setReleases(releases []*model.ConfigFileRelease) (map[strin
}

func (fc *fileCache) sendEvent(item *model.ConfigFileRelease) {
_ = eventhub.Publish(eventhub.ConfigFilePublishTopic, &eventhub.PublishConfigFileEvent{
err := eventhub.Publish(eventhub.ConfigFilePublishTopic, &eventhub.PublishConfigFileEvent{
Message: item.SimpleConfigFileRelease,
})
if err != nil {
configLog.Error("[Config][Release][Cache] notify config release change",
zap.Any("info", item.ConfigFileReleaseKey), zap.Error(err))
}
}

// handleUpdateRelease
Expand Down
Expand Up @@ -19,10 +19,12 @@ package service

import (
"crypto/sha1"
"fmt"
"sort"
"sync"
"time"

"go.uber.org/zap"
"golang.org/x/sync/singleflight"

types "github.com/polarismesh/polaris/cache/api"
Expand Down Expand Up @@ -81,12 +83,16 @@ func (c *circuitBreakerCache) Update() error {
}

func (c *circuitBreakerCache) realUpdate() (map[string]time.Time, int64, error) {
start := time.Now()
cbRules, err := c.storage.GetCircuitBreakerRulesForCache(c.LastFetchTime(), c.IsFirstUpdate())
if err != nil {
log.Errorf("[Cache] circuit breaker config cache update err:%s", err.Error())
log.Errorf("[Cache][CircuitBreaker] cache update err:%s", err.Error())
return nil, -1, err
}
lastMtimes := c.setCircuitBreaker(cbRules)
lastMtimes, upsert, del := c.setCircuitBreaker(cbRules)
log.Info("[Cache][CircuitBreaker] get more rules",
zap.Int("pull-from-store", len(cbRules)), zap.Int("upsert", upsert), zap.Int("delete", del),
zap.Time("last", c.LastMtime(c.Name())), zap.Duration("used", time.Since(start)))
return lastMtimes, int64(len(cbRules)), nil
}

Expand Down Expand Up @@ -230,8 +236,6 @@ func (c *circuitBreakerCache) storeCircuitBreakerToServiceCache(
c.lock.Lock()
defer c.lock.Unlock()

c.cleanOldCircuitBreakerToServiceCache(entry, svcKeys)

if len(svcKeys) == 0 {
// all wildcard
c.storeAndReloadCircuitBreakerRules(c.allWildcardRules, entry)
Expand Down Expand Up @@ -299,11 +303,6 @@ func (c *circuitBreakerCache) storeCircuitBreakerToServiceCache(
}
}

func (c *circuitBreakerCache) cleanOldCircuitBreakerToServiceCache(
entry *model.CircuitBreakerRule, svcKeys map[model.ServiceKey]bool) {

}

const allMatched = "*"

func getServicesInvolveByCircuitBreakerRule(cbRule *model.CircuitBreakerRule) map[model.ServiceKey]bool {
Expand All @@ -325,11 +324,15 @@ func getServicesInvolveByCircuitBreakerRule(cbRule *model.CircuitBreakerRule) ma
}

// setCircuitBreaker 更新store的数据到cache中
func (c *circuitBreakerCache) setCircuitBreaker(cbRules []*model.CircuitBreakerRule) map[string]time.Time {
func (c *circuitBreakerCache) setCircuitBreaker(
cbRules []*model.CircuitBreakerRule) (map[string]time.Time, int, int) {

if len(cbRules) == 0 {
return nil
return nil, 0, 0
}

var upsert, del int

lastMtime := c.LastMtime(c.Name()).Unix()

for _, cbRule := range cbRules {
Expand All @@ -343,23 +346,27 @@ func (c *circuitBreakerCache) setCircuitBreaker(cbRules []*model.CircuitBreakerR
if oldRule.IsServiceChange(cbRule) {
// 从老的规则中获取所有的 svcKeys 信息列表
svcKeys := getServicesInvolveByCircuitBreakerRule(oldRule)
log.Info("[Cache][CircuitBreaker] clean rule bind old service info",
zap.String("svc-keys", fmt.Sprintf("%#v", svcKeys)), zap.String("rule-id", cbRule.ID))
// 挨个清空
c.deleteCircuitBreakerFromServiceCache(cbRule.ID, svcKeys)
}
}
svcKeys := getServicesInvolveByCircuitBreakerRule(cbRule)
if !cbRule.Valid {
del++
c.rules.Delete(cbRule.ID)
c.deleteCircuitBreakerFromServiceCache(cbRule.ID, svcKeys)
continue
}
upsert++
c.rules.Store(cbRule.ID, cbRule)
c.storeCircuitBreakerToServiceCache(cbRule, svcKeys)
}

return map[string]time.Time{
c.Name(): time.Unix(lastMtime, 0),
}
}, upsert, del
}

// GetCircuitBreakerCount 获取熔断规则总数
Expand Down
File renamed without changes.
4 changes: 4 additions & 0 deletions cache/service/faultdetect.go
Expand Up @@ -19,10 +19,12 @@ package service

import (
"crypto/sha1"
"fmt"
"sort"
"sync"
"time"

"go.uber.org/zap"
"golang.org/x/sync/singleflight"

types "github.com/polarismesh/polaris/cache/api"
Expand Down Expand Up @@ -310,6 +312,8 @@ func (f *faultDetectCache) setFaultDetectRules(fdRules []*model.FaultDetectRule)
if oldRule.IsServiceChange(fdRule) {
// 从老的规则中获取所有的 svcKeys 信息列表
svcKeys := getServicesInvolveByFaultDetectRule(oldRule)
log.Info("[Cache][FaultDetect] clean rule bind old service info",
zap.String("svc-keys", fmt.Sprintf("%#v", svcKeys)), zap.String("rule-id", fdRule.ID))
// 挨个清空
f.deleteFaultDetectRuleFromServiceCache(fdRule.ID, svcKeys)
}
Expand Down

0 comments on commit 2e59684

Please sign in to comment.