Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 24 additions & 10 deletions pkg/bucket/gcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package bucket
import (
"context"
"encoding/json"
"errors"
"fmt"
"math/rand"
"net/http"
Expand Down Expand Up @@ -38,7 +39,7 @@ func (g gcpBucketClient) Exists() (bool, error) {
bucket := gcsClient.Bucket(g.bucket.Spec.Name)
_, err = bucket.Attrs(ctx)
if err != nil {
if err == storage.ErrBucketNotExist {
if isBucketNotFoundError(err) {
return false, nil
}
// Return true for permission errors - unable to determine if bucket exists
Expand Down Expand Up @@ -116,7 +117,7 @@ func (g gcpBucketClient) Delete() (bool, error) {
// Check if bucket exists first
_, err = bucket.Attrs(ctx)
if err != nil {
if err == storage.ErrBucketNotExist {
if isBucketNotFoundError(err) {
// Bucket doesn't exist - idempotent behavior
return true, nil
}
Expand Down Expand Up @@ -391,18 +392,18 @@ func (g gcpBucketClient) deleteObjectsConcurrently(ctx context.Context, bucket *
const batchSize = 100

objects := make(chan string, batchSize)
errors := make(chan error, maxWorkers)
errs := make(chan error, maxWorkers)

// Start workers
for i := 0; i < maxWorkers; i++ {
go func() {
for objName := range objects {
if err := bucket.Object(objName).Delete(ctx); err != nil {
errors <- fmt.Errorf("error deleting object %s: %v", objName, err)
errs <- fmt.Errorf("error deleting object %s: %v", objName, err)
return
}
}
errors <- nil
errs <- nil
}()
}

Expand All @@ -416,7 +417,7 @@ func (g gcpBucketClient) deleteObjectsConcurrently(ctx context.Context, bucket *
break
}
if err != nil {
errors <- fmt.Errorf("error listing objects: %v", err)
errs <- fmt.Errorf("error listing objects: %v", err)
return
}
objects <- objAttrs.Name
Expand All @@ -425,7 +426,7 @@ func (g gcpBucketClient) deleteObjectsConcurrently(ctx context.Context, bucket *

// Wait for all workers to complete
for i := 0; i < maxWorkers; i++ {
if err := <-errors; err != nil {
if err := <-errs; err != nil {
return err
}
}
Expand Down Expand Up @@ -486,7 +487,8 @@ func isGCSRetryableError(err error) bool {
return false
}

if gerr, ok := err.(*googleapi.Error); ok {
var gerr *googleapi.Error
if errors.As(err, &gerr) {
switch gerr.Code {
case http.StatusTooManyRequests: // Too Many Requests
return true
Expand All @@ -513,16 +515,28 @@ func isGCSRetryableError(err error) bool {
return false
}

// isBucketNotFoundError reports whether err indicates a GCS bucket does not exist.
// The GCS library may return storage.ErrBucketNotExist directly, wrap it via fmt.Errorf("%w", ...),
// or surface a *googleapi.Error with HTTP 404 — all three must be handled.
func isBucketNotFoundError(err error) bool {
if errors.Is(err, storage.ErrBucketNotExist) {
return true
}
var gerr *googleapi.Error
return errors.As(err, &gerr) && gerr.Code == http.StatusNotFound
}

// handleGCSError provides consistent error handling for GCS operations
func handleGCSError(err error, operation string, bucketName string) error {
if err == storage.ErrBucketNotExist {
if errors.Is(err, storage.ErrBucketNotExist) {
if operation == "exists" || operation == "delete" {
return nil // Expected for these operations
}
return fmt.Errorf("bucket '%s' not found", bucketName)
}

if gerr, ok := err.(*googleapi.Error); ok {
var gerr *googleapi.Error
if errors.As(err, &gerr) {
switch gerr.Code {
case http.StatusConflict: // Conflict - Bucket already exists
if operation == "create" {
Expand Down
73 changes: 73 additions & 0 deletions pkg/bucket/gcp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,13 @@ package bucket

import (
"encoding/json"
"errors"
"fmt"
"os"
"strings"
"testing"

"cloud.google.com/go/storage"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/api/googleapi"
Expand Down Expand Up @@ -408,3 +411,73 @@ func TestGCSRetryConfig(t *testing.T) {
assert.Equal(t, float64(1), defaultGCSRetryConfig.initialDelay.Seconds())
assert.Equal(t, float64(32), defaultGCSRetryConfig.maxDelay.Seconds())
}

func TestIsBucketNotFoundError(t *testing.T) {
googleapi404 := &googleapi.Error{Code: 404, Message: "The specified bucket does not exist."}
googleapi403 := &googleapi.Error{Code: 403, Message: "Permission Denied"}

tests := []struct {
name string
err error
expected bool
}{
{
name: "exact storage.ErrBucketNotExist sentinel",
err: storage.ErrBucketNotExist,
expected: true,
},
{
name: "storage.ErrBucketNotExist wrapped with %w",
err: fmt.Errorf("outer: %w", storage.ErrBucketNotExist),
expected: true,
},
{
name: "storage.ErrBucketNotExist doubly wrapped",
err: fmt.Errorf("outer: %w", fmt.Errorf("inner: %w", storage.ErrBucketNotExist)),
expected: true,
},
{
name: "GCS library style: both sentinel and googleapi.Error wrapped together",
// Mirrors how cloud.google.com/go/storage PR#11519 wraps the error:
// fmt.Errorf("%w: %w", storage.ErrBucketNotExist, googleapiErr)
err: fmt.Errorf("%w: %w", storage.ErrBucketNotExist, googleapi404),
expected: true,
},
{
name: "bare *googleapi.Error with 404",
err: googleapi404,
expected: true,
},
{
name: "*googleapi.Error with 404 wrapped with %w",
err: fmt.Errorf("wrapped: %w", googleapi404),
expected: true,
},
{
name: "*googleapi.Error with non-404 code",
err: googleapi403,
expected: false,
},
{
name: "unrelated sentinel error",
err: storage.ErrObjectNotExist,
expected: false,
},
{
name: "plain errors.New string error",
err: errors.New("some other error"),
expected: false,
},
{
name: "nil error",
err: nil,
expected: false,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
assert.Equal(t, tt.expected, isBucketNotFoundError(tt.err))
})
}
}