Skip to content

Commit

Permalink
feat(vimport): Use 2 ports & expanded error message
Browse files Browse the repository at this point in the history
Signed-off-by: Cezar Craciunoiu <cezar.craciunoiu@unikraft.io>
  • Loading branch information
craciunoiuc committed Apr 23, 2024
1 parent 7a34675 commit 70a1722
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 20 deletions.
53 changes: 37 additions & 16 deletions internal/cli/kraft/cloud/volume/import/cpio.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"bytes"
"context"
"crypto/tls"
"encoding/binary"
"errors"
"fmt"
"io"
Expand All @@ -21,6 +22,21 @@ import (
"kraftkit.sh/internal/cpio"
)

type okResponse struct {
status int32
message []byte
}

func (r *okResponse) Parse(resp []byte) okResponse {
err := binary.Read(bytes.NewReader(resp[:4]), binary.LittleEndian, r)
if err != nil {
return okResponse{}
}
r.message = resp[4:]

return r

Check failure on line 37 in internal/cli/kraft/cloud/volume/import/cpio.go

View workflow job for this annotation

GitHub Actions / e2e-ubuntu-cli (ubuntu-22.04)

cannot use r (variable of type *okResponse) as okResponse value in return statement

Check failure on line 37 in internal/cli/kraft/cloud/volume/import/cpio.go

View workflow job for this annotation

GitHub Actions / All

cannot use r (variable of type *okResponse) as okResponse value in return statement

Check failure on line 37 in internal/cli/kraft/cloud/volume/import/cpio.go

View workflow job for this annotation

GitHub Actions / All

cannot use r (variable of type *okResponse) as okResponse value in return statement

Check failure on line 37 in internal/cli/kraft/cloud/volume/import/cpio.go

View workflow job for this annotation

GitHub Actions / All

cannot use r (variable of type *okResponse) as okResponse value in return statement

Check failure on line 37 in internal/cli/kraft/cloud/volume/import/cpio.go

View workflow job for this annotation

GitHub Actions / kraft CLI end-to-end

cannot use r (variable of type *okResponse) as okResponse value in return statement
}

// buildCPIO generates a CPIO archive from the data at the given source.
func buildCPIO(ctx context.Context, source string) (path string, size int64, err error) {
if source == "." {
Expand Down Expand Up @@ -48,8 +64,9 @@ func buildCPIO(ctx context.Context, source string) (path string, size int64, err
}

// copyCPIO copies the CPIO archive at the given path over the provided tls.Conn.
func copyCPIO(ctx context.Context, conn *tls.Conn, auth, path string, size int64, callback progressCallbackFunc) error {
var errno []byte = []byte{0}
func copyCPIO(ctx context.Context, conn *tls.Conn, rconn *tls.Conn, auth, path string, size int64, callback progressCallbackFunc) error {
var respRaw []byte
var resp okResponse

f, err := os.Open(path)
if err != nil {
Expand All @@ -61,25 +78,26 @@ func copyCPIO(ctx context.Context, conn *tls.Conn, auth, path string, size int64
// cancelled, because the deadline applies to all future and pending I/O and
// can be dynamically extended or reduced.
_ = conn.SetWriteDeadline(noNetTimeout)
_ = conn.SetReadDeadline(noNetTimeout)
_ = rconn.SetReadDeadline(noNetTimeout)
go func() {
<-ctx.Done()
_ = conn.SetWriteDeadline(immediateNetCancel)
_ = conn.SetReadDeadline(immediateNetCancel)
_ = rconn.SetReadDeadline(immediateNetCancel)
}()

if _, err = io.Copy(conn, strings.NewReader(auth)); err != nil {
return err
}

// Wait for OK
_, err = io.ReadFull(conn, errno)
respRaw, err = io.ReadAll(rconn)
if err != nil {
return err
}

if errno[0] != 0 {
return fmt.Errorf("authentication failed (code %d)", errno[0])
resp.Parse(respRaw)
if resp.status != 0 {
return fmt.Errorf("authentication failed: %s", resp.message)
}

fi, err := os.Open(path)
Expand Down Expand Up @@ -109,13 +127,14 @@ func copyCPIO(ctx context.Context, conn *tls.Conn, auth, path string, size int64
}

// Wait for OK
_, err = io.ReadFull(conn, errno)
respRaw, err = io.ReadAll(rconn)
if err != nil {
return err
}

if errno[0] != 0 {
return fmt.Errorf("name copy failed (code %d)", errno[0])
resp.Parse(respRaw)
if resp.status != 0 {
return fmt.Errorf("header copy failed: %s", resp.message)
}

// io.Copy the file name
Expand All @@ -125,13 +144,14 @@ func copyCPIO(ctx context.Context, conn *tls.Conn, auth, path string, size int64
}

// Wait for OK
_, err = io.ReadFull(conn, errno)
respRaw, err = io.ReadAll(rconn)
if err != nil {
return err
}

if errno[0] != 0 {
return fmt.Errorf("name copy failed (code %d)", errno[0])
resp.Parse(respRaw)
if resp.status != 0 {
return fmt.Errorf("name copy failed: %s", resp.message)
}

// io.Copy the file content
Expand All @@ -156,13 +176,14 @@ func copyCPIO(ctx context.Context, conn *tls.Conn, auth, path string, size int64
// Don't wait for ok if nothing was written
if totalCopy != 0 {
// Wait for OK
_, err = io.ReadFull(conn, errno)
respRaw, err = io.ReadAll(rconn)
if err != nil {
return err
}

if errno[0] != 0 {
return fmt.Errorf("file copy failed (code %d)", errno[0])
resp.Parse(respRaw)
if resp.status != 0 {
return fmt.Errorf("file copy failed: %s", resp.message)
}
}
}
Expand Down
18 changes: 15 additions & 3 deletions internal/cli/kraft/cloud/volume/import/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,10 @@ type ImportOptions struct {
VolID string `local:"true" long:"volume" short:"v" usage:"Identifier of an existing volume (name or UUID)"`
}

const volimportPort uint16 = 42069
const (
volimportPort uint16 = 42069
volimportPortRecv uint16 = 42070
)

func NewCmd() *cobra.Command {
cmd, err := cmdfactory.New(&ImportOptions{}, cobra.Command{
Expand Down Expand Up @@ -169,15 +172,24 @@ func importVolumeData(ctx context.Context, opts *ImportOptions) (retErr error) {
instAddr := instFQDN + ":" + strconv.FormatUint(uint64(volimportPort), 10)
conn, err := tls.Dial("tcp4", instAddr, nil)
if err != nil {
return fmt.Errorf("connecting to volume data import instance: %w", err)
return fmt.Errorf("connecting to volume data import instance send port: %w", err)
}
defer func() {
retErr = errors.Join(retErr, conn.Close())
}()

instAddrRecv := instFQDN + ":" + strconv.FormatUint(uint64(volimportPortRecv), 10)

Check failure on line 181 in internal/cli/kraft/cloud/volume/import/import.go

View workflow job for this annotation

GitHub Actions / e2e-ubuntu-cli (ubuntu-22.04)

instAddrRecv declared and not used

Check failure on line 181 in internal/cli/kraft/cloud/volume/import/import.go

View workflow job for this annotation

GitHub Actions / All

instAddrRecv declared and not used (typecheck)

Check failure on line 181 in internal/cli/kraft/cloud/volume/import/import.go

View workflow job for this annotation

GitHub Actions / All

instAddrRecv declared and not used) (typecheck)

Check failure on line 181 in internal/cli/kraft/cloud/volume/import/import.go

View workflow job for this annotation

GitHub Actions / All

instAddrRecv declared and not used) (typecheck)

Check failure on line 181 in internal/cli/kraft/cloud/volume/import/import.go

View workflow job for this annotation

GitHub Actions / kraft CLI end-to-end

instAddrRecv declared and not used
rconn, err := tls.Dial("tcp4", instAddr, nil)
if err != nil {
return fmt.Errorf("connecting to volume data import instance receive port: %w", err)
}
defer func() {
retErr = errors.Join(retErr, rconn.Close())
}()

ctx, cancel := context.WithCancel(ctx)
defer cancel()
err = copyCPIO(ctx, conn, authStr, cpioPath, cpioSize, callback)
err = copyCPIO(ctx, conn, rconn, authStr, cpioPath, cpioSize, callback)
copyCPIOErr = err
return err
},
Expand Down
9 changes: 8 additions & 1 deletion internal/cli/kraft/cloud/volume/import/volimport.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,21 @@ func runVolimport(ctx context.Context, cli kcinstances.InstancesService, volUUID
MemoryMB: ptr(32),
Args: []string{
"-p", strconv.FormatUint(uint64(volimportPort), 10),
"-p", strconv.FormatUint(uint64(volimportPortRecv), 10),
"-a", authStr,
},
ServiceGroup: &kcinstances.CreateRequestServiceGroup{
Services: []kcservices.CreateRequestService{{
Port: int(volimportPort),
DestinationPort: ptr(int(volimportPort)),
Handlers: []kcservices.Handler{kcservices.HandlerTLS},
}},
},
{
Port: int(volimportPortRecv),
DestinationPort: ptr(int(volimportPortRecv)),
Handlers: []kcservices.Handler{kcservices.HandlerTLS},
},
},
},
Volumes: []kcinstances.CreateRequestVolume{{
UUID: &volUUID,
Expand Down

0 comments on commit 70a1722

Please sign in to comment.