Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

lib/model: Use a single lock #9275

Merged
merged 3 commits into from
Dec 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 42 additions & 65 deletions lib/model/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,8 +157,7 @@ type model struct {
folderEncryptionPasswordTokens map[string][]byte // folder -> encryption token (may be missing, and only for encryption type folders)
folderEncryptionFailures map[string]map[protocol.DeviceID]error // folder -> device -> error regarding encryption consistency (may be missing)

// fields protected by pmut
pmut sync.RWMutex
// fields also protected by fmut
connections map[string]protocol.Connection // connection ID -> connection
deviceConnIDs map[protocol.DeviceID][]string // device -> connection IDs (invariant: if the key exists, the value is len >= 1, with the primary connection at the start of the slice)
promotedConnID map[protocol.DeviceID]string // device -> latest promoted connection ID
Expand Down Expand Up @@ -238,8 +237,7 @@ func NewModel(cfg config.Wrapper, id protocol.DeviceID, ldb *db.Lowlevel, protec
folderEncryptionPasswordTokens: make(map[string][]byte),
folderEncryptionFailures: make(map[string]map[protocol.DeviceID]error),

// fields protected by pmut
pmut: sync.NewRWMutex(),
// ditto
connections: make(map[string]protocol.Connection),
deviceConnIDs: make(map[protocol.DeviceID][]string),
promotedConnID: make(map[protocol.DeviceID]string),
Expand Down Expand Up @@ -312,13 +310,13 @@ func (m *model) initFolders(cfg config.Configuration) error {
}

func (m *model) closeAllConnectionsAndWait() {
m.pmut.RLock()
m.fmut.RLock()
closed := make([]chan struct{}, 0, len(m.connections))
for connID, conn := range m.connections {
closed = append(closed, m.closed[connID])
go conn.Close(errStopped)
}
m.pmut.RUnlock()
m.fmut.RUnlock()
for _, c := range closed {
<-c
}
Expand All @@ -338,7 +336,6 @@ func (m *model) StartDeadlockDetector(timeout time.Duration) {
l.Infof("Starting deadlock detector with %v timeout", timeout)
detector := newDeadlockDetector(timeout, m.evLogger, m.fatal)
detector.Watch("fmut", m.fmut)
detector.Watch("pmut", m.pmut)
}

// Need to hold lock on m.fmut when calling this.
Expand Down Expand Up @@ -472,7 +469,6 @@ func (m *model) removeFolder(cfg config.FolderConfiguration) {
// We need to hold both fmut and pmut and must acquire locks in the same
// order always. (The locks can be *released* in any order.)
m.fmut.Lock()
m.pmut.RLock()

isPathUnique := true
for folderID, folderCfg := range m.folderCfgs {
Expand All @@ -498,7 +494,6 @@ func (m *model) removeFolder(cfg config.FolderConfiguration) {
})

m.fmut.Unlock()
m.pmut.RUnlock()

// Remove it from the database
db.DropFolder(m.db, cfg.ID)
Expand Down Expand Up @@ -563,15 +558,11 @@ func (m *model) restartFolder(from, to config.FolderConfiguration, cacheIgnoredF
m.addAndStartFolderLocked(to, fset, cacheIgnoredFiles)
}

// Care needs to be taken because we already hold fmut and the lock order
// must be the same everywhere. As fmut is acquired first, this is fine.
m.pmut.RLock()
runner, _ := m.folderRunners.Get(to.ID)
m.indexHandlers.Each(func(_ protocol.DeviceID, r *indexHandlerRegistry) error {
r.RegisterFolderState(to, fset, runner)
return nil
})
m.pmut.RUnlock()

var infoMsg string
switch {
Expand Down Expand Up @@ -603,15 +594,11 @@ func (m *model) newFolder(cfg config.FolderConfiguration, cacheIgnoredFiles bool
// Cluster configs might be received and processed before reaching this
// point, i.e. before the folder is started. If that's the case, start
// index senders here.
// Care needs to be taken because we already hold fmut and the lock order
// must be the same everywhere. As fmut is acquired first, this is fine.
m.pmut.RLock()
m.indexHandlers.Each(func(_ protocol.DeviceID, r *indexHandlerRegistry) error {
runner, _ := m.folderRunners.Get(cfg.ID)
r.RegisterFolderState(cfg, fset, runner)
return nil
})
m.pmut.RUnlock()

return nil
}
Expand Down Expand Up @@ -645,11 +632,11 @@ func (m *model) UsageReportingStats(report *contract.Report, version int, previe
blockStatsMut.Unlock()

// Transport stats
m.pmut.RLock()
m.fmut.RLock()
for _, conn := range m.connections {
report.TransportStats[conn.Transport()]++
}
m.pmut.RUnlock()
m.fmut.RUnlock()

// Ignore stats
var seenPrefix [3]bool
Expand Down Expand Up @@ -736,8 +723,8 @@ type ConnectionInfo struct {

// ConnectionStats returns a map with connection statistics for each device.
func (m *model) ConnectionStats() map[string]interface{} {
m.pmut.RLock()
defer m.pmut.RUnlock()
m.fmut.RLock()
defer m.fmut.RUnlock()

res := make(map[string]interface{})
devs := m.cfg.Devices()
Expand Down Expand Up @@ -812,8 +799,6 @@ func (m *model) ConnectionStats() map[string]interface{} {
func (m *model) DeviceStatistics() (map[protocol.DeviceID]stats.DeviceStatistics, error) {
m.fmut.RLock()
defer m.fmut.RUnlock()
m.pmut.RLock()
defer m.pmut.RUnlock()
res := make(map[protocol.DeviceID]stats.DeviceStatistics, len(m.deviceStatRefs))
for id, sr := range m.deviceStatRefs {
stats, err := sr.GetStatistics()
Expand Down Expand Up @@ -965,10 +950,10 @@ func (m *model) folderCompletion(device protocol.DeviceID, folder string) (Folde
}
defer snap.Release()

m.pmut.RLock()
m.fmut.RLock()
state := m.remoteFolderStates[device][folder]
downloaded := m.deviceDownloads[device].BytesDownloaded(folder)
m.pmut.RUnlock()
m.fmut.RUnlock()

need := snap.NeedSize(device)
need.Bytes -= downloaded
Expand Down Expand Up @@ -1191,9 +1176,9 @@ func (m *model) handleIndex(conn protocol.Connection, folder string, fs []protoc
return fmt.Errorf("%s: %w", folder, ErrFolderPaused)
}

m.pmut.RLock()
m.fmut.RLock()
indexHandler, ok := m.getIndexHandlerPRLocked(conn)
m.pmut.RUnlock()
m.fmut.RUnlock()
if !ok {
// This should be impossible, as an index handler is registered when
// we send a cluster config, and that is what triggers index
Expand Down Expand Up @@ -1306,9 +1291,9 @@ func (m *model) ClusterConfig(conn protocol.Connection, cm protocol.ClusterConfi
return err
}

m.pmut.Lock()
m.fmut.Lock()
m.remoteFolderStates[deviceID] = states
m.pmut.Unlock()
m.fmut.Unlock()

m.evLogger.Log(events.ClusterConfigReceived, ClusterConfigReceivedEventData{
Device: deviceID,
Expand All @@ -1317,11 +1302,11 @@ func (m *model) ClusterConfig(conn protocol.Connection, cm protocol.ClusterConfi
if len(tempIndexFolders) > 0 {
var connOK bool
var conn protocol.Connection
m.pmut.RLock()
m.fmut.RLock()
if connIDs, connIDOK := m.deviceConnIDs[deviceID]; connIDOK {
conn, connOK = m.connections[connIDs[0]]
}
m.pmut.RUnlock()
m.fmut.RUnlock()
// In case we've got ClusterConfig, and the connection disappeared
// from infront of our nose.
if connOK {
Expand Down Expand Up @@ -1354,11 +1339,8 @@ func (m *model) ensureIndexHandler(conn protocol.Connection) *indexHandlerRegist
deviceID := conn.DeviceID()
connID := conn.ConnectionID()

// We must acquire fmut first when acquiring both locks.
m.fmut.RLock()
defer m.fmut.RUnlock()
m.pmut.Lock()
defer m.pmut.Unlock()
m.fmut.Lock()
defer m.fmut.Unlock()

indexHandlerRegistry, ok := m.indexHandlers.Get(deviceID)
if ok && indexHandlerRegistry.conn.ConnectionID() == connID {
Expand Down Expand Up @@ -1650,13 +1632,13 @@ func (m *model) sendClusterConfig(ids []protocol.DeviceID) {
return
}
ccConns := make([]protocol.Connection, 0, len(ids))
m.pmut.RLock()
m.fmut.RLock()
for _, id := range ids {
if connIDs, ok := m.deviceConnIDs[id]; ok {
ccConns = append(ccConns, m.connections[connIDs[0]])
}
}
m.pmut.RUnlock()
m.fmut.RUnlock()
// Generating cluster-configs acquires fmut -> must happen outside of pmut.
for _, conn := range ccConns {
cm, passwords := m.generateClusterConfig(conn.DeviceID())
Expand Down Expand Up @@ -1893,10 +1875,10 @@ func (m *model) Closed(conn protocol.Connection, err error) {
connID := conn.ConnectionID()
deviceID := conn.DeviceID()

m.pmut.Lock()
m.fmut.Lock()
conn, ok := m.connections[connID]
if !ok {
m.pmut.Unlock()
m.fmut.Unlock()
return
}

Expand Down Expand Up @@ -1927,7 +1909,7 @@ func (m *model) Closed(conn protocol.Connection, err error) {
m.deviceConnIDs[deviceID] = remainingConns
}

m.pmut.Unlock()
m.fmut.Unlock()
if wait != nil {
<-wait
}
Expand Down Expand Up @@ -2029,9 +2011,9 @@ func (m *model) Request(conn protocol.Connection, folder, name string, _, size i

// Restrict parallel requests by connection/device

m.pmut.RLock()
m.fmut.RLock()
limiter := m.connRequestLimiters[deviceID]
m.pmut.RUnlock()
m.fmut.RUnlock()

// The requestResponse releases the bytes to the buffer pool and the
// limiters when its Close method is called.
Expand Down Expand Up @@ -2218,9 +2200,9 @@ func (m *model) GetMtimeMapping(folder string, file string) (fs.MtimeMapping, er

// Connection returns if we are connected to the given device.
func (m *model) ConnectedTo(deviceID protocol.DeviceID) bool {
m.pmut.RLock()
m.fmut.RLock()
_, ok := m.deviceConnIDs[deviceID]
m.pmut.RUnlock()
m.fmut.RUnlock()
return ok
}

Expand Down Expand Up @@ -2353,7 +2335,7 @@ func (m *model) AddConnection(conn protocol.Connection, hello protocol.Hello) {
connID := conn.ConnectionID()
closed := make(chan struct{})

m.pmut.Lock()
m.fmut.Lock()

m.connections[connID] = conn
m.closed[connID] = closed
Expand Down Expand Up @@ -2384,7 +2366,7 @@ func (m *model) AddConnection(conn protocol.Connection, hello protocol.Hello) {
l.Infof(`Additional connection (+%d) for device %s at %s`, len(m.deviceConnIDs[deviceID])-1, deviceID.Short(), conn)
}

m.pmut.Unlock()
m.fmut.Unlock()

if (deviceCfg.Name == "" || m.cfg.Options().OverwriteRemoteDevNames) && hello.DeviceName != "" {
m.cfg.Modify(func(cfg *config.Configuration) {
Expand Down Expand Up @@ -2415,11 +2397,8 @@ func (m *model) scheduleConnectionPromotion() {
// be called after adding new connections, and after closing a primary
// device connection.
func (m *model) promoteConnections() {
m.fmut.RLock() // for generateClusterConfigFRLocked
defer m.fmut.RUnlock()

m.pmut.Lock() // for most other things
defer m.pmut.Unlock()
m.fmut.Lock()
defer m.fmut.Unlock()

for deviceID, connIDs := range m.deviceConnIDs {
cm, passwords := m.generateClusterConfigFRLocked(deviceID)
Expand Down Expand Up @@ -2464,9 +2443,9 @@ func (m *model) DownloadProgress(conn protocol.Connection, folder string, update
return nil
}

m.pmut.RLock()
m.fmut.RLock()
downloads := m.deviceDownloads[deviceID]
m.pmut.RUnlock()
m.fmut.RUnlock()
downloads.Update(folder, updates)
state := downloads.GetBlockCounts(folder)

Expand Down Expand Up @@ -2511,8 +2490,8 @@ func (m *model) requestGlobal(ctx context.Context, deviceID protocol.DeviceID, f
// ("primary") connection, which is dedicated to index data, and pick a
// random one of the others.
func (m *model) requestConnectionForDevice(deviceID protocol.DeviceID) (protocol.Connection, bool) {
m.pmut.RLock()
defer m.pmut.RUnlock()
m.fmut.RLock()
defer m.fmut.RUnlock()

connIDs, ok := m.deviceConnIDs[deviceID]
if !ok {
Expand Down Expand Up @@ -2903,12 +2882,10 @@ func (m *model) Availability(folder string, file protocol.FileInfo, block protoc
// get heavily modified on Close()), but also must acquire fmut before
// pmut. (The locks can be *released* in any order.)
m.fmut.RLock()
m.pmut.RLock()
defer m.pmut.RUnlock()
defer m.fmut.RUnlock()

fs, ok := m.folderFiles[folder]
cfg := m.folderCfgs[folder]
m.fmut.RUnlock()

if !ok {
return nil, ErrFolderMissing
Expand All @@ -2924,8 +2901,8 @@ func (m *model) Availability(folder string, file protocol.FileInfo, block protoc
}

func (m *model) availabilityInSnapshot(cfg config.FolderConfiguration, snap *db.Snapshot, file protocol.FileInfo, block protocol.BlockInfo) []Availability {
m.pmut.RLock()
defer m.pmut.RUnlock()
m.fmut.RLock()
defer m.fmut.RUnlock()
return m.availabilityInSnapshotPRlocked(cfg, snap, file, block)
}

Expand Down Expand Up @@ -3113,9 +3090,9 @@ func (m *model) CommitConfiguration(from, to config.Configuration) bool {
}

if toCfg.MaxRequestKiB != fromCfg.MaxRequestKiB {
m.pmut.Lock()
m.fmut.Lock()
m.setConnRequestLimitersPLocked(toCfg)
m.pmut.Unlock()
m.fmut.Unlock()
}
}

Expand All @@ -3129,7 +3106,7 @@ func (m *model) CommitConfiguration(from, to config.Configuration) bool {
}
m.fmut.Unlock()

m.pmut.RLock()
m.fmut.RLock()
for _, id := range closeDevices {
delete(clusterConfigDevices, id)
if conns, ok := m.deviceConnIDs[id]; ok {
Expand All @@ -3146,7 +3123,7 @@ func (m *model) CommitConfiguration(from, to config.Configuration) bool {
}
}
}
m.pmut.RUnlock()
m.fmut.RUnlock()
// Generating cluster-configs acquires fmut -> must happen outside of pmut.
m.sendClusterConfig(clusterConfigDevices.AsSlice())

Expand Down
Loading
Loading