unified logs!
CMGS authored and anrs committed Oct 27, 2022
1 parent ff9e284 commit 86bd2fc
Showing 79 changed files with 654 additions and 604 deletions.
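
Every file in this commit follows the same pattern: the logrus dependency is dropped in favor of the project's own github.com/projecteru2/core/log package, and each call site now passes the request context (and, for error-level logs, the error itself) as leading arguments. The stub below sketches the shape of the unified helpers as inferred from the call sites in this diff; the signatures are reconstructed here for illustration only, so refer to the actual log package for the real definitions.

// Illustrative stubs only: signatures inferred from the call sites in this commit.
package log

import "context"

// Error-level helpers take the context and the error being reported as
// first-class arguments instead of folding them into the format string.
func Error(ctx context.Context, err error, args ...interface{})                 {}
func Errorf(ctx context.Context, err error, format string, args ...interface{}) {}
func Info(ctx context.Context, err error, args ...interface{})                  {}

// Lower-severity helpers only need the context.
func Infof(ctx context.Context, format string, args ...interface{})  {}
func Warn(ctx context.Context, args ...interface{})                  {}
func Debug(ctx context.Context, args ...interface{})                 {}
func Debugf(ctx context.Context, format string, args ...interface{}) {}

// WithField returns a scoped logger, e.g. log.WithField("Calcium", "BuildImage"),
// whose methods mirror the package-level ones and which can be chained.
func WithField(key string, value interface{}) *Fields { return &Fields{} }

type Fields struct{}

func (f *Fields) WithField(key string, value interface{}) *Fields { return f }

func (f *Fields) Errorf(ctx context.Context, err error, format string, args ...interface{}) {}
func (f *Fields) Infof(ctx context.Context, format string, args ...interface{})             {}
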
11 changes: 5 additions & 6 deletions client/clientpool.go
@@ -5,11 +5,10 @@ import (
"sync"
"time"

"github.com/projecteru2/core/log"
pb "github.com/projecteru2/core/rpc/gen"
"github.com/projecteru2/core/types"
"github.com/projecteru2/core/utils"

log "github.com/sirupsen/logrus"
)

type clientWithStatus struct {
@@ -43,7 +42,7 @@ func NewCoreRPCClientPool(ctx context.Context, config *PoolConfig) (*Pool, error
rpc, err = NewClient(ctx, addr, config.Auth)
})
if err != nil {
log.Errorf("[NewCoreRPCClientPool] connect to %s failed, err: %s", addr, err)
log.Errorf(ctx, err, "[NewCoreRPCClientPool] connect to %s failed, err: %s", addr, err)
continue
}
rpcClient := rpc.GetRPCClient()
@@ -61,7 +60,7 @@ func NewCoreRPCClientPool(ctx context.Context, config *PoolConfig) (*Pool, error
}

if allFailed {
log.Error("[NewCoreRPCClientPool] all connections failed")
log.Error(ctx, types.ErrAllConnectionsFailed, "[NewCoreRPCClientPool] all connections failed")
return nil, types.ErrAllConnectionsFailed
}

@@ -97,10 +96,10 @@ func checkAlive(ctx context.Context, rpc *clientWithStatus, timeout time.Duratio
_, err = rpc.client.Info(ctx, &pb.Empty{})
})
if err != nil {
log.Errorf("[ClientPool] connect to %s failed, err: %s", rpc.addr, err)
log.Errorf(ctx, err, "[ClientPool] connect to %s failed, err: %s", rpc.addr, err)
return false
}
log.Debugf("[ClientPool] connect to %s success", rpc.addr)
log.Debugf(ctx, "[ClientPool] connect to %s success", rpc.addr)
return true
}

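For comparison, a single call site from clientpool.go before and after this change:

// before: logrus-style call, no context, error folded into the format arguments
log.Errorf("[NewCoreRPCClientPool] connect to %s failed, err: %s", addr, err)

// after: the unified logger receives the context and the error explicitly
log.Errorf(ctx, err, "[NewCoreRPCClientPool] connect to %s failed, err: %s", addr, err)
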
2 changes: 1 addition & 1 deletion client/interceptor/retry.go
@@ -57,7 +57,7 @@ func (s *retryStream) RecvMsg(m interface{}) (err error) {
}

return backoff.Retry(func() error {
log.Debug(nil, "[retryStream] retry on new stream") //nolint
log.Debug(s.ctx, "[retryStream] retry on new stream")
stream, err := s.newStream()
if err != nil {
// even io.EOF triggers retry, and it's what we want!
8 changes: 4 additions & 4 deletions client/resolver/eru/resolver.go
@@ -45,23 +45,23 @@ func (r *Resolver) Close() {

func (r *Resolver) sync() {
ctx := context.TODO()
log.Debug(ctx, "[EruResolver] start sync service discovery")
ctx, r.cancel = context.WithCancel(ctx)
defer r.cancel()
log.Debug(ctx, "[EruResolver] start sync service discovery")

ch, err := r.discovery.Watch(ctx)
if err != nil {
log.Errorf(ctx, "[EruResolver] failed to watch service status: %v", err)
log.Errorf(ctx, err, "[EruResolver] failed to watch service status: %v", err)
return
}
for {
select {
case <-ctx.Done():
log.Errorf(ctx, "[EruResolver] watch interrupted: %v", ctx.Err())
log.Errorf(ctx, ctx.Err(), "[EruResolver] watch interrupted: %v", ctx.Err())
return
case endpoints, ok := <-ch:
if !ok {
log.Error("[EruResolver] watch closed")
log.Info(ctx, nil, "[EruResolver] watch closed")
return
}

6 changes: 3 additions & 3 deletions client/servicediscovery/eru_service_discovery.go
@@ -34,7 +34,7 @@ func New(endpoint string, authConfig types.AuthConfig) *EruServiceDiscovery {
func (w *EruServiceDiscovery) Watch(ctx context.Context) (_ <-chan []string, err error) {
cc, err := w.dial(ctx, w.endpoint, w.authConfig)
if err != nil {
log.Errorf(ctx, "[EruServiceWatch] dial failed: %v", err)
log.Errorf(ctx, err, "[EruServiceWatch] dial failed: %v", err)
return
}
client := pb.NewCoreRPCClient(cc)
@@ -48,7 +48,7 @@ func (w *EruServiceDiscovery) Watch(ctx context.Context) (_ <-chan []string, err
watchCtx, cancelWatch := context.WithCancel(ctx)
stream, err := client.WatchServiceStatus(watchCtx, &pb.Empty{})
if err != nil {
log.Errorf(ctx, "[EruServiceWatch] watch failed, try later: %v", err)
log.Errorf(ctx, err, "[EruServiceWatch] watch failed, try later: %v", err)
time.Sleep(10 * time.Second)
continue
}
@@ -69,7 +69,7 @@ func (w *EruServiceDiscovery) Watch(ctx context.Context) (_ <-chan []string, err
status, err := stream.Recv()
close(cancelTimer)
if err != nil {
log.Errorf(ctx, "[EruServiceWatch] recv failed: %v", err)
log.Errorf(ctx, err, "[EruServiceWatch] recv failed: %v", err)
break
}
expectedInterval = time.Duration(status.GetIntervalInSecond())
4 changes: 2 additions & 2 deletions client/utils/servicepusher.go
@@ -82,7 +82,7 @@ func (p *EndpointPusher) addCheck(endpoints []string) {
func (p *EndpointPusher) pollReachability(ctx context.Context, endpoint string) {
parts := strings.Split(endpoint, ":")
if len(parts) != 2 {
log.Errorf(ctx, "[EruResolver] wrong format of endpoint: %s", endpoint)
log.Errorf(ctx, nil, "[EruResolver] wrong format of endpoint: %s", endpoint)
return
}

@@ -111,7 +111,7 @@ func (p *EndpointPusher) pollReachability(ctx context.Context, endpoint string)
func (p *EndpointPusher) checkReachability(host string) (err error) {
pinger, err := ping.NewPinger(host)
if err != nil {
log.Errorf(nil, "[EruResolver] failed to create pinger: %+v", err) //nolint
log.Errorf(nil, err, "[EruResolver] failed to create pinger: %+v", err) //nolint
return
}
pinger.SetPrivileged(os.Getuid() == 0)
44 changes: 24 additions & 20 deletions cluster/calcium/build.go
@@ -13,21 +13,21 @@ import (
"github.com/projecteru2/core/log"
"github.com/projecteru2/core/types"
"github.com/projecteru2/core/utils"

"github.com/pkg/errors"
)

// BuildImage will build image
func (c *Calcium) BuildImage(ctx context.Context, opts *types.BuildOptions) (ch chan *types.BuildImageMessage, err error) {
logger := log.WithField("Calcium", "BuildImage").WithField("opts", opts)
// Disable build API if scm not set
if c.source == nil {
return nil, logger.ErrWithTracing(ctx, errors.WithStack(types.ErrSCMNotSet))
logger.Errorf(ctx, types.ErrSCMNotSet, "")
return nil, types.ErrSCMNotSet
}
// select nodes
node, err := c.selectBuildNode(ctx)
if err != nil {
return nil, logger.ErrWithTracing(ctx, err)
logger.Errorf(ctx, err, "")
return nil, err
}

log.Infof(ctx, "[BuildImage] Building image at pod %s node %s", node.Podname, node.Name)
@@ -44,20 +44,23 @@ func (c *Calcium) BuildImage(ctx context.Context, opts *types.BuildOptions) (ch
case types.BuildFromExist:
refs, node, resp, err = c.buildFromExist(ctx, opts)
default:
return nil, logger.ErrWithTracing(ctx, errors.WithStack(types.ErrUnknownBuildType))
logger.Errorf(ctx, types.ErrUnknownBuildType, "")
return nil, types.ErrUnknownBuildType
}
if err != nil {
return nil, logger.ErrWithTracing(ctx, err)
logger.Errorf(ctx, err, "")
return nil, err
}
ch, err = c.pushImageAndClean(ctx, resp, node, refs)
return ch, logger.ErrWithTracing(ctx, err)
logger.Errorf(ctx, err, "")
return ch, err
}

func (c *Calcium) selectBuildNode(ctx context.Context) (*types.Node, error) {
// get pod from config
// TODO can choose multiple pod here for other engine support
if c.config.Docker.BuildPod == "" {
return nil, errors.WithStack(types.ErrNoBuildPod)
return nil, types.ErrNoBuildPod
}

// get nodes
@@ -67,7 +70,7 @@ func (c *Calcium) selectBuildNode(ctx context.Context) (*types.Node, error) {
}

if len(nodes) == 0 {
return nil, errors.WithStack(types.ErrInsufficientNodes)
return nil, types.ErrInsufficientNodes
}
// get idle max node
return c.getMostIdleNode(ctx, nodes)
@@ -97,7 +100,7 @@ func (c *Calcium) buildFromSCM(ctx context.Context, node *types.Node, opts *type
path, content, err := node.Engine.BuildContent(ctx, c.source, buildContentOpts)
defer os.RemoveAll(path)
if err != nil {
return nil, nil, errors.WithStack(err)
return nil, nil, err
}
opts.Tar = content
return c.buildFromContent(ctx, node, opts)
@@ -106,7 +109,7 @@ func (c *Calcium) buildFromContent(ctx context.Context, node *types.Node, opts *types.BuildOptions) ([]string, io.ReadCloser, error) {
func (c *Calcium) buildFromContent(ctx context.Context, node *types.Node, opts *types.BuildOptions) ([]string, io.ReadCloser, error) {
refs := node.Engine.BuildRefs(ctx, toBuildRefOptions(opts))
resp, err := node.Engine.ImageBuild(ctx, opts.Tar, refs, opts.Platform)
return refs, resp, errors.WithStack(err)
return refs, resp, err
}

func (c *Calcium) buildFromExist(ctx context.Context, opts *types.BuildOptions) (refs []string, node *types.Node, resp io.ReadCloser, err error) {
@@ -117,12 +120,12 @@ func (c *Calcium) buildFromExist(ctx context.Context, opts *types.BuildOptions)
refs = node.Engine.BuildRefs(ctx, toBuildRefOptions(opts))
imgID, err := node.Engine.ImageBuildFromExist(ctx, opts.ExistID, refs, opts.User)
if err != nil {
return nil, nil, nil, errors.WithStack(err)
return nil, nil, nil, err
}

buildMsg, err := json.Marshal(types.BuildImageMessage{ID: imgID})
if err != nil {
return nil, nil, nil, errors.WithStack(err)
return nil, nil, nil, err
}

return refs, node, io.NopCloser(bytes.NewReader(buildMsg)), nil
@@ -143,22 +146,22 @@ func (c *Calcium) pushImageAndClean(ctx context.Context, resp io.ReadCloser, nod
break
}
if err == context.Canceled || err == context.DeadlineExceeded {
log.Errorf(ctx, "[BuildImage] context timeout")
log.Errorf(ctx, err, "[BuildImage] context timeout")
lastMessage.ErrorDetail.Code = -1
lastMessage.ErrorDetail.Message = err.Error()
lastMessage.Error = err.Error()
break
}
malformed, _ := io.ReadAll(decoder.Buffered()) // TODO err check
logger.Errorf(ctx, "[BuildImage] Decode build image message failed %+v, buffered: %v", err, malformed)
logger.Errorf(ctx, nil, "[BuildImage] Decode build image message failed %+v, buffered: %v", err, malformed)
return
}
ch <- message
lastMessage = message
}

if lastMessage.Error != "" {
log.Errorf(ctx, "[BuildImage] Build image failed %v", lastMessage.ErrorDetail.Message)
log.Errorf(ctx, nil, "[BuildImage] Build image failed %v", lastMessage.ErrorDetail.Message)
return
}

Expand All @@ -168,7 +171,8 @@ func (c *Calcium) pushImageAndClean(ctx context.Context, resp io.ReadCloser, nod
log.Infof(ctx, "[BuildImage] Push image %s", tag)
rc, err := node.Engine.ImagePush(ctx, tag)
if err != nil {
ch <- &types.BuildImageMessage{Error: logger.ErrWithTracing(ctx, err).Error()}
logger.Errorf(ctx, err, "")
ch <- &types.BuildImageMessage{Error: err.Error()}
continue
}

@@ -212,13 +216,13 @@ func cleanupNodeImages(ctx context.Context, node *types.Node, ids []string, ttl
defer cancel()
for _, id := range ids {
if _, err := node.Engine.ImageRemove(ctx, id, false, true); err != nil {
logger.Errorf(ctx, "[BuildImage] Remove image error: %+v", errors.WithStack(err))
logger.Errorf(ctx, err, "[BuildImage] Remove image error: %+v", err)
}
}
if spaceReclaimed, err := node.Engine.ImageBuildCachePrune(ctx, true); err != nil {
logger.Errorf(ctx, "[BuildImage] Remove build image cache error: %+v", errors.WithStack(err))
logger.Errorf(ctx, err, "[BuildImage] Remove build image cache error: %+v", err)
} else {
log.Infof(ctx, "[BuildImage] Clean cached image and release space %d", spaceReclaimed)
logger.Infof(ctx, "[BuildImage] Clean cached image and release space %d", spaceReclaimed)
}
}

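Besides the new Errorf signature, the build.go hunks above (and calcium.go below) also drop the logger.ErrWithTracing and errors.WithStack wrappers: errors are now logged once at the call site together with their context and then returned unwrapped. A minimal sketch of the resulting pattern, taken from BuildImage:

// before: wrap the error, attach tracing, and return in one call
//   return nil, logger.ErrWithTracing(ctx, errors.WithStack(types.ErrSCMNotSet))

// after: log with context and error, then return the bare sentinel error
if c.source == nil {
	logger.Errorf(ctx, types.ErrSCMNotSet, "")
	return nil, types.ErrSCMNotSet
}
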
30 changes: 15 additions & 15 deletions cluster/calcium/calcium.go
@@ -20,8 +20,6 @@ import (
"github.com/projecteru2/core/types"
"github.com/projecteru2/core/utils"
"github.com/projecteru2/core/wal"

"github.com/pkg/errors"
)

// Calcium implement the cluster
@@ -38,12 +36,11 @@ type Calcium struct {

// New returns a new cluster config
func New(ctx context.Context, config types.Config, t *testing.T) (*Calcium, error) {
logger := log.WithField("Calcium", "New").WithField("config", config)

// set store
store, err := store.NewStore(config, t)
if err != nil {
return nil, logger.ErrWithTracing(ctx, errors.WithStack(err))
log.Errorf(ctx, err, "")
return nil, err
}

// set scm
Expand All @@ -55,11 +52,11 @@ func New(ctx context.Context, config types.Config, t *testing.T) (*Calcium, erro
case cluster.Github:
scm, err = github.New(config)
default:
log.Warn("[Calcium] SCM not set, build API disabled")
log.Warn(ctx, "[Calcium] SCM not set, build API disabled")
}
if err != nil {
logger.Errorf(nil, "[Calcium] SCM failed: %+v", err) //nolint
return nil, errors.WithStack(err)
log.Errorf(ctx, err, "[Calcium] SCM failed: %+v", err)
return nil, err
}

// set watcher
@@ -68,18 +65,19 @@ func New(ctx context.Context, config types.Config, t *testing.T) (*Calcium, erro
// set resource plugin manager
rmgr, err := resources.NewPluginsManager(config)
if err != nil {
return nil, logger.ErrWithTracing(ctx, errors.WithStack(err))
log.Errorf(ctx, err, "")
return nil, err
}

// load internal plugins
cpumem, err := cpumem.NewPlugin(config)
if err != nil {
log.Errorf(ctx, "[NewPluginManager] new cpumem plugin error: %v", err)
log.Errorf(ctx, err, "[NewPluginManager] new cpumem plugin error: %v", err)
return nil, err
}
volume, err := volume.NewPlugin(config)
if err != nil {
log.Errorf(ctx, "[NewPluginManager] new volume plugin error: %v", err)
log.Errorf(ctx, err, "[NewPluginManager] new volume plugin error: %v", err)
return nil, err
}
rmgr.AddPlugins(cpumem, volume)
@@ -98,17 +96,19 @@ func New(ctx context.Context, config types.Config, t *testing.T) (*Calcium, erro

cal.wal, err = enableWAL(config, cal, store)
if err != nil {
return nil, logger.ErrWithTracing(nil, errors.WithStack(err)) //nolint
log.Errorf(ctx, err, "")
return nil, err
}

cal.identifier, err = config.Identifier()
if err != nil {
return nil, logger.ErrWithTracing(nil, errors.WithStack(err)) //nolint
log.Errorf(ctx, err, "")
return nil, err
}

_ = pool.Invoke(func() { cal.InitMetrics(ctx) })

return cal, logger.ErrWithTracing(nil, errors.WithStack(err)) //nolint
log.Errorf(ctx, err, "")
return cal, err
}

// DisasterRecover .
4 changes: 2 additions & 2 deletions cluster/calcium/capacity.go
@@ -28,7 +28,7 @@ func (c *Calcium) CalculateCapacity(ctx context.Context, opts *types.DeployOptio

if opts.DeployStrategy != strategy.Dummy {
if msg.NodeCapacities, err = c.doGetDeployStrategy(ctx, nodenames, opts); err != nil {
logger.Errorf(ctx, "[Calcium.CalculateCapacity] doGetDeployMap failed: %+v", err)
logger.Errorf(ctx, err, "[Calcium.CalculateCapacity] doGetDeployMap failed: %+v", err)
return err
}

@@ -41,7 +41,7 @@ func (c *Calcium) CalculateCapacity(ctx context.Context, opts *types.DeployOptio
var infos map[string]*resources.NodeCapacityInfo
infos, msg.Total, err = c.rmgr.GetNodesDeployCapacity(ctx, nodenames, opts.ResourceOpts)
if err != nil {
logger.Errorf(ctx, "[Calcium.CalculateCapacity] failed to get nodes capacity: %+v", err)
logger.Errorf(ctx, err, "[Calcium.CalculateCapacity] failed to get nodes capacity: %+v", err)
return err
}
if msg.Total <= 0 {