Skip to content

Commit

Permalink
fix:registry metrics count total service data error (polarismesh#1128)
Browse files Browse the repository at this point in the history
  • Loading branch information
chuntaojun committed May 16, 2023
1 parent 3c9f7c5 commit 89e649d
Show file tree
Hide file tree
Showing 7 changed files with 71 additions and 46 deletions.
3 changes: 3 additions & 0 deletions admin/job/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,9 @@ func getMasterAccountToken(storage store.Store) (string, error) {
if err != nil {
return "", err
}
if user == nil {
return "", fmt.Errorf("polaris main user: %s not found", mainUser)
}
return user.Token, nil
}

Expand Down
19 changes: 10 additions & 9 deletions cache/instance_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,16 @@ func (ic *instanceCache) reportMetricsInfo() {
metricValues := make([]metrics.DiscoveryMetric, 0, 32)

_ = cacheMgr.Service().IteratorServices(func(key string, svc *model.Service) (bool, error) {
if _, ok := tmpServiceInfos[svc.Namespace]; !ok {
tmpServiceInfos[svc.Namespace] = map[string]struct{}{}
}
tmpServiceInfos[svc.Namespace][svc.Name] = struct{}{}

if _, ok := allServices[svc.Namespace]; !ok {
allServices[svc.Namespace] = map[string]struct{}{}
}
allServices[svc.Namespace][svc.Name] = struct{}{}

if _, ok := offlineService[svc.Namespace]; !ok {
offlineService[svc.Namespace] = map[string]struct{}{}
}
Expand All @@ -66,15 +76,6 @@ func (ic *instanceCache) reportMetricsInfo() {
return true
}

if _, ok := tmpServiceInfos[svc.Namespace]; !ok {
tmpServiceInfos[svc.Namespace] = map[string]struct{}{}
}
tmpServiceInfos[svc.Namespace][svc.Name] = struct{}{}

if _, ok := allServices[svc.Namespace]; !ok {
allServices[svc.Namespace] = map[string]struct{}{}
}
allServices[svc.Namespace][svc.Name] = struct{}{}
if _, ok := onlineService[svc.Namespace]; !ok {
onlineService[svc.Namespace] = map[string]struct{}{}
}
Expand Down
2 changes: 1 addition & 1 deletion plugin/statis/base/base_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ func (s *BaseWorker) Run(ctx context.Context, interval time.Duration) {
dest = dest - (dest % 60)
diff := dest - nowSeconds

log.Infof("[APICall] prometheus stats need sleep %ds", diff)
log.Infof("[APICall] base stats need sleep %ds", diff)
time.Sleep(time.Duration(diff) * time.Second)

ticker := time.NewTicker(interval)
Expand Down
13 changes: 5 additions & 8 deletions release/conf/polaris-server.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -457,14 +457,11 @@ plugin:
option:
interval: 60 # Statistical interval, the unit is second
statis:
name: local
option:
interval: 60
# entries:
# - name: local
# option:
# interval: 60
# - name: prometheus
entries:
- name: local
option:
interval: 60
- name: prometheus
ratelimit:
name: token-bucket
option:
Expand Down
3 changes: 2 additions & 1 deletion service/routing_config_v1_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ func TestCreateRoutingConfig(t *testing.T) {
t.Logf("%s", resp.GetInfo().GetValue())
})

t.Run("服务不存在,创建路由配置,报错", func(t *testing.T) {
t.Run("服务不存在,创建路由配置不报错", func(t *testing.T) {
discoverSuit := &DiscoverTestSuit{}
if err := discoverSuit.Initialize(); err != nil {
t.Fatal(err)
Expand All @@ -151,6 +151,7 @@ func TestCreateRoutingConfig(t *testing.T) {
_, serviceResp := discoverSuit.createCommonService(t, 120)
discoverSuit.cleanServiceName(serviceResp.GetName().GetValue(), serviceResp.GetNamespace().GetValue())

_ = discoverSuit.DiscoverServer().Cache().TestRefresh()
req := &apitraffic.Routing{}
req.Service = serviceResp.Name
req.Namespace = serviceResp.Namespace
Expand Down
73 changes: 48 additions & 25 deletions service/server_authability.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,15 +254,19 @@ func (svr *serverAuthAbility) queryServiceResource(
svcSet := model.NewServiceSet()

for index := range req {
names.Add(req[index].Namespace.GetValue())
svc := svr.Cache().Service().GetServiceByName(req[index].Name.GetValue(), req[index].Namespace.GetValue())
svcName := req[index].GetName().GetValue()
svcNamespace := req[index].GetNamespace().GetValue()
names.Add(svcNamespace)
svc := svr.Cache().Service().GetServiceByName(svcName, svcNamespace)
if svc != nil {
svcSet.Add(svc)
}
}

ret := svr.convertToDiscoverResourceEntryMaps(names, svcSet)
authLog.Debug("[Auth][Server] collect service access res", zap.Any("res", ret))
if authLog.DebugEnabled() {
authLog.Debug("[Auth][Server] collect service access res", zap.Any("res", ret))
}
return ret
}

Expand All @@ -277,9 +281,11 @@ func (svr *serverAuthAbility) queryServiceAliasResource(
svcSet := model.NewServiceSet()

for index := range req {
names.Add(req[index].Namespace.GetValue())
alias := svr.Cache().Service().GetServiceByName(req[index].Alias.GetValue(),
req[index].AliasNamespace.GetValue())
aliasSvcName := req[index].GetAlias().GetValue()
aliasSvcNamespace := req[index].GetAliasNamespace().GetValue()
svcNamespace := req[index].GetNamespace().GetValue()
names.Add(svcNamespace)
alias := svr.Cache().Service().GetServiceByName(aliasSvcName, aliasSvcNamespace)
if alias != nil {
svc := svr.Cache().Service().GetServiceByID(alias.Reference)
if svc != nil {
Expand All @@ -289,7 +295,9 @@ func (svr *serverAuthAbility) queryServiceAliasResource(
}

ret := svr.convertToDiscoverResourceEntryMaps(names, svcSet)
authLog.Debug("[Auth][Server] collect service alias access res", zap.Any("res", ret))
if authLog.DebugEnabled() {
authLog.Debug("[Auth][Server] collect service alias access res", zap.Any("res", ret))
}
return ret
}

Expand All @@ -305,14 +313,15 @@ func (svr *serverAuthAbility) queryInstanceResource(
svcSet := model.NewServiceSet()

for index := range req {
svcName := req[index].GetService().GetValue()
svcNamespace := req[index].GetNamespace().GetValue()
item := req[index]
if item.Namespace.GetValue() != "" && item.Service.GetValue() != "" {
svc := svr.Cache().Service().GetServiceByName(req[index].Service.GetValue(),
req[index].Namespace.GetValue())
if svcNamespace != "" && svcName != "" {
svc := svr.Cache().Service().GetServiceByName(svcName, svcNamespace)
if svc != nil {
svcSet.Add(svc)
} else {
names.Add(req[index].Namespace.GetValue())
names.Add(svcNamespace)
}
} else {
ins := svr.Cache().Instance().GetInstance(item.GetId().GetValue())
Expand All @@ -321,14 +330,16 @@ func (svr *serverAuthAbility) queryInstanceResource(
if svc != nil {
svcSet.Add(svc)
} else {
names.Add(req[index].Namespace.GetValue())
names.Add(svcNamespace)
}
}
}
}

ret := svr.convertToDiscoverResourceEntryMaps(names, svcSet)
authLog.Debug("[Auth][Server] collect instance access res", zap.Any("res", ret))
if authLog.DebugEnabled() {
authLog.Debug("[Auth][Server] collect instance access res", zap.Any("res", ret))
}
return ret
}

Expand All @@ -343,14 +354,17 @@ func (svr *serverAuthAbility) queryCircuitBreakerResource(
svcSet := model.NewServiceSet()

for index := range req {
svc := svr.Cache().Service().GetServiceByName(req[index].Service.GetValue(),
req[index].Namespace.GetValue())
svcName := req[index].GetService().GetValue()
svcNamespace := req[index].GetNamespace().GetValue()
svc := svr.Cache().Service().GetServiceByName(svcName, svcNamespace)
if svc != nil {
svcSet.Add(svc)
}
}
ret := svr.convertToDiscoverResourceEntryMaps(names, svcSet)
authLog.Debug("[Auth][Server] collect circuit-breaker access res", zap.Any("res", ret))
if authLog.DebugEnabled() {
authLog.Debug("[Auth][Server] collect circuit-breaker access res", zap.Any("res", ret))
}
return ret
}

Expand All @@ -365,15 +379,18 @@ func (svr *serverAuthAbility) queryCircuitBreakerReleaseResource(
svcSet := model.NewServiceSet()

for index := range req {
svc := svr.Cache().Service().GetServiceByName(req[index].Service.Name.GetValue(),
req[index].Service.Namespace.GetValue())
svcName := req[index].GetService().GetName().GetValue()
svcNamespace := req[index].GetService().GetNamespace().GetValue()
svc := svr.Cache().Service().GetServiceByName(svcName, svcNamespace)
if svc != nil {
svcSet.Add(svc)
}
}

ret := svr.convertToDiscoverResourceEntryMaps(names, svcSet)
authLog.Debug("[Auth][Server] collect circuit-breaker-release access res", zap.Any("res", ret))
if authLog.DebugEnabled() {
authLog.Debug("[Auth][Server] collect circuit-breaker-release access res", zap.Any("res", ret))
}
return ret
}

Expand All @@ -388,15 +405,18 @@ func (svr *serverAuthAbility) queryRouteRuleResource(
svcSet := model.NewServiceSet()

for index := range req {
svc := svr.Cache().Service().GetServiceByName(req[index].Service.GetValue(),
req[index].Namespace.GetValue())
svcName := req[index].GetService().GetValue()
svcNamespace := req[index].GetNamespace().GetValue()
svc := svr.Cache().Service().GetServiceByName(svcName, svcNamespace)
if svc != nil {
svcSet.Add(svc)
}
}

ret := svr.convertToDiscoverResourceEntryMaps(names, svcSet)
authLog.Debug("[Auth][Server] collect route-rule access res", zap.Any("res", ret))
if authLog.DebugEnabled() {
authLog.Debug("[Auth][Server] collect route-rule access res", zap.Any("res", ret))
}
return ret
}

Expand All @@ -411,15 +431,18 @@ func (svr *serverAuthAbility) queryRateLimitConfigResource(
svcSet := model.NewServiceSet()

for index := range req {
svc := svr.Cache().Service().GetServiceByName(req[index].Service.GetValue(),
req[index].Namespace.GetValue())
svcName := req[index].GetService().GetValue()
svcNamespace := req[index].GetNamespace().GetValue()
svc := svr.Cache().Service().GetServiceByName(svcName, svcNamespace)
if svc != nil {
svcSet.Add(svc)
}
}

ret := svr.convertToDiscoverResourceEntryMaps(names, svcSet)
authLog.Debug("[Auth][Server] collect rate-limit access res", zap.Any("res", ret))
if authLog.DebugEnabled() {
authLog.Debug("[Auth][Server] collect rate-limit access res", zap.Any("res", ret))
}
return ret
}

Expand Down
4 changes: 2 additions & 2 deletions store/mysql/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,11 +154,11 @@ func checkDataBaseAffectedRows(result sql.Result, counts ...int64) error {
}

// timeToTimestamp 转时间戳(秒)
// 由于 FROM_UNIXTIME 不支持负数,所以小于0的情况赋值为0
// 由于 FROM_UNIXTIME 不支持负数,所以小于0的情况赋值为 1
func timeToTimestamp(t time.Time) int64 {
ts := t.Unix()
if ts < 0 {
ts = 0
ts = 1
}
return ts
}
Expand Down

0 comments on commit 89e649d

Please sign in to comment.