-
-
Notifications
You must be signed in to change notification settings - Fork 4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
lib/connections, lib/config: Bandwidth throttling per remote device (fixes #4516) #4603
Changes from 23 commits
bf78111
7ca0546
e486481
79f96e5
9b14e01
c1665b3
d175118
1070dd6
b52b326
ae58320
b6d723d
a33855f
8d637d6
5157154
cc63da2
36368f1
2ce47ad
f6a92d5
353983f
6967960
0298b9f
628ae2b
56b471f
da98cfe
a38f5ea
e160d1d
c3facc1
d03b6ff
291a198
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -12,44 +12,121 @@ import ( | |
"sync/atomic" | ||
|
||
"github.com/syncthing/syncthing/lib/config" | ||
"github.com/syncthing/syncthing/lib/protocol" | ||
"golang.org/x/net/context" | ||
"golang.org/x/time/rate" | ||
"sync" | ||
) | ||
|
||
// limiter manages a read and write rate limit, reacting to config changes | ||
// as appropriate. | ||
type limiter struct { | ||
write *rate.Limiter | ||
read *rate.Limiter | ||
limitsLAN atomicBool | ||
write *rate.Limiter | ||
read *rate.Limiter | ||
limitsLAN atomicBool | ||
deviceReadLimiters map[protocol.DeviceID]*rate.Limiter | ||
deviceWriteLimiters map[protocol.DeviceID]*rate.Limiter | ||
mu *sync.Mutex | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Just |
||
} | ||
|
||
const limiterBurstSize = 4 * 128 << 10 | ||
|
||
func newLimiter(cfg *config.Wrapper) *limiter { | ||
l := &limiter{ | ||
write: rate.NewLimiter(rate.Inf, limiterBurstSize), | ||
read: rate.NewLimiter(rate.Inf, limiterBurstSize), | ||
write: rate.NewLimiter(rate.Inf, limiterBurstSize), | ||
read: rate.NewLimiter(rate.Inf, limiterBurstSize), | ||
mu: &sync.Mutex{}, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Adjust here to nothing at all (standard lib |
||
deviceReadLimiters: make(map[protocol.DeviceID]*rate.Limiter), | ||
deviceWriteLimiters: make(map[protocol.DeviceID]*rate.Limiter), | ||
} | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could just call rebuildMap here? |
||
cfg.Subscribe(l) | ||
prev := config.Configuration{Options: config.OptionsConfiguration{MaxRecvKbps: -1, MaxSendKbps: -1}} | ||
|
||
l.CommitConfiguration(prev, cfg.RawCopy()) | ||
return l | ||
} | ||
|
||
func (lim *limiter) newReadLimiter(r io.Reader, isLAN bool) io.Reader { | ||
return &limitedReader{reader: r, limiter: lim, isLAN: isLAN} | ||
// This function sets limiters according to corresponding DeviceConfiguration | ||
func (lim *limiter) setLimitsLocked(device config.DeviceConfiguration) bool { | ||
readLimiter := lim.getReadLimiterLocked(device.DeviceID) | ||
writeLimiter := lim.getWriteLimiterLocked(device.DeviceID) | ||
|
||
// limiters for this device are created so we can store previous rates for logging | ||
previousReadLimit := readLimiter.Limit() | ||
previousWriteLimit := writeLimiter.Limit() | ||
currentReadLimit := rate.Limit(device.MaxRecvKbps) * 1024 | ||
currentWriteLimit := rate.Limit(device.MaxSendKbps) * 1024 | ||
if device.MaxSendKbps <= 0 { | ||
currentWriteLimit = rate.Inf | ||
} | ||
if device.MaxRecvKbps <= 0 { | ||
currentReadLimit = rate.Inf | ||
} | ||
// Nothing about this device has changed. Start processing next device | ||
if previousWriteLimit == currentWriteLimit && previousReadLimit == currentReadLimit { | ||
return false | ||
} | ||
|
||
readLimiter.SetLimit(currentReadLimit) | ||
writeLimiter.SetLimit(currentWriteLimit) | ||
|
||
return true | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Now this lives on the lim struct, but doesn't actually use it, meaning it should either be a utility function not part of lim, or use lim and reach into lim.device{Write,Read}Limiters directly. |
||
|
||
func (lim *limiter) newWriteLimiter(w io.Writer, isLAN bool) io.Writer { | ||
return &limitedWriter{writer: w, limiter: lim, isLAN: isLAN} | ||
// This function handles removing, adding and updating of device limiters. | ||
// Pass pointer to avoid copying. Pointer already points to copy of configuration | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not clear what the pointer comments are about. The configuration parameters are not pointers. |
||
// so we don't have to worry about modifying real config. | ||
func (lim *limiter) processDevicesConfigurationLocked(from, to config.Configuration) { | ||
seen := make(map[protocol.DeviceID]struct{}) | ||
|
||
// Mark devices which should not be removed, create new limiters if needed and assign new limiter rate | ||
for _, dev := range to.Devices { | ||
if dev.DeviceID == to.MyID { | ||
// This limiter was created for local device. Should skip this device | ||
continue | ||
} | ||
seen[dev.DeviceID] = struct{}{} | ||
|
||
if lim.setLimitsLocked(dev) { | ||
readLimitStr := "is unlimited" | ||
if dev.MaxRecvKbps > 0 { | ||
readLimitStr = fmt.Sprintf("limit is %d KiB/s", dev.MaxRecvKbps) | ||
} | ||
writeLimitStr := "is unlimited" | ||
if dev.MaxSendKbps > 0 { | ||
writeLimitStr = fmt.Sprintf("limit is %d KiB/s", dev.MaxSendKbps) | ||
} | ||
|
||
l.Infof("Device %s send rate %s, receive rate %s", dev.DeviceID, readLimitStr, writeLimitStr) | ||
} | ||
} | ||
|
||
// Delete remote devices which were removed in new configuration | ||
for _, dev := range from.Devices { | ||
if _, ok := seen[dev.DeviceID]; !ok { | ||
l.Debugf("deviceID: %s should be removed", dev.DeviceID) | ||
|
||
delete(lim.deviceWriteLimiters, dev.DeviceID) | ||
delete(lim.deviceReadLimiters, dev.DeviceID) | ||
} | ||
} | ||
|
||
l.Debugln("Processing of device limiters map finished") | ||
} | ||
|
||
func (lim *limiter) VerifyConfiguration(from, to config.Configuration) error { | ||
return nil | ||
} | ||
|
||
func (lim *limiter) CommitConfiguration(from, to config.Configuration) bool { | ||
// to ensure atomic update of configuration | ||
lim.mu.Lock() | ||
defer lim.mu.Unlock() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is this necessary? The map mutex should be protection enough, right? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I guess when many routines try to save configs at the same time it could cause some trouble when handling device limiters. I.e. first we remove device X then add the same device X and then update of device X rates. Routines could get scheduled so that remove stops right before the loop where deleting is done. Then add would process the device and next config would update it's rates. After removing is resumed we're left with no limiter for X despite it was set it in the most recent config. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Right. To me it seems redundant to have two locks protecting the limiter. Couldn't you just have a single lock that you acquire during |
||
|
||
// Delete, add or update limiters for devices | ||
lim.processDevicesConfigurationLocked(from, to) | ||
|
||
if from.Options.MaxRecvKbps == to.Options.MaxRecvKbps && | ||
from.Options.MaxSendKbps == to.Options.MaxSendKbps && | ||
from.Options.LimitBandwidthInLan == to.Options.LimitBandwidthInLan { | ||
|
@@ -58,7 +135,6 @@ func (lim *limiter) CommitConfiguration(from, to config.Configuration) bool { | |
|
||
// The rate variables are in KiB/s in the config (despite the camel casing | ||
// of the name). We multiply by 1024 to get bytes/s. | ||
|
||
if to.Options.MaxRecvKbps <= 0 { | ||
lim.read.SetLimit(rate.Inf) | ||
} else { | ||
|
@@ -81,7 +157,7 @@ func (lim *limiter) CommitConfiguration(from, to config.Configuration) bool { | |
if to.Options.MaxRecvKbps > 0 { | ||
recvLimitStr = fmt.Sprintf("limit is %d KiB/s", to.Options.MaxRecvKbps) | ||
} | ||
l.Infof("Send rate %s, receive rate %s", sendLimitStr, recvLimitStr) | ||
l.Infof("Overall send rate %s, receive rate %s", sendLimitStr, recvLimitStr) | ||
|
||
if to.Options.LimitBandwidthInLan { | ||
l.Infoln("Rate limits apply to LAN connections") | ||
|
@@ -97,52 +173,67 @@ func (lim *limiter) String() string { | |
return "connections.limiter" | ||
} | ||
|
||
func (lim *limiter) newLimitedReaderLocked(remoteID protocol.DeviceID, r io.Reader, isLAN bool) io.Reader { | ||
deviceLimiter := lim.getReadLimiterLocked(remoteID) | ||
return &limitedReader{reader: r, limiter: lim, deviceLimiter: deviceLimiter, isLAN: isLAN} | ||
} | ||
|
||
func (lim *limiter) newLimitedWriterLocked(remoteID protocol.DeviceID, w io.Writer, isLAN bool) io.Writer { | ||
deviceLimiter := lim.getWriteLimiterLocked(remoteID) | ||
return &limitedWriter{writer: w, limiter: lim, deviceLimiter: deviceLimiter, isLAN: isLAN} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The fact that processblabla does locking outside the function, and this does inside, feels wrong. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @AudriusButkevicius We are starting to contradict each other once again x-): My reasoning to suggest this was that the locking now happens in There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fine, but why can't we lock lim.mu before calling the newblabla twice in the connection service, this way we lock the same way in both places. Also a function that needs locking before called, we should add the locked word into the name like we do in the model. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Merge these two two line functions into the callee to reduce the mental stack depth. |
||
} | ||
|
||
// limitedReader is a rate limited io.Reader | ||
type limitedReader struct { | ||
reader io.Reader | ||
limiter *limiter | ||
isLAN bool | ||
reader io.Reader | ||
limiter *limiter | ||
deviceLimiter *rate.Limiter | ||
isLAN bool | ||
} | ||
|
||
func (r *limitedReader) Read(buf []byte) (int, error) { | ||
n, err := r.reader.Read(buf) | ||
if !r.isLAN || r.limiter.limitsLAN.get() { | ||
take(r.limiter.read, n) | ||
take(r.limiter.read, r.deviceLimiter, n) | ||
} | ||
return n, err | ||
} | ||
|
||
// limitedWriter is a rate limited io.Writer | ||
type limitedWriter struct { | ||
writer io.Writer | ||
limiter *limiter | ||
isLAN bool | ||
writer io.Writer | ||
limiter *limiter | ||
deviceLimiter *rate.Limiter | ||
isLAN bool | ||
} | ||
|
||
func (w *limitedWriter) Write(buf []byte) (int, error) { | ||
if !w.isLAN || w.limiter.limitsLAN.get() { | ||
take(w.limiter.write, len(buf)) | ||
take(w.limiter.write, w.deviceLimiter, len(buf)) | ||
} | ||
return w.writer.Write(buf) | ||
} | ||
|
||
// take is a utility function to consume tokens from a rate.Limiter. No call | ||
// to WaitN can be larger than the limiter burst size so we split it up into | ||
// take is a utility function to consume tokens from a overall rate.Limiter and deviceLimiter. | ||
// No call to WaitN can be larger than the limiter burst size so we split it up into | ||
// several calls when necessary. | ||
func take(l *rate.Limiter, tokens int) { | ||
func take(l, deviceLimiter *rate.Limiter, tokens int) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. the l could do with a better name now, and the doc string should be updated. |
||
if tokens < limiterBurstSize { | ||
// This is the by far more common case so we get it out of the way | ||
// early. | ||
deviceLimiter.WaitN(context.TODO(), tokens) | ||
l.WaitN(context.TODO(), tokens) | ||
return | ||
} | ||
|
||
for tokens > 0 { | ||
// Consume limiterBurstSize tokens at a time until we're done. | ||
if tokens > limiterBurstSize { | ||
deviceLimiter.WaitN(context.TODO(), limiterBurstSize) | ||
l.WaitN(context.TODO(), limiterBurstSize) | ||
tokens -= limiterBurstSize | ||
} else { | ||
deviceLimiter.WaitN(context.TODO(), tokens) | ||
l.WaitN(context.TODO(), tokens) | ||
tokens = 0 | ||
} | ||
|
@@ -162,3 +253,26 @@ func (b *atomicBool) set(v bool) { | |
func (b *atomicBool) get() bool { | ||
return atomic.LoadInt32((*int32)(b)) != 0 | ||
} | ||
|
||
// Utility functions for atomic operations on device limiters map | ||
func (lim *limiter) getWriteLimiterLocked(deviceID protocol.DeviceID) *rate.Limiter { | ||
limiter, ok := lim.deviceWriteLimiters[deviceID] | ||
|
||
if !ok { | ||
limiter = rate.NewLimiter(rate.Inf, limiterBurstSize) | ||
lim.deviceWriteLimiters[deviceID] = limiter | ||
} | ||
|
||
return limiter | ||
} | ||
|
||
func (lim *limiter) getReadLimiterLocked(deviceID protocol.DeviceID) *rate.Limiter { | ||
limiter, ok := lim.deviceReadLimiters[deviceID] | ||
|
||
if !ok { | ||
limiter = rate.NewLimiter(rate.Inf, limiterBurstSize) | ||
lim.deviceReadLimiters[deviceID] = limiter | ||
} | ||
|
||
return limiter | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should probably be our
lib/sync