Skip to content

Commit

Permalink
lib/model: Properly schedule pull on reconnect (fixes #4504)
Browse files Browse the repository at this point in the history
We need to reset prevSeq so that we force a full check when someone
reconnects - the sequence number may not have changed due to the
reconnect. (This is a regression; we did this before f6ea2a7.)

Also add an optimization: we schedule a pull after scanning, but there
is no need to do so if no changes were detected. This matters now
because the scheduled pull actually traverses the database which is
expensive.

This, however, makes the pull not happen on initial scan if there were
no changes during the initial scan. Compensate by always scheduling a
pull after initial scan in the rwfolder itself.
  • Loading branch information
calmh committed Nov 13, 2017
1 parent 80031c5 commit 9251ec7
Show file tree
Hide file tree
Showing 5 changed files with 54 additions and 65 deletions.
15 changes: 13 additions & 2 deletions lib/model/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -1877,8 +1877,6 @@ func (m *Model) internalScanFolderSubdirs(ctx context.Context, folder string, su
return err
}

defer runner.SchedulePull()

// Clean the list of subitems to ensure that we start at a known
// directory, and don't scan subdirectories of things we've already
// scanned.
Expand Down Expand Up @@ -1918,6 +1916,15 @@ func (m *Model) internalScanFolderSubdirs(ctx context.Context, folder string, su

batch := make([]protocol.FileInfo, 0, maxBatchSizeFiles)
batchSizeBytes := 0
changes := 0

// Schedule a pull after scanning, but only if we actually detected any
// changes.
defer func() {
if changes > 0 {
runner.SchedulePull()
}
}()

for f := range fchan {
if len(batch) == maxBatchSizeFiles || batchSizeBytes > maxBatchSizeBytes {
Expand All @@ -1929,8 +1936,10 @@ func (m *Model) internalScanFolderSubdirs(ctx context.Context, folder string, su
batch = batch[:0]
batchSizeBytes = 0
}

batch = append(batch, f)
batchSizeBytes += f.ProtoSize()
changes++
}

if err := runner.CheckHealth(); err != nil {
Expand Down Expand Up @@ -1972,6 +1981,7 @@ func (m *Model) internalScanFolderSubdirs(ctx context.Context, folder string, su
nf := f.ConvertToInvalidFileInfo(m.id.Short())
batch = append(batch, nf)
batchSizeBytes += nf.ProtoSize()
changes++

case !f.IsInvalid() && !f.IsDeleted():
// The file is valid and not deleted. Lets check if it's
Expand All @@ -1998,6 +2008,7 @@ func (m *Model) internalScanFolderSubdirs(ctx context.Context, folder string, su

batch = append(batch, nf)
batchSizeBytes += nf.ProtoSize()
changes++
}
}
return true
Expand Down
12 changes: 12 additions & 0 deletions lib/model/rwfolder.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,8 @@ func (f *sendReceiveFolder) Serve() {
f.startWatch()
}

initialCompleted := f.initialScanFinished

for {
select {
case <-f.ctx.Done():
Expand All @@ -169,6 +171,7 @@ func (f *sendReceiveFolder) Serve() {
default:
}

prevSeq = 0
if prevSeq, prevIgnoreHash, success = f.pull(prevSeq, prevIgnoreHash); !success {
// Pulling failed, try again later.
pullFailTimer.Reset(f.pause)
Expand All @@ -184,6 +187,15 @@ func (f *sendReceiveFolder) Serve() {
}
}

case <-initialCompleted:
// Initial scan has completed, we should do a pull
prevSeq = 0
initialCompleted = nil // never hit this case again
if prevSeq, prevIgnoreHash, success = f.pull(prevSeq, prevIgnoreHash); !success {
// Pulling failed, try again later.
pullFailTimer.Reset(f.pause)
}

// The reason for running the scanner from within the puller is that
// this is the easiest way to make sure we are not doing both at the
// same time.
Expand Down
64 changes: 19 additions & 45 deletions test/reconnect_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,35 +10,27 @@ package integration

import (
"log"
"sync"
"testing"
"time"
)

func TestReconnectReceiverDuringTransfer(t *testing.T) {
testReconnectDuringTransfer(t, false, true, 0, 0)
testReconnectDuringTransfer(t, false, true)
}

func TestReconnectSenderDuringTransfer(t *testing.T) {
testReconnectDuringTransfer(t, true, false, 0, 0)
testReconnectDuringTransfer(t, true, false)
}

func TestReconnectSenderAndReceiverDuringTransfer(t *testing.T) {
// Give the receiver some time to rot with needed files but
// without any peer. This triggers
// https://github.com/syncthing/syncthing/issues/463
testReconnectDuringTransfer(t, true, true, 10*time.Second, 0)
}

func testReconnectDuringTransfer(t *testing.T, ReconnectSender, ReconnectReceiver bool, senderDelay, receiverDelay time.Duration) {
func testReconnectDuringTransfer(t *testing.T, restartSender, restartReceiver bool) {
log.Println("Cleaning...")
err := removeAll("s1", "s2", "h1/index*", "h2/index*")
if err != nil {
t.Fatal(err)
}

log.Println("Generating files...")
err = generateFiles("s1", 2500, 20, "../LICENSE")
err = generateFiles("s1", 250, 20, "../LICENSE")
if err != nil {
t.Fatal(err)
}
Expand All @@ -63,8 +55,9 @@ func testReconnectDuringTransfer(t *testing.T, ReconnectSender, ReconnectReceive
if err != nil {
t.Fatal(err)
}
cfg.Options.MaxRecvKbps = 100
cfg.Options.MaxSendKbps = 100
cfg.Options.MaxRecvKbps = 750
cfg.Options.MaxSendKbps = 750
cfg.Options.LimitBandwidthInLan = true
if err := receiver.PostConfig(cfg); err != nil {
t.Fatal(err)
}
Expand All @@ -86,42 +79,22 @@ func testReconnectDuringTransfer(t *testing.T, ReconnectSender, ReconnectReceive
// Receiver has made progress
prevBytes = recv.InSyncBytes

if ReconnectReceiver {
log.Printf("Pausing receiver...")
receiver.PauseAll()
}

if ReconnectSender {
log.Printf("Pausing sender...")
sender.PauseAll()
if restartReceiver {
log.Printf("Stopping receiver...")
receiver.Stop()
receiver = startInstance(t, 2)
receiver.ResumeAll()
}

var wg sync.WaitGroup

if ReconnectReceiver {
wg.Add(1)
go func() {
time.Sleep(receiverDelay)
log.Printf("Resuming receiver...")
receiver.ResumeAll()
wg.Done()
}()
}

if ReconnectSender {
wg.Add(1)
go func() {
time.Sleep(senderDelay)
log.Printf("Resuming sender...")
sender.ResumeAll()
wg.Done()
}()
if restartSender {
log.Printf("Stopping sender...")
sender.Stop()
sender = startInstance(t, 1)
sender.ResumeAll()
}

wg.Wait()
}

time.Sleep(time.Second)
time.Sleep(250 * time.Millisecond)
}

// Reset rate limits
Expand All @@ -131,6 +104,7 @@ func testReconnectDuringTransfer(t *testing.T, ReconnectSender, ReconnectReceive
}
cfg.Options.MaxRecvKbps = 0
cfg.Options.MaxSendKbps = 0
cfg.Options.LimitBandwidthInLan = false
if err := receiver.PostConfig(cfg); err != nil {
t.Fatal(err)
}
Expand Down
24 changes: 8 additions & 16 deletions test/reset_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@ package integration

import (
"bytes"
"fmt"
"io"
"io/ioutil"
"log"
"os"
"path/filepath"
Expand Down Expand Up @@ -128,25 +130,15 @@ func TestReset(t *testing.T) {
}

func createFiles(t *testing.T) int {
// Create eight empty files and directories
files := []string{"f1", "f2", "f3", "f4", "f11", "f12", "f13", "f14"}
dirs := []string{"d1", "d2", "d3", "d4", "d11", "d12", "d13", "d14"}
all := append(files, dirs...)

for _, file := range files {
fd, err := os.Create(filepath.Join("s1", file))
if err != nil {
t.Fatal(err)
}
fd.Close()
}
// Create a few files

for _, dir := range dirs {
err := os.Mkdir(filepath.Join("s1", dir), 0755)
if err != nil {
const n = 8
for i := 0; i < n; i++ {
file := fmt.Sprintf("f%d", i)
if err := ioutil.WriteFile(filepath.Join("s1", file), []byte("data"), 0644); err != nil {
t.Fatal(err)
}
}

return len(all)
return n
}
4 changes: 2 additions & 2 deletions test/sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ import (
)

const (
longTimeLimit = 5 * time.Minute
shortTimeLimit = 45 * time.Second
longTimeLimit = 1 * time.Minute
shortTimeLimit = 25 * time.Second
s12Folder = `¯\_(ツ)_/¯ Räksmörgås 动作 Адрес` // This was renamed to ensure arbitrary folder IDs are fine.
)

Expand Down

0 comments on commit 9251ec7

Please sign in to comment.