Skip to content

Commit

Permalink
Merge pull request #253 from jmorganca/upload
Browse files Browse the repository at this point in the history
use a pipe to push to registry with progress
  • Loading branch information
mxyng committed Aug 3, 2023
2 parents 85aeb42 + a71ff3f commit 29b897f
Showing 1 changed file with 41 additions and 54 deletions.
95 changes: 41 additions & 54 deletions server/images.go
Original file line number Diff line number Diff line change
Expand Up @@ -906,71 +906,58 @@ func uploadBlobChunked(mp ModelPath, url string, layer *Layer, regOpts *Registry
return err
}

headers := make(map[string]string)
headers["Content-Type"] = "application/octet-stream"
totalUploaded := 0

chunkSize := 1 << 20
buf := make([]byte, chunkSize)
var totalUploaded int
r, w := io.Pipe()
defer r.Close()

for {
n, err := f.Read(buf)
if err != nil {
return err
}

headers["Content-Length"] = fmt.Sprintf("%d", n)
headers["Content-Range"] = fmt.Sprintf("%d-%d", totalUploaded, totalUploaded+n-1)

fn(api.ProgressResponse{
Status: fmt.Sprintf("uploading %s", layer.Digest),
Digest: layer.Digest,
Total: int(layer.Size),
Completed: int(totalUploaded),
})
go func() {
defer w.Close()
for {
n, err := io.CopyN(w, f, 1024*1024)
if err != nil && !errors.Is(err, io.EOF) {
fn(api.ProgressResponse{
Status: fmt.Sprintf("error copying pipe: %v", err),
Digest: layer.Digest,
Total: layer.Size,
Completed: totalUploaded,
})
return
}

// change the buffersize for the last chunk
if n < chunkSize {
buf = buf[:n]
}
resp, err := makeRequest("PATCH", url, headers, bytes.NewReader(buf), regOpts)
if err != nil {
log.Printf("couldn't upload blob: %v", err)
return err
}
defer resp.Body.Close()
url = resp.Header.Get("Location")
totalUploaded += int(n)

// Check for success: For a successful upload, the Docker registry will respond with a 201 Created
if resp.StatusCode != http.StatusAccepted {
fn(api.ProgressResponse{
Status: "error uploading layer",
Status: fmt.Sprintf("uploading %s", layer.Digest),
Digest: layer.Digest,
Total: int(layer.Size),
Completed: int(totalUploaded),
Total: layer.Size,
Completed: totalUploaded,
})
body, _ := io.ReadAll(resp.Body)
return fmt.Errorf("on layer upload registry responded with code %d: %v", resp.StatusCode, string(body))

if totalUploaded >= layer.Size {
return
}
}
}()

totalUploaded += n
if totalUploaded >= layer.Size {
url = fmt.Sprintf("%s&digest=%s", url, layer.Digest)
url = fmt.Sprintf("%s&digest=%s", url, layer.Digest)

// finish the upload
resp, err := makeRequest("PUT", url, nil, nil, regOpts)
if err != nil {
log.Printf("couldn't finish upload: %v", err)
return err
}
defer resp.Body.Close()
headers := make(map[string]string)
headers["Content-Type"] = "application/octet-stream"
headers["Content-Range"] = fmt.Sprintf("0-%d", layer.Size-1)
headers["Content-Length"] = strconv.Itoa(int(layer.Size))

if resp.StatusCode != http.StatusCreated {
body, _ := io.ReadAll(resp.Body)
return fmt.Errorf("on finish upload registry responded with code %d: %v", resp.StatusCode, string(body))
}
break
}
// finish the upload
resp, err := makeRequest("PUT", url, headers, r, regOpts)
if err != nil {
log.Printf("couldn't finish upload: %v", err)
return err
}
defer resp.Body.Close()

if resp.StatusCode != http.StatusCreated {
body, _ := io.ReadAll(resp.Body)
return fmt.Errorf("on finish upload registry responded with code %d: %v", resp.StatusCode, string(body))
}
return nil
}
Expand Down

0 comments on commit 29b897f

Please sign in to comment.