[proxy] rewrote chunked response handler
1) We cannot send "Connection: close", because the fsouza docker client
   expects the TCP socket to stay open between requests.

2) Because we cannot force-close the connection, we can't hijack the
   connection (because Go's net/http doesn't let us un-hijack it).

3) Because we need to maintain the individual chunking of messages (for
   docker-py), we can't just copy the response body, as Go will remove and
   re-add the chunking willy-nilly.

Therefore, we have to read each chunk one-by-one, and flush the
ResponseWriter after each one.
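
For reference, this is the wire format being preserved: each chunk is a
hex size line, the payload, and a CRLF, and a zero-length chunk terminates
the stream. A body of "hello, " followed by "world!" goes out as:

    7\r\n
    hello, \r\n
    6\r\n
    world!\r\n
    0\r\n
    \r\n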
paulbellamy committed Jul 10, 2015
1 parent b93070f commit efd23a2
Showing 3 changed files with 333 additions and 20 deletions.
146 changes: 146 additions & 0 deletions proxy/chunked.go
@@ -0,0 +1,146 @@
// Based on net/http/internal
package proxy

import (
    "bufio"
    "errors"
    "io"
    "io/ioutil"
)

const maxLineLength = 4096 // assumed <= bufio.defaultBufSize

var ErrLineTooLong = errors.New("header line too long")

// Unlike net/http/internal.chunkedReader, this has an interface where we can
// handle individual chunks. The interface is based on database/sql.Rows.
func NewChunkedReader(r io.Reader) *ChunkedReader {
    br, ok := r.(*bufio.Reader)
    if !ok {
        br = bufio.NewReader(r)
    }
    return &ChunkedReader{r: br}
}

type ChunkedReader struct {
    r     *bufio.Reader
    chunk *io.LimitedReader
    err   error
    buf   [2]byte
}

// Next prepares the next chunk for reading. It returns true on success, or
// false if there is no next chunk or an error happened while preparing
// it. Err should be consulted to distinguish between the two cases.
//
// Every call to Chunk, even the first one, must be preceded by a call to Next.
//
// Calls to Next will discard any unread bytes in the current Chunk.
func (cr *ChunkedReader) Next() bool {
    if cr.err != nil {
        return false
    }

    // Check the termination of the previous chunk
    if cr.chunk != nil {
        // Make sure the remainder is drained, in case the user of this quit
        // reading early.
        if _, cr.err = io.Copy(ioutil.Discard, cr.chunk); cr.err != nil {
            return false
        }

        // Check that the next two bytes after the chunk are \r\n
        if _, cr.err = io.ReadFull(cr.r, cr.buf[:2]); cr.err != nil {
            return false
        }
        if cr.buf[0] != '\r' || cr.buf[1] != '\n' {
            cr.err = errors.New("malformed chunked encoding")
            return false
        }
    } else {
        cr.chunk = &io.LimitedReader{R: cr.r}
    }

    // Set up the next chunk
    if n := cr.beginChunk(); n > 0 {
        cr.chunk.N = int64(n)
    } else {
        cr.err = io.EOF
    }
    return cr.err == nil
}

// Chunk returns the io.Reader of the current chunk. On each call, this returns
// the same io.Reader for a given chunk.
func (cr *ChunkedReader) Chunk() io.Reader {
    return cr.chunk
}

// Err returns the error, if any, that was encountered during iteration.
// A clean end-of-stream (io.EOF) is reported as nil.
func (cr *ChunkedReader) Err() error {
    if cr.err == io.EOF {
        return nil
    }
    return cr.err
}
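
Taken together, iteration mirrors database/sql.Rows. A minimal usage
sketch, where src and dst are hypothetical reader/writer values:

    cr := NewChunkedReader(src)
    for cr.Next() {
        // Chunk is only valid until the next call to Next.
        if _, err := io.Copy(dst, cr.Chunk()); err != nil {
            break
        }
    }
    if err := cr.Err(); err != nil {
        // stream was malformed; a clean end-of-stream reports nil
    }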

func (cr *ChunkedReader) beginChunk() (n uint64) {
    // chunk-size CRLF
    var line []byte
    line, cr.err = readLine(cr.r)
    if cr.err != nil {
        return
    }
    n, cr.err = parseHexUint(line)
    return
}

// Read a line of bytes (up to \n) from b.
// Give up if the line exceeds maxLineLength.
// The returned bytes are a pointer into storage in
// the bufio, so they are only valid until the next bufio read.
func readLine(b *bufio.Reader) (p []byte, err error) {
    if p, err = b.ReadSlice('\n'); err != nil {
        // We always know when EOF is coming.
        // If the caller asked for a line, there should be a line.
        if err == io.EOF {
            err = io.ErrUnexpectedEOF
        } else if err == bufio.ErrBufferFull {
            err = ErrLineTooLong
        }
        return nil, err
    }
    if len(p) >= maxLineLength {
        return nil, ErrLineTooLong
    }
    return trimTrailingWhitespace(p), nil
}

func trimTrailingWhitespace(b []byte) []byte {
    for len(b) > 0 && isASCIISpace(b[len(b)-1]) {
        b = b[:len(b)-1]
    }
    return b
}

func isASCIISpace(b byte) bool {
    return b == ' ' || b == '\t' || b == '\n' || b == '\r'
}

func parseHexUint(v []byte) (n uint64, err error) {
    for _, b := range v {
        n <<= 4
        switch {
        case '0' <= b && b <= '9':
            b = b - '0'
        case 'a' <= b && b <= 'f':
            b = b - 'a' + 10
        case 'A' <= b && b <= 'F':
            b = b - 'A' + 10
        default:
            return 0, errors.New("invalid byte in chunk length")
        }
        n |= uint64(b)
    }
    return
}
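
For example, a chunk-size line of "1a2b" shifts in four bits per hex
digit (a hypothetical check):

    n, err := parseHexUint([]byte("1a2b"))
    // n == 0x1a2b == 6699, err == nil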
168 changes: 168 additions & 0 deletions proxy/chunked_test.go
@@ -0,0 +1,168 @@
// Based on net/http/internal
package proxy

import (
    "bytes"
    "fmt"
    "io"
    "io/ioutil"
    "testing"
)

func assertNextChunk(t *testing.T, r *ChunkedReader, expected string) {
    if !r.Next() {
        t.Fatalf("Expected chunk, but ran out early: %v", r.Err())
    }
    if r.Err() != nil {
        t.Fatalf("Error reading chunk: %q", r.Err())
    }
    data, err := ioutil.ReadAll(r.Chunk())
    if g := string(data); g != expected {
        t.Errorf("chunk reader read %q; want %q", g, expected)
    }
    if err != nil {
        t.Logf(`data: "%s"`, data)
        t.Fatalf("reading chunk: %v", err)
    }
}

func assertNoMoreChunks(t *testing.T, r *ChunkedReader) {
    if r.Next() {
        t.Errorf("Expected no more chunks, but found too many")
    }
    if r.Err() != nil {
        t.Errorf("Expected no error, but found: %q", r.Err())
    }
}

func TestChunk(t *testing.T) {
    var b bytes.Buffer

    w := NewChunkedWriter(&b)
    const chunk1 = "hello, "
    const chunk2 = "world! 0123456789abcdef"
    w.Write([]byte(chunk1))
    w.Write([]byte(chunk2))
    w.Close()

    r := NewChunkedReader(&b)

    assertNextChunk(t, r, chunk1)
    assertNextChunk(t, r, chunk2)
    assertNoMoreChunks(t, r)
}

func TestIncompleteReadOfChunk(t *testing.T) {
    var b bytes.Buffer

    w := NewChunkedWriter(&b)
    const chunk1 = "hello, "
    const chunk2 = "world! 0123456789abcdef"
    w.Write([]byte(chunk1))
    w.Write([]byte(chunk2))
    w.Close()

    r := NewChunkedReader(&b)

    // Incomplete read of first chunk
    {
        if !r.Next() {
            t.Fatalf("Expected chunk, but ran out early: %v", r.Err())
        }
        if r.Err() != nil {
            t.Fatalf("Error reading chunk: %q", r.Err())
        }
        // Read just 2 bytes
        buf := make([]byte, 2)
        if _, err := io.ReadFull(r.Chunk(), buf[:2]); err != nil {
            t.Fatalf("Error reading first bytes of chunk: %q", err)
        }
        if buf[0] != 'h' || buf[1] != 'e' {
            t.Fatalf("Unexpected first 2 bytes of chunk: %s", string(buf))
        }
    }

    // Second chunk still reads ok
    assertNextChunk(t, r, chunk2)

    assertNoMoreChunks(t, r)
}

func TestMalformedChunks(t *testing.T) {
    r := NewChunkedReader(bytes.NewBufferString(
        "7\r\nhello, GARBAGEBYTES17\r\nworld! 0123456789abcdef\r\n0\r\n",
    ))

    // First chunk is ok
    assertNextChunk(t, r, "hello, ")

    // Second chunk fails
    {
        if r.Next() {
            t.Errorf("Expected failure when reading chunks, but got one")
        }
        e := "malformed chunked encoding"
        if r.Err() == nil || r.Err().Error() != e {
            t.Errorf("chunk reader errored %q; want %q", r.Err(), e)
        }
        data, err := ioutil.ReadAll(r.Chunk())
        if len(data) != 0 {
            t.Errorf("chunk should have been empty. got %q", string(data))
        }
        if err != nil {
            t.Logf(`data: "%s"`, data)
            t.Errorf("reading chunk: %v", err)
        }
    }
}

// Stolen from net/http/internal for testing
//
// NewChunkedWriter returns a new chunkedWriter that translates writes into HTTP
// "chunked" format before writing them to w. Closing the returned chunkedWriter
// sends the final 0-length chunk that marks the end of the stream.
//
// NewChunkedWriter is not needed by normal applications. The http
// package adds chunking automatically if handlers don't set a
// Content-Length header. Using NewChunkedWriter inside a handler
// would result in double chunking or chunking with a Content-Length
// header, both of which are wrong.
func NewChunkedWriter(w io.Writer) io.WriteCloser {
    return &chunkedWriter{w}
}

// Writing to chunkedWriter translates to writing in HTTP chunked Transfer
// Encoding wire format to the underlying Wire writer.
type chunkedWriter struct {
    Wire io.Writer
}

// Write the contents of data as one chunk to Wire.
// Note that the corresponding chunk-writing procedure in Conn.Write has
// a bug since it does not check for success of io.WriteString.
func (cw *chunkedWriter) Write(data []byte) (n int, err error) {

    // Don't send 0-length data. It looks like EOF for chunked encoding.
    if len(data) == 0 {
        return 0, nil
    }

    if _, err = fmt.Fprintf(cw.Wire, "%x\r\n", len(data)); err != nil {
        return 0, err
    }
    if n, err = cw.Wire.Write(data); err != nil {
        return
    }
    if n != len(data) {
        err = io.ErrShortWrite
        return
    }
    _, err = io.WriteString(cw.Wire, "\r\n")

    return
}

func (cw *chunkedWriter) Close() error {
    _, err := io.WriteString(cw.Wire, "0\r\n")
    return err
}
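
As a sketch of what this helper emits, assuming a bytes.Buffer sink:

    var b bytes.Buffer
    w := NewChunkedWriter(&b)
    w.Write([]byte("hello, ")) // b now holds "7\r\nhello, \r\n"
    w.Close()                  // appends the terminating "0\r\n"

The reader above can consume this output directly, which is exactly what
the tests do.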
39 changes: 19 additions & 20 deletions proxy/proxy_intercept.go
@@ -5,7 +5,6 @@ import (
"errors"
"fmt"
"io"
"io/ioutil"
"net"
"net/http"
"net/http/httputil"
@@ -117,29 +116,29 @@ func copyStream(dst io.Writer, src io.Reader, done chan struct{}) {
 }
 
 func doChunkedResponse(w http.ResponseWriter, resp *http.Response, client *httputil.ClientConn) {
-    // Because we can't go back to request/response after we
-    // hijack the connection, we need to close it and make the
-    // client open another.
-    w.Header().Add("Connection", "close")
-    w.WriteHeader(resp.StatusCode)
-
-    down, _, up, rem, err := hijack(w, client)
-    if err != nil {
-        http.Error(w, "Unable to hijack response stream for chunked response", http.StatusInternalServerError)
+    wf, ok := w.(http.Flusher)
+    if !ok {
+        http.Error(w, "Error forwarding chunked response body: flush not available", http.StatusInternalServerError)
         return
     }
+
+    w.WriteHeader(resp.StatusCode)
+
+    up, rem := client.Hijack()
     defer up.Close()
-    defer down.Close()
-    // Copy the chunked response body to downstream,
-    // stopping at the end of the chunked section.
-    rawResponseBody := io.MultiReader(rem, up)
-    if _, err := io.Copy(ioutil.Discard, httputil.NewChunkedReader(io.TeeReader(rawResponseBody, down))); err != nil {
-        http.Error(w, "Error copying chunked response body", http.StatusInternalServerError)
-        return
+
+    var err error
+    chunks := NewChunkedReader(io.MultiReader(rem, up))
+    for chunks.Next() && err == nil {
+        _, err = io.Copy(w, chunks.Chunk())
+        wf.Flush()
     }
+    if err == nil {
+        err = chunks.Err()
+    }
+    if err != nil {
+        Error.Printf("Error forwarding chunked response body: %s", err)
+    }
-    resp.Trailer.Write(down)
-    // a chunked response ends with a CRLF
-    down.Write([]byte("\r\n"))
 }
 
 func hijack(w http.ResponseWriter, client *httputil.ClientConn) (down net.Conn, downBuf *bufio.ReadWriter, up net.Conn, rem io.Reader, err error) {
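
For context, the http.Flusher assertion above is the standard net/http
way to push buffered bytes downstream immediately; net/http emits each
flushed write as its own chunk, which is what keeps the upstream chunk
boundaries visible to clients like docker-py. A minimal sketch of the same
pattern in a plain handler (streamHandler is hypothetical):

    package main

    import (
        "io"
        "net/http"
    )

    func streamHandler(w http.ResponseWriter, r *http.Request) {
        f, ok := w.(http.Flusher)
        if !ok {
            http.Error(w, "streaming unsupported", http.StatusInternalServerError)
            return
        }
        for _, chunk := range []string{"hello, ", "world!"} {
            io.WriteString(w, chunk) // net/http chunk-encodes the write
            f.Flush()                // force this chunk onto the wire now
        }
    }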
