Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

all: Use multiple QUIC streams (fixes #8879) #8880

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions cmd/syncthing/perfstats_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (
"syscall"
"time"

"github.com/syncthing/syncthing/lib/protocol"
"github.com/syncthing/syncthing/lib/netutil"
)

func startPerfStats() {
Expand Down Expand Up @@ -48,7 +48,8 @@ func savePerfStats(file string) {
cpuUsagePercent := 100 * float64(usageDiff) / float64(timeDiff)
prevTime = curTime
prevUsage = curUsage
in, out := protocol.TotalInOut()
cnt := netutil.RootCounter()
in, out := cnt.BytesRead(), cnt.BytesWritten()
var inRate, outRate float64
if timeDiff > 0 {
inRate = float64(in-prevIn) / (float64(timeDiff) / 1e9) // bytes per second
Expand Down
115 changes: 37 additions & 78 deletions lib/connections/limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ package connections
import (
"context"
"fmt"
"io"
"math"
"sync/atomic"

"github.com/syncthing/syncthing/lib/config"
Expand All @@ -34,6 +34,7 @@ type waiter interface {
// This is the rate limiting operation
WaitN(ctx context.Context, n int) error
Limit() rate.Limit
Burst() int
}

const (
Expand Down Expand Up @@ -177,33 +178,27 @@ func (*limiter) String() string {
return "connections.limiter"
}

func (lim *limiter) getLimiters(remoteID protocol.DeviceID, rw io.ReadWriter, isLAN bool) (io.Reader, io.Writer) {
func (lim *limiter) getLimiters(remoteID protocol.DeviceID, isLAN bool) (waiterHolder, waiterHolder) {
lim.mu.Lock()
wr := lim.newLimitedWriterLocked(remoteID, rw, isLAN)
rd := lim.newLimitedReaderLocked(remoteID, rw, isLAN)
wr := lim.newWriteLimiterLocked(remoteID, isLAN)
rd := lim.newReadLimiterLocked(remoteID, isLAN)
lim.mu.Unlock()
return rd, wr
}

func (lim *limiter) newLimitedReaderLocked(remoteID protocol.DeviceID, r io.Reader, isLAN bool) io.Reader {
return &limitedReader{
reader: r,
waiterHolder: waiterHolder{
waiter: totalWaiter{lim.getReadLimiterLocked(remoteID), lim.read},
limitsLAN: &lim.limitsLAN,
isLAN: isLAN,
},
func (lim *limiter) newReadLimiterLocked(remoteID protocol.DeviceID, isLAN bool) waiterHolder {
return waiterHolder{
waiter: totalWaiter{lim.getReadLimiterLocked(remoteID), lim.read},
limitsLAN: &lim.limitsLAN,
isLAN: isLAN,
}
}

func (lim *limiter) newLimitedWriterLocked(remoteID protocol.DeviceID, w io.Writer, isLAN bool) io.Writer {
return &limitedWriter{
writer: w,
waiterHolder: waiterHolder{
waiter: totalWaiter{lim.getWriteLimiterLocked(remoteID), lim.write},
limitsLAN: &lim.limitsLAN,
isLAN: isLAN,
},
func (lim *limiter) newWriteLimiterLocked(remoteID protocol.DeviceID, isLAN bool) waiterHolder {
return waiterHolder{
waiter: totalWaiter{lim.getWriteLimiterLocked(remoteID), lim.write},
limitsLAN: &lim.limitsLAN,
isLAN: isLAN,
}
}

Expand All @@ -224,60 +219,6 @@ func getRateLimiter(m map[protocol.DeviceID]*rate.Limiter, deviceID protocol.Dev
return limiter
}

// limitedReader is a rate limited io.Reader
type limitedReader struct {
reader io.Reader
waiterHolder
}

func (r *limitedReader) Read(buf []byte) (int, error) {
n, err := r.reader.Read(buf)
if !r.unlimited() {
r.take(n)
}
return n, err
}

// limitedWriter is a rate limited io.Writer
type limitedWriter struct {
writer io.Writer
waiterHolder
}

func (w *limitedWriter) Write(buf []byte) (int, error) {
if w.unlimited() {
return w.writer.Write(buf)
}

// This does (potentially) multiple smaller writes in order to be less
// bursty with large writes and slow rates. At the same time we don't
// want to do hilarious amounts of tiny writes when the rate is high, so
// try to be a bit adaptable. We range from the minimum write size of 1
// KiB up to the limiter burst size, aiming for about a write every
// 10ms.
singleWriteSize := int(w.waiter.Limit() / 100) // 10ms worth of data
singleWriteSize = ((singleWriteSize / 1024) + 1) * 1024 // round up to the next kibibyte
if singleWriteSize > limiterBurstSize {
singleWriteSize = limiterBurstSize
}

written := 0
for written < len(buf) {
toWrite := singleWriteSize
if toWrite > len(buf)-written {
toWrite = len(buf) - written
}
w.take(toWrite)
n, err := w.writer.Write(buf[written : written+toWrite])
written += n
if err != nil {
return written, err
}
}

return written, nil
}

// waiterHolder is the common functionality around having and evaluating a
// waiter, valid for both writers and readers
type waiterHolder struct {
Expand All @@ -286,17 +227,17 @@ type waiterHolder struct {
isLAN bool
}

// unlimited returns true if the waiter is not limiting the rate
func (w waiterHolder) unlimited() bool {
// Unlimited returns true if the waiter is not limiting the rate
func (w waiterHolder) Unlimited() bool {
if w.isLAN && !w.limitsLAN.Load() {
return true
}
return w.waiter.Limit() == rate.Inf
}

// take is a utility function to consume tokens, because no call to WaitN
// Take is a utility function to consume tokens, because no call to WaitN
// must be larger than the limiter burst size or it will hang.
func (w waiterHolder) take(tokens int) {
func (w waiterHolder) Take(tokens int) {
// For writes we already split the buffer into smaller operations so those
// will always end up in the fast path below. For reads, however, we don't
// control the size of the incoming buffer and don't split the calls
Expand All @@ -322,6 +263,14 @@ func (w waiterHolder) take(tokens int) {
}
}

func (w waiterHolder) Limit() int {
return int(w.waiter.Limit())
}

func (w waiterHolder) Burst() int {
return w.waiter.Burst()
}

// totalWaiter waits for all of the waiters
type totalWaiter []waiter

Expand All @@ -345,3 +294,13 @@ func (tw totalWaiter) Limit() rate.Limit {
}
return min
}

func (tw totalWaiter) Burst() int {
min := math.MaxInt
for _, w := range tw {
if l := w.Burst(); l < min {
min = l
}
}
return min
}
65 changes: 28 additions & 37 deletions lib/connections/limiter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,15 @@ import (

"github.com/syncthing/syncthing/lib/config"
"github.com/syncthing/syncthing/lib/events"
"github.com/syncthing/syncthing/lib/netutil"
"github.com/syncthing/syncthing/lib/protocol"
"golang.org/x/time/rate"
)

var device1, device2, device3, device4 protocol.DeviceID
var dev1Conf, dev2Conf, dev3Conf, dev4Conf config.DeviceConfiguration
var (
device1, device2, device3, device4 protocol.DeviceID
dev1Conf, dev2Conf, dev3Conf, dev4Conf config.DeviceConfiguration
)

func init() {
device1, _ = protocol.DeviceIDFromString("AIR6LPZ7K4PTTUXQSMUUCPQ5YWOEDFIIQJUG7772YQXXR5YD6AWQ")
Expand Down Expand Up @@ -231,14 +234,11 @@ func TestLimitedWriterWrite(t *testing.T) {
// regardless of the rate.
dst := new(bytes.Buffer)
cw := &countingWriter{w: dst}
lw := &limitedWriter{
writer: cw,
waiterHolder: waiterHolder{
waiter: rate.NewLimiter(rate.Limit(42), limiterBurstSize),
limitsLAN: new(atomic.Bool),
isLAN: false, // enables limiting
},
}
lw := netutil.NewLimitedWriter(cw, waiterHolder{
waiter: rate.NewLimiter(rate.Limit(42), limiterBurstSize),
limitsLAN: new(atomic.Bool),
isLAN: false, // enables limiting
})
if _, err := io.Copy(lw, bytes.NewReader(src)); err != nil {
t.Fatal(err)
}
Expand All @@ -260,14 +260,11 @@ func TestLimitedWriterWrite(t *testing.T) {
// count the write calls. Now we make sure the fast path is used.
dst = new(bytes.Buffer)
cw = &countingWriter{w: dst}
lw = &limitedWriter{
writer: cw,
waiterHolder: waiterHolder{
waiter: rate.NewLimiter(rate.Limit(42), limiterBurstSize),
limitsLAN: new(atomic.Bool),
isLAN: true, // disables limiting
},
}
lw = netutil.NewLimitedWriter(cw, waiterHolder{
waiter: rate.NewLimiter(rate.Limit(42), limiterBurstSize),
limitsLAN: new(atomic.Bool),
isLAN: true, // disables limiting
})
if _, err := io.Copy(lw, bytes.NewReader(src)); err != nil {
t.Fatal(err)
}
Expand All @@ -284,14 +281,11 @@ func TestLimitedWriterWrite(t *testing.T) {
// rate, with multiple unlimited raters even (global and per-device).
dst = new(bytes.Buffer)
cw = &countingWriter{w: dst}
lw = &limitedWriter{
writer: cw,
waiterHolder: waiterHolder{
waiter: totalWaiter{rate.NewLimiter(rate.Inf, limiterBurstSize), rate.NewLimiter(rate.Inf, limiterBurstSize)},
limitsLAN: new(atomic.Bool),
isLAN: false, // enables limiting
},
}
lw = netutil.NewLimitedWriter(cw, waiterHolder{
waiter: totalWaiter{rate.NewLimiter(rate.Inf, limiterBurstSize), rate.NewLimiter(rate.Inf, limiterBurstSize)},
limitsLAN: new(atomic.Bool),
isLAN: false, // enables limiting
})
if _, err := io.Copy(lw, bytes.NewReader(src)); err != nil {
t.Fatal(err)
}
Expand All @@ -308,18 +302,15 @@ func TestLimitedWriterWrite(t *testing.T) {
// is a combo of limited and unlimited writers.
dst = new(bytes.Buffer)
cw = &countingWriter{w: dst}
lw = &limitedWriter{
writer: cw,
waiterHolder: waiterHolder{
waiter: totalWaiter{
rate.NewLimiter(rate.Inf, limiterBurstSize),
rate.NewLimiter(rate.Limit(42), limiterBurstSize),
rate.NewLimiter(rate.Inf, limiterBurstSize),
},
limitsLAN: new(atomic.Bool),
isLAN: false, // enables limiting
lw = netutil.NewLimitedWriter(cw, waiterHolder{
waiter: totalWaiter{
rate.NewLimiter(rate.Inf, limiterBurstSize),
rate.NewLimiter(rate.Limit(42), limiterBurstSize),
rate.NewLimiter(rate.Inf, limiterBurstSize),
},
}
limitsLAN: new(atomic.Bool),
isLAN: false, // enables limiting
})
if _, err := io.Copy(lw, bytes.NewReader(src)); err != nil {
t.Fatal(err)
}
Expand Down
8 changes: 7 additions & 1 deletion lib/connections/quic_dial.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,13 @@ func (d *quicDialer) Dial(ctx context.Context, _ protocol.DeviceID, uri *url.URL
if isLocal {
priority = d.lanPriority
}
return newInternalConn(&quicTlsConn{session, stream, createdConn}, connTypeQUICClient, isLocal, priority), nil
qtlsc := &quicTlsConn{
Connection: session,
Stream: stream,
createdConn: createdConn,
supportsSubstreams: false, // set later based on handshake
}
return newInternalConn(qtlsc, connTypeQUICClient, isLocal, priority), nil
}

type quicDialerFactory struct{}
Expand Down
28 changes: 20 additions & 8 deletions lib/connections/quic_listen.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,15 +97,21 @@ func (t *quicListener) serve(ctx context.Context) error {
}
defer func() { _ = udpConn.Close() }()

svc, conn := stun.New(t.cfg, t, udpConn)
defer conn.Close()

go svc.Serve(ctx)
// Wrap the UDP connection in a STUN service if STUN is enabled.
var pktConn net.PacketConn
if t.cfg.Options().IsStunDisabled() {
pktConn = udpConn
} else {
svc, stunConn := stun.New(t.cfg, t, udpConn)
defer stunConn.Close()
go svc.Serve(ctx)
pktConn = stunConn
}

t.registry.Register(t.uri.Scheme, conn)
defer t.registry.Unregister(t.uri.Scheme, conn)
t.registry.Register(t.uri.Scheme, pktConn)
defer t.registry.Unregister(t.uri.Scheme, pktConn)

listener, err := quic.Listen(conn, t.tlsCfg, quicConfig)
listener, err := quic.Listen(pktConn, t.tlsCfg, quicConfig)
if err != nil {
l.Infoln("Listen (BEP/quic):", err)
return err
Expand Down Expand Up @@ -174,7 +180,13 @@ func (t *quicListener) serve(ctx context.Context) error {
if isLocal {
priority = t.cfg.Options().ConnectionPriorityQUICLAN
}
t.conns <- newInternalConn(&quicTlsConn{session, stream, nil}, connTypeQUICServer, isLocal, priority)
qtlsc := &quicTlsConn{
Connection: session,
Stream: stream,
createdConn: nil,
supportsSubstreams: false, // set later based on handshake
}
t.conns <- newInternalConn(qtlsc, connTypeQUICServer, isLocal, priority)
}
}

Expand Down
Loading