Skip to content

Commit

Permalink
Implement optional concurrent "Range" requests (refs cavaliergopher#86)
Browse files Browse the repository at this point in the history
  • Loading branch information
justinfx committed Jun 28, 2023
1 parent e06b719 commit e4a6245
Show file tree
Hide file tree
Showing 8 changed files with 387 additions and 53 deletions.
83 changes: 63 additions & 20 deletions v3/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"context"
"fmt"
"io"
"net"
"net/http"
"os"
"path/filepath"
Expand Down Expand Up @@ -48,11 +49,23 @@ type Client struct {

// NewClient returns a new file download Client, using default configuration.
func NewClient() *Client {
dialer := &net.Dialer{}
return &Client{
UserAgent: "grab",
HTTPClient: &http.Client{
Transport: &http.Transport{
Proxy: http.ProxyFromEnvironment,
DialContext: func(ctx context.Context, network string, addr string) (net.Conn, error) {
conn, err := dialer.DialContext(ctx, network, addr)
if err == nil {
// net.TCPConn enables TCP_NODELAY (SetNoDelay(true)) by
// default, which can hurt throughput on large file
// downloads by producing many small segments and ACKs,
// especially on higher-latency networks.
err = conn.(*net.TCPConn).SetNoDelay(false)
}
return conn, err
},
},
},
}
Expand Down Expand Up @@ -86,6 +99,7 @@ func (c *Client) Do(req *Request) *Response {
ctx: ctx,
cancel: cancel,
bufferSize: req.BufferSize,
transfer: (*transfer)(nil),
}
if resp.bufferSize == 0 {
// default to Client.BufferSize
Expand Down Expand Up @@ -330,13 +344,19 @@ func (c *Client) headRequest(resp *Response) stateFunc {
}
resp.optionsKnown = true

if resp.Request.NoResume {
return c.getRequest
}
// If we are going to do a range request, then we need to perform
// the HEAD req to check for support.
// Otherwise, we may not need to do the HEAD request if we have
// enough information already.
if resp.Request.RangeRequestMax <= 0 {
if resp.Request.NoResume {
return c.getRequest
}

if resp.Filename != "" && resp.fi == nil {
// destination path is already known and does not exist
return c.getRequest
if resp.Filename != "" && resp.fi == nil {
// destination path is already known and does not exist
return c.getRequest
}
}

hreq := new(http.Request)
Expand Down Expand Up @@ -365,6 +385,13 @@ func (c *Client) headRequest(resp *Response) stateFunc {
}

func (c *Client) getRequest(resp *Response) stateFunc {
if resp.Request.RangeRequestMax > 0 && resp.acceptRanges {
// For a concurrent range request, we don't do a single
// GET request here. It will be handled later in the transfer,
// based on the HEAD response
return c.openWriter
}

resp.HTTPResponse, resp.err = c.doHTTPRequest(resp.Request.HTTPRequest)
if resp.err != nil {
return c.closeResponse
Expand Down Expand Up @@ -410,11 +437,12 @@ func (c *Client) readResponse(resp *Response) stateFunc {
resp.Filename = filepath.Join(resp.Request.Filename, filename)
}

if !resp.Request.NoStore && resp.requestMethod() == "HEAD" {
if resp.HTTPResponse.Header.Get("Accept-Ranges") == "bytes" {
resp.CanResume = true
if resp.requestMethod() == "HEAD" {
resp.acceptRanges = resp.HTTPResponse.Header.Get("Accept-Ranges") == "bytes"
if !resp.Request.NoStore {
resp.CanResume = resp.acceptRanges
return c.statFileInfo
}
return c.statFileInfo
}
return c.openWriter
}
Expand All @@ -431,6 +459,12 @@ func (c *Client) openWriter(resp *Response) stateFunc {
}
}

if resp.bufferSize < 1 {
resp.bufferSize = 32 * 1024
}

var writerAt io.WriterAt

if resp.Request.NoStore {
resp.writer = &resp.storeBuffer
} else {
Expand All @@ -453,11 +487,12 @@ func (c *Client) openWriter(resp *Response) stateFunc {
return c.closeResponse
}
resp.writer = f
writerAt = f

// seek to start or end
whence := os.SEEK_SET
whence := io.SeekStart
if resp.bytesResumed > 0 {
whence = os.SEEK_END
whence = io.SeekEnd
}
_, resp.err = f.Seek(0, whence)
if resp.err != nil {
Expand All @@ -469,13 +504,21 @@ func (c *Client) openWriter(resp *Response) stateFunc {
if resp.bufferSize < 1 {
resp.bufferSize = 32 * 1024
}
b := make([]byte, resp.bufferSize)
resp.transfer = newTransfer(
resp.Request.Context(),
resp.Request.RateLimiter,
resp.writer,
resp.HTTPResponse.Body,
b)

if resp.Request.RangeRequestMax > 0 && resp.acceptRanges && writerAt != nil {
// TODO: should we inspect resp.HTTPResponse.ContentLength
// and have a threshold under which a certain size should
// not use range requests? ie < 1MB? 256KB?
resp.transfer = newTransferRanges(c.HTTPClient, resp, writerAt)

} else {
resp.transfer = newTransfer(
resp.Request.Context(),
resp.Request.RateLimiter,
resp.writer,
resp.HTTPResponse.Body,
resp.bufferSize)
}

// next step is copyFile, but this will be called later in another goroutine
return nil
Expand Down Expand Up @@ -507,7 +550,7 @@ func (c *Client) copyFile(resp *Response) stateFunc {
t.Truncate(0)
}

bytesCopied, resp.err = resp.transfer.copy()
bytesCopied, resp.err = resp.transfer.Copy()
if resp.err != nil {
return c.closeResponse
}
Expand Down
58 changes: 58 additions & 0 deletions v3/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -912,3 +912,61 @@ func TestNoStore(t *testing.T) {
})
})
}

// TestRangeRequest tests the option of using parallel range requests to download
// chunks of the remote resource
// TestRangeRequest tests the option of using concurrent "Range" requests to
// download chunks of the remote resource, covering both the disabled
// (<= 0) and enabled (> 0) settings of Request.RangeRequestMax.
func TestRangeRequest(t *testing.T) {
	size := int64(32768)
	testCases := []struct {
		Name       string
		Chunks     int
		StatusCode int
	}{
		{"NumChunksNeg", -1, http.StatusOK},
		{"NumChunks0", 0, http.StatusOK},
		{"NumChunks1", 1, http.StatusPartialContent},
		{"NumChunks5", 5, http.StatusPartialContent},
	}

	for _, test := range testCases {
		// Shadow the loop variable: the StatusCode callback below is invoked
		// from server handler goroutines, and this module targets Go 1.14
		// (pre-1.22 loop-variable capture semantics).
		test := test
		t.Run(test.Name, func(t *testing.T) {
			opts := []grabtest.HandlerOption{
				grabtest.ContentLength(int(size)),
				grabtest.StatusCode(func(r *http.Request) int {
					// Range-enabled cases respond 206 Partial Content.
					if test.Chunks > 0 {
						return http.StatusPartialContent
					}
					return http.StatusOK
				}),
			}

			grabtest.WithTestServer(t, func(url string) {
				name := fmt.Sprintf(".testRangeRequest-%s", test.Name)
				req := mustNewRequest(name, url)
				req.RangeRequestMax = test.Chunks

				resp := DefaultClient.Do(req)
				defer os.Remove(resp.Filename)

				switch err := resp.Err(); {
				case err == ErrBadLength:
					t.Errorf("error: %v", err)
				case err != nil:
					// Fail the test rather than panic so the testing framework
					// records the failure and deferred cleanup still runs.
					t.Fatalf("unexpected error: %v", err)
				case resp.Size() != size:
					t.Errorf("expected %v bytes, got %v bytes", size, resp.Size())
				}

				if resp.HTTPResponse.StatusCode != test.StatusCode {
					t.Errorf("expected status code %v, got %d", test.StatusCode, resp.HTTPResponse.StatusCode)
				}

				if bps := resp.BytesPerSecond(); bps <= 0 {
					t.Errorf("expected BytesPerSecond > 0, got %v", bps)
				}

				testComplete(t, resp)
			}, opts...)
		})
	}
}
2 changes: 2 additions & 0 deletions v3/go.mod
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
module github.com/cavaliergopher/grab/v3

go 1.14

require golang.org/x/sync v0.3.0
2 changes: 2 additions & 0 deletions v3/go.sum
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
golang.org/x/sync v0.3.0 h1:ftCYgMx6zT/asHUrPw8BLLscYtGznsLAnjq5RH9P66E=
golang.org/x/sync v0.3.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y=
49 changes: 43 additions & 6 deletions v3/pkg/grabtest/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"fmt"
"net/http"
"net/http/httptest"
"strconv"
"strings"
"testing"
"time"
)
Expand Down Expand Up @@ -106,20 +108,55 @@ func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Last-Modified", lastMod.Format(http.TimeFormat))

// set content-length
offset := 0
var offset int64
length := int64(h.contentLength)
if h.acceptRanges {
if reqRange := r.Header.Get("Range"); reqRange != "" {
if _, err := fmt.Sscanf(reqRange, "bytes=%d-", &offset); err != nil {
const b = `bytes=`
var limit int64
start, end, ok := strings.Cut(reqRange[len(b):], "-")
if !ok {
httpError(w, http.StatusBadRequest)
return
}
if offset >= h.contentLength {
httpError(w, http.StatusRequestedRangeNotSatisfiable)
var err error
if start != "" {
offset, err = strconv.ParseInt(start, 10, 64)
if err != nil {
httpError(w, http.StatusBadRequest)
return
}
if offset > length {
offset = length
}
}
if end != "" {
limit, err = strconv.ParseInt(end, 10, 64)
if err != nil {
httpError(w, http.StatusBadRequest)
return
}
}

if start != "" && end == "" {
length = length - offset
} else if start == "" && end != "" {
// unsupported range format: -<end>
httpError(w, http.StatusBadRequest)
} else {
length = limit - offset
}

if length > int64(h.contentLength) {
code := http.StatusRequestedRangeNotSatisfiable
msg := fmt.Sprintf("%s: requested range length %d "+
"is greater than ContentLength %d", http.StatusText(code), length, h.contentLength)
http.Error(w, msg, code)
return
}
}
}
w.Header().Set("Content-Length", fmt.Sprintf("%d", h.contentLength-offset))
w.Header().Set("Content-Length", fmt.Sprintf("%d", length))

// apply header blacklist
for _, key := range h.headerBlacklist {
Expand All @@ -133,7 +170,7 @@ func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if r.Method == "GET" {
// use buffered io to reduce overhead on the reader
bw := bufio.NewWriterSize(w, 4096)
for i := offset; !isRequestClosed(r) && i < h.contentLength; i++ {
for i := offset; !isRequestClosed(r) && i < int64(offset+length); i++ {
bw.Write([]byte{byte(i)})
if h.rateLimiter != nil {
bw.Flush()
Expand Down
9 changes: 9 additions & 0 deletions v3/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,15 @@ type Request struct {
// the Response object.
AfterCopy Hook

// RangeRequestMax enables the use of HTTP "Range" requests, if supported by
// the server, to download the resource in multiple chunks. A value > 0
// defines how many chunk requests to execute concurrently.
// If the server does not advertise "Accept-Ranges: bytes", the original
// single-request behaviour is used.
// Note that BufferSize is applied as a separate buffer for each of the
// concurrent range-request chunks.
RangeRequestMax int

// hash, checksum and deleteOnError - set via SetChecksum.
hash hash.Hash
checksum []byte
Expand Down
10 changes: 7 additions & 3 deletions v3/response.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@ type Response struct {
// previous downloads, as the 'Accept-Ranges: bytes' header is set.
CanResume bool

// specifies that the remote server advertised that it supports partial
// requests, as the 'Accept-Ranges: bytes' header is set.
acceptRanges bool

// DidResume specifies that the file transfer resumed a previously incomplete
// transfer.
DidResume bool
Expand Down Expand Up @@ -84,7 +88,7 @@ type Response struct {

// transfer is responsible for copying data from the remote server to a local
// file, tracking progress and allowing for cancelation.
transfer *transfer
transfer transferer

// bufferSize specifies the size in bytes of the transfer buffer.
bufferSize int
Expand Down Expand Up @@ -242,8 +246,8 @@ func (c *Response) checksumUnsafe() ([]byte, error) {
return nil, err
}
defer f.Close()
t := newTransfer(c.Request.Context(), nil, c.Request.hash, f, nil)
if _, err = t.copy(); err != nil {
t := newTransfer(c.Request.Context(), nil, c.Request.hash, f, 0)
if _, err = t.Copy(); err != nil {
return nil, err
}
sum := c.Request.hash.Sum(nil)
Expand Down
Loading

0 comments on commit e4a6245

Please sign in to comment.