Skip to content

Commit

Permalink
Merge pull request #4565 from vmware/fix_stop_periodical_job_issue
Browse files Browse the repository at this point in the history
Fix issue of stopping periodic job
  • Loading branch information
steven-zou committed Apr 2, 2018
2 parents e4952b7 + 8cd98dd commit 44d63fe
Show file tree
Hide file tree
Showing 10 changed files with 309 additions and 57 deletions.
6 changes: 4 additions & 2 deletions src/jobservice/job/impl/demo_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ func (dj *DemoJob) Run(ctx env.JobContext, params map[string]interface{}) error
logger.Info("I'm finished, exit!")
fmt.Println("I'm finished, exit!")
}()
fmt.Println("I'm running")
logger.Info("=======Replication job running=======")
logger.Infof("params: %#v\n", params)
logger.Infof("context: %#v\n", ctx)
Expand All @@ -81,11 +82,12 @@ func (dj *DemoJob) Run(ctx env.JobContext, params map[string]interface{}) error

//HOLD ON FOR A WHILE
logger.Error("Holding for 20 sec")
<-time.After(10 * time.Second)
<-time.After(15 * time.Second)
//logger.Fatal("I'm back, check if I'm stopped/cancelled")

if cmd, ok := ctx.OPCommand(); ok {
logger.Infof("cmd=%s\n", cmd)
fmt.Printf("Receive OP command: %s\n", cmd)
if cmd == opm.CtlCommandCancel {
logger.Info("exit for receiving cancel signal")
return errs.JobCancelledError()
Expand All @@ -95,7 +97,7 @@ func (dj *DemoJob) Run(ctx env.JobContext, params map[string]interface{}) error
return errs.JobStoppedError()
}

fmt.Println("I'm here")
fmt.Println("I'm close to end")

return nil
}
166 changes: 166 additions & 0 deletions src/jobservice/opm/op_commands.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
// Copyright 2018 The Harbor Authors. All rights reserved.

package opm

import (
"context"
"encoding/json"
"errors"
"fmt"
"sync"
"time"

"github.com/garyburd/redigo/redis"
"github.com/vmware/harbor/src/jobservice/logger"
"github.com/vmware/harbor/src/jobservice/models"
"github.com/vmware/harbor/src/jobservice/utils"
)

const (
	//EventFireCommand for firing command event
	EventFireCommand = "fire_command"

	//commandValidTime is the period a pushed command is treated as valid
	commandValidTime = time.Minute * 5
	//commandSweepTickerTime is the interval of the command sweeping ticker
	commandSweepTickerTime = time.Hour
)

//oPCommand is a single command entry kept in the commands list
type oPCommand struct {
	//command is the name of the command
	command string
	//fireTime is the unix timestamp (seconds) at which the command was recorded
	fireTime int64
}

//oPCommands keeps the commands list (indexed by job ID) together with
//everything needed to publish commands and to run the sweeper loop
type oPCommands struct {
	//lock guards the commands map
	lock *sync.RWMutex
	//commands maps job ID to the latest command pushed for that job
	commands map[string]*oPCommand

	//context is watched by the sweeper loop; the loop exits when it is done
	context context.Context
	//redisPool provides the connections used for publishing commands
	redisPool *redis.Pool
	//namespace is used to build the notification key
	namespace string

	//stopChan asks the sweeper loop to exit
	stopChan chan struct{}
	//doneChan is signalled by the sweeper loop when it has exited
	doneChan chan struct{}
}

//newOPCommands creates an oPCommands with an empty commands list.
//Both signal channels are created with a buffer of 1.
func newOPCommands(ctx context.Context, ns string, redisPool *redis.Pool) *oPCommands {
	opc := new(oPCommands)

	opc.lock = &sync.RWMutex{}
	opc.commands = map[string]*oPCommand{}
	opc.context = ctx
	opc.redisPool = redisPool
	opc.namespace = ns
	opc.stopChan = make(chan struct{}, 1)
	opc.doneChan = make(chan struct{}, 1)

	return opc
}

//Start the command sweeper.
//The sweeping loop is launched in its own goroutine, so this call
//returns immediately after logging the start message.
func (opc *oPCommands) Start() {
	go opc.loop()
	logger.Info("OP commands sweeper is started")
}

//Stop the command sweeper.
//A stop signal is sent on stopChan and then the call blocks until a
//value is received from doneChan (sent by the loop when it exits).
//NOTE(review): if the loop was never started nothing sends on doneChan,
//so this call would block - callers should only stop a started sweeper.
func (opc *oPCommands) Stop() {
	opc.stopChan <- struct{}{}
	<-opc.doneChan
}

//Fire publishes the command for the specified job as an EventFireCommand
//message on the periodic notification key of the namespace.
//An error is returned if the job ID is empty, the command is neither
//stop nor cancel, the message can not be encoded or publishing fails.
func (opc *oPCommands) Fire(jobID string, command string) error {
	if utils.IsEmptyStr(jobID) {
		return errors.New("empty job ID")
	}

	isSupported := command == CtlCommandStop || command == CtlCommandCancel
	if !isSupported {
		return fmt.Errorf("Unsupported command %s", command)
	}

	payload, err := json.Marshal(&models.Message{
		Event: EventFireCommand,
		Data:  []string{jobID, command},
	})
	if err != nil {
		return err
	}

	redisConn := opc.redisPool.Get()
	defer redisConn.Close()

	if _, err = redisConn.Do("PUBLISH", utils.KeyPeriodicNotification(opc.namespace), payload); err != nil {
		return err
	}

	return nil
}

//Push records the command for the specified job in the commands list,
//stamped with the current time. An existing entry of the same job is
//overwritten. An error is returned if the job ID is empty or the
//command is neither stop nor cancel.
func (opc *oPCommands) Push(jobID string, command string) error {
	if utils.IsEmptyStr(jobID) {
		return errors.New("empty job ID")
	}

	switch command {
	case CtlCommandStop, CtlCommandCancel:
		//supported command, go ahead
	default:
		return fmt.Errorf("Unsupported command %s", command)
	}

	opc.lock.Lock()
	defer opc.lock.Unlock()

	entry := new(oPCommand)
	entry.command = command
	entry.fireTime = time.Now().Unix()
	opc.commands[jobID] = entry

	return nil
}

//Pop out the command if existing.
//Returns the command of the specified job and true when a command was
//pushed for the job and is still within commandValidTime; otherwise
//returns "" and false. The entry is removed from the list in both cases,
//so an expired command is discarded instead of lingering in the map.
func (opc *oPCommands) Pop(jobID string) (string, bool) {
	if utils.IsEmptyStr(jobID) {
		return "", false
	}

	//The map is modified below (delete), so the exclusive lock is required;
	//a read lock would let concurrent Pop calls write to the map at the same time.
	opc.lock.Lock()
	defer opc.lock.Unlock()

	c, ok := opc.commands[jobID]
	if !ok {
		return "", false
	}

	//The entry is consumed or expired, drop it either way
	delete(opc.commands, jobID)

	if time.Unix(c.fireTime, 0).Add(commandValidTime).After(time.Now()) {
		return c.command, true
	}

	return "", false
}

//loop sweeps the commands list every commandSweepTickerTime until the
//context is done or a stop signal is received. On exit, the ticker is
//stopped, the exit is logged and doneChan is signalled.
func (opc *oPCommands) loop() {
	defer func() {
		logger.Info("OP commands is stopped")
		opc.doneChan <- struct{}{}
	}()

	sweepTicker := time.NewTicker(commandSweepTickerTime)
	defer sweepTicker.Stop()

	for {
		select {
		case <-opc.stopChan:
			//asked to stop
			return
		case <-opc.context.Done():
			//system context is done
			return
		case <-sweepTicker.C:
			opc.sweepCommands()
		}
	}
}

//sweepCommands removes the expired commands from the list.
//A command is expired when commandValidTime has elapsed since its fire
//time, which is the opposite of the validity check used when popping a
//command. Commands that are still valid are kept.
func (opc *oPCommands) sweepCommands() {
	opc.lock.Lock()
	defer opc.lock.Unlock()

	//Use a single reference time for the whole sweep
	now := time.Now()
	for jobID, c := range opc.commands {
		expireAt := time.Unix(c.fireTime, 0).Add(commandValidTime)
		if !expireAt.After(now) {
			//deleting during range is safe in Go
			delete(opc.commands, jobID)
		}
	}
}
64 changes: 20 additions & 44 deletions src/jobservice/opm/redis_job_stats_mgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ type RedisJobStatsManager struct {
doneChan chan struct{}
processChan chan *queueItem
isRunning *atomic.Value
hookStore *HookStore //cache the hook here to avoid requesting backend
hookStore *HookStore //cache the hook here to avoid requesting backend
opCommands *oPCommands //maintain the OP commands
}

//NewRedisJobStatsManager is constructor of RedisJobStatsManager
Expand All @@ -74,6 +75,7 @@ func NewRedisJobStatsManager(ctx context.Context, namespace string, redisPool *r
processChan: make(chan *queueItem, processBufferSize),
hookStore: NewHookStore(),
isRunning: isRunning,
opCommands: newOPCommands(ctx, namespace, redisPool),
}
}

Expand All @@ -83,6 +85,7 @@ func (rjs *RedisJobStatsManager) Start() {
return
}
go rjs.loop()
rjs.opCommands.Start()
rjs.isRunning.Store(true)

logger.Info("Redis job stats manager is started")
Expand All @@ -97,6 +100,8 @@ func (rjs *RedisJobStatsManager) Shutdown() {
if !(rjs.isRunning.Load().(bool)) {
return
}

rjs.opCommands.Stop()
rjs.stopChan <- struct{}{}
<-rjs.doneChan
}
Expand Down Expand Up @@ -213,7 +218,12 @@ func (rjs *RedisJobStatsManager) SendCommand(jobID string, command string) error
return errors.New("unknown command")
}

return rjs.writeCtlCommand(jobID, command)
if err := rjs.opCommands.Fire(jobID, command); err != nil {
return err
}

//Directly add to op commands maintaining list
return rjs.opCommands.Push(jobID, command)
}

//CheckIn message
Expand All @@ -239,7 +249,12 @@ func (rjs *RedisJobStatsManager) CtlCommand(jobID string) (string, error) {
return "", errors.New("empty job ID")
}

return rjs.getCrlCommand(jobID)
c, ok := rjs.opCommands.Pop(jobID)
if !ok {
return "", fmt.Errorf("no OP command fired to job %s", jobID)
}

return c, nil
}

//DieAt marks the failed jobs with the time they put into dead queue.
Expand All @@ -262,7 +277,7 @@ func (rjs *RedisJobStatsManager) RegisterHook(jobID string, hookURL string, isCa
return errors.New("empty job ID")
}

if utils.IsEmptyStr(hookURL) {
if !utils.IsValidURL(hookURL) {
return errors.New("invalid hook url")
}

Expand Down Expand Up @@ -302,7 +317,7 @@ func (rjs *RedisJobStatsManager) submitStatusReportingItem(jobID string, status,
if !ok {
//Retrieve from backend
hookURL, err = rjs.getHook(jobID)
if err != nil {
if err != nil || !utils.IsValidURL(hookURL) {
//logged and exit
logger.Warningf("no status hook found for job %s\n, abandon status reporting", jobID)
return
Expand All @@ -328,45 +343,6 @@ func (rjs *RedisJobStatsManager) reportStatus(jobID string, hookURL, status, che
return DefaultHookClient.ReportStatus(hookURL, reportingStatus)
}

//getCrlCommand reads the "command" field of the control command hash of
//the specified job and then tries to delete the hash key.
//A failed read is returned as error; a failed delete is only logged.
func (rjs *RedisJobStatsManager) getCrlCommand(jobID string) (string, error) {
	conn := rjs.redisPool.Get()
	defer conn.Close()

	ctlKey := utils.KeyJobCtlCommands(rjs.namespace, jobID)

	command, err := redis.String(conn.Do("HGET", ctlKey, "command"))
	if err != nil {
		return "", err
	}

	//The command is consumed, try to remove it.
	//A failure here is only logged, the key is left as dirty data.
	if _, delErr := conn.Do("DEL", ctlKey); delErr != nil {
		logger.Errorf("del key %s failed with error: %s\n", ctlKey, delErr)
	}

	return command, nil
}

//writeCtlCommand stores the command and the current unix time in the
//control command hash of the specified job and sets an expire time of
//24 hours plus 0-9 random seconds on the key. Both redis commands are
//buffered with Send and written out by a single Flush.
func (rjs *RedisJobStatsManager) writeCtlCommand(jobID string, command string) error {
	conn := rjs.redisPool.Get()
	defer conn.Close()

	ctlKey := utils.KeyJobCtlCommands(rjs.namespace, jobID)

	err := conn.Send("HMSET", ctlKey, "command", command, "fire_time", time.Now().Unix())
	if err != nil {
		return err
	}

	ttl := 24*60*60 + rand.Int63n(10)
	err = conn.Send("EXPIRE", ctlKey, ttl)
	if err != nil {
		return err
	}

	return conn.Flush()
}

func (rjs *RedisJobStatsManager) updateJobStatus(jobID string, status string) error {
conn := rjs.redisPool.Get()
defer conn.Close()
Expand Down
5 changes: 0 additions & 5 deletions src/jobservice/opm/redis_job_stats_mgr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,11 +87,6 @@ func TestCommand(t *testing.T) {
t.Fatalf("expect '%s' but got '%s'", CtlCommandStop, cmd)
}
}

key := utils.KeyJobCtlCommands(testingNamespace, "fake_job_ID")
if err := clear(key, redisPool.Get()); err != nil {
t.Fatal(err)
}
}

func TestDieAt(t *testing.T) {
Expand Down
6 changes: 6 additions & 0 deletions src/jobservice/period/redis_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
"sync"
"time"

"github.com/vmware/harbor/src/jobservice/errs"

"github.com/robfig/cron"

"github.com/garyburd/redigo/redis"
Expand Down Expand Up @@ -156,6 +158,10 @@ func (rps *RedisPeriodicScheduler) UnSchedule(cronJobPolicyID string) error {
}

score, err := rps.getScoreByID(cronJobPolicyID)
if err == redis.ErrNil {
return errs.NoObjectFoundError(err.Error())
}

if err != nil {
return err
}
Expand Down
3 changes: 3 additions & 0 deletions src/jobservice/pool/message_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,9 @@ func (ms *MessageServer) Start() error {
dt, _ := json.Marshal(m.Data)
json.Unmarshal(dt, hookObject)
converted = hookObject
case opm.EventFireCommand:
//no need to convert []string
converted = m.Data
}
res := callback.Call([]reflect.Value{reflect.ValueOf(converted)})
e := res[0].Interface()
Expand Down

0 comments on commit 44d63fe

Please sign in to comment.