/
tcpping.go
164 lines (141 loc) · 4.86 KB
/
tcpping.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
// Package tcpping implements the experimental tcpping experiment.
//
// See https://github.com/ooni/spec/blob/master/nettests/ts-032-tcpping.md.
package tcpping
import (
"context"
"errors"
"fmt"
"net/url"
"time"
"github.com/ooni/probe-engine/pkg/logx"
"github.com/ooni/probe-engine/pkg/measurexlite"
"github.com/ooni/probe-engine/pkg/model"
)
const (
// testName is the experiment name returned by ExperimentName.
testName = "tcpping"
// testVersion is the experiment version returned by ExperimentVersion.
testVersion = "0.2.0"
)
// Config holds the user-tunable settings of the experiment.
type Config struct {
	// Delay is the time to wait between consecutive pings, expressed in
	// milliseconds. A value that is not positive selects the default.
	Delay int64 `ooni:"number of milliseconds to wait before sending each ping"`

	// Repetitions is how many pings to perform. A value that is not
	// positive selects the default.
	Repetitions int64 `ooni:"number of times to repeat the measurement"`
}

// delay converts Delay to a time.Duration and falls back to one second
// when Delay is zero or negative.
func (c *Config) delay() time.Duration {
	if c.Delay <= 0 {
		return time.Second
	}
	return time.Millisecond * time.Duration(c.Delay)
}

// repetitions returns Repetitions and falls back to 10 when Repetitions
// is zero or negative.
func (c *Config) repetitions() int64 {
	if n := c.Repetitions; n > 0 {
		return n
	}
	return 10
}
// TestKeys contains the experiment results.
type TestKeys struct {
// Pings collects the results of the individual pings. Run appends each
// result as it is received, and pings run in separate goroutines, so the
// order may differ from the order in which the pings were started.
Pings []*SinglePing `json:"pings"`
}
// SinglePing contains the results of a single ping.
type SinglePing struct {
// TCPConnect is the TCP connect result taken from the trace used for
// this ping. NOTE(review): presumably nil when the trace recorded no
// connect attempt (see FirstTCPConnectOrNil) — confirm.
TCPConnect *model.ArchivalTCPConnectResult `json:"tcp_connect"`
}
// Measurer performs the measurement.
type Measurer struct {
// config holds the experiment settings passed to NewExperimentMeasurer.
config Config
}
// ExperimentName implements ExperimentMeasurer.ExperimentName.
//
// It returns the testName constant.
func (m *Measurer) ExperimentName() string {
return testName
}
// ExperimentVersion implements ExperimentMeasurer.ExperimentVersion.
//
// It returns the testVersion constant.
func (m *Measurer) ExperimentVersion() string {
return testVersion
}
// Sentinel errors returned by Run when the measurement input is rejected.
// Callers should match them with errors.Is rather than comparing strings.
var (
	// errNoInputProvided indicates that no input was provided.
	errNoInputProvided = errors.New("no input provided")

	// errInputIsNotAnURL indicates that the input could not be parsed as a URL.
	errInputIsNotAnURL = errors.New("input is not an URL")

	// errInvalidScheme indicates that the URL scheme is not "tcpconnect".
	errInvalidScheme = errors.New("scheme must be tcpconnect")

	// errMissingPort indicates that the URL does not include a port.
	errMissingPort = errors.New("the URL must include a port")
)
// Run implements ExperimentMeasurer.Run.
//
// The measurement input must be a URL with the "tcpconnect" scheme and an
// explicit port. An invalid input makes Run return one of the sentinel
// errors before any test keys are attached to the measurement. Otherwise
// Run starts the ping loop in the background, collects one result per
// configured repetition into the test keys, and returns nil.
func (m *Measurer) Run(ctx context.Context, args *model.ExperimentArgs) error {
	_ = args.Callbacks // explicitly ignored by this experiment
	meas, session := args.Measurement, args.Session

	// Validate the input before doing any network activity.
	if meas.Input == "" {
		return errNoInputProvided
	}
	target, err := url.Parse(string(meas.Input))
	switch {
	case err != nil:
		return fmt.Errorf("%w: %s", errInputIsNotAnURL, err.Error())
	case target.Scheme != "tcpconnect":
		return errInvalidScheme
	case target.Port() == "":
		return errMissingPort
	}

	testKeys := &TestKeys{}
	meas.TestKeys = testKeys

	// The loop goroutine emits one result per repetition on the channel,
	// so we read until we have collected that many.
	results := make(chan *SinglePing)
	go m.tcpPingLoop(ctx, meas.MeasurementStartTimeSaved, session.Logger(), target.Host, results)
	for want := int(m.config.repetitions()); len(testKeys.Pings) < want; {
		testKeys.Pings = append(testKeys.Pings, <-results)
	}

	// Always return nil here so that the measurement is submitted.
	return nil
}
// tcpPingLoop starts one ping goroutine per configured repetition and
// lets each of them emit its result onto the out channel.
//
// After starting each ping (including the last one) the loop waits for
// the next tick of a ticker whose period is the configured delay.
func (m *Measurer) tcpPingLoop(ctx context.Context, zeroTime time.Time,
	logger model.Logger, address string, out chan<- *SinglePing) {
	pace := time.NewTicker(m.config.delay())
	defer pace.Stop()

	var idx int64
	for idx < m.config.repetitions() {
		go m.tcpPingAsync(ctx, idx, zeroTime, logger, address, out)
		<-pace.C
		idx++
	}
}
// tcpPingAsync runs a single TCP ping via tcpConnect and then sends the
// result onto the out channel.
func (m *Measurer) tcpPingAsync(ctx context.Context, index int64,
	zeroTime time.Time, logger model.Logger, address string, out chan<- *SinglePing) {
	result := m.tcpConnect(ctx, index, zeroTime, logger, address)
	out <- result
}
// tcpConnect dials address over TCP with a three-second timeout and
// returns a SinglePing carrying the first TCP connect result recorded by
// the trace created for this ping.
func (m *Measurer) tcpConnect(ctx context.Context, index int64,
	zeroTime time.Time, logger model.Logger, address string) *SinglePing {
	// TODO(bassosimone): make the timeout user-configurable
	const connectTimeout = 3 * time.Second
	dialCtx, cancel := context.WithTimeout(ctx, connectTimeout)
	defer cancel()

	tr := measurexlite.NewTrace(index, zeroTime)
	tcpDialer := tr.NewDialerWithoutResolver(logger)
	opLog := logx.NewOperationLogger(logger, "TCPPing #%d %s", index, address)

	conn, err := tcpDialer.DialContext(dialCtx, "tcp", address)
	opLog.Stop(err)
	measurexlite.MaybeClose(conn)

	// Record the first connect saved in the trace buffer.
	return &SinglePing{TCPConnect: tr.FirstTCPConnectOrNil()}
}
// NewExperimentMeasurer returns a new Measurer, as a
// model.ExperimentMeasurer, that uses the given config.
func NewExperimentMeasurer(config Config) model.ExperimentMeasurer {
	measurer := new(Measurer)
	measurer.config = config
	return measurer
}
// SummaryKeys contains summary keys for this experiment.
//
// Note that this structure is part of the ABI contract with ooniprobe
// therefore we should be careful when changing it.
type SummaryKeys struct {
// IsAnomaly is excluded from the JSON encoding (its tag is "-").
IsAnomaly bool `json:"-"`
}
// GetSummaryKeys implements model.ExperimentMeasurer.GetSummaryKeys.
//
// The measurement argument is not inspected: the returned SummaryKeys
// always has IsAnomaly set to false and the error is always nil.
func (m Measurer) GetSummaryKeys(measurement *model.Measurement) (interface{}, error) {
	var summary SummaryKeys // zero value, so IsAnomaly is false
	return summary, nil
}