Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

lib/connections, lib/config: Bandwidth throttling per remote device (fixes #4516) #4603

Merged
merged 29 commits into from
Mar 26, 2018
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
bf78111
lib/connections: Bandwidth throttling per remote device (fixes #4516)
qepasa Dec 17, 2017
7ca0546
Merge remote-tracking branch 'upstream/master' into device-throttling
qepasa Dec 17, 2017
e486481
device limiters are now working together with global limiter
qepasa Dec 17, 2017
79f96e5
go fmt changes
qepasa Dec 17, 2017
9b14e01
fixed panic when removing device
qepasa Dec 17, 2017
c1665b3
switched some logging to debug
qepasa Dec 17, 2017
d175118
changed map to sync.Map
qepasa Dec 18, 2017
1070dd6
gofmt
qepasa Dec 18, 2017
b52b326
updated processing of device configurations, changed sync.Map to map …
qepasa Dec 27, 2017
ae58320
Merge remote-tracking branch 'upstream/master' into device-throttling
qepasa Dec 27, 2017
b6d723d
adjustments after code review
qepasa Jan 8, 2018
a33855f
go fmt changes
qepasa Jan 8, 2018
8d637d6
Merge remote-tracking branch 'upstream/master' into device-throttling
qepasa Jan 8, 2018
5157154
Merge remote-tracking branch 'upstream/master' into device-throttling
qepasa Jan 13, 2018
cc63da2
adjustments after code review
qepasa Jan 14, 2018
36368f1
fixed limits change check when rate is unlimited
qepasa Jan 14, 2018
2ce47ad
go fmt
qepasa Jan 14, 2018
f6a92d5
removed unnecessary field from limiter, updated take method comment
qepasa Jan 15, 2018
353983f
Further code review adjustments
qepasa Jan 29, 2018
6967960
clean up rate conversions and remove extra mutex
qepasa Feb 6, 2018
0298b9f
code review adjustments
qepasa Feb 7, 2018
628ae2b
code review adjustments
qepasa Feb 10, 2018
56b471f
go fmt
qepasa Feb 10, 2018
da98cfe
code review adjustments
qepasa Feb 13, 2018
a38f5ea
Merge remote-tracking branch 'upstream/master' into device-throttling
qepasa Feb 13, 2018
e160d1d
removed unnecessary debug message
qepasa Feb 13, 2018
c3facc1
go fmt
qepasa Feb 13, 2018
d03b6ff
defer removed
qepasa Feb 13, 2018
291a198
refactor newLimiter*
qepasa Feb 13, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 2 additions & 0 deletions lib/config/deviceconfiguration.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ type DeviceConfiguration struct {
Paused bool `xml:"paused" json:"paused"`
AllowedNetworks []string `xml:"allowedNetwork,omitempty" json:"allowedNetworks"`
AutoAcceptFolders bool `xml:"autoAcceptFolders" json:"autoAcceptFolders"`
MaxSendKbps int `xml:"maxSendKbps" json:"maxSendKbps"`
MaxRecvKbps int `xml:"maxRecvKbps" json:"maxRecvKbps"`
}

func NewDeviceConfiguration(id protocol.DeviceID, name string) DeviceConfiguration {
Expand Down
180 changes: 155 additions & 25 deletions lib/connections/limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,53 +12,126 @@ import (
"sync/atomic"

"github.com/syncthing/syncthing/lib/config"
"github.com/syncthing/syncthing/lib/protocol"
"golang.org/x/net/context"
"golang.org/x/time/rate"
"sync"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should probably be our lib/sync

)

// limiter manages a read and write rate limit, reacting to config changes
// as appropriate.
type limiter struct {
write *rate.Limiter
read *rate.Limiter
limitsLAN atomicBool
write *rate.Limiter
read *rate.Limiter
limitsLAN atomicBool
deviceReadLimiters *sync.Map
deviceWriteLimiters *sync.Map
Copy link
Member

@calmh calmh Dec 24, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sync.Map is Go 1.9 and we're currently targeting 1.8. This is the build failure reported by the tests.

Although we could bump our requirements I think there should be little enough churn on these that we can actually use regular maps and locking...

myID protocol.DeviceID
mu *sync.Mutex
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just sync.Mutex, no need for a pointer (regardless of which sync package)

}

const limiterBurstSize = 4 * 128 << 10

func newLimiter(cfg *config.Wrapper) *limiter {
func newLimiter(deviceID protocol.DeviceID, cfg *config.Wrapper) *limiter {

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

empty

l := &limiter{
write: rate.NewLimiter(rate.Inf, limiterBurstSize),
read: rate.NewLimiter(rate.Inf, limiterBurstSize),
write: rate.NewLimiter(rate.Inf, limiterBurstSize),
read: rate.NewLimiter(rate.Inf, limiterBurstSize),
myID: deviceID,
mu: &sync.Mutex{},
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Adjust here to nothing at all (standard lib sync) or sync.NewMutex() (our sync)

deviceReadLimiters: new(sync.Map),
deviceWriteLimiters: new(sync.Map),
}

// Get initial device configuration
devices := cfg.RawCopy().Devices
for _, value := range devices {
value.MaxRecvKbps = -1
value.MaxSendKbps = -1
}
prev := config.Configuration{Options: config.OptionsConfiguration{MaxRecvKbps: -1, MaxSendKbps: -1}, Devices: devices}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe the entire above block isn't necessary, as CommitConfiguration will add all missing devices anyway, so https://github.com/syncthing/syncthing/pull/4603/files#diff-da45d4f40dfa16a9aae760dd650fe79aL35 should be sufficient.


// Keep read/write limiters for every connected device
l.rebuildMap(prev)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could just call rebuildMap here?

cfg.Subscribe(l)
prev := config.Configuration{Options: config.OptionsConfiguration{MaxRecvKbps: -1, MaxSendKbps: -1}}
l.CommitConfiguration(prev, cfg.RawCopy())
return l
}

func (lim *limiter) newReadLimiter(r io.Reader, isLAN bool) io.Reader {
return &limitedReader{reader: r, limiter: lim, isLAN: isLAN}
// Create new maps with new limiters
func (lim *limiter) rebuildMap(to config.Configuration) {
deviceWriteLimiters := new(sync.Map)
deviceReadLimiters := new(sync.Map)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Noop space.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

empty

// copy *limiter in case remote device is still connected, when remote device is added we create new read/write limiters
for _, v := range to.Devices {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What does "v" stand for? We usually use "dev".

if readLimiter, ok := lim.deviceReadLimiters.Load(v.DeviceID); ok {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if there are two rebuildMap calls in parallel due to frequent config save or something like that, this might be halfway through a swap too.

deviceReadLimiters.Store(v.DeviceID, readLimiter)
} else {
deviceReadLimiters.Store(v.DeviceID, rate.NewLimiter(rate.Inf, limiterBurstSize))
}

if writeLimiter, ok := lim.deviceWriteLimiters.Load(v.DeviceID); ok {
deviceWriteLimiters.Store(v.DeviceID, writeLimiter)
} else {
deviceWriteLimiters.Store(v.DeviceID, rate.NewLimiter(rate.Inf, limiterBurstSize))
}
}

// assign new maps
lim.mu.Lock()
defer lim.mu.Unlock()
lim.deviceWriteLimiters = deviceWriteLimiters
lim.deviceReadLimiters = deviceReadLimiters

l.Debugln("Rebuild of device limiters map finished")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wouldn't it be better for rebuild to happen in place, this way getting rid of the lock that we have to get on every read and write.

}

func (lim *limiter) newWriteLimiter(w io.Writer, isLAN bool) io.Writer {
return &limitedWriter{writer: w, limiter: lim, isLAN: isLAN}
// Compare read/write limits in configurations
func (lim *limiter) checkDeviceLimits(from, to config.Configuration) bool {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Again, doesn't need to live on lim.
Also the function name doesn't explain well what this does. Should probably be called deviceLimitsChanged() and invert what it returns.

for i := range from.Devices {
if from.Devices[i].DeviceID != to.Devices[i].DeviceID {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should blow up if a device is removed, as len(to) < len(from).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You should probably write a test case for this logic.
Test case for adding
Test case for removing
Test case for adding and removing in the same config commit cycle.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a nit-pick, but we don't really need to rely on the device order here. It's possible that rates aren't changed, just the order gets changed.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There should be some sort of cfg.DeviceMap() or something like that utility function.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, that should be moved from model to config: https://github.com/syncthing/syncthing/blob/master/lib/model/model.go#L2569
That also makes it easier to remove limiters, just treat it like folders in models CommitConfiguration.

// Something has changed in device configuration
lim.rebuildMap(to)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To be honest, this function as it states should only do checking, and not rebuilding (as a side-effect), and rebuilding and other cruft should happen in the CommitConfiguration.

return false
}
// Read/write limits were changed for this device
if from.Devices[i].MaxSendKbps != to.Devices[i].MaxSendKbps || from.Devices[i].MaxRecvKbps != to.Devices[i].MaxRecvKbps {
return false
}
}
return true
}

func (lim *limiter) newReadLimiter(remoteID protocol.DeviceID, r io.Reader, isLAN bool) io.Reader {
return &limitedReader{reader: r, limiter: lim, isLAN: isLAN, remoteID: remoteID}
}

func (lim *limiter) newWriteLimiter(remoteID protocol.DeviceID, w io.Writer, isLAN bool) io.Writer {
return &limitedWriter{writer: w, limiter: lim, isLAN: isLAN, remoteID: remoteID}
}

func (lim *limiter) VerifyConfiguration(from, to config.Configuration) error {
return nil
}

func (lim *limiter) CommitConfiguration(from, to config.Configuration) bool {
if from.Options.MaxRecvKbps == to.Options.MaxRecvKbps &&
if len(from.Devices) == len(to.Devices) &&
from.Options.MaxRecvKbps == to.Options.MaxRecvKbps &&
from.Options.MaxSendKbps == to.Options.MaxSendKbps &&
from.Options.LimitBandwidthInLan == to.Options.LimitBandwidthInLan {
from.Options.LimitBandwidthInLan == to.Options.LimitBandwidthInLan &&
lim.checkDeviceLimits(from, to) {
return true
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this global check is not very useful as global limits are reset even if just a device changed and whether device change is checked again later on. So I would either not check at all and just reset everything or check the different parameters separately.


// A device has been added or removed
if len(from.Devices) != len(to.Devices) {
lim.rebuildMap(to)
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, this is unnecessery if you handle it properly in checkDeviceLimits, or stop it from having side-effects.


// The rate variables are in KiB/s in the config (despite the camel casing
// of the name). We multiply by 1024 to get bytes/s.

if to.Options.MaxRecvKbps <= 0 {
lim.read.SetLimit(rate.Inf)
} else {
Expand All @@ -73,22 +146,55 @@ func (lim *limiter) CommitConfiguration(from, to config.Configuration) bool {

lim.limitsLAN.set(to.Options.LimitBandwidthInLan)

// Set limits for devices
for _, v := range to.Devices {
if v.DeviceID == lim.myID {
// This limiter was created for local device. Should skip this device
continue
}

readLimiter, _ := lim.deviceReadLimiters.Load(v.DeviceID)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You should not throw away the ok, in case the value is not there.
Also, you are not locking here, so you might be accessing deviceReadLimiters as the map is being swapped out.

This is also a reason why rebuildMap should potentially do it without swapping the maps.

if v.MaxRecvKbps <= 0 {
readLimiter.(*rate.Limiter).SetLimit(rate.Inf)
} else {
readLimiter.(*rate.Limiter).SetLimit(1024 * rate.Limit(v.MaxRecvKbps))
}

writeLimiter, _ := lim.deviceWriteLimiters.Load(v.DeviceID)
if v.MaxSendKbps <= 0 {
writeLimiter.(*rate.Limiter).SetLimit(rate.Inf)
} else {
writeLimiter.(*rate.Limiter).SetLimit(1024 * rate.Limit(v.MaxSendKbps))
}

sendLimitStr := "is unlimited"
recvLimitStr := "is unlimited"
if v.MaxSendKbps > 0 {
sendLimitStr = fmt.Sprintf("limit is %d KiB/s", v.MaxSendKbps)
}

if v.MaxRecvKbps > 0 {
recvLimitStr = fmt.Sprintf("limit is %d KiB/s", v.MaxRecvKbps)
}
l.Infof("Device %s: send rate %s, receive rate %s", v.DeviceID, sendLimitStr, recvLimitStr)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will always print stuff, even if someones rate hasn't changed since before.
It will further always mess around with the rate limiters, regardless if the value changed or not.

}

sendLimitStr := "is unlimited"
recvLimitStr := "is unlimited"
if to.Options.MaxSendKbps > 0 {
sendLimitStr = fmt.Sprintf("limit is %d KiB/s", to.Options.MaxSendKbps)
}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

separation doesn't seem helpful here

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

agree

if to.Options.MaxRecvKbps > 0 {
recvLimitStr = fmt.Sprintf("limit is %d KiB/s", to.Options.MaxRecvKbps)
}
l.Infof("Send rate %s, receive rate %s", sendLimitStr, recvLimitStr)
l.Infof("Overall send rate %s, receive rate %s", sendLimitStr, recvLimitStr)

if to.Options.LimitBandwidthInLan {
l.Infoln("Rate limits apply to LAN connections")
} else {
l.Infoln("Rate limits do not apply to LAN connections")
}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We usually separate return with an empty line, so I'd not remove it here.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree

return true
}

Expand All @@ -99,50 +205,74 @@ func (lim *limiter) String() string {

// limitedReader is a rate limited io.Reader
type limitedReader struct {
reader io.Reader
limiter *limiter
isLAN bool
reader io.Reader
limiter *limiter
isLAN bool
remoteID protocol.DeviceID
}

func (r *limitedReader) Read(buf []byte) (int, error) {
n, err := r.reader.Read(buf)
if !r.isLAN || r.limiter.limitsLAN.get() {
take(r.limiter.read, n)

// in case rebuildMap was called
r.limiter.mu.Lock()
deviceLimiter, ok := r.limiter.deviceReadLimiters.Load(r.remoteID)
r.limiter.mu.Unlock()
if !ok {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To be honest, if !ok, I'd panic (or do nothing), as it should clearly be there.
Now it's like a bandaid for a problem we can't explain.

I'd do a if deviceLimiter != nil check in take

l.Debugln("deviceReadLimiter was not in the map")
deviceLimiter = rate.NewLimiter(rate.Inf, limiterBurstSize)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How can this happen?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It happens after removing remote device. I'm not sure about limitedWriter but I added it just to be safe.

}
take(r.limiter.read, deviceLimiter.(*rate.Limiter), n)
}
return n, err
}

// limitedWriter is a rate limited io.Writer
type limitedWriter struct {
writer io.Writer
limiter *limiter
isLAN bool
writer io.Writer
limiter *limiter
isLAN bool
remoteID protocol.DeviceID
}

func (w *limitedWriter) Write(buf []byte) (int, error) {
if !w.isLAN || w.limiter.limitsLAN.get() {
take(w.limiter.write, len(buf))

// in case rebuildMap was called
w.limiter.mu.Lock()
deviceLimiter, ok := w.limiter.deviceWriteLimiters.Load(w.remoteID)
w.limiter.mu.Unlock()
if !ok {
l.Debugln("deviceWriteLimiter was not in the map")
deviceLimiter = rate.NewLimiter(rate.Inf, limiterBurstSize)
}
take(w.limiter.write, deviceLimiter.(*rate.Limiter), len(buf))
}
return w.writer.Write(buf)
}

// take is a utility function to consume tokens from a rate.Limiter. No call
// to WaitN can be larger than the limiter burst size so we split it up into
// several calls when necessary.
func take(l *rate.Limiter, tokens int) {
func take(l, deviceLimiter *rate.Limiter, tokens int) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the l could do with a better name now, and the doc string should be updated.


Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

empty line

if tokens < limiterBurstSize {
// This is the by far more common case so we get it out of the way
// early.
deviceLimiter.WaitN(context.TODO(), tokens)
l.WaitN(context.TODO(), tokens)
return
}

for tokens > 0 {
// Consume limiterBurstSize tokens at a time until we're done.
if tokens > limiterBurstSize {
deviceLimiter.WaitN(context.TODO(), limiterBurstSize)
l.WaitN(context.TODO(), limiterBurstSize)
tokens -= limiterBurstSize
} else {
deviceLimiter.WaitN(context.TODO(), tokens)
l.WaitN(context.TODO(), tokens)
tokens = 0
}
Expand Down
6 changes: 3 additions & 3 deletions lib/connections/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func NewService(cfg *config.Wrapper, myID protocol.DeviceID, mdl Model, tlsCfg *
conns: make(chan internalConn),
bepProtocolName: bepProtocolName,
tlsDefaultCommonName: tlsDefaultCommonName,
limiter: newLimiter(cfg),
limiter: newLimiter(myID, cfg),
natService: nat.NewService(myID, cfg),

listenersMut: sync.NewRWMutex(),
Expand Down Expand Up @@ -265,8 +265,8 @@ next:
// keep up with config changes to the rate and whether or not LAN
// connections are limited.
isLAN := s.isLAN(c.RemoteAddr())
wr := s.limiter.newWriteLimiter(c, isLAN)
rd := s.limiter.newReadLimiter(c, isLAN)
wr := s.limiter.newWriteLimiter(remoteID, c, isLAN)
rd := s.limiter.newReadLimiter(remoteID, c, isLAN)

name := fmt.Sprintf("%s-%s (%s)", c.LocalAddr(), c.RemoteAddr(), c.Type())
protoConn := protocol.NewConnection(remoteID, rd, wr, s.model, name, deviceCfg.Compression)
Expand Down