Skip to content

Commit 7c28db6

Browse files
committed
robusthttp: use flakyhttp package for fault injection
e.g.: mkdir /tmp/flakyhttp robustirc-localnet -port=13001 -- -flakyhttp_rules_path=/tmp/flakyhttp/rules # monitor the log files using tail -f stderr.txt cd /tmp/flakyhttp echo 'peeraddr=localhost:13001 dest=localhost:13002 stage=request rate=100%' > rules
1 parent aab1120 commit 7c28db6

File tree

5 files changed

+453
-1
lines changed

5 files changed

+453
-1
lines changed

flakyhttp/flakyhttp.go

Lines changed: 234 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,234 @@
1+
package flakyhttp
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"io/ioutil"
7+
"log"
8+
"math/rand"
9+
"net"
10+
"net/http"
11+
"os"
12+
"path/filepath"
13+
"sort"
14+
"strconv"
15+
"strings"
16+
"sync"
17+
"time"
18+
19+
"github.com/rjeczalik/notify"
20+
)
21+
22+
const debug = false
23+
24+
type ConfigChangeAwaiter struct {
25+
rt *RoundTripper
26+
oldRevision uint64
27+
}
28+
29+
func (a *ConfigChangeAwaiter) Await(ctx context.Context) error {
30+
// TODO(later): obey context (trigger spurious wake-up when it expires?)
31+
a.rt.configMu.Lock()
32+
for a.rt.configRevision == a.oldRevision {
33+
a.rt.configCond.Wait()
34+
}
35+
a.rt.configMu.Unlock()
36+
return nil
37+
}
38+
39+
type pair struct{ key, value string }
40+
41+
type configRule struct{ pairs []pair }
42+
43+
type RoundTripper struct {
44+
rulesPath string
45+
fixedPairs []pair
46+
47+
Underlying *http.Transport
48+
49+
configMu sync.Mutex
50+
configCond *sync.Cond
51+
configRevision uint64
52+
rules []configRule
53+
54+
notifyChan chan notify.EventInfo
55+
}
56+
57+
// best practice: failureMapPath should live in its own directory without any siblings (so that inotify on the directory does not get spurious events from other changes)
58+
//
59+
// The specified pairs allow for application-specific attributes. Each pair is
60+
// specified in format <key>=<value>. The key can contain anything but an equals
61+
// sign.
62+
func NewRoundTripper(rulesPath string, pairs ...string) (*RoundTripper, error) {
63+
tr := http.DefaultTransport.(*http.Transport).Clone()
64+
// Same values as http.DefaultTransport uses:
65+
dialer := &net.Dialer{
66+
Timeout: 30 * time.Second,
67+
KeepAlive: 30 * time.Second,
68+
DualStack: true,
69+
}
70+
var fixedPairs []pair
71+
for _, p := range pairs {
72+
idx := strings.IndexByte(p, '=')
73+
if idx == -1 {
74+
return nil, fmt.Errorf("malformed pair: expected format key=value")
75+
}
76+
fixedPairs = append(fixedPairs, pair{key: p[:idx], value: p[idx+1:]})
77+
}
78+
rt := &RoundTripper{
79+
fixedPairs: fixedPairs,
80+
rulesPath: rulesPath,
81+
Underlying: tr,
82+
notifyChan: make(chan notify.EventInfo, 1),
83+
}
84+
rt.configCond = sync.NewCond(&rt.configMu)
85+
tr.DialContext = func(ctx context.Context, network, addr string) (net.Conn, error) {
86+
if debug {
87+
log.Printf("dial(%v, %v)", network, addr)
88+
}
89+
if rt.failDial(network, addr) {
90+
return nil, fmt.Errorf("dial failed by flakyhttp")
91+
}
92+
return dialer.DialContext(ctx, network, addr)
93+
}
94+
events := []notify.Event{
95+
notify.InCloseWrite, // ioutil.WriteFile (write in-place)
96+
notify.InMovedTo, // renameio.WriteFile (write with temporary file)
97+
}
98+
if err := notify.Watch(filepath.Dir(rulesPath), rt.notifyChan, events...); err != nil {
99+
return nil, err
100+
}
101+
go rt.handleConfigUpdates()
102+
if err := rt.loadRules(); err != nil {
103+
if !os.IsNotExist(err) {
104+
return nil, err
105+
}
106+
}
107+
return rt, nil
108+
}
109+
110+
func (rt *RoundTripper) loadRules() error {
111+
b, err := ioutil.ReadFile(rt.rulesPath)
112+
if err != nil {
113+
return err
114+
}
115+
var rules []configRule
116+
Rule:
117+
for _, line := range strings.Split(strings.TrimSpace(string(b)), "\n") {
118+
parts := strings.Split(line, " ")
119+
byKey := make(map[string]string, len(parts))
120+
for _, p := range parts {
121+
idx := strings.IndexByte(p, '=')
122+
if idx == -1 {
123+
return fmt.Errorf("malformed pair: expected format key=value")
124+
}
125+
key, value := p[:idx], p[idx+1:]
126+
// TODO(later): validate key is known
127+
if key == "rate" {
128+
_, err := strconv.ParseInt(strings.TrimSuffix(value, "%"), 0, 64)
129+
if err != nil {
130+
return err
131+
}
132+
}
133+
byKey[key] = value
134+
}
135+
// Skip rule if rt.fixedPairs do not match. This comes in handy for
136+
// application-specific tags such as RobustIRC’s peeraddr=<peeraddr>.
137+
for _, p := range rt.fixedPairs {
138+
if value, ok := byKey[p.key]; ok && value != p.value {
139+
continue Rule
140+
}
141+
delete(byKey, p.key)
142+
}
143+
pairs := make([]pair, 0, len(byKey))
144+
for key, value := range byKey {
145+
pairs = append(pairs, pair{key, value})
146+
}
147+
sort.Slice(pairs, func(i, j int) bool { return pairs[i].key < pairs[j].key })
148+
rules = append(rules, configRule{pairs: pairs})
149+
}
150+
rt.configMu.Lock()
151+
defer rt.configMu.Unlock()
152+
rt.configRevision++
153+
rt.rules = rules
154+
rt.configCond.Broadcast()
155+
return nil
156+
}
157+
158+
func (rt *RoundTripper) handleConfigUpdates() {
159+
for ev := range rt.notifyChan {
160+
if ev.Path() != rt.rulesPath {
161+
continue
162+
}
163+
if err := rt.loadRules(); err != nil {
164+
log.Printf("loading failureMap failed: %v", err)
165+
}
166+
}
167+
}
168+
169+
func (rt *RoundTripper) Close() error {
170+
notify.Stop(rt.notifyChan)
171+
return nil
172+
}
173+
174+
func (rt *RoundTripper) ConfigChangeAwaiter() *ConfigChangeAwaiter {
175+
return &ConfigChangeAwaiter{rt, rt.configRevision}
176+
}
177+
178+
func (rt *RoundTripper) failCommon(stage, dest string) bool {
179+
rt.configMu.Lock()
180+
defer rt.configMu.Unlock()
181+
Rule:
182+
for _, rule := range rt.rules {
183+
if debug {
184+
log.Printf("checking if rule %v matches stage=%v, dest=%v", rule, stage, dest)
185+
}
186+
for _, pair := range rule.pairs {
187+
switch pair.key {
188+
case "dest":
189+
if pair.value != dest {
190+
continue Rule
191+
}
192+
case "rate":
193+
// validated in loadRules; cannot fail
194+
rate, _ := strconv.ParseInt(strings.TrimSuffix(pair.value, "%"), 0, 64)
195+
rnd := rand.Float32() * 100
196+
if debug {
197+
log.Printf("rate=%v, rnd=%v", rate, rnd)
198+
}
199+
if rnd < float32(rate) {
200+
continue
201+
}
202+
continue Rule
203+
case "stage":
204+
if pair.value != stage {
205+
continue Rule
206+
}
207+
case "redial":
208+
if stage == "request" {
209+
// irrelevant for stage=dial as stage=request happens first
210+
rt.Underlying.CloseIdleConnections()
211+
}
212+
default:
213+
log.Fatalf("BUG: unexpected pair %s", pair)
214+
}
215+
}
216+
return true
217+
}
218+
return false
219+
}
220+
221+
func (rt *RoundTripper) failDial(network, addr string) bool {
222+
return rt.failCommon("dial", addr)
223+
}
224+
225+
func (rt *RoundTripper) failRequest(req *http.Request) bool {
226+
return rt.failCommon("request", req.URL.Host)
227+
}
228+
229+
func (rt *RoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
230+
if rt.failRequest(req) {
231+
return nil, fmt.Errorf("request failed by flakyhttp")
232+
}
233+
return rt.Underlying.RoundTrip(req)
234+
}

0 commit comments

Comments
 (0)