Skip to content

Commit

Permalink
lib/model: Use a single lock (#9275)
Browse files Browse the repository at this point in the history
I'm tired of the fmut/pmut shenanigans. This consolidates both under one
lock; I'm not convinced there are any significant performance
differences with this approach since we're literally just protecting map
juggling...

- The locking goes away when we were already under an appropriate fmut
lock.
- Where we had fmut.RLock()+pmut.Lock() it gets upgraded to an
fmut.Lock().
- Otherwise s/pmut/fmut/.

In order to avoid diff noise for an important change I did not do the
following cleanups, which will be filed in a PR after this one, if
accepted:

- Renaming fmut to just mut
- Renaming methods that refer to being "PRLocked" and stuff like that
- Removing the no longer relevant deadlock detector
- Comments referring to pmut and locking sequences...
  • Loading branch information
calmh committed Dec 11, 2023
1 parent c53a1f2 commit 6f10236
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 71 deletions.
107 changes: 42 additions & 65 deletions lib/model/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,8 +157,7 @@ type model struct {
folderEncryptionPasswordTokens map[string][]byte // folder -> encryption token (may be missing, and only for encryption type folders)
folderEncryptionFailures map[string]map[protocol.DeviceID]error // folder -> device -> error regarding encryption consistency (may be missing)

// fields protected by pmut
pmut sync.RWMutex
// fields also protected by fmut
connections map[string]protocol.Connection // connection ID -> connection
deviceConnIDs map[protocol.DeviceID][]string // device -> connection IDs (invariant: if the key exists, the value is len >= 1, with the primary connection at the start of the slice)
promotedConnID map[protocol.DeviceID]string // device -> latest promoted connection ID
Expand Down Expand Up @@ -238,8 +237,7 @@ func NewModel(cfg config.Wrapper, id protocol.DeviceID, ldb *db.Lowlevel, protec
folderEncryptionPasswordTokens: make(map[string][]byte),
folderEncryptionFailures: make(map[string]map[protocol.DeviceID]error),

// fields protected by pmut
pmut: sync.NewRWMutex(),
// ditto
connections: make(map[string]protocol.Connection),
deviceConnIDs: make(map[protocol.DeviceID][]string),
promotedConnID: make(map[protocol.DeviceID]string),
Expand Down Expand Up @@ -312,13 +310,13 @@ func (m *model) initFolders(cfg config.Configuration) error {
}

func (m *model) closeAllConnectionsAndWait() {
m.pmut.RLock()
m.fmut.RLock()
closed := make([]chan struct{}, 0, len(m.connections))
for connID, conn := range m.connections {
closed = append(closed, m.closed[connID])
go conn.Close(errStopped)
}
m.pmut.RUnlock()
m.fmut.RUnlock()
for _, c := range closed {
<-c
}
Expand All @@ -338,7 +336,6 @@ func (m *model) StartDeadlockDetector(timeout time.Duration) {
l.Infof("Starting deadlock detector with %v timeout", timeout)
detector := newDeadlockDetector(timeout, m.evLogger, m.fatal)
detector.Watch("fmut", m.fmut)
detector.Watch("pmut", m.pmut)
}

// Need to hold lock on m.fmut when calling this.
Expand Down Expand Up @@ -472,7 +469,6 @@ func (m *model) removeFolder(cfg config.FolderConfiguration) {
// We need to hold both fmut and pmut and must acquire locks in the same
// order always. (The locks can be *released* in any order.)
m.fmut.Lock()
m.pmut.RLock()

isPathUnique := true
for folderID, folderCfg := range m.folderCfgs {
Expand All @@ -498,7 +494,6 @@ func (m *model) removeFolder(cfg config.FolderConfiguration) {
})

m.fmut.Unlock()
m.pmut.RUnlock()

// Remove it from the database
db.DropFolder(m.db, cfg.ID)
Expand Down Expand Up @@ -563,15 +558,11 @@ func (m *model) restartFolder(from, to config.FolderConfiguration, cacheIgnoredF
m.addAndStartFolderLocked(to, fset, cacheIgnoredFiles)
}

// Care needs to be taken because we already hold fmut and the lock order
// must be the same everywhere. As fmut is acquired first, this is fine.
m.pmut.RLock()
runner, _ := m.folderRunners.Get(to.ID)
m.indexHandlers.Each(func(_ protocol.DeviceID, r *indexHandlerRegistry) error {
r.RegisterFolderState(to, fset, runner)
return nil
})
m.pmut.RUnlock()

var infoMsg string
switch {
Expand Down Expand Up @@ -603,15 +594,11 @@ func (m *model) newFolder(cfg config.FolderConfiguration, cacheIgnoredFiles bool
// Cluster configs might be received and processed before reaching this
// point, i.e. before the folder is started. If that's the case, start
// index senders here.
// Care needs to be taken because we already hold fmut and the lock order
// must be the same everywhere. As fmut is acquired first, this is fine.
m.pmut.RLock()
m.indexHandlers.Each(func(_ protocol.DeviceID, r *indexHandlerRegistry) error {
runner, _ := m.folderRunners.Get(cfg.ID)
r.RegisterFolderState(cfg, fset, runner)
return nil
})
m.pmut.RUnlock()

return nil
}
Expand Down Expand Up @@ -645,11 +632,11 @@ func (m *model) UsageReportingStats(report *contract.Report, version int, previe
blockStatsMut.Unlock()

// Transport stats
m.pmut.RLock()
m.fmut.RLock()
for _, conn := range m.connections {
report.TransportStats[conn.Transport()]++
}
m.pmut.RUnlock()
m.fmut.RUnlock()

// Ignore stats
var seenPrefix [3]bool
Expand Down Expand Up @@ -736,8 +723,8 @@ type ConnectionInfo struct {

// ConnectionStats returns a map with connection statistics for each device.
func (m *model) ConnectionStats() map[string]interface{} {
m.pmut.RLock()
defer m.pmut.RUnlock()
m.fmut.RLock()
defer m.fmut.RUnlock()

res := make(map[string]interface{})
devs := m.cfg.Devices()
Expand Down Expand Up @@ -812,8 +799,6 @@ func (m *model) ConnectionStats() map[string]interface{} {
func (m *model) DeviceStatistics() (map[protocol.DeviceID]stats.DeviceStatistics, error) {
m.fmut.RLock()
defer m.fmut.RUnlock()
m.pmut.RLock()
defer m.pmut.RUnlock()
res := make(map[protocol.DeviceID]stats.DeviceStatistics, len(m.deviceStatRefs))
for id, sr := range m.deviceStatRefs {
stats, err := sr.GetStatistics()
Expand Down Expand Up @@ -965,10 +950,10 @@ func (m *model) folderCompletion(device protocol.DeviceID, folder string) (Folde
}
defer snap.Release()

m.pmut.RLock()
m.fmut.RLock()
state := m.remoteFolderStates[device][folder]
downloaded := m.deviceDownloads[device].BytesDownloaded(folder)
m.pmut.RUnlock()
m.fmut.RUnlock()

need := snap.NeedSize(device)
need.Bytes -= downloaded
Expand Down Expand Up @@ -1191,9 +1176,9 @@ func (m *model) handleIndex(conn protocol.Connection, folder string, fs []protoc
return fmt.Errorf("%s: %w", folder, ErrFolderPaused)
}

m.pmut.RLock()
m.fmut.RLock()
indexHandler, ok := m.getIndexHandlerPRLocked(conn)
m.pmut.RUnlock()
m.fmut.RUnlock()
if !ok {
// This should be impossible, as an index handler is registered when
// we send a cluster config, and that is what triggers index
Expand Down Expand Up @@ -1306,9 +1291,9 @@ func (m *model) ClusterConfig(conn protocol.Connection, cm protocol.ClusterConfi
return err
}

m.pmut.Lock()
m.fmut.Lock()
m.remoteFolderStates[deviceID] = states
m.pmut.Unlock()
m.fmut.Unlock()

m.evLogger.Log(events.ClusterConfigReceived, ClusterConfigReceivedEventData{
Device: deviceID,
Expand All @@ -1317,11 +1302,11 @@ func (m *model) ClusterConfig(conn protocol.Connection, cm protocol.ClusterConfi
if len(tempIndexFolders) > 0 {
var connOK bool
var conn protocol.Connection
m.pmut.RLock()
m.fmut.RLock()
if connIDs, connIDOK := m.deviceConnIDs[deviceID]; connIDOK {
conn, connOK = m.connections[connIDs[0]]
}
m.pmut.RUnlock()
m.fmut.RUnlock()
// In case we've got ClusterConfig, and the connection disappeared
// from infront of our nose.
if connOK {
Expand Down Expand Up @@ -1354,11 +1339,8 @@ func (m *model) ensureIndexHandler(conn protocol.Connection) *indexHandlerRegist
deviceID := conn.DeviceID()
connID := conn.ConnectionID()

// We must acquire fmut first when acquiring both locks.
m.fmut.RLock()
defer m.fmut.RUnlock()
m.pmut.Lock()
defer m.pmut.Unlock()
m.fmut.Lock()
defer m.fmut.Unlock()

indexHandlerRegistry, ok := m.indexHandlers.Get(deviceID)
if ok && indexHandlerRegistry.conn.ConnectionID() == connID {
Expand Down Expand Up @@ -1650,13 +1632,13 @@ func (m *model) sendClusterConfig(ids []protocol.DeviceID) {
return
}
ccConns := make([]protocol.Connection, 0, len(ids))
m.pmut.RLock()
m.fmut.RLock()
for _, id := range ids {
if connIDs, ok := m.deviceConnIDs[id]; ok {
ccConns = append(ccConns, m.connections[connIDs[0]])
}
}
m.pmut.RUnlock()
m.fmut.RUnlock()
// Generating cluster-configs acquires fmut -> must happen outside of pmut.
for _, conn := range ccConns {
cm, passwords := m.generateClusterConfig(conn.DeviceID())
Expand Down Expand Up @@ -1893,10 +1875,10 @@ func (m *model) Closed(conn protocol.Connection, err error) {
connID := conn.ConnectionID()
deviceID := conn.DeviceID()

m.pmut.Lock()
m.fmut.Lock()
conn, ok := m.connections[connID]
if !ok {
m.pmut.Unlock()
m.fmut.Unlock()
return
}

Expand Down Expand Up @@ -1927,7 +1909,7 @@ func (m *model) Closed(conn protocol.Connection, err error) {
m.deviceConnIDs[deviceID] = remainingConns
}

m.pmut.Unlock()
m.fmut.Unlock()
if wait != nil {
<-wait
}
Expand Down Expand Up @@ -2029,9 +2011,9 @@ func (m *model) Request(conn protocol.Connection, folder, name string, _, size i

// Restrict parallel requests by connection/device

m.pmut.RLock()
m.fmut.RLock()
limiter := m.connRequestLimiters[deviceID]
m.pmut.RUnlock()
m.fmut.RUnlock()

// The requestResponse releases the bytes to the buffer pool and the
// limiters when its Close method is called.
Expand Down Expand Up @@ -2218,9 +2200,9 @@ func (m *model) GetMtimeMapping(folder string, file string) (fs.MtimeMapping, er

// Connection returns if we are connected to the given device.
func (m *model) ConnectedTo(deviceID protocol.DeviceID) bool {
m.pmut.RLock()
m.fmut.RLock()
_, ok := m.deviceConnIDs[deviceID]
m.pmut.RUnlock()
m.fmut.RUnlock()
return ok
}

Expand Down Expand Up @@ -2353,7 +2335,7 @@ func (m *model) AddConnection(conn protocol.Connection, hello protocol.Hello) {
connID := conn.ConnectionID()
closed := make(chan struct{})

m.pmut.Lock()
m.fmut.Lock()

m.connections[connID] = conn
m.closed[connID] = closed
Expand Down Expand Up @@ -2384,7 +2366,7 @@ func (m *model) AddConnection(conn protocol.Connection, hello protocol.Hello) {
l.Infof(`Additional connection (+%d) for device %s at %s`, len(m.deviceConnIDs[deviceID])-1, deviceID.Short(), conn)
}

m.pmut.Unlock()
m.fmut.Unlock()

if (deviceCfg.Name == "" || m.cfg.Options().OverwriteRemoteDevNames) && hello.DeviceName != "" {
m.cfg.Modify(func(cfg *config.Configuration) {
Expand Down Expand Up @@ -2415,11 +2397,8 @@ func (m *model) scheduleConnectionPromotion() {
// be called after adding new connections, and after closing a primary
// device connection.
func (m *model) promoteConnections() {
m.fmut.RLock() // for generateClusterConfigFRLocked
defer m.fmut.RUnlock()

m.pmut.Lock() // for most other things
defer m.pmut.Unlock()
m.fmut.Lock()
defer m.fmut.Unlock()

for deviceID, connIDs := range m.deviceConnIDs {
cm, passwords := m.generateClusterConfigFRLocked(deviceID)
Expand Down Expand Up @@ -2464,9 +2443,9 @@ func (m *model) DownloadProgress(conn protocol.Connection, folder string, update
return nil
}

m.pmut.RLock()
m.fmut.RLock()
downloads := m.deviceDownloads[deviceID]
m.pmut.RUnlock()
m.fmut.RUnlock()
downloads.Update(folder, updates)
state := downloads.GetBlockCounts(folder)

Expand Down Expand Up @@ -2511,8 +2490,8 @@ func (m *model) requestGlobal(ctx context.Context, deviceID protocol.DeviceID, f
// ("primary") connection, which is dedicated to index data, and pick a
// random one of the others.
func (m *model) requestConnectionForDevice(deviceID protocol.DeviceID) (protocol.Connection, bool) {
m.pmut.RLock()
defer m.pmut.RUnlock()
m.fmut.RLock()
defer m.fmut.RUnlock()

connIDs, ok := m.deviceConnIDs[deviceID]
if !ok {
Expand Down Expand Up @@ -2903,12 +2882,10 @@ func (m *model) Availability(folder string, file protocol.FileInfo, block protoc
// get heavily modified on Close()), but also must acquire fmut before
// pmut. (The locks can be *released* in any order.)
m.fmut.RLock()
m.pmut.RLock()
defer m.pmut.RUnlock()
defer m.fmut.RUnlock()

fs, ok := m.folderFiles[folder]
cfg := m.folderCfgs[folder]
m.fmut.RUnlock()

if !ok {
return nil, ErrFolderMissing
Expand All @@ -2924,8 +2901,8 @@ func (m *model) Availability(folder string, file protocol.FileInfo, block protoc
}

func (m *model) availabilityInSnapshot(cfg config.FolderConfiguration, snap *db.Snapshot, file protocol.FileInfo, block protocol.BlockInfo) []Availability {
m.pmut.RLock()
defer m.pmut.RUnlock()
m.fmut.RLock()
defer m.fmut.RUnlock()
return m.availabilityInSnapshotPRlocked(cfg, snap, file, block)
}

Expand Down Expand Up @@ -3113,9 +3090,9 @@ func (m *model) CommitConfiguration(from, to config.Configuration) bool {
}

if toCfg.MaxRequestKiB != fromCfg.MaxRequestKiB {
m.pmut.Lock()
m.fmut.Lock()
m.setConnRequestLimitersPLocked(toCfg)
m.pmut.Unlock()
m.fmut.Unlock()
}
}

Expand All @@ -3129,7 +3106,7 @@ func (m *model) CommitConfiguration(from, to config.Configuration) bool {
}
m.fmut.Unlock()

m.pmut.RLock()
m.fmut.RLock()
for _, id := range closeDevices {
delete(clusterConfigDevices, id)
if conns, ok := m.deviceConnIDs[id]; ok {
Expand All @@ -3146,7 +3123,7 @@ func (m *model) CommitConfiguration(from, to config.Configuration) bool {
}
}
}
m.pmut.RUnlock()
m.fmut.RUnlock()
// Generating cluster-configs acquires fmut -> must happen outside of pmut.
m.sendClusterConfig(clusterConfigDevices.AsSlice())

Expand Down

0 comments on commit 6f10236

Please sign in to comment.