Skip to content

Commit

Permalink
sync: add an infinite buffer for checks and uploads #379
Browse files Browse the repository at this point in the history
Add statistics for the check/upload/rename queues.

This means that checking can complete before the uploads which will
give rclone the ability to show exactly what is outstanding.
  • Loading branch information
ncw committed Jul 19, 2018
1 parent 473e3c3 commit 5957c39
Show file tree
Hide file tree
Showing 5 changed files with 197 additions and 59 deletions.
52 changes: 41 additions & 11 deletions fs/accounting/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,23 @@ func init() {

// StatsInfo accounts all transfers
type StatsInfo struct {
mu sync.RWMutex
bytes int64
errors int64
lastError error
checks int64
checking *stringSet
transfers int64
transferring *stringSet
deletes int64
start time.Time
inProgress *inProgress
mu sync.RWMutex
bytes int64
errors int64
lastError error
checks int64
checking *stringSet
checkQueue int
checkQueueSize int64
transfers int64
transferring *stringSet
transferQueue int
transferQueueSize int64
renameQueue int
renameQueueSize int64
deletes int64
start time.Time
inProgress *inProgress
}

// NewStats cretates an initialised StatsInfo
Expand Down Expand Up @@ -205,3 +211,27 @@ func (s *StatsInfo) DoneTransferring(remote string, ok bool) {
s.mu.Unlock()
}
}

// SetCheckQueue sets the number of queued checks
func (s *StatsInfo) SetCheckQueue(n int, size int64) {
s.mu.Lock()
s.checkQueue = n
s.checkQueueSize = size
s.mu.Unlock()
}

// SetTransferQueue sets the number of queued transfers
func (s *StatsInfo) SetTransferQueue(n int, size int64) {
s.mu.Lock()
s.transferQueue = n
s.transferQueueSize = size
s.mu.Unlock()
}

// SetRenameQueue sets the number of queued transfers
func (s *StatsInfo) SetRenameQueue(n int, size int64) {
s.mu.Lock()
s.renameQueue = n
s.renameQueueSize = size
s.mu.Unlock()
}
3 changes: 0 additions & 3 deletions fs/fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -827,9 +827,6 @@ type ObjectPair struct {
Src, Dst Object
}

// ObjectPairChan is a channel of ObjectPair
type ObjectPairChan chan ObjectPair

// Find looks for an Info object for the name passed in
//
// Services are looked up in the config file
Expand Down
79 changes: 79 additions & 0 deletions fs/sync/pipe.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package sync

import (
"sync"

"github.com/ncw/rclone/fs"
)

// pipe provides an unbounded channel like experience
type pipe struct {
mu sync.Mutex
C chan struct{}
queue []fs.ObjectPair
closed bool
totalSize int64
stats func(items int, totalSize int64)
}

func newPipe(stats func(items int, totalSize int64)) *pipe {
return &pipe{
C: make(chan struct{}, 1<<31-1),
stats: stats,
}
}

// Put an pair into the pipe
func (p *pipe) Put(pair fs.ObjectPair) {
p.mu.Lock()
defer p.mu.Unlock()
if p.closed {
panic("sync: pipe closed")
}
p.queue = append(p.queue, pair)
p.C <- struct{}{}
size := pair.Src.Size()
if size > 0 {
p.totalSize += size
}
p.stats(len(p.queue), p.totalSize)
}

// Get a pair from the pipe
//
// Note that you **must* read from <-pipe.C before calling this.
func (p *pipe) Get() (pair fs.ObjectPair) {
p.mu.Lock()
defer p.mu.Unlock()
if len(p.queue) == 0 {
panic("sync: pipe empty")
}
pair, p.queue = p.queue[0], p.queue[1:]
size := pair.Src.Size()
if size > 0 {
p.totalSize -= size
}
if p.totalSize < 0 {
p.totalSize = 0
}
p.stats(len(p.queue), p.totalSize)
return pair
}

// Stats reads the number of items in the queue and the totalSize
func (p *pipe) Stats() (items int, totalSize int64) {
p.mu.Lock()
items, totalSize = len(p.queue), p.totalSize
p.mu.Unlock()
return items, totalSize
}

// Close the pipe
//
// Writes to a closed pipe will panic as will double closing a pipe
func (p *pipe) Close() {
p.mu.Lock()
close(p.C)
p.closed = true
p.mu.Unlock()
}
53 changes: 53 additions & 0 deletions fs/sync/pipe_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package sync

import (
"testing"

"github.com/ncw/rclone/fs"
"github.com/ncw/rclone/fstest/mockobject"
"github.com/stretchr/testify/assert"
)

func TestPipe(t *testing.T) {
var queueLength int
var queueSize int64
stats := func(n int, size int64) {
queueLength, queueSize = n, size
}

// Make a new pipe
p := newPipe(stats)

checkStats := func(expectedN int, expectedSize int64) {
n, size := p.Stats()
assert.Equal(t, expectedN, n)
assert.Equal(t, expectedSize, size)
assert.Equal(t, expectedN, queueLength)
assert.Equal(t, expectedSize, queueSize)
}

checkStats(0, 0)

// Show that reading from an empty pipe panics
assert.Panics(t, func() { p.Get() })

obj1 := mockobject.New("potato").WithContent([]byte("hello"), mockobject.SeekModeNone)

pair1 := fs.ObjectPair{Src: obj1, Dst: nil}

// Put an object
p.Put(pair1)
checkStats(1, 5)

// Close the pipe showing reading on closed pipe is OK
p.Close()

// Read from pipe
<-p.C
pair2 := p.Get()
assert.Equal(t, pair1, pair2)
checkStats(0, 0)

// Check panic on write to closed pipe
assert.Panics(t, func() { p.Put(pair1) })
}

0 comments on commit 5957c39

Please sign in to comment.