Skip to content

Commit

Permalink
Add storage layer to handle time-series data (#105)
Browse files Browse the repository at this point in the history
* Add storage layer to handle time-series data

* Fix test
  • Loading branch information
nakabonne committed Jun 1, 2021
1 parent b651fdb commit e3f49a4
Show file tree
Hide file tree
Showing 18 changed files with 487 additions and 319 deletions.
3 changes: 1 addition & 2 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,13 @@ jobs:
test:
strategy:
matrix:
go-version: [1.14.x, 1.15.x]
platform: [ubuntu-latest, macos-latest]
runs-on: ${{ matrix.platform }}
steps:
- name: Install Go
uses: actions/setup-go@v2
with:
go-version: ${{ matrix.go-version }}
go-version: 1.16
- name: Checkout code
uses: actions/checkout@v2
- name: Build
Expand Down
48 changes: 29 additions & 19 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
# ali
[![codecov.io Code Coverage](https://img.shields.io/codecov/c/github/nakabonne/ali.svg)](https://codecov.io/github/nakabonne/ali?branch=master)
[![Release](https://img.shields.io/github/release/nakabonne/ali.svg?color=orange)](https://github.com/nakabonne/ali/releases/latest)
[![Go Doc](https://img.shields.io/badge/godoc-reference-blue.svg)](http://godoc.org/github.com/nakabonne/ali)
[![Go Reference](https://pkg.go.dev/badge/github.com/nakabonne/ali.svg)](https://pkg.go.dev/github.com/nakabonne/ali)

A load testing tool capable of performing real-time analysis, inspired by [vegeta](https://github.com/tsenart/vegeta) and [jplot](https://github.com/rs/jplot).

Expand Down Expand Up @@ -68,23 +67,34 @@ Usage:
ali [flags] <target URL>
Flags:
-b, --body string A request body to be sent.
-B, --body-file string The path to file whose content will be set as the http request body.
-c, --connections int Amount of maximum open idle connections per target host (default 10000)
--debug Run in debug mode.
-d, --duration duration The amount of time to issue requests to the targets. Give 0s for an infinite attack. (default 10s)
-H, --header strings A request header to be sent. Can be used multiple times to send multiple headers.
--local-addr string Local IP address. (default "0.0.0.0")
-M, --max-body int Max bytes to capture from response bodies. Give -1 for no limit. (default -1)
-W, --max-workers uint Amount of maximum workers to spawn. (default 18446744073709551615)
-m, --method string An HTTP request method for each request. (default "GET")
--no-http2 Don't issue HTTP/2 requests to servers which support it.
-K, --no-keepalive Don't use HTTP persistent connection.
-r, --rate int The request rate per second to issue against the targets. Give 0 then it will send requests as fast as possible. (default 50)
--resolvers string Custom DNS resolver addresses; comma-separated list.
-t, --timeout duration The timeout for each request. 0s means to disable timeouts. (default 30s)
-v, --version Print the current version.
-w, --workers uint Amount of initial workers to spawn. (default 10)
-b, --body string A request body to be sent.
-B, --body-file string The path to file whose content will be set as the http request body.
--cacert string PEM ca certificate file
--cert string PEM encoded tls certificate file to use
-c, --connections int Amount of maximum open idle connections per target host (default 10000)
--debug Run in debug mode.
-d, --duration duration The amount of time to issue requests to the targets. Give 0s for an infinite attack. (default 10s)
-H, --header stringArray A request header to be sent. Can be used multiple times to send multiple headers.
--insecure Skip TLS verification
--key string PEM encoded tls private key file to use
--local-addr string Local IP address. (default "0.0.0.0")
-M, --max-body int Max bytes to capture from response bodies. Give -1 for no limit. (default -1)
-W, --max-workers uint Amount of maximum workers to spawn. (default 18446744073709551615)
-m, --method string An HTTP request method for each request. (default "GET")
--no-http2 Don't issue HTTP/2 requests to servers which support it.
-K, --no-keepalive Don't use HTTP persistent connection.
--query-range duration The time range to display data points on the UI (default 1m0s)
-r, --rate int The request rate per second to issue against the targets. Give 0 then it will send requests as fast as possible. (default 50)
--resolvers string Custom DNS resolver addresses; comma-separated list.
-t, --timeout duration The timeout for each request. 0s means to disable timeouts. (default 30s)
-v, --version Print the current version.
-w, --workers uint Amount of initial workers to spawn. (default 10)
Examples:
ali --duration=10m --rate=100 http://host.xz
Author:
Ryo Nakao <ryo@nakao.dev>
```

Valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h".
Expand Down
145 changes: 111 additions & 34 deletions attacker/attacker.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,16 @@ import (
"context"
"crypto/tls"
"crypto/x509"
"fmt"
"log"
"math"
"net"
"net/http"
"time"

vegeta "github.com/tsenart/vegeta/v12/lib"

"github.com/nakabonne/ali/storage"
)

const (
Expand All @@ -25,11 +29,6 @@ const (

var DefaultLocalAddr = net.IPAddr{IP: net.IPv4zero}

type Attacker interface {
Attack(vegeta.Targeter, vegeta.Pacer, time.Duration, string) <-chan *vegeta.Result
Stop()
}

// Options provides optional settings to attack.
type Options struct {
Rate int
Expand All @@ -52,25 +51,30 @@ type Options struct {
CACertificatePool *x509.CertPool
TLSCertificates []tls.Certificate

Attacker Attacker
Attacker backedAttacker
}

// Result contains the results of a single HTTP request.
type Result struct {
Latency time.Duration

P50 time.Duration
P90 time.Duration
P95 time.Duration
P99 time.Duration
type Attacker interface {
// Attack keeps the request running for the specified period of time.
// Results are sent to the given channel as soon as they arrive.
// When the attack is over, it gives back final statistics.
// TODO: Use storage instead of metricsCh
Attack(ctx context.Context, metricsCh chan *Metrics)

// Rate gives back the rate set to itself.
Rate() int
// Rate gives back the duration set to itself.
Duration() time.Duration
// Rate gives back the method set to itself.
Method() string
}

// Attack keeps the request running for the specified period of time.
// Results are sent to the given channel as soon as they arrive.
// When the attack is over, it gives back final statistics.
func Attack(ctx context.Context, target string, resCh chan<- *Result, metricsCh chan *Metrics, opts Options) {
func NewAttacker(storage storage.Writer, target string, opts *Options) (Attacker, error) {
if target == "" {
return
return nil, fmt.Errorf("target is required")
}
if opts == nil {
opts = &Options{}
}
if opts.Method == "" {
opts.Method = DefaultMethod
Expand Down Expand Up @@ -114,38 +118,111 @@ func Attack(ctx context.Context, target string, resCh chan<- *Result, metricsCh
vegeta.TLSConfig(tlsConfig),
)
}
return &attacker{
target: target,
rate: opts.Rate,
duration: opts.Duration,
timeout: opts.Timeout,
method: opts.Method,
body: opts.Body,
maxBody: opts.MaxBody,
header: opts.Header,
workers: opts.Workers,
maxWorkers: opts.MaxWorkers,
keepAlive: opts.KeepAlive,
connections: opts.Connections,
http2: opts.HTTP2,
localAddr: opts.LocalAddr,
buckets: opts.Buckets,
resolvers: opts.Resolvers,
insecureSkipVerify: opts.InsecureSkipVerify,
caCertificatePool: opts.CACertificatePool,
tlsCertificates: opts.TLSCertificates,
attacker: opts.Attacker,
storage: storage,
}, nil
}

type backedAttacker interface {
Attack(vegeta.Targeter, vegeta.Pacer, time.Duration, string) <-chan *vegeta.Result
Stop()
}

rate := vegeta.Rate{Freq: opts.Rate, Per: time.Second}
type attacker struct {
target string
rate int
duration time.Duration
timeout time.Duration
method string
body []byte
maxBody int64
header http.Header
workers uint64
maxWorkers uint64
keepAlive bool
connections int
http2 bool
localAddr net.IPAddr
buckets []time.Duration
resolvers []string
insecureSkipVerify bool
caCertificatePool *x509.CertPool
tlsCertificates []tls.Certificate

attacker backedAttacker
storage storage.Writer
}

func (a *attacker) Attack(ctx context.Context, metricsCh chan *Metrics) {
rate := vegeta.Rate{Freq: a.rate, Per: time.Second}
targeter := vegeta.NewStaticTargeter(vegeta.Target{
Method: opts.Method,
URL: target,
Body: opts.Body,
Header: opts.Header,
Method: a.method,
URL: a.target,
Body: a.body,
Header: a.header,
})

metrics := &vegeta.Metrics{}
if len(opts.Buckets) > 0 {
metrics.Histogram = &vegeta.Histogram{Buckets: opts.Buckets}
if len(a.buckets) > 0 {
metrics.Histogram = &vegeta.Histogram{Buckets: a.buckets}
}

for res := range opts.Attacker.Attack(targeter, rate, opts.Duration, "main") {
for res := range a.attacker.Attack(targeter, rate, a.duration, "main") {
select {
case <-ctx.Done():
opts.Attacker.Stop()
a.attacker.Stop()
return
default:
metrics.Add(res)
m := newMetrics(metrics)
resCh <- &Result{
Latency: res.Latency,
P50: m.Latencies.P50,
P90: m.Latencies.P90,
P95: m.Latencies.P95,
P99: m.Latencies.P99,
err := a.storage.Insert(&storage.Result{
Code: res.Code,
Timestamp: res.Timestamp,
Latency: res.Latency,
P50: m.Latencies.P50,
P90: m.Latencies.P90,
P95: m.Latencies.P95,
P99: m.Latencies.P99,
})
if err != nil {
log.Printf("failed to insert results")
continue
}
metricsCh <- m
}
}
metrics.Close()
metricsCh <- newMetrics(metrics)
}

func (a *attacker) Rate() int {
return a.rate
}

func (a *attacker) Duration() time.Duration {
return a.duration
}

func (a *attacker) Method() string {
return a.method
}
65 changes: 34 additions & 31 deletions attacker/attacker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,47 +5,61 @@ import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
vegeta "github.com/tsenart/vegeta/v12/lib"
"go.uber.org/goleak"

"github.com/nakabonne/ali/storage"
)

func TestMain(m *testing.M) {
goleak.VerifyTestMain(m)
}

func TestNewAttacker(t *testing.T) {
tests := []struct {
name string
target string
opts Options
wantErr bool
}{
{
name: "no target given",
target: "",
wantErr: true,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
_, err := NewAttacker(&storage.FakeStorage{}, tt.target, &tt.opts)
assert.Equal(t, tt.wantErr, err != nil)
})
}
}

func TestAttack(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

tests := []struct {
name string
target string
opts Options
want *Metrics
wantResCount int
name string
target string
opts Options
wantErr bool
}{
{
name: "no target given",
target: "",
want: nil,
},
{
name: "no result given back",
target: "http://host.xz",
opts: Options{
Attacker: &fakeAttacker{},
},
want: &Metrics{
StatusCodes: make(map[string]int),
Errors: []string{},
Attacker: &fakeBackedAttacker{},
},
wantResCount: 0,
},
{
name: "two result given back",
target: "http://host.xz",
opts: Options{
Attacker: &fakeAttacker{
Attacker: &fakeBackedAttacker{
results: []*vegeta.Result{
{
Code: 200,
Expand All @@ -56,26 +70,15 @@ func TestAttack(t *testing.T) {
},
},
},
want: &Metrics{
Requests: 2,
Rate: 2,
Throughput: 2,
Success: 1,
StatusCodes: map[string]int{
"200": 2,
},
Errors: []string{},
},
wantResCount: 2,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
resCh := make(chan *Result, 100)
a, err := NewAttacker(&storage.FakeStorage{}, tt.target, &tt.opts)
require.NoError(t, err)
metricsCh := make(chan *Metrics, 100)
Attack(ctx, tt.target, resCh, metricsCh, tt.opts)
assert.Equal(t, tt.wantResCount, len(resCh))
a.Attack(ctx, metricsCh)
})
}
}
Loading

0 comments on commit e3f49a4

Please sign in to comment.