Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove stcpr heartbeat #888

Merged
merged 13 commits into from
Sep 27, 2021
5 changes: 2 additions & 3 deletions cmd/apps/skychat/chat.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,17 @@ skychat app for skywire visor
package main

import (
"embed"
"encoding/json"
"flag"
"fmt"
"io/fs"
"net"
"net/http"
"os"
"sync"
"time"

"embed"
"io/fs"

"github.com/skycoin/dmsg/buildinfo"
"github.com/skycoin/dmsg/cipher"

Expand Down
1 change: 1 addition & 0 deletions cmd/skywire-visor/commands/systray.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package commands

import (
"context"

"github.com/getlantern/systray"
"github.com/skycoin/skycoin/src/util/logging"

Expand Down
3 changes: 0 additions & 3 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,6 @@ github.com/klauspost/reedsolomon v1.9.9/go.mod h1:O7yFFHiQwDR6b2t63KPUpccPtNdp5A
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pretty v0.2.1 h1:Fmg33tUaq4/8ym9TJN1x7sLJnHVwhP33CNkpYV/7rwI=
github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/pty v1.1.3/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
Expand Down Expand Up @@ -309,8 +308,6 @@ github.com/shurcooL/webdavfs v0.0.0-20170829043945-18c3829fa133/go.mod h1:hKmq5k
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
github.com/sirupsen/logrus v1.8.1 h1:dJKuHgqk1NNQlqoA6BTlM1Wf9DOH3NBjQyu0h9+AZZE=
github.com/sirupsen/logrus v1.8.1/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0=
github.com/skycoin/dmsg v0.0.0-20210827120633-3d20b41d46a2 h1:Dlf/sDocfSgjP+ipVxzOtDVkkLN1u6ZqUyXzp22AkU4=
github.com/skycoin/dmsg v0.0.0-20210827120633-3d20b41d46a2/go.mod h1:XguFKwECpSMq+/AKv8TCTsRlCEHSEIbqoaxOyceK2Ys=
github.com/skycoin/dmsg v0.0.0-20210915195912-2f9b055f39fe h1:3mNjtnypa8DC4kNiuLOJ1mqMdXPIAI6Se4k8fNNU2P0=
github.com/skycoin/dmsg v0.0.0-20210915195912-2f9b055f39fe/go.mod h1:qs+tELY7/gHRHqCK0iPp62BuYlu10OcV5zN0lpa1Scc=
github.com/skycoin/noise v0.0.0-20180327030543-2492fe189ae6 h1:1Nc5EBY6pjfw1kwW0duwyG+7WliWz5u9kgk1h5MnLuA=
Expand Down
2 changes: 1 addition & 1 deletion internal/gui/gui_darwin.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ const (
pngIconPath = "/Applications/Skywire.app/Contents/Resources/icon.png"
iconPath = "/Applications/Skywire.app/Contents/Resources/tray_icon.tiff"
deinstallerPath = "/Applications/Skywire.app/Contents/deinstaller"
appPath = "/Applications/Skywire.app"
appPath = "/Applications/Skywire.app"
)

func preReadIcon() error {
Expand Down
2 changes: 1 addition & 1 deletion pkg/skyenv/values.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ const (
TestRouteFinderAddr = "http://routefinder.skywire.cc"
TestUptimeTrackerAddr = "http://uptime.tracker.skywire.cc"
TestAddressResolverAddr = "http://address.resolver.skywire.cc"
TestSetupPK = "026c5a07de617c5c488195b76e8671bf9e7ee654d0633933e202af9e111ffa358d"
TestSetupPK = "026c2a3e92d6253c5abd71a42628db6fca9dd9aa037ab6f4e3a31108558dfd87cf"
jdknives marked this conversation as resolved.
Show resolved Hide resolved
)

// Dmsg port constants.
Expand Down
4 changes: 4 additions & 0 deletions pkg/transport/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -555,6 +555,10 @@ func (tm *Manager) Close() {
tm.Logger.WithError(err).Warnf("Failed to close %s client", client.Type())
}
}
err := tm.arClient.Close()
if err != nil {
tm.Logger.WithError(err).Warnf("Failed to close arClient")
}
tm.wg.Wait()
close(tm.readCh)
}
Expand Down
126 changes: 94 additions & 32 deletions pkg/transport/network/addrresolver/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"net"
"net/http"
"net/url"
"sync"
"time"

"github.com/AudriusButkevicius/pfilter"
Expand All @@ -27,14 +28,14 @@ import (

const (
// sudphPriority is used to set an order how connection filters apply.
sudphPriority = 1
stcprBindPath = "/bind/stcpr"
stcprHeartbeatPath = "/heartbeat/stcpr"
stcprKeepHeartbeatInterval = 300 * time.Second
addrChSize = 1024
udpKeepHeartbeatInterval = 10 * time.Second
udpKeepHeartbeatMessage = "heartbeat"
defaultUDPPort = "30178"
sudphPriority = 1
stcprBindPath = "/bind/stcpr"
addrChSize = 1024
udpKeepHeartbeatInterval = 10 * time.Second
udpKeepHeartbeatMessage = "heartbeat"
defaultUDPPort = "30178"
// UDPDelBindMessage is used as a deletebind packet on visor shutdown.
UDPDelBindMessage = "delBind"
)

var (
Expand All @@ -53,11 +54,11 @@ type Error struct {

// APIClient implements address resolver API client.
type APIClient interface {
io.Closer
BindSTCPR(ctx context.Context, port string) error
BindSUDPH(filter *pfilter.PacketFilter, handshake Handshake) (<-chan RemoteVisor, error)
Resolve(ctx context.Context, netType string, pk cipher.PubKey) (VisorData, error)
Health(ctx context.Context) (int, error)
Close() error
}

// VisorData stores visor data.
Expand All @@ -78,6 +79,7 @@ type httpClient struct {
sudphConn net.PacketConn
ready chan struct{}
closed chan struct{}
delBindSudphWg sync.WaitGroup
}

// NewHTTP creates a new client setting a public key to the client to be used for auth.
Expand Down Expand Up @@ -174,6 +176,25 @@ func (c *httpClient) Post(ctx context.Context, path string, payload interface{})
return c.httpClient.Do(req.WithContext(ctx))
}

// Delete performs a DELETE request.
func (c *httpClient) Delete(ctx context.Context, path string) (*http.Response, error) {
<-c.ready
var payload struct{}
body := bytes.NewBuffer(nil)
if err := json.NewEncoder(body).Encode(payload); err != nil {
return nil, err
}

addr := c.httpClient.Addr() + path

req, err := http.NewRequest(http.MethodDelete, addr, body)
if err != nil {
return nil, err
}

return c.httpClient.Do(req.WithContext(ctx))
}

// BindRequest stores bind request values.
type BindRequest struct {
Port string `json:"port"`
Expand Down Expand Up @@ -218,12 +239,34 @@ func (c *httpClient) BindSTCPR(ctx context.Context, port string) error {
return fmt.Errorf("status: %d, error: %w", resp.StatusCode, extractError(resp.Body))
}

go func() {
if err := c.keepStcprHeartbeatLoop(ctx); err != nil {
c.log.WithError(err).Errorf("Failed to send TCP heartbeat signal to address-resolver")
return nil
}

// delBindSTCPR uinbinds STCPR entry PK to IP:port on address resolver.
func (c *httpClient) delBindSTCPR(ctx context.Context) error {
if !c.isReady() {
c.log.Debugf("delBindSTCPR: Address resolver is not ready yet, waiting...")
<-c.ready
c.log.Debugf("delBindSTCPR: Address resolver became ready, unbinding")
}

c.log.Debugf("delBindSTCPR: deleting the binding pk: %v from Address resolver", c.pk.String())
resp, err := c.Delete(ctx, stcprBindPath)
if err != nil {
return err
}

defer func() {
if err := resp.Body.Close(); err != nil {
c.log.WithError(err).Warn("Failed to close response body")
}
}()

if resp.StatusCode != http.StatusOK {
return fmt.Errorf("status: %d, error: %w", resp.StatusCode, extractError(resp.Body))
}

c.log.Debugf("delBindSTCPR: Deleted bind pk: %v from Address resolver successfully", c.pk.String())
return nil
}

Expand Down Expand Up @@ -286,6 +329,12 @@ func (c *httpClient) BindSUDPH(filter *pfilter.PacketFilter, hs Handshake) (<-ch
}
}()

go func() {
if err := c.delBindSUDPH(arConn); err != nil {
c.log.WithError(err).Errorf("Failed to send UDP unbind packet to address-resolver")
}
}()

return addrCh, nil
}

Expand Down Expand Up @@ -380,6 +429,10 @@ func (c *httpClient) readSUDPHMessages(reader io.Reader) <-chan RemoteVisor {
default:
n, err := reader.Read(buf)
if err != nil {
if c.isClosed() {
c.log.Infof("SUDPH conn closed on shutdown message: %v", err)
return
}
c.log.Errorf("Failed to read SUDPH message: %v", err)
return
}
Expand Down Expand Up @@ -412,31 +465,19 @@ func (c *httpClient) Close() error {
}()

if c.sudphConn != nil {
c.delBindSudphWg.Add(1)
close(c.closed)
c.delBindSudphWg.Wait()
if err := c.sudphConn.Close(); err != nil {
c.log.WithError(err).Errorf("Failed to close SUDPH")
}
close(c.closed)
}

return nil
}

// Keep stcpr heartbeat in address-resolver
func (c *httpClient) keepStcprHeartbeatLoop(ctx context.Context) error {
for {
_, err := c.Get(ctx, stcprHeartbeatPath)
if err != nil {
return err
}

c.log.Debugf("Sent TCP heartbeat signal to address-resolver")
select {
case <-c.closed:
return nil
default:
time.Sleep(stcprKeepHeartbeatInterval)
}
if err := c.delBindSTCPR(context.Background()); err != nil {
c.log.WithError(err).Errorf("Failed to delete STCPR binding")
}

return nil
}

// Keep NAT mapping alive.
Expand All @@ -449,12 +490,33 @@ func (c *httpClient) keepSudphHeartbeatLoop(w io.Writer) error {
if _, err := w.Write([]byte(udpKeepHeartbeatMessage)); err != nil {
return err
}

time.Sleep(udpKeepHeartbeatInterval)
}
}
}

// delBindSUDPH unbinds SUDPH entry in address resolver.
func (c *httpClient) delBindSUDPH(w io.Writer) error {
// send unbind packet on shutdown
<-c.closed
defer c.delBindSudphWg.Done()
if _, err := w.Write([]byte(UDPDelBindMessage)); err != nil {
return err
}
c.log.Debugf("delBindSUDPH: Deleted bind pk: %v from Address resolver successfully", c.pk.String())

return nil
}

func (c *httpClient) isClosed() bool {
select {
case <-c.closed:
return true
default:
return false
}
}

// extractError returns the decoded error message from Body.
func extractError(r io.Reader) error {
var apiError Error
Expand Down
8 changes: 8 additions & 0 deletions pkg/transport/network/sudph.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,14 @@ func (c *sudphClient) listen() (net.Listener, error) {
if err != nil {
return nil, err
}

_, localPort, err := net.SplitHostPort(packetListener.LocalAddr().String())
if err != nil {
return nil, err
}

c.log.Infof("Successfully bound sudph to port %s", localPort)

go c.acceptAddresses(sudphVisorsConn, addrCh)
return kcp.ServeConn(nil, 0, 0, sudphVisorsConn)
}
Expand Down
3 changes: 1 addition & 2 deletions pkg/visor/hypervisorconfig/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,13 @@ package hypervisorconfig
import (
"encoding/hex"
"encoding/json"
"io/fs"
"log"
"net/http"
"os"
"path/filepath"
"time"

"io/fs"

"github.com/skycoin/dmsg/cipher"

"github.com/skycoin/skywire/pkg/skyenv"
Expand Down