Skip to content
This repository has been archived by the owner on Mar 29, 2024. It is now read-only.

Simplify scheduling and other fixes #159

Merged
merged 12 commits into from
Mar 13, 2023
2 changes: 1 addition & 1 deletion captain/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func init() {
// prepBootstrapHubFlag checks the bootstrap-hub argument if it is valid.
func prepBootstrapHubFlag() error {
if bootstrapHubFlag != "" {
_, err := hub.ParseBootstrapHub(bootstrapHubFlag, conf.MainMapName)
_, _, _, err := hub.ParseBootstrapHub(bootstrapHubFlag)
return err
}
return nil
Expand Down
9 changes: 9 additions & 0 deletions cmds/hub/pack
Original file line number Diff line number Diff line change
Expand Up @@ -74,18 +74,27 @@ function check_all {
GOOS=linux GOARCH=amd64 check
GOOS=windows GOARCH=amd64 check
GOOS=darwin GOARCH=amd64 check
GOOS=linux GOARCH=arm64 check
GOOS=windows GOARCH=arm64 check
GOOS=darwin GOARCH=arm64 check
}

function build_all {
GOOS=linux GOARCH=amd64 build
GOOS=windows GOARCH=amd64 build
GOOS=darwin GOARCH=amd64 build
GOOS=linux GOARCH=arm64 build
GOOS=windows GOARCH=arm64 build
GOOS=darwin GOARCH=arm64 build
}

function reset_all {
GOOS=linux GOARCH=amd64 reset
GOOS=windows GOARCH=amd64 reset
GOOS=darwin GOARCH=amd64 reset
GOOS=linux GOARCH=arm64 reset
GOOS=windows GOARCH=arm64 reset
GOOS=darwin GOARCH=arm64 reset
}

case $1 in
Expand Down
33 changes: 12 additions & 21 deletions crew/connect.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"net"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -122,27 +123,16 @@ func (t *Tunnel) establish(ctx context.Context) (err error) {
case sticksTo.Avoid:
log.Tracer(ctx).Tracef("spn/crew: avoiding %s", sticksTo.Pin.Hub)

// Build avoid policy.
avoidPolicy := make([]endpoints.Endpoint, 0, 2)
// Exclude countries of the hub to be avoided.
// This helps to select a destination hub that is more different than say,
// the other hub in the same datacenter as the one to be avoided.
if sticksTo.Pin.LocationV4 != nil &&
sticksTo.Pin.LocationV4.Country.ISOCode != "" {
avoidPolicy = append(avoidPolicy, &endpoints.EndpointCountry{
Country: sticksTo.Pin.LocationV4.Country.ISOCode,
})
log.Tracer(ctx).Tracef("spn/crew: avoiding country %s via IPv4 location", sticksTo.Pin.LocationV4.Country.ISOCode)
}
if sticksTo.Pin.LocationV6 != nil &&
sticksTo.Pin.LocationV6.Country.ISOCode != "" {
avoidPolicy = append(avoidPolicy, &endpoints.EndpointCountry{
Country: sticksTo.Pin.LocationV6.Country.ISOCode,
})
log.Tracer(ctx).Tracef("spn/crew: avoiding country %s via IPv6 location", sticksTo.Pin.LocationV6.Country.ISOCode)
// Avoid this Hub.
// TODO: Remember more than one hub to avoid.
avoidPolicy := []endpoints.Endpoint{
&endpoints.EndpointDomain{
OriginalValue: sticksTo.Pin.Hub.ID,
Domain: strings.ToLower(sticksTo.Pin.Hub.ID) + ".",
},
}

// Append to policies
// Append to policies.
t.connInfo.TunnelOpts.HubPolicies = append(t.connInfo.TunnelOpts.HubPolicies, avoidPolicy)

default:
Expand Down Expand Up @@ -314,7 +304,7 @@ func establishRoute(route *navigator.Route) (dstPin *navigator.Pin, dstTerminal
return nil, nil, tErr.Wrap("failed to authenticate to %s: %w", check.pin.Hub, tErr)
}

case <-time.After(3 * time.Second):
case <-time.After(10 * time.Second):
// Mark as failing for just a minute, until server load may be less.
check.pin.MarkAsFailingFor(1 * time.Minute)
log.Warningf("spn/crew: auth to %s timed out", check.pin.Hub)
Expand All @@ -338,13 +328,14 @@ func establishRoute(route *navigator.Route) (dstPin *navigator.Pin, dstTerminal
!tErr.Is(terminal.ErrUnknownOperationType) { // TODO: remove workaround until all servers have this upgrade
// Mark as failing long enough to expire connections and session and shutdown connections.
// TODO: Should we forcibly disconnect instead?
// TODO: This might also be triggered if a relay fails and ends the operation.
check.pin.MarkAsFailingFor(7 * time.Minute)
log.Warningf("spn/crew: failed to check reachability of %s: %s", check.pin.Hub, tErr)

return nil, nil, tErr.Wrap("failed to check reachability of %s: %w", check.pin.Hub, tErr)
}

case <-time.After(3 * time.Second):
case <-time.After(10 * time.Second):
// Mark as failing for just a minute, until server load may be less.
check.pin.MarkAsFailingFor(1 * time.Minute)
log.Warningf("spn/crew: reachability check to %s timed out", check.pin.Hub)
Expand Down
3 changes: 2 additions & 1 deletion crew/op_connect.go
Original file line number Diff line number Diff line change
Expand Up @@ -403,7 +403,8 @@ func (op *ConnectOp) HandleStop(err *terminal.Error) (errorToSend *terminal.Erro
// If the op was ended locally, send all data before closing.
// If the op was ended remotely, don't bother sending remaining data.
if !err.IsExternal() {
op.dfq.Flush()
// Flushing could mean sending a full buffer of 50000 packets.
op.dfq.Flush(5 * time.Minute)
}

// If the op was ended remotely, write all remaining received data.
Expand Down
10 changes: 5 additions & 5 deletions docks/crane.go
Original file line number Diff line number Diff line change
Expand Up @@ -363,8 +363,6 @@ func (crane *Crane) AbandonTerminal(id uint32, err *terminal.Error) {
}

func (crane *Crane) sendImportantTerminalMsg(msg *terminal.Msg, timeout time.Duration) *terminal.Error {
msg.Unit.Pause()

select {
case crane.controllerMsgs <- msg:
return nil
Expand All @@ -376,8 +374,6 @@ func (crane *Crane) sendImportantTerminalMsg(msg *terminal.Msg, timeout time.Dur

// Send is used by others to send a message through the crane.
func (crane *Crane) Send(msg *terminal.Msg, timeout time.Duration) *terminal.Error {
msg.Unit.Pause()

select {
case crane.terminalMsgs <- msg:
return nil
Expand Down Expand Up @@ -456,7 +452,11 @@ func (crane *Crane) unloader(workerCtx context.Context) error {
crane.Stop(terminal.ErrMalformedData.With("failed to get container length: %w", err))
return nil
}
if containerLen > maxUnloadSize {
switch {
case containerLen <= 0:
crane.Stop(terminal.ErrMalformedData.With("received empty container with length %d", containerLen))
return nil
case containerLen > maxUnloadSize:
crane.Stop(terminal.ErrMalformedData.With("received oversized container with length %d", containerLen))
return nil
}
Expand Down
20 changes: 7 additions & 13 deletions docks/op_expand.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,31 +86,27 @@ func (t *ExpansionRelayTerminal) Ctx() context.Context {

// Deliver delivers a message to the relay operation.
func (op *ExpandOp) Deliver(msg *terminal.Msg) *terminal.Error {
// Pause unit before handing away.
msg.Unit.Pause()

return op.deliverProxy(msg)
}

// Deliver delivers a message to the relay terminal.
func (t *ExpansionRelayTerminal) Deliver(msg *terminal.Msg) *terminal.Error {
// Pause unit before handing away.
msg.Unit.Pause()

return t.deliverProxy(msg)
}

// Flush writes all data in the queues.
func (op *ExpandOp) Flush() {
if op.flowControl != nil {
op.flowControl.Flush()
// Flushing could mean sending a full buffer of 50000 packets.
op.flowControl.Flush(5 * time.Minute)
}
}

// Flush writes all data in the queues.
func (t *ExpansionRelayTerminal) Flush() {
if t.flowControl != nil {
t.flowControl.Flush()
// Flushing could mean sending a full buffer of 50000 packets.
t.flowControl.Flush(5 * time.Minute)
}
}

Expand Down Expand Up @@ -281,14 +277,13 @@ func (op *ExpandOp) forwardHandler(_ context.Context) error {
// Debugging:
// log.Debugf("spn/testing: forwarding at %s: %s", op.FmtID(), spew.Sdump(c.CompileData()))

// Count relayed data for metrics.
atomic.AddUint64(op.dataRelayed, uint64(msg.Data.Length()))

// Wait for processing slot.
msg.Unit.WaitForSlot()

// Count relayed data for metrics.
atomic.AddUint64(op.dataRelayed, uint64(msg.Data.Length()))

// Receive data from the origin and forward it to the relay.
msg.Unit.Pause()
op.relayTerminal.sendProxy(msg, 1*time.Minute)

case <-op.ctx.Done():
Expand All @@ -311,7 +306,6 @@ func (op *ExpandOp) backwardHandler(_ context.Context) error {
atomic.AddUint64(op.dataRelayed, uint64(msg.Data.Length()))

// Receive data from the relay and forward it to the origin.
msg.Unit.Pause()
op.sendProxy(msg, 1*time.Minute)

case <-op.ctx.Done():
Expand Down
32 changes: 7 additions & 25 deletions hub/intel.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,47 +163,29 @@ func (i *Intel) ParseAdvisories() (err error) {
}

// ParseBootstrapHub parses a bootstrap hub.
func ParseBootstrapHub(bootstrapTransport string, mapName string) (*Hub, error) {
func ParseBootstrapHub(bootstrapTransport string) (t *Transport, hubID string, hubIP net.IP, err error) {
// Parse transport and check Hub ID.
t, err := ParseTransport(bootstrapTransport)
t, err = ParseTransport(bootstrapTransport)
if err != nil {
return nil, fmt.Errorf("failed to parse transport: %w", err)
return nil, "", nil, fmt.Errorf("failed to parse transport: %w", err)
}
if t.Option == "" {
return nil, errors.New("missing hub ID in URL fragment")
return nil, "", nil, errors.New("missing hub ID in URL fragment")
}
if _, err := lhash.FromBase58(t.Option); err != nil {
return nil, fmt.Errorf("hub ID is invalid: %w", err)
return nil, "", nil, fmt.Errorf("hub ID is invalid: %w", err)
}

// Parse IP address from transport.
ip := net.ParseIP(t.Domain)
if ip == nil {
return nil, errors.New("invalid IP address (domains are not supported for bootstrapping)")
return nil, "", nil, errors.New("invalid IP address (domains are not supported for bootstrapping)")
}

// Clean up transport for hub info.
id := t.Option
t.Domain = ""
t.Option = ""

// Create bootstrap hub.
bootstrapHub := &Hub{
ID: id,
Map: mapName,
Info: &Announcement{
ID: id,
Transports: []string{t.String()},
},
Status: &Status{},
}

// Set IP address.
if ip4 := ip.To4(); ip4 != nil {
bootstrapHub.Info.IPv4 = ip4
} else {
bootstrapHub.Info.IPv6 = ip
}

return bootstrapHub, nil
return t, id, ip, nil
}
16 changes: 8 additions & 8 deletions navigator/measurements.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,16 @@ import (

// Measurements Configuration.
const (
NavigatorMeasurementTTLDefault = 2 * time.Hour
NavigatorMeasurementTTLByCostBase = 3 * time.Minute
NavigatorMeasurementTTLByCostMin = 2 * time.Hour
NavigatorMeasurementTTLByCostMax = 30 * time.Hour
NavigatorMeasurementTTLDefault = 4 * time.Hour
NavigatorMeasurementTTLByCostBase = 6 * time.Minute
NavigatorMeasurementTTLByCostMin = 4 * time.Hour
NavigatorMeasurementTTLByCostMax = 50 * time.Hour

// With a base TTL of 3m, this leads to:
// 20c -> 1h -> raised to 2h.
// 50c -> 2h30m
// 100c -> 5h
// 1000c -> 50h -> capped to 30h.
// 20c -> 2h -> raised to 4h.
// 50c -> 5h
// 100c -> 10h
// 1000c -> 100h -> capped to 50h.
)

func (m *Map) measureHubs(ctx context.Context, _ *modules.Task) error {
Expand Down
49 changes: 39 additions & 10 deletions navigator/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package navigator

import (
"context"
"errors"
"fmt"
"path"
"time"
Expand All @@ -14,6 +13,7 @@ import (
"github.com/safing/portbase/database/record"
"github.com/safing/portbase/log"
"github.com/safing/portbase/modules"
"github.com/safing/portbase/utils"
"github.com/safing/portmaster/intel/geoip"
"github.com/safing/portmaster/netenv"
"github.com/safing/spn/hub"
Expand Down Expand Up @@ -481,22 +481,51 @@ func (m *Map) addBootstrapHubs(bootstrapTransports []string) error {

func (m *Map) addBootstrapHub(bootstrapTransport string) error {
// Parse bootstrap hub.
bootstrapHub, err := hub.ParseBootstrapHub(bootstrapTransport, m.Name)
transport, hubID, hubIP, err := hub.ParseBootstrapHub(bootstrapTransport)
if err != nil {
return fmt.Errorf("invalid bootstrap hub: %w", err)
}

// Check if hub already exists.
_, err = hub.GetHub(bootstrapHub.Map, bootstrapHub.ID)
if err == nil {
return nil
var h *hub.Hub
pin, ok := m.all[hubID]
if ok {
h = pin.Hub
} else {
h = &hub.Hub{
ID: hubID,
Map: m.Name,
Info: &hub.Announcement{
ID: hubID,
},
Status: &hub.Status{},
FirstSeen: time.Now(), // Do not garbage collect bootstrap hubs.
}
}
if !errors.Is(err, database.ErrNotFound) {
return err

// Add IP if it does not yet exist.
if hubIP4 := hubIP.To4(); hubIP4 != nil {
if h.Info.IPv4 == nil {
h.Info.IPv4 = hubIP4
} else if !h.Info.IPv4.Equal(hubIP4) {
return fmt.Errorf("additional bootstrap entry with same ID but mismatching IP address: %s", hubIP)
}
} else {
if h.Info.IPv6 == nil {
h.Info.IPv6 = hubIP
} else if !h.Info.IPv6.Equal(hubIP) {
return fmt.Errorf("additional bootstrap entry with same ID but mismatching IP address: %s", hubIP)
}
}

// Add transport if it does not yet exist.
t := transport.String()
if !utils.StringInSlice(h.Info.Transports, t) {
h.Info.Transports = append(h.Info.Transports, t)
}

// Add to map for bootstrapping.
m.updateHub(bootstrapHub, false, false)
log.Infof("spn/navigator: added bootstrap %s to map %s", bootstrapHub, m.Name)
// Add/update to map for bootstrapping.
m.updateHub(h, false, false)
log.Infof("spn/navigator: added/updated bootstrap %s to map %s", h, m.Name)
return nil
}
Loading