Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

lib/model: Properly schedule pull on reconnect (fixes #4504) #4508

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
15 changes: 13 additions & 2 deletions lib/model/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -1877,8 +1877,6 @@ func (m *Model) internalScanFolderSubdirs(ctx context.Context, folder string, su
return err
}

defer runner.SchedulePull()

// Clean the list of subitems to ensure that we start at a known
// directory, and don't scan subdirectories of things we've already
// scanned.
Expand Down Expand Up @@ -1918,6 +1916,15 @@ func (m *Model) internalScanFolderSubdirs(ctx context.Context, folder string, su

batch := make([]protocol.FileInfo, 0, maxBatchSizeFiles)
batchSizeBytes := 0
changes := 0

// Schedule a pull after scanning, but only if we actually detected any
// changes.
defer func() {
if changes > 0 {
runner.SchedulePull()
}
}()

for f := range fchan {
if len(batch) == maxBatchSizeFiles || batchSizeBytes > maxBatchSizeBytes {
Expand All @@ -1929,8 +1936,10 @@ func (m *Model) internalScanFolderSubdirs(ctx context.Context, folder string, su
batch = batch[:0]
batchSizeBytes = 0
}

batch = append(batch, f)
batchSizeBytes += f.ProtoSize()
changes++
}

if err := runner.CheckHealth(); err != nil {
Expand Down Expand Up @@ -1972,6 +1981,7 @@ func (m *Model) internalScanFolderSubdirs(ctx context.Context, folder string, su
nf := f.ConvertToInvalidFileInfo(m.id.Short())
batch = append(batch, nf)
batchSizeBytes += nf.ProtoSize()
changes++

case !f.IsInvalid() && !f.IsDeleted():
// The file is valid and not deleted. Lets check if it's
Expand All @@ -1998,6 +2008,7 @@ func (m *Model) internalScanFolderSubdirs(ctx context.Context, folder string, su

batch = append(batch, nf)
batchSizeBytes += nf.ProtoSize()
changes++
}
}
return true
Expand Down
53 changes: 17 additions & 36 deletions lib/model/rwfolder.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,6 @@ func (f *sendReceiveFolder) Serve() {
f.setState(FolderIdle)
}()

var prevSeq int64
var prevIgnoreHash string
var success bool
pullFailTimer := time.NewTimer(time.Duration(0))
Expand All @@ -157,6 +156,8 @@ func (f *sendReceiveFolder) Serve() {
f.startWatch()
}

initialCompleted := f.initialScanFinished

for {
select {
case <-f.ctx.Done():
Expand All @@ -169,13 +170,13 @@ func (f *sendReceiveFolder) Serve() {
default:
}

if prevSeq, prevIgnoreHash, success = f.pull(prevSeq, prevIgnoreHash); !success {
if prevIgnoreHash, success = f.pull(prevIgnoreHash); !success {
// Pulling failed, try again later.
pullFailTimer.Reset(f.pause)
}

case <-pullFailTimer.C:
if prevSeq, prevIgnoreHash, success = f.pull(prevSeq, prevIgnoreHash); !success {
if prevIgnoreHash, success = f.pull(prevIgnoreHash); !success {
// Pulling failed, try again later.
pullFailTimer.Reset(f.pause)
// Back off from retrying to pull with an upper limit.
Expand All @@ -184,6 +185,14 @@ func (f *sendReceiveFolder) Serve() {
}
}

case <-initialCompleted:
// Initial scan has completed, we should do a pull
initialCompleted = nil // never hit this case again
if prevIgnoreHash, success = f.pull(prevIgnoreHash); !success {
// Pulling failed, try again later.
pullFailTimer.Reset(f.pause)
}

// The reason for running the scanner from within the puller is that
// this is the easiest way to make sure we are not doing both at the
// same time.
Expand Down Expand Up @@ -222,41 +231,27 @@ func (f *sendReceiveFolder) String() string {
return fmt.Sprintf("sendReceiveFolder/%s@%p", f.folderID, f)
}

func (f *sendReceiveFolder) pull(prevSeq int64, prevIgnoreHash string) (curSeq int64, curIgnoreHash string, success bool) {
func (f *sendReceiveFolder) pull(prevIgnoreHash string) (curIgnoreHash string, success bool) {
select {
case <-f.initialScanFinished:
default:
// Once the initial scan finished, a pull will be scheduled
return prevSeq, prevIgnoreHash, true
return prevIgnoreHash, true
}

f.model.fmut.RLock()
curIgnores := f.model.folderIgnores[f.folderID]
f.model.fmut.RUnlock()

curSeq = prevSeq
curIgnoreHash = curIgnores.Hash()
ignoresChanged := curIgnoreHash != prevIgnoreHash
if ignoresChanged {
// The ignore patterns have changed. We need to re-evaluate if
// there are files we need now that were ignored before.
l.Debugln(f, "ignore patterns have changed, resetting curSeq")
curSeq = 0
}

// RemoteSequence() is a fast call, doesn't touch the database.
remoteSeq, ok := f.model.RemoteSequence(f.folderID)
if !ok || remoteSeq == curSeq {
l.Debugln(f, "skip (remoteSeq == curSeq)", curSeq, ok)
return curSeq, curIgnoreHash, true
}

if err := f.CheckHealth(); err != nil {
l.Debugln("Skipping pull of", f.Description(), "due to folder error:", err)
return curSeq, curIgnoreHash, true
return curIgnoreHash, true
}

l.Debugln(f, "pulling", curSeq, remoteSeq)
l.Debugln(f, "pulling")

f.setState(FolderSyncing)
f.clearErrors()
Expand All @@ -273,20 +268,6 @@ func (f *sendReceiveFolder) pull(prevSeq int64, prevIgnoreHash string) (curSeq i
// No files were changed by the puller, so we are in
// sync. Update the local version number.

if lv, ok := f.model.RemoteSequence(f.folderID); ok && lv < remoteSeq {
// There's a corner case where the device we needed
// files from disconnected during the puller
// iteration. The files will have been removed from
// the index, so we've concluded that we don't need
// them, but at the same time we have the old remote sequence
// that includes those files in remoteSeq. So we
// catch the case that this sequence might have
// decreased here.
l.Debugf("%v adjusting remoteSeq from %d to %d", remoteSeq, lv)
remoteSeq = lv
}
curSeq = remoteSeq

f.pause = f.basePause()

break
Expand All @@ -313,7 +294,7 @@ func (f *sendReceiveFolder) pull(prevSeq int64, prevIgnoreHash string) (curSeq i

f.setState(FolderIdle)

return curSeq, curIgnoreHash, changed == 0
return curIgnoreHash, changed == 0
}

// pullerIteration runs a single puller iteration for the given folder and
Expand Down
64 changes: 19 additions & 45 deletions test/reconnect_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,35 +10,27 @@ package integration

import (
"log"
"sync"
"testing"
"time"
)

func TestReconnectReceiverDuringTransfer(t *testing.T) {
testReconnectDuringTransfer(t, false, true, 0, 0)
testReconnectDuringTransfer(t, false, true)
}

func TestReconnectSenderDuringTransfer(t *testing.T) {
testReconnectDuringTransfer(t, true, false, 0, 0)
testReconnectDuringTransfer(t, true, false)
}

func TestReconnectSenderAndReceiverDuringTransfer(t *testing.T) {
// Give the receiver some time to rot with needed files but
// without any peer. This triggers
// https://github.com/syncthing/syncthing/issues/463
testReconnectDuringTransfer(t, true, true, 10*time.Second, 0)
}

func testReconnectDuringTransfer(t *testing.T, ReconnectSender, ReconnectReceiver bool, senderDelay, receiverDelay time.Duration) {
func testReconnectDuringTransfer(t *testing.T, restartSender, restartReceiver bool) {
log.Println("Cleaning...")
err := removeAll("s1", "s2", "h1/index*", "h2/index*")
if err != nil {
t.Fatal(err)
}

log.Println("Generating files...")
err = generateFiles("s1", 2500, 20, "../LICENSE")
err = generateFiles("s1", 250, 20, "../LICENSE")
if err != nil {
t.Fatal(err)
}
Expand All @@ -63,8 +55,9 @@ func testReconnectDuringTransfer(t *testing.T, ReconnectSender, ReconnectReceive
if err != nil {
t.Fatal(err)
}
cfg.Options.MaxRecvKbps = 100
cfg.Options.MaxSendKbps = 100
cfg.Options.MaxRecvKbps = 750
cfg.Options.MaxSendKbps = 750
cfg.Options.LimitBandwidthInLan = true
if err := receiver.PostConfig(cfg); err != nil {
t.Fatal(err)
}
Expand All @@ -86,42 +79,22 @@ func testReconnectDuringTransfer(t *testing.T, ReconnectSender, ReconnectReceive
// Receiver has made progress
prevBytes = recv.InSyncBytes

if ReconnectReceiver {
log.Printf("Pausing receiver...")
receiver.PauseAll()
}

if ReconnectSender {
log.Printf("Pausing sender...")
sender.PauseAll()
if restartReceiver {
log.Printf("Stopping receiver...")
receiver.Stop()
receiver = startInstance(t, 2)
receiver.ResumeAll()
}

var wg sync.WaitGroup

if ReconnectReceiver {
wg.Add(1)
go func() {
time.Sleep(receiverDelay)
log.Printf("Resuming receiver...")
receiver.ResumeAll()
wg.Done()
}()
}

if ReconnectSender {
wg.Add(1)
go func() {
time.Sleep(senderDelay)
log.Printf("Resuming sender...")
sender.ResumeAll()
wg.Done()
}()
if restartSender {
log.Printf("Stopping sender...")
sender.Stop()
sender = startInstance(t, 1)
sender.ResumeAll()
}

wg.Wait()
}

time.Sleep(time.Second)
time.Sleep(250 * time.Millisecond)
}

// Reset rate limits
Expand All @@ -131,6 +104,7 @@ func testReconnectDuringTransfer(t *testing.T, ReconnectSender, ReconnectReceive
}
cfg.Options.MaxRecvKbps = 0
cfg.Options.MaxSendKbps = 0
cfg.Options.LimitBandwidthInLan = false
if err := receiver.PostConfig(cfg); err != nil {
t.Fatal(err)
}
Expand Down
24 changes: 8 additions & 16 deletions test/reset_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@ package integration

import (
"bytes"
"fmt"
"io"
"io/ioutil"
"log"
"os"
"path/filepath"
Expand Down Expand Up @@ -128,25 +130,15 @@ func TestReset(t *testing.T) {
}

func createFiles(t *testing.T) int {
// Create eight empty files and directories
files := []string{"f1", "f2", "f3", "f4", "f11", "f12", "f13", "f14"}
dirs := []string{"d1", "d2", "d3", "d4", "d11", "d12", "d13", "d14"}
all := append(files, dirs...)

for _, file := range files {
fd, err := os.Create(filepath.Join("s1", file))
if err != nil {
t.Fatal(err)
}
fd.Close()
}
// Create a few files

for _, dir := range dirs {
err := os.Mkdir(filepath.Join("s1", dir), 0755)
if err != nil {
const n = 8
for i := 0; i < n; i++ {
file := fmt.Sprintf("f%d", i)
if err := ioutil.WriteFile(filepath.Join("s1", file), []byte("data"), 0644); err != nil {
t.Fatal(err)
}
}

return len(all)
return n
}
4 changes: 2 additions & 2 deletions test/sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ import (
)

const (
longTimeLimit = 5 * time.Minute
shortTimeLimit = 45 * time.Second
longTimeLimit = 1 * time.Minute
shortTimeLimit = 25 * time.Second
s12Folder = `¯\_(ツ)_/¯ Räksmörgås 动作 Адрес` // This was renamed to ensure arbitrary folder IDs are fine.
)

Expand Down