Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

lib/connections, lib/config: Bandwidth throttling per remote device (fixes #4516) #4603

Merged
merged 29 commits into from
Mar 26, 2018
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
bf78111
lib/connections: Bandwidth throttling per remote device (fixes #4516)
qepasa Dec 17, 2017
7ca0546
Merge remote-tracking branch 'upstream/master' into device-throttling
qepasa Dec 17, 2017
e486481
device limiters are now working together with global limiter
qepasa Dec 17, 2017
79f96e5
go fmt changes
qepasa Dec 17, 2017
9b14e01
fixed panic when removing device
qepasa Dec 17, 2017
c1665b3
switched some logging to debug
qepasa Dec 17, 2017
d175118
changed map to sync.Map
qepasa Dec 18, 2017
1070dd6
gofmt
qepasa Dec 18, 2017
b52b326
updated processing of device configurations, changed sync.Map to map …
qepasa Dec 27, 2017
ae58320
Merge remote-tracking branch 'upstream/master' into device-throttling
qepasa Dec 27, 2017
b6d723d
adjustments after code review
qepasa Jan 8, 2018
a33855f
go fmt changes
qepasa Jan 8, 2018
8d637d6
Merge remote-tracking branch 'upstream/master' into device-throttling
qepasa Jan 8, 2018
5157154
Merge remote-tracking branch 'upstream/master' into device-throttling
qepasa Jan 13, 2018
cc63da2
adjustments after code review
qepasa Jan 14, 2018
36368f1
fixed limits change check when rate is unlimited
qepasa Jan 14, 2018
2ce47ad
go fmt
qepasa Jan 14, 2018
f6a92d5
removed unnecessary field from limiter, updated take method comment
qepasa Jan 15, 2018
353983f
Further code review adjustments
qepasa Jan 29, 2018
6967960
clean up rate conversions and remove extra mutex
qepasa Feb 6, 2018
0298b9f
code review adjustments
qepasa Feb 7, 2018
628ae2b
code review adjustments
qepasa Feb 10, 2018
56b471f
go fmt
qepasa Feb 10, 2018
da98cfe
code review adjustments
qepasa Feb 13, 2018
a38f5ea
Merge remote-tracking branch 'upstream/master' into device-throttling
qepasa Feb 13, 2018
e160d1d
removed unnecessary debug message
qepasa Feb 13, 2018
c3facc1
go fmt
qepasa Feb 13, 2018
d03b6ff
defer removed
qepasa Feb 13, 2018
291a198
refactor newLimiter*
qepasa Feb 13, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
260 changes: 167 additions & 93 deletions lib/connections/limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,11 @@ type limiter struct {
write *rate.Limiter
read *rate.Limiter
limitsLAN atomicBool
deviceReadLimiters *sync.Map
deviceWriteLimiters *sync.Map
deviceReadLimiters map[protocol.DeviceID]*rate.Limiter
deviceWriteLimiters map[protocol.DeviceID]*rate.Limiter
myID protocol.DeviceID
mu *sync.Mutex
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just sync.Mutex, no need for a pointer (regardless of which sync package)

deviceMapMutex *sync.RWMutex
}

const limiterBurstSize = 4 * 128 << 10
Expand All @@ -39,97 +40,167 @@ func newLimiter(deviceID protocol.DeviceID, cfg *config.Wrapper) *limiter {
read: rate.NewLimiter(rate.Inf, limiterBurstSize),
myID: deviceID,
mu: &sync.Mutex{},
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Adjust here to nothing at all (standard lib sync) or sync.NewMutex() (our sync)

deviceReadLimiters: new(sync.Map),
deviceWriteLimiters: new(sync.Map),
deviceReadLimiters: make(map[protocol.DeviceID]*rate.Limiter),
deviceWriteLimiters: make(map[protocol.DeviceID]*rate.Limiter),
deviceMapMutex: &sync.RWMutex{},
}

// Get initial device configuration
devices := cfg.RawCopy().Devices
for _, value := range devices {
value.MaxRecvKbps = -1
value.MaxSendKbps = -1

// Keep read/write limiters for every connected device
l.deviceWriteLimiters[value.DeviceID] = rate.NewLimiter(rate.Inf, limiterBurstSize)
l.deviceReadLimiters[value.DeviceID] = rate.NewLimiter(rate.Inf, limiterBurstSize)
}
prev := config.Configuration{Options: config.OptionsConfiguration{MaxRecvKbps: -1, MaxSendKbps: -1}, Devices: devices}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe the entire above block isn't necessary, as CommitConfiguration will add all missing devices anyway, so https://github.com/syncthing/syncthing/pull/4603/files#diff-da45d4f40dfa16a9aae760dd650fe79aL35 should be sufficient.


Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could just call rebuildMap here?

// Keep read/write limiters for every connected device
l.rebuildMap(prev)

cfg.Subscribe(l)
l.CommitConfiguration(prev, cfg.RawCopy())
return l
}

// Create new maps with new limiters
func (lim *limiter) rebuildMap(to config.Configuration) {
deviceWriteLimiters := new(sync.Map)
deviceReadLimiters := new(sync.Map)
// This function sets limiters according to corresponding DeviceConfiguration
func (lim *limiter) setLimitsForDevice(v config.DeviceConfiguration, readLimiter, writeLimiter *rate.Limiter) {
// The rate variables are in KiB/s in the config (despite the camel casing
// of the name). We multiply by 1024 to get bytes/s.
if v.MaxRecvKbps <= 0 {
readLimiter.SetLimit(rate.Inf)
} else {
readLimiter.SetLimit(1024 * rate.Limit(v.MaxRecvKbps))
}

if v.MaxSendKbps <= 0 {
writeLimiter.SetLimit(rate.Inf)
} else {
writeLimiter.SetLimit(1024 * rate.Limit(v.MaxSendKbps))
}
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now this lives on the lim struct, but doesn't actually use it, meaning it should either be a utility function not part of lim, or use lim and reach into lim.device{Write,Read}Limiters directly.


// This function handles removing, adding and updating of device limiters.
// Pass pointer to avoid copying. Pointer already points to copy of configuration
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not clear what the pointer comments are about. The configuration parameters are not pointers.

// so we don't have to worry about modifying real config.
func (lim *limiter) processDevicesConfiguration(from, to *config.Configuration) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doesn't need to pass configs as pointers.


Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Noop space.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

empty

// copy *limiter in case remote device is still connected, when remote device is added we create new read/write limiters
devicesToRemove := make(map[protocol.DeviceID]bool)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could be map[DeviceID]struct{}, and devicesToRemove[x] = false is essentially delete(devicesToRemove, x)

Or better would be:

for dev in to.Devices {
   seen[dev] = struct{}{}
}

for dev in from.Devices {
   if  _, ok := seen[dev]; !ok {
      lim.deleteLimiters(dev)
   }
}

// Get all devices that were previously in map, these are candidates for removal
for _, v := range from.Devices {
if v.DeviceID != to.MyID {
devicesToRemove[v.DeviceID] = true
}
}
Copy link
Member

@imsodin imsodin Dec 27, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could be map[protocol.DeviceID]struct{} and then later instead of = false do delete(devicesToRemove, v.DeviceID). Then you can just iterate over the final map in the end without checking a bool.


// Mark devices which should not be removed, create new limiters if needed and assign new limiter rate
for _, v := range to.Devices {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What does "v" stand for? We usually use "dev".

if readLimiter, ok := lim.deviceReadLimiters.Load(v.DeviceID); ok {
deviceReadLimiters.Store(v.DeviceID, readLimiter)
} else {
deviceReadLimiters.Store(v.DeviceID, rate.NewLimiter(rate.Inf, limiterBurstSize))
if v.DeviceID == to.MyID {
// This limiter was created for local device. Should skip this device
continue
}

if writeLimiter, ok := lim.deviceWriteLimiters.Load(v.DeviceID); ok {
deviceWriteLimiters.Store(v.DeviceID, writeLimiter)
} else {
deviceWriteLimiters.Store(v.DeviceID, rate.NewLimiter(rate.Inf, limiterBurstSize))
readLimiter, okR := lim.getDeviceReadLimiter(v.DeviceID)
// Device has not been removed or added, this is no longer a candidate for removal
if okR {
devicesToRemove[v.DeviceID] = false
}

writeLimiter, okW := lim.getDeviceWriteLimiter(v.DeviceID)
if okW {
if devicesToRemove[v.DeviceID] {
// Device has not been removed or added, we should've
// already marked it as false in devicesToRemove
panic("broken symmetry in device read/write limiters")
}
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is too complicated. The only way these things get set are under a lock, so it's very unlikely they get out of sync.
Also, perhaps it's better to get getDevice{Read,Write}Limiter always return a new limiter even if one does not exists (and add it to the map).

I am thinking about a case where a new device gets added, and there is a race between CommitConfiguration here, and us making a connection. It's probably always better wrap every connection in a limiter (and create one if it doesn't exist) and then adjust the rates later.


// There was no limiter for this ID in map.
// This means that we added this device and we should create new limiter
if !okR && !okW {
readLimiter = rate.NewLimiter(rate.Inf, limiterBurstSize)
writeLimiter = rate.NewLimiter(rate.Inf, limiterBurstSize)

lim.setDeviceLimiters(v.DeviceID, writeLimiter, readLimiter)
} else if !okR || !okW {
// One of the read/write limiters is not present while the
// corresponding write/read one exists. Something has gone wrong.
panic("broken symmetry in device read/write limiters")
}

// limiters for this device are created so we can store previous rates for logging
previousReadLimit := readLimiter.Limit()
previousWriteLimit := writeLimiter.Limit()

l.Debugf("okR: %t, okW: %t, prevWriteLim: %d, prevReadLim: %d", okR, okW, previousReadLimit, previousWriteLimit)

lim.setLimitsForDevice(v, readLimiter, writeLimiter)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it not better not to do this if the limits are the same as previously?
As in, move this after the if?


// Nothing about this device has changed. Start processing next device
if okR && okW &&
readLimiter.Limit() == previousReadLimit &&
writeLimiter.Limit() == previousWriteLimit {
continue
}

readLimitStr := "is unlimited"
if v.MaxRecvKbps > 0 {
readLimitStr = fmt.Sprintf("limit is %d KiB/s", v.MaxRecvKbps)
}
writeLimitStr := "is unlimited"
if v.MaxSendKbps > 0 {
writeLimitStr = fmt.Sprintf("limit is %d KiB/s", v.MaxSendKbps)
}

l.Infof("Device %s send rate %s, receive rate %s", v.DeviceID, readLimitStr, writeLimitStr)
}

// assign new maps
lim.mu.Lock()
defer lim.mu.Unlock()
lim.deviceWriteLimiters = deviceWriteLimiters
lim.deviceReadLimiters = deviceReadLimiters
// Delete remote devices which were removed in new configuration
for deviceID, shouldBeRemoved := range devicesToRemove {
l.Debugf("deviceID: %s, shouldBeRemoved: %t", deviceID, shouldBeRemoved)
if shouldBeRemoved {
lim.deleteDeviceLimiters(deviceID)
}
}

l.Debugln("Rebuild of device limiters map finished")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wouldn't it be better for rebuild to happen in place, this way getting rid of the lock that we have to get on every read and write.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a lie, as I don't think we're rebuilding stuff here.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can imagine stuff getting out of sync a bit if two saves happen in parallel, but to be honest, I am not worried about a case like that.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's why I added mutex at the beginning of CommitConfiguration call

}

// Compare read/write limits in configurations
func (lim *limiter) checkDeviceLimits(from, to config.Configuration) bool {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Again, doesn't need to live on lim.
Also the function name doesn't explain well what this does. Should probably be called deviceLimitsChanged() and invert what it returns.

if len(from.Devices) != len(to.Devices) {
return false
}
// len(from.Devices) == len(to.Devices) so we can do range from.Devices
for i := range from.Devices {
if from.Devices[i].DeviceID != to.Devices[i].DeviceID {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should blow up if a device is removed, as len(to) < len(from).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You should probably write a test case for this logic.
Test case for adding
Test case for removing
Test case for adding and removing in the same config commit cycle.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a nit-pick, but we don't really need to rely on the device order here. It's possible that rates aren't changed, just the order gets changed.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There should be some sort of cfg.DeviceMap() or something like that utility function.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, that should be moved from model to config: https://github.com/syncthing/syncthing/blob/master/lib/model/model.go#L2569
That also makes it easier to remove limiters, just treat it like folders in models CommitConfiguration.

// Something has changed in device configuration
lim.rebuildMap(to)
return false
}
// Read/write limits were changed for this device
if from.Devices[i].MaxSendKbps != to.Devices[i].MaxSendKbps || from.Devices[i].MaxRecvKbps != to.Devices[i].MaxRecvKbps {
if from.Devices[i].MaxSendKbps != to.Devices[i].MaxSendKbps ||
from.Devices[i].MaxRecvKbps != to.Devices[i].MaxRecvKbps {
return false
}
}
return true
}

func (lim *limiter) newReadLimiter(remoteID protocol.DeviceID, r io.Reader, isLAN bool) io.Reader {
return &limitedReader{reader: r, limiter: lim, isLAN: isLAN, remoteID: remoteID}
}

func (lim *limiter) newWriteLimiter(remoteID protocol.DeviceID, w io.Writer, isLAN bool) io.Writer {
return &limitedWriter{writer: w, limiter: lim, isLAN: isLAN, remoteID: remoteID}
}

func (lim *limiter) VerifyConfiguration(from, to config.Configuration) error {
return nil
}

func (lim *limiter) CommitConfiguration(from, to config.Configuration) bool {
if len(from.Devices) == len(to.Devices) &&
from.Options.MaxRecvKbps == to.Options.MaxRecvKbps &&
// to ensure atomic update of configuration
lim.mu.Lock()
defer lim.mu.Unlock()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this necessary? The map mutex should be protection enough, right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess when many routines try to save configs at the same time it could cause some trouble when handling device limiters. I.e. first we remove device X then add the same device X and then update of device X rates. Routines could get scheduled so that remove stops right before the loop where deleting is done. Then add would process the device and next config would update it's rates. After removing is resumed we're left with no limiter for X despite it was set it in the most recent config.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right. To me it seems redundant to have two locks protecting the limiter. Couldn't you just have a single lock that you acquire during CommitConfiguration and newLimitedReader/Writer (and then obviously not acquiring it in getReadLimiter)?


if from.Options.MaxRecvKbps == to.Options.MaxRecvKbps &&
from.Options.MaxSendKbps == to.Options.MaxSendKbps &&
from.Options.LimitBandwidthInLan == to.Options.LimitBandwidthInLan &&
lim.checkDeviceLimits(from, to) {
return true
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this global check is not very useful as global limits are reset even if just a device changed and whether device change is checked again later on. So I would either not check at all and just reset everything or check the different parameters separately.


// A device has been added or removed
if len(from.Devices) != len(to.Devices) {
lim.rebuildMap(to)
}

// The rate variables are in KiB/s in the config (despite the camel casing
// of the name). We multiply by 1024 to get bytes/s.
if to.Options.MaxRecvKbps <= 0 {
Expand All @@ -146,38 +217,8 @@ func (lim *limiter) CommitConfiguration(from, to config.Configuration) bool {

lim.limitsLAN.set(to.Options.LimitBandwidthInLan)

// Set limits for devices
for _, v := range to.Devices {
if v.DeviceID == lim.myID {
// This limiter was created for local device. Should skip this device
continue
}

readLimiter, _ := lim.deviceReadLimiters.Load(v.DeviceID)
if v.MaxRecvKbps <= 0 {
readLimiter.(*rate.Limiter).SetLimit(rate.Inf)
} else {
readLimiter.(*rate.Limiter).SetLimit(1024 * rate.Limit(v.MaxRecvKbps))
}

writeLimiter, _ := lim.deviceWriteLimiters.Load(v.DeviceID)
if v.MaxSendKbps <= 0 {
writeLimiter.(*rate.Limiter).SetLimit(rate.Inf)
} else {
writeLimiter.(*rate.Limiter).SetLimit(1024 * rate.Limit(v.MaxSendKbps))
}

sendLimitStr := "is unlimited"
recvLimitStr := "is unlimited"
if v.MaxSendKbps > 0 {
sendLimitStr = fmt.Sprintf("limit is %d KiB/s", v.MaxSendKbps)
}

if v.MaxRecvKbps > 0 {
recvLimitStr = fmt.Sprintf("limit is %d KiB/s", v.MaxRecvKbps)
}
l.Infof("Device %s: send rate %s, receive rate %s", v.DeviceID, sendLimitStr, recvLimitStr)
}
// Delete, add or update limiters for devices
lim.processDevicesConfiguration(&from, &to)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this a pointer?


sendLimitStr := "is unlimited"
recvLimitStr := "is unlimited"
Expand All @@ -203,6 +244,14 @@ func (lim *limiter) String() string {
return "connections.limiter"
}

func (lim *limiter) newReadLimiter(remoteID protocol.DeviceID, r io.Reader, isLAN bool) io.Reader {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

While you're changing this: Wouldn't newLimitedReader be a more appropriate name? (same for the writer below)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree

return &limitedReader{reader: r, limiter: lim, isLAN: isLAN, remoteID: remoteID}
}

func (lim *limiter) newWriteLimiter(remoteID protocol.DeviceID, w io.Writer, isLAN bool) io.Writer {
return &limitedWriter{writer: w, limiter: lim, isLAN: isLAN, remoteID: remoteID}
}

// limitedReader is a rate limited io.Reader
type limitedReader struct {
reader io.Reader
Expand All @@ -214,16 +263,11 @@ type limitedReader struct {
func (r *limitedReader) Read(buf []byte) (int, error) {
n, err := r.reader.Read(buf)
if !r.isLAN || r.limiter.limitsLAN.get() {

// in case rebuildMap was called
r.limiter.mu.Lock()
deviceLimiter, ok := r.limiter.deviceReadLimiters.Load(r.remoteID)
r.limiter.mu.Unlock()
deviceLimiter, ok := r.limiter.getDeviceReadLimiter(r.remoteID)
if !ok {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To be honest, if !ok, I'd panic (or do nothing), as it should clearly be there.
Now it's like a bandaid for a problem we can't explain.

I'd do a if deviceLimiter != nil check in take

l.Debugln("deviceReadLimiter was not in the map")
deviceLimiter = rate.NewLimiter(rate.Inf, limiterBurstSize)
}
take(r.limiter.read, deviceLimiter.(*rate.Limiter), n)
take(r.limiter.read, deviceLimiter, n)
}
return n, err
}
Expand All @@ -238,41 +282,42 @@ type limitedWriter struct {

func (w *limitedWriter) Write(buf []byte) (int, error) {
if !w.isLAN || w.limiter.limitsLAN.get() {

// in case rebuildMap was called
w.limiter.mu.Lock()
deviceLimiter, ok := w.limiter.deviceWriteLimiters.Load(w.remoteID)
w.limiter.mu.Unlock()
deviceLimiter, ok := w.limiter.getDeviceWriteLimiter(w.remoteID)
if !ok {
l.Debugln("deviceWriteLimiter was not in the map")
deviceLimiter = rate.NewLimiter(rate.Inf, limiterBurstSize)
}
take(w.limiter.write, deviceLimiter.(*rate.Limiter), len(buf))
take(w.limiter.write, deviceLimiter, len(buf))
}
return w.writer.Write(buf)
}

// take is a utility function to consume tokens from a rate.Limiter. No call
// to WaitN can be larger than the limiter burst size so we split it up into
// take is a utility function to consume tokens from a overall rate.Limiter and, if present, deviceLimiter.
// No call to WaitN can be larger than the limiter burst size so we split it up into
// several calls when necessary.
func take(l, deviceLimiter *rate.Limiter, tokens int) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the l could do with a better name now, and the doc string should be updated.


Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

empty line

if tokens < limiterBurstSize {
// This is the by far more common case so we get it out of the way
// early.
deviceLimiter.WaitN(context.TODO(), tokens)
if deviceLimiter != nil {
deviceLimiter.WaitN(context.TODO(), tokens)
}
l.WaitN(context.TODO(), tokens)
return
}

for tokens > 0 {
// Consume limiterBurstSize tokens at a time until we're done.
if tokens > limiterBurstSize {
deviceLimiter.WaitN(context.TODO(), limiterBurstSize)
if deviceLimiter != nil {
deviceLimiter.WaitN(context.TODO(), limiterBurstSize)
}
l.WaitN(context.TODO(), limiterBurstSize)
tokens -= limiterBurstSize
} else {
deviceLimiter.WaitN(context.TODO(), tokens)
if deviceLimiter != nil {
deviceLimiter.WaitN(context.TODO(), tokens)
}
l.WaitN(context.TODO(), tokens)
tokens = 0
}
Expand All @@ -292,3 +337,32 @@ func (b *atomicBool) set(v bool) {
func (b *atomicBool) get() bool {
return atomic.LoadInt32((*int32)(b)) != 0
}

// Utility functions for atomic operations on device limiters map
func (lim *limiter) getDeviceWriteLimiter(deviceID protocol.DeviceID) (*rate.Limiter, bool) {
lim.deviceMapMutex.RLock()
defer lim.deviceMapMutex.RUnlock()
limiter, ok := lim.deviceWriteLimiters[deviceID]
return limiter, ok
}

func (lim *limiter) getDeviceReadLimiter(deviceID protocol.DeviceID) (*rate.Limiter, bool) {
lim.deviceMapMutex.RLock()
defer lim.deviceMapMutex.RUnlock()
limiter, ok := lim.deviceReadLimiters[deviceID]
return limiter, ok
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this boolean is needed, all you care about when deciding to print or not print the message about rates is whether the rates are different from what they were before.

}

func (lim *limiter) setDeviceLimiters(deviceID protocol.DeviceID, writeLimiter, readLimiter *rate.Limiter) {
lim.deviceMapMutex.Lock()
defer lim.deviceMapMutex.Unlock()
lim.deviceWriteLimiters[deviceID] = writeLimiter
lim.deviceReadLimiters[deviceID] = readLimiter
}

func (lim *limiter) deleteDeviceLimiters(deviceID protocol.DeviceID) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is literally used in one place, just embed this code in that place and be done with it.

lim.deviceMapMutex.Lock()
defer lim.deviceMapMutex.Unlock()
delete(lim.deviceWriteLimiters, deviceID)
delete(lim.deviceReadLimiters, deviceID)
}