diff --git a/go.mod b/go.mod
index 66f11b1c9..e8ca05a9a 100644
--- a/go.mod
+++ b/go.mod
@@ -78,7 +78,7 @@ require (
 	k8s.io/apimachinery v0.30.2
 	k8s.io/apiserver v0.30.2
 	oras.land/oras-go/v2 v2.5.0
-	sdk.kraft.cloud v0.5.9-0.20240524120407-d0cd360e277b
+	sdk.kraft.cloud v0.5.10-0.20240612112818-be81e91810a9
 	sigs.k8s.io/kustomize/kyaml v0.17.1
 )
 
diff --git a/go.sum b/go.sum
index babed26fc..7936597ae 100644
--- a/go.sum
+++ b/go.sum
@@ -1782,8 +1782,8 @@ oras.land/oras-go/v2 v2.5.0/go.mod h1:z4eisnLP530vwIOUOJeBIj0aGI0L1C3d53atvCBqZH
 rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8=
 rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0=
 rsc.io/sampler v1.3.0/go.mod h1:T1hPZKmBbMNahiBKFy5HrXp6adAjACjK9JXDnKaTXpA=
-sdk.kraft.cloud v0.5.9-0.20240524120407-d0cd360e277b h1:eclQfuRZ92MAY5YYttJsyIQ0O80Fzg7mMeD2hv828Qk=
-sdk.kraft.cloud v0.5.9-0.20240524120407-d0cd360e277b/go.mod h1:d44Ht2AeYURqTjCOw5cJCewGFaM8hh3gy2CeekwKa20=
+sdk.kraft.cloud v0.5.10-0.20240612112818-be81e91810a9 h1:OzS+PqaHaylsUffvZtdZEDqrSTkVRZxvw/xiPj6n0dM=
+sdk.kraft.cloud v0.5.10-0.20240612112818-be81e91810a9/go.mod h1:GSauhAoV4f75IfPVwqC0cpkEbbQeOz+uT4XkZ6cMDOw=
 sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.0.14/go.mod h1:LEScyzhFmoF5pso/YSeBstl57mOzx9xlU9n85RGrDQg=
 sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.0.15/go.mod h1:LEScyzhFmoF5pso/YSeBstl57mOzx9xlU9n85RGrDQg=
 sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.29.0 h1:/U5vjBbQn3RChhv7P11uhYvCSm5G2GaIi5AIGBS6r4c=
diff --git a/internal/cli/kraft/cloud/tunnel/relay.go b/internal/cli/kraft/cloud/tunnel/relay.go
index 808c0e0fd..1188fb6b2 100644
--- a/internal/cli/kraft/cloud/tunnel/relay.go
+++ b/internal/cli/kraft/cloud/tunnel/relay.go
@@ -6,23 +6,31 @@
 package tunnel
 
 import (
+	"bytes"
 	"context"
 	"crypto/tls"
+	"encoding/binary"
 	"errors"
 	"fmt"
 	"io"
 	"net"
+	"strings"
 	"time"
 
 	"kraftkit.sh/log"
 )
 
-// Relay relays TCP connections to a local listener to a remote host over TLS.
+// Relay relays connections from a local listener to a remote host over TLS.
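+// If an auth cookie is set, it is written to the remote host before any
+// payload is relayed and the remote's status reply is checked first.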
 type Relay struct {
 	lAddr string
 	rAddr string
+	ctype string
+	auth  string
+	name  string
 }
 
+const Heartbeat = "\xf0\x9f\x91\x8b\xf0\x9f\x90\x92\x00"
+
 func (r *Relay) Up(ctx context.Context) error {
 	l, err := r.listenLocal(ctx)
 	if err != nil {
@@ -31,7 +39,7 @@ func (r *Relay) Up(ctx context.Context) error {
 	defer func() { l.Close() }()
 	go func() { <-ctx.Done(); l.Close() }()
 
-	log.G(ctx).Info("Tunnelling ", l.Addr(), " to ", r.rAddr)
+	log.G(ctx).Info("tunnelling ", l.Addr(), " to ", r.rAddr)
 
 	for {
 		conn, err := l.Accept()
@@ -43,7 +51,28 @@ func (r *Relay) Up(ctx context.Context) error {
 		}
 
 		c := r.newConnection(conn)
-		go c.handle(ctx)
+		go c.handle(ctx, []byte(r.auth), r.name)
+	}
+}
+
+func (r *Relay) ControlUp(ctx context.Context, ready chan struct{}) error {
+	rc, err := r.dialRemote(ctx)
+	if err != nil {
+		return err
+	}
+	defer rc.Close()
+	go func() { <-ctx.Done(); rc.Close() }()
+
+	ready <- struct{}{}
+	close(ready)
+
+	// Heartbeat every minute to keep the connection alive
+	for {
+		_, err := io.CopyN(rc, bytes.NewReader([]byte(Heartbeat)), 9)
+		if err != nil {
+			return err
+		}
+		time.Sleep(time.Minute)
 	}
 }
 
@@ -62,7 +91,7 @@ func (r *Relay) dialRemote(ctx context.Context) (net.Conn, error) {
 
 func (r *Relay) listenLocal(ctx context.Context) (net.Listener, error) {
 	var lc net.ListenConfig
-	return lc.Listen(ctx, "tcp4", r.lAddr)
+	return lc.Listen(ctx, r.ctype+"4", r.lAddr)
 }
 
 // connection represents the server side of a connection to a local TCP socket.
@@ -75,20 +104,21 @@ type connection struct {
 
 // handle handles the client connection by relaying reads and writes from/to
 // the remote host.
-func (c *connection) handle(ctx context.Context) {
-	log.G(ctx).Info("Accepted client connection ", c.conn.RemoteAddr())
+func (c *connection) handle(ctx context.Context, auth []byte, instance string) {
 	defer func() {
 		c.conn.Close()
-		log.G(ctx).Info("Closed client connection ", c.conn.RemoteAddr())
+		log.G(ctx).Info("closed client connection ", c.conn.RemoteAddr())
 	}()
 
 	rc, err := c.relay.dialRemote(ctx)
 	if err != nil {
-		log.G(ctx).WithError(err).Error("Failed to connect to remote host")
+		log.G(ctx).WithError(err).Error("failed to connect to remote host")
 		return
 	}
 	defer rc.Close()
 
+	log.G(ctx).Info("accepted client connection ", c.conn.RemoteAddr(), " to ", rc.LocalAddr(), "->", rc.RemoteAddr())
+
 	// NOTE(antoineco): these calls are critical as they allow reads/writes to be
 	// later cancelled, because the deadline applies to all future and pending
 	// I/O and can be dynamically extended or reduced.
@@ -99,7 +129,42 @@ func (c *connection) handle(ctx context.Context) {
 		_ = c.conn.SetDeadline(immediateNetCancel)
 	}()
 
-	const bufSize = 32 * 1024 // same as io.Copy
+	if len(auth) > 0 {
+		_, err = rc.Write(auth)
+		if err != nil {
+			log.G(ctx).WithError(err).Error("failed to write auth to remote host")
+			return
+		}
+
+		var status []byte
+		statusRaw := bytes.NewBuffer(status)
+		n, err := io.CopyN(statusRaw, rc, 2)
+		if err != nil {
+			log.G(ctx).WithError(err).Error("failed to read auth status from remote host")
+			return
+		}
+
+		if n != 2 {
+			log.G(ctx).Error("invalid auth status from remote host")
+			return
+		}
+
+		var statusParsed int16
+		err = binary.Read(statusRaw, binary.LittleEndian, &statusParsed)
+		if err != nil {
+			log.G(ctx).WithError(err).Error("failed to parse auth status from remote host")
+			return
+		}
+
+		if statusParsed == 0 {
+			log.G(ctx).Error("no more available connections to remote host. Try again later")
+			return
+		} else if statusParsed < 0 {
+			log.G(ctx).Errorf("internal tunnel error (C=%d), to view logs run:", statusParsed)
+			fmt.Fprintf(log.G(ctx).Out, "\n kraft cloud instance logs %s\n\n", instance)
+			return
+		}
+	}
 
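+	// Copy data in both directions: the goroutine below relays bytes from the
+	// client to the remote host, while the main routine relays bytes from the
+	// remote host back to the client.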
Try again later") + return + } else if statusParsed < 0 { + log.G(ctx).Errorf("internal tunnel error (C=%d), to view logs run:", statusParsed) + fmt.Fprintf(log.G(ctx).Out, "\n kraft cloud instance logs %s\n\n", instance) + return + } + } writerDone := make(chan struct{}) go func() { @@ -108,36 +173,25 @@ func (c *connection) handle(ctx context.Context) { writerDone <- struct{}{} }() - writeBuf := make([]byte, bufSize) - for { - n, err := c.conn.Read(writeBuf) - if err != nil { - if !errors.Is(err, io.EOF) { - log.G(ctx).WithError(err).Error("Failed to read from client") - } + _, err = io.Copy(rc, c.conn) + if err != nil { + if isNetClosedError(err) { return } - if _, err := rc.Write(writeBuf[:n]); err != nil { - log.G(ctx).WithError(err).Error("Failed to write to remote host") - return + if !isNetTimeoutError(err) { + log.G(ctx).WithError(err).Error("failed to copy data from client to remote host") } } }() - readBuf := make([]byte, bufSize) - for { - n, err := rc.Read(readBuf) - if err != nil { - // expected when the connection gets aborted by a deadline - if !isNetTimeoutError(err) { - log.G(ctx).WithError(err).Error("Failed to read from remote host") - } - break - } - if _, err := c.conn.Write(readBuf[:n]); err != nil { - log.G(ctx).WithError(err).Error("Failed to write to client") - break + _, err = io.Copy(c.conn, rc) + if err != nil { + if !isNetTimeoutError(err) { + log.G(ctx).WithError(err).Error("failed to copy data from remote host to client") } + } else { + // Connection was closed remote so we just return to close our side + return } <-writerDone @@ -157,3 +211,13 @@ func isNetTimeoutError(err error) bool { } return false } + +// isNetClosedError reports whether err is a network closed error. +// - first error is for the case when the writer tries to write but the main +// thread already closed the connection. +// - second error is for when reader is still reading but the remote closed +// the connection. +func isNetClosedError(err error) bool { + return strings.Contains(err.Error(), "use of closed network connection") || + strings.Contains(err.Error(), "connection reset by peer") +} diff --git a/internal/cli/kraft/cloud/tunnel/tunnel.go b/internal/cli/kraft/cloud/tunnel/tunnel.go index c55f871c7..0c568466a 100644 --- a/internal/cli/kraft/cloud/tunnel/tunnel.go +++ b/internal/cli/kraft/cloud/tunnel/tunnel.go @@ -11,31 +11,70 @@ import ( "net" "strconv" "strings" + "time" "github.com/MakeNowJust/heredoc" "github.com/spf13/cobra" - "sdk.kraft.cloud" + kraftcloud "sdk.kraft.cloud" + kcinstances "sdk.kraft.cloud/instances" kcservices "sdk.kraft.cloud/services" "kraftkit.sh/cmdfactory" "kraftkit.sh/config" "kraftkit.sh/internal/cli/kraft/cloud/utils" + "kraftkit.sh/log" ) type TunnelOptions struct { - token string - metro string + Connections uint `local:"true" long:"connections" short:"c" usage:"Number of connections to forward maximum at the same time" default:"27"` + ProxyPorts []string `local:"true" long:"proxy-ports" short:"p" usage:"Ports to use for the proxies. 
+	ProxyControlPort   uint     `local:"true" long:"proxy-control-port" short:"P" usage:"Port to use for the proxy control" default:"4443"`
+	TunnelServiceImage string   `local:"true" long:"tunnel-service-image" usage:"Tunnel service image to use" default:"kraftcloud/tunlr:latest"`
+	Token              string   `noattribute:"true"`
+	Metro              string   `noattribute:"true"`
+
+	// Parsed arguments
+	// ProxyPorts converted from string to uint16
+	parsedProxyPorts []uint16
+	// instance name/uuid/private-ip - gets turned into private-ip after fetching
+	instances []string
+	// port to forward on the local machine
+	localPorts []uint16
+	// type of the port to forward (tcp/udp)
+	ctypes []string
+	// port on the instance to forward to
+	instanceProxyPorts []uint16
+	// port to expose the proxy on
+	exposedProxyPorts []uint16
+
+	// port iterator for when a single proxy port is provided
+	portIterator uint16
 }
 
 func NewCmd() *cobra.Command {
 	cmd, err := cmdfactory.New(&TunnelOptions{}, cobra.Command{
-		Short: "Forward a local port to a service group through a TLS tunnel",
-		Use:   "tunnel [FLAGS] SERVICE_GROUP [LOCAL_PORT:]REMOTE_PORT",
-		Args:  cobra.ExactArgs(2),
+		Short: "Forward a local port to an instance through a TLS tunnel",
+		Use:   "tunnel [FLAGS] [LOCAL_PORT:]INSTANCE:DEST_PORT[/TYPE] [[LOCAL_PORT:]INSTANCE:DEST_PORT[/TYPE]]...",
+		Args:  cobra.MinimumNArgs(1),
 		Example: heredoc.Doc(`
-			# Forward the local port 8443 to the port 443 of the "my-service" service group.
-			$ kraft cloud tunnel my-service 8443:443
+			# Forward the local port 8080 to the tcp port 8080 of the private instance 'my-instance'
+			$ kraft cloud tunnel my-instance:8080
+
+			# Forward the local port 8443 to the tcp port 8080 of the private instance 'my-instance' and allow 5 connections
+			$ kraft cloud tunnel --connections 5 8443:my-instance:8080/tcp
+
+			# Forward multiple ports to multiple instances with 10 connections each
+			$ kraft cloud tunnel --connections 10 8080:my-instance1:8080/tcp 8443:my-instance2:8080/tcp
+
+			# Forward the local port 8080 to the tcp port 8080 of the private instance 'my-instance' and use port 5500 for the tunnel
+			$ kraft cloud tunnel -p 5500 my-instance:8080
+
+			# Forward the local ports 8080,8081 to the tcp ports 8080,8081 of the private instances 'my-instance' and 'my-instance2' and use ports 5500,5505 for the tunnel
+			$ kraft cloud tunnel -p 5500 -p 5505 my-instance:8080 my-instance2:8081
+
+			# Forward the local ports 8080,8081 to the tcp ports 8080,8081 of the private instances 'my-instance' and 'my-instance2' and use the default ports 4444,4445 for the tunnel
+			$ kraft cloud tunnel my-instance:8080 my-instance2:8081
 		`),
 		Annotations: map[string]string{
 			cmdfactory.AnnotationHelpGroup: "kraftcloud",
@@ -49,99 +88,317 @@ func NewCmd() *cobra.Command {
 }
 
 func (opts *TunnelOptions) Pre(cmd *cobra.Command, _ []string) error {
-	if err := utils.PopulateMetroToken(cmd, &opts.metro, &opts.token); err != nil {
+	if err := utils.PopulateMetroToken(cmd, &opts.Metro, &opts.Token); err != nil {
 		return fmt.Errorf("could not populate metro and token: %w", err)
 	}
+
+	if opts.Connections > 27 {
+		return fmt.Errorf("currently the maximum number of connections is 27")
+	}
+
 	return nil
 }
 
 func (opts *TunnelOptions) Run(ctx context.Context, args []string) error {
-	sgID := args[0]
+	var err error
 
-	lport, rport, err := parsePorts(args[1])
-	if err != nil {
-		return err
+	// If no proxy ports are provided, default to 4444
+	if len(opts.ProxyPorts) == 0 {
+		opts.ProxyPorts = []string{"4444"}
 	}
 
-	auth, err := config.GetKraftCloudAuthConfig(ctx, opts.token)
+	for _, port := range opts.ProxyPorts {
+		if parsed, err := strconv.ParseUint(port, 10, 16); err != nil {
+			return fmt.Errorf("%q is not a valid port number", port)
+		} else {
+			opts.parsedProxyPorts = append(opts.parsedProxyPorts, uint16(parsed))
+		}
+	}
+
+	if len(opts.ProxyPorts) > 1 && len(opts.ProxyPorts) != len(args) {
+		return fmt.Errorf("supplied number of proxy ports must match the number of ports to forward")
+	}
+
+	auth, err := config.GetKraftCloudAuthConfig(ctx, opts.Token)
 	if err != nil {
 		return fmt.Errorf("could not retrieve credentials: %w", err)
 	}
 
-	cli := kraftcloud.NewServicesClient(
+	if err := opts.parseArgs(ctx, args); err != nil {
+		return fmt.Errorf("could not parse arguments: %w", err)
+	}
+
+	var authStr string
+	cliInstance := kraftcloud.NewInstancesClient(
 		kraftcloud.WithToken(config.GetKraftCloudTokenAuthConfig(*auth)),
-	).WithMetro(opts.metro)
+	).WithMetro(opts.Metro)
 
-	fqdn, err := serviceGroupSanityCheck(ctx, cli, sgID, rport)
+	opts.instances, err = populatePrivateIPs(ctx, cliInstance, opts.instances)
 	if err != nil {
-		return err
+		return fmt.Errorf("could not populate private IPs: %w", err)
 	}
 
+	authStr, err = utils.GenRandAuth()
+	if err != nil {
+		return fmt.Errorf("could not generate random authentication string: %w", err)
+	}
+
+	instArgs := opts.formatProxyArgs(authStr)
+
+	instID, sgFQDN, err := opts.runProxy(ctx, cliInstance, instArgs)
+	if err != nil {
+		return fmt.Errorf("could not run proxy: %w", err)
+	}
+	defer func() {
+		err := terminateProxy(context.TODO(), cliInstance, instID)
+		if err != nil {
+			log.G(ctx).Errorf("could not terminate proxy: %v\n", err)
+		}
+	}()
+
+	// Control relay used for keeping the connection up
+	cr := Relay{
+		rAddr: net.JoinHostPort(sgFQDN, strconv.FormatUint(uint64(opts.ProxyControlPort), 10)),
+	}
+	ready := make(chan struct{}, 1)
+	go func() {
+		err := cr.ControlUp(ctx, ready)
+		if err != nil {
+			log.G(ctx).Errorf("could not start control relay: %v\n", err)
+		}
+	}()
+	// Wait for the control relay to be ready before connecting
+	<-ready
+
 	r := Relay{
 		// TODO(antoineco): allow dual-stack by creating two separate listeners.
 		// Alternatively, we could have defaulted to the address "::" to create a
 		// tcp46 socket, but listening on all addresses is an insecure default.
-		lAddr: net.JoinHostPort("127.0.0.1", strconv.FormatUint(uint64(lport), 10)),
-		rAddr: net.JoinHostPort(fqdn, strconv.FormatUint(uint64(rport), 10)),
+		lAddr: net.JoinHostPort("127.0.0.1", strconv.FormatUint(uint64(opts.localPorts[0]), 10)),
+		rAddr: net.JoinHostPort(sgFQDN, strconv.FormatUint(uint64(opts.exposedProxyPorts[0]), 10)),
+
+		// NOTE(craciunoiuc): Only TCP is supported at the moment. This refers to the
+		// local listener, as the remote listener is always assumed to be TCP because
+		// of TLS.
+		ctype: opts.ctypes[0],
+		auth:  authStr,
+		name:  instID,
+	}
+
+	for i := range opts.localPorts {
+		if i == 0 {
+			continue
+		}
+
+		pr := Relay{
+			lAddr: net.JoinHostPort("127.0.0.1", strconv.FormatUint(uint64(opts.localPorts[i]), 10)),
+			rAddr: net.JoinHostPort(sgFQDN, strconv.FormatUint(uint64(opts.exposedProxyPorts[i]), 10)),
+			ctype: opts.ctypes[i],
+			auth:  authStr,
+			name:  instID,
+		}
+
+		go func() {
+			err := pr.Up(ctx)
+			if err != nil {
+				log.G(ctx).Errorf("could not start relay: %v\n", err)
+			}
+		}()
 	}
+
 	return r.Up(ctx)
 }
 
-// serviceGroupSanityCheck verifies that the given service group can be tunneled to.
-// In case of success, the (public) FQDN of service group is returned.
-func serviceGroupSanityCheck(ctx context.Context, cli kcservices.ServicesService, sgID string, rport uint16) (fqdn string, err error) {
-	sgGetResp, err := cli.Get(ctx, sgID)
-	if err != nil {
-		return "", fmt.Errorf("getting service group '%s': %w", sgID, err)
-	}
-	sg, err := sgGetResp.FirstOrErr()
-	if err != nil {
-		return "", fmt.Errorf("getting service group '%s': %w", sgID, err)
+// generatePort generates a port number based on the startPort and the portIterator.
+// This is used when a single proxy port is provided and multiple ports are to be forwarded.
+func (opts *TunnelOptions) generatePort(startPort uint16) uint16 {
+	defer func() {
+		opts.portIterator++
+	}()
+	return startPort + opts.portIterator
+}
+
+// parseArgs parses the command line arguments into the instance, local port, remote port and connection type.
+func (opts *TunnelOptions) parseArgs(ctx context.Context, args []string) error {
+	for i, arg := range args {
+		instance, lport, rport, ctype, err := parsePorts(ctx, arg)
+		if err != nil {
+			return err
+		}
+
+		opts.instances = append(opts.instances, instance)
+		opts.localPorts = append(opts.localPorts, lport)
+		opts.instanceProxyPorts = append(opts.instanceProxyPorts, rport)
+		opts.ctypes = append(opts.ctypes, ctype)
+
+		if len(opts.ProxyPorts) == 1 {
+			opts.exposedProxyPorts = append(opts.exposedProxyPorts, opts.generatePort(opts.parsedProxyPorts[0]))
+		} else {
+			opts.exposedProxyPorts = append(opts.exposedProxyPorts, opts.parsedProxyPorts[i])
+		}
 	}
-	if len(sg.Domains) == 0 {
-		return "", fmt.Errorf("service group '%s' has no public domain", sgID)
+	return nil
+}
+
+// runProxy runs a proxy instance with the given arguments.
+// Information related to the proxy instance is hardcoded, but the UUID is returned.
+func (opts *TunnelOptions) runProxy(ctx context.Context, cli kcinstances.InstancesService, args []string) (string, string, error) {
+	var parsedPorts []kcservices.CreateRequestService
+	for i := range opts.exposedProxyPorts {
+		parsedPorts = append(parsedPorts, kcservices.CreateRequestService{
+			Port:            int(opts.exposedProxyPorts[i]),
+			DestinationPort: ptr(int(opts.exposedProxyPorts[i])),
+			Handlers: []kcservices.Handler{
+				kcservices.HandlerTLS,
+			},
+		})
 	}
+	parsedPorts = append(parsedPorts, kcservices.CreateRequestService{
+		Port:            int(opts.ProxyControlPort),
+		DestinationPort: ptr(int(opts.ProxyControlPort)),
+		Handlers: []kcservices.Handler{
+			kcservices.HandlerTLS,
+		},
+	})
 
-	var hasPort bool
-	var exposedPorts []int
-	for _, svc := range sg.Services {
-		if svc.Port == int(rport) {
-			hasPort = true
-			break
-		}
-		exposedPorts = append(exposedPorts, svc.Port)
+	crinstResp, err := cli.Create(ctx, kcinstances.CreateRequest{
+		Image:    opts.TunnelServiceImage,
+		MemoryMB: ptr(64),
+		Args:     args,
+		ServiceGroup: &kcinstances.CreateRequestServiceGroup{
+			Services: parsedPorts,
+		},
+		Autostart:     ptr(true),
+		WaitTimeoutMs: ptr(int((3 * time.Second).Milliseconds())),
+		Features:      []kcinstances.Feature{kcinstances.FeatureDeleteOnStop},
+	})
+	if err != nil {
+		return "", "", fmt.Errorf("creating proxy instance: %w", err)
 	}
-	if !hasPort {
-		return "", fmt.Errorf("service group '%s' does not expose port %d. Ports exposed are %v", sgID, rport, exposedPorts)
Ports exposed are %v", sgID, rport, exposedPorts) + inst, err := crinstResp.FirstOrErr() + if err != nil { + return "", "", fmt.Errorf("creating proxy instance: %w", err) } - return sg.Domains[0].FQDN, nil + return inst.UUID, inst.ServiceGroup.Domains[0].FQDN, nil } -// parsePorts parses a command line argument in the format [lport:]rport into +// parsePorts parses a command line argument in the format [lport:]rport[/ctype] into // two port numbers lport and rport. If lport isn't set, a random port will be -// used by the relay. -func parsePorts(portsArg string) (lport, rport uint16, err error) { - ports := strings.SplitN(portsArg, ":", 2) +// used by the relay. If ctype isn't set, the connection will be assumed to be TCP. +func parsePorts(ctx context.Context, portsArg string) (instance string, lport, rport uint16, ctype string, err error) { + types := strings.SplitN(portsArg, "/", 2) + if len(types) == 2 { + ctype = types[1] + } else { + ctype = "tcp" + } + + if strings.ToLower(ctype) != "tcp" { + log.G(ctx).Warn("only TCP connections are supported at the moment") + } + + ports := strings.SplitN(types[0], ":", 3) - if len(ports) == 1 { - rport64, err := strconv.ParseUint(ports[0], 10, 16) + if len(ports) == 2 { + if _, err := strconv.ParseUint(ports[0], 10, 16); err == nil { + return "", 0, 0, "", fmt.Errorf("%q is not a valid instance", ports[0]) + } + + rport64, err := strconv.ParseUint(ports[1], 10, 16) if err != nil { - return 0, 0, fmt.Errorf("%q is not a valid port number", ports[0]) + return "", 0, 0, "", fmt.Errorf("%q is not a valid port number", ports[1]) } - return 0, uint16(rport64), nil + return ports[0], uint16(rport64), uint16(rport64), ctype, nil } lport64, err := strconv.ParseUint(ports[0], 10, 16) if err != nil { - return 0, 0, fmt.Errorf("%q is not a valid port number", ports[0]) + return "", 0, 0, "", fmt.Errorf("%q is not a valid port number", ports[0]) } - rport64, err := strconv.ParseUint(ports[1], 10, 16) + rport64, err := strconv.ParseUint(ports[2], 10, 16) if err != nil { - return 0, 0, fmt.Errorf("%q is not a valid port number", ports[1]) + return "", 0, 0, "", fmt.Errorf("%q is not a valid port number", ports[1]) } - return uint16(lport64), uint16(rport64), nil + return ports[1], uint16(lport64), uint16(rport64), ctype, nil } + +// formatProxyArgs formats the arguments to be passed to the proxy instance. +func (opts *TunnelOptions) formatProxyArgs(authStr string) []string { + var connections []string + + for i := range opts.instances { + connections = append(connections, + fmt.Sprintf("TCP2%s_HOOK:%s:%d:%d:%d", + strings.ToUpper(opts.ctypes[i]), + opts.instances[i], + opts.instanceProxyPorts[i], + opts.exposedProxyPorts[i], + opts.Connections, + ), + ) + } + + var allConnections string + for _, conn := range connections { + allConnections += conn + "|" + } + allConnections = "[" + strings.TrimSuffix(allConnections, "|") + "]" + + return []string{ + // HEARTBEAT_PORT + fmt.Sprintf("%d", opts.ProxyControlPort), + // AUTH_COOKIE_LEN:AUTH_TIMEOUT:AUTH_MAX_EVS:AUTH_COOKIE + fmt.Sprintf("%d:%d:%d:%s", len(authStr), 5, opts.Connections, authStr), + // MAX_EVS:GLOBAL_GPBUF:EVS_TIMEOUT + fmt.Sprintf("%d:%d:%d", opts.Connections, 8, 600), + // [HOOKSTR0:|HOOKSTR1:...] + allConnections, + } +} + +// populatePrivateIPs fetches the private IPs of the instances and replaces the instance names/uuids with the Private IPs. 
+func populatePrivateIPs(ctx context.Context, cli kcinstances.InstancesService, ips []string) ([]string, error) {
+	var instancesToGet []string
+	var indexesToGet []int
+	for i := range ips {
+		// If instance is not an IP (PrivateIP) or PrivateFQDN
+		// assume it is a name/UUID and fetch the IP
+		if net.ParseIP(ips[i]) == nil && !strings.HasSuffix(ips[i], ".internal") {
+			instancesToGet = append(instancesToGet, ips[i])
+			indexesToGet = append(indexesToGet, i)
+		}
+	}
+	if len(instancesToGet) > 0 {
+		instGetResp, err := cli.Get(ctx, instancesToGet...)
+		if err != nil {
+			return nil, fmt.Errorf("getting instances: %w", err)
+		}
+
+		for i, inst := range instGetResp.Data.Entries {
+			if inst.PrivateIP == "" {
+				return nil, fmt.Errorf("instance '%s' not found", instancesToGet[i])
+			}
+			ips[indexesToGet[i]] = inst.PrivateIP
+		}
+	}
+
+	return ips, nil
+}
+
+// terminateProxy terminates the proxy instance with the given UUID.
+func terminateProxy(ctx context.Context, icli kcinstances.InstancesService, instID string) error {
+	delinstResp, err := icli.Delete(ctx, instID)
+	if err != nil {
+		return fmt.Errorf("deleting proxy instance '%s': %w", instID, err)
+	}
+	if _, err = delinstResp.FirstOrErr(); err != nil {
+		return fmt.Errorf("deleting proxy instance '%s': %w", instID, err)
+	}
+	return nil
+}
+
+func ptr[T comparable](v T) *T { return &v }
diff --git a/internal/cli/kraft/cloud/volume/import/rand.go b/internal/cli/kraft/cloud/utils/rand.go
similarity index 86%
rename from internal/cli/kraft/cloud/volume/import/rand.go
rename to internal/cli/kraft/cloud/utils/rand.go
index 2e327acb3..6f8c0772b 100644
--- a/internal/cli/kraft/cloud/volume/import/rand.go
+++ b/internal/cli/kraft/cloud/utils/rand.go
@@ -3,7 +3,7 @@
 // Licensed under the BSD-3-Clause License (the "License").
 // You may not use this file except in compliance with the License.
 
-package vimport
+package utils
 
 import (
 	"crypto/rand"
@@ -11,8 +11,8 @@ import (
 	"strings"
 )
 
-// genRandAuth generates a random authentication string.
-func genRandAuth() (string, error) {
+// GenRandAuth generates a random authentication string.
+func GenRandAuth() (string, error) {
 	rndChars := []byte("0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz")
 	maxIdx := big.NewInt(int64(len(rndChars)))
 
diff --git a/internal/cli/kraft/cloud/volume/import/import.go b/internal/cli/kraft/cloud/volume/import/import.go
index 69647577e..0341dfc59 100644
--- a/internal/cli/kraft/cloud/volume/import/import.go
+++ b/internal/cli/kraft/cloud/volume/import/import.go
@@ -138,7 +138,7 @@ func importVolumeData(ctx context.Context, opts *ImportOptions) (retErr error) {
 
 	paramodel, err = processTree(ctx, "Spawning temporary volume data import instance",
 		func(ctx context.Context) error {
-			if authStr, err = genRandAuth(); err != nil {
+			if authStr, err = utils.GenRandAuth(); err != nil {
 				return fmt.Errorf("generating random authentication string: %w", err)
 			}
 			instID, instFQDN, err = runVolimport(ctx, icli, volUUID, authStr)