Skip to content

Commit

Permalink
Wait for results before exiting from signal
Browse files Browse the repository at this point in the history
Previously the attack command would not wait for in-flight requests to
finish before exiting from an interrupt signal. In the case where all
requests take longer than the attack duration then the output file will
be empty and reporting on it will produce an obscure error:

    % echo "GET http://172.18.0.254/will/timeout" | time vegeta attack -rate 1 -duration 0 -timeout 10s -output vegeta.out & sleep 1 && pkill -2 vegeta && fg && vegeta report vegeta.out
    [1] 12347 12348
    [1]  + 12347 done       echo "GET http://172.18.0.254/will/timeout" |
           12348 running    time vegeta attack -rate 1 -duration 0 -timeout 10s -output vegeta.out
    vegeta attack -rate 1 -duration 0 -timeout 10s -output vegeta.out  0.00s user 0.01s system 0% cpu 1.075 total
    2023/01/11 21:35:50 encode: can't detect encoding of "vegeta.out"

By omitting the return on the first call to `Stop()` we can use the
results channel to block the exit until the attack has finished:

    % echo "GET http://172.18.0.254/will/timeout" | time ./vegeta attack -rate 1 -duration 0 -timeout 10s -output vegeta.out & sleep 1 && pkill -2 vegeta && fg && ./vegeta report vegeta.out
    [1] 12433 12434
    [1]  + 12433 done       echo "GET http://172.18.0.254/will/timeout" |
           12434 running    time ./vegeta attack -rate 1 -duration 0 -timeout 10s -output vegeta.out
    ./vegeta attack -rate 1 -duration 0 -timeout 10s -output vegeta.out  0.00s user 0.01s system 0% cpu 11.012 total
    Requests      [total, rate, throughput]         1, 1.00, 0.00
    Duration      [total, attack, wait]             10.001s, 0s, 10.001s
    Latencies     [min, mean, 50, 90, 95, 99, max]  10.001s, 10.001s, 10.001s, 10.001s, 10.001s, 10.001s, 10.001s
    Bytes In      [total, mean]                     0, 0.00
    Bytes Out     [total, mean]                     0, 0.00
    Success       [ratio]                           0.00%
    Status Codes  [code:count]                      0:1
    Error Set:
    Get "http://172.18.0.254/will/timeout": context deadline exceeded (Client.Timeout exceeded while awaiting headers)

A subsequent interrupt signal (ie. `^C^C`) is honoured if you want to
force an immediate exit:

    % echo "GET http://172.18.0.254/will/timeout" | time vegeta attack -rate 1 -duration 0 -timeout 10s -output vegeta.out & sleep 1 && pkill -2 vegeta && pkill -2 vegeta && fg
    [1] 12073 12074
    vegeta attack -rate 1 -duration 0 -timeout 10s -output vegeta.out  0.00s user 0.01s system 1% cpu 1.057 total
    [1]  + 12073 done       echo "GET http://172.18.0.254/will/timeout" |
           12074 done       time vegeta attack -rate 1 -duration 0 -timeout 10s -output vegeta.out

Testing this required a refactor of `attack()` in order to pass our own
signal channel in. The diff is fortunately pretty simple though. Like
most simple changes and async code, the majority of the changeset is
testing it.

Closes #611

Signed-off-by: Tomás Senart <tsenart@gmail.com>
  • Loading branch information
dcarley authored and tsenart committed Jul 15, 2023
1 parent 3e67d01 commit c820272
Show file tree
Hide file tree
Showing 3 changed files with 154 additions and 7 deletions.
17 changes: 14 additions & 3 deletions attack.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,16 +195,27 @@ func attack(opts *attackOpts) (err error) {
sig := make(chan os.Signal, 1)
signal.Notify(sig, os.Interrupt)

return processAttack(atk, res, enc, sig)
}

func processAttack(
atk *vegeta.Attacker,
res <-chan *vegeta.Result,
enc vegeta.Encoder,
sig <-chan os.Signal,
) error {
for {
select {
case <-sig:
atk.Stop()
return nil
if stopSent := atk.Stop(); !stopSent {
// Exit immediately on second signal.
return nil
}
case r, ok := <-res:
if !ok {
return nil
}
if err = enc.Encode(r); err != nil {
if err := enc.Encode(r); err != nil {
return err
}
}
Expand Down
130 changes: 130 additions & 0 deletions attack_test.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,18 @@
package main

import (
"bufio"
"bytes"
"io"
"net/http"
"net/http/httptest"
"os"
"reflect"
"sync"
"testing"
"time"

vegeta "github.com/tsenart/vegeta/v12/lib"
)

func TestHeadersSet(t *testing.T) {
Expand All @@ -26,3 +35,124 @@ func TestHeadersSet(t *testing.T) {
}
}
}

func decodeMetrics(buf bytes.Buffer) (vegeta.Metrics, error) {
var metrics vegeta.Metrics
dec := vegeta.NewDecoder(bufio.NewReader(&buf))

for {
var r vegeta.Result
if err := dec.Decode(&r); err != nil {
if err == io.EOF {
break
}
return metrics, err
}
metrics.Add(&r)
}
metrics.Close()

return metrics, nil
}

func TestAttackSignalOnce(t *testing.T) {
t.Parallel()

const (
signalDelay = 300 * time.Millisecond // Delay before stopping.
clientTimeout = 1 * time.Second // This, plus delay, is the max time for the attack.
serverTimeout = 2 * time.Second // Must be more than clientTimeout.
attackDuration = 10 * time.Second // The attack should never take this long.
)

server := httptest.NewServer(
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
time.Sleep(serverTimeout) // Server.Close() will block for this long on shutdown.
}),
)
defer server.Close()

tr := vegeta.NewStaticTargeter(vegeta.Target{Method: "GET", URL: server.URL})
atk := vegeta.NewAttacker(vegeta.Timeout(clientTimeout))
rate := vegeta.Rate{Freq: 10, Per: time.Second} // Every 100ms.

var buf bytes.Buffer
writer := bufio.NewWriter(&buf)
enc := vegeta.NewEncoder(writer)
sig := make(chan os.Signal, 1)
res := atk.Attack(tr, rate, attackDuration, "")

var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
processAttack(atk, res, enc, sig)
}()

// Allow more than one request to have started before stopping.
time.Sleep(signalDelay)
sig <- os.Interrupt
wg.Wait()
writer.Flush()

metrics, err := decodeMetrics(buf)
if err != nil {
t.Error(err)
}
if got, min := metrics.Requests, uint64(2); got < min {
t.Errorf("not enough requests recorded. got %+v, min: %+v", got, min)
}
if got, want := metrics.Success, 0.0; got != want {
t.Errorf("all requests should fail. got %+v, want: %+v", got, want)
}
if got, max := metrics.Duration, clientTimeout; got > max {
t.Errorf("attack duration too long. got %+v, max: %+v", got, max)
}
if got, want := metrics.Wait.Round(time.Second), clientTimeout; got != want {
t.Errorf("attack wait doesn't match timeout. got %+v, want: %+v", got, want)
}
}

func TestAttackSignalTwice(t *testing.T) {
t.Parallel()

const (
attackDuration = 10 * time.Second // The attack should never take this long.
)

server := httptest.NewServer(
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {}),
)
defer server.Close()

tr := vegeta.NewStaticTargeter(vegeta.Target{Method: "GET", URL: server.URL})
atk := vegeta.NewAttacker()
rate := vegeta.Rate{Freq: 1, Per: time.Second}

var buf bytes.Buffer
writer := bufio.NewWriter(&buf)
enc := vegeta.NewEncoder(writer)
sig := make(chan os.Signal, 1)
res := atk.Attack(tr, rate, attackDuration, "")

var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
processAttack(atk, res, enc, sig)
}()

// Exit as soon as possible.
sig <- os.Interrupt
sig <- os.Interrupt
wg.Wait()
writer.Flush()

metrics, err := decodeMetrics(buf)
if err != nil {
t.Error(err)
}
if got, max := metrics.Duration, time.Second; got > max {
t.Errorf("attack duration too long. got %+v, max: %+v", got, max)
}
}
14 changes: 10 additions & 4 deletions lib/attack.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ type Attacker struct {
dialer *net.Dialer
client http.Client
stopch chan struct{}
stopOnce sync.Once
workers uint64
maxWorkers uint64
maxBody int64
Expand Down Expand Up @@ -68,6 +69,7 @@ var (
func NewAttacker(opts ...func(*Attacker)) *Attacker {
a := &Attacker{
stopch: make(chan struct{}),
stopOnce: sync.Once{},
workers: DefaultWorkers,
maxWorkers: DefaultMaxWorkers,
maxBody: DefaultMaxBody,
Expand Down Expand Up @@ -325,13 +327,17 @@ func (a *Attacker) Attack(tr Targeter, p Pacer, du time.Duration, name string) <
return results
}

// Stop stops the current attack.
func (a *Attacker) Stop() {
// Stop stops the current attack. The return value indicates whether this call
// has signalled the attack to stop (`true` for the first call) or whether it
// was a noop because it has been previously signalled to stop (`false` for any
// subsequent calls).
func (a *Attacker) Stop() bool {
select {
case <-a.stopch:
return
return false
default:
close(a.stopch)
a.stopOnce.Do(func() { close(a.stopch) })

This comment has been minimized.

Copy link
@tsenart

tsenart Jul 15, 2023

Owner

@dcarley: Added this sync.Once to prevent two racing go-routines to double close the the channel and panicking.

return true
}
}

Expand Down

0 comments on commit c820272

Please sign in to comment.