Skip to content

Commit

Permalink
feat: scheduler add default biz tag (dragonflyoss#1164)
Browse files Browse the repository at this point in the history
Signed-off-by: Gaius <gaius.qi@gmail.com>
  • Loading branch information
gaius-qi committed Mar 16, 2022
1 parent c192ae7 commit 73cfbe9
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 6 deletions.
8 changes: 4 additions & 4 deletions scheduler/resource/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,11 @@ import (
)

const (
// Default value of biz tag
DefaultBizTag = "unknow"

// Download tiny file timeout
downloadTinyFileContextTimeout = 2 * time.Minute

// Default value of biz tag
defaultBizTag = "unknow"
)

const (
Expand Down Expand Up @@ -159,7 +159,7 @@ type Peer struct {
func NewPeer(id string, task *Task, host *Host, options ...PeerOption) *Peer {
p := &Peer{
ID: id,
BizTag: defaultBizTag,
BizTag: DefaultBizTag,
Pieces: &bitset.BitSet{},
pieceCosts: []int64{},
Stream: &atomic.Value{},
Expand Down
9 changes: 7 additions & 2 deletions scheduler/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func (s *Service) RegisterPeerTask(ctx context.Context, req *rpcscheduler.PeerTa
}
host := s.registerHost(ctx, req)
peer := s.registerPeer(ctx, req, task, host)
peer.Log.Infof("register peer task request: %#v", req)
peer.Log.Infof("register peer task request: %#v %#v %#v", req, req.UrlMeta, req.HostLoad)

// Task has been successful
if task.FSM.Is(resource.TaskStateSucceeded) {
Expand Down Expand Up @@ -435,7 +435,12 @@ func (s *Service) registerHost(ctx context.Context, req *rpcscheduler.PeerTaskRe

// registerPeer creates a new peer or reuses a previous peer
func (s *Service) registerPeer(ctx context.Context, req *rpcscheduler.PeerTaskRequest, task *resource.Task, host *resource.Host) *resource.Peer {
peer, loaded := s.resource.PeerManager().LoadOrStore(resource.NewPeer(req.PeerId, task, host, resource.WithBizTag(req.UrlMeta.Tag)))
var options []resource.PeerOption
if req.UrlMeta.Tag != "" {
options = append(options, resource.WithBizTag(req.UrlMeta.Tag))
}

peer, loaded := s.resource.PeerManager().LoadOrStore(resource.NewPeer(req.PeerId, task, host, options...))
if !loaded {
peer.Log.Info("create new peer")
return peer
Expand Down
2 changes: 2 additions & 0 deletions scheduler/service/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1778,6 +1778,7 @@ func TestService_registerPeer(t *testing.T) {
expect: func(t *testing.T, peer *resource.Peer) {
assert := assert.New(t)
assert.Equal(peer.ID, mockPeerID)
assert.Equal(peer.BizTag, resource.DefaultBizTag)
},
},
{
Expand All @@ -1795,6 +1796,7 @@ func TestService_registerPeer(t *testing.T) {
expect: func(t *testing.T, peer *resource.Peer) {
assert := assert.New(t)
assert.Equal(peer.ID, mockPeerID)
assert.Equal(peer.BizTag, resource.DefaultBizTag)
},
},
}
Expand Down

0 comments on commit 73cfbe9

Please sign in to comment.