Skip to content

Commit

Permalink
Merge pull request #33 from sudo-bmitch/pr-blob-mount
Browse files Browse the repository at this point in the history
Add blob mount support and handle existing blobs
  • Loading branch information
sudo-bmitch committed Dec 25, 2023
2 parents fa4edde + 8d08756 commit cf73c51
Show file tree
Hide file tree
Showing 6 changed files with 136 additions and 32 deletions.
85 changes: 84 additions & 1 deletion blob.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,9 +155,20 @@ func (s *Server) blobUploadPost(repoStr string) http.HandlerFunc {
s.log.Info("failed to get repo", "err", err, "repo", repoStr)
return
}
// TODO: check for mount=digest&from=repo, consider allowing anonymous blob mounts
// check for mount=digest&from=repo, consider allowing anonymous blob mounts
mountStr := r.URL.Query().Get("mount")
fromStr := r.URL.Query().Get("from")
if mountStr != "" && fromStr != "" {
if err := s.blobUploadMount(fromStr, repoStr, mountStr, w, r); err == nil {
return
}
}
// check for digest parameter
bOpts := []store.BlobOpt{}
dStr := r.URL.Query().Get("digest")
if dStr == "" && mountStr != "" {
dStr = mountStr // a failed blob mount is still used to define the digest
}
var d digest.Digest
if dStr != "" {
d, err = digest.Parse(dStr)
Expand All @@ -169,8 +180,21 @@ func (s *Server) blobUploadPost(repoStr string) http.HandlerFunc {
}
bOpts = append(bOpts, store.BlobWithDigest(d))
}
// create a new blob in the store
bc, err := repo.BlobCreate(bOpts...)
if err != nil {
if errors.Is(err, types.ErrBlobExists) {
// blob exists, indicate it was created and return the location to get
loc, err := url.JoinPath("/v2", repoStr, "blobs", d.String())
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
s.log.Error("failed to build location header", "repo", repoStr, "err", err)
return
}
w.Header().Set("location", loc)
w.WriteHeader(http.StatusCreated)
return
}
w.WriteHeader(http.StatusInternalServerError)
s.log.Error("create blob", "err", err)
return
Expand Down Expand Up @@ -243,6 +267,65 @@ func (s *Server) blobUploadPost(repoStr string) http.HandlerFunc {
}
}

// blobUploadMount is a handler for blob mount attempts.
// Any errors return the error without writing to the response.
// This allows the registry to fall back to a standard blob push.
// If the return is nil, the location header and created status are first be written to the response.
func (s *Server) blobUploadMount(repoSrcStr, repoTgtStr, digStr string, w http.ResponseWriter, r *http.Request) error {
repoSrc, err := s.store.RepoGet(repoSrcStr)
if err != nil {
return err
}
repoTgt, err := s.store.RepoGet(repoTgtStr)
if err != nil {
return err
}
dig, err := digest.Parse(digStr)
if err != nil {
return err
}
bc, err := repoTgt.BlobCreate(store.BlobWithDigest(dig))
if err != nil {
if errors.Is(err, types.ErrBlobExists) {
// blob exists, indicate it was created and return the location to get
loc, err := url.JoinPath("/v2", repoTgtStr, "blobs", dig.String())
if err != nil {
s.log.Error("failed to build location header", "repo", repoTgtStr, "err", err)
return err
}
w.Header().Set("location", loc)
w.WriteHeader(http.StatusCreated)
return nil
}
return err
}
rdr, err := repoSrc.BlobGet(dig)
if err != nil {
bc.Cancel()
return err
}
// copy content from source repo
_, err = io.Copy(bc, rdr)
if err != nil {
bc.Cancel()
_ = rdr.Close()
return err
}
_ = rdr.Close()
err = bc.Close()
if err != nil {
return err
}
// write the success status and return nil
loc, err := url.JoinPath("/v2", repoTgtStr, "blobs", dig.String())
if err != nil {
return err
}
w.Header().Set("location", loc)
w.WriteHeader(http.StatusCreated)
return nil
}

func (s *Server) blobUploadPatch(repoStr, sessionID string) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
blobUploadMu.Lock()
Expand Down
7 changes: 7 additions & 0 deletions internal/store/dir.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,13 @@ func (dr *dirRepo) BlobCreate(opts ...BlobOpt) (BlobCreator, error) {
return nil, err
}
}
// if blob exists, return the appropriate error
if conf.expect != "" {
_, err := os.Stat(filepath.Join(dr.path, blobsDir, conf.expect.Algorithm().String(), conf.expect.Encoded()))
if err == nil {
return nil, types.ErrBlobExists
}
}
// create a temp file in the repo blob store, under an upload folder
tmpDir := filepath.Join(dr.path, uploadDir)
uploadFH, err := os.Stat(tmpDir)
Expand Down
6 changes: 6 additions & 0 deletions internal/store/mem.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,12 @@ func (mr *memRepo) BlobCreate(opts ...BlobOpt) (BlobCreator, error) {
for _, opt := range opts {
opt(&conf)
}
// if blob exists, return the appropriate error
if conf.expect != "" {
if _, ok := mr.blobs[conf.expect]; ok {
return nil, types.ErrBlobExists
}
}
buffer := &bytes.Buffer{}
d := conf.algo.Digester()
w := io.MultiWriter(buffer, d.Hash())
Expand Down
28 changes: 15 additions & 13 deletions manifest.go
Original file line number Diff line number Diff line change
Expand Up @@ -321,22 +321,24 @@ func (s *Server) manifestPut(repoStr, arg string) http.HandlerFunc {
}
// push to blob store
bc, err := repo.BlobCreate(store.BlobWithDigest(d))
if err != nil {
if err != nil && !errors.Is(err, types.ErrBlobExists) {
w.WriteHeader(http.StatusInternalServerError)
s.log.Info("failed to create blob", "repo", repoStr, "arg", arg, "err", err)
return
}
_, err = bc.Write(mRaw)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
s.log.Info("failed to write blob", "repo", repoStr, "arg", arg, "err", err)
return
}
err = bc.Close()
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
s.log.Info("failed to close blob", "repo", repoStr, "arg", arg, "err", err)
return
} else if err == nil {
_, err = bc.Write(mRaw)
if err != nil {
_ = bc.Close()
w.WriteHeader(http.StatusInternalServerError)
s.log.Info("failed to write blob", "repo", repoStr, "arg", arg, "err", err)
return
}
err = bc.Close()
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
s.log.Info("failed to close blob", "repo", repoStr, "arg", arg, "err", err)
return
}
}
// add entry to index
desc := types.Descriptor{
Expand Down
40 changes: 22 additions & 18 deletions referrer.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,17 +96,19 @@ func (s *Server) referrerAdd(repo store.Repo, subject digest.Digest, desc types.
}
dig := digest.Canonical.FromBytes(iRaw)
bc, err := repo.BlobCreate(store.BlobWithDigest(dig))
if err != nil {
return err
}
_, err = bc.Write(iRaw)
if err != nil {
_ = bc.Close()
if err != nil && !errors.Is(err, types.ErrBlobExists) {
return err
}
err = bc.Close()
if err != nil {
return err
if err == nil {
_, err = bc.Write(iRaw)
if err != nil {
_ = bc.Close()
return err
}
err = bc.Close()
if err != nil {
return err
}
}
// create new descriptor for referrers response to add into index.json
dNew := types.Descriptor{
Expand Down Expand Up @@ -170,17 +172,19 @@ func (s *Server) referrerDelete(repo store.Repo, subject digest.Digest, desc typ
}
dig := digest.Canonical.FromBytes(iRaw)
bc, err := repo.BlobCreate(store.BlobWithDigest(dig))
if err != nil {
return err
}
_, err = bc.Write(iRaw)
if err != nil {
_ = bc.Close()
if err != nil && !errors.Is(err, types.ErrBlobExists) {
return err
}
err = bc.Close()
if err != nil {
return err
if err == nil {
_, err = bc.Write(iRaw)
if err != nil {
_ = bc.Close()
return err
}
err = bc.Close()
if err != nil {
return err
}
}
// create new descriptor for referrers response
dNew := types.Descriptor{
Expand Down
2 changes: 2 additions & 0 deletions types/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
)

var (
// ErrBlobExists is returned when attempting to create a blob that already exists.
ErrBlobExists = errors.New("blob exists")
// ErrNotFound is returned when a resource is not found.
ErrNotFound = errors.New("not found")
// ErrRepoNotAllowed is used when a repository name is not permitted.
Expand Down

0 comments on commit cf73c51

Please sign in to comment.