Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: refine error log in cluster pkg (#2868) #2906

Merged
merged 1 commit into from
Sep 7, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 28 additions & 14 deletions pkg/errs/errno.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,14 @@ var (

// scheduler errors
var (
ErrGetSourceStore = errors.Normalize("failed to get the source store", errors.RFCCodeText("PD:scheduler:ErrGetSourceStore"))
ErrSchedulerExisted = errors.Normalize("scheduler existed", errors.RFCCodeText("PD:scheduler:ErrSchedulerExisted"))
ErrSchedulerNotFound = errors.Normalize("scheduler not found", errors.RFCCodeText("PD:scheduler:ErrSchedulerNotFound"))
ErrScheduleConfigNotExist = errors.Normalize("the config does not exist", errors.RFCCodeText("PD:scheduler:ErrScheduleConfigNotExist"))
ErrSchedulerConfig = errors.Normalize("wrong scheduler config %s", errors.RFCCodeText("PD:scheduler:ErrSchedulerConfig"))
ErrCacheOverflow = errors.Normalize("cache overflow", errors.RFCCodeText("PD:scheduler:ErrCacheOverflow"))
ErrInternalGrowth = errors.Normalize("unknown interval growth type error", errors.RFCCodeText("PD:scheduler:ErrInternalGrowth"))
ErrGetSourceStore = errors.Normalize("failed to get the source store", errors.RFCCodeText("PD:scheduler:ErrGetSourceStore"))
ErrSchedulerExisted = errors.Normalize("scheduler existed", errors.RFCCodeText("PD:scheduler:ErrSchedulerExisted"))
ErrSchedulerNotFound = errors.Normalize("scheduler not found", errors.RFCCodeText("PD:scheduler:ErrSchedulerNotFound"))
ErrScheduleConfigNotExist = errors.Normalize("the config does not exist", errors.RFCCodeText("PD:scheduler:ErrScheduleConfigNotExist"))
ErrSchedulerConfig = errors.Normalize("wrong scheduler config %s", errors.RFCCodeText("PD:scheduler:ErrSchedulerConfig"))
ErrCacheOverflow = errors.Normalize("cache overflow", errors.RFCCodeText("PD:scheduler:ErrCacheOverflow"))
ErrInternalGrowth = errors.Normalize("unknown interval growth type error", errors.RFCCodeText("PD:scheduler:ErrInternalGrowth"))
ErrSchedulerCreateFuncNotRegistered = errors.Normalize("create func of %v is not registered", errors.RFCCodeText("PD:scheduler:ErrSchedulerCreateFuncNotRegistered"))
)

// placement errors
Expand All @@ -60,13 +61,8 @@ var (

// cluster errors
var (
ErrPersistStore = errors.Normalize("failed to persist store", errors.RFCCodeText("PD:cluster:ErrPersistStore"))
ErrDeleteRegion = errors.Normalize("failed to delete region from storage", errors.RFCCodeText("PD:cluster:ErrDeleteRegion"))
ErrSaveRegion = errors.Normalize("failed to save region from storage", errors.RFCCodeText("PD:cluster:ErrSaveRegion"))
ErrBuryStore = errors.Normalize("failed to bury store", errors.RFCCodeText("PD:cluster:ErrBuryStore"))
ErrDeleteStore = errors.Normalize("failed to delete store", errors.RFCCodeText("PD:cluster:ErrDeleteStore"))
ErrPersistClusterVersion = errors.Normalize("persist cluster version meet error", errors.RFCCodeText("PD:cluster:ErrPersistClusterVersion"))
ErrGetMembers = errors.Normalize("get members failed", errors.RFCCodeText("PD:cluster:ErrGetMembers"))
ErrNotBootstrapped = errors.Normalize("TiKV cluster not bootstrapped, please start TiKV first", errors.RFCCodeText("PD:cluster:ErrNotBootstrapped"))
ErrStoreIsUp = errors.Normalize("store is still up, please remove store gracefully", errors.RFCCodeText("PD:cluster:ErrStoreIsUp"))
)

// grpcutil errors
Expand All @@ -90,6 +86,7 @@ var (
// proto errors
//
// Normalized errors for protobuf encoding and decoding failures. Each value
// carries a fixed message and an RFC error code under the "PD:proto:" prefix.
var (
// ErrProtoUnmarshal is the error for a failed proto unmarshal.
ErrProtoUnmarshal = errors.Normalize("failed to unmarshal proto", errors.RFCCodeText("PD:proto:ErrProtoUnmarshal"))
// ErrProtoMarshal is the error for a failed proto marshal.
ErrProtoMarshal = errors.Normalize("failed to marshal proto", errors.RFCCodeText("PD:proto:ErrProtoMarshal"))
)

// etcd errors
Expand Down Expand Up @@ -149,3 +146,20 @@ var (
var (
// ErrHexDecodingString is the error for a failed string decode. Its message
// contains one %s verb, so it needs one argument when the error is generated
// (presumably the string that could not be decoded; verify against callers).
ErrHexDecodingString = errors.Normalize("decode string %s error", errors.RFCCodeText("PD:hex:ErrHexDecodingString"))
)

// filepath errors
//
// Normalized errors for file path handling, with RFC error codes under the
// "PD:filepath:" prefix.
var (
// ErrFilePathAbs is the error for failing to convert a path to an absolute
// path (the name suggests it wraps filepath.Abs failures; confirm at the
// call site).
ErrFilePathAbs = errors.Normalize("failed to convert a path to absolute path", errors.RFCCodeText("PD:filepath:ErrFilePathAbs"))
)

// plugin errors
//
// Normalized errors for plugin handling, with RFC error codes under the
// "PD:plugin:" prefix.
var (
// ErrLoadPlugin is the error for a plugin that could not be loaded.
ErrLoadPlugin = errors.Normalize("failed to load plugin", errors.RFCCodeText("PD:plugin:ErrLoadPlugin"))
// ErrLookupPluginFunc is the error for a function that could not be looked
// up in a plugin.
ErrLookupPluginFunc = errors.Normalize("failed to lookup plugin function", errors.RFCCodeText("PD:plugin:ErrLookupPluginFunc"))
)

// json errors
//
// Normalized errors for JSON encoding and decoding failures, with RFC error
// codes under the "PD:json:" prefix.
var (
// ErrJSONMarshal is the error for a failed JSON marshal.
ErrJSONMarshal = errors.Normalize("failed to marshal json", errors.RFCCodeText("PD:json:ErrJSONMarshal"))
// ErrJSONUnmarshal is the error for a failed JSON unmarshal.
ErrJSONUnmarshal = errors.Normalize("failed to unmarshal json", errors.RFCCodeText("PD:json:ErrJSONUnmarshal"))
)
5 changes: 4 additions & 1 deletion pkg/etcdutil/etcdutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,10 @@ func ListEtcdMembers(client *clientv3.Client) (*clientv3.MemberListResponse, err
ctx, cancel := context.WithTimeout(client.Ctx(), DefaultRequestTimeout)
listResp, err := client.MemberList(ctx)
cancel()
return listResp, errors.WithStack(err)
if err != nil {
return listResp, errs.ErrEtcdMemberList.Wrap(err).GenWithStackByCause()
}
return listResp, nil
}

// RemoveEtcdMember removes a member by the given id.
Expand Down
4 changes: 2 additions & 2 deletions server/api/middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ package api
import (
"net/http"

"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/server"
"github.com/tikv/pd/server/cluster"
"github.com/unrolled/render"
)

Expand All @@ -37,7 +37,7 @@ func (m clusterMiddleware) Middleware(h http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
rc := m.s.GetRaftCluster()
if rc == nil {
m.rd.JSON(w, http.StatusInternalServerError, cluster.ErrNotBootstrapped.Error())
m.rd.JSON(w, http.StatusInternalServerError, errs.ErrNotBootstrapped.FastGenByArgs().Error())
return
}
ctx := withClusterCtx(r.Context(), rc)
Expand Down
17 changes: 9 additions & 8 deletions server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/cache"
"github.com/tikv/pd/pkg/component"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/etcdutil"
"github.com/tikv/pd/pkg/logutil"
"github.com/tikv/pd/pkg/typeutil"
Expand Down Expand Up @@ -494,7 +495,7 @@ func (c *RaftCluster) HandleStoreHeartbeat(stats *pdpb.StoreStats) error {
}
if newStore.NeedPersist() && c.storage != nil {
if err := c.storage.SaveStore(newStore.GetMeta()); err != nil {
log.Error("failed to persist store", zap.Uint64("store-id", newStore.GetID()))
log.Error("failed to persist store", zap.Uint64("store-id", newStore.GetID()), errs.ZapError(err))
} else {
newStore = newStore.Clone(core.SetLastPersistTime(time.Now()))
}
Expand Down Expand Up @@ -622,7 +623,7 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error {
log.Error("failed to delete region from storage",
zap.Uint64("region-id", item.GetID()),
zap.Stringer("region-meta", core.RegionToHexMeta(item.GetMeta())),
zap.Error(err))
errs.ZapError(err))
}
}
}
Expand Down Expand Up @@ -674,7 +675,7 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error {
log.Error("failed to save region to storage",
zap.Uint64("region-id", region.GetID()),
zap.Stringer("region-meta", core.RegionToHexMeta(region.GetMeta())),
zap.Error(err))
errs.ZapError(err))
}
regionEventCounter.WithLabelValues("update_kv").Inc()
}
Expand Down Expand Up @@ -1026,7 +1027,7 @@ func (c *RaftCluster) BuryStore(storeID uint64, force bool) error {

if store.IsUp() {
if !force {
return errors.New("store is still up, please remove store gracefully")
return errs.ErrStoreIsUp.FastGenByArgs()
}
log.Warn("forcedly bury store", zap.Stringer("store", store.GetMeta()))
}
Expand Down Expand Up @@ -1131,7 +1132,7 @@ func (c *RaftCluster) checkStores() {
if err := c.BuryStore(offlineStore.GetId(), false); err != nil {
log.Error("bury store failed",
zap.Stringer("store", offlineStore),
zap.Error(err))
errs.ZapError(err))
}
} else {
offlineStores = append(offlineStores, offlineStore)
Expand Down Expand Up @@ -1162,7 +1163,7 @@ func (c *RaftCluster) RemoveTombStoneRecords() error {
if err != nil {
log.Error("delete store failed",
zap.Stringer("store", store.GetMeta()),
zap.Error(err))
errs.ZapError(err))
return err
}
c.RemoveStoreLimit(store.GetID())
Expand Down Expand Up @@ -1234,7 +1235,7 @@ func (c *RaftCluster) resetClusterMetrics() {
func (c *RaftCluster) collectHealthStatus() {
members, err := GetMembers(c.etcdClient)
if err != nil {
log.Error("get members error", zap.Error(err))
log.Error("get members error", errs.ZapError(err))
}
unhealth := CheckHealth(c.httpClient, members)
for _, member := range members {
Expand Down Expand Up @@ -1312,7 +1313,7 @@ func (c *RaftCluster) OnStoreVersionChange() {
}
err := c.opt.Persist(c.storage)
if err != nil {
log.Error("persist cluster version meet error", zap.Error(err))
log.Error("persist cluster version meet error", errs.ZapError(err))
}
log.Info("cluster version changed",
zap.Stringer("old-cluster-version", clusterVersion),
Expand Down
59 changes: 31 additions & 28 deletions server/cluster/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,6 @@ const (
PluginUnload = "PluginUnload"
)

// ErrNotBootstrapped is error info for cluster not bootstrapped.
var ErrNotBootstrapped = errors.New("TiKV cluster not bootstrapped, please start TiKV first")

// coordinator is used to manage all schedulers and checkers to decide if the region needs to be scheduled.
type coordinator struct {
sync.RWMutex
Expand Down Expand Up @@ -240,10 +237,10 @@ func (c *coordinator) run() {
if err == nil {
break
}
log.Error("cannot load schedulers' config", zap.Int("retry-times", i), zap.Error(err))
log.Error("cannot load schedulers' config", zap.Int("retry-times", i), errs.ZapError(err))
}
if err != nil {
log.Fatal("cannot load schedulers' config", zap.Error(err))
log.Fatal("cannot load schedulers' config", errs.ZapError(err))
}

scheduleCfg := c.cluster.opt.GetScheduleConfig().Clone()
Expand All @@ -259,7 +256,7 @@ func (c *coordinator) run() {
}
}
if len(cfg.Type) == 0 {
log.Error("the scheduler type not found", zap.String("scheduler-name", name))
log.Error("the scheduler type not found", zap.String("scheduler-name", name), errs.ZapError(errs.ErrSchedulerNotFound))
continue
}
if cfg.Disable {
Expand All @@ -268,12 +265,12 @@ func (c *coordinator) run() {
}
s, err := schedule.CreateScheduler(cfg.Type, c.opController, c.cluster.storage, schedule.ConfigJSONDecoder([]byte(data)))
if err != nil {
log.Error("can not create scheduler with independent configuration", zap.String("scheduler-name", name), zap.Error(err))
log.Error("can not create scheduler with independent configuration", zap.String("scheduler-name", name), errs.ZapError(err))
continue
}
log.Info("create scheduler with independent configuration", zap.String("scheduler-name", s.GetName()))
if err = c.addScheduler(s); err != nil {
log.Error("can not add scheduler with independent configuration", zap.String("scheduler-name", s.GetName()), zap.Error(err))
log.Error("can not add scheduler with independent configuration", zap.String("scheduler-name", s.GetName()), errs.ZapError(err))
}
}

Expand All @@ -289,13 +286,13 @@ func (c *coordinator) run() {

s, err := schedule.CreateScheduler(schedulerCfg.Type, c.opController, c.cluster.storage, schedule.ConfigSliceDecoder(schedulerCfg.Type, schedulerCfg.Args))
if err != nil {
log.Error("can not create scheduler", zap.String("scheduler-type", schedulerCfg.Type), zap.Error(err))
log.Error("can not create scheduler", zap.String("scheduler-type", schedulerCfg.Type), errs.ZapError(err))
continue
}

log.Info("create scheduler", zap.String("scheduler-name", s.GetName()))
if err = c.addScheduler(s, schedulerCfg.Args...); err != nil && !errors.ErrorEqual(err, errs.ErrSchedulerExisted.FastGenByArgs()) {
log.Error("can not add scheduler", zap.String("scheduler-name", s.GetName()), zap.Error(err))
log.Error("can not add scheduler", zap.String("scheduler-name", s.GetName()), errs.ZapError(err))
} else {
// Only records the valid scheduler config.
scheduleCfg.Schedulers[k] = schedulerCfg
Expand All @@ -307,7 +304,7 @@ func (c *coordinator) run() {
scheduleCfg.Schedulers = scheduleCfg.Schedulers[:k]
c.cluster.opt.SetScheduleConfig(scheduleCfg)
if err := c.cluster.opt.Persist(c.cluster.storage); err != nil {
log.Error("cannot persist schedule config", zap.Error(err))
log.Error("cannot persist schedule config", errs.ZapError(err))
}

c.wg.Add(2)
Expand All @@ -322,26 +319,26 @@ func (c *coordinator) LoadPlugin(pluginPath string, ch chan string) {
// get func: SchedulerType from plugin
SchedulerType, err := c.pluginInterface.GetFunction(pluginPath, "SchedulerType")
if err != nil {
log.Error("GetFunction SchedulerType error", zap.Error(err))
log.Error("GetFunction SchedulerType error", errs.ZapError(err))
return
}
schedulerType := SchedulerType.(func() string)
// get func: SchedulerArgs from plugin
SchedulerArgs, err := c.pluginInterface.GetFunction(pluginPath, "SchedulerArgs")
if err != nil {
log.Error("GetFunction SchedulerArgs error", zap.Error(err))
log.Error("GetFunction SchedulerArgs error", errs.ZapError(err))
return
}
schedulerArgs := SchedulerArgs.(func() []string)
// create and add user scheduler
s, err := schedule.CreateScheduler(schedulerType(), c.opController, c.cluster.storage, schedule.ConfigSliceDecoder(schedulerType(), schedulerArgs()))
if err != nil {
log.Error("can not create scheduler", zap.String("scheduler-type", schedulerType()), zap.Error(err))
log.Error("can not create scheduler", zap.String("scheduler-type", schedulerType()), errs.ZapError(err))
return
}
log.Info("create scheduler", zap.String("scheduler-name", s.GetName()))
if err = c.addScheduler(s); err != nil {
log.Error("can't add scheduler", zap.String("scheduler-name", s.GetName()), zap.Error(err))
log.Error("can't add scheduler", zap.String("scheduler-name", s.GetName()), errs.ZapError(err))
return
}

Expand All @@ -359,7 +356,7 @@ func (c *coordinator) waitPluginUnload(pluginPath, schedulerName string, ch chan
if action == PluginUnload {
err := c.removeScheduler(schedulerName)
if err != nil {
log.Error("can not remove scheduler", zap.String("scheduler-name", schedulerName), zap.Error(err))
log.Error("can not remove scheduler", zap.String("scheduler-name", schedulerName), errs.ZapError(err))
} else {
log.Info("unload plugin", zap.String("plugin", pluginPath))
return
Expand Down Expand Up @@ -553,7 +550,7 @@ func (c *coordinator) removeScheduler(name string) error {
c.Lock()
defer c.Unlock()
if c.cluster == nil {
return ErrNotBootstrapped
return errs.ErrNotBootstrapped.FastGenByArgs()
}
s, ok := c.schedulers[name]
if !ok {
Expand All @@ -566,24 +563,30 @@ func (c *coordinator) removeScheduler(name string) error {

var err error
opt := c.cluster.opt

if err = opt.RemoveSchedulerCfg(s.Ctx(), name); err != nil {
log.Error("can not remove scheduler", zap.String("scheduler-name", name), zap.Error(err))
} else if err = opt.Persist(c.cluster.storage); err != nil {
log.Error("the option can not persist scheduler config", zap.Error(err))
} else {
err = c.cluster.storage.RemoveScheduleConfig(name)
if err != nil {
log.Error("can not remove the scheduler config", zap.Error(err))
}
log.Error("can not remove scheduler", zap.String("scheduler-name", name), errs.ZapError(err))
return err
}
return err

if err = opt.Persist(c.cluster.storage); err != nil {
log.Error("the option can not persist scheduler config", errs.ZapError(err))
return err
}

if err = c.cluster.storage.RemoveScheduleConfig(name); err != nil {
log.Error("can not remove the scheduler config", errs.ZapError(err))
return err
}

return nil
}

func (c *coordinator) pauseOrResumeScheduler(name string, t int64) error {
c.Lock()
defer c.Unlock()
if c.cluster == nil {
return ErrNotBootstrapped
return errs.ErrNotBootstrapped.FastGenByArgs()
}
s := make([]*scheduleController, 0)
if name != "all" {
Expand Down Expand Up @@ -612,7 +615,7 @@ func (c *coordinator) isSchedulerPaused(name string) (bool, error) {
c.RLock()
defer c.RUnlock()
if c.cluster == nil {
return false, ErrNotBootstrapped
return false, errs.ErrNotBootstrapped.FastGenByArgs()
}
s, ok := c.schedulers[name]
if !ok {
Expand Down
10 changes: 7 additions & 3 deletions server/core/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/gogo/protobuf/proto"
"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/server/kv"
"go.etcd.io/etcd/clientv3"
)
Expand Down Expand Up @@ -200,7 +201,7 @@ func (s *Storage) DeleteRegion(region *metapb.Region) error {
func (s *Storage) SaveConfig(cfg interface{}) error {
value, err := json.Marshal(cfg)
if err != nil {
return errors.WithStack(err)
return errs.ErrJSONMarshal.Wrap(err).GenWithStackByCause()
}
return s.Save(configPath, string(value))
}
Expand Down Expand Up @@ -505,13 +506,16 @@ func loadProto(s kv.Base, key string, msg proto.Message) (bool, error) {
return false, nil
}
err = proto.Unmarshal([]byte(value), msg)
return true, errors.WithStack(err)
if err != nil {
return false, errs.ErrProtoUnmarshal.Wrap(err).GenWithStackByCause()
}
return true, nil
}

func saveProto(s kv.Base, key string, msg proto.Message) error {
value, err := proto.Marshal(msg)
if err != nil {
return errors.WithStack(err)
return errs.ErrProtoMarshal.Wrap(err).GenWithStackByCause()
}
return s.Save(key, string(value))
}
Loading