feat(cloud): Introduce connections to proxy unikernel #1680

Open · wants to merge 4 commits into base: staging
2 changes: 1 addition & 1 deletion go.mod
@@ -78,7 +78,7 @@ require (
k8s.io/apimachinery v0.30.1
k8s.io/apiserver v0.30.1
oras.land/oras-go/v2 v2.5.0
sdk.kraft.cloud v0.5.9-0.20240524120407-d0cd360e277b
sdk.kraft.cloud v0.5.10-0.20240612112818-be81e91810a9
sigs.k8s.io/kustomize/kyaml v0.17.1
)

4 changes: 2 additions & 2 deletions go.sum
@@ -1784,8 +1784,8 @@ oras.land/oras-go/v2 v2.5.0/go.mod h1:z4eisnLP530vwIOUOJeBIj0aGI0L1C3d53atvCBqZH
rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8=
rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0=
rsc.io/sampler v1.3.0/go.mod h1:T1hPZKmBbMNahiBKFy5HrXp6adAjACjK9JXDnKaTXpA=
sdk.kraft.cloud v0.5.9-0.20240524120407-d0cd360e277b h1:eclQfuRZ92MAY5YYttJsyIQ0O80Fzg7mMeD2hv828Qk=
sdk.kraft.cloud v0.5.9-0.20240524120407-d0cd360e277b/go.mod h1:d44Ht2AeYURqTjCOw5cJCewGFaM8hh3gy2CeekwKa20=
sdk.kraft.cloud v0.5.10-0.20240612112818-be81e91810a9 h1:OzS+PqaHaylsUffvZtdZEDqrSTkVRZxvw/xiPj6n0dM=
sdk.kraft.cloud v0.5.10-0.20240612112818-be81e91810a9/go.mod h1:GSauhAoV4f75IfPVwqC0cpkEbbQeOz+uT4XkZ6cMDOw=
sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.0.14/go.mod h1:LEScyzhFmoF5pso/YSeBstl57mOzx9xlU9n85RGrDQg=
sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.0.15/go.mod h1:LEScyzhFmoF5pso/YSeBstl57mOzx9xlU9n85RGrDQg=
sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.29.0 h1:/U5vjBbQn3RChhv7P11uhYvCSm5G2GaIi5AIGBS6r4c=
128 changes: 96 additions & 32 deletions internal/cli/kraft/cloud/tunnel/relay.go
@@ -6,23 +6,31 @@
package tunnel

import (
"bytes"
"context"
"crypto/tls"
"encoding/binary"
"errors"
"fmt"
"io"
"net"
"strings"
"time"

"kraftkit.sh/log"
)

// Relay relays TCP connections to a local listener to a remote host over TLS.
// Relay relays connections from a local listener to a remote host over TLS.
type Relay struct {
lAddr string
rAddr string
ctype string
auth string
name string
}

const Heartbeat = "\xf0\x9f\x91\x8b\xf0\x9f\x90\x92\x00"

func (r *Relay) Up(ctx context.Context) error {
l, err := r.listenLocal(ctx)
if err != nil {
@@ -31,7 +39,7 @@ func (r *Relay) Up(ctx context.Context) error {
defer func() { l.Close() }()
go func() { <-ctx.Done(); l.Close() }()

log.G(ctx).Info("Tunnelling ", l.Addr(), " to ", r.rAddr)
log.G(ctx).Info("tunnelling ", l.Addr(), " to ", r.rAddr)

for {
conn, err := l.Accept()
Expand All @@ -43,7 +51,28 @@ func (r *Relay) Up(ctx context.Context) error {
}

c := r.newConnection(conn)
go c.handle(ctx)
go c.handle(ctx, []byte(r.auth), r.name)
}
}

func (r *Relay) ControlUp(ctx context.Context, ready chan struct{}) error {
rc, err := r.dialRemote(ctx)
if err != nil {
return err
}
defer rc.Close()
go func() { <-ctx.Done(); rc.Close() }()

ready <- struct{}{}
close(ready)

// Send a heartbeat every minute to keep the control connection alive
for {
_, err := io.CopyN(rc, bytes.NewReader([]byte(Heartbeat)), 9)
if err != nil {
return err
}
time.Sleep(time.Minute)
}
}
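
For context: ControlUp above keeps a single control connection to the proxy alive, writing the 9-byte Heartbeat payload (two four-byte emoji plus a trailing NUL, matching the io.CopyN length) once a minute, while Up accepts local connections and relays each one. Below is a minimal sketch of how a caller might wire the two together; it assumes the function sits in the same tunnel package, and the runTunnel name and error-channel plumbing are illustrative only, not part of this PR.

func runTunnel(ctx context.Context, r *Relay) error {
	ready := make(chan struct{})
	ctrlErr := make(chan error, 1)

	// Establish the control connection first; ControlUp signals readiness on
	// the ready channel before entering its heartbeat loop.
	go func() { ctrlErr <- r.ControlUp(ctx, ready) }()

	select {
	case <-ready:
		// Control connection is up; start accepting local client connections.
		return r.Up(ctx)
	case err := <-ctrlErr:
		// The control connection failed before becoming ready.
		return err
	case <-ctx.Done():
		return ctx.Err()
	}
}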

@@ -62,7 +91,7 @@ func (r *Relay) dialRemote(ctx context.Context) (net.Conn, error) {

func (r *Relay) listenLocal(ctx context.Context) (net.Listener, error) {
var lc net.ListenConfig
return lc.Listen(ctx, "tcp4", r.lAddr)
return lc.Listen(ctx, r.ctype+"4", r.lAddr)
}

// connection represents the server side of a connection to a local TCP socket.
@@ -75,20 +104,21 @@ type connection struct {

// handle handles the client connection by relaying reads and writes from/to
// the remote host.
func (c *connection) handle(ctx context.Context) {
log.G(ctx).Info("Accepted client connection ", c.conn.RemoteAddr())
func (c *connection) handle(ctx context.Context, auth []byte, instance string) {
defer func() {
c.conn.Close()
log.G(ctx).Info("Closed client connection ", c.conn.RemoteAddr())
log.G(ctx).Info("closed client connection ", c.conn.RemoteAddr())
}()

rc, err := c.relay.dialRemote(ctx)
if err != nil {
log.G(ctx).WithError(err).Error("Failed to connect to remote host")
log.G(ctx).WithError(err).Error("failed to connect to remote host")
return
}
defer rc.Close()

log.G(ctx).Info("accepted client connection ", c.conn.RemoteAddr(), " to ", rc.LocalAddr(), "->", rc.RemoteAddr())

// NOTE(antoineco): these calls are critical as they allow reads/writes to be
// later cancelled, because the deadline applies to all future and pending
// I/O and can be dynamically extended or reduced.
@@ -99,7 +129,42 @@ func (c *connection) handle(ctx context.Context) {
_ = c.conn.SetDeadline(immediateNetCancel)
}()

const bufSize = 32 * 1024 // same as io.Copy
if len(auth) > 0 {
_, err = rc.Write(auth)
if err != nil {
log.G(ctx).WithError(err).Error("failed to write auth to remote host")
return
}

var status []byte
statusRaw := bytes.NewBuffer(status)
n, err := io.CopyN(statusRaw, rc, 2)
if err != nil {
log.G(ctx).WithError(err).Error("failed to read auth status from remote host")
return
}

if n != 2 {
log.G(ctx).Error("invalid auth status from remote host")
return
}

var statusParsed int16
err = binary.Read(statusRaw, binary.LittleEndian, &statusParsed)
if err != nil {
log.G(ctx).WithError(err).Error("failed to parse auth status from remote host")
return
}

if statusParsed == 0 {
log.G(ctx).Error("no more available connections to remote host. Try again later")
return
} else if statusParsed < 0 {
log.G(ctx).Errorf("internal tunnel error (C=%d), to view logs run:", statusParsed)
fmt.Fprintf(log.G(ctx).Out, "\n kraft cloud instance logs %s\n\n", instance)
return
}
}

writerDone := make(chan struct{})
go func() {
@@ -108,36 +173,25 @@ func (c *connection) handle(ctx context.Context) {
writerDone <- struct{}{}
}()

writeBuf := make([]byte, bufSize)
for {
n, err := c.conn.Read(writeBuf)
if err != nil {
if !errors.Is(err, io.EOF) {
log.G(ctx).WithError(err).Error("Failed to read from client")
}
_, err = io.Copy(rc, c.conn)
if err != nil {
if isNetClosedError(err) {
return
}
if _, err := rc.Write(writeBuf[:n]); err != nil {
log.G(ctx).WithError(err).Error("Failed to write to remote host")
return
if !isNetTimeoutError(err) {
log.G(ctx).WithError(err).Error("failed to copy data from client to remote host")
}
}
}()

readBuf := make([]byte, bufSize)
for {
n, err := rc.Read(readBuf)
if err != nil {
// expected when the connection gets aborted by a deadline
if !isNetTimeoutError(err) {
log.G(ctx).WithError(err).Error("Failed to read from remote host")
}
break
}
if _, err := c.conn.Write(readBuf[:n]); err != nil {
log.G(ctx).WithError(err).Error("Failed to write to client")
break
_, err = io.Copy(c.conn, rc)
if err != nil {
if !isNetTimeoutError(err) {
log.G(ctx).WithError(err).Error("failed to copy data from remote host to client")
}
} else {
// Connection was closed remotely, so we just return to close our side
return
}

<-writerDone
@@ -157,3 +211,13 @@ func isNetTimeoutError(err error) bool {
}
return false
}

// isNetClosedError reports whether err indicates a closed network connection.
// - the first matched error occurs when the writer tries to write after the
// main goroutine has already closed the connection.
// - the second occurs when the reader is still reading but the remote end has
// closed the connection.
func isNetClosedError(err error) bool {
return strings.Contains(err.Error(), "use of closed network connection") ||
strings.Contains(err.Error(), "connection reset by peer")
}
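
For reference, the handshake implemented in handle above means each data connection first writes the auth token and then reads a 2-byte little-endian int16 status from the proxy: a positive value means the connection was accepted, zero means no connections are currently available, and a negative value indicates an internal tunnel error. The snippet below is a purely illustrative sketch of the proxy side of that exchange under those assumptions; the proxysketch package, acceptTunnel, and freeSlots are hypothetical, and the real proxy unikernel is not part of this diff.

package proxysketch

import (
	"bytes"
	"encoding/binary"
	"io"
	"net"
)

// acceptTunnel mirrors the handshake expected by connection.handle: read the
// auth token, then reply with a 2-byte little-endian status (>0 accepted,
// 0 no free connections, <0 internal error).
func acceptTunnel(conn net.Conn, expectedAuth []byte, freeSlots int16) error {
	buf := make([]byte, len(expectedAuth))
	if _, err := io.ReadFull(conn, buf); err != nil {
		return err
	}

	status := freeSlots
	if !bytes.Equal(buf, expectedAuth) {
		status = -1 // a negative status makes the relay report an internal error
	}
	return binary.Write(conn, binary.LittleEndian, status)
}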