Skip to content

Commit

Permalink
Merge f1b8ba6 into 5cbc66e
Browse files Browse the repository at this point in the history
  • Loading branch information
nitishm committed Feb 24, 2019
2 parents 5cbc66e + f1b8ba6 commit 99ac426
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 18 deletions.
6 changes: 1 addition & 5 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,6 @@ DATE=$(shell date +'%FT%TZ%z')
SERVER_DIR = cmd/server
GO = GO111MODULE=on go




all: fmt lint build test

build: deps fmt
Expand All @@ -31,7 +28,7 @@ install:
$(shell ./scripts/make-install.sh)

test:
${GO} test -v -race -coverprofile=coverage.txt -covermode=atomic ./...
${GO} test -v -race -coverprofile=coverage.txt -p 1 -covermode=atomic ./...
${GO} test -covermode=count -coverprofile=profile.cov ./...

fmt:
Expand All @@ -50,7 +47,6 @@ ineffassign:
run: build
$(shell bin/vegeta-server --scheme=http --host=localhost --port=8000)


container:
docker build -t vegeta-server:latest .

Expand Down
5 changes: 5 additions & 0 deletions internal/dispatcher/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,11 @@ func (d *dispatcher) Run(quit chan struct{}) {
}
d.log(fields).Debug("received update for attack")
case <-quit:
d.mu.RLock()
for _, task := range d.tasks {
task.Cancel()
}
d.mu.RUnlock()
d.log(nil).Warning("gracefully shutting down the dispatcher")
return
}
Expand Down
2 changes: 0 additions & 2 deletions internal/dispatcher/dispatcher_test.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
// +build !race

package dispatcher

import (
Expand Down
57 changes: 46 additions & 11 deletions internal/dispatcher/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package dispatcher
import (
"bytes"
"fmt"
"sync"
"time"

uuid "github.com/satori/go.uuid"
Expand Down Expand Up @@ -61,6 +62,7 @@ type UpdateMessage struct {
}

type task struct {
mu sync.RWMutex
id string
params models.AttackParams
status models.AttackStatus
Expand All @@ -77,6 +79,7 @@ type task struct {
func NewTask(updateCh chan UpdateMessage, params models.AttackParams) *task { //nolint: golint
id := uuid.NewV4().String()
t := &task{
sync.RWMutex{},
id,
params,
models.AttackResponseStatusScheduled,
Expand All @@ -96,15 +99,20 @@ func NewTask(updateCh chan UpdateMessage, params models.AttackParams) *task { //

// Run an attack task using the passed in attack function
func (t *task) Run(fn AttackFunc) error {
if t.status != models.AttackResponseStatusScheduled {
return fmt.Errorf("cannot run task %s with status %s", t.id, t.status)
status := t.Status()
id := t.ID()

if status != models.AttackResponseStatusScheduled {
return fmt.Errorf("cannot run task %s with status %s", id, status)
}

t.log(nil).Debug("running")

go run(t, fn) //nolint: errcheck

t.mu.Lock()
t.status = models.AttackResponseStatusRunning
t.mu.Unlock()

t.SendUpdate()

Expand All @@ -113,17 +121,22 @@ func (t *task) Run(fn AttackFunc) error {

// Complete marks a task as completed
func (t *task) Complete(result io.Reader) error {
if t.status != models.AttackResponseStatusRunning {
return fmt.Errorf("cannot mark completed for task %s with status %s", t.id, t.status)
status := t.Status()
id := t.ID()

if status != models.AttackResponseStatusRunning {
return fmt.Errorf("cannot mark completed for task %s with status %s", id, status)
}

buf, err := ioutil.ReadAll(result)
if err != nil {
return err
}

t.mu.Lock()
t.status = models.AttackResponseStatusCompleted
t.result = bytes.NewBuffer(buf)
t.mu.Unlock()

t.SendUpdate()

Expand All @@ -134,13 +147,17 @@ func (t *task) Complete(result io.Reader) error {

// Cancel invokes the context cancel and marks a task as canceled
func (t *task) Cancel() error {
if t.status == models.AttackResponseStatusCompleted || t.status == models.AttackResponseStatusFailed || t.status == models.AttackResponseStatusCanceled { // nolint: lll
return fmt.Errorf("cannot cancel task %s with status %s", t.id, t.status)
status := t.Status()
id := t.ID()

if status == models.AttackResponseStatusCompleted || status == models.AttackResponseStatusFailed || status == models.AttackResponseStatusCanceled { // nolint: lll
return fmt.Errorf("cannot cancel task %s with status %s", id, status)
}

t.mu.Lock()
t.quit <- struct{}{}

t.status = models.AttackResponseStatusCanceled
t.mu.Unlock()

t.SendUpdate()

Expand All @@ -151,7 +168,9 @@ func (t *task) Cancel() error {

// Fail marks a task as failed
func (t *task) Fail() error {
t.mu.Lock()
t.status = models.AttackResponseStatusFailed
t.mu.Unlock()

t.SendUpdate()

Expand All @@ -161,40 +180,56 @@ func (t *task) Fail() error {

// SendUpdate to send a status update on the update channel
func (t *task) SendUpdate() {
t.mu.Lock()
t.updatedAt = time.Now()
t.mu.Unlock()

t.updateCh <- UpdateMessage{
t.id,
t.status,
t.ID(),
t.Status(),
}
}

// ID returns the task identifier
func (t *task) ID() string {
t.mu.RLock()
defer t.mu.RUnlock()
return t.id

}

// Status returns the latest task status
func (t *task) Status() models.AttackStatus {
t.mu.RLock()
defer t.mu.RUnlock()
return t.status
}

// Params returns a the configured attack params
func (t *task) Params() models.AttackParams {
t.mu.RLock()
defer t.mu.RUnlock()
return t.params
}

// CreatedAt returns the created at timestamp
func (t *task) CreatedAt() time.Time {
t.mu.RLock()
defer t.mu.RUnlock()
return t.createdAt
}

// UpdatedAt returns the created at timestamp
func (t *task) UpdatedAt() time.Time {
t.mu.RLock()
defer t.mu.RUnlock()
return t.updatedAt
}

// Result returns the result as a io.Reader
func (t *task) Result() io.Reader {
t.mu.RLock()
defer t.mu.RUnlock()
return t.result
}

Expand All @@ -221,8 +256,8 @@ func (t *task) log(fields map[string]interface{}) *log.Entry {
l := log.WithField("component", "task")

l = l.WithFields(log.Fields{
"ID": t.id,
"Status": t.status,
"ID": t.ID(),
"Status": t.Status(),
})

if fields != nil {
Expand Down

0 comments on commit 99ac426

Please sign in to comment.