Skip to content

Commit

Permalink
Fix issue of stopping periodic job
Browse files Browse the repository at this point in the history
improve op command by using cache
return 404 if no job found to stop
  • Loading branch information
steven-zou committed Apr 2, 2018
1 parent e4952b7 commit 6c6dbbe
Show file tree
Hide file tree
Showing 9 changed files with 306 additions and 56 deletions.
6 changes: 4 additions & 2 deletions src/jobservice/job/impl/demo_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ func (dj *DemoJob) Run(ctx env.JobContext, params map[string]interface{}) error
logger.Info("I'm finished, exit!")
fmt.Println("I'm finished, exit!")
}()
fmt.Println("I'm running")
logger.Info("=======Replication job running=======")
logger.Infof("params: %#v\n", params)
logger.Infof("context: %#v\n", ctx)
Expand All @@ -81,11 +82,12 @@ func (dj *DemoJob) Run(ctx env.JobContext, params map[string]interface{}) error

//HOLD ON FOR A WHILE
logger.Error("Holding for 20 sec")
<-time.After(10 * time.Second)
<-time.After(15 * time.Second)
//logger.Fatal("I'm back, check if I'm stopped/cancelled")

if cmd, ok := ctx.OPCommand(); ok {
logger.Infof("cmd=%s\n", cmd)
fmt.Printf("Receive OP command: %s\n", cmd)
if cmd == opm.CtlCommandCancel {
logger.Info("exit for receiving cancel signal")
return errs.JobCancelledError()
Expand All @@ -95,7 +97,7 @@ func (dj *DemoJob) Run(ctx env.JobContext, params map[string]interface{}) error
return errs.JobStoppedError()
}

fmt.Println("I'm here")
fmt.Println("I'm close to end")

return nil
}
166 changes: 166 additions & 0 deletions src/jobservice/opm/op_commands.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
// Copyright 2018 The Harbor Authors. All rights reserved.

package opm

import (
"context"
"encoding/json"
"errors"
"fmt"
"sync"
"time"

"github.com/garyburd/redigo/redis"
"github.com/vmware/harbor/src/jobservice/logger"
"github.com/vmware/harbor/src/jobservice/models"
"github.com/vmware/harbor/src/jobservice/utils"
)

const (
// commandValidTime is the window, counted from a command's fire time,
// during which Pop still hands that command out.
commandValidTime = 5 * time.Minute
// commandSweepTickerTime is the period of the ticker in loop that
// triggers sweepCommands.
commandSweepTickerTime = 1 * time.Hour
// EventFireCommand is the event name set on the message published by Fire.
EventFireCommand = "fire_command"
)

// oPCommand is one cached command together with the time it was recorded.
type oPCommand struct {
// command is the control command value; Push only stores CtlCommandStop
// or CtlCommandCancel.
command string
// fireTime is the Unix timestamp (seconds) taken when the command was pushed.
fireTime int64
}

// oPCommands keeps the latest OP command per job in an in-memory map and
// publishes fired commands through Redis.
type oPCommands struct {
// lock guards commands.
lock *sync.RWMutex
// commands maps a job ID to the last command pushed for that job.
commands map[string]*oPCommand
// context ends the sweeper loop when it is done.
context context.Context
// redisPool supplies the connection Fire publishes on.
redisPool *redis.Pool
// namespace is passed to utils.KeyPeriodicNotification to build the
// channel key used by Fire.
namespace string
// stopChan carries the stop request from Stop to the sweeper loop.
stopChan chan struct{}
// doneChan is signalled by the sweeper loop once it has exited.
doneChan chan struct{}
}

// newOPCommands builds an oPCommands with an empty command map.
//
// ctx ends the sweeper loop when it is done, ns is the namespace used when
// building the Redis channel key, and redisPool provides the connections
// used by Fire. The sweeper is not started here; call Start for that.
func newOPCommands(ctx context.Context, ns string, redisPool *redis.Pool) *oPCommands {
	opc := &oPCommands{
		context:   ctx,
		namespace: ns,
		redisPool: redisPool,
	}

	opc.lock = new(sync.RWMutex)
	opc.commands = make(map[string]*oPCommand)

	// Both channels have room for one signal so a single send never blocks
	// on a receiver that is not ready yet.
	opc.stopChan = make(chan struct{}, 1)
	opc.doneChan = make(chan struct{}, 1)

	return opc
}

// Start launches the sweeper loop in its own goroutine and returns
// immediately after logging that the sweeper has been started.
func (opc *oPCommands) Start() {
go opc.loop()
logger.Info("OP commands sweeper is started")
}

// Stop asks the sweeper loop to exit and then blocks until the loop reports
// on doneChan that it has finished.
// NOTE(review): because this waits on doneChan, calling Stop when the loop
// was never started would block forever — confirm callers always Start first.
func (opc *oPCommands) Stop() {
opc.stopChan <- struct{}{}
<-opc.doneChan
}

// Fire publishes a command notification for the given job over Redis.
//
// The message carries the EventFireCommand event and the pair
// [jobID, command] as data, and is published on the channel returned by
// utils.KeyPeriodicNotification for this namespace.
//
// It returns an error when jobID is empty, when command is neither
// CtlCommandStop nor CtlCommandCancel, when the message cannot be encoded
// as JSON, or when the PUBLISH call fails.
func (opc *oPCommands) Fire(jobID string, command string) error {
	if utils.IsEmptyStr(jobID) {
		return errors.New("empty job ID")
	}

	switch command {
	case CtlCommandStop, CtlCommandCancel:
		// supported, carry on
	default:
		return fmt.Errorf("Unsupported command %s", command)
	}

	payload, err := json.Marshal(&models.Message{
		Event: EventFireCommand,
		Data:  []string{jobID, command},
	})
	if err != nil {
		return err
	}

	conn := opc.redisPool.Get()
	defer conn.Close()

	channel := utils.KeyPeriodicNotification(opc.namespace)
	if _, err := conn.Do("PUBLISH", channel, payload); err != nil {
		return err
	}

	return nil
}

// Push records command as the pending command for jobID, replacing any
// command already stored for that job. The fire time is set to the current
// Unix time in seconds.
//
// It returns an error when jobID is empty or when command is neither
// CtlCommandStop nor CtlCommandCancel.
func (opc *oPCommands) Push(jobID string, command string) error {
	if utils.IsEmptyStr(jobID) {
		return errors.New("empty job ID")
	}

	supported := command == CtlCommandStop || command == CtlCommandCancel
	if !supported {
		return fmt.Errorf("Unsupported command %s", command)
	}

	opc.lock.Lock()
	defer opc.lock.Unlock()

	entry := new(oPCommand)
	entry.command = command
	entry.fireTime = time.Now().Unix()
	opc.commands[jobID] = entry

	return nil
}

// Pop returns the pending command for jobID and removes it from the cache.
//
// The boolean result is false when jobID is empty, when no command is
// cached for the job, or when the cached command was fired more than
// commandValidTime ago. An expired command is discarded too, so it does not
// linger in the map until the next sweep.
func (opc *oPCommands) Pop(jobID string) (string, bool) {
	if utils.IsEmptyStr(jobID) {
		return "", false
	}

	// Pop deletes from the map, so it must hold the write lock: a read lock
	// can be held by several goroutines at once and would allow concurrent
	// map writes.
	opc.lock.Lock()
	defer opc.lock.Unlock()

	c, ok := opc.commands[jobID]
	if !ok {
		return "", false
	}

	// Whether it is consumed or expired, the entry has no further use.
	delete(opc.commands, jobID)

	expiresAt := time.Unix(c.fireTime, 0).Add(commandValidTime)
	if !expiresAt.After(time.Now()) {
		return "", false
	}

	return c.command, true
}

// loop is the sweeper goroutine body. Every commandSweepTickerTime it runs
// sweepCommands, and it exits when either the context is done or a stop
// request arrives on stopChan. On exit it stops the ticker, logs, and
// signals doneChan.
func (opc *oPCommands) loop() {
	ticker := time.NewTicker(commandSweepTickerTime)

	defer func() {
		ticker.Stop()
		logger.Info("OP commands is stopped")
		opc.doneChan <- struct{}{}
	}()

	for {
		select {
		case <-opc.context.Done():
			return
		case <-opc.stopChan:
			return
		case <-ticker.C:
			opc.sweepCommands()
		}
	}
}

// sweepCommands removes every cached command whose validity window
// (fire time + commandValidTime) has already passed. Commands that are
// still valid stay in the map so Pop can return them.
func (opc *oPCommands) sweepCommands() {
	opc.lock.Lock()
	defer opc.lock.Unlock()

	// One timestamp for the whole pass keeps the cut-off consistent.
	now := time.Now()

	for jobID, c := range opc.commands {
		expiresAt := time.Unix(c.fireTime, 0).Add(commandValidTime)
		// Same validity rule as Pop: a command is valid only while its
		// expiry lies in the future, so everything else is swept.
		if !expiresAt.After(now) {
			delete(opc.commands, jobID)
		}
	}
}
64 changes: 20 additions & 44 deletions src/jobservice/opm/redis_job_stats_mgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ type RedisJobStatsManager struct {
doneChan chan struct{}
processChan chan *queueItem
isRunning *atomic.Value
hookStore *HookStore //cache the hook here to avoid requesting backend
hookStore *HookStore //cache the hook here to avoid requesting backend
opCommands *oPCommands //maintain the OP commands
}

//NewRedisJobStatsManager is constructor of RedisJobStatsManager
Expand All @@ -74,6 +75,7 @@ func NewRedisJobStatsManager(ctx context.Context, namespace string, redisPool *r
processChan: make(chan *queueItem, processBufferSize),
hookStore: NewHookStore(),
isRunning: isRunning,
opCommands: newOPCommands(ctx, namespace, redisPool),
}
}

Expand All @@ -83,6 +85,7 @@ func (rjs *RedisJobStatsManager) Start() {
return
}
go rjs.loop()
rjs.opCommands.Start()
rjs.isRunning.Store(true)

logger.Info("Redis job stats manager is started")
Expand All @@ -97,6 +100,8 @@ func (rjs *RedisJobStatsManager) Shutdown() {
if !(rjs.isRunning.Load().(bool)) {
return
}

rjs.opCommands.Stop()
rjs.stopChan <- struct{}{}
<-rjs.doneChan
}
Expand Down Expand Up @@ -213,7 +218,12 @@ func (rjs *RedisJobStatsManager) SendCommand(jobID string, command string) error
return errors.New("unknown command")
}

return rjs.writeCtlCommand(jobID, command)
if err := rjs.opCommands.Fire(jobID, command); err != nil {
return err
}

//Directly add to op commands maintaining list
return rjs.opCommands.Push(jobID, command)
}

//CheckIn message
Expand All @@ -239,7 +249,12 @@ func (rjs *RedisJobStatsManager) CtlCommand(jobID string) (string, error) {
return "", errors.New("empty job ID")
}

return rjs.getCrlCommand(jobID)
c, ok := rjs.opCommands.Pop(jobID)
if !ok {
return "", fmt.Errorf("no OP command fired to job %s", jobID)
}

return c, nil
}

//DieAt marks the failed jobs with the time they put into dead queue.
Expand All @@ -262,7 +277,7 @@ func (rjs *RedisJobStatsManager) RegisterHook(jobID string, hookURL string, isCa
return errors.New("empty job ID")
}

if utils.IsEmptyStr(hookURL) {
if !utils.IsValidURL(hookURL) {
return errors.New("invalid hook url")
}

Expand Down Expand Up @@ -302,7 +317,7 @@ func (rjs *RedisJobStatsManager) submitStatusReportingItem(jobID string, status,
if !ok {
//Retrieve from backend
hookURL, err = rjs.getHook(jobID)
if err != nil {
if err != nil || !utils.IsValidURL(hookURL) {
//logged and exit
logger.Warningf("no status hook found for job %s\n, abandon status reporting", jobID)
return
Expand All @@ -328,45 +343,6 @@ func (rjs *RedisJobStatsManager) reportStatus(jobID string, hookURL, status, che
return DefaultHookClient.ReportStatus(hookURL, reportingStatus)
}

// getCrlCommand reads the "command" field of the job's control-command hash
// in Redis and then tries to delete the whole key.
//
// A failed read is returned to the caller. A failed delete is only logged;
// the command that was read is still returned and the key is left behind.
func (rjs *RedisJobStatsManager) getCrlCommand(jobID string) (string, error) {
	conn := rjs.redisPool.Get()
	defer conn.Close()

	key := utils.KeyJobCtlCommands(rjs.namespace, jobID)

	reply, doErr := conn.Do("HGET", key, "command")
	cmd, err := redis.String(reply, doErr)
	if err != nil {
		return "", err
	}

	// Best-effort cleanup: a leftover key is tolerated as dirty data.
	if _, delErr := conn.Do("DEL", key); delErr != nil {
		logger.Errorf("del key %s failed with error: %s\n", key, delErr)
	}

	return cmd, nil
}

// writeCtlCommand stores the command and the current Unix time in the job's
// control-command hash and sets an expiry of one day plus 0-9 random
// seconds on the key. Both Redis commands are queued with Send and written
// out with a single Flush.
//
// It returns the first error from queuing either command, otherwise the
// result of Flush.
func (rjs *RedisJobStatsManager) writeCtlCommand(jobID string, command string) error {
	conn := rjs.redisPool.Get()
	defer conn.Close()

	key := utils.KeyJobCtlCommands(rjs.namespace, jobID)

	firedAt := time.Now().Unix()
	if err := conn.Send("HMSET", key, "command", command, "fire_time", firedAt); err != nil {
		return err
	}

	const oneDayInSeconds = 24 * 60 * 60
	ttl := oneDayInSeconds + rand.Int63n(10)
	if err := conn.Send("EXPIRE", key, ttl); err != nil {
		return err
	}

	return conn.Flush()
}

func (rjs *RedisJobStatsManager) updateJobStatus(jobID string, status string) error {
conn := rjs.redisPool.Get()
defer conn.Close()
Expand Down
5 changes: 0 additions & 5 deletions src/jobservice/opm/redis_job_stats_mgr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,11 +87,6 @@ func TestCommand(t *testing.T) {
t.Fatalf("expect '%s' but got '%s'", CtlCommandStop, cmd)
}
}

key := utils.KeyJobCtlCommands(testingNamespace, "fake_job_ID")
if err := clear(key, redisPool.Get()); err != nil {
t.Fatal(err)
}
}

func TestDieAt(t *testing.T) {
Expand Down
6 changes: 6 additions & 0 deletions src/jobservice/period/redis_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
"sync"
"time"

"github.com/vmware/harbor/src/jobservice/errs"

"github.com/robfig/cron"

"github.com/garyburd/redigo/redis"
Expand Down Expand Up @@ -156,6 +158,10 @@ func (rps *RedisPeriodicScheduler) UnSchedule(cronJobPolicyID string) error {
}

score, err := rps.getScoreByID(cronJobPolicyID)
if err == redis.ErrNil {
return errs.NoObjectFoundError(err.Error())
}

if err != nil {
return err
}
Expand Down
3 changes: 3 additions & 0 deletions src/jobservice/pool/message_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,9 @@ func (ms *MessageServer) Start() error {
dt, _ := json.Marshal(m.Data)
json.Unmarshal(dt, hookObject)
converted = hookObject
case opm.EventFireCommand:
//no need to convert []string
converted = m.Data
}
res := callback.Call([]reflect.Value{reflect.ValueOf(converted)})
e := res[0].Interface()
Expand Down

0 comments on commit 6c6dbbe

Please sign in to comment.