Skip to content

Commit

Permalink
wip(import): Ensure works for all cases and tidy up
Browse files Browse the repository at this point in the history
Signed-off-by: Cezar Craciunoiu <cezar.craciunoiu@unikraft.io>
  • Loading branch information
craciunoiuc committed May 9, 2024
1 parent 7c9176a commit 8330adb
Show file tree
Hide file tree
Showing 3 changed files with 108 additions and 99 deletions.
172 changes: 85 additions & 87 deletions internal/cli/kraft/cloud/volume/import/cpio.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,50 +33,59 @@ type okResponse struct {

// message is the message sent by the server.
// This is only set if status is not 1.
message []byte // 1500 bytes max + some space
message []byte // 1024 bytes per message
}

const (
// The size read by the `volimport` unikernel on one socket read
msgMaxSize = 32 * 1024 // 32 KiB
)

func (r *okResponse) clear() {
r.status = 0
r.msglen = 0
r.message = nil
}

func (r *okResponse) parse(resp []byte) {
func (r *okResponse) parse(resp []byte) error {
r.clear()

err := binary.Read(bytes.NewReader(resp[:4]), binary.LittleEndian, &r.status)
if err != nil {
fmt.Println(err)
return err
}

if r.status == 1 {
return
return nil
}

err = binary.Read(bytes.NewReader(resp[4:8]), binary.LittleEndian, &r.msglen)
if err != nil {
fmt.Println(err)
return err
}

r.message = resp[8 : 8+r.msglen]

return nil
}

func (r *okResponse) waitForOK(conn *tls.Conn, errorMsg string) error {
retErr := fmt.Errorf(errorMsg)
for it := 0; ; it++ {
respRaw := make([]byte, 1536) // TODO See size
// A message can have at max:
// status - 4 bytes
// msglen - 4 bytes
// msg - 1024 bytes
respRaw := make([]byte, 1032)

fmt.Println("waiting for response")
n, err := io.ReadAtLeast(conn, respRaw, 4)
fmt.Println(n)
_, err := io.ReadAtLeast(conn, respRaw, 4)
if err != nil {
return fmt.Errorf("%w: %s", retErr, err)
}

r.parse(respRaw)
fmt.Println("PARSED RAW RESPONSE:", r.message)
fmt.Println("PARSED RESPONSE:", string(r.message))
if err := r.parse(respRaw); err != nil {
return fmt.Errorf("%w: %s", retErr, err)
}
switch {
case r.status == 0:
if errorMsg != retErr.Error() {
Expand Down Expand Up @@ -120,32 +129,32 @@ func buildCPIO(ctx context.Context, source string) (path string, size int64, err
}

// copyCPIO copies the CPIO archive at the given path over the provided tls.Conn.
func copyCPIO(ctx context.Context, conn *tls.Conn, auth, path string, size int64, callback progressCallbackFunc) error {
func copyCPIO(ctx context.Context, conn *tls.Conn, auth, path string, timeoutS uint64) error {
var resp okResponse

// NOTE(antoineco): this call is critical as it allows writes to be later
// cancelled, because the deadline applies to all future and pending I/O and
// can be dynamically extended or reduced.
// TODO(craciunoiuc): decide on the deadline and then add seconds for every
// file copied depending on the size and how long the last file took to copy.
_ = conn.SetWriteDeadline(time.Now().Add(20 * time.Second))
_ = conn.SetReadDeadline(time.Now().Add(20 * time.Second))
if timeoutS > 0 {
_ = conn.SetWriteDeadline(time.Now().Add(time.Duration(timeoutS) * time.Second))
_ = conn.SetReadDeadline(time.Now().Add(time.Duration(timeoutS) * time.Second))
} else {
_ = conn.SetWriteDeadline(noNetTimeout)
_ = conn.SetReadDeadline(noNetTimeout)
}
go func() {
<-ctx.Done()
_ = conn.SetWriteDeadline(immediateNetCancel)
_ = conn.SetReadDeadline(immediateNetCancel)
}()

fmt.Println("before auth copy")
if _, err := io.Copy(conn, strings.NewReader(auth)); err != nil {
return err
}

fmt.Println("before auth check")
if err := resp.waitForOK(conn, "authentication failed"); err != nil {
return err
}
fmt.Println("after auth")

fi, err := os.Open(path)
if err != nil {
Expand All @@ -156,8 +165,16 @@ func copyCPIO(ctx context.Context, conn *tls.Conn, auth, path string, size int64

reader := cpio.NewReader(fi)

// Iterate through the files in the archive.
// We need to use a sentinel variable to ensure that the CPIO header of the
// the `TRAILER!!!` entry is still sent to the importer.
shouldStop := false

// Iterate through the files in the archive.
// Sending a file has a list of steps
// 1. Send the raw CPIO header -- wait for OK
// 2. Send the name of the file (NUL terminated) -- wait for OK
// 2'. Stop if last entry detected
// 3. Copy the file content piece by piece | Link destination -- wait for OK
for {
hdr, raw, err := reader.Next()
if err == io.EOF {
Expand All @@ -166,11 +183,7 @@ func copyCPIO(ctx context.Context, conn *tls.Conn, auth, path string, size int64
return err
}

fmt.Println("before header copy")
// io.Copy the header
fmt.Println(string(raw.Bytes()))
fmt.Println(raw.Bytes())

// 1. Send the header
n, err := io.CopyN(conn, bytes.NewBuffer(raw.Bytes()), int64(len(raw.Bytes())))
// NOTE(antoineco): such error can be expected if volimport exited early or
// a deadline was set due to cancellation. What we should convey in the error
Expand All @@ -179,115 +192,100 @@ func copyCPIO(ctx context.Context, conn *tls.Conn, auth, path string, size int64
if !isNetClosedError(err) {
return err
}
if n != size {
if n != int64(len(raw.Bytes())) {
return fmt.Errorf("incomplete write (%d/%d)", n, len(raw.Bytes()))
}
return err
}

fmt.Println("before header auth check")
if err := resp.waitForOK(conn, "header copy failed"); err != nil {
return err
}
fmt.Println("after header auth check")

fmt.Println("before file name copy")
fmt.Println(hdr.Name, int64(len(hdr.Name)))

nameBytesToSend := []byte(hdr.Name)

fmt.Println(hdr.NameSize)

// Add NUL-termination to name string as per CPIO spec
nameBytesToSend = append(nameBytesToSend, 0x00)

for i := 0; i < int(hdr.NamePad); i++ {
nameBytesToSend = append(nameBytesToSend, 0x00)
}

fmt.Println(bytes.NewReader(nameBytesToSend))
// io.Copy the file name
// 2. Send the file name
n, err = io.CopyN(conn, bytes.NewReader(nameBytesToSend), int64(len(nameBytesToSend)))
// NOTE(antoineco): such error can be expected if volimport exited early or
// a deadline was set due to cancellation. What we should convey in the error
// is that the data import didn't complete, not the low-level network error.
if err != nil {
if !isNetClosedError(err) {
return err
}
if n != size {
if n != int64(len(hdr.Name)) {
return fmt.Errorf("incomplete write (%d/%d)", n, len(hdr.Name))
}
return err
}

fmt.Println("before file name copy check")
if err := resp.waitForOK(conn, "name copy failed"); err != nil {
return err
}
fmt.Println("after file name copy")

// 2'. Stop when `TRAILER!!!` met
if shouldStop {
break
}

// io.Copy the file content
var totalCopy int64

fmt.Println("before file copy")
fmt.Printf("size: %d\n", hdr.Size)
for {
buf := make([]byte, 1048576)
bread, err := reader.Read(buf)
if err == io.EOF {
fmt.Println("EOF", bread)
break
} else if err != nil {
return err
// If nothing was copied the entry was a directory which has no size
empty := true

// 3. Send the file content. If the file is a link copy the destination
// as content in this step. Copy runs uninterrupted until the whole size
// was sent.
if hdr.Linkname == "" {
for {
toSend := msgMaxSize

if hdr.Size < int64(toSend) {
toSend = int(hdr.Size)
}

buf := make([]byte, toSend)
bread, err := reader.Read(buf)

if err == io.EOF {
break
} else if err != nil {
return err
}

n, err := io.CopyN(conn, bytes.NewReader(buf), int64(bread))
if err != nil {
if !isNetClosedError(err) {
return err
}
if n != int64(bread) {
return fmt.Errorf("incomplete write (%d/%d)", n, int64(bread))
}
return err
}

empty = false
}
} else {
bread := len(hdr.Linkname)

n, err := io.CopyN(conn, bytes.NewReader(buf), int64(bread))
// NOTE(antoineco): such error can be expected if volimport exited early or
// a deadline was set due to cancellation. What we should convey in the error
// is that the data import didn't complete, not the low-level network error.
n, err := io.CopyN(conn, bytes.NewReader([]byte(hdr.Linkname)), int64(bread))
if err != nil {
if !isNetClosedError(err) {
return err
}
if n != size {
if n != int64(bread) {
return fmt.Errorf("incomplete write (%d/%d)", n, int64(bread))
}
return err
}

totalCopy += int64(bread)

fmt.Println(totalCopy)
empty = false
}
fmt.Println("after file copy")

// io.Copy the padding
// Don't wait for ok if nothing was written
if totalCopy != 0 {
padding := make([]byte, hdr.EntryPad)

fmt.Printf("before padding copy %d\n", hdr.EntryPad)

// TODO(craciunoiuc): If not copied, counterpart waits infinitely.
// If copied, 1 byte spills over into the next header. Might be a
// problem with volinit.
for i := 0; i < int(hdr.EntryPad); i++ {
padding = append(padding, 0x00)
}
_, err = io.CopyN(conn, bytes.NewReader(padding), int64(len(padding)))
if err != nil {
return err
}
fmt.Println("after padding copy")

if !empty {
if err := resp.waitForOK(conn, "file copy failed"); err != nil {
return err
}
fmt.Println("after file copy check")
}
}

Expand Down
9 changes: 5 additions & 4 deletions internal/cli/kraft/cloud/volume/import/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,9 @@ type ImportOptions struct {
Token string `noattribute:"true"`
Metro string `noattribute:"true"`

Source string `local:"true" long:"source" short:"s" usage:"Path to the data source (directory, Dockerfile)" default:"."`
VolID string `local:"true" long:"volume" short:"v" usage:"Identifier of an existing volume (name or UUID)"`
Source string `local:"true" long:"source" short:"s" usage:"Path to the data source (directory, Dockerfile)" default:"."`
Timeout uint64 `local:"true" long:"timeout" short:"t" usage:"Timeout for the import process in seconds"`
VolID string `local:"true" long:"volume" short:"v" usage:"Identifier of an existing volume (name or UUID)"`
}

const (
Expand Down Expand Up @@ -145,7 +146,7 @@ func importVolumeData(ctx context.Context, opts *ImportOptions) (retErr error) {
if authStr, err = genRandAuth(); err != nil {
return fmt.Errorf("generating random authentication string: %w", err)
}
instID, instFQDN, err = runVolimport(ctx, icli, volUUID, authStr)
instID, instFQDN, err = runVolimport(ctx, icli, volUUID, authStr, opts.Timeout)
return err
},
)
Expand Down Expand Up @@ -181,7 +182,7 @@ func importVolumeData(ctx context.Context, opts *ImportOptions) (retErr error) {

ctx, cancel := context.WithCancel(ctx)
defer cancel()
err = copyCPIO(ctx, conn, authStr, cpioPath, cpioSize, callback)
err = copyCPIO(ctx, conn, authStr, cpioPath, opts.Timeout)
copyCPIOErr = err
return err
},
Expand Down
26 changes: 18 additions & 8 deletions internal/cli/kraft/cloud/volume/import/volimport.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,23 @@ func volumeSanityCheck(ctx context.Context, cli kcvolumes.VolumesService, volID
}

// runVolimport spawns a volume data import instance with the given volume attached.
func runVolimport(ctx context.Context, cli kcinstances.InstancesService, volUUID, authStr string) (instID, fqdn string, err error) {
func runVolimport(ctx context.Context, cli kcinstances.InstancesService, volUUID, authStr string, timeoutS uint64) (instID, fqdn string, err error) {
args := []string{
"-p", strconv.FormatUint(uint64(volimportPort), 10),
"-a", authStr,
}

if timeoutS > 0 {
// Note(craciunoiuc): Add a 10-second buffer to the timeout.
// This is to allow the client to close the connection first.
// Otherwise there is a chance that the volume becomes corrupted.
args = append(args, "-t", strconv.FormatUint(timeoutS+10, 10))
}

crinstResp, err := cli.Create(ctx, kcinstances.CreateRequest{
Image: "sergiu.unikraft.io/volimport-test:latest",
MemoryMB: ptr(128),
Args: []string{
"-p", strconv.FormatUint(uint64(volimportPort), 10),
"-a", authStr,
"-t", "15",
},
Image: "sergiu.unikraft.io/volimport-test:latest", // TODO(craciunoiuc): Replace with official image before merging
MemoryMB: ptr(128), // TODO(craciunoiuc): see if it works fine with 64
Args: args,
ServiceGroup: &kcinstances.CreateRequestServiceGroup{
Services: []kcservices.CreateRequestService{{
Port: int(volimportPort),
Expand All @@ -65,6 +73,7 @@ func runVolimport(ctx context.Context, cli kcinstances.InstancesService, volUUID
}
inst, err := crinstResp.FirstOrErr()
if err != nil {
// TODO(craciunoiuc): Uncomment code to ensure deletion
// if inst != nil && inst.Name != "" {
// // Delete the instance if it was created but failed to start.
// crdelResp, err := cli.Delete(ctx, inst.UUID)
Expand Down Expand Up @@ -108,6 +117,7 @@ func terminateVolimport(ctx context.Context, icli kcinstances.InstancesService,
return fmt.Errorf("waiting for volume data import instance '%s' to stop: %w", instID, err)
}

// TODO(craciunoiuc): Uncomment to ensure deletion
// delinstResp, err := icli.Delete(ctx, instID)
// if err != nil {
// return fmt.Errorf("deleting volume data import instance '%s': %w", instID, err)
Expand Down

0 comments on commit 8330adb

Please sign in to comment.