Chore: Improve error handling
Check errors on cache.Delete, and use errors.Join to reveal other errors where appropriate.

Signed-off-by: Brandon Mitchell <git@bmitch.net>
sudo-bmitch committed May 29, 2024
1 parent 0f285cf commit aad61ae
Showing 6 changed files with 55 additions and 40 deletions.
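
The errors.Join pattern named in the description works because Join (standard library, Go 1.20+) drops nil arguments, returns nil when every argument is nil, and keeps each wrapped error visible to errors.Is and errors.As. The short standalone sketch below is illustrative only and is not part of this commit; doWork and cleanup are hypothetical stand-ins for the primary operation and a cleanup call such as bc.Cancel() in the diffs that follow.

package main

import (
    "errors"
    "fmt"
)

var errCleanup = errors.New("cleanup failed")

// doWork and cleanup are hypothetical stand-ins for the primary operation
// and the cleanup call whose error would otherwise be discarded.
func doWork(fail bool) error {
    if fail {
        return errors.New("work failed")
    }
    return nil
}

func cleanup(fail bool) error {
    if fail {
        return errCleanup
    }
    return nil
}

func run(failWork, failCleanup bool) error {
    if err := doWork(failWork); err != nil {
        // Report both the primary error and any cleanup error.
        return errors.Join(err, cleanup(failCleanup))
    }
    return cleanup(failCleanup)
}

func main() {
    fmt.Println(run(false, false))          // <nil>
    fmt.Println(run(true, false))           // work failed (a nil cleanup error is dropped by Join)
    err := run(true, true)
    fmt.Println(err)                        // work failed, then cleanup failed on a second line
    fmt.Println(errors.Is(err, errCleanup)) // true: Is unwraps through the joined error
}

In other words, return errors.Join(err, bc.Cancel()) reports only the original error when the cancel succeeds, and loses neither error when it fails.
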
30 changes: 15 additions & 15 deletions blob.go
@@ -128,7 +128,12 @@ func (s *Server) blobUploadDelete(repoStr, sessionID string) http.HandlerFunc {
         s.log.Error("upload session not found", "repo", repoStr, "sessionID", sessionID)
         return
     }
-    bc.Cancel()
+    err = bc.Cancel()
+    if err != nil {
+        w.WriteHeader(http.StatusInternalServerError)
+        s.log.Info("failed to cancel upload", "err", err, "repo", repoStr, "sessionID", sessionID)
+        return
+    }
     w.WriteHeader(http.StatusAccepted)
   }
 }
@@ -159,7 +164,6 @@ func (s *Server) blobUploadGet(repoStr, sessionID string) http.HandlerFunc {
     if err != nil {
         w.WriteHeader(http.StatusInternalServerError)
         s.log.Error("failed to marshal new state", "err", err)
-        bc.Cancel()
         return
     }
     state := base64.RawURLEncoding.EncodeToString(stateJSON)
@@ -251,7 +255,7 @@ func (s *Server) blobUploadPost(repoStr string) http.HandlerFunc {
     }
     err = bc.Verify(d)
     if err != nil {
-        bc.Cancel()
+        _ = bc.Cancel()
         w.WriteHeader(http.StatusBadRequest)
         _ = types.ErrRespJSON(w, types.ErrInfoBlobUploadInvalid("digest mismatch"))
         s.log.Debug("failed to verify blob digest", "repo", repoStr, "digest", d.String(), "err", err)
@@ -277,7 +281,7 @@ func (s *Server) blobUploadPost(repoStr string) http.HandlerFunc {
     if err != nil {
         w.WriteHeader(http.StatusInternalServerError)
         s.log.Error("failed to marshal new state", "err", err)
-        bc.Cancel()
+        _ = bc.Cancel()
         return
     }
     state := base64.RawURLEncoding.EncodeToString(stateJSON)
@@ -325,24 +329,19 @@ func (s *Server) blobUploadMount(repoSrcStr, repoTgtStr, digStr string, w http.R
   }
   repoSrc, err := s.store.RepoGet(r.Context(), repoSrcStr)
   if err != nil {
-    bc.Cancel()
-    return err
+    return errors.Join(err, bc.Cancel())
   }
   rdr, err := repoSrc.BlobGet(dig)
   repoSrc.Done()
   if err != nil {
-    bc.Cancel()
-    return err
+    return errors.Join(err, bc.Cancel())
   }
   // copy content from source repo
   _, err = io.Copy(bc, rdr)
   if err != nil {
-    bc.Cancel()
-    _ = rdr.Close()
-    return err
+    return errors.Join(err, bc.Cancel(), rdr.Close())
   }
-  _ = rdr.Close()
-  err = bc.Close()
+  err = errors.Join(rdr.Close(), bc.Close())
   if err != nil {
     return err
   }
@@ -421,7 +420,6 @@ func (s *Server) blobUploadPatch(repoStr, sessionID string) http.HandlerFunc {
     if err != nil {
         w.WriteHeader(http.StatusInternalServerError)
         s.log.Error("failed to marshal new state", "err", err)
-        bc.Cancel()
         return
     }
     state := base64.RawURLEncoding.EncodeToString(stateJSON)
@@ -509,7 +507,9 @@ func (s *Server) blobUploadPut(repoStr, sessionID string) http.HandlerFunc {
     err = bc.Verify(d)
     if err != nil {
         s.log.Error("invalid digest", "err", err, "repo", repoStr, "sessionID", sessionID, "expected", bc.Digest().String(), "received", d.String(), "size", bc.Size())
-        bc.Cancel()
+        if err = bc.Cancel(); err != nil {
+            s.log.Error("canceling upload", "err", err, "repo", repoStr, "sessionID", sessionID, "expected", bc.Digest().String(), "received", d.String(), "size", bc.Size())
+        }
         w.WriteHeader(http.StatusBadRequest)
         _ = types.ErrRespJSON(w, types.ErrInfoBlobUploadInvalid("invalid digest, expected: "+bc.Digest().String()))
         return
20 changes: 16 additions & 4 deletions internal/cache/cache_test.go
@@ -64,7 +64,10 @@ func TestCache(t *testing.T) {
   prunedMu.Unlock()
   // delete last 2 entries
   for i := len(testData) - countDel; i < len(testData); i++ {
-    c.Delete(i)
+    err := c.Delete(i)
+    if err != nil {
+      t.Errorf("failed to delete %d: %v", i, err)
+    }
   }
   if c.IsEmpty() {
     t.Errorf("cache is empty after adding entries")
@@ -145,9 +148,15 @@ func TestCache(t *testing.T) {
   }
   // set and delete a key
   c.Set(42, "x")
-  c.Delete(42)
+  err = c.Delete(42)
+  if err != nil {
+    t.Errorf("failed to delete key 42: %v", err)
+  }
   // delete non-existent key
-  c.Delete(42)
+  err = c.Delete(42)
+  if err != nil {
+    t.Errorf("failed to delete missing key 42: %v", err)
+  }
   // delete all entries with a prune function
   err = c.DeleteAll()
   if !errors.Is(err, errBlocked) {
@@ -170,7 +179,10 @@ func TestCache(t *testing.T) {
   if c2.IsEmpty() {
     t.Errorf("cache is empty before deleting entries")
   }
-  c2.DeleteAll()
+  err = c2.DeleteAll()
+  if err != nil {
+    t.Errorf("failed to delete all entries: %v", err)
+  }
   if len(c2.entries) > 0 {
     t.Errorf("entries remain after DeleteAll")
   }
34 changes: 17 additions & 17 deletions internal/store/dir.go
@@ -142,7 +142,7 @@ func (d *dir) RepoGet(ctx context.Context, repoStr string) (Repo, error) {
     log: d.log,
   }
   uploadCacheOpts := cache.Opts[string, *dirRepoUpload]{
-    PruneFn: func(_ string, dru *dirRepoUpload) error { dru.delete(); return nil },
+    PruneFn: func(_ string, dru *dirRepoUpload) error { return dru.delete() },
     PrunePreFn: func(_ string, dru *dirRepoUpload) { dru.mu.Lock() },
     PrunePostFn: func(_ string, dru *dirRepoUpload) { dru.mu.Unlock() },
   }
@@ -656,8 +656,8 @@ func (dr *dirRepo) gc() error {
   }()
   // prune an empty repo dir and mark the repo as empty if successful
   if *dr.conf.Storage.GC.EmptyRepo && len(dr.index.Manifests) == 0 && dr.uploads.IsEmpty() {
-    // TODO: switch to a slice of errors instead of a function with a return
     errDir := func() error {
+      errs := []error{}
       for _, dir := range []string{
         filepath.Join(dr.path, uploadDir),
         filepath.Join(dr.path, blobsDir, "sha256"),
@@ -669,10 +669,10 @@ func (dr *dirRepo) gc() error {
       } {
         err := os.Remove(dir)
         if err != nil && !errors.Is(err, fs.ErrNotExist) {
-          return err
+          errs = append(errs, err)
         }
       }
-      return nil
+      return errors.Join(errs...)
     }()
     if errDir == nil {
       dr.exists = false
@@ -707,43 +707,41 @@ func (dru *dirRepoUpload) Close() error {
   defer dru.mu.Unlock()
   err := dru.fh.Close()
   if err != nil {
-    _ = os.Remove(dru.filename)
-    return err // TODO: join multiple errors after 1.19 support is removed
+    return errors.Join(err, os.Remove(dru.filename))
   }
   if dru.expect != "" && dru.d.Digest() != dru.expect {
-    _ = os.Remove(dru.filename)
-    return fmt.Errorf("digest mismatch, expected %s, received %s", dru.expect, dru.d.Digest())
+    return errors.Join(fmt.Errorf("digest mismatch, expected %s, received %s", dru.expect, dru.d.Digest()),
+      os.Remove(dru.filename))
   }
   // move temp file to blob store
   tgtDir := filepath.Join(dru.path, blobsDir, dru.d.Digest().Algorithm().String())
   fi, err := os.Stat(tgtDir)
   if err == nil && !fi.IsDir() {
-    _ = os.Remove(dru.filename)
-    return fmt.Errorf("failed to move file to blob storage, %s is not a directory", tgtDir)
+    return errors.Join(fmt.Errorf("failed to move file to blob storage, %s is not a directory", tgtDir),
+      os.Remove(dru.filename))
   }
   if err != nil {
     //#nosec G301 directory permissions are intentionally world readable.
     err = os.MkdirAll(tgtDir, 0755)
     if err != nil {
-      _ = os.Remove(dru.filename)
-      return fmt.Errorf("unable to create blob storage directory %s: %w", tgtDir, err)
+      return errors.Join(fmt.Errorf("unable to create blob storage directory %s: %w", tgtDir, err),
+        os.Remove(dru.filename))
     }
   }
   blobName := filepath.Join(tgtDir, dru.d.Digest().Encoded())
-  err = os.Rename(dru.filename, blobName)
-  _ = dru.dr.uploads.Delete(dru.sessionID)
+  err = errors.Join(os.Rename(dru.filename, blobName), dru.dr.uploads.Delete(dru.sessionID))
   dru.dr.log.Debug("blob created", "repo", dru.dr.name, "digest", dru.d.Digest().String(), "err", err)
   return err
 }
 
 // Cancel is used to stop an upload.
-func (dru *dirRepoUpload) Cancel() {
+func (dru *dirRepoUpload) Cancel() error {
   dru.mu.Lock()
   defer dru.mu.Unlock()
-  _ = dru.dr.uploads.Delete(dru.sessionID)
+  return dru.dr.uploads.Delete(dru.sessionID)
 }
 
-func (dru *dirRepoUpload) delete() {
+func (dru *dirRepoUpload) delete() error {
   dru.w = nil
   if dru.fh != nil {
     _ = dru.fh.Close()
@@ -758,6 +756,8 @@ func (dru *dirRepoUpload) delete() {
     dru.dr.timeMod = time.Now()
     dru.dr.mu.Unlock()
   }()
+  // always return nil, even on errors, to allow entry to be removed from upload session list
+  return nil
 }
 
 // Size reports the number of bytes pushed.
4 changes: 2 additions & 2 deletions internal/store/mem.go
@@ -561,10 +561,10 @@ func (mru *memRepoUpload) Close() error {
 }
 
 // Cancel is used to stop an upload.
-func (mru *memRepoUpload) Cancel() {
+func (mru *memRepoUpload) Cancel() error {
   mru.mu.Lock()
   defer mru.mu.Unlock()
-  _ = mru.mr.uploads.Delete(mru.sessionID)
+  return mru.mr.uploads.Delete(mru.sessionID)
 }
 
 // Size reports the number of bytes pushed.
2 changes: 1 addition & 1 deletion internal/store/store.go
@@ -104,7 +104,7 @@ type BlobCreator interface {
   // WriteCloser is used to push the blob content.
   io.WriteCloser
   // Cancel is used to stop an upload.
-  Cancel()
+  Cancel() error
   // Size reports the number of bytes pushed.
   Size() int64
   // Digest is used to get the current digest of the content.
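
With Cancel() now returning an error, callers of BlobCreator have to check the result, log it, or fold it into an existing error, which is what the blob.go and test changes above do. Below is a rough caller-side sketch assuming an interface of this shape; blobCreator, pushBlob, and memCreator are hypothetical names for illustration, not types from this repository.

package main

import (
    "bytes"
    "errors"
    "fmt"
    "io"
    "strings"
)

// blobCreator mirrors only the methods this sketch needs.
type blobCreator interface {
    io.WriteCloser
    Cancel() error
}

// pushBlob copies src into bc; on a copy failure it cancels the upload and
// joins the cancel error with the copy error instead of discarding it.
func pushBlob(bc blobCreator, src io.Reader) error {
    if _, err := io.Copy(bc, src); err != nil {
        return errors.Join(err, bc.Cancel())
    }
    return bc.Close()
}

// memCreator is a trivial in-memory implementation for the example.
type memCreator struct{ buf bytes.Buffer }

func (m *memCreator) Write(p []byte) (int, error) { return m.buf.Write(p) }
func (m *memCreator) Close() error                { return nil }
func (m *memCreator) Cancel() error               { m.buf.Reset(); return nil }

func main() {
    bc := &memCreator{}
    err := pushBlob(bc, strings.NewReader("layer data"))
    fmt.Println(err, bc.buf.Len()) // <nil> 10
}

The design choice mirrors the commit itself: cleanup errors are surfaced alongside the primary error rather than silently ignored.
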
5 changes: 4 additions & 1 deletion internal/store/store_test.go
@@ -355,7 +355,10 @@ func TestStore(t *testing.T) {
   if err != nil {
     t.Errorf("failed to create new blob: %v", err)
   }
-  bc.Cancel()
+  err = bc.Cancel()
+  if err != nil {
+    t.Errorf("failed canceling upload: %v", err)
+  }
   // verify closed and canceled sessions are no longer available
   for i, sessionID := range []string{session1, session2, session3} {
     _, err := repo.BlobSession(sessionID)
