Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

lib/connections, lib/config: Bandwidth throttling per remote device (fixes #4516) #4603

Merged
merged 29 commits into from
Mar 26, 2018
Merged
Show file tree
Hide file tree
Changes from 20 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
bf78111
lib/connections: Bandwidth throttling per remote device (fixes #4516)
qepasa Dec 17, 2017
7ca0546
Merge remote-tracking branch 'upstream/master' into device-throttling
qepasa Dec 17, 2017
e486481
device limiters are now working together with global limiter
qepasa Dec 17, 2017
79f96e5
go fmt changes
qepasa Dec 17, 2017
9b14e01
fixed panic when removing device
qepasa Dec 17, 2017
c1665b3
switched some logging to debug
qepasa Dec 17, 2017
d175118
changed map to sync.Map
qepasa Dec 18, 2017
1070dd6
gofmt
qepasa Dec 18, 2017
b52b326
updated processing of device configurations, changed sync.Map to map …
qepasa Dec 27, 2017
ae58320
Merge remote-tracking branch 'upstream/master' into device-throttling
qepasa Dec 27, 2017
b6d723d
adjustments after code review
qepasa Jan 8, 2018
a33855f
go fmt changes
qepasa Jan 8, 2018
8d637d6
Merge remote-tracking branch 'upstream/master' into device-throttling
qepasa Jan 8, 2018
5157154
Merge remote-tracking branch 'upstream/master' into device-throttling
qepasa Jan 13, 2018
cc63da2
adjustments after code review
qepasa Jan 14, 2018
36368f1
fixed limits change check when rate is unlimited
qepasa Jan 14, 2018
2ce47ad
go fmt
qepasa Jan 14, 2018
f6a92d5
removed unnecessary field from limiter, updated take method comment
qepasa Jan 15, 2018
353983f
Further code review adjustments
qepasa Jan 29, 2018
6967960
clean up rate conversions and remove extra mutex
qepasa Feb 6, 2018
0298b9f
code review adjustments
qepasa Feb 7, 2018
628ae2b
code review adjustments
qepasa Feb 10, 2018
56b471f
go fmt
qepasa Feb 10, 2018
da98cfe
code review adjustments
qepasa Feb 13, 2018
a38f5ea
Merge remote-tracking branch 'upstream/master' into device-throttling
qepasa Feb 13, 2018
e160d1d
removed unnecessary debug message
qepasa Feb 13, 2018
c3facc1
go fmt
qepasa Feb 13, 2018
d03b6ff
defer removed
qepasa Feb 13, 2018
291a198
refactor newLimiter*
qepasa Feb 13, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
9 changes: 9 additions & 0 deletions lib/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -764,3 +764,12 @@ func cleanSymlinks(filesystem fs.Filesystem, dir string) {
return nil
})
}

// mapDeviceConfigs returns a map of device ID to device configuration for the given configuration.
func (cfg *Configuration) DeviceMap() map[protocol.DeviceID]DeviceConfiguration {
m := make(map[protocol.DeviceID]DeviceConfiguration, len(cfg.Devices))
for _, dev := range cfg.Devices {
m[dev.DeviceID] = dev
}
return m
}
2 changes: 2 additions & 0 deletions lib/config/deviceconfiguration.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ type DeviceConfiguration struct {
Paused bool `xml:"paused" json:"paused"`
AllowedNetworks []string `xml:"allowedNetwork,omitempty" json:"allowedNetworks"`
AutoAcceptFolders bool `xml:"autoAcceptFolders" json:"autoAcceptFolders"`
MaxSendKbps int `xml:"maxSendKbps" json:"maxSendKbps"`
MaxRecvKbps int `xml:"maxRecvKbps" json:"maxRecvKbps"`
}

func NewDeviceConfiguration(id protocol.DeviceID, name string) DeviceConfiguration {
Expand Down
162 changes: 140 additions & 22 deletions lib/connections/limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,44 +12,121 @@ import (
"sync/atomic"

"github.com/syncthing/syncthing/lib/config"
"github.com/syncthing/syncthing/lib/protocol"
"golang.org/x/net/context"
"golang.org/x/time/rate"
"sync"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should probably be our lib/sync

)

// limiter manages a read and write rate limit, reacting to config changes
// as appropriate.
type limiter struct {
write *rate.Limiter
read *rate.Limiter
limitsLAN atomicBool
write *rate.Limiter
read *rate.Limiter
limitsLAN atomicBool
deviceReadLimiters map[protocol.DeviceID]*rate.Limiter
deviceWriteLimiters map[protocol.DeviceID]*rate.Limiter
mu *sync.Mutex
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just sync.Mutex, no need for a pointer (regardless of which sync package)

}

const limiterBurstSize = 4 * 128 << 10

func newLimiter(cfg *config.Wrapper) *limiter {
l := &limiter{
write: rate.NewLimiter(rate.Inf, limiterBurstSize),
read: rate.NewLimiter(rate.Inf, limiterBurstSize),
write: rate.NewLimiter(rate.Inf, limiterBurstSize),
read: rate.NewLimiter(rate.Inf, limiterBurstSize),
mu: &sync.Mutex{},
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Adjust here to nothing at all (standard lib sync) or sync.NewMutex() (our sync)

deviceReadLimiters: make(map[protocol.DeviceID]*rate.Limiter),
deviceWriteLimiters: make(map[protocol.DeviceID]*rate.Limiter),
}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could just call rebuildMap here?

cfg.Subscribe(l)
prev := config.Configuration{Options: config.OptionsConfiguration{MaxRecvKbps: -1, MaxSendKbps: -1}}

l.CommitConfiguration(prev, cfg.RawCopy())
return l
}

func (lim *limiter) newReadLimiter(r io.Reader, isLAN bool) io.Reader {
return &limitedReader{reader: r, limiter: lim, isLAN: isLAN}
// This function sets limiters according to corresponding DeviceConfiguration
func (lim *limiter) setLimits(device config.DeviceConfiguration) bool {
readLimiter := lim.getReadLimiter(device.DeviceID)
writeLimiter := lim.getWriteLimiter(device.DeviceID)

// limiters for this device are created so we can store previous rates for logging
previousReadLimit := readLimiter.Limit()
previousWriteLimit := writeLimiter.Limit()
currentReadLimit := rate.Limit(device.MaxRecvKbps) * 1024
currentWriteLimit := rate.Limit(device.MaxSendKbps) * 1024
if device.MaxSendKbps <= 0 {
currentWriteLimit = rate.Inf
}
if device.MaxRecvKbps <= 0 {
currentReadLimit = rate.Inf
}
if previousWriteLimit == currentWriteLimit && previousReadLimit == currentReadLimit {
return false
}

readLimiter.SetLimit(currentReadLimit)
writeLimiter.SetLimit(currentWriteLimit)

return true
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now this lives on the lim struct, but doesn't actually use it, meaning it should either be a utility function not part of lim, or use lim and reach into lim.device{Write,Read}Limiters directly.


func (lim *limiter) newWriteLimiter(w io.Writer, isLAN bool) io.Writer {
return &limitedWriter{writer: w, limiter: lim, isLAN: isLAN}
// This function handles removing, adding and updating of device limiters.
// Pass pointer to avoid copying. Pointer already points to copy of configuration
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not clear what the pointer comments are about. The configuration parameters are not pointers.

// so we don't have to worry about modifying real config.
func (lim *limiter) processDevicesConfiguration(from, to config.Configuration) {
seen := make(map[protocol.DeviceID]struct{})

// Mark devices which should not be removed, create new limiters if needed and assign new limiter rate
for _, dev := range to.Devices {
if dev.DeviceID == to.MyID {
// This limiter was created for local device. Should skip this device
continue
}
seen[dev.DeviceID] = struct{}{}

// Nothing about this device has changed. Start processing next device
if limitsChanged := lim.setLimits(dev); limitsChanged {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can just be if lim.setLimits(dev) {.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The variable assignment is pointless, you can straight do if setLimits(dev) {

readLimitStr := "is unlimited"
if dev.MaxRecvKbps > 0 {
readLimitStr = fmt.Sprintf("limit is %d KiB/s", dev.MaxRecvKbps)
}
writeLimitStr := "is unlimited"
if dev.MaxSendKbps > 0 {
writeLimitStr = fmt.Sprintf("limit is %d KiB/s", dev.MaxSendKbps)
}

l.Infof("Device %s send rate %s, receive rate %s", dev.DeviceID, readLimitStr, writeLimitStr)
}
}

// Delete remote devices which were removed in new configuration
for _, dev := range from.Devices {
if _, ok := seen[dev.DeviceID]; !ok {
l.Debugf("deviceID: %s should be removed", dev.DeviceID)

delete(lim.deviceWriteLimiters, dev.DeviceID)
delete(lim.deviceReadLimiters, dev.DeviceID)
}
}

l.Debugln("Processing of device limiters map finished")
}

func (lim *limiter) VerifyConfiguration(from, to config.Configuration) error {
return nil
}

func (lim *limiter) CommitConfiguration(from, to config.Configuration) bool {
// to ensure atomic update of configuration
lim.mu.Lock()
defer lim.mu.Unlock()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this necessary? The map mutex should be protection enough, right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess when many routines try to save configs at the same time it could cause some trouble when handling device limiters. I.e. first we remove device X then add the same device X and then update of device X rates. Routines could get scheduled so that remove stops right before the loop where deleting is done. Then add would process the device and next config would update it's rates. After removing is resumed we're left with no limiter for X despite it was set it in the most recent config.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right. To me it seems redundant to have two locks protecting the limiter. Couldn't you just have a single lock that you acquire during CommitConfiguration and newLimitedReader/Writer (and then obviously not acquiring it in getReadLimiter)?


// Delete, add or update limiters for devices
lim.processDevicesConfiguration(from, to)

if from.Options.MaxRecvKbps == to.Options.MaxRecvKbps &&
from.Options.MaxSendKbps == to.Options.MaxSendKbps &&
from.Options.LimitBandwidthInLan == to.Options.LimitBandwidthInLan {
Expand All @@ -58,7 +135,6 @@ func (lim *limiter) CommitConfiguration(from, to config.Configuration) bool {

// The rate variables are in KiB/s in the config (despite the camel casing
// of the name). We multiply by 1024 to get bytes/s.

if to.Options.MaxRecvKbps <= 0 {
lim.read.SetLimit(rate.Inf)
} else {
Expand All @@ -81,7 +157,7 @@ func (lim *limiter) CommitConfiguration(from, to config.Configuration) bool {
if to.Options.MaxRecvKbps > 0 {
recvLimitStr = fmt.Sprintf("limit is %d KiB/s", to.Options.MaxRecvKbps)
}
l.Infof("Send rate %s, receive rate %s", sendLimitStr, recvLimitStr)
l.Infof("Overall send rate %s, receive rate %s", sendLimitStr, recvLimitStr)

if to.Options.LimitBandwidthInLan {
l.Infoln("Rate limits apply to LAN connections")
Expand All @@ -97,52 +173,71 @@ func (lim *limiter) String() string {
return "connections.limiter"
}

func (lim *limiter) newLimitedReader(remoteID protocol.DeviceID, r io.Reader, isLAN bool) io.Reader {
lim.mu.Lock()
defer lim.mu.Unlock()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no point using here defer as the scope is small, as this just prevents potential inlining.

deviceLimiter := lim.getReadLimiter(remoteID)
return &limitedReader{reader: r, limiter: lim, deviceLimiter: deviceLimiter, isLAN: isLAN}
}

func (lim *limiter) newLimitedWriter(remoteID protocol.DeviceID, w io.Writer, isLAN bool) io.Writer {
lim.mu.Lock()
defer lim.mu.Unlock()
deviceLimiter := lim.getWriteLimiter(remoteID)
return &limitedWriter{writer: w, limiter: lim, deviceLimiter: deviceLimiter, isLAN: isLAN}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The fact that processblabla does locking outside the function, and this does inside, feels wrong.

Copy link
Member

@imsodin imsodin Feb 6, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@AudriusButkevicius We are starting to contradict each other once again x-): My reasoning to suggest this was that the locking now happens in CommitConfiguration and in these newLimited... functions, which are all "top-level", i.e. called "externally" (not all outside of the package, but outside of the limiter code). Also locking the entirety of CommitConfiguration was considered necessary to disallow concurrent change and thus with a single lock the locking cannot happen on get...Limiter anymore (single lock is not a requirement obviously, I just consider it cleaner).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fine, but why can't we lock lim.mu before calling the newblabla twice in the connection service, this way we lock the same way in both places. Also a function that needs locking before called, we should add the locked word into the name like we do in the model.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now I see.
@qepasa So locking should happen here instead and a whole bunch of functions needs the Locked suffix: setLimits, new..., get..., processD....

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Merge these two two line functions into the callee to reduce the mental stack depth.

}

// limitedReader is a rate limited io.Reader
type limitedReader struct {
reader io.Reader
limiter *limiter
isLAN bool
reader io.Reader
limiter *limiter
deviceLimiter *rate.Limiter
isLAN bool
}

func (r *limitedReader) Read(buf []byte) (int, error) {
n, err := r.reader.Read(buf)
if !r.isLAN || r.limiter.limitsLAN.get() {
take(r.limiter.read, n)
take(r.limiter.read, r.deviceLimiter, n)
}
return n, err
}

// limitedWriter is a rate limited io.Writer
type limitedWriter struct {
writer io.Writer
limiter *limiter
isLAN bool
writer io.Writer
limiter *limiter
deviceLimiter *rate.Limiter
isLAN bool
}

func (w *limitedWriter) Write(buf []byte) (int, error) {
if !w.isLAN || w.limiter.limitsLAN.get() {
take(w.limiter.write, len(buf))
take(w.limiter.write, w.deviceLimiter, len(buf))
}
return w.writer.Write(buf)
}

// take is a utility function to consume tokens from a rate.Limiter. No call
// to WaitN can be larger than the limiter burst size so we split it up into
// take is a utility function to consume tokens from a overall rate.Limiter and deviceLimiter.
// No call to WaitN can be larger than the limiter burst size so we split it up into
// several calls when necessary.
func take(l *rate.Limiter, tokens int) {
func take(l, deviceLimiter *rate.Limiter, tokens int) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the l could do with a better name now, and the doc string should be updated.

if tokens < limiterBurstSize {
// This is the by far more common case so we get it out of the way
// early.
deviceLimiter.WaitN(context.TODO(), tokens)
l.WaitN(context.TODO(), tokens)
return
}

for tokens > 0 {
// Consume limiterBurstSize tokens at a time until we're done.
if tokens > limiterBurstSize {
deviceLimiter.WaitN(context.TODO(), limiterBurstSize)
l.WaitN(context.TODO(), limiterBurstSize)
tokens -= limiterBurstSize
} else {
deviceLimiter.WaitN(context.TODO(), tokens)
l.WaitN(context.TODO(), tokens)
tokens = 0
}
Expand All @@ -162,3 +257,26 @@ func (b *atomicBool) set(v bool) {
func (b *atomicBool) get() bool {
return atomic.LoadInt32((*int32)(b)) != 0
}

// Utility functions for atomic operations on device limiters map
func (lim *limiter) getWriteLimiter(deviceID protocol.DeviceID) *rate.Limiter {
limiter, ok := lim.deviceWriteLimiters[deviceID]

if !ok {
limiter = rate.NewLimiter(rate.Inf, limiterBurstSize)
lim.deviceWriteLimiters[deviceID] = limiter
}

return limiter
}

func (lim *limiter) getReadLimiter(deviceID protocol.DeviceID) *rate.Limiter {
limiter, ok := lim.deviceReadLimiters[deviceID]

if !ok {
limiter = rate.NewLimiter(rate.Inf, limiterBurstSize)
lim.deviceReadLimiters[deviceID] = limiter
}

return limiter
}