Skip to content
This repository has been archived by the owner on Mar 29, 2024. It is now read-only.

Commit

Permalink
Add http pier, http info page and revamp docking
Browse files Browse the repository at this point in the history
  • Loading branch information
dhaavi committed Sep 18, 2023
1 parent edb32a9 commit 059d26e
Show file tree
Hide file tree
Showing 19 changed files with 785 additions and 167 deletions.
13 changes: 7 additions & 6 deletions cabin/config-public.go
Expand Up @@ -217,12 +217,13 @@ func prepPublicHubConfig() error {
publicCfgOptionIPv6 = config.GetAsString(publicCfgOptionIPv6Key, publicCfgOptionIPv6Default)

err = config.Register(&config.Option{
Name: "Transports",
Key: publicCfgOptionTransportsKey,
Description: "List of transports this Hub supports.",
OptType: config.OptTypeStringArray,
ExpertiseLevel: config.ExpertiseLevelExpert,
DefaultValue: publicCfgOptionTransportsDefault,
Name: "Transports",
Key: publicCfgOptionTransportsKey,
Description: "List of transports this Hub supports.",
OptType: config.OptTypeStringArray,
ExpertiseLevel: config.ExpertiseLevelExpert,
RequiresRestart: true,
DefaultValue: publicCfgOptionTransportsDefault,
Annotations: config.Annotations{
config.DisplayOrderAnnotation: publicCfgOptionTransportsOrder,
},
Expand Down
9 changes: 7 additions & 2 deletions captain/module.go
Expand Up @@ -30,7 +30,7 @@ var module *modules.Module
const SPNConnectedEvent = "spn connect"

func init() {
module = modules.Register("captain", prep, start, stop, "base", "terminal", "cabin", "docks", "crew", "navigator", "sluice", "patrol", "netenv")
module = modules.Register("captain", prep, start, stop, "base", "terminal", "cabin", "ships", "docks", "crew", "navigator", "sluice", "patrol", "netenv")
module.RegisterEvent(SPNConnectedEvent, false)
subsystems.Register(
"spn",
Expand Down Expand Up @@ -130,7 +130,10 @@ func start() error {
if err := prepPublicIdentityMgmt(); err != nil {
return err
}
if err := startPierMgmt(); err != nil {
// Set ID to display on http info page.
ships.DisplayHubID = publicIdentity.ID
// Start listeners.
if err := startPiers(); err != nil {
return err
}

Expand Down Expand Up @@ -174,6 +177,8 @@ func stop() error {
// Send shutdown status message.
if conf.PublicHub() {
publishShutdownStatus()
stopPiers()
closePendingDockingRequests()
}

return nil
Expand Down
95 changes: 39 additions & 56 deletions captain/piers.go
Expand Up @@ -2,11 +2,10 @@ package captain

import (
"context"
"errors"
"fmt"
"sync"

"github.com/safing/portbase/log"
"github.com/safing/portbase/modules"
"github.com/safing/portmaster/intel"
"github.com/safing/portmaster/network/netutils"
"github.com/safing/portmaster/profile/endpoints"
Expand All @@ -16,88 +15,72 @@ import (
)

var (
managePiersTask *modules.Task
pierMgmtLock sync.Mutex
pierMgmtCycleID int

dockingRequests = make(chan *ships.DockingRequest, 10)
dockingRequests = make(chan ships.Ship, 100)
piers []ships.Pier
)

func startPierMgmt() error {
managePiersTask = module.NewTask(
"manage piers",
managePiers,
)

module.StartServiceWorker("docking request handler", 0, dockingRequestHandler)

err := managePiers(module.Ctx, managePiersTask)
if err != nil {
log.Warningf("spn/captain: failed to initialize piers: %s", err)
}

return nil
}

func managePiers(ctx context.Context, task *modules.Task) error {
pierMgmtLock.Lock()
defer pierMgmtLock.Unlock()

// TODO: do proper management (this is a workaround for now)
if pierMgmtCycleID > 0 {
return nil
func startPiers() error {
// Get and check transports.
transports := publicIdentity.Hub.Info.Transports
if len(transports) == 0 {
return errors.New("no transports defined")
}
pierMgmtCycleID = 1

for _, t := range publicIdentity.Hub.Info.Transports {
piers = make([]ships.Pier, 0, len(transports))
for _, t := range transports {
// Parse transport.
transport, err := hub.ParseTransport(t)
if err != nil {
log.Warningf("spn/captain: cannot build pier for invalid transport %q: %s", t, err)
continue
return fmt.Errorf("cannot build pier for invalid transport %q: %w", t, err)
}

// create listener
// Establish pier / listener.
pier, err := ships.EstablishPier(transport, dockingRequests)
if err != nil {
log.Warningf("spn/captin: failed to establish pier for transport %q: %s", t, err)
continue
return fmt.Errorf("failed to establish pier for transport %q: %w", t, err)
}
log.Infof("spn/captain: pier for transport %q built", t)

// start accepting connections
module.StartWorker("pier docking", pier.Docking)
piers = append(piers, pier)
log.Infof("spn/captain: pier for transport %q built", t)
}

// TODO:
// task.Schedule(5 * time.Minute)
// Start worker to handle docking requests.
module.StartServiceWorker("docking request handler", 0, dockingRequestHandler)

return nil
}

func stopPiers() {
for _, pier := range piers {
pier.Abolish()
}
}

func dockingRequestHandler(ctx context.Context) error {
for {
select {
case <-ctx.Done():
return nil
case r := <-dockingRequests:
switch {
case r.Err != nil:
// TODO: Restart pier?
// TODO: Do actual pier management.
log.Errorf("spn/captain: pier %s failed: %s", r.Pier.Transport(), r.Err)
case r.Ship != nil:
if err := checkDockingPermission(ctx, r.Ship); err != nil {
log.Warningf("spn/captain: denied ship from %s to dock at pier %s: %s", r.Ship.RemoteAddr(), r.Pier.Transport(), err)
} else {
handleDockingRequest(r.Ship)
}
default:
log.Warningf("spn/captain: received invalid docking request without ship for pier %s", r.Pier.Transport())
case ship := <-dockingRequests:
if ship == nil {
return errors.New("received nil ship")
}

if err := checkDockingPermission(ctx, ship); err != nil {
log.Warningf("spn/captain: denied ship from %s to dock at pier %s: %s", ship.RemoteAddr(), ship.Transport().String(), err)
} else {
handleDockingRequest(ship)
}
}
}
}

func closePendingDockingRequests() {
for ship := range dockingRequests {
ship.Sink()
}
}

func checkDockingPermission(ctx context.Context, ship ships.Ship) error {
remoteIP, remotePort, err := netutils.IPPortFromAddr(ship.RemoteAddr())
if err != nil {
Expand Down
5 changes: 0 additions & 5 deletions captain/public.go
Expand Up @@ -140,11 +140,6 @@ func maintainPublicIdentity(ctx context.Context, task *modules.Task) error {
// forward to other connected Hubs
gossipRelayMsg("", GossipHubAnnouncementMsg, announcementData)

// manage docks in order to react to possibly changed transports
if managePiersTask != nil {
managePiersTask.Queue()
}

return nil
}

Expand Down
3 changes: 1 addition & 2 deletions docks/crane.go
Expand Up @@ -263,8 +263,7 @@ func (crane *Crane) RemoteAddr() net.Addr {

// Transport returns ship's transport.
func (crane *Crane) Transport() *hub.Transport {
t := crane.ship.Transport()
return &t
return crane.ship.Transport()
}

func (crane *Crane) getNextTerminalID() uint32 {
Expand Down
31 changes: 7 additions & 24 deletions ships/connection_test.go
Expand Up @@ -41,27 +41,20 @@ func TestConnections(t *testing.T) {
t.Parallel()

var wg sync.WaitGroup
ctx := context.Background()
ctx, cancelCtx := context.WithCancel(context.Background())

// docking requests
requests := make(chan *DockingRequest, 1)
dockingRequests := make(chan Ship, 1)
transport := &hub.Transport{
Protocol: protocol,
Port: getTestPort(),
}

// create listener
pier, err := builder.EstablishPier(transport, requests)
pier, err := builder.EstablishPier(transport, dockingRequests)
if err != nil {
t.Fatal(err)
}
wg.Add(1)
var dockingErr error
go func() {
err := pier.Docking(ctx)
dockingErr = err
wg.Done()
}()

// connect to listener
ship, err := builder.LaunchShip(ctx, transport, localhost)
Expand All @@ -76,11 +69,10 @@ func TestConnections(t *testing.T) {
}

// dock client
request := <-requests
if request.Err != nil {
t.Fatalf("%s failed to dock: %s", request.Pier, request.Err)
srvShip := <-dockingRequests
if srvShip == nil {
t.Fatalf("%s failed to dock", pier)
}
srvShip := request.Ship

// server recv
buf := getTestBuf()
Expand Down Expand Up @@ -129,20 +121,11 @@ func TestConnections(t *testing.T) {
fmt.Print(".")
}

// Check for docking error.
if dockingErr != nil {
t.Fatal(err)
}

ship.Sink()
srvShip.Sink()
pier.Abolish()
cancelCtx()
wg.Wait() // wait for docking procedure to end

// Check for docking error again.
if dockingErr != nil {
t.Fatal(err)
}
})
}
}

0 comments on commit 059d26e

Please sign in to comment.