Skip to content
This repository has been archived by the owner on Mar 29, 2024. It is now read-only.

Simplify scheduling and other fixes #159

Merged
merged 12 commits into from
Mar 13, 2023
2 changes: 1 addition & 1 deletion captain/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func init() {
// prepBootstrapHubFlag checks the bootstrap-hub argument if it is valid.
func prepBootstrapHubFlag() error {
if bootstrapHubFlag != "" {
_, err := hub.ParseBootstrapHub(bootstrapHubFlag, conf.MainMapName)
_, _, _, err := hub.ParseBootstrapHub(bootstrapHubFlag)
return err
}
return nil
Expand Down
9 changes: 9 additions & 0 deletions cmds/hub/pack
Original file line number Diff line number Diff line change
Expand Up @@ -74,18 +74,27 @@ function check_all {
GOOS=linux GOARCH=amd64 check
GOOS=windows GOARCH=amd64 check
GOOS=darwin GOARCH=amd64 check
GOOS=linux GOARCH=arm64 check
GOOS=windows GOARCH=arm64 check
GOOS=darwin GOARCH=arm64 check
}

function build_all {
GOOS=linux GOARCH=amd64 build
GOOS=windows GOARCH=amd64 build
GOOS=darwin GOARCH=amd64 build
GOOS=linux GOARCH=arm64 build
GOOS=windows GOARCH=arm64 build
GOOS=darwin GOARCH=arm64 build
}

function reset_all {
GOOS=linux GOARCH=amd64 reset
GOOS=windows GOARCH=amd64 reset
GOOS=darwin GOARCH=amd64 reset
GOOS=linux GOARCH=arm64 reset
GOOS=windows GOARCH=arm64 reset
GOOS=darwin GOARCH=arm64 reset
}

case $1 in
Expand Down
33 changes: 12 additions & 21 deletions crew/connect.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"net"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -122,27 +123,16 @@ func (t *Tunnel) establish(ctx context.Context) (err error) {
case sticksTo.Avoid:
log.Tracer(ctx).Tracef("spn/crew: avoiding %s", sticksTo.Pin.Hub)

// Build avoid policy.
avoidPolicy := make([]endpoints.Endpoint, 0, 2)
// Exclude countries of the hub to be avoided.
// This helps to select a destination hub that is more different than say,
// the other hub in the same datacenter as the one to be avoided.
if sticksTo.Pin.LocationV4 != nil &&
sticksTo.Pin.LocationV4.Country.ISOCode != "" {
avoidPolicy = append(avoidPolicy, &endpoints.EndpointCountry{
Country: sticksTo.Pin.LocationV4.Country.ISOCode,
})
log.Tracer(ctx).Tracef("spn/crew: avoiding country %s via IPv4 location", sticksTo.Pin.LocationV4.Country.ISOCode)
}
if sticksTo.Pin.LocationV6 != nil &&
sticksTo.Pin.LocationV6.Country.ISOCode != "" {
avoidPolicy = append(avoidPolicy, &endpoints.EndpointCountry{
Country: sticksTo.Pin.LocationV6.Country.ISOCode,
})
log.Tracer(ctx).Tracef("spn/crew: avoiding country %s via IPv6 location", sticksTo.Pin.LocationV6.Country.ISOCode)
// Avoid this Hub.
// TODO: Remember more than one hub to avoid.
avoidPolicy := []endpoints.Endpoint{
&endpoints.EndpointDomain{
OriginalValue: sticksTo.Pin.Hub.ID,
Domain: strings.ToLower(sticksTo.Pin.Hub.ID) + ".",
},
}

// Append to policies
// Append to policies.
t.connInfo.TunnelOpts.HubPolicies = append(t.connInfo.TunnelOpts.HubPolicies, avoidPolicy)

default:
Expand Down Expand Up @@ -314,7 +304,7 @@ func establishRoute(route *navigator.Route) (dstPin *navigator.Pin, dstTerminal
return nil, nil, tErr.Wrap("failed to authenticate to %s: %w", check.pin.Hub, tErr)
}

case <-time.After(3 * time.Second):
case <-time.After(10 * time.Second):
// Mark as failing for just a minute, until server load may be less.
check.pin.MarkAsFailingFor(1 * time.Minute)
log.Warningf("spn/crew: auth to %s timed out", check.pin.Hub)
Expand All @@ -338,13 +328,14 @@ func establishRoute(route *navigator.Route) (dstPin *navigator.Pin, dstTerminal
!tErr.Is(terminal.ErrUnknownOperationType) { // TODO: remove workaround until all servers have this upgrade
// Mark as failing long enough to expire connections and session and shutdown connections.
// TODO: Should we forcibly disconnect instead?
// TODO: This might also be triggered if a relay fails and ends the operation.
check.pin.MarkAsFailingFor(7 * time.Minute)
log.Warningf("spn/crew: failed to check reachability of %s: %s", check.pin.Hub, tErr)

return nil, nil, tErr.Wrap("failed to check reachability of %s: %w", check.pin.Hub, tErr)
}

case <-time.After(3 * time.Second):
case <-time.After(10 * time.Second):
// Mark as failing for just a minute, until server load may be less.
check.pin.MarkAsFailingFor(1 * time.Minute)
log.Warningf("spn/crew: reachability check to %s timed out", check.pin.Hub)
Expand Down
3 changes: 2 additions & 1 deletion crew/op_connect.go
Original file line number Diff line number Diff line change
Expand Up @@ -403,7 +403,8 @@ func (op *ConnectOp) HandleStop(err *terminal.Error) (errorToSend *terminal.Erro
// If the op was ended locally, send all data before closing.
// If the op was ended remotely, don't bother sending remaining data.
if !err.IsExternal() {
op.dfq.Flush()
// Flushing could mean sending a full buffer of 50000 packets.
op.dfq.Flush(5 * time.Minute)
}

// If the op was ended remotely, write all remaining received data.
Expand Down
10 changes: 5 additions & 5 deletions docks/crane.go
Original file line number Diff line number Diff line change
Expand Up @@ -363,8 +363,6 @@ func (crane *Crane) AbandonTerminal(id uint32, err *terminal.Error) {
}

func (crane *Crane) sendImportantTerminalMsg(msg *terminal.Msg, timeout time.Duration) *terminal.Error {
msg.Unit.Pause()

select {
case crane.controllerMsgs <- msg:
return nil
Expand All @@ -376,8 +374,6 @@ func (crane *Crane) sendImportantTerminalMsg(msg *terminal.Msg, timeout time.Dur

// Send is used by others to send a message through the crane.
func (crane *Crane) Send(msg *terminal.Msg, timeout time.Duration) *terminal.Error {
msg.Unit.Pause()

select {
case crane.terminalMsgs <- msg:
return nil
Expand Down Expand Up @@ -456,7 +452,11 @@ func (crane *Crane) unloader(workerCtx context.Context) error {
crane.Stop(terminal.ErrMalformedData.With("failed to get container length: %w", err))
return nil
}
if containerLen > maxUnloadSize {
switch {
case containerLen <= 0:
crane.Stop(terminal.ErrMalformedData.With("received empty container with length %d", containerLen))
return nil
case containerLen > maxUnloadSize:
crane.Stop(terminal.ErrMalformedData.With("received oversized container with length %d", containerLen))
return nil
}
Expand Down
20 changes: 7 additions & 13 deletions docks/op_expand.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,31 +86,27 @@ func (t *ExpansionRelayTerminal) Ctx() context.Context {

// Deliver delivers a message to the relay operation.
func (op *ExpandOp) Deliver(msg *terminal.Msg) *terminal.Error {
// Pause unit before handing away.
msg.Unit.Pause()

return op.deliverProxy(msg)
}

// Deliver delivers a message to the relay terminal.
func (t *ExpansionRelayTerminal) Deliver(msg *terminal.Msg) *terminal.Error {
// Pause unit before handing away.
msg.Unit.Pause()

return t.deliverProxy(msg)
}

// Flush writes all data in the queues.
func (op *ExpandOp) Flush() {
if op.flowControl != nil {
op.flowControl.Flush()
// Flushing could mean sending a full buffer of 50000 packets.
op.flowControl.Flush(5 * time.Minute)
}
}

// Flush writes all data in the queues.
func (t *ExpansionRelayTerminal) Flush() {
if t.flowControl != nil {
t.flowControl.Flush()
// Flushing could mean sending a full buffer of 50000 packets.
t.flowControl.Flush(5 * time.Minute)
}
}

Expand Down Expand Up @@ -281,14 +277,13 @@ func (op *ExpandOp) forwardHandler(_ context.Context) error {
// Debugging:
// log.Debugf("spn/testing: forwarding at %s: %s", op.FmtID(), spew.Sdump(c.CompileData()))

// Count relayed data for metrics.
atomic.AddUint64(op.dataRelayed, uint64(msg.Data.Length()))

// Wait for processing slot.
msg.Unit.WaitForSlot()

// Count relayed data for metrics.
atomic.AddUint64(op.dataRelayed, uint64(msg.Data.Length()))

// Receive data from the origin and forward it to the relay.
msg.Unit.Pause()
op.relayTerminal.sendProxy(msg, 1*time.Minute)

case <-op.ctx.Done():
Expand All @@ -311,7 +306,6 @@ func (op *ExpandOp) backwardHandler(_ context.Context) error {
atomic.AddUint64(op.dataRelayed, uint64(msg.Data.Length()))

// Receive data from the relay and forward it to the origin.
msg.Unit.Pause()
op.sendProxy(msg, 1*time.Minute)

case <-op.ctx.Done():
Expand Down
14 changes: 7 additions & 7 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,11 @@ require (
github.com/rot256/pblind v0.0.0-20211117203330-22455f90b565
github.com/safing/jess v0.3.1
github.com/safing/portbase v0.16.4
github.com/safing/portmaster v1.0.6
github.com/safing/portmaster v1.0.7
github.com/spf13/cobra v1.6.1
github.com/stretchr/testify v1.8.1
github.com/tevino/abool v1.2.0
golang.org/x/net v0.7.0
golang.org/x/net v0.8.0
)

require (
Expand All @@ -37,7 +37,7 @@ require (
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/hashicorp/go-version v1.6.0 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/miekg/dns v1.1.50 // indirect
github.com/miekg/dns v1.1.52 // indirect
github.com/mitchellh/reflectwalk v1.0.2 // indirect
github.com/oschwald/maxminddb-golang v1.10.0 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
Expand All @@ -62,11 +62,11 @@ require (
github.com/x448/float16 v0.8.4 // indirect
github.com/yusufpapurcu/wmi v1.2.2 // indirect
go.etcd.io/bbolt v1.3.7 // indirect
golang.org/x/crypto v0.6.0 // indirect
golang.org/x/mod v0.8.0 // indirect
golang.org/x/crypto v0.7.0 // indirect
golang.org/x/mod v0.9.0 // indirect
golang.org/x/sync v0.1.0 // indirect
golang.org/x/sys v0.5.0 // indirect
golang.org/x/tools v0.6.0 // indirect
golang.org/x/sys v0.6.0 // indirect
golang.org/x/tools v0.7.0 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
Loading