From babdfc009d6d3e626cb39594b2c1e0d970fc70b9 Mon Sep 17 00:00:00 2001 From: Simone Basso Date: Thu, 24 Oct 2019 20:34:29 +0200 Subject: [PATCH 1/2] http: start splitting the httptransport --- httpx/httpx.go | 8 +- internal/httptransport/httptransport.go | 138 +++------------ internal/httptransport/httptransport_test.go | 2 + .../httptransport/toptripper/toptripper.go | 162 ++++++++++++++++++ .../toptripper/toptripper_test.go | 42 +++++ internal/internal.go | 10 +- 6 files changed, 235 insertions(+), 127 deletions(-) create mode 100644 internal/httptransport/toptripper/toptripper.go create mode 100644 internal/httptransport/toptripper/toptripper_test.go diff --git a/httpx/httpx.go b/httpx/httpx.go index 521d8bf..1abd926 100644 --- a/httpx/httpx.go +++ b/httpx/httpx.go @@ -25,11 +25,11 @@ func NewTransport(beginning time.Time, handler model.Handler) *Transport { t.dialer = internal.NewDialer(beginning, handler) t.transport = httptransport.NewTransport(beginning, handler) // make sure we use an http2 ready TLS config - t.dialer.TLSConfig = t.transport.TLSClientConfig + t.dialer.TLSConfig = t.transport.Transport.TLSClientConfig // make sure HTTP uses our dialer - t.transport.Dial = t.dialer.Dial - t.transport.DialContext = t.dialer.DialContext - t.transport.DialTLS = t.dialer.DialTLS + t.transport.Transport.Dial = t.dialer.Dial + t.transport.Transport.DialContext = t.dialer.DialContext + t.transport.Transport.DialTLS = t.dialer.DialTLS return t } diff --git a/internal/httptransport/httptransport.go b/internal/httptransport/httptransport.go index 27b3c78..1f5bc34 100644 --- a/internal/httptransport/httptransport.go +++ b/internal/httptransport/httptransport.go @@ -3,13 +3,10 @@ package httptransport import ( - "io" "net/http" - "net/http/httptrace" - "sync" - "sync/atomic" "time" + "github.com/ooni/netx/internal/httptransport/toptripper" "github.com/ooni/netx/model" "golang.org/x/net/http2" ) @@ -19,9 +16,10 @@ var nextTransactionID int64 // Transport performs single HTTP transactions and emits // measurement events as they happen. type Transport struct { - http.Transport - Handler model.Handler - Beginning time.Time + Transport *http.Transport + Handler model.Handler + Beginning time.Time + roundTripper http.RoundTripper } // NewTransport creates a new Transport. @@ -29,7 +27,7 @@ func NewTransport(beginning time.Time, handler model.Handler) *Transport { transport := &Transport{ Beginning: beginning, Handler: handler, - Transport: http.Transport{ + Transport: &http.Transport{ ExpectContinueTimeout: 1 * time.Second, IdleConnTimeout: 90 * time.Second, MaxIdleConns: 100, @@ -37,127 +35,31 @@ func NewTransport(beginning time.Time, handler model.Handler) *Transport { TLSHandshakeTimeout: 10 * time.Second, }, } + transport.roundTripper = toptripper.New( + beginning, handler, transport.Transport, + ) // Configure h2 and make sure that the custom TLSConfig we use for dialing // is actually compatible with upgrading to h2. (This mainly means we // need to make sure we include "h2" in the NextProtos array.) Because // http2.ConfigureTransport only returns error when we have already // configured http2, it is safe to ignore the return value. - http2.ConfigureTransport(&transport.Transport) + http2.ConfigureTransport(transport.Transport) return transport } // RoundTrip executes a single HTTP transaction, returning // a Response for the provided Request. func (t *Transport) RoundTrip(req *http.Request) (resp *http.Response, err error) { - outmethod := req.Method - outurl := req.URL.String() - tid := atomic.AddInt64(&nextTransactionID, 1) - outheaders := http.Header{} - var mutex sync.Mutex - tracer := &httptrace.ClientTrace{ - GotConn: func(info httptrace.GotConnInfo) { - t.Handler.OnMeasurement(model.Measurement{ - HTTPConnectionReady: &model.HTTPConnectionReadyEvent{ - LocalAddress: info.Conn.LocalAddr().String(), - Network: info.Conn.LocalAddr().Network(), - RemoteAddress: info.Conn.RemoteAddr().String(), - Time: time.Now().Sub(t.Beginning), - TransactionID: tid, - }, - }) - }, - WroteHeaderField: func(key string, values []string) { - mutex.Lock() - outheaders[key] = values - mutex.Unlock() - }, - WroteHeaders: func() { - mutex.Lock() - m := model.Measurement{ - HTTPRequestHeadersDone: &model.HTTPRequestHeadersDoneEvent{ - Headers: outheaders, - Method: outmethod, - Time: time.Now().Sub(t.Beginning), - TransactionID: tid, - URL: outurl, - }, - } - mutex.Unlock() - t.Handler.OnMeasurement(m) - }, - WroteRequest: func(info httptrace.WroteRequestInfo) { - t.Handler.OnMeasurement(model.Measurement{ - HTTPRequestDone: &model.HTTPRequestDoneEvent{ - Time: time.Now().Sub(t.Beginning), - TransactionID: tid, - }, - }) - }, - GotFirstResponseByte: func() { - t.Handler.OnMeasurement(model.Measurement{ - HTTPResponseStart: &model.HTTPResponseStartEvent{ - Time: time.Now().Sub(t.Beginning), - TransactionID: tid, - }, - }) - }, - } - req = req.WithContext(httptrace.WithClientTrace(req.Context(), tracer)) - resp, err = t.Transport.RoundTrip(req) - if err != nil { - return - } - t.Handler.OnMeasurement(model.Measurement{ - HTTPResponseHeadersDone: &model.HTTPResponseHeadersDoneEvent{ - Headers: resp.Header, - StatusCode: int64(resp.StatusCode), - Time: time.Now().Sub(t.Beginning), - TransactionID: tid, - }, - }) - // "The http Client and Transport guarantee that Body is always - // non-nil, even on responses without a body or responses with - // a zero-length body." (from the docs) - resp.Body = &bodyWrapper{ - ReadCloser: resp.Body, - t: t, - tid: tid, - } - return + return t.roundTripper.RoundTrip(req) } -type bodyWrapper struct { - io.ReadCloser - t *Transport - tid int64 -} - -func (bw *bodyWrapper) Read(b []byte) (n int, err error) { - start := time.Now() - n, err = bw.ReadCloser.Read(b) - stop := time.Now() - bw.t.Handler.OnMeasurement(model.Measurement{ - HTTPResponseBodyPart: &model.HTTPResponseBodyPartEvent{ - // "Read reads up to len(p) bytes into p. It returns the number of - // bytes read (0 <= n <= len(p)) and any error encountered." - Data: b[:n], - Duration: stop.Sub(start), - Error: err, - NumBytes: int64(n), - Time: stop.Sub(bw.t.Beginning), - TransactionID: bw.tid, - }, - }) - return -} - -func (bw *bodyWrapper) Close() (err error) { - err = bw.ReadCloser.Close() - bw.t.Handler.OnMeasurement(model.Measurement{ - HTTPResponseDone: &model.HTTPResponseDoneEvent{ - Time: time.Now().Sub(bw.t.Beginning), - TransactionID: bw.tid, - }, - }) - return +// CloseIdleConnections closes the idle connections. +func (t *Transport) CloseIdleConnections() { + // Adapted from net/http code + type closeIdler interface { + CloseIdleConnections() + } + if tr, ok := t.roundTripper.(closeIdler); ok { + tr.CloseIdleConnections() + } } diff --git a/internal/httptransport/httptransport_test.go b/internal/httptransport/httptransport_test.go index d0a21bb..7e68f3c 100644 --- a/internal/httptransport/httptransport_test.go +++ b/internal/httptransport/httptransport_test.go @@ -23,6 +23,7 @@ func TestIntegration(t *testing.T) { if err != nil { t.Fatal(err) } + client.CloseIdleConnections() } func TestIntegrationFailure(t *testing.T) { @@ -38,4 +39,5 @@ func TestIntegrationFailure(t *testing.T) { if resp != nil { t.Fatal("expected a nil response here") } + client.CloseIdleConnections() } diff --git a/internal/httptransport/toptripper/toptripper.go b/internal/httptransport/toptripper/toptripper.go new file mode 100644 index 0000000..07c324a --- /dev/null +++ b/internal/httptransport/toptripper/toptripper.go @@ -0,0 +1,162 @@ +// Package toptripper contains the top HTTP round tripper. +package toptripper + +import ( + "io" + "net/http" + "net/http/httptrace" + "sync" + "sync/atomic" + "time" + + "github.com/ooni/netx/model" +) + +var nextTransactionID int64 + +// Transport performs single HTTP transactions and emits +// measurement events as they happen. +type Transport struct { + beginning time.Time + handler model.Handler + roundTripper http.RoundTripper +} + +// New creates a new Transport. +func New( + beginning time.Time, handler model.Handler, + roundTripper http.RoundTripper, +) *Transport { + return &Transport{ + beginning: beginning, + handler: handler, + roundTripper: roundTripper, + } +} + +// RoundTrip executes a single HTTP transaction, returning +// a Response for the provided Request. +func (t *Transport) RoundTrip(req *http.Request) (resp *http.Response, err error) { + outmethod := req.Method + outurl := req.URL.String() + tid := atomic.AddInt64(&nextTransactionID, 1) + outheaders := http.Header{} + var mutex sync.Mutex + tracer := &httptrace.ClientTrace{ + GotConn: func(info httptrace.GotConnInfo) { + t.handler.OnMeasurement(model.Measurement{ + HTTPConnectionReady: &model.HTTPConnectionReadyEvent{ + LocalAddress: info.Conn.LocalAddr().String(), + Network: info.Conn.LocalAddr().Network(), + RemoteAddress: info.Conn.RemoteAddr().String(), + Time: time.Now().Sub(t.beginning), + TransactionID: tid, + }, + }) + }, + WroteHeaderField: func(key string, values []string) { + mutex.Lock() + outheaders[key] = values + mutex.Unlock() + }, + WroteHeaders: func() { + mutex.Lock() + m := model.Measurement{ + HTTPRequestHeadersDone: &model.HTTPRequestHeadersDoneEvent{ + Headers: outheaders, + Method: outmethod, + Time: time.Now().Sub(t.beginning), + TransactionID: tid, + URL: outurl, + }, + } + mutex.Unlock() + t.handler.OnMeasurement(m) + }, + WroteRequest: func(info httptrace.WroteRequestInfo) { + t.handler.OnMeasurement(model.Measurement{ + HTTPRequestDone: &model.HTTPRequestDoneEvent{ + Time: time.Now().Sub(t.beginning), + TransactionID: tid, + }, + }) + }, + GotFirstResponseByte: func() { + t.handler.OnMeasurement(model.Measurement{ + HTTPResponseStart: &model.HTTPResponseStartEvent{ + Time: time.Now().Sub(t.beginning), + TransactionID: tid, + }, + }) + }, + } + req = req.WithContext(httptrace.WithClientTrace(req.Context(), tracer)) + resp, err = t.roundTripper.RoundTrip(req) + if err != nil { + return + } + t.handler.OnMeasurement(model.Measurement{ + HTTPResponseHeadersDone: &model.HTTPResponseHeadersDoneEvent{ + Headers: resp.Header, + StatusCode: int64(resp.StatusCode), + Time: time.Now().Sub(t.beginning), + TransactionID: tid, + }, + }) + // "The http Client and Transport guarantee that Body is always + // non-nil, even on responses without a body or responses with + // a zero-length body." (from the docs) + resp.Body = &bodyWrapper{ + ReadCloser: resp.Body, + t: t, + tid: tid, + } + return +} + +// CloseIdleConnections closes the idle connections. +func (t *Transport) CloseIdleConnections() { + // Adapted from net/http code + type closeIdler interface { + CloseIdleConnections() + } + if tr, ok := t.roundTripper.(closeIdler); ok { + tr.CloseIdleConnections() + } +} + +type bodyWrapper struct { + io.ReadCloser + t *Transport + tid int64 +} + +func (bw *bodyWrapper) Read(b []byte) (n int, err error) { + start := time.Now() + n, err = bw.ReadCloser.Read(b) + stop := time.Now() + bw.t.handler.OnMeasurement(model.Measurement{ + HTTPResponseBodyPart: &model.HTTPResponseBodyPartEvent{ + // "Read reads up to len(p) bytes into p. It returns the number of + // bytes read (0 <= n <= len(p)) and any error encountered." + Data: b[:n], + Duration: stop.Sub(start), + Error: err, + NumBytes: int64(n), + Time: stop.Sub(bw.t.beginning), + TransactionID: bw.tid, + }, + }) + return +} + +func (bw *bodyWrapper) Close() (err error) { + err = bw.ReadCloser.Close() + bw.t.handler.OnMeasurement(model.Measurement{ + HTTPResponseDone: &model.HTTPResponseDoneEvent{ + Time: time.Now().Sub(bw.t.beginning), + TransactionID: bw.tid, + }, + }) + return +} diff --git a/internal/httptransport/toptripper/toptripper_test.go b/internal/httptransport/toptripper/toptripper_test.go new file mode 100644 index 0000000..5c54695 --- /dev/null +++ b/internal/httptransport/toptripper/toptripper_test.go @@ -0,0 +1,42 @@ +package toptripper + +import ( + "io/ioutil" + "net/http" + "testing" + "time" + + "github.com/ooni/netx/handlers" +) + +func TestIntegration(t *testing.T) { + client := &http.Client{ + Transport: New(time.Now(), handlers.NoHandler, http.DefaultTransport), + } + resp, err := client.Get("https://www.google.com") + if err != nil { + t.Fatal(err) + } + defer resp.Body.Close() + _, err = ioutil.ReadAll(resp.Body) + if err != nil { + t.Fatal(err) + } + client.CloseIdleConnections() +} + +func TestIntegrationFailure(t *testing.T) { + client := &http.Client{ + Transport: New(time.Now(), handlers.NoHandler, http.DefaultTransport), + } + // This fails the request because we attempt to speak cleartext HTTP with + // a server that instead is expecting TLS. + resp, err := client.Get("http://www.google.com:443") + if err == nil { + t.Fatal("expected an error here") + } + if resp != nil { + t.Fatal("expected a nil response here") + } + client.CloseIdleConnections() +} diff --git a/internal/internal.go b/internal/internal.go index 7c43f70..581bc17 100644 --- a/internal/internal.go +++ b/internal/internal.go @@ -110,12 +110,12 @@ func newHTTPClientForDoH(beginning time.Time, handler model.Handler) *http.Clien // Logic to make sure we'll use the dialer in the new HTTP transport. We have // an already well configured config that works for http2 (as explained in a // comment there). Here we just use it because it's what we need. - dialer.TLSConfig = transport.TLSClientConfig + dialer.TLSConfig = transport.Transport.TLSClientConfig // Arrange the configuration such that we always use `dialer` for dialing. - transport.Dial = dialer.Dial - transport.DialContext = dialer.DialContext - transport.DialTLS = dialer.DialTLS - transport.MaxConnsPerHost = 1 // seems to be better for cloudflare DNS + transport.Transport.Dial = dialer.Dial + transport.Transport.DialContext = dialer.DialContext + transport.Transport.DialTLS = dialer.DialTLS + transport.Transport.MaxConnsPerHost = 1 // seems to be better for cloudflare DNS return &http.Client{Transport: transport} } From 1c31ed564ae33a39a447b988b99dc4ff4c9def66 Mon Sep 17 00:00:00 2001 From: Simone Basso Date: Thu, 24 Oct 2019 20:41:16 +0200 Subject: [PATCH 2/2] Drop support for Go v1.11 It does not have CloseIdleConnections, which we need. --- .travis.yml | 1 - README.md | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/.travis.yml b/.travis.yml index 8d42aef..b850fd8 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,6 +1,5 @@ language: go go: -- 1.11.x - 1.12.x - 1.13.x env: diff --git a/README.md b/README.md index 6d5ee5e..ef4e9aa 100644 --- a/README.md +++ b/README.md @@ -66,7 +66,7 @@ https://godoc.org/github.com/ooni/netx#pkg-subdirectories). ## Build, run tests, run example commands -You need Go >= 1.11. We use Go modules. +You need Go >= 1.12. We use Go modules. To run tests: