Skip to content

Commit

Permalink
Merge branch 'master' into release/0.1
Browse files Browse the repository at this point in the history
  • Loading branch information
ironsmile committed Nov 10, 2015
2 parents 4de7468 + c8fd5ca commit 8c3a698
Show file tree
Hide file tree
Showing 12 changed files with 154 additions and 52 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,12 @@

A human readable change log between our released versions can be found in here.

## v0.1.5 - 2015-11-10

### New Stuff

* Nedomi can now utilise more efficient OS syscalls for copying data in some situations. For example, with cached files in Linux the `sendfile` syscall will be used to bridge the opened file and network socket directly in kernel space. On other systems, their respective method for efficient copying will be used. This behaviour is made possible by Go's standard library.

## v0.1.4 - 2015-11-09

### New Stuff
Expand Down
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0.1.4
0.1.5
4 changes: 4 additions & 0 deletions app/write_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,3 +163,7 @@ func (l *responseLogger) Status() int {
func (l *responseLogger) Size() int {
return l.size
}

// ReadFrom implements io.ReaderFrom so that io.Copy can pick up efficient
// copy paths (e.g. sendfile) through the underlying ResponseWriter.
// The number of copied bytes is added to the logged size so that Size()
// stays accurate for responses streamed through ReadFrom (the original
// bypassed the size accounting entirely).
func (l *responseLogger) ReadFrom(r io.Reader) (n int64, err error) {
	n, err = io.Copy(l.ResponseWriter, r)
	l.size += int(n)
	return
}
22 changes: 1 addition & 21 deletions handler/cache/caching_proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,12 @@ package cache

import (
"fmt"
"io"
"net/http"

"golang.org/x/net/context"

"github.com/ironsmile/nedomi/config"
"github.com/ironsmile/nedomi/types"
"github.com/ironsmile/nedomi/utils"
)

// CachingProxy is responsible for caching the metadata and parts the requested
Expand Down Expand Up @@ -40,25 +38,7 @@ func (c *CachingProxy) RequestHandle(ctx context.Context, resp http.ResponseWrit
return
}

rh := &reqHandler{c, ctx, req, toResponseWriteCloser(resp), c.NewObjectIDForURL(req.URL), nil}
rh := &reqHandler{c, ctx, req, resp, c.NewObjectIDForURL(req.URL), nil}
rh.handle()
c.Logger.Logf("[%p] Done!", req)
}

func toResponseWriteCloser(rw http.ResponseWriter) responseWriteCloser {
if rwc, ok := rw.(responseWriteCloser); ok {
return rwc
}
return struct {
http.ResponseWriter
io.Closer
}{
rw,
utils.NopCloser(nil),
}
}

type responseWriteCloser interface {
http.ResponseWriter
io.Closer
}
2 changes: 1 addition & 1 deletion handler/cache/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ type reqHandler struct {
*CachingProxy
ctx context.Context
req *http.Request
resp responseWriteCloser
resp http.ResponseWriter
objID *types.ObjectID
obj *types.ObjectMetadata
}
Expand Down
20 changes: 12 additions & 8 deletions handler/cache/handler_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,21 +51,21 @@ func (h *reqHandler) getResponseHook() func(*httputils.FlexibleResponseWriter) {
isCacheable := cacheutils.IsResponseCacheable(rw.Code, rw.Headers)
if !isCacheable {
h.Logger.Debugf("[%p] Response is non-cacheable", h.req)
rw.BodyWriter = h.resp
rw.BodyWriter = utils.AddCloser(h.resp)
return
}

expiresIn := cacheutils.ResponseExpiresIn(rw.Headers, h.CacheDefaultDuration)
if expiresIn <= 0 {
h.Logger.Debugf("[%p] Response expires in the past: %s", h.req, expiresIn)
rw.BodyWriter = h.resp
rw.BodyWriter = utils.AddCloser(h.resp)
return
}

responseRange, err := httputils.GetResponseRange(rw.Code, rw.Headers)
if err != nil {
h.Logger.Debugf("[%p] Was not able to get response range (%s)", h.req, err)
rw.BodyWriter = h.resp
rw.BodyWriter = utils.AddCloser(h.resp)
return
}

Expand Down Expand Up @@ -98,17 +98,17 @@ func (h *reqHandler) getResponseHook() func(*httputils.FlexibleResponseWriter) {
//!TODO: also, error if we already have fresh metadata but the received metadata is different
if err := h.Cache.Storage.SaveMetadata(obj); err != nil {
h.Logger.Errorf("[%p] Could not save metadata for %s: %s", h.req, obj.ID, err)
rw.BodyWriter = h.resp
rw.BodyWriter = utils.AddCloser(h.resp)
return
}

if h.req.Method == "HEAD" {
rw.BodyWriter = h.resp
rw.BodyWriter = utils.AddCloser(h.resp)
return
}

rw.BodyWriter = utils.MultiWriteCloser(
h.resp,
utils.AddCloser(h.resp),
PartWriter(h.Cache, h.objID, *responseRange),
)

Expand Down Expand Up @@ -207,14 +207,18 @@ func (h *reqHandler) lazilyRespond(start, end uint64) {
return
}
if i == 0 && startOffset > 0 {
contents = utils.SkipReadCloser(contents, int64(startOffset))
contents, err = utils.SkipReadCloser(contents, int64(startOffset))
if err != nil {
h.Logger.Errorf("[%p] Unexpected error while trying to skip %d from %s : %s", h.req, startOffset, indexes[i], err)
return
}
}
if i+partsCount == len(indexes) {
endLimit := uint64(partsCount-1)*partSize + end%partSize + 1
if i == 0 {
endLimit -= startOffset
}
contents = utils.LimitReadCloser(contents, int(endLimit)) //!TODO: fix int conversion
contents = utils.LimitReadCloser(contents, int64(endLimit)) //!TODO: fix int conversion
}

if copied, err := io.Copy(h.resp, contents); err != nil {
Expand Down
5 changes: 3 additions & 2 deletions handler/cache/part_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"io"

"github.com/ironsmile/nedomi/types"
"github.com/ironsmile/nedomi/utils"
"github.com/ironsmile/nedomi/utils/httputils"
)

Expand Down Expand Up @@ -103,10 +104,10 @@ func (pw *partWriter) flushBuffer() error {

func (pw *partWriter) Close() error {
if pw.currentPos-pw.startPos != pw.length {
return &partWriterShortWrite{
return utils.WrapErrorWithStack(&partWriterShortWrite{
expected: pw.length,
actual: pw.currentPos - pw.startPos,
}
})
}
if pw.buf == nil {
return nil
Expand Down
12 changes: 12 additions & 0 deletions utils/httputils/flexible_response_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,4 +66,16 @@ func (frw *FlexibleResponseWriter) Close() error {
return frw.BodyWriter.Close()
}

// ReadFrom implements io.ReaderFrom using io.Copy with the BodyWriter.
// It first writes the headers (using the recorded Code) if they have not
// been written yet, then checks that a BodyWriter is set before copying.
func (frw *FlexibleResponseWriter) ReadFrom(r io.Reader) (n int64, err error) {
	// Headers must go out before any body bytes are streamed.
	if !frw.wroteHeader {
		frw.WriteHeader(frw.Code)
	}

	// Without a BodyWriter there is nowhere to put the body — report a
	// stack-annotated error.
	if frw.BodyWriter == nil {
		return 0, utils.NewErrorWithStack("The body is not initialized, writes are not accepted.")
	}
	return io.Copy(frw.BodyWriter, r)
}

//!TODO: implement http.CloseNotifier
56 changes: 56 additions & 0 deletions utils/netutils/timeout_conn.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
package netutils

import (
"io"
"net"
"sync"
"time"
)

const maxSizeOfTransfer = 128 * 1024

// timeoutConn is a connection for which Deadline sets a timeout equal to
// the difference to which the deadline was set. That timeout is then use to
// Timeout each read|write on the connection
Expand Down Expand Up @@ -51,3 +55,55 @@ func (tc *timeoutConn) SetWriteDeadline(t time.Time) error {
tc.writeTimeout = t.Sub(time.Now())
return tc.Conn.SetWriteDeadline(t)
}

// ReadFrom implements io.ReaderFrom. It delegates to the source's WriteTo
// or the underlying connection's ReadFrom when available, and otherwise
// copies through a pooled buffer in bounded chunks so that the per-operation
// deadlines of timeoutConn keep being applied.
func (tc *timeoutConn) ReadFrom(r io.Reader) (n int64, err error) {
	// If the source knows how to write itself out, let it drive the copy.
	if wt, ok := r.(io.WriterTo); ok {
		return wt.WriteTo(tc)
	}

	// If the wrapped connection has an efficient ReadFrom (e.g. sendfile),
	// use it, but in maxSizeOfTransfer chunks so deadlines are still renewed.
	if rf, ok := tc.Conn.(io.ReaderFrom); ok {
		var nn int64
		for {
			nn, err = rf.ReadFrom(io.LimitReader(r, maxSizeOfTransfer))
			n += nn
			if err != nil {
				return
			}
			// A short transfer means the limited source was exhausted.
			if nn != maxSizeOfTransfer {
				return
			}
		}
	}

	// Fallback: copy in smaller pieces so each Write can set its deadline.
	// Using io.Copy directly would bypass the deadlines if used on the
	// underlying net.Conn, or loop forever if used on timeoutConn itself.
	bufp := pool.Get().(*[]byte)
	// Always hand the buffer back (the original only called pool.Put on the
	// read-error path and leaked the buffer on write-error / short-write).
	defer pool.Put(bufp)
	var readSize, writeSize int
	var readErr error
	for {
		readSize, readErr = r.Read(*bufp)
		n += int64(readSize)
		writeSize, err = tc.Write((*bufp)[:readSize])
		if err != nil {
			return
		}
		if readSize != writeSize {
			return n, io.ErrShortWrite
		}
		if readErr != nil {
			if readErr == io.EOF {
				return n, nil
			}
			// Propagate the read error — the original did a naked return
			// here, which returned the (nil) write error and silently
			// swallowed non-EOF read errors.
			return n, readErr
		}
	}
}

// pool recycles fixed-size transfer buffers used by timeoutConn.ReadFrom,
// avoiding a maxSizeOfTransfer-byte allocation per copy. Pointers to slices
// are stored (rather than slices) to avoid an allocation on Put.
var pool = sync.Pool{
	New: func() interface{} {
		b := make([]byte, maxSizeOfTransfer)
		return &b
	},
}
57 changes: 40 additions & 17 deletions utils/readers.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,27 @@ func (m *multiReadCloser) Read(p []byte) (int, error) {
return size, err
}

// WriteTo implements io.WriterTo by draining all remaining sub-readers
// into w, advancing the index past each fully-copied reader.
func (m *multiReadCloser) WriteTo(w io.Writer) (n int64, err error) {
	// An exhausted multiReadCloser has nothing left to write. Per the
	// io.WriterTo contract this is a successful no-op — returning io.EOF
	// (as the original did) made io.Copy report a spurious error, since
	// io.Copy returns WriteTo's error verbatim.
	if m.index == len(m.readers) {
		return 0, nil
	}

	var (
		nn  int64
		rrs = m.readers[m.index:]
	)

	for _, reader := range rrs {
		nn, err = io.Copy(w, reader)
		n += nn
		if err != nil {
			return
		}
		m.index++
	}
	return
}

func (m *multiReadCloser) Close() error {
c := new(CompositeError)
for ; m.index < len(m.readers); m.index++ {
Expand All @@ -59,26 +80,16 @@ func (m *multiReadCloser) Close() error {
}

type limitedReadCloser struct {
io.ReadCloser
maxLeft int
io.Reader
io.Closer
}

// LimitReadCloser wraps an io.ReadCloser but stops with EOF after `max` bytes.
func LimitReadCloser(readCloser io.ReadCloser, max int) io.ReadCloser {
func LimitReadCloser(readCloser io.ReadCloser, max int64) io.ReadCloser {
return &limitedReadCloser{
ReadCloser: readCloser,
maxLeft: max,
}
}

func (r *limitedReadCloser) Read(p []byte) (int, error) {
if r.maxLeft == 0 {
return 0, io.EOF
Reader: io.LimitReader(readCloser, max),
Closer: readCloser,
}
readSize := min(r.maxLeft, len(p))
size, err := r.ReadCloser.Read(p[:readSize])
r.maxLeft -= size
return size, err
}

func min(l, r int) int {
Expand All @@ -93,12 +104,24 @@ type skippingReadCloser struct {
skip int64
}

// WriteTo implements io.WriterTo by streaming whatever remains of the
// limited reader into w.
func (lrc *limitedReadCloser) WriteTo(w io.Writer) (int64, error) {
	written, copyErr := io.Copy(w, lrc.Reader)
	return written, copyErr
}

// SkipReadCloser wraps a io.ReadCloser and ignores the first `skip` bytes.
func SkipReadCloser(readCloser io.ReadCloser, skip int64) io.ReadCloser {
func SkipReadCloser(readCloser io.ReadCloser, skip int64) (result io.ReadCloser, err error) {
if seeker, ok := readCloser.(io.Seeker); ok {
_, err = seeker.Seek(skip, 1)
if err == nil {
result = readCloser
}
return
}

return &skippingReadCloser{
ReadCloser: readCloser,
skip: skip,
}
}, nil
}

func (r *skippingReadCloser) Read(p []byte) (int, error) {
Expand Down
10 changes: 8 additions & 2 deletions utils/readers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,10 @@ func TestLimitedReadCloser(t *testing.T) {
func TestSkipReaderClose(t *testing.T) {
t.Parallel()
hw := ioutil.NopCloser(bytes.NewBufferString("Hello, World!"))
src := SkipReadCloser(hw, 5)
src, err := SkipReadCloser(hw, 5)
if err != nil {
t.Fatal("unexpected error", err)
}
defer func() {
testutils.ShouldntFail(t, src.Close())
}()
Expand All @@ -96,7 +99,10 @@ func TestSkipReaderCloseWithPipe(t *testing.T) {
var input = []byte{'a', 'b', 'c', 'd'}
var output = []byte{'b', 'c', 'd'}
r, w := io.Pipe()
src := SkipReadCloser(r, 1)
src, err := SkipReadCloser(r, 1)
if err != nil {
t.Fatal("unexpected error", err)
}
go func() {
if _, err := w.Write(input); err != nil {
t.Fatalf("Unexpected Write error: %s", err)
Expand Down
10 changes: 10 additions & 0 deletions utils/writers.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,16 @@ func NopCloser(w io.Writer) io.WriteCloser {
return nopCloser{w}
}

// AddCloser turns an io.Writer into an io.WriteCloser.
// A writer that already implements io.Closer is returned unchanged;
// everything else is wrapped with a no-op Closer via NopCloser.
func AddCloser(w io.Writer) io.WriteCloser {
	switch writeCloser := w.(type) {
	case io.WriteCloser:
		return writeCloser
	default:
		return NopCloser(w)
	}
}

type multiWriteCloser struct {
writers []io.WriteCloser
}
Expand Down

0 comments on commit 8c3a698

Please sign in to comment.