-
Notifications
You must be signed in to change notification settings - Fork 63
/
provider.go
141 lines (129 loc) · 3.31 KB
/
provider.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
package uripost
import (
"bufio"
"bytes"
"context"
"io"
"net/http"
"strconv"
"github.com/pkg/errors"
"github.com/spf13/afero"
"github.com/yandex/pandora/components/phttp/ammo/simple"
"github.com/yandex/pandora/lib/confutil"
"go.uber.org/zap"
)
// filePosition reports the current offset of file, obtained with a relative
// zero-byte Seek. It is used only to annotate error messages, so the Seek
// error is deliberately discarded and whatever offset Seek returned is used.
//
// Note that start reads the file through a bufio.Reader, so this is the
// offset of the underlying file, not of the last byte that was parsed.
func filePosition(file afero.File) (position int64) {
	offset, _ := file.Seek(0, io.SeekCurrent)
	return offset
}
// Config holds the settings of the uripost ammo provider.
type Config struct {
	// File is the ammo file name; NewProvider passes it to simple.NewProvider.
	File string `validate:"required"`
	// Limit limits total num of ammo. Unlimited if zero.
	Limit int `validate:"min=0"`
	// Additional HTTP headers. They are decoded with
	// simple.DecodeHTTPConfigHeaders and applied to every generated request.
	Headers []string
	// Passes limits ammo file passes. Unlimited if zero.
	Passes int `validate:"min=0"`
	// ChosenCases is checked against each ammo tag with confutil.IsChosenCase;
	// ammo for which that check fails is not sent.
	ChosenCases []string
}
// NewProvider creates a uripost ammo Provider that reads ammo from conf.File
// on fs.
//
// The provider's logger is initialised with the global zap logger so that the
// log field is never nil when the provider logs through it.
func NewProvider(fs afero.Fs, conf Config) *Provider {
	// The Provider value has to exist before the embedded simple.Provider is
	// built: the method value p.start handed to simple.NewProvider is bound
	// to this very pointer.
	p := &Provider{
		Config: conf,
		log:    zap.L(),
	}
	p.Provider = simple.NewProvider(fs, conf.File, p.start)
	return p
}
// Provider is the uripost ammo provider. It combines the generic
// simple.Provider (built in NewProvider with the start method as its
// callback) with the Config it was created from.
type Provider struct {
	simple.Provider
	Config
	// log is the logger used for provider diagnostics.
	log *zap.Logger
}
// start reads uripost ammo from ammoFile and sends the resulting POST
// requests to p.Sink. It keeps re-reading the file from the beginning until
// Passes passes are done, Limit ammo have been sent, or ctx is done.
//
// The file is processed line by line (surrounding whitespace is ignored and
// blank lines are skipped):
//   - a line starting with '[' is handed to decodeHeader; a successfully
//     decoded header is remembered and applied to every request built later;
//   - a line starting with a digit is handed to decodeURI, which yields the
//     body size, the URI and the tag; the following body-size bytes are read
//     as the request body.
//
// Ammo whose tag is rejected by confutil.IsChosenCase is skipped together
// with its body. Headers from the config are added to every request.
//
// It returns nil when the passes or the limit are exhausted, ctx.Err() when
// ctx is done while sending, and an error when the config headers cannot be
// decoded, when reading or rewinding the ammo file fails, when a request
// cannot be built, or when a pass ends without a single ammo having been sent.
func (p *Provider) start(ctx context.Context, ammoFile afero.File) error {
	var (
		passNum  int
		ammoNum  int
		bodySize int
		uri      string
		tag      string
	)
	// header accumulates the '[...]' header lines seen so far; it is shared by
	// all requests built after the line was read, across passes as well.
	header := make(http.Header)
	// parse and prepare Headers from config
	decodedConfigHeaders, err := simple.DecodeHTTPConfigHeaders(p.Config.Headers)
	if err != nil {
		return err
	}
	for {
		passNum++
		reader := bufio.NewReader(ammoFile)
		for p.Limit == 0 || ammoNum < p.Limit {
			data, isPrefix, err := reader.ReadLine()
			if isPrefix {
				return errors.Errorf("too long header in ammo at position %v", filePosition(ammoFile))
			}
			if err == io.EOF {
				break // start over from the beginning
			}
			if err != nil {
				return errors.Wrapf(err, "reading ammo failed at position: %v", filePosition(ammoFile))
			}
			// Trim before the emptiness check: a whitespace-only line would
			// otherwise become empty here and make data[0] below panic.
			data = bytes.TrimSpace(data)
			if len(data) == 0 {
				continue // skip empty lines
			}
			if data[0] == '[' {
				// Header lines that fail to decode are skipped (best effort).
				if key, val, err := decodeHeader(data); err == nil {
					header.Set(key, val)
				}
				continue
			}
			// A line whose first character is a digit is a URI line.
			isURILine := false
			if _, err := strconv.Atoi(string(data[0])); err == nil {
				// NOTE(review): the decodeURI error is ignored, as before;
				// presumably a failed decode yields a zero body size and ends
				// the pass below — confirm against decodeURI.
				bodySize, uri, tag, _ = decodeURI(data)
				isURILine = true
			}
			if bodySize == 0 {
				break // start over from the beginning of file if ammo size is 0
			}
			if !confutil.IsChosenCase(tag, p.Config.ChosenCases) {
				// Consume the body of the skipped ammo; otherwise its bytes
				// would be parsed as header/URI lines on the next iterations.
				if isURILine {
					if n, err := reader.Discard(bodySize); err != nil {
						return errors.Wrapf(err, "failed to read ammo at position: %v; tried to read: %v; have read: %v", filePosition(ammoFile), bodySize, n)
					}
				}
				continue
			}
			buff := make([]byte, bodySize)
			if n, err := io.ReadFull(reader, buff); err != nil {
				return errors.Wrapf(err, "failed to read ammo at position: %v; tried to read: %v; have read: %v", filePosition(ammoFile), bodySize, n)
			}
			req, err := http.NewRequest("POST", uri, bytes.NewReader(buff))
			if err != nil {
				return errors.Wrapf(err, "failed to decode ammo at position: %v; data: %q", filePosition(ammoFile), buff)
			}
			for k, v := range header {
				// http.Request.Write sends Host header based on req.URL.Host
				if k == "Host" {
					req.Host = v[0]
				} else {
					req.Header[k] = v
				}
			}
			// add new Headers to request from config
			simple.EnrichRequestWithHeaders(req, decodedConfigHeaders)
			sh := p.Pool.Get().(*simple.Ammo)
			sh.Reset(req, tag)
			select {
			case p.Sink <- sh:
				ammoNum++
			case <-ctx.Done():
				return ctx.Err()
			}
		}
		if ammoNum == 0 {
			return errors.New("no ammo in file")
		}
		if p.Passes != 0 && passNum >= p.Passes {
			break
		}
		// Without this check a reached Limit combined with unlimited Passes
		// would spin forever: the inner loop no longer runs, yet nothing
		// terminates the outer one.
		if p.Limit != 0 && ammoNum >= p.Limit {
			break
		}
		// A file that cannot be rewound cannot be read again, so give up
		// instead of looping over a reader that is stuck at the end of file.
		if _, err := ammoFile.Seek(0, io.SeekStart); err != nil {
			return errors.Wrap(err, "failed to seek ammo file")
		}
	}
	return nil
}