diff --git a/captain/bootstrap.go b/captain/bootstrap.go index 876f92c..2182d0b 100644 --- a/captain/bootstrap.go +++ b/captain/bootstrap.go @@ -37,7 +37,7 @@ func init() { // prepBootstrapHubFlag checks the bootstrap-hub argument if it is valid. func prepBootstrapHubFlag() error { if bootstrapHubFlag != "" { - _, err := hub.ParseBootstrapHub(bootstrapHubFlag, conf.MainMapName) + _, _, _, err := hub.ParseBootstrapHub(bootstrapHubFlag) return err } return nil diff --git a/cmds/hub/pack b/cmds/hub/pack index 220819e..73c2027 100755 --- a/cmds/hub/pack +++ b/cmds/hub/pack @@ -74,18 +74,27 @@ function check_all { GOOS=linux GOARCH=amd64 check GOOS=windows GOARCH=amd64 check GOOS=darwin GOARCH=amd64 check + GOOS=linux GOARCH=arm64 check + GOOS=windows GOARCH=arm64 check + GOOS=darwin GOARCH=arm64 check } function build_all { GOOS=linux GOARCH=amd64 build GOOS=windows GOARCH=amd64 build GOOS=darwin GOARCH=amd64 build + GOOS=linux GOARCH=arm64 build + GOOS=windows GOARCH=arm64 build + GOOS=darwin GOARCH=arm64 build } function reset_all { GOOS=linux GOARCH=amd64 reset GOOS=windows GOARCH=amd64 reset GOOS=darwin GOARCH=amd64 reset + GOOS=linux GOARCH=arm64 reset + GOOS=windows GOARCH=arm64 reset + GOOS=darwin GOARCH=arm64 reset } case $1 in diff --git a/crew/connect.go b/crew/connect.go index b602893..f1a3cff 100644 --- a/crew/connect.go +++ b/crew/connect.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "net" + "strings" "sync" "time" @@ -122,27 +123,16 @@ func (t *Tunnel) establish(ctx context.Context) (err error) { case sticksTo.Avoid: log.Tracer(ctx).Tracef("spn/crew: avoiding %s", sticksTo.Pin.Hub) - // Build avoid policy. - avoidPolicy := make([]endpoints.Endpoint, 0, 2) - // Exclude countries of the hub to be avoided. - // This helps to select a destination hub that is more different than say, - // the other hub in the same datacenter as the one to be avoided. - if sticksTo.Pin.LocationV4 != nil && - sticksTo.Pin.LocationV4.Country.ISOCode != "" { - avoidPolicy = append(avoidPolicy, &endpoints.EndpointCountry{ - Country: sticksTo.Pin.LocationV4.Country.ISOCode, - }) - log.Tracer(ctx).Tracef("spn/crew: avoiding country %s via IPv4 location", sticksTo.Pin.LocationV4.Country.ISOCode) - } - if sticksTo.Pin.LocationV6 != nil && - sticksTo.Pin.LocationV6.Country.ISOCode != "" { - avoidPolicy = append(avoidPolicy, &endpoints.EndpointCountry{ - Country: sticksTo.Pin.LocationV6.Country.ISOCode, - }) - log.Tracer(ctx).Tracef("spn/crew: avoiding country %s via IPv6 location", sticksTo.Pin.LocationV6.Country.ISOCode) + // Avoid this Hub. + // TODO: Remember more than one hub to avoid. + avoidPolicy := []endpoints.Endpoint{ + &endpoints.EndpointDomain{ + OriginalValue: sticksTo.Pin.Hub.ID, + Domain: strings.ToLower(sticksTo.Pin.Hub.ID) + ".", + }, } - // Append to policies + // Append to policies. t.connInfo.TunnelOpts.HubPolicies = append(t.connInfo.TunnelOpts.HubPolicies, avoidPolicy) default: @@ -314,7 +304,7 @@ func establishRoute(route *navigator.Route) (dstPin *navigator.Pin, dstTerminal return nil, nil, tErr.Wrap("failed to authenticate to %s: %w", check.pin.Hub, tErr) } - case <-time.After(3 * time.Second): + case <-time.After(10 * time.Second): // Mark as failing for just a minute, until server load may be less. check.pin.MarkAsFailingFor(1 * time.Minute) log.Warningf("spn/crew: auth to %s timed out", check.pin.Hub) @@ -338,13 +328,14 @@ func establishRoute(route *navigator.Route) (dstPin *navigator.Pin, dstTerminal !tErr.Is(terminal.ErrUnknownOperationType) { // TODO: remove workaround until all servers have this upgrade // Mark as failing long enough to expire connections and session and shutdown connections. // TODO: Should we forcibly disconnect instead? + // TODO: This might also be triggered if a relay fails and ends the operation. check.pin.MarkAsFailingFor(7 * time.Minute) log.Warningf("spn/crew: failed to check reachability of %s: %s", check.pin.Hub, tErr) return nil, nil, tErr.Wrap("failed to check reachability of %s: %w", check.pin.Hub, tErr) } - case <-time.After(3 * time.Second): + case <-time.After(10 * time.Second): // Mark as failing for just a minute, until server load may be less. check.pin.MarkAsFailingFor(1 * time.Minute) log.Warningf("spn/crew: reachability check to %s timed out", check.pin.Hub) diff --git a/crew/op_connect.go b/crew/op_connect.go index 798fba2..dc4253e 100644 --- a/crew/op_connect.go +++ b/crew/op_connect.go @@ -403,7 +403,8 @@ func (op *ConnectOp) HandleStop(err *terminal.Error) (errorToSend *terminal.Erro // If the op was ended locally, send all data before closing. // If the op was ended remotely, don't bother sending remaining data. if !err.IsExternal() { - op.dfq.Flush() + // Flushing could mean sending a full buffer of 50000 packets. + op.dfq.Flush(5 * time.Minute) } // If the op was ended remotely, write all remaining received data. diff --git a/docks/crane.go b/docks/crane.go index 214ee23..f083434 100644 --- a/docks/crane.go +++ b/docks/crane.go @@ -363,8 +363,6 @@ func (crane *Crane) AbandonTerminal(id uint32, err *terminal.Error) { } func (crane *Crane) sendImportantTerminalMsg(msg *terminal.Msg, timeout time.Duration) *terminal.Error { - msg.Unit.Pause() - select { case crane.controllerMsgs <- msg: return nil @@ -376,8 +374,6 @@ func (crane *Crane) sendImportantTerminalMsg(msg *terminal.Msg, timeout time.Dur // Send is used by others to send a message through the crane. func (crane *Crane) Send(msg *terminal.Msg, timeout time.Duration) *terminal.Error { - msg.Unit.Pause() - select { case crane.terminalMsgs <- msg: return nil @@ -456,7 +452,11 @@ func (crane *Crane) unloader(workerCtx context.Context) error { crane.Stop(terminal.ErrMalformedData.With("failed to get container length: %w", err)) return nil } - if containerLen > maxUnloadSize { + switch { + case containerLen <= 0: + crane.Stop(terminal.ErrMalformedData.With("received empty container with length %d", containerLen)) + return nil + case containerLen > maxUnloadSize: crane.Stop(terminal.ErrMalformedData.With("received oversized container with length %d", containerLen)) return nil } diff --git a/docks/op_expand.go b/docks/op_expand.go index eb27bef..b5d9b15 100644 --- a/docks/op_expand.go +++ b/docks/op_expand.go @@ -86,31 +86,27 @@ func (t *ExpansionRelayTerminal) Ctx() context.Context { // Deliver delivers a message to the relay operation. func (op *ExpandOp) Deliver(msg *terminal.Msg) *terminal.Error { - // Pause unit before handing away. - msg.Unit.Pause() - return op.deliverProxy(msg) } // Deliver delivers a message to the relay terminal. func (t *ExpansionRelayTerminal) Deliver(msg *terminal.Msg) *terminal.Error { - // Pause unit before handing away. - msg.Unit.Pause() - return t.deliverProxy(msg) } // Flush writes all data in the queues. func (op *ExpandOp) Flush() { if op.flowControl != nil { - op.flowControl.Flush() + // Flushing could mean sending a full buffer of 50000 packets. + op.flowControl.Flush(5 * time.Minute) } } // Flush writes all data in the queues. func (t *ExpansionRelayTerminal) Flush() { if t.flowControl != nil { - t.flowControl.Flush() + // Flushing could mean sending a full buffer of 50000 packets. + t.flowControl.Flush(5 * time.Minute) } } @@ -281,14 +277,13 @@ func (op *ExpandOp) forwardHandler(_ context.Context) error { // Debugging: // log.Debugf("spn/testing: forwarding at %s: %s", op.FmtID(), spew.Sdump(c.CompileData())) - // Count relayed data for metrics. - atomic.AddUint64(op.dataRelayed, uint64(msg.Data.Length())) - // Wait for processing slot. msg.Unit.WaitForSlot() + // Count relayed data for metrics. + atomic.AddUint64(op.dataRelayed, uint64(msg.Data.Length())) + // Receive data from the origin and forward it to the relay. - msg.Unit.Pause() op.relayTerminal.sendProxy(msg, 1*time.Minute) case <-op.ctx.Done(): @@ -311,7 +306,6 @@ func (op *ExpandOp) backwardHandler(_ context.Context) error { atomic.AddUint64(op.dataRelayed, uint64(msg.Data.Length())) // Receive data from the relay and forward it to the origin. - msg.Unit.Pause() op.sendProxy(msg, 1*time.Minute) case <-op.ctx.Done(): diff --git a/go.mod b/go.mod index b982ca4..ee48c7e 100644 --- a/go.mod +++ b/go.mod @@ -11,11 +11,11 @@ require ( github.com/rot256/pblind v0.0.0-20211117203330-22455f90b565 github.com/safing/jess v0.3.1 github.com/safing/portbase v0.16.4 - github.com/safing/portmaster v1.0.6 + github.com/safing/portmaster v1.0.7 github.com/spf13/cobra v1.6.1 github.com/stretchr/testify v1.8.1 github.com/tevino/abool v1.2.0 - golang.org/x/net v0.7.0 + golang.org/x/net v0.8.0 ) require ( @@ -37,7 +37,7 @@ require ( github.com/hashicorp/go-multierror v1.1.1 // indirect github.com/hashicorp/go-version v1.6.0 // indirect github.com/inconshreveable/mousetrap v1.1.0 // indirect - github.com/miekg/dns v1.1.50 // indirect + github.com/miekg/dns v1.1.52 // indirect github.com/mitchellh/reflectwalk v1.0.2 // indirect github.com/oschwald/maxminddb-golang v1.10.0 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect @@ -62,11 +62,11 @@ require ( github.com/x448/float16 v0.8.4 // indirect github.com/yusufpapurcu/wmi v1.2.2 // indirect go.etcd.io/bbolt v1.3.7 // indirect - golang.org/x/crypto v0.6.0 // indirect - golang.org/x/mod v0.8.0 // indirect + golang.org/x/crypto v0.7.0 // indirect + golang.org/x/mod v0.9.0 // indirect golang.org/x/sync v0.1.0 // indirect - golang.org/x/sys v0.5.0 // indirect - golang.org/x/tools v0.6.0 // indirect + golang.org/x/sys v0.6.0 // indirect + golang.org/x/tools v0.7.0 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index 653b374..56b4080 100644 --- a/go.sum +++ b/go.sum @@ -43,8 +43,8 @@ github.com/dgraph-io/ristretto v0.0.2/go.mod h1:KPxhHT9ZxKefz+PCeOGsrHpl1qZ7i70d github.com/dgraph-io/ristretto v0.1.0 h1:Jv3CGQHp9OjuMBSne1485aDpUkTKEcUqF+jm/LuerPI= github.com/dgraph-io/ristretto v0.1.0/go.mod h1:fux0lOrBhrVCJd3lcTHsIJhq1T2rokOu6v9Vcb3Q9ug= github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw= -github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo= github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= +github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/fxamacker/cbor/v2 v2.4.0 h1:ri0ArlOR+5XunOP8CRUowT0pSJOwhW098ZCUyskZD88= github.com/fxamacker/cbor/v2 v2.4.0/go.mod h1:TA1xS00nchWmaBnEIxPSE5oHLuJBAVvqrtAnWBwBCVo= @@ -101,8 +101,8 @@ github.com/mattn/go-isatty v0.0.8/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hd github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= github.com/mgutz/ansi v0.0.0-20170206155736-9520e82c474b/go.mod h1:01TrycV0kFyexm33Z7vhZRXopbI8J3TDReVlkTgMUxE= github.com/mgutz/ansi v0.0.0-20200706080929-d51e80ef957d/go.mod h1:01TrycV0kFyexm33Z7vhZRXopbI8J3TDReVlkTgMUxE= -github.com/miekg/dns v1.1.50 h1:DQUfb9uc6smULcREF09Uc+/Gd46YWqJd5DbpPE9xkcA= -github.com/miekg/dns v1.1.50/go.mod h1:e3IlAVfNqAllflbibAZEWOXOQ+Ynzk/dDozDxY7XnME= +github.com/miekg/dns v1.1.52 h1:Bmlc/qsNNULOe6bpXcUTsuOajd0DzRHwup6D9k1An0c= +github.com/miekg/dns v1.1.52/go.mod h1:uInx36IzPl7FYnDcMeVWxj9byh7DutNykX4G9Sj60FY= github.com/mitchellh/copystructure v1.2.0 h1:vpKXTN4ewci03Vljg/q9QvCGUDttBOGBIa15WveJJGw= github.com/mitchellh/copystructure v1.2.0/go.mod h1:qLl+cE2AmVv+CoeAwDPye/v+N2HKCj9FbZEVFJRxO9s= github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0= @@ -130,10 +130,8 @@ github.com/safing/portbase v0.15.2/go.mod h1:5bHi99fz7Hh/wOsZUOI631WF9ePSHk57c4f github.com/safing/portbase v0.16.2/go.mod h1:mzNCWqPbO7vIYbbK5PElGbudwd2vx4YPNawymL8Aro8= github.com/safing/portbase v0.16.4 h1:3w4Sc0TlSDvUS4Uk5TrsFXf98cwMbCf+BG5Bw6coxNk= github.com/safing/portbase v0.16.4/go.mod h1:mzNCWqPbO7vIYbbK5PElGbudwd2vx4YPNawymL8Aro8= -github.com/safing/portmaster v1.0.5 h1:fXtZqMSozu10kjb9TQtnUv3P3neWhU/gSdEt0MQBN6M= -github.com/safing/portmaster v1.0.5/go.mod h1:ZMiAiYflqcG4pMy8uIu3TQ/urMebxhwgs2wKcR4Pw2g= -github.com/safing/portmaster v1.0.6 h1:iqs4VKDvTcyutAm99WFmNZtq1almoaBIBgLoCItsUkE= -github.com/safing/portmaster v1.0.6/go.mod h1:TQvev+HLWLjSAcxxLdhN83lTUQxAOq40PCn8oi59gMk= +github.com/safing/portmaster v1.0.7 h1:qwil09vzG335/x+omGzFfmohqCF3RFKsnTrKFTYwqFU= +github.com/safing/portmaster v1.0.7/go.mod h1:zWPKyBSQljj2mzOe+KgJZ3JyZ3a/Ceif2D0zghCs8To= github.com/satori/go.uuid v1.2.0 h1:0uYX9dsZ2yD7q2RtLRtPSdGDWzjeM3TbMJP9utgA0ww= github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0= github.com/seehuhn/fortuna v1.0.1 h1:lu9+CHsmR0bZnx5Ay646XvCSRJ8PJTi5UYJwDBX68H0= @@ -167,8 +165,8 @@ github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5 github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= +github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/tannerryan/ring v1.1.2 h1:iXayOjqHQOLzuy9GwSKuG3nhWfzQkldMlQivcgIr7gQ= github.com/tannerryan/ring v1.1.2/go.mod h1:DkELJEjbZhJBtFKR9Xziwj3HKZnb/knRgljNqp65vH4= @@ -208,11 +206,9 @@ github.com/vmihailenco/tagparser/v2 v2.0.0/go.mod h1:Wri+At7QHww0WTrCBeu4J6bNtoV github.com/x448/float16 v0.8.4 h1:qLwI1I70+NjRFUR3zs1JPUCgaCXSh3SW62uAKT1mSBM= github.com/x448/float16 v0.8.4/go.mod h1:14CWIYCyZA/cWjXOioeEpHeN/83MdbZDRQHoFcYsOfg= github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q= -github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= github.com/yusufpapurcu/wmi v1.2.2 h1:KBNDSne4vP5mbSWnJbO+51IMOXJB67QiYCSBrubbPRg= github.com/yusufpapurcu/wmi v1.2.2/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0= github.com/zalando/go-keyring v0.2.1/go.mod h1:g63M2PPn0w5vjmEbwAX3ib5I+41zdm4esSETOn9Y6Dw= -go.etcd.io/bbolt v1.3.6 h1:/ecaJf0sk1l4l6V4awd65v2C3ILy7MSj+s/x1ADCIMU= go.etcd.io/bbolt v1.3.6/go.mod h1:qXsaaIqmgQH0T+OPdb99Bf+PKfBBQVAdyD6TY9G8XM4= go.etcd.io/bbolt v1.3.7 h1:j+zJOnnEjF/kyHlDDgGnVL/AIqIJPq8UoB2GSNfkUfQ= go.etcd.io/bbolt v1.3.7/go.mod h1:N9Mkw9X8x5fupy0IKsmuqVtoGDyxsaDlbk4Rd05IAQw= @@ -222,28 +218,19 @@ golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACk golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20220926161630-eccd6366d1be/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= golang.org/x/crypto v0.0.0-20221010152910-d6f0a8c073c2/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= -golang.org/x/crypto v0.5.0 h1:U/0M97KRkSFvyD/3FSmdP5W5swImpNgle/EHFhOsQPE= -golang.org/x/crypto v0.5.0/go.mod h1:NK/OQwhpMQP3MwtdjgLlYHnH9ebylxKWv3e0fK+mkQU= -golang.org/x/crypto v0.6.0 h1:qfktjS5LUO+fFKeJXZ+ikTRijMmljikvG68fpMMruSc= -golang.org/x/crypto v0.6.0/go.mod h1:OFC/31mSvZgRz0V1QTNCzfAI1aIRzbiufJtkMIlEp58= +golang.org/x/crypto v0.7.0 h1:AvwMYaRytfdeVt3u6mLaxYtErKYjxA2OXjJ1HHq6t3A= +golang.org/x/crypto v0.7.0/go.mod h1:pYwdfH91IfpZVANVyUOhSIPZaFoJGxTFbZhFTx+dXZU= golang.org/x/lint v0.0.0-20200302205851-738671d3881b/go.mod h1:3xt1FjdF8hUf6vQPIChWIBhFzV8gjjsPE/fR3IyQdNY= golang.org/x/mod v0.1.1-0.20191105210325-c90efee705ee/go.mod h1:QqPTAvyqsEbceGzBzNggFXnrqF1CaUcvgkdR5Ot7KZg= -golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= -golang.org/x/mod v0.7.0 h1:LapD9S96VoQRhi/GrNTqeBJFrUjs5UHCAtTlgwA5oZA= -golang.org/x/mod v0.7.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= -golang.org/x/mod v0.8.0 h1:LUYupSeNrTNCGzR/hVBk2NHZO4hXcVaW1k4Qx7rjPx8= -golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= +golang.org/x/mod v0.9.0 h1:KENHtAZL2y3NLMYZeHY9DW8HW8V+kQyJsY/V9JlKvCs= +golang.org/x/mod v0.9.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= -golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM= -golang.org/x/net v0.0.0-20210726213435-c6fcb2dbf985/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= golang.org/x/net v0.0.0-20220927171203-f486391704dc/go.mod h1:YDH+HFinaLZZlnHAfSS6ZXJJ9M9t4Dl22yv3iI2vPwk= -golang.org/x/net v0.5.0 h1:GyT4nK/YDHSqa1c4753ouYCDajOYKTja9Xb/OHtgvSw= -golang.org/x/net v0.5.0/go.mod h1:DivGGAXEgPSlEBzxGzZI+ZLohi+xUj054jfeKui00ws= -golang.org/x/net v0.7.0 h1:rJrUqqhjsgNp7KqAIc25s9pZnjU7TUcSY7HcVZjdn1g= -golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= +golang.org/x/net v0.8.0 h1:Zrh2ngAOFYneWTAIAPethzeaQLuHwhuBkuV6ZiRnUaQ= +golang.org/x/net v0.8.0/go.mod h1:QVkue5JL9kW//ek3r6jTKnTFis1tRmNAW2P1shuFdJc= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20220923202941-7f9b1623fab7/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -258,11 +245,8 @@ golang.org/x/sys v0.0.0-20190916202348-b4ddaad3f8a3/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20200923182605-d9f96fdee20d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210816074244-15123e1e1f71/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210819135213-f52c844e1c1c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= @@ -275,10 +259,8 @@ golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20220928140112-f11e5e49a4ec/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20221010170243-090e33056c14/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.2.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.4.0 h1:Zr2JFtRQNX3BCZ8YtxRE9hNJYC8J6I1MVbMg6owUp18= -golang.org/x/sys v0.4.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.5.0 h1:MUK/U/4lj1t1oPg0HfuXDN/Z1wv31ZJ/YcPiGccS4DU= -golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.6.0 h1:MVltZSvRTcU2ljQOhs94SXPftV6DCNnZViHeQps87pQ= +golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210503060354-a79de5458b56/go.mod h1:tfny5GFUkzUvx4ps4ajbZsCe5lw1metzhBm9T3x7oIY= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= @@ -288,14 +270,9 @@ golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= -golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20200130002326-2f3ba24bd6e7/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28= -golang.org/x/tools v0.1.6-0.20210726203631-07bc1bf47fb2/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= -golang.org/x/tools v0.5.0 h1:+bSpV5HIeWkuvgaMfI3UmKRThoTA5ODJTUd8T17NO+4= -golang.org/x/tools v0.5.0/go.mod h1:N+Kgy78s5I24c24dU8OfWNEotWjutIs8SnJvn5IDq+k= -golang.org/x/tools v0.6.0 h1:BOw41kyTf3PuCW1pVQf8+Cyg8pMlkYB1oo9iJ6D/lKM= -golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU= -golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/tools v0.7.0 h1:W4OVu8VVOaIO0yzWMNdepAulS7YfoS3Zabrm8DOXXU4= +golang.org/x/tools v0.7.0/go.mod h1:4pg6aUX35JBAogB10C9AtvVL+qowtN4pT3CGSQex14s= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= diff --git a/hub/intel.go b/hub/intel.go index 5332868..d434bbc 100644 --- a/hub/intel.go +++ b/hub/intel.go @@ -163,23 +163,23 @@ func (i *Intel) ParseAdvisories() (err error) { } // ParseBootstrapHub parses a bootstrap hub. -func ParseBootstrapHub(bootstrapTransport string, mapName string) (*Hub, error) { +func ParseBootstrapHub(bootstrapTransport string) (t *Transport, hubID string, hubIP net.IP, err error) { // Parse transport and check Hub ID. - t, err := ParseTransport(bootstrapTransport) + t, err = ParseTransport(bootstrapTransport) if err != nil { - return nil, fmt.Errorf("failed to parse transport: %w", err) + return nil, "", nil, fmt.Errorf("failed to parse transport: %w", err) } if t.Option == "" { - return nil, errors.New("missing hub ID in URL fragment") + return nil, "", nil, errors.New("missing hub ID in URL fragment") } if _, err := lhash.FromBase58(t.Option); err != nil { - return nil, fmt.Errorf("hub ID is invalid: %w", err) + return nil, "", nil, fmt.Errorf("hub ID is invalid: %w", err) } // Parse IP address from transport. ip := net.ParseIP(t.Domain) if ip == nil { - return nil, errors.New("invalid IP address (domains are not supported for bootstrapping)") + return nil, "", nil, errors.New("invalid IP address (domains are not supported for bootstrapping)") } // Clean up transport for hub info. @@ -187,23 +187,5 @@ func ParseBootstrapHub(bootstrapTransport string, mapName string) (*Hub, error) t.Domain = "" t.Option = "" - // Create bootstrap hub. - bootstrapHub := &Hub{ - ID: id, - Map: mapName, - Info: &Announcement{ - ID: id, - Transports: []string{t.String()}, - }, - Status: &Status{}, - } - - // Set IP address. - if ip4 := ip.To4(); ip4 != nil { - bootstrapHub.Info.IPv4 = ip4 - } else { - bootstrapHub.Info.IPv6 = ip - } - - return bootstrapHub, nil + return t, id, ip, nil } diff --git a/navigator/measurements.go b/navigator/measurements.go index 1d6a760..0f8cd3f 100644 --- a/navigator/measurements.go +++ b/navigator/measurements.go @@ -13,16 +13,16 @@ import ( // Measurements Configuration. const ( - NavigatorMeasurementTTLDefault = 2 * time.Hour - NavigatorMeasurementTTLByCostBase = 3 * time.Minute - NavigatorMeasurementTTLByCostMin = 2 * time.Hour - NavigatorMeasurementTTLByCostMax = 30 * time.Hour + NavigatorMeasurementTTLDefault = 4 * time.Hour + NavigatorMeasurementTTLByCostBase = 6 * time.Minute + NavigatorMeasurementTTLByCostMin = 4 * time.Hour + NavigatorMeasurementTTLByCostMax = 50 * time.Hour // With a base TTL of 3m, this leads to: - // 20c -> 1h -> raised to 2h. - // 50c -> 2h30m - // 100c -> 5h - // 1000c -> 50h -> capped to 30h. + // 20c -> 2h -> raised to 4h. + // 50c -> 5h + // 100c -> 10h + // 1000c -> 100h -> capped to 50h. ) func (m *Map) measureHubs(ctx context.Context, _ *modules.Task) error { diff --git a/navigator/update.go b/navigator/update.go index 2eb03af..81981c5 100644 --- a/navigator/update.go +++ b/navigator/update.go @@ -2,7 +2,6 @@ package navigator import ( "context" - "errors" "fmt" "path" "time" @@ -14,6 +13,7 @@ import ( "github.com/safing/portbase/database/record" "github.com/safing/portbase/log" "github.com/safing/portbase/modules" + "github.com/safing/portbase/utils" "github.com/safing/portmaster/intel/geoip" "github.com/safing/portmaster/netenv" "github.com/safing/spn/hub" @@ -481,22 +481,51 @@ func (m *Map) addBootstrapHubs(bootstrapTransports []string) error { func (m *Map) addBootstrapHub(bootstrapTransport string) error { // Parse bootstrap hub. - bootstrapHub, err := hub.ParseBootstrapHub(bootstrapTransport, m.Name) + transport, hubID, hubIP, err := hub.ParseBootstrapHub(bootstrapTransport) if err != nil { return fmt.Errorf("invalid bootstrap hub: %w", err) } // Check if hub already exists. - _, err = hub.GetHub(bootstrapHub.Map, bootstrapHub.ID) - if err == nil { - return nil + var h *hub.Hub + pin, ok := m.all[hubID] + if ok { + h = pin.Hub + } else { + h = &hub.Hub{ + ID: hubID, + Map: m.Name, + Info: &hub.Announcement{ + ID: hubID, + }, + Status: &hub.Status{}, + FirstSeen: time.Now(), // Do not garbage collect bootstrap hubs. + } } - if !errors.Is(err, database.ErrNotFound) { - return err + + // Add IP if it does not yet exist. + if hubIP4 := hubIP.To4(); hubIP4 != nil { + if h.Info.IPv4 == nil { + h.Info.IPv4 = hubIP4 + } else if !h.Info.IPv4.Equal(hubIP4) { + return fmt.Errorf("additional bootstrap entry with same ID but mismatching IP address: %s", hubIP) + } + } else { + if h.Info.IPv6 == nil { + h.Info.IPv6 = hubIP + } else if !h.Info.IPv6.Equal(hubIP) { + return fmt.Errorf("additional bootstrap entry with same ID but mismatching IP address: %s", hubIP) + } + } + + // Add transport if it does not yet exist. + t := transport.String() + if !utils.StringInSlice(h.Info.Transports, t) { + h.Info.Transports = append(h.Info.Transports, t) } - // Add to map for bootstrapping. - m.updateHub(bootstrapHub, false, false) - log.Infof("spn/navigator: added bootstrap %s to map %s", bootstrapHub, m.Name) + // Add/update to map for bootstrapping. + m.updateHub(h, false, false) + log.Infof("spn/navigator: added/updated bootstrap %s to map %s", h, m.Name) return nil } diff --git a/patrol/domains.go b/patrol/domains.go index ff14d63..5f125f9 100644 --- a/patrol/domains.go +++ b/patrol/domains.go @@ -17,784 +17,295 @@ func getRandomTestDomain() string { // Use TestCleanDomains to clean a new/updated list. // Treat as a constant. var testDomains = []string{ - "115.com", - "1688.com", - "2gis.com", - "33across.com", - "360yield.com", - "3dmgame.com", - "4399.com", - "51cto.com", - "9gag.com", - "abc.net.au", "about.com", - "aboutads.info", - "academia.edu", - "acs.org", - "actionverb.com", - "adcolony.com", - "addthis.com", "addtoany.com", "adobe.com", - "adobe.io", - "adp.com", - "adsrvr.org", - "airbnb.com", - "akismet.com", - "algolia.net", - "ali213.net", - "alibaba.com", - "alidns.com", - "aliexpress.com", "aliyun.com", - "aliyundrive.com", - "amazon.ca", - "amazon.co.jp", - "amazon.co.uk", - "amazon.com", - "amazon.de", - "amazon.es", - "amazon.fr", - "amazon.in", - "amazon.it", - "amazontrust.com", - "amazonvideo.com", - "americanexpress.com", - "amplitude.com", "ampproject.org", - "amzn.to", - "anchor.fm", - "ancomunn.co.uk", "android.com", - "animeflv.net", - "aol.com", - "apa.org", "apache.org", - "aparat.com", - "apnews.com", - "appcenter.ms", "apple.com", "apple.news", - "appsflyer.com", "appspot.com", - "arcgis.com", - "archive.org", "arnebrachhold.de", - "arstechnica.com", - "arxiv.org", - "asana.com", - "asus.com", - "atlassian.com", - "atlassian.net", - "att.com", - "att.net", - "autodesk.com", "avast.com", - "avira.com", - "avito.st", - "azure.com", - "b-cdn.net", - "baidu.com", - "bandcamp.com", - "bankofamerica.com", - "battle.net", - "bbb.org", "bbc.co.uk", "bbc.com", - "beget.com", - "behance.net", - "berkeley.edu", - "bestbuy.com", - "bilibili.com", - "binance.com", "bing.com", - "biomedcentral.com", - "bit.ly", - "bitly.com", - "bizjournals.com", - "blizzard.com", "blogger.com", "blogspot.com", - "bloomberg.com", - "bmj.com", - "bol.com", - "book118.com", - "booking.com", - "box.com", "branch.io", - "braze.com", - "britannica.com", - "business.site", - "businessinsider.com", - "businesswire.com", - "buzzfeed.com", - "bytedance.net", - "ca.gov", "calendly.com", "cam.ac.uk", - "cambridge.org", - "canada.ca", "canonical.com", "canva.com", - "capitalone.com", - "casalemedia.com", - "cbc.ca", - "cbsnews.com", - "cctv.com", - "cdc.gov", - "change.org", - "chaoxing.com", - "chase.com", - "chaturbate.com", - "checkpoint.com", - "chess.com", - "chinaz.com", "cisco.com", "cloudflare.com", - "cloudflare.net", "cloudns.net", - "cmu.edu", - "cnbc.com", "cnblogs.com", - "cnet.com", - "cnki.net", "cnn.com", - "coinbase.com", - "coinmarketcap.com", - "columbia.edu", - "constantcontact.com", - "conviva.com", - "cornell.edu", - "coupang.com", - "coursera.org", - "cpanel.net", - "craigslist.org", - "crashlytics.com", "creativecommons.org", "criteo.com", - "criteo.net", - "csdn.net", "cupfox.app", "dailymail.co.uk", - "dailymotion.com", - "datadoghq.com", - "daum.net", - "db.com", - "dcinside.com", "ddnss.de", "debian.org", - "deepl.com", - "dell.com", - "deloitte.com", - "desjardins.com", - "deviantart.com", - "dhl.com", - "digg.com", - "digicert.com", - "digikala.com", "digitalocean.com", - "discord.com", - "discord.gg", - "discordapp.com", - "disqus.com", - "dmm.co.jp", - "dns.com", - "dnsimple.com", - "dnsmadeeasy.com", - "docin.com", - "docker.com", "doi.org", "domainmarket.com", "doubleclick.net", - "doubleverify.com", - "douyin.com", - "douyu.com", "dreamhost.com", - "dribbble.com", "dropbox.com", - "dtdc.com", - "duckdns.org", - "duckduckgo.com", - "duolingo.com", - "dw.com", - "dyndns.org", "dynect.net", - "e-hentai.org", - "ea.com", - "eastday.com", - "ebay.co.uk", - "ebay.com", - "economist.com", "ed.gov", - "eepurl.com", "elegantthemes.com", "elpais.com", - "elsevier.com", - "engadget.com", - "entergy.com", - "entrepreneur.com", - "entrust.net", - "envato.com", "epa.gov", - "epam.com", - "epicgames.com", "eporner.com", - "erome.com", - "eset.com", "espn.com", - "etsy.com", "europa.eu", - "eventbrite.com", - "evernote.com", - "exacttarget.com", "example.com", - "expedia.com", "facebook.com", - "fandom.com", - "fao.org", - "fastly.net", "fb.com", "fb.me", "fb.watch", "fbcdn.net", - "fc2.com", - "fda.gov", - "fedex.com", "feedburner.com", - "figma.com", - "fiverr.com", - "flickr.com", - "flipkart.com", - "fmkorea.com", - "forbes.com", - "force.com", - "ford.com", - "forter.com", - "fortune.com", - "foxnews.com", "free.fr", - "freepik.com", - "frontiersin.org", - "ft.com", "ftc.gov", "g.page", - "gamersky.com", - "gamespot.com", - "gandi.net", - "gartner.com", - "geeksforgeeks.org", - "gene.com", - "genius.com", "getbootstrap.com", - "getpocket.com", - "giphy.com", - "gitee.com", - "github.com", - "github.io", "gitlab.com", - "glance.net", - "globo.com", "gmail.com", "gnu.org", - "gofile.io", - "gofundme.com", "goo.gl", - "goodreads.com", "google-analytics.com", "google.ca", "google.co.in", "google.co.jp", "google.co.th", "google.co.uk", - "google.com", "google.com.au", "google.com.br", "google.com.hk", "google.com.mx", "google.com.tr", "google.com.tw", + "google.com", "google.de", "google.es", "google.fr", "google.it", "googledomains.com", "googlesyndication.com", - "grammarly.com", - "gravatar.com", "gstatic.com", - "guardian.co.uk", "harvard.edu", - "hbo.com", - "hbr.org", - "healthline.com", - "herokuapp.com", - "hhs.gov", - "hilton.com", - "hinet.net", "hitomi.la", - "homeadvisor.com", - "homedepot.com", - "hostgator.com", - "hostgator.com.br", - "hotjar.com", - "hotstar.com", - "house.gov", - "hp.com", - "huawei.com", "hubspot.com", - "huffingtonpost.com", - "huffpost.com", "hugedomains.com", - "hulu.com", - "hupu.com", - "huya.com", "ibm.com", - "icloud-content.com", "icloud.com", - "ieee.org", - "ietf.org", - "ifeng.com", - "ign.com", "ikea.com", "ilovepdf.com", - "imdb.com", - "imgur.com", - "immunet.com", - "independent.co.uk", "indiatimes.com", - "infonline.de", "instagram.com", - "instructure.com", - "intel.com", - "intermedia.net", - "intuit.com", "investing.com", "investopedia.com", - "iqiyi.com", "irs.gov", - "issuu.com", - "istockphoto.com", - "ixigua.com", - "jagex.com", - "jb51.net", - "jd.com", - "jetbrains.com", - "jhu.edu", - "jianshu.com", - "jimdo.com", - "jotform.com", - "jquery.com", - "jstor.org", - "kakao.com", - "kaspersky.com", "kickstarter.com", - "klaviyo.com", - "lagou.com", - "latimes.com", - "launchdarkly.com", "launchpad.net", "lencr.org", - "lenovo.com", "lijit.com", - "line.me", "linkedin.com", - "linktr.ee", "linode.com", - "live.com", - "livejasmin.com", - "loc.gov", - "magento.cloud", - "mailchi.mp", - "mailchimp.com", - "marca.com", - "marketwatch.com", - "markmonitor.com", - "marriott.com", "mashable.com", - "mathtag.com", - "mayoclinic.org", - "mcafee.com", - "mckinsey.com", - "mdpi.com", - "medallia.com", - "media.net", - "mediafire.com", - "mediatek.com", - "medicalnewstoday.com", "medium.com", "mega.co.nz", "mega.nz", - "meraki.com", "merriam-webster.com", - "mi.com", - "microsoft.com", - "mirror.co.uk", "mit.edu", - "miui.com", - "moatads.com", - "mongodb.com", - "mozilla.com", - "mozilla.org", - "msn.com", - "my.com", - "myanimelist.net", - "myfritz.net", - "myspace.com", - "mzstatic.com", - "name.com", - "nasa.gov", - "nationalgeographic.com", - "nature.com", - "naver.com", - "nba.com", - "nbcnews.com", - "nest.com", "netflix.com", - "networkadvertising.org", - "newrelic.com", - "newsweek.com", - "newyorker.com", - "ngenix.net", - "nginx.com", "nginx.org", - "nicovideo.jp", - "nih.gov", - "nike.com", - "nintendo.com", "nist.gov", - "no-ip.com", - "noaa.gov", - "nordvpn.com", - "norton.com", "notion.so", - "npr.org", - "nps.gov", "nsone.net", - "ntp.org", - "nvidia.com", - "ny.gov", - "nypost.com", - "nytimes.com", - "oecd.org", "office.com", - "office365.com", "onetrust.com", - "onlyfans.com", - "openai.com", - "opendns.com", "openstreetmap.org", - "opera.com", - "optimizely.com", - "oreilly.com", - "otm-r.com", - "oup.com", - "outbrain.com", - "outlook.com", - "ovh.net", - "ox.ac.uk", - "paloaltonetworks.com", "patreon.com", - "paypal.com", - "pbs.org", - "pewresearch.org", "pexels.com", "photobucket.com", "php.net", - "pinimg.com", - "pinterest.com", - "pixiv.net", "pki.goog", - "playstation.com", - "plesk.com", "plos.org", - "pnas.org", - "pornhub.com", - "primevideo.com", - "princeton.edu", - "prnewswire.com", "ps.kz", - "psu.edu", - "psychologytoday.com", - "pubmatic.com", - "purdue.edu", - "python.org", - "qidian.com", - "qq.com", - "qualtrics.com", - "quora.com", - "rackspace.com", - "rakuten.co.jp", - "rapid7.com", "readthedocs.io", "redd.it", "reddit.com", - "redhat.com", "remove.bg", - "researchgate.net", - "reuters.com", - "revopush.com", "rfc-editor.org", - "ring.com", - "roblox.com", - "roche.com", - "roku.com", - "rspamd.com", - "rt.com", - "rubiconproject.com", - "sagepub.com", - "salesforce.com", - "samsung.com", - "samsungapps.com", "savefrom.net", - "science.org", - "sciencemag.org", - "scientificamerican.com", - "scorecardresearch.com", - "scribd.com", - "secureserver.net", "sedo.com", - "sensic.net", - "sentry.io", - "service-now.com", - "sfgate.com", - "sharepoint.com", - "sharethrough.com", - "shein.com", - "shopee.tw", - "shopify.com", - "shutterstock.com", - "sitemaps.org", - "skype.com", - "skyrock.net", - "slack.com", - "slate.com", - "slideshare.net", - "smi2.net", - "smzdm.com", - "snapchat.com", "so-net.ne.jp", - "so.com", - "softonic.com", - "sogou.com", - "sohu.com", - "sophos.com", - "soundcloud.com", "sourceforge.net", "spamhaus.org", - "spankbang.com", "speedtest.net", - "spiegel.de", "spotify.com", - "springer.com", - "squarespace.com", - "stackexchange.com", - "stackoverflow.com", "stanford.edu", - "statcounter.com", "state.gov", - "statista.com", - "steamcommunity.com", - "steampowered.com", - "stripe.com", - "stumbleupon.com", "substack.com", - "superhosting.bg", - "surveymonkey.com", - "swrve.com", - "sxyprn.com", - "t-mobile.com", - "t.co", "t.me", "taboola.com", - "tandfonline.com", - "taobao.com", - "tapad.com", - "target.com", - "tds.net", - "teamviewer.com", "techcrunch.com", - "ted.com", "telegram.me", "telegram.org", - "telegraph.co.uk", - "tencent.com", - "tesla.com", - "theatlantic.com", - "theconversation.com", - "theguardian.com", - "thelancet.com", - "themeforest.net", - "thesun.co.uk", - "theverge.com", "threema.ch", - "tiktok.com", - "time.com", - "timeanddate.com", "tinyurl.com", - "tistory.com", - "tmall.com", - "toutiao.com", - "tradingview.com", - "trello.com", - "trendmicro.com", - "trendyol.com", - "tripadvisor.com", - "tripod.com", - "trustpilot.com", - "tumblr.com", - "twitch.tv", - "twitter.com", - "typeform.com", - "typepad.com", - "uber.com", - "ubi.com", "ubuntu.com", - "uchicago.edu", - "uci.edu", - "ucla.edu", - "udemy.com", "ui.com", - "umbrella.com", "umich.edu", - "umn.edu", - "un.org", - "unity3d.com", - "unsplash.com", "uol.com.br", "upenn.edu", - "ups.com", - "usatoday.com", - "usda.gov", - "userapi.com", "usgs.gov", - "usps.com", "utexas.edu", - "utorrent.com", - "uwindsor.ca", "va.gov", - "variety.com", - "varzesh3.com", "verisign.com", - "verizon.com", - "vice.com", - "vimeo.com", - "visualstudio.com", - "vk.com", "vmware.com", - "vox.com", - "vungle.com", "w3.org", - "w3schools.com", "wa.me", - "walmart.com", - "warnerbros.com", - "washington.edu", - "washingtonpost.com", - "wattpad.com", - "wbx2.com", - "weather.com", - "webex.com", - "webmd.com", "webs.com", - "weebly.com", - "weforum.org", - "weibo.com", - "wellsfargo.com", - "wetransfer.com", "whatsapp.com", "whatsapp.net", "whitehouse.gov", - "who.int", - "wikihow.com", "wikimedia.org", "wikipedia.org", "wiktionary.org", - "wiley.com", - "windows.com", - "wired.com", - "wisc.edu", - "wix.com", - "wordpress.com", - "wordpress.org", - "wp.com", - "wpengine.com", - "www.alipay.com", - "www.azurefd.net", + "www.aliyundrive.com", + "www.amazon.ca", + "www.amazon.co.jp", + "www.amazon.co.uk", + "www.amazon.com", + "www.amazon.de", + "www.amazon.es", + "www.amazon.fr", + "www.amazon.in", + "www.amazon.it", + "www.aol.com", + "www.appsflyer.com", + "www.att.com", + "www.business.site", + "www.ca.gov", + "www.canada.ca", + "www.cctv.com", + "www.cdc.gov", + "www.chinaz.com", "www.cloud.com", + "www.cnet.com", "www.comcast.com", "www.comcast.net", - "www.dbankedge.net", - "www.demdex.net", - "www.eastmoney.com", - "www.exelator.com", - "www.fisglobal.com", - "www.fontawesome.com", - "www.fraiche.com.mx", + "www.cornell.edu", + "www.crashlytics.com", + "www.datadoghq.com", + "www.db.com", + "www.deloitte.com", + "www.dw.com", + "www.engadget.com", + "www.eset.com", + "www.fao.org", + "www.fedex.com", + "www.flickr.com", + "www.force.com", + "www.ford.com", + "www.frontiersin.org", + "www.geeksforgeeks.org", + "www.gene.com", + "www.genius.com", + "www.github.io", "www.gov.uk", - "www.hao123.com", - "www.hbonow.com", + "www.gravatar.com", + "www.healthline.com", + "www.hhs.gov", "www.hichina.com", - "www.hicloud.com", + "www.hinet.net", + "www.house.gov", + "www.hp.com", + "www.huawei.com", + "www.hupu.com", + "www.ietf.org", + "www.immunet.com", + "www.independent.co.uk", + "www.intel.com", + "www.jotform.com", + "www.klaviyo.com", + "www.launchdarkly.com", + "www.live.com", "www.macromedia.com", - "www.maricopa.gov", + "www.medallia.com", + "www.mediatek.com", + "www.medicalnewstoday.com", + "www.microsoft.com", + "www.mongodb.com", "www.mysql.com", "www.namu.wiki", - "www.netease.com", + "www.nasa.gov", + "www.nba.com", + "www.nbcnews.com", + "www.nih.gov", + "www.noaa.gov", + "www.npr.org", + "www.nps.gov", + "www.ny.gov", "www.okta.com", + "www.openai.com", + "www.optimizely.com", "www.oracle.com", + "www.outlook.com", + "www.paloaltonetworks.com", + "www.pbs.org", "www.pixabay.com", "www.plala.or.jp", - "www.playfabapi.com", - "www.prudential.com", + "www.playstation.com", + "www.plesk.com", + "www.princeton.edu", + "www.prnewswire.com", + "www.psu.edu", + "www.python.org", + "www.qq.com", "www.quantserve.com", "www.quillbot.com", - "www.rayjump.com", - "www.riotgames.com", - "www.runoob.com", - "www.smartadserver.com", - "www.stripchat.com", + "www.rackspace.com", + "www.redhat.com", + "www.researchgate.net", + "www.roku.com", + "www.salesforce.com", + "www.skype.com", "www.sun.com", - "www.supersonicads.com", - "www.tiktokv.com", - "www.unesco.org", - "www.webrootcloudav.com", - "www.wixsite.com", + "www.teamviewer.com", + "www.ted.com", + "www.tesla.com", + "www.theguardian.com", + "www.typeform.com", + "www.uchicago.edu", + "www.ucla.edu", + "www.usda.gov", + "www.usps.com", + "www.utorrent.com", + "www.warnerbros.com", + "www.webex.com", + "www.who.int", "www.worldbank.org", - "www.xhamsterlive.com", - "www.xinhuanet.com", - "xbox.com", - "xerox.com", - "xfinity.com", - "xhamster.com", - "xiaohongshu.com", - "xiaomi.com", - "xing.com", - "xnxx.com", - "xvideos.com", - "xxxxx520.com", - "y2mate.com", - "yahoo.co.jp", + "www.xbox.com", + "www.xerox.com", + "www.youdao.com", + "www.zdnet.com", + "www.zebra.com", "yahoo.com", "yale.edu", "yandex.com", "yandex.net", - "yelp.com", - "yimg.com", - "yiyouliao.com", - "youdao.com", "youku.com", - "youronlinechoices.com", "youtu.be", "youtube.com", - "ys7.com", - "yts.mx", - "zdnet.com", - "zebra.com", "zemanta.com", - "zendesk.com", - "zhihu.com", - "zillow.com", - "zoho.com", - "zoom.us", "zoro.to", } diff --git a/patrol/domains_test.go b/patrol/domains_test.go index 060eb15..4549db6 100644 --- a/patrol/domains_test.go +++ b/patrol/domains_test.go @@ -19,30 +19,29 @@ func TestCleanDomains(t *testing.T) { //nolint:paralleltest return } + // Setup context. + ctx := context.Background() + // Go through all domains and check if they are reachable. goodDomains := make([]string, 0, len(testDomains)) for _, domain := range testDomains { - if domain == "addtoany.com" { - break - } - // Check if domain is reachable. - code, err := CheckHTTPSConnection(context.Background(), domain) + code, err := domainIsUsable(ctx, domain) if err != nil { - t.Logf("FAIL: %s: %s", domain, err) + fmt.Printf("FAIL: %s: %s\n", domain, err) } else { - t.Logf("OK: %s [%d]", domain, code) + fmt.Printf("OK: %s [%d]\n", domain, code) goodDomains = append(goodDomains, domain) continue } // If failed, try again with a www. prefix wwwDomain := "www." + domain - code, err = CheckHTTPSConnection(context.Background(), wwwDomain) + code, err = domainIsUsable(ctx, wwwDomain) if err != nil { - t.Logf("FAIL: %s: %s", wwwDomain, err) + fmt.Printf("FAIL: %s: %s\n", wwwDomain, err) } else { - t.Logf("OK: %s [%d]", wwwDomain, code) + fmt.Printf("OK: %s [%d]\n", wwwDomain, code) goodDomains = append(goodDomains, wwwDomain) } @@ -56,3 +55,13 @@ func TestCleanDomains(t *testing.T) { //nolint:paralleltest fmt.Println("IMPORTANT: do not forget to go through list and check if everything looks good") } + +func domainIsUsable(ctx context.Context, domain string) (statusCode int, err error) { + // Try IPv6 first as it is way more likely to fail. + statusCode, err = CheckHTTPSConnection(ctx, "tcp6", domain) + if err != nil { + return + } + + return CheckHTTPSConnection(ctx, "tcp4", domain) +} diff --git a/patrol/http.go b/patrol/http.go index 32701ec..c08974b 100644 --- a/patrol/http.go +++ b/patrol/http.go @@ -11,6 +11,7 @@ import ( "github.com/safing/portbase/log" "github.com/safing/portbase/modules" + "github.com/safing/spn/conf" ) var httpsConnectivityConfirmed = abool.NewBool(true) @@ -21,92 +22,60 @@ func HTTPSConnectivityConfirmed() bool { return httpsConnectivityConfirmed.IsSet() } -// CheckHTTPSConnection checks if a HTTPS connection to the given domain can be established. -func CheckHTTPSConnection(ctx context.Context, domain string) (statusCode int, err error) { - // Build URL. - // Use HTTPS to ensure that we have really communicated with the desired - // server and not with an intermediate. - url := fmt.Sprintf("https://%s/", domain) - - // Prepare all parts of the request. - // TODO: Evaluate if we want to change the User-Agent. - req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) - if err != nil { - return 0, err - } - dialer := &net.Dialer{ - Timeout: 5 * time.Second, - } - client := &http.Client{ - Transport: &http.Transport{ - DialContext: dialer.DialContext, - DisableKeepAlives: true, - DisableCompression: true, - TLSHandshakeTimeout: 5 * time.Second, - }, - CheckRedirect: func(req *http.Request, via []*http.Request) error { - return http.ErrUseLastResponse - }, - Timeout: 10 * time.Second, - } - - // Make request to server. - resp, err := client.Do(req) - if err != nil { - return 0, fmt.Errorf("failed to send http request: %w", err) - } - defer func() { - _ = resp.Body.Close() - }() - if resp.StatusCode < 200 || resp.StatusCode >= 400 { - return resp.StatusCode, fmt.Errorf("unexpected status code: %s", resp.Status) - } - - return resp.StatusCode, nil -} - -func httpsConnectivityCheck(ctx context.Context, task *modules.Task) error { +func connectivityCheckTask(ctx context.Context, task *modules.Task) error { // Start tracing logs. ctx, tracer := log.AddTracer(ctx) defer tracer.Submit() // Run checks and report status. - success := runHTTPSConnectivityChecks(ctx) + success := runConnectivityChecks(ctx) if success { - tracer.Info("spn/patrol: https connectivity check succeeded") + tracer.Info("spn/patrol: all connectivity checks succeeded") if httpsConnectivityConfirmed.SetToIf(false, true) { module.TriggerEvent(ChangeSignalEventName, nil) } return nil } - tracer.Errorf("spn/patrol: https connectivity check failed") + tracer.Errorf("spn/patrol: connectivity check failed") if httpsConnectivityConfirmed.SetToIf(true, false) { module.TriggerEvent(ChangeSignalEventName, nil) } return nil } -func runHTTPSConnectivityChecks(ctx context.Context) (ok bool) { +func runConnectivityChecks(ctx context.Context) (ok bool) { + switch { + case conf.HubHasIPv4() && !runHTTPSConnectivityChecks(ctx, "tcp4"): + return false + case conf.HubHasIPv6() && !runHTTPSConnectivityChecks(ctx, "tcp6"): + return false + default: + // All checks passed. + return true + } +} + +func runHTTPSConnectivityChecks(ctx context.Context, network string) (ok bool) { // Step 1: Check 1 domain, require 100% - if checkHTTPSConnectivity(ctx, 1, 1) { + if checkHTTPSConnectivity(ctx, network, 1, 1) { return true } // Step 2: Check 5 domains, require 80% - if checkHTTPSConnectivity(ctx, 5, 0.8) { + if checkHTTPSConnectivity(ctx, network, 5, 0.8) { return true } - // Step 3: Check 20 domains, require 90% - if checkHTTPSConnectivity(ctx, 20, 0.9) { + // Step 3: Check 20 domains, require 70% + if checkHTTPSConnectivity(ctx, network, 20, 0.7) { return true } return false } -func checkHTTPSConnectivity(ctx context.Context, checks int, requiredSuccessFraction float32) (ok bool) { +func checkHTTPSConnectivity(ctx context.Context, network string, checks int, requiredSuccessFraction float32) (ok bool) { log.Tracer(ctx).Tracef( "spn/patrol: testing connectivity via https (%d checks; %.0f%% required)", checks, @@ -116,7 +85,7 @@ func checkHTTPSConnectivity(ctx context.Context, checks int, requiredSuccessFrac // Run tests. var succeeded int for i := 0; i < checks; i++ { - if checkHTTPSConnection(ctx) { + if checkHTTPSConnection(ctx, network) { succeeded++ } } @@ -125,7 +94,8 @@ func checkHTTPSConnectivity(ctx context.Context, checks int, requiredSuccessFrac successFraction := float32(succeeded) / float32(checks) if successFraction < requiredSuccessFraction { log.Tracer(ctx).Warningf( - "spn/patrol: https connectivity check failed: %d/%d (%.0f%%)", + "spn/patrol: https/%s connectivity check failed: %d/%d (%.0f%%)", + network, succeeded, checks, successFraction*100, @@ -134,7 +104,8 @@ func checkHTTPSConnectivity(ctx context.Context, checks int, requiredSuccessFrac } log.Tracer(ctx).Debugf( - "spn/patrol: https connectivity check succeeded: %d/%d (%.0f%%)", + "spn/patrol: https/%s connectivity check succeeded: %d/%d (%.0f%%)", + network, succeeded, checks, successFraction*100, @@ -142,14 +113,73 @@ func checkHTTPSConnectivity(ctx context.Context, checks int, requiredSuccessFrac return true } -func checkHTTPSConnection(ctx context.Context) (ok bool) { +func checkHTTPSConnection(ctx context.Context, network string) (ok bool) { testDomain := getRandomTestDomain() - code, err := CheckHTTPSConnection(ctx, testDomain) + code, err := CheckHTTPSConnection(ctx, network, testDomain) if err != nil { - log.Tracer(ctx).Debugf("spn/patrol: https connect check failed: %s: %s", testDomain, err) + log.Tracer(ctx).Debugf("spn/patrol: https/%s connect check failed: %s: %s", network, testDomain, err) return false } - log.Tracer(ctx).Tracef("spn/patrol: https connect check succeeded: %s [%d]", testDomain, code) + log.Tracer(ctx).Tracef("spn/patrol: https/%s connect check succeeded: %s [%d]", network, testDomain, code) return true } + +// CheckHTTPSConnection checks if a HTTPS connection to the given domain can be established. +func CheckHTTPSConnection(ctx context.Context, network, domain string) (statusCode int, err error) { + // Check network parameter. + switch network { + case "tcp4": + case "tcp6": + default: + return 0, fmt.Errorf("provided unsupported network: %s", network) + } + + // Build URL. + // Use HTTPS to ensure that we have really communicated with the desired + // server and not with an intermediate. + url := fmt.Sprintf("https://%s/", domain) + + // Prepare all parts of the request. + // TODO: Evaluate if we want to change the User-Agent. + req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) + if err != nil { + return 0, err + } + dialer := &net.Dialer{ + Timeout: 15 * time.Second, + FallbackDelay: -1, // Disables Fast Fallback from IPv6 to IPv4. + KeepAlive: -1, // Disable keep-alive. + } + dialWithNet := func(ctx context.Context, _, addr string) (net.Conn, error) { + // Ignore network by http client. + // Instead, force either tcp4 or tcp6. + return dialer.DialContext(ctx, network, addr) + } + client := &http.Client{ + Transport: &http.Transport{ + DialContext: dialWithNet, + DisableKeepAlives: true, + DisableCompression: true, + TLSHandshakeTimeout: 15 * time.Second, + }, + CheckRedirect: func(req *http.Request, via []*http.Request) error { + return http.ErrUseLastResponse + }, + Timeout: 30 * time.Second, + } + + // Make request to server. + resp, err := client.Do(req) + if err != nil { + return 0, fmt.Errorf("failed to send http request: %w", err) + } + defer func() { + _ = resp.Body.Close() + }() + if resp.StatusCode < 200 || resp.StatusCode >= 400 { + return resp.StatusCode, fmt.Errorf("unexpected status code: %s", resp.Status) + } + + return resp.StatusCode, nil +} diff --git a/patrol/module.go b/patrol/module.go index 585b813..247b811 100644 --- a/patrol/module.go +++ b/patrol/module.go @@ -24,8 +24,8 @@ func prep() error { func start() error { if conf.PublicHub() { - module.NewTask("https connectivity test", httpsConnectivityCheck). - Repeat(1 * time.Minute) + module.NewTask("connectivity test", connectivityCheckTask). + Repeat(5 * time.Minute) } return nil diff --git a/terminal/control_flow.go b/terminal/control_flow.go index 17ee497..9bbb4ba 100644 --- a/terminal/control_flow.go +++ b/terminal/control_flow.go @@ -17,7 +17,7 @@ type FlowControl interface { Receive() <-chan *Msg Send(msg *Msg, timeout time.Duration) *Error ReadyToSend() <-chan struct{} - Flush() + Flush(timeout time.Duration) StartWorkers(m *modules.Module, terminalName string) RecvQueueLen() int SendQueueLen() int @@ -270,7 +270,6 @@ sending: msg.Data.Prepend(varint.Pack64(uint64(dfq.reportableRecvSpace()))) // Submit for sending upstream. - msg.Unit.Pause() dfq.submitUpstream(msg, 0) // Decrease the send space and set flag if depleted. if dfq.decrementSendSpace() <= 0 { @@ -317,7 +316,7 @@ sending: } // Flush waits for all waiting data to be sent. -func (dfq *DuplexFlowQueue) Flush() { +func (dfq *DuplexFlowQueue) Flush(timeout time.Duration) { // Create channel and function for notifying. wait := make(chan struct{}) finished := func() { @@ -328,11 +327,15 @@ func (dfq *DuplexFlowQueue) Flush() { case dfq.flush <- finished: case <-dfq.ctx.Done(): return + case <-TimedOut(timeout): + return } // Wait for flush to finish and return when stopping. select { case <-wait: case <-dfq.ctx.Done(): + case <-TimedOut(timeout): + return } } @@ -354,7 +357,6 @@ func (dfq *DuplexFlowQueue) ReadyToSend() <-chan struct{} { func (dfq *DuplexFlowQueue) Send(msg *Msg, timeout time.Duration) *Error { select { case dfq.sendQueue <- msg: - msg.Unit.Pause() if msg.Unit.IsHighPriority() { // Reset prioMsgs to the current queue size, so that all waiting and the // message we just added are all handled as high priority. @@ -408,7 +410,6 @@ func (dfq *DuplexFlowQueue) Deliver(msg *Msg) *Error { return nil } - msg.Unit.Pause() select { case dfq.recvQueue <- msg: diff --git a/terminal/metrics.go b/terminal/metrics.go index 6c12a03..51ccbeb 100644 --- a/terminal/metrics.go +++ b/terminal/metrics.go @@ -1,6 +1,9 @@ package terminal import ( + "sync" + "time" + "github.com/tevino/abool" "github.com/safing/portbase/api" @@ -16,11 +19,11 @@ func registerMetrics() (err error) { } _, err = metrics.NewGauge( - "spn/scheduling/unit/currentunitid", + "spn/scheduling/unit/slotpace/max", nil, - floatify(scheduler.GetCurrentUnitID), + getMaxLeveledSlotPace, &metrics.Options{ - Name: "SPN Scheduling Current Unit ID", + Name: "SPN Scheduling Max Leveled Slot Pace", Permission: api.PermitUser, }, ) @@ -29,11 +32,11 @@ func registerMetrics() (err error) { } _, err = metrics.NewGauge( - "spn/scheduling/unit/slotpace", + "spn/scheduling/unit/slotpace/avg", nil, - floatify(scheduler.GetSlotPace), + getAvgSlotPace, &metrics.Options{ - Name: "SPN Scheduling Current Slot Pace", + Name: "SPN Scheduling Avg Slot Pace", Permission: api.PermitUser, }, ) @@ -41,37 +44,41 @@ func registerMetrics() (err error) { return err } - _, err = metrics.NewGauge( - "spn/scheduling/unit/clearanceupto", - nil, - floatify(scheduler.GetClearanceUpTo), - &metrics.Options{ - Name: "SPN Scheduling Clearance Up to Unit ID", - Permission: api.PermitUser, - }, - ) - if err != nil { - return err - } + return nil +} - _, err = metrics.NewGauge( - "spn/scheduling/unit/finished", - nil, - floatify(scheduler.GetFinished), - &metrics.Options{ - Name: "SPN Scheduling Finished Units", - Permission: api.PermitUser, - }, - ) - if err != nil { - return err +var ( + nextMaxLeveledPaceReset time.Time + nextMaxLeveledPaceResetLock sync.Mutex + + nextAvgSlotPaceReset time.Time + nextAvgSlotPaceResetLock sync.Mutex +) + +func getMaxLeveledSlotPace() float64 { + value := float64(scheduler.GetMaxLeveledSlotPace()) + + nextMaxLeveledPaceResetLock.Lock() + defer nextMaxLeveledPaceResetLock.Unlock() + + if time.Now().After(nextMaxLeveledPaceReset) { + nextMaxLeveledPaceReset = time.Now().Add(50 * time.Second) + scheduler.ResetMaxLeveledSlotPace() } - return nil + return value } -func floatify(fn func() int64) func() float64 { - return func() float64 { - return float64(fn()) +func getAvgSlotPace() float64 { + value := float64(scheduler.GetAvgSlotPace()) + + nextAvgSlotPaceResetLock.Lock() + defer nextAvgSlotPaceResetLock.Unlock() + + if time.Now().After(nextAvgSlotPaceReset) { + nextAvgSlotPaceReset = time.Now().Add(50 * time.Second) + scheduler.ResetAvgSlotPace() } + + return value } diff --git a/terminal/module.go b/terminal/module.go index ad0a291..6ba30dd 100644 --- a/terminal/module.go +++ b/terminal/module.go @@ -59,12 +59,19 @@ func StopScheduler() { } func schedulerConfig() *unit.SchedulerConfig { + // Client Scheduler Config. if conf.Client() { return &unit.SchedulerConfig{ - MinSlotPace: 10, // 1000pps - Choose a small starting pace for low end devices. - AdjustFractionPerStreak: 10, // 10% - Adapt quicker, as clients will often be at min pace. + MinSlotPace: 10, // 1000pps - Choose a small starting pace for low end devices. + WorkSlotPercentage: 0.9, // 90% + SlotChangeRatePerStreak: 0.1, // 10% } } - return nil + // Server Scheduler Config. + return &unit.SchedulerConfig{ + MinSlotPace: 100, + WorkSlotPercentage: 0.7, // 70% + SlotChangeRatePerStreak: 0.02, // 2% + } } diff --git a/terminal/msg.go b/terminal/msg.go index 56745d7..f006090 100644 --- a/terminal/msg.go +++ b/terminal/msg.go @@ -95,7 +95,7 @@ func (msg *Msg) Debug() { msg.debugWithCaller(2) } -func (msg *Msg) debugWithCaller(skip int) { +func (msg *Msg) debugWithCaller(skip int) { //nolint:unparam if !debugUnitScheduling || msg == nil { return } diff --git a/terminal/operation.go b/terminal/operation.go index d72458c..b6ab49b 100644 --- a/terminal/operation.go +++ b/terminal/operation.go @@ -210,7 +210,6 @@ func (t *TerminalBase) Send(msg *Msg, timeout time.Duration) *Error { // Check if the send queue has available space. select { case t.sendQueue <- msg: - msg.Unit.Pause() return nil default: } @@ -218,7 +217,6 @@ func (t *TerminalBase) Send(msg *Msg, timeout time.Duration) *Error { // Submit message to buffer, if space is available. select { case t.sendQueue <- msg: - msg.Unit.Pause() return nil case <-TimedOut(timeout): msg.Finish() diff --git a/terminal/operation_base.go b/terminal/operation_base.go index 65ced9b..84d5572 100644 --- a/terminal/operation_base.go +++ b/terminal/operation_base.go @@ -46,7 +46,7 @@ func (op *OperationBase) NewMsg(data []byte) *Msg { msg.Type = MsgTypeData // Debug unit leaks. - msg.Debug() + msg.debugWithCaller(2) return msg } @@ -59,7 +59,7 @@ func (op *OperationBase) NewEmptyMsg() *Msg { msg.Type = MsgTypeData // Debug unit leaks. - msg.Debug() + msg.debugWithCaller(2) return msg } @@ -76,9 +76,6 @@ func (op *OperationBase) Send(msg *Msg, timeout time.Duration) *Error { // Wait for processing slot. msg.Unit.WaitForSlot() - // Pause unit before handing away. - msg.Unit.Pause() - // Send message. tErr := op.terminal.Send(msg, timeout) if tErr != nil { diff --git a/terminal/terminal.go b/terminal/terminal.go index b151ffa..739df4a 100644 --- a/terminal/terminal.go +++ b/terminal/terminal.go @@ -215,9 +215,6 @@ func (t *TerminalBase) SetTimeout(d time.Duration) { // Deliver on TerminalBase only exists to conform to the interface. It must be // overridden by an actual implementation. func (t *TerminalBase) Deliver(msg *Msg) *Error { - // Pause unit before handing away. - msg.Unit.Pause() - // Deliver via configured proxy. err := t.deliverProxy(msg) if err != nil { @@ -292,9 +289,6 @@ func (t *TerminalBase) submit(msg *Msg, timeout time.Duration) { return } - // Pause unit before handing away. - msg.Unit.Pause() - // Hand over to flow control. err := t.flowControl.Send(msg, timeout) if err != nil { @@ -309,9 +303,6 @@ func (t *TerminalBase) submitToUpstream(msg *Msg, timeout time.Duration) { // Add terminal ID as flow ID. msg.FlowID = t.ID() - // Pause unit before handing away. - msg.Unit.Pause() - // Debug unit leaks. msg.Debug() @@ -418,8 +409,6 @@ handling: msgBufferMsg = msg msgBufferMsg.FlowID = t.ID() msgBufferMsg.Type = MsgTypeData - // Wait for clearance on initial msg only. - msgBufferMsg.Unit.WaitForSlot() } msgBufferLen += msg.Data.Length() @@ -483,6 +472,9 @@ handling: msgBufferMsg.Type = MsgTypePriorityData } + // Wait for clearance on initial msg only. + msgBufferMsg.Unit.WaitForSlot() + err = t.sendOpMsgs(msgBufferMsg) } @@ -538,7 +530,8 @@ func (t *TerminalBase) Flush() { // Flush flow control, if configured. if t.flowControl != nil { - t.flowControl.Flush() + // Flushing could mean sending a full buffer of 50000 packets. + t.flowControl.Flush(5 * time.Minute) } } @@ -678,9 +671,6 @@ func (t *TerminalBase) handleOpMsg(data *container.Container) *Error { msg.Unit.MakeHighPriority() } - // Pause unit before handing away. - msg.Unit.Pause() - // Deliver message to operation. tErr := op.Deliver(msg) if tErr != nil { diff --git a/unit/scheduler.go b/unit/scheduler.go index fce13e5..41e1d94 100644 --- a/unit/scheduler.go +++ b/unit/scheduler.go @@ -12,17 +12,16 @@ import ( ) const ( - defaultSlotDuration = 10 * time.Millisecond - defaultMinSlotPace = 100 // 10 000 pps - defaultEpochDuration = 1 * time.Minute + defaultSlotDuration = 10 * time.Millisecond // 100 slots per second + defaultMinSlotPace = 100 // 10 000 pps - defaultAdjustFractionPerStreak = 100 // 1% - defaultHighPriorityMaxReserveFraction = 4 // 25% + defaultWorkSlotPercentage = 0.7 // 70% + defaultSlotChangeRatePerStreak = 0.02 // 2% ) // Scheduler creates and schedules units. // Must be created using NewScheduler(). -type Scheduler struct { +type Scheduler struct { //nolint:maligned // Configuration. config SchedulerConfig @@ -30,26 +29,13 @@ type Scheduler struct { // currentUnitID holds the last assigned Unit ID. currentUnitID atomic.Int64 + // clearanceUpTo holds the current threshold up to which Unit ID Units may be processed. + clearanceUpTo atomic.Int64 // slotPace holds the current pace. This is the base value for clearance // calcuation, not the value of the current cleared Units itself. slotPace atomic.Int64 - // clearanceUpTo holds the current threshold up to which Unit ID Units may be processed. - clearanceUpTo atomic.Int64 - // finishedTotal holds the amount of units that were finished across all epochs. - finishedTotal atomic.Int64 - - // Epoch amounts. - - // epoch is the slot epoch counter for resetting special values. - epoch atomic.Int32 - // finished holds the amount of units that were finished within the current epoch. - // Not necessarily all Unit IDs below this value are actually finished. + // finished holds the amount of units that were finished within the current slot. finished atomic.Int64 - // pausedUnits holds the amount of unfinished Units which are currently marked as "paused". - // These Units are waiting for an external condition. - pausedUnits atomic.Int64 - // highPrioUnits holds the amount of unfinished Units which were marked as high priority. - highPrioUnits atomic.Int64 // Slot management. slotSignalA chan struct{} @@ -57,6 +43,11 @@ type Scheduler struct { slotSignalSwitch bool slotSignalsLock sync.RWMutex + // Stats. + maxLeveledPace atomic.Int64 + avgPaceSum atomic.Int64 + avgPaceCnt atomic.Int64 + stopping abool.AtomicBool unitDebugger *UnitDebugger @@ -71,15 +62,17 @@ type SchedulerConfig struct { // The slot pace will never fall below this value. MinSlotPace int64 - // EpochDuration defines the duration of one epoch. - // Set to 0 to disable epochs. - EpochDuration time.Duration - - // AdjustFractionPerStreak defines the fraction of the pace the pace itself changes in either direction to match the current use and load. - AdjustFractionPerStreak int64 - - // HighPriorityMaxReserveFraction defines the fraction of the pace that may - at maximum - be reserved for high priority units. - HighPriorityMaxReserveFraction int64 + // WorkSlotPercentage defines the how much of a slot should be scheduled with work. + // The remainder is for catching up and breathing room for other tasks. + // Must be between 55% (0.55) and 95% (0.95). + // The default value is 0.7 (70%). + WorkSlotPercentage float64 + + // SlotChangeRatePerStreak defines how many percent (0-1) the slot pace + // should change per streak. + // Is enforced to be able to change the minimum slot pace by at least 1. + // The default value is 0.02 (2%). + SlotChangeRatePerStreak float64 } // NewScheduler returns a new scheduler. @@ -103,28 +96,37 @@ func NewScheduler(config *SchedulerConfig) *Scheduler { if s.config.MinSlotPace == 0 { s.config.MinSlotPace = defaultMinSlotPace } - if s.config.EpochDuration == 0 { - s.config.EpochDuration = defaultEpochDuration + if s.config.WorkSlotPercentage == 0 { + s.config.WorkSlotPercentage = defaultWorkSlotPercentage } - if s.config.AdjustFractionPerStreak == 0 { - s.config.AdjustFractionPerStreak = defaultAdjustFractionPerStreak + if s.config.SlotChangeRatePerStreak == 0 { + s.config.SlotChangeRatePerStreak = defaultSlotChangeRatePerStreak } - if s.config.HighPriorityMaxReserveFraction == 0 { - s.config.HighPriorityMaxReserveFraction = defaultHighPriorityMaxReserveFraction + + // Check boundaries of WorkSlotPercentage. + switch { + case s.config.WorkSlotPercentage < 0.55: + s.config.WorkSlotPercentage = 0.55 + case s.config.WorkSlotPercentage > 0.95: + s.config.WorkSlotPercentage = 0.95 } - // The adjust fraction may not be bigger than the min slot pace. - if s.config.AdjustFractionPerStreak > s.config.MinSlotPace { - s.config.AdjustFractionPerStreak = s.config.MinSlotPace + // The slot change rate must be able to change the slot pace by at least 1. + if s.config.SlotChangeRatePerStreak < (1 / float64(s.config.MinSlotPace)) { + s.config.SlotChangeRatePerStreak = (1 / float64(s.config.MinSlotPace)) // Debug logging: - // fmt.Printf("--- reduced AdjustFractionPerStreak to %d\n", s.config.AdjustFractionPerStreak) + // fmt.Printf("--- increased SlotChangeRatePerStreak to %f\n", s.config.SlotChangeRatePerStreak) } // Initialize scheduler fields. s.clearanceUpTo.Store(s.config.MinSlotPace) s.slotPace.Store(s.config.MinSlotPace) + // Initialize stats fields. + s.avgPaceSum.Store(s.config.MinSlotPace) + s.avgPaceCnt.Store(1) + return s } @@ -159,96 +161,107 @@ func (s *Scheduler) announceNextSlot() { // Must only be started once. func (s *Scheduler) SlotScheduler(ctx context.Context) error { // Start slot ticker. - ticker := time.NewTicker(s.config.SlotDuration) + ticker := time.NewTicker(s.config.SlotDuration / 2) defer ticker.Stop() - // Calculate how many slots per epoch - var slotCnt int64 - slotsPerEpoch := s.config.EpochDuration / s.config.SlotDuration - // Give clearance to all when stopping. defer s.clearanceUpTo.Store(math.MaxInt64 - math.MaxInt32) var ( - lastClearanceAmount int64 - finishedAtStart int64 - increaseStreak int64 - decreaseStreak int64 - epochBase int64 + halfSlotID uint64 + increaseStreak float64 + decreaseStreak float64 + oneStreaks int ) for range ticker.C { - // Calculate how many units were finished in slot. - // Only load "finished" once, so we don't miss anything. - finishedAtEnd := s.finished.Load() - finishedInSlot := finishedAtEnd - finishedAtStart - s.finishedTotal.Add(finishedInSlot) - - // Adapt pace. - if finishedInSlot >= lastClearanceAmount { - // Adjust based on streak. - increaseStreak++ - decreaseStreak = 0 - s.slotPace.Add((s.slotPace.Load() / s.config.AdjustFractionPerStreak) * increaseStreak) - - // Debug logging: - // fmt.Printf("+++ slot pace: %d (finished in slot: %d, last clearance: %d, increaseStreak: %d, high: %d)\n", s.slotPace.Load(), finishedInSlot, lastClearanceAmount, increaseStreak, s.highPrioUnits.Load()) - } else { - // Adjust based on streak. - decreaseStreak++ - increaseStreak = 0 - s.slotPace.Add(-((s.slotPace.Load() / s.config.AdjustFractionPerStreak) * decreaseStreak)) - - // Enforce minimum. - if s.slotPace.Load() < s.config.MinSlotPace { - s.slotPace.Store(s.config.MinSlotPace) - decreaseStreak = 0 - } + switch { + case halfSlotID%2 == 0: - // Debug logging: - // fmt.Printf("--- slot pace: %d (finished in slot: %d, last clearance: %d, decreaseStreak: %d, high: %d)\n", s.slotPace.Load(), finishedInSlot, lastClearanceAmount, decreaseStreak, s.highPrioUnits.Load()) - } + // First Half-Slot: Work Slot - // Advance epoch if needed. - slotCnt++ - if slotCnt%int64(slotsPerEpoch) == 0 { - slotCnt = 0 + // Reset slot counters. + s.finished.Store(0) - // Switch to new epoch. - s.epoch.Add(1) + // Raise clearance according + s.clearanceUpTo.Store( + s.currentUnitID.Load() + + int64( + float64(s.slotPace.Load())*s.config.WorkSlotPercentage, + ), + ) - // Only reduce by amount we have seen, for correct metrics. - s.finished.Add(-finishedAtEnd) - finishedAtEnd = 0 + // Announce start of new slot. + s.announceNextSlot() - // Raise the epoch base to the current unit ID. - epochBase = s.currentUnitID.Load() + default: - // Reset counters. - s.highPrioUnits.Store(0) - s.pausedUnits.Store(0) + // Second Half-Slot: Catch-Up Slot - // Debug logging: - // fmt.Printf("--- new epoch\n") - } + // Calculate slot pace with performance of first half-slot. + // Get current slot pace as float64. + currentSlotPace := float64(s.slotPace.Load()) + // Calculate current raw slot pace. + newRawSlotPace := float64(s.finished.Load() * 2) + + // Move slot pace in the trending direction. + if newRawSlotPace >= currentSlotPace { + // Adjust based on streak. + increaseStreak++ + decreaseStreak = 0 + s.slotPace.Add(int64( + currentSlotPace * s.config.SlotChangeRatePerStreak * increaseStreak, + )) + + // Count one-streaks. + if increaseStreak == 1 { + oneStreaks++ + } else { + oneStreaks = 0 + } + + // Debug logging: + // fmt.Printf("+++ slot pace: %.0f (current raw pace: %.0f, increaseStreak: %.0f, clearanceUpTo: %d)\n", currentSlotPace, newRawSlotPace, increaseStreak, s.clearanceUpTo.Load()) + } else { + // Adjust based on streak. + decreaseStreak++ + increaseStreak = 0 + s.slotPace.Add(int64( + -currentSlotPace * s.config.SlotChangeRatePerStreak * decreaseStreak, + )) + + // Enforce minimum. + if s.slotPace.Load() < s.config.MinSlotPace { + s.slotPace.Store(s.config.MinSlotPace) + decreaseStreak = 0 + } + + // Count one-streaks. + if decreaseStreak == 1 { + oneStreaks++ + } else { + oneStreaks = 0 + } + + // Debug logging: + // fmt.Printf("--- slot pace: %.0f (current raw pace: %.0f, decreaseStreak: %.0f, clearanceUpTo: %d)\n", currentSlotPace, newRawSlotPace, decreaseStreak, s.clearanceUpTo.Load()) + } + + // Record Stats + // Add current pace to avg calculation. + s.avgPaceCnt.Add(1) + if s.avgPaceSum.Add(s.slotPace.Load()) < 0 { + // Reset if we wrap. + s.avgPaceCnt.Store(1) + s.avgPaceSum.Store(s.slotPace.Load()) + } + // Check if current pace is new leveled max + if oneStreaks >= 3 && s.slotPace.Load() > s.maxLeveledPace.Load() { + s.maxLeveledPace.Store(s.slotPace.Load()) + } - // Set new slot clearance. - // First, add current pace and paused units. - newClearance := s.slotPace.Load() + s.pausedUnits.Load() - // Second, subtract a fraction of the clearance for high priority units. - highPrio := s.highPrioUnits.Load() - if highPrio > newClearance/s.config.HighPriorityMaxReserveFraction { - newClearance -= newClearance / s.config.HighPriorityMaxReserveFraction - } else { - newClearance -= highPrio } - // Third, add finished to set new clearance limit. - s.clearanceUpTo.Store(epochBase + finishedAtEnd + newClearance) - // Lastly, save new clearance for comparison for next slot. - lastClearanceAmount = newClearance - - // Go to next slot. - finishedAtStart = finishedAtEnd - s.announceNextSlot() + // Switch to other slot-half. + halfSlotID++ // Check if we are stopping. select { diff --git a/unit/scheduler_stats.go b/unit/scheduler_stats.go index 431b7bd..f77b85f 100644 --- a/unit/scheduler_stats.go +++ b/unit/scheduler_stats.go @@ -1,69 +1,33 @@ package unit -// SchedulerState holds a snapshot of the current scheduler state. -type SchedulerState struct { - CurrentUnitID int64 - SlotPace int64 - ClearanceUpTo int64 - Finished int64 +// GetAvgSlotPace returns the current average slot pace. +func (s *Scheduler) GetAvgSlotPace() int64 { + // This is somewhat racy, as one value might already be updated with the + // latest slot data, while the other has been not. + // This is not so much of a problem, as slots are really short and the impact + // is very low. + cnt := s.avgPaceCnt.Load() + sum := s.avgPaceSum.Load() - Epoch int32 - FinishedInEpoch int64 - PausedUnitsInEpoch int64 - HighPrioUnitsInEpoch int64 + return sum / cnt } -// State returns the current internal state values of the scheduler. -func (s *Scheduler) State() *SchedulerState { - return &SchedulerState{ - CurrentUnitID: s.currentUnitID.Load(), - SlotPace: s.slotPace.Load(), - ClearanceUpTo: s.clearanceUpTo.Load(), - Finished: s.finishedTotal.Load(), - - Epoch: s.epoch.Load(), - FinishedInEpoch: s.finished.Load(), - PausedUnitsInEpoch: s.pausedUnits.Load(), - HighPrioUnitsInEpoch: s.highPrioUnits.Load(), - } -} - -// GetCurrentUnitID returns the current unit ID. -func (s *Scheduler) GetCurrentUnitID() int64 { - return s.currentUnitID.Load() -} - -// GetSlotPace returns the current slot pace. -func (s *Scheduler) GetSlotPace() int64 { - return s.slotPace.Load() -} - -// GetClearanceUpTo returns the current clearance limit. -func (s *Scheduler) GetClearanceUpTo() int64 { - return s.clearanceUpTo.Load() -} - -// GetFinished returns the current amount of finished units. -func (s *Scheduler) GetFinished() int64 { - return s.finishedTotal.Load() -} - -// GetEpoch returns the current epoch ID. -func (s *Scheduler) GetEpoch() int32 { - return s.epoch.Load() -} - -// GetFinishedInEpoch returns the current finished units within the current epoch. -func (s *Scheduler) GetFinishedInEpoch() int64 { - return s.finished.Load() +// ResetAvgSlotPace reset average slot pace values. +func (s *Scheduler) ResetAvgSlotPace() { + // This is somewhat racy, as one value might already be updated with the + // latest slot data, while the other has been not. + // This is not so much of a problem, as slots are really short and the impact + // is very low. + s.avgPaceCnt.Store(1) + s.avgPaceSum.Store(s.config.MinSlotPace) } -// GetPausedUnitsInEpoch returns the current paused units within the current epoch. -func (s *Scheduler) GetPausedUnitsInEpoch() int64 { - return s.pausedUnits.Load() +// GetMaxLeveledSlotPace returns the current maximum leveled slot pace. +func (s *Scheduler) GetMaxLeveledSlotPace() int64 { + return s.maxLeveledPace.Load() } -// GetHighPrioUnitsInEpoch returns the current high priority units within the current epoch. -func (s *Scheduler) GetHighPrioUnitsInEpoch() int64 { - return s.highPrioUnits.Load() +// ResetMaxLeveledSlotPace resets the maximum leveled slot pace value. +func (s *Scheduler) ResetMaxLeveledSlotPace() { + s.maxLeveledPace.Store(0) } diff --git a/unit/scheduler_test.go b/unit/scheduler_test.go index c924b69..3e3ec6b 100644 --- a/unit/scheduler_test.go +++ b/unit/scheduler_test.go @@ -9,9 +9,7 @@ func BenchmarkScheduler(b *testing.B) { workers := 10 // Create and start scheduler. - s := NewScheduler(&SchedulerConfig{ - EpochDuration: defaultSlotDuration * 10, - }) + s := NewScheduler(&SchedulerConfig{}) ctx, cancel := context.WithCancel(context.Background()) go func() { err := s.SlotScheduler(ctx) diff --git a/unit/unit.go b/unit/unit.go index 36b889b..9c35242 100644 --- a/unit/unit.go +++ b/unit/unit.go @@ -9,17 +9,14 @@ import ( type Unit struct { id int64 scheduler *Scheduler - epoch int32 finished abool.AtomicBool highPriority abool.AtomicBool - paused abool.AtomicBool } // NewUnit returns a new unit within the scheduler. func (s *Scheduler) NewUnit() *Unit { return &Unit{ id: s.currentUnitID.Add(1), - epoch: s.epoch.Load(), scheduler: s, } } @@ -36,9 +33,6 @@ func (u *Unit) ReUse() { // WaitForSlot blocks until the unit may be processed. func (u *Unit) WaitForSlot() { - // Unpause. - u.unpause() - // High priority units may always process. if u.highPriority.IsSet() { return @@ -67,65 +61,21 @@ func (u *Unit) Finish() { // Always increase finished, even if the unit is from a previous epoch. if u.finished.SetToIf(false, true) { - if u.epoch == u.scheduler.epoch.Load() { - // If in same epoch, finish in epoch. - u.scheduler.finished.Add(1) - } else { - // If not in same epoch, just increase the total counter. - u.scheduler.finishedTotal.Add(1) - } - } - u.RemovePriority() - u.unpause() -} - -// Pause signals the unit scheduler that this unit is paused and not being -// processed at the moment. May only be called if WaitForUnitSlot() was called -// at least once. -func (u *Unit) Pause() { - // Only change if within the origin epoch. - if u.epoch != u.scheduler.epoch.Load() { - return - } - - if u.finished.IsNotSet() && u.paused.SetToIf(false, true) { - u.scheduler.pausedUnits.Add(1) - - // Increase clearance by one if unit is paused, as now another unit can take its slot. - u.scheduler.clearanceUpTo.Add(1) - } -} - -// unpause signals the unit scheduler that this unit is not paused anymore and -// is now waiting for processing. -func (u *Unit) unpause() { - // Only change if within the origin epoch. - if u.epoch != u.scheduler.epoch.Load() { - return - } - - if u.paused.SetToIf(true, false) { - if u.scheduler.pausedUnits.Add(-1) < 0 { - // If we are within an epoch change, revert change and return. - u.scheduler.pausedUnits.Add(1) - return - } - - // Reduce clearance by one if paused unit is woken up, as the previously paused unit requires a slot. - // A paused unit is expected to already have been allowed to process once. - u.scheduler.clearanceUpTo.Add(-1) + u.scheduler.finished.Add(1) } } // MakeHighPriority marks the unit as high priority. func (u *Unit) MakeHighPriority() { - // Only change if within the origin epoch. - if u.epoch != u.scheduler.epoch.Load() { - return - } - - if u.finished.IsNotSet() && u.highPriority.SetToIf(false, true) { - u.scheduler.highPrioUnits.Add(1) + switch { + case u.finished.IsSet(): + // Unit is already finished. + case !u.highPriority.SetToIf(false, true): + // Unit is already set to high priority. + // Else: High Priority set. + case u.id > u.scheduler.clearanceUpTo.Load(): + // Unit is outside current clearance, reduce clearance by one. + u.scheduler.clearanceUpTo.Add(-1) } } @@ -136,15 +86,5 @@ func (u *Unit) IsHighPriority() bool { // RemovePriority removes the high priority mark. func (u *Unit) RemovePriority() { - // Only change if within the origin epoch. - if u.epoch != u.scheduler.epoch.Load() { - return - } - - if u.highPriority.SetToIf(true, false) { - if u.scheduler.highPrioUnits.Add(-1) < 0 { - // If we are within an epoch change, revert change. - u.scheduler.highPrioUnits.Add(1) - } - } + u.highPriority.UnSet() } diff --git a/unit/unit_debug.go b/unit/unit_debug.go index c00c1a0..b5ace77 100644 --- a/unit/unit_debug.go +++ b/unit/unit_debug.go @@ -66,6 +66,14 @@ func (s *Scheduler) debugStep() { } } - log.Debugf("scheduler state: %+v", s.State()) - log.Debugf("scheduler unit sources: %+v", sources) + // Print current state. + log.Debugf( + `scheduler: state: slotPace=%d avgPace=%d maxPace=%d currentUnitID=%d clearanceUpTo=%d`, + s.slotPace.Load(), + s.avgPaceSum.Load()/s.avgPaceCnt.Load(), + s.maxLeveledPace.Load(), + s.currentUnitID.Load(), + s.clearanceUpTo.Load(), + ) + log.Debugf("scheduler: unit sources: %+v", sources) } diff --git a/unit/unit_test.go b/unit/unit_test.go index 8c72d4c..30d74ee 100644 --- a/unit/unit_test.go +++ b/unit/unit_test.go @@ -19,9 +19,7 @@ func TestUnit(t *testing.T) { //nolint:paralleltest workers := 100 // Create and start scheduler. - s := NewScheduler(&SchedulerConfig{ - EpochDuration: defaultSlotDuration * 10, - }) + s := NewScheduler(&SchedulerConfig{}) ctx, cancel := context.WithCancel(context.Background()) go func() { err := s.SlotScheduler(ctx) @@ -31,36 +29,22 @@ func TestUnit(t *testing.T) { //nolint:paralleltest }() defer cancel() - // Create unit creation worker. - unitQ := make(chan *Unit, size/workers) - go func() { - for i := 0; i < size; i++ { - // Create new unit. - u := s.NewUnit() - - // Make 1% high priority. - if rand.Int()%100 == 0 { //nolint:gosec // This is a test. - u.MakeHighPriority() - } - - // Add to queue. - unitQ <- u - } - close(unitQ) - }() - // Create 10 workers. var wg sync.WaitGroup wg.Add(workers) + sizePerWorker := size / workers for i := 0; i < workers; i++ { go func() { - for u := range unitQ { - u.WaitForSlot() - u.Pause() + for i := 0; i < sizePerWorker; i++ { + u := s.NewUnit() - time.Sleep(1 * time.Microsecond) + // Make 1% high priority. + if rand.Int()%100 == 0 { //nolint:gosec // This is a test. + u.MakeHighPriority() + } u.WaitForSlot() + time.Sleep(10 * time.Microsecond) u.Finish() } wg.Done() @@ -71,37 +55,33 @@ func TestUnit(t *testing.T) { //nolint:paralleltest wg.Wait() // Wait for two slot durations for values to update. - time.Sleep(s.config.SlotDuration * 10) + time.Sleep(s.config.SlotDuration * 2) // Print current state. fmt.Printf(`scheduler state: currentUnitID = %d slotPace = %d clearanceUpTo = %d - finishedTotal = %d - - epoch = %d finished = %d - pausedUnits = %d - highPrioUnits = %d + avgPace = %d + maxPace = %d `, s.currentUnitID.Load(), s.slotPace.Load(), s.clearanceUpTo.Load(), - s.finishedTotal.Load(), - - s.epoch.Load(), s.finished.Load(), - s.pausedUnits.Load(), - s.highPrioUnits.Load(), + s.avgPaceSum.Load()/s.avgPaceCnt.Load(), + s.maxLeveledPace.Load(), ) // Check if everything seems good. assert.Equal(t, size, int(s.currentUnitID.Load()), "currentUnitID must match size") - assert.Equal(t, size, int(s.finishedTotal.Load()), "finishedTotal must match size") - assert.GreaterOrEqual(t, int(s.clearanceUpTo.Load()), size+int(s.config.MinSlotPace), "clearanceUpTo must be at least size+minSlotPace") - assert.Equal(t, 0, int(s.highPrioUnits.Load()), "high priority units must be zero when finished") - assert.Equal(t, 0, int(s.pausedUnits.Load()), "paused units must be zero when finished") + assert.GreaterOrEqual( + t, + int(s.clearanceUpTo.Load()), + size+int(float64(s.config.MinSlotPace)*s.config.SlotChangeRatePerStreak), + "clearanceUpTo must be at least size+minSlotPace", + ) // Shutdown cancel()