Skip to content

Commit

Permalink
Merge pull request #134 from tuna/worker-last-online-register
Browse files Browse the repository at this point in the history
Worker last online and last register
  • Loading branch information
z4yx committed Sep 10, 2020
2 parents 2ba3a27 + 9f7f18c commit b578237
Show file tree
Hide file tree
Showing 7 changed files with 35 additions and 8 deletions.
9 changes: 5 additions & 4 deletions internal/msg.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,11 @@ type MirrorStatus struct {
// A WorkerStatus is the information struct that describe
// a worker, and sent from the manager to clients.
type WorkerStatus struct {
ID string `json:"id"`
URL string `json:"url"` // worker url
Token string `json:"token"` // session token
LastOnline time.Time `json:"last_online"` // last seen
ID string `json:"id"`
URL string `json:"url"` // worker url
Token string `json:"token"` // session token
LastOnline time.Time `json:"last_online"` // last seen
LastRegister time.Time `json:"last_register"` // last register time
}

type MirrorSchedules struct {
Expand Down
10 changes: 10 additions & 0 deletions manager/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ type dbAdapter interface {
GetWorker(workerID string) (WorkerStatus, error)
DeleteWorker(workerID string) error
CreateWorker(w WorkerStatus) (WorkerStatus, error)
RefreshWorker(workerID string) (WorkerStatus, error)
UpdateMirrorStatus(workerID, mirrorID string, status MirrorStatus) (MirrorStatus, error)
GetMirrorStatus(workerID, mirrorID string) (MirrorStatus, error)
ListMirrorStatus(workerID string) ([]MirrorStatus, error)
Expand Down Expand Up @@ -125,6 +126,15 @@ func (b *boltAdapter) CreateWorker(w WorkerStatus) (WorkerStatus, error) {
return w, err
}

func (b *boltAdapter) RefreshWorker(workerID string) (w WorkerStatus, err error) {
w, err = b.GetWorker(workerID)
if err == nil {
w.LastOnline = time.Now()
w, err = b.CreateWorker(w)
}
return w, err
}

func (b *boltAdapter) UpdateMirrorStatus(workerID, mirrorID string, status MirrorStatus) (MirrorStatus, error) {
id := mirrorID + "/" + workerID
err := b.db.Update(func(tx *bolt.Tx) error {
Expand Down
1 change: 1 addition & 0 deletions manager/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ func TestBoltAdapter(t *testing.T) {
ID: id,
Token: "token_" + id,
LastOnline: time.Now(),
LastRegister: time.Now(),
}
w, err = boltDB.CreateWorker(w)
So(err, ShouldBeNil)
Expand Down
9 changes: 7 additions & 2 deletions manager/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,8 +203,9 @@ func (s *Manager) listWorkers(c *gin.Context) {
for _, w := range workers {
workerInfos = append(workerInfos,
WorkerStatus{
ID: w.ID,
LastOnline: w.LastOnline,
ID: w.ID,
LastOnline: w.LastOnline,
LastRegister: w.LastRegister,
})
}
c.JSON(http.StatusOK, workerInfos)
Expand All @@ -215,6 +216,7 @@ func (s *Manager) registerWorker(c *gin.Context) {
var _worker WorkerStatus
c.BindJSON(&_worker)
_worker.LastOnline = time.Now()
_worker.LastRegister = time.Now()
newWorker, err := s.adapter.CreateWorker(_worker)
if err != nil {
err := fmt.Errorf("failed to register worker: %s",
Expand Down Expand Up @@ -268,6 +270,7 @@ func (s *Manager) updateSchedulesOfWorker(c *gin.Context) {
}

s.rwmu.RLock()
s.adapter.RefreshWorker(workerID)
curStatus, err := s.adapter.GetMirrorStatus(workerID, mirrorName)
s.rwmu.RUnlock()
if err != nil {
Expand Down Expand Up @@ -312,6 +315,7 @@ func (s *Manager) updateJobOfWorker(c *gin.Context) {
}

s.rwmu.RLock()
s.adapter.RefreshWorker(workerID)
curStatus, _ := s.adapter.GetMirrorStatus(workerID, mirrorName)
s.rwmu.RUnlock()

Expand Down Expand Up @@ -374,6 +378,7 @@ func (s *Manager) updateMirrorSize(c *gin.Context) {

mirrorName := msg.Name
s.rwmu.RLock()
s.adapter.RefreshWorker(workerID)
status, err := s.adapter.GetMirrorStatus(workerID, mirrorName)
s.rwmu.RUnlock()
if err != nil {
Expand Down
9 changes: 9 additions & 0 deletions manager/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -462,6 +462,15 @@ func (b *mockDBAdapter) CreateWorker(w WorkerStatus) (WorkerStatus, error) {
return w, nil
}

func (b *mockDBAdapter) RefreshWorker(workerID string) (w WorkerStatus, err error) {
w, err = b.GetWorker(workerID)
if err == nil {
w.LastOnline = time.Now()
w, err = b.CreateWorker(w)
}
return w, err
}

func (b *mockDBAdapter) GetMirrorStatus(workerID, mirrorID string) (MirrorStatus, error) {
id := mirrorID + "/" + workerID
status, ok := b.statusStore[id]
Expand Down
4 changes: 2 additions & 2 deletions worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func NewTUNASyncWorker(cfg *Config) *Worker {

// Run runs worker forever
func (w *Worker) Run() {
w.registorWorker()
w.registerWorker()
go w.runHTTPServer()
w.runSchedule()
}
Expand Down Expand Up @@ -393,7 +393,7 @@ func (w *Worker) URL() string {
return fmt.Sprintf("%s://%s:%d/", proto, w.cfg.Server.Hostname, w.cfg.Server.Port)
}

func (w *Worker) registorWorker() {
func (w *Worker) registerWorker() {
msg := WorkerStatus{
ID: w.Name(),
URL: w.URL(),
Expand Down
1 change: 1 addition & 0 deletions worker/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ func makeMockManagerServer(recvData chan interface{}) *gin.Engine {
var _worker WorkerStatus
c.BindJSON(&_worker)
_worker.LastOnline = time.Now()
_worker.LastRegister = time.Now()
recvData <- _worker
c.JSON(http.StatusOK, _worker)
})
Expand Down

0 comments on commit b578237

Please sign in to comment.