Skip to content
This repository was archived by the owner on Mar 6, 2020. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
language: go
go:
- 1.11.x
- 1.12.x
- 1.13.x
env:
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ https://godoc.org/github.com/ooni/netx#pkg-subdirectories).

## Build, run tests, run example commands

You need Go >= 1.11. We use Go modules.
You need Go >= 1.12. We use Go modules.

To run tests:

Expand Down
8 changes: 4 additions & 4 deletions httpx/httpx.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,11 @@ func NewTransport(beginning time.Time, handler model.Handler) *Transport {
t.dialer = internal.NewDialer(beginning, handler)
t.transport = httptransport.NewTransport(beginning, handler)
// make sure we use an http2 ready TLS config
t.dialer.TLSConfig = t.transport.TLSClientConfig
t.dialer.TLSConfig = t.transport.Transport.TLSClientConfig
// make sure HTTP uses our dialer
t.transport.Dial = t.dialer.Dial
t.transport.DialContext = t.dialer.DialContext
t.transport.DialTLS = t.dialer.DialTLS
t.transport.Transport.Dial = t.dialer.Dial
t.transport.Transport.DialContext = t.dialer.DialContext
t.transport.Transport.DialTLS = t.dialer.DialTLS
return t
}

Expand Down
138 changes: 20 additions & 118 deletions internal/httptransport/httptransport.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,10 @@
package httptransport

import (
"io"
"net/http"
"net/http/httptrace"
"sync"
"sync/atomic"
"time"

"github.com/ooni/netx/internal/httptransport/toptripper"
"github.com/ooni/netx/model"
"golang.org/x/net/http2"
)
Expand All @@ -19,145 +16,50 @@ var nextTransactionID int64
// Transport performs single HTTP transactions and emits
// measurement events as they happen.
type Transport struct {
http.Transport
Handler model.Handler
Beginning time.Time
Transport *http.Transport
Handler model.Handler
Beginning time.Time
roundTripper http.RoundTripper
}

// NewTransport creates a new Transport.
func NewTransport(beginning time.Time, handler model.Handler) *Transport {
transport := &Transport{
Beginning: beginning,
Handler: handler,
Transport: http.Transport{
Transport: &http.Transport{
ExpectContinueTimeout: 1 * time.Second,
IdleConnTimeout: 90 * time.Second,
MaxIdleConns: 100,
Proxy: http.ProxyFromEnvironment,
TLSHandshakeTimeout: 10 * time.Second,
},
}
transport.roundTripper = toptripper.New(
beginning, handler, transport.Transport,
)
// Configure h2 and make sure that the custom TLSConfig we use for dialing
// is actually compatible with upgrading to h2. (This mainly means we
// need to make sure we include "h2" in the NextProtos array.) Because
// http2.ConfigureTransport only returns error when we have already
// configured http2, it is safe to ignore the return value.
http2.ConfigureTransport(&transport.Transport)
http2.ConfigureTransport(transport.Transport)
return transport
}

// RoundTrip executes a single HTTP transaction, returning
// a Response for the provided Request.
func (t *Transport) RoundTrip(req *http.Request) (resp *http.Response, err error) {
outmethod := req.Method
outurl := req.URL.String()
tid := atomic.AddInt64(&nextTransactionID, 1)
outheaders := http.Header{}
var mutex sync.Mutex
tracer := &httptrace.ClientTrace{
GotConn: func(info httptrace.GotConnInfo) {
t.Handler.OnMeasurement(model.Measurement{
HTTPConnectionReady: &model.HTTPConnectionReadyEvent{
LocalAddress: info.Conn.LocalAddr().String(),
Network: info.Conn.LocalAddr().Network(),
RemoteAddress: info.Conn.RemoteAddr().String(),
Time: time.Now().Sub(t.Beginning),
TransactionID: tid,
},
})
},
WroteHeaderField: func(key string, values []string) {
mutex.Lock()
outheaders[key] = values
mutex.Unlock()
},
WroteHeaders: func() {
mutex.Lock()
m := model.Measurement{
HTTPRequestHeadersDone: &model.HTTPRequestHeadersDoneEvent{
Headers: outheaders,
Method: outmethod,
Time: time.Now().Sub(t.Beginning),
TransactionID: tid,
URL: outurl,
},
}
mutex.Unlock()
t.Handler.OnMeasurement(m)
},
WroteRequest: func(info httptrace.WroteRequestInfo) {
t.Handler.OnMeasurement(model.Measurement{
HTTPRequestDone: &model.HTTPRequestDoneEvent{
Time: time.Now().Sub(t.Beginning),
TransactionID: tid,
},
})
},
GotFirstResponseByte: func() {
t.Handler.OnMeasurement(model.Measurement{
HTTPResponseStart: &model.HTTPResponseStartEvent{
Time: time.Now().Sub(t.Beginning),
TransactionID: tid,
},
})
},
}
req = req.WithContext(httptrace.WithClientTrace(req.Context(), tracer))
resp, err = t.Transport.RoundTrip(req)
if err != nil {
return
}
t.Handler.OnMeasurement(model.Measurement{
HTTPResponseHeadersDone: &model.HTTPResponseHeadersDoneEvent{
Headers: resp.Header,
StatusCode: int64(resp.StatusCode),
Time: time.Now().Sub(t.Beginning),
TransactionID: tid,
},
})
// "The http Client and Transport guarantee that Body is always
// non-nil, even on responses without a body or responses with
// a zero-length body." (from the docs)
resp.Body = &bodyWrapper{
ReadCloser: resp.Body,
t: t,
tid: tid,
}
return
return t.roundTripper.RoundTrip(req)
}

type bodyWrapper struct {
io.ReadCloser
t *Transport
tid int64
}

func (bw *bodyWrapper) Read(b []byte) (n int, err error) {
start := time.Now()
n, err = bw.ReadCloser.Read(b)
stop := time.Now()
bw.t.Handler.OnMeasurement(model.Measurement{
HTTPResponseBodyPart: &model.HTTPResponseBodyPartEvent{
// "Read reads up to len(p) bytes into p. It returns the number of
// bytes read (0 <= n <= len(p)) and any error encountered."
Data: b[:n],
Duration: stop.Sub(start),
Error: err,
NumBytes: int64(n),
Time: stop.Sub(bw.t.Beginning),
TransactionID: bw.tid,
},
})
return
}

func (bw *bodyWrapper) Close() (err error) {
err = bw.ReadCloser.Close()
bw.t.Handler.OnMeasurement(model.Measurement{
HTTPResponseDone: &model.HTTPResponseDoneEvent{
Time: time.Now().Sub(bw.t.Beginning),
TransactionID: bw.tid,
},
})
return
// CloseIdleConnections closes the idle connections.
func (t *Transport) CloseIdleConnections() {
// Adapted from net/http code
type closeIdler interface {
CloseIdleConnections()
}
if tr, ok := t.roundTripper.(closeIdler); ok {
tr.CloseIdleConnections()
}
}
2 changes: 2 additions & 0 deletions internal/httptransport/httptransport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ func TestIntegration(t *testing.T) {
if err != nil {
t.Fatal(err)
}
client.CloseIdleConnections()
}

func TestIntegrationFailure(t *testing.T) {
Expand All @@ -38,4 +39,5 @@ func TestIntegrationFailure(t *testing.T) {
if resp != nil {
t.Fatal("expected a nil response here")
}
client.CloseIdleConnections()
}
162 changes: 162 additions & 0 deletions internal/httptransport/toptripper/toptripper.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
// Package toptripper contains the top HTTP round tripper.
package toptripper

import (
"io"
"net/http"
"net/http/httptrace"
"sync"
"sync/atomic"
"time"

"github.com/ooni/netx/model"
)

var nextTransactionID int64

// Transport performs single HTTP transactions and emits
// measurement events as they happen.
type Transport struct {
beginning time.Time
handler model.Handler
roundTripper http.RoundTripper
}

// New creates a new Transport.
func New(
beginning time.Time, handler model.Handler,
roundTripper http.RoundTripper,
) *Transport {
return &Transport{
beginning: beginning,
handler: handler,
roundTripper: roundTripper,
}
}

// RoundTrip executes a single HTTP transaction, returning
// a Response for the provided Request.
func (t *Transport) RoundTrip(req *http.Request) (resp *http.Response, err error) {
outmethod := req.Method
outurl := req.URL.String()
tid := atomic.AddInt64(&nextTransactionID, 1)
outheaders := http.Header{}
var mutex sync.Mutex
tracer := &httptrace.ClientTrace{
GotConn: func(info httptrace.GotConnInfo) {
t.handler.OnMeasurement(model.Measurement{
HTTPConnectionReady: &model.HTTPConnectionReadyEvent{
LocalAddress: info.Conn.LocalAddr().String(),
Network: info.Conn.LocalAddr().Network(),
RemoteAddress: info.Conn.RemoteAddr().String(),
Time: time.Now().Sub(t.beginning),
TransactionID: tid,
},
})
},
WroteHeaderField: func(key string, values []string) {
mutex.Lock()
outheaders[key] = values
mutex.Unlock()
},
WroteHeaders: func() {
mutex.Lock()
m := model.Measurement{
HTTPRequestHeadersDone: &model.HTTPRequestHeadersDoneEvent{
Headers: outheaders,
Method: outmethod,
Time: time.Now().Sub(t.beginning),
TransactionID: tid,
URL: outurl,
},
}
mutex.Unlock()
t.handler.OnMeasurement(m)
},
WroteRequest: func(info httptrace.WroteRequestInfo) {
t.handler.OnMeasurement(model.Measurement{
HTTPRequestDone: &model.HTTPRequestDoneEvent{
Time: time.Now().Sub(t.beginning),
TransactionID: tid,
},
})
},
GotFirstResponseByte: func() {
t.handler.OnMeasurement(model.Measurement{
HTTPResponseStart: &model.HTTPResponseStartEvent{
Time: time.Now().Sub(t.beginning),
TransactionID: tid,
},
})
},
}
req = req.WithContext(httptrace.WithClientTrace(req.Context(), tracer))
resp, err = t.roundTripper.RoundTrip(req)
if err != nil {
return
}
t.handler.OnMeasurement(model.Measurement{
HTTPResponseHeadersDone: &model.HTTPResponseHeadersDoneEvent{
Headers: resp.Header,
StatusCode: int64(resp.StatusCode),
Time: time.Now().Sub(t.beginning),
TransactionID: tid,
},
})
// "The http Client and Transport guarantee that Body is always
// non-nil, even on responses without a body or responses with
// a zero-length body." (from the docs)
resp.Body = &bodyWrapper{
ReadCloser: resp.Body,
t: t,
tid: tid,
}
return
}

// CloseIdleConnections closes the idle connections.
func (t *Transport) CloseIdleConnections() {
// Adapted from net/http code
type closeIdler interface {
CloseIdleConnections()
}
if tr, ok := t.roundTripper.(closeIdler); ok {
tr.CloseIdleConnections()
}
}

type bodyWrapper struct {
io.ReadCloser
t *Transport
tid int64
}

func (bw *bodyWrapper) Read(b []byte) (n int, err error) {
start := time.Now()
n, err = bw.ReadCloser.Read(b)
stop := time.Now()
bw.t.handler.OnMeasurement(model.Measurement{
HTTPResponseBodyPart: &model.HTTPResponseBodyPartEvent{
// "Read reads up to len(p) bytes into p. It returns the number of
// bytes read (0 <= n <= len(p)) and any error encountered."
Data: b[:n],
Duration: stop.Sub(start),
Error: err,
NumBytes: int64(n),
Time: stop.Sub(bw.t.beginning),
TransactionID: bw.tid,
},
})
return
}

func (bw *bodyWrapper) Close() (err error) {
err = bw.ReadCloser.Close()
bw.t.handler.OnMeasurement(model.Measurement{
HTTPResponseDone: &model.HTTPResponseDoneEvent{
Time: time.Now().Sub(bw.t.beginning),
TransactionID: bw.tid,
},
})
return
}
Loading