lib/model: Retain index info for new folders/devs (ref #7100) #7133

Merged · 1 commit · Nov 20, 2020
29 changes: 10 additions & 19 deletions lib/model/indexsender.go
@@ -242,12 +242,6 @@ func (r *indexSenderRegistry) add(folder config.FolderConfiguration, fset *db.Fi
r.mut.Unlock()
}

func (r *indexSenderRegistry) addNew(folder config.FolderConfiguration, fset *db.FileSet) {
r.mut.Lock()
r.startLocked(folder.ID, fset, 0)
r.mut.Unlock()
}

func (r *indexSenderRegistry) addLocked(folder config.FolderConfiguration, fset *db.FileSet, startInfo *indexSenderStartInfo) {
myIndexID := fset.IndexID(protocol.LocalDeviceID)
mySequence := fset.Sequence(protocol.LocalDeviceID)
@@ -282,7 +276,7 @@ func (r *indexSenderRegistry) addLocked(folder config.FolderConfiguration, fset
l.Infof("Device %v folder %s has mismatching index ID for us (%v != %v)", r.deviceID, folder.Description(), startInfo.local.IndexID, myIndexID)
startSequence = 0
} else {
l.Debugf("Device %v folder %s is not delta index compatible", r.deviceID, folder.Description())
l.Debugf("Device %v folder %s has no index ID for us", r.deviceID, folder.Description())
}

// This is the other side's description of themselves. We
@@ -296,6 +290,7 @@ func (r *indexSenderRegistry) addLocked(folder config.FolderConfiguration, fset
// do not support delta indexes and we should clear any
// information we have from them before accepting their
// index, which will presumably be a full index.
l.Debugf("Device %v folder %s does not announce an index ID", r.deviceID, folder.Description())
fset.Drop(r.deviceID)
} else if startInfo.remote.IndexID != theirIndexID {
// The index ID we have on file is not what they're
@@ -308,36 +303,32 @@ func (r *indexSenderRegistry) addLocked(folder config.FolderConfiguration, fset
fset.SetIndexID(r.deviceID, startInfo.remote.IndexID)
}

r.startLocked(folder.ID, fset, startSequence)
}

func (r *indexSenderRegistry) startLocked(folderID string, fset *db.FileSet, startSequence int64) {
if is, ok := r.indexSenders[folderID]; ok {
if is, ok := r.indexSenders[folder.ID]; ok {
r.sup.RemoveAndWait(is.token, 0)
delete(r.indexSenders, folderID)
delete(r.indexSenders, folder.ID)
}
if _, ok := r.startInfos[folderID]; ok {
delete(r.startInfos, folderID)
if _, ok := r.startInfos[folder.ID]; ok {
delete(r.startInfos, folder.ID)
}

is := &indexSender{
conn: r.conn,
connClosed: r.closed,
folder: folderID,
folder: folder.ID,
fset: fset,
prevSequence: startSequence,
evLogger: r.evLogger,
pauseChan: make(chan struct{}),
resumeChan: make(chan *db.FileSet),
}
is.token = r.sup.Add(is)
r.indexSenders[folderID] = is
r.indexSenders[folder.ID] = is
}

// addPaused stores the given info to start an index sender once resume is called
// addPending stores the given info to start an index sender once resume is called
// for this folder.
// If an index sender is already running, it will be stopped.
func (r *indexSenderRegistry) addPaused(folder config.FolderConfiguration, startInfo *indexSenderStartInfo) {
func (r *indexSenderRegistry) addPending(folder config.FolderConfiguration, startInfo *indexSenderStartInfo) {
r.mut.Lock()
defer r.mut.Unlock()

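The net effect in this file: startLocked is inlined into addLocked, the special-cased addNew path is removed, and addPaused becomes addPending, so index-sender start info received in a ClusterConfig is parked until the folder is ready to start. A condensed sketch of that pattern, using simplified stand-in types rather than the real indexSenderRegistry:

package sketch

// startInfo stands in for indexSenderStartInfo: the index IDs and
// sequences a remote announced in its ClusterConfig.
type startInfo struct {
	localIndexID, remoteIndexID uint64
}

type indexSender struct{ folderID string }

// registry is a simplified indexSenderRegistry: per folder it holds
// either a running sender or parked start info.
type registry struct {
	senders    map[string]*indexSender
	startInfos map[string]*startInfo
}

// addPending stops any running sender for the folder and retains the
// start info until resume is called.
func (r *registry) addPending(folderID string, info *startInfo) {
	delete(r.senders, folderID)
	r.startInfos[folderID] = info
}

// resume starts a sender from the parked info; with nothing parked it
// does nothing, mirroring the "didn't get a cluster config yet" noop
// mentioned in the removed newFolder branch.
func (r *registry) resume(folderID string) {
	info, ok := r.startInfos[folderID]
	if !ok {
		return
	}
	delete(r.startInfos, folderID)
	_ = info // the real code derives the sender's start sequence from this
	r.senders[folderID] = &indexSender{folderID: folderID}
}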
61 changes: 28 additions & 33 deletions lib/model/model.go
@@ -254,13 +254,16 @@ func NewModel(cfg config.Wrapper, id protocol.DeviceID, clientName, clientVersio
func (m *model) serve(ctx context.Context) error {
// Add and start folders
cacheIgnoredFiles := m.cfg.Options().CacheIgnoredFiles
clusterConfigDevices := make(deviceIDSet, len(m.cfg.Devices()))
for _, folderCfg := range m.cfg.Folders() {
if folderCfg.Paused {
folderCfg.CreateRoot()
continue
}
m.newFolder(folderCfg, cacheIgnoredFiles)
clusterConfigDevices.add(folderCfg.DeviceIDs())
}
m.resendClusterConfig(clusterConfigDevices.AsSlice())
m.cfg.Subscribe(m)

close(m.started)
@@ -519,13 +522,9 @@ func (m *model) restartFolder(from, to config.FolderConfiguration, cacheIgnoredF
// In case the folder was newly shared with us we already got a
// cluster config and won't necessarily get another soon - start
// sending indexes if connected.
isNew := !from.SharedWith(indexSenders.deviceID)
if isNew {
indexSenders.addNew(to, fset)
}
if to.Paused {
indexSenders.pause(to.ID)
} else if !isNew && (fsetNil || from.Paused) {
} else if !from.SharedWith(indexSenders.deviceID) || fsetNil || from.Paused {
indexSenders.resume(to, fset)
}
}
@@ -554,20 +553,10 @@ func (m *model) newFolder(cfg config.FolderConfiguration, cacheIgnoredFiles bool
// Cluster configs might be received and processed before reaching this
// point, i.e. before the folder is started. If that's the case, start
// index senders here.
localSequenceZero := fset.Sequence(protocol.LocalDeviceID) == 0
m.pmut.RLock()
for _, id := range cfg.DeviceIDs() {
if is, ok := m.indexSenders[id]; ok {
if localSequenceZero && fset.Sequence(id) == 0 {
// In case this folder was shared to us and
// newly added, add a new index sender.
is.addNew(cfg, fset)
} else {
// For existing folders we stored the index data from
// the cluster config, so resume based on that - if
// we didn't get a cluster config yet, it's a noop.
is.resume(cfg, fset)
}
is.resume(cfg, fset)
}
}
m.pmut.RUnlock()
@@ -1175,6 +1164,7 @@ func (m *model) ccHandleFolders(folders []protocol.Folder, deviceCfg config.Devi
continue
}
m.cfg.AddOrUpdatePendingFolder(folder.ID, folder.Label, deviceID)
indexSenders.addPending(cfg, ccDeviceInfos[folder.ID])
changed = true
m.evLogger.Log(events.FolderRejected, map[string]string{
"folder": folder.ID,
@@ -1192,7 +1182,7 @@ func (m *model) ccHandleFolders(folders []protocol.Folder, deviceCfg config.Devi
}

if cfg.Paused {
indexSenders.addPaused(cfg, ccDeviceInfos[folder.ID])
indexSenders.addPending(cfg, ccDeviceInfos[folder.ID])
continue
}

@@ -1234,7 +1224,7 @@ func (m *model) ccHandleFolders(folders []protocol.Folder, deviceCfg config.Devi
// Shouldn't happen because !cfg.Paused, but might happen
// if the folder is about to be unpaused, but not yet.
l.Debugln("ccH: no fset", folder.ID)
indexSenders.addPaused(cfg, ccDeviceInfos[folder.ID])
indexSenders.addPending(cfg, ccDeviceInfos[folder.ID])
continue
}

@@ -2480,7 +2470,7 @@ func (m *model) CommitConfiguration(from, to config.Configuration) bool {
// Go through the folder configs and figure out if we need to restart or not.

// Tracks devices affected by any configuration change to resend ClusterConfig.
clusterConfigDevices := make(map[protocol.DeviceID]struct{}, len(from.Devices)+len(to.Devices))
clusterConfigDevices := make(deviceIDSet, len(from.Devices)+len(to.Devices))

fromFolders := mapFolders(from.Folders)
toFolders := mapFolders(to.Folders)
@@ -2493,7 +2483,7 @@ func (m *model) CommitConfiguration(from, to config.Configuration) bool {
l.Infoln("Adding folder", cfg.Description())
m.newFolder(cfg, to.Options.CacheIgnoredFiles)
}
clusterConfigDevices = addDeviceIDsToMap(clusterConfigDevices, cfg.DeviceIDs())
clusterConfigDevices.add(cfg.DeviceIDs())
}
}

@@ -2502,7 +2492,7 @@ func (m *model) CommitConfiguration(from, to config.Configuration) bool {
if !ok {
// The folder was removed.
m.removeFolder(fromCfg)
clusterConfigDevices = addDeviceIDsToMap(clusterConfigDevices, fromCfg.DeviceIDs())
clusterConfigDevices.add(fromCfg.DeviceIDs())
continue
}

@@ -2514,8 +2504,8 @@ func (m *model) CommitConfiguration(from, to config.Configuration) bool {
// Check if anything differs that requires a restart.
if !reflect.DeepEqual(fromCfg.RequiresRestartOnly(), toCfg.RequiresRestartOnly()) || from.Options.CacheIgnoredFiles != to.Options.CacheIgnoredFiles {
m.restartFolder(fromCfg, toCfg, to.Options.CacheIgnoredFiles)
clusterConfigDevices = addDeviceIDsToMap(clusterConfigDevices, fromCfg.DeviceIDs())
clusterConfigDevices = addDeviceIDsToMap(clusterConfigDevices, toCfg.DeviceIDs())
clusterConfigDevices.add(fromCfg.DeviceIDs())
clusterConfigDevices.add(toCfg.DeviceIDs())
}

// Emit the folder pause/resume event
@@ -2589,11 +2579,7 @@ func (m *model) CommitConfiguration(from, to config.Configuration) bool {
}
m.pmut.RUnlock()
// Generating cluster-configs acquires fmut -> must happen outside of pmut.
ids := make([]protocol.DeviceID, 0, len(clusterConfigDevices))
for id := range clusterConfigDevices {
ids = append(ids, id)
}
m.resendClusterConfig(ids)
m.resendClusterConfig(clusterConfigDevices.AsSlice())

m.globalRequestLimiter.setCapacity(1024 * to.Options.MaxConcurrentIncomingRequestKiB())
m.folderIOLimiter.setCapacity(to.Options.MaxFolderConcurrency())
@@ -2799,13 +2785,22 @@ func sanitizePath(path string) string {
return strings.TrimSpace(b.String())
}

func addDeviceIDsToMap(m map[protocol.DeviceID]struct{}, s []protocol.DeviceID) map[protocol.DeviceID]struct{} {
for _, id := range s {
if _, ok := m[id]; !ok {
m[id] = struct{}{}
type deviceIDSet map[protocol.DeviceID]struct{}

func (s deviceIDSet) add(ids []protocol.DeviceID) {
for _, id := range ids {
if _, ok := s[id]; !ok {
s[id] = struct{}{}
}
}
return m
}

func (s deviceIDSet) AsSlice() []protocol.DeviceID {
ids := make([]protocol.DeviceID, 0, len(s))
for id := range s {
ids = append(ids, id)
}
return ids
}

func encryptionTokenPath(cfg config.FolderConfiguration) string {
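In model.go the free function addDeviceIDsToMap gives way to the small deviceIDSet type shown above. A standalone sketch of the same idea, runnable on its own; DeviceID is a string stand-in for the real protocol.DeviceID:

package main

import "fmt"

// DeviceID stands in for protocol.DeviceID to keep the sketch self-contained.
type DeviceID string

// deviceIDSet mirrors the helper added in this PR: a set used to
// deduplicate which devices get a ClusterConfig resend.
type deviceIDSet map[DeviceID]struct{}

func (s deviceIDSet) add(ids []DeviceID) {
	for _, id := range ids {
		s[id] = struct{}{}
	}
}

func (s deviceIDSet) AsSlice() []DeviceID {
	ids := make([]DeviceID, 0, len(s))
	for id := range s {
		ids = append(ids, id)
	}
	return ids
}

func main() {
	// Two folders share device2; the set counts it once, so only one
	// ClusterConfig resend would go to that device.
	folderA := []DeviceID{"device1", "device2"}
	folderB := []DeviceID{"device2", "device3"}

	devices := make(deviceIDSet, 4)
	devices.add(folderA)
	devices.add(folderB)

	fmt.Println(len(devices.AsSlice())) // 3
}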
28 changes: 21 additions & 7 deletions lib/model/requests_test.go
@@ -1275,9 +1275,6 @@ func TestRequestIndexSenderPause(t *testing.T) {
}

func TestRequestIndexSenderClusterConfigBeforeStart(t *testing.T) {
done := make(chan struct{})
defer close(done)

ldb := db.NewLowlevel(backend.OpenMemory())
w, fcfg := tmpDefaultWrapper()
tfs := fcfg.Filesystem()
@@ -1294,28 +1291,45 @@ func TestRequestIndexSenderClusterConfigBeforeStart(t *testing.T) {
m.evCancel()
<-m.stopped

// Add connection (sends cluster config) before starting the new model
// Add connection (sends incoming cluster config) before starting the new model
m = newModel(w, myID, "syncthing", "dev", ldb, nil)
defer cleanupModel(m)
fc := addFakeConn(m, device1)
indexChan := make(chan []protocol.FileInfo)
done := make(chan struct{})
defer close(done) // Must be the last thing to be deferred, thus first to run.
indexChan := make(chan []protocol.FileInfo, 1)
ccChan := make(chan protocol.ClusterConfig, 1)
fc.mut.Lock()
fc.indexFn = func(_ context.Context, folder string, fs []protocol.FileInfo) {
select {
case indexChan <- fs:
case <-done:
}
}
fc.clusterConfigFn = func(cc protocol.ClusterConfig) {
select {
case ccChan <- cc:
case <-done:
}
}
fc.mut.Unlock()

m.ServeBackground()
<-m.started

timeout := time.After(5 * time.Second)

// Check that cluster-config is resent after adding folders when starting model
select {
case <-timeout:
t.Fatal("timed out before receiving cluster-config")
case <-ccChan:
}

// Check that an index is sent for the newly added item
must(t, tfs.Mkdir(dir2, 0777))
m.ScanFolders()
select {
case <-time.After(5 * time.Second):
case <-timeout:
t.Fatal("timed out before receiving index")
case <-indexChan:
}
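The reworked test wires its fake-connection callbacks through buffered channels guarded by a done channel that is closed last, so a callback firing after the test has given up can never block a goroutine. A minimal standalone illustration of that channel pattern (not Syncthing code):

package main

import (
	"fmt"
	"time"
)

func main() {
	done := make(chan struct{})
	results := make(chan string, 1) // buffered: a single send never blocks

	// callback plays the role of fc.clusterConfigFn / fc.indexFn above.
	callback := func(msg string) {
		select {
		case results <- msg: // deliver to the waiting test
		case <-done: // or bail out once the test is over
		}
	}

	go callback("cluster-config")

	select {
	case msg := <-results:
		fmt.Println("received:", msg)
	case <-time.After(5 * time.Second):
		fmt.Println("timed out before receiving cluster-config")
	}

	close(done) // in the test this close is deferred first, so it runs last
}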