Skip to content

Commit

Permalink
move action_manager to parade
Browse files Browse the repository at this point in the history
document action_manager
add data to logs
  • Loading branch information
guy-har committed Oct 15, 2020
1 parent 1f38d24 commit 7d7d611
Show file tree
Hide file tree
Showing 6 changed files with 187 additions and 163 deletions.
143 changes: 0 additions & 143 deletions action/action_manager.go

This file was deleted.

15 changes: 7 additions & 8 deletions export/export_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package export
import (
"encoding/json"
"fmt"
act "github.com/treeverse/lakefs/action"
"github.com/treeverse/lakefs/block"
"github.com/treeverse/lakefs/logging"
"github.com/treeverse/lakefs/parade"
Expand Down Expand Up @@ -37,7 +36,7 @@ type TaskBody struct {
SourceID string
}

func (h *Handler) Handle(action string, body *string) act.HandlerResult {
func (h *Handler) Handle(action string, body *string) parade.HandlerResult {
var params TaskBody
lg := logging.Default().WithFields(logging.Fields{
"actor": actorName,
Expand All @@ -46,7 +45,7 @@ func (h *Handler) Handle(action string, body *string) act.HandlerResult {
err := json.Unmarshal([]byte(*body), &params)
if err != nil {
lg.WithError(err).Error("unmarshal failed")
return act.HandlerResult{
return parade.HandlerResult{
Status: err.Error(),
StatusCode: parade.TaskInvalid,
}
Expand All @@ -65,7 +64,7 @@ func (h *Handler) Handle(action string, body *string) act.HandlerResult {
err = h.adapter.Copy(sourcePointer, destinationPointer) // todo(guys): add wait for copy in handler
if err != nil {
lg.WithError(err).Error("copy failed")
return act.HandlerResult{
return parade.HandlerResult{
Status: err.Error(),
StatusCode: parade.TaskInvalid,
}
Expand All @@ -74,7 +73,7 @@ func (h *Handler) Handle(action string, body *string) act.HandlerResult {
err = h.adapter.Remove(destinationPointer)
if err != nil {
lg.WithError(err).Error("delete failed")
return act.HandlerResult{
return parade.HandlerResult{
Status: err.Error(),
StatusCode: parade.TaskInvalid,
}
Expand All @@ -83,20 +82,20 @@ func (h *Handler) Handle(action string, body *string) act.HandlerResult {
err = h.adapter.Put(destinationPointer, 0, strings.NewReader(""), block.PutOpts{})
if err != nil {
lg.WithError(err).Error("touch failed")
return act.HandlerResult{
return parade.HandlerResult{
Status: err.Error(),
StatusCode: parade.TaskInvalid,
}
}
//todo(guys): add cases for other actions or remove them from Actions function
default:
lg.Error("unknown action")
return act.HandlerResult{
return parade.HandlerResult{
Status: "UNKNOWN ACTION",
StatusCode: parade.TaskInvalid,
}
}
return act.HandlerResult{
return parade.HandlerResult{
Status: fmt.Sprintf("Completed"),
StatusCode: parade.TaskCompleted,
}
Expand Down
8 changes: 4 additions & 4 deletions export/export_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ func TestExportHandler_Handle(t *testing.T) {
name: "copy on mem",
Action: actionCopy,
Body: TaskBody{
DestinationNamespace: "local://external-bucket",
DestinationNamespace: "mem://external-bucket",
DestinationID: "one/two",
SourceNamespace: "local://lakefs-buck",
SourceNamespace: "mem://lakefs-buck",
SourceID: "one/two",
},
blockstoreType: mem.BlockstoreType,
Expand All @@ -33,7 +33,7 @@ func TestExportHandler_Handle(t *testing.T) {
name: "delete on mem",
Action: actionDelete,
Body: TaskBody{
DestinationNamespace: "local://external-bucket",
DestinationNamespace: "mem://external-bucket",
DestinationID: "one/two",
},
blockstoreType: mem.BlockstoreType,
Expand All @@ -42,7 +42,7 @@ func TestExportHandler_Handle(t *testing.T) {
name: "touch on mem",
Action: actionTouch,
Body: TaskBody{
DestinationNamespace: "local://external-bucket",
DestinationNamespace: "mem://external-bucket",
DestinationID: "one/two",
},
blockstoreType: mem.BlockstoreType,
Expand Down
156 changes: 156 additions & 0 deletions parade/action_manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
package parade

import (
"github.com/google/uuid"
"github.com/treeverse/lakefs/logging"
"sync"
"time"
)

const (
defaultWorkers = 5
defaultChannelSize = 1000
defaultMaxTasks = 500
defaultWaitTime = time.Millisecond * 300
defaultErrWaitTime = time.Millisecond * 300
defaultMaxDuration = time.Minute * 30 // Todo(guys): change this
)

// ManagerProperties defines the configuration properties of an ActionManager
type ManagerProperties struct {
Workers int // number of goroutines handling tasks
ChannelSize int // size of the channel containing tasks for workers
MaxTasks int // max tasks requested in every ownTasks request
WaitTime *time.Duration // time to wait if OwnTasks returned no tasks.
ErrWaitTime *time.Duration // time to wait if OwnTasks returned err.
MaxDuration *time.Duration // maxDuration passed to parade.OwnTasks
}

// A ActionManager manages the process of requesting and returning tasks for a specific TaskHandler
// The manager requests tasks, sends the tasks to workers through a channel, the workers then handle the task and return it
type ActionManager struct {
properties *ManagerProperties
handler TaskHandler
parade Parade
quit chan struct{}
wp *workerPool
}

func setDefaultProperties(properties *ManagerProperties) *ManagerProperties {
if properties == nil {
properties = &ManagerProperties{}
}
if properties.Workers == 0 {
properties.Workers = defaultWorkers
}
if properties.ChannelSize == 0 {
properties.ChannelSize = defaultChannelSize
}
if properties.MaxTasks == 0 {
properties.MaxTasks = defaultMaxTasks
}
if properties.WaitTime == nil {
waitTime := defaultWaitTime
properties.WaitTime = &waitTime
}
if properties.ErrWaitTime == nil {
errWaitTime := defaultErrWaitTime
properties.ErrWaitTime = &errWaitTime
}
if properties.MaxDuration == nil {
maxDuration := defaultMaxDuration
properties.MaxDuration = &maxDuration
}
return properties
}

// NewActionManager initiates an ActionManager with workers and returns a
func NewActionManager(handler TaskHandler, parade Parade, properties *ManagerProperties) *ActionManager {
a := &ActionManager{
handler: handler,
parade: parade,
properties: setDefaultProperties(properties),
quit: nil,
}
a.start()
return a
}

func (a *ActionManager) Close() {
close(a.quit)
a.wp.Close()
}

func (a *ActionManager) start() {
taskChannel := make(chan OwnedTaskData, a.properties.ChannelSize)
a.quit = make(chan struct{})
a.wp = newWorkerPool(a.handler, taskChannel, a.parade, a.properties.Workers)
go func() {
for {
select {
case <-a.quit:
return
default:
ownedTasks, err := a.parade.OwnTasks(a.handler.Actor(), a.properties.MaxTasks, a.handler.Actions(), a.properties.MaxDuration)
if err != nil {
logging.Default().WithField("actor", a.handler.Actor()).Errorf("manager failed to receive tasks: %s", err)
time.Sleep(*a.properties.WaitTime)
}
for _, ot := range ownedTasks {
a.wp.ch <- ot
}
if len(ownedTasks) == 0 {
time.Sleep(*a.properties.WaitTime)
}
}
}
}()
}

type workerPool struct {
handler TaskHandler
ch chan OwnedTaskData
workers int
wg sync.WaitGroup
parade Parade
}

func newWorkerPool(handler TaskHandler, ch chan OwnedTaskData, parade Parade, workers int) *workerPool {
a := &workerPool{
handler: handler,
ch: ch,
workers: workers,
wg: sync.WaitGroup{},
parade: parade,
}
a.start()
return a
}

func (a *workerPool) Close() {
close(a.ch)
a.wg.Wait()
}

func (a *workerPool) start() {
a.wg.Add(a.workers)
for i := 0; i < a.workers; i++ {
go func() {
workerID := uuid.New()
defer a.wg.Done()
for task := range a.ch {
res := a.handler.Handle(task.Action, task.Body)
err := a.parade.ReturnTask(task.ID, task.Token, res.Status, res.StatusCode)
if err != nil {
logging.Default().WithFields(logging.Fields{
"action": task.Action,
"task workerID": task.ID,
"status": res.Status,
"status code:": res.StatusCode,
"worker workerID": workerID,
}).Errorf("failed to return task: %w", err)
}
}
}()
}
}

0 comments on commit 7d7d611

Please sign in to comment.