From 929cfe64d9f544db43f103c17cc409875ba1a81c Mon Sep 17 00:00:00 2001 From: Mike Auclair Date: Tue, 18 Mar 2025 12:18:40 +0000 Subject: [PATCH 1/4] stream the retrieved get data to disk --- lib/gobuild/gobuild.go | 13 +++++++++---- lib/revproxy/cache.go | 6 +++++- lib/s3util/s3util.go | 9 ++------- 3 files changed, 16 insertions(+), 12 deletions(-) diff --git a/lib/gobuild/gobuild.go b/lib/gobuild/gobuild.go index c024730..4de3b51 100644 --- a/lib/gobuild/gobuild.go +++ b/lib/gobuild/gobuild.go @@ -6,11 +6,11 @@ package gobuild import ( - "bytes" "context" "errors" "expvar" "fmt" + "io" "io/fs" "os" "path" @@ -114,9 +114,14 @@ func (s *S3Cache) Get(ctx context.Context, actionID string) (outputID, diskPath } return "", "", fmt.Errorf("[s3] read action %s: %w", actionID, err) } + defer action.Close() + actionBytes, err := io.ReadAll(action) + if err != nil { + return "", "", err + } // We got an action hit remotely, try to update the local copy. - outputID, mtime, err := parseAction(action) + outputID, mtime, err := parseAction(actionBytes) if err != nil { return "", "", err } @@ -127,6 +132,7 @@ func (s *S3Cache) Get(ctx context.Context, actionID string) (outputID, diskPath // object report it as an error rather than a cache miss. return "", "", fmt.Errorf("[s3] read object %s: %w", outputID, err) } + defer object.Close() s.getFaultHit.Add(1) // Now we should have the body; poke it into the local cache. Preserve the @@ -134,8 +140,7 @@ func (s *S3Cache) Get(ctx context.Context, actionID string) (outputID, diskPath diskPath, err = s.Local.Put(ctx, gocache.Object{ ActionID: actionID, OutputID: outputID, - Size: int64(len(object)), - Body: bytes.NewReader(object), + Body: object, ModTime: mtime, }) return outputID, diskPath, err diff --git a/lib/revproxy/cache.go b/lib/revproxy/cache.go index c5a322e..890ebef 100644 --- a/lib/revproxy/cache.go +++ b/lib/revproxy/cache.go @@ -50,7 +50,11 @@ func (s *Server) cacheLoadS3(ctx context.Context, hash string) ([]byte, http.Hea if err != nil { return nil, nil, err } - return parseCacheObject(data) + byteSlice, err := io.ReadAll(data) + if err != nil { + return nil, nil, err + } + return parseCacheObject(byteSlice) } // cacheStoreS3 returns a task that writes the contents of body to the remote diff --git a/lib/s3util/s3util.go b/lib/s3util/s3util.go index 4b724c5..62a7ebb 100644 --- a/lib/s3util/s3util.go +++ b/lib/s3util/s3util.go @@ -138,13 +138,8 @@ func (c *Client) Get(ctx context.Context, key string) (io.ReadCloser, error) { // GetData returns the contents of the specified key from S3. It is a shorthand // for calling Get followed by io.ReadAll on the result. -func (c *Client) GetData(ctx context.Context, key string) ([]byte, error) { - rc, err := c.Get(ctx, key) - if err != nil { - return nil, err - } - defer rc.Close() - return io.ReadAll(rc) +func (c *Client) GetData(ctx context.Context, key string) (io.ReadCloser, error) { + return c.Get(ctx, key) } // PutCond writes the specified data to S3 under the given key if the key does From 98844027cde069695ba322f5e8ca69f2614735ae Mon Sep 17 00:00:00 2001 From: Mike Auclair Date: Tue, 18 Mar 2025 17:25:29 +0000 Subject: [PATCH 2/4] wind back the API change and just have the get-from-s3 branch directly call Get --- lib/gobuild/gobuild.go | 10 ++-------- lib/revproxy/cache.go | 6 +----- lib/s3util/s3util.go | 9 +++++++-- 3 files changed, 10 insertions(+), 15 deletions(-) diff --git a/lib/gobuild/gobuild.go b/lib/gobuild/gobuild.go index 4de3b51..1eb16f5 100644 --- a/lib/gobuild/gobuild.go +++ b/lib/gobuild/gobuild.go @@ -10,7 +10,6 @@ import ( "errors" "expvar" "fmt" - "io" "io/fs" "os" "path" @@ -114,19 +113,14 @@ func (s *S3Cache) Get(ctx context.Context, actionID string) (outputID, diskPath } return "", "", fmt.Errorf("[s3] read action %s: %w", actionID, err) } - defer action.Close() - actionBytes, err := io.ReadAll(action) - if err != nil { - return "", "", err - } // We got an action hit remotely, try to update the local copy. - outputID, mtime, err := parseAction(actionBytes) + outputID, mtime, err := parseAction(action) if err != nil { return "", "", err } - object, err := s.S3Client.GetData(ctx, s.outputKey(outputID)) + object, err := s.S3Client.Get(ctx, s.outputKey(outputID)) if err != nil { // At this point we know the action exists, so if we can't read the // object report it as an error rather than a cache miss. diff --git a/lib/revproxy/cache.go b/lib/revproxy/cache.go index 890ebef..c5a322e 100644 --- a/lib/revproxy/cache.go +++ b/lib/revproxy/cache.go @@ -50,11 +50,7 @@ func (s *Server) cacheLoadS3(ctx context.Context, hash string) ([]byte, http.Hea if err != nil { return nil, nil, err } - byteSlice, err := io.ReadAll(data) - if err != nil { - return nil, nil, err - } - return parseCacheObject(byteSlice) + return parseCacheObject(data) } // cacheStoreS3 returns a task that writes the contents of body to the remote diff --git a/lib/s3util/s3util.go b/lib/s3util/s3util.go index 62a7ebb..4b724c5 100644 --- a/lib/s3util/s3util.go +++ b/lib/s3util/s3util.go @@ -138,8 +138,13 @@ func (c *Client) Get(ctx context.Context, key string) (io.ReadCloser, error) { // GetData returns the contents of the specified key from S3. It is a shorthand // for calling Get followed by io.ReadAll on the result. -func (c *Client) GetData(ctx context.Context, key string) (io.ReadCloser, error) { - return c.Get(ctx, key) +func (c *Client) GetData(ctx context.Context, key string) ([]byte, error) { + rc, err := c.Get(ctx, key) + if err != nil { + return nil, err + } + defer rc.Close() + return io.ReadAll(rc) } // PutCond writes the specified data to S3 under the given key if the key does From 85f971fafe3953f5bca39a0f2d8c765a7e4dad80 Mon Sep 17 00:00:00 2001 From: Mike Auclair Date: Tue, 18 Mar 2025 18:41:10 +0000 Subject: [PATCH 3/4] wire through contentlength --- lib/gobuild/gobuild.go | 3 ++- lib/modproxy/modproxy.go | 2 +- lib/s3util/s3util.go | 10 +++++----- 3 files changed, 8 insertions(+), 7 deletions(-) diff --git a/lib/gobuild/gobuild.go b/lib/gobuild/gobuild.go index 1eb16f5..ff8948d 100644 --- a/lib/gobuild/gobuild.go +++ b/lib/gobuild/gobuild.go @@ -120,7 +120,7 @@ func (s *S3Cache) Get(ctx context.Context, actionID string) (outputID, diskPath return "", "", err } - object, err := s.S3Client.Get(ctx, s.outputKey(outputID)) + object, size, err := s.S3Client.Get(ctx, s.outputKey(outputID)) if err != nil { // At this point we know the action exists, so if we can't read the // object report it as an error rather than a cache miss. @@ -134,6 +134,7 @@ func (s *S3Cache) Get(ctx context.Context, actionID string) (outputID, diskPath diskPath, err = s.Local.Put(ctx, gocache.Object{ ActionID: actionID, OutputID: outputID, + Size: *size, Body: object, ModTime: mtime, }) diff --git a/lib/modproxy/modproxy.go b/lib/modproxy/modproxy.go index 559bc57..889292c 100644 --- a/lib/modproxy/modproxy.go +++ b/lib/modproxy/modproxy.go @@ -158,7 +158,7 @@ func (c *S3Cacher) Get(ctx context.Context, name string) (_ io.ReadCloser, oerr } defer c.sema.Release(1) - obj, err := c.S3Client.Get(ctx, c.makeKey(hash)) + obj, _, err := c.S3Client.Get(ctx, c.makeKey(hash)) if errors.Is(err, fs.ErrNotExist) { c.getFaultMiss.Add(1) return nil, err diff --git a/lib/s3util/s3util.go b/lib/s3util/s3util.go index 4b724c5..d31fdf0 100644 --- a/lib/s3util/s3util.go +++ b/lib/s3util/s3util.go @@ -122,24 +122,24 @@ func (c *Client) Put(ctx context.Context, key string, data io.Reader) error { // close the reader when finished. // // If the key is not found, the resulting error satisfies [fs.ErrNotExist]. -func (c *Client) Get(ctx context.Context, key string) (io.ReadCloser, error) { +func (c *Client) Get(ctx context.Context, key string) (io.ReadCloser, *int64, error) { rsp, err := c.Client.GetObject(ctx, &s3.GetObjectInput{ Bucket: &c.Bucket, Key: &key, }) if err != nil { if IsNotExist(err) { - return nil, fmt.Errorf("key %q: %w", key, fs.ErrNotExist) + return nil, nil, fmt.Errorf("key %q: %w", key, fs.ErrNotExist) } - return nil, err + return nil, nil, err } - return rsp.Body, nil + return rsp.Body, rsp.ContentLength, nil } // GetData returns the contents of the specified key from S3. It is a shorthand // for calling Get followed by io.ReadAll on the result. func (c *Client) GetData(ctx context.Context, key string) ([]byte, error) { - rc, err := c.Get(ctx, key) + rc, _, err := c.Get(ctx, key) if err != nil { return nil, err } From 060be1d7cb07576347464ebda81f32dffff60efd Mon Sep 17 00:00:00 2001 From: Mike Auclair Date: Tue, 18 Mar 2025 20:13:10 +0000 Subject: [PATCH 4/4] dereference --- lib/gobuild/gobuild.go | 2 +- lib/s3util/s3util.go | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/lib/gobuild/gobuild.go b/lib/gobuild/gobuild.go index ff8948d..5592019 100644 --- a/lib/gobuild/gobuild.go +++ b/lib/gobuild/gobuild.go @@ -134,7 +134,7 @@ func (s *S3Cache) Get(ctx context.Context, actionID string) (outputID, diskPath diskPath, err = s.Local.Put(ctx, gocache.Object{ ActionID: actionID, OutputID: outputID, - Size: *size, + Size: size, Body: object, ModTime: mtime, }) diff --git a/lib/s3util/s3util.go b/lib/s3util/s3util.go index d31fdf0..84c602d 100644 --- a/lib/s3util/s3util.go +++ b/lib/s3util/s3util.go @@ -122,18 +122,18 @@ func (c *Client) Put(ctx context.Context, key string, data io.Reader) error { // close the reader when finished. // // If the key is not found, the resulting error satisfies [fs.ErrNotExist]. -func (c *Client) Get(ctx context.Context, key string) (io.ReadCloser, *int64, error) { +func (c *Client) Get(ctx context.Context, key string) (io.ReadCloser, int64, error) { rsp, err := c.Client.GetObject(ctx, &s3.GetObjectInput{ Bucket: &c.Bucket, Key: &key, }) if err != nil { if IsNotExist(err) { - return nil, nil, fmt.Errorf("key %q: %w", key, fs.ErrNotExist) + return nil, -1, fmt.Errorf("key %q: %w", key, fs.ErrNotExist) } - return nil, nil, err + return nil, -1, err } - return rsp.Body, rsp.ContentLength, nil + return rsp.Body, *rsp.ContentLength, nil } // GetData returns the contents of the specified key from S3. It is a shorthand