Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add IncludeInfoObject flag for terminating the .info in gcs as well #1082

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 35 additions & 37 deletions pkg/gcsstore/gcsservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"hash/crc32"
"io"
"math"
"strconv"
"strings"

"cloud.google.com/go/storage"
Expand Down Expand Up @@ -43,6 +42,9 @@ type GCSFilterParams struct {

// Prefix specifies the prefix of which you want to filter object names with.
Prefix string

// IncludeInfoObject indicates whether to include or ignore the info object.
IncludeInfoObject bool
}

// GCSReader implements cloud.google.com/go/storage.Reader.
Expand Down Expand Up @@ -202,8 +204,9 @@ func (service *GCSService) recursiveCompose(ctx context.Context, srcs []string,
// Remove all the temporary composition objects
prefix := fmt.Sprintf("%s_tmp", params.Destination)
filterParams := GCSFilterParams{
Bucket: params.Bucket,
Prefix: prefix,
Bucket: params.Bucket,
Prefix: prefix,
IncludeInfoObject: false,
}

err = service.DeleteObjectsWithFilter(ctx, filterParams)
Expand Down Expand Up @@ -340,57 +343,52 @@ func (service *GCSService) FilterObjects(ctx context.Context, params GCSFilterPa
Versions: false,
}

it := bkt.Objects(ctx, &q)
names := make([]string, 0)
loop:
for {
objAttrs, err := it.Next()
if err == iterator.Done {
break
}
if err != nil {
return nil, err
}

if strings.HasSuffix(objAttrs.Name, "info") {
continue
}

fileNameParts := strings.Split(objAttrs.Name, "/")
// If the object name does not split on "_", we have a composed object.
// If the object name splits on "_" into four pieces we
// know the object name we are working with is in the format
// [uid]_tmp_[recursion_lvl]_[chunk_idx]. The only time we filter
// these temporary objects is on a delete operation so we can just
// append and continue without worrying about index order
isValidObject := func(name string) (bool, error) {
fileNameParts := strings.Split(name, "/")
fileName := fileNameParts[len(fileNameParts)-1]

split := strings.Split(fileName, "_")

// If the object name does not split on "_", we have a composed object.
// If the object name splits on "_" into four pieces we
// know the object name we are working with is in the format
// [uid]_tmp_[recursion_lvl]_[chunk_idx]. The only time we filter
// these temporary objects is on a delete operation so we can just
// append and continue without worrying about index order

switch len(split) {
case 1:
names = []string{objAttrs.Name}
break loop
case 2:
case 4:
names = append(names, objAttrs.Name)
continue
default:
err := errors.New("invalid filter format for object name")
return nil, err
return false, errors.New("invalid filter format for object name")
}

idx, err := strconv.Atoi(split[1])
return true, nil
}

it := bkt.Objects(ctx, &q)
var names []string

for {
objAttrs, err := it.Next()
if err == iterator.Done {
break
}
if err != nil {
return nil, err
}

if len(names) <= idx {
names = append(names, make([]string, idx-len(names)+1)...)
if !params.IncludeInfoObject && strings.HasSuffix(objAttrs.Name, "info") {
continue
}

names[idx] = objAttrs.Name
ok, err := isValidObject(objAttrs.Name)
if err != nil {
return nil, err
}
if ok {
names = append(names, objAttrs.Name)
}
}

return names, nil
Expand Down
172 changes: 166 additions & 6 deletions pkg/gcsstore/gcsservice_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"context"
"net/http"
reflect "reflect"
"testing"

"gopkg.in/h2non/gock.v1"
Expand Down Expand Up @@ -99,8 +100,9 @@ func TestDeleteObjectWithFilter(t *testing.T) {
}

err = service.DeleteObjectsWithFilter(ctx, GCSFilterParams{
Bucket: "test-bucket",
Prefix: "test-prefix",
Bucket: "test-bucket",
Prefix: "test-prefix",
IncludeInfoObject: true,
})

if err != nil {
Expand Down Expand Up @@ -479,6 +481,7 @@ func TestFilterObject(t *testing.T) {

resp := googleBucketResponse{[]googleObjectResponse{
{Name: "test_directory/test-prefix_1"},
{Name: "test_directory/test-prefix_2"},
}}

gock.New("https://storage.googleapis.com").
Expand Down Expand Up @@ -510,16 +513,173 @@ func TestFilterObject(t *testing.T) {
}

objects, err := service.FilterObjects(ctx, GCSFilterParams{
Bucket: "test-bucket",
Prefix: "test-prefix",
Bucket: "test-bucket",
Prefix: "test-prefix",
IncludeInfoObject: false,
})

if err != nil {
t.Errorf("Error: %v", err)
return
}

if !reflect.DeepEqual(objects, []string{"test_directory/test-prefix_1", "test_directory/test-prefix_2"}) {
t.Errorf("Didn't get appropriate objects back: got %v from %v", len(objects), objects)
}
}

// TestFilterObject_WithoutChunk checks that FilterObjects returns the single
// listed object when its file name has no "_"-separated chunk suffix and the
// info object is excluded from the listing.
func TestFilterObject_WithoutChunk(t *testing.T) {
	// Tear down the gock mocks once the test finishes.
	defer gock.Off()

	// The mocked bucket listing contains exactly one object.
	resp := googleBucketResponse{[]googleObjectResponse{
		{Name: "test_directory/test-prefix"},
	}}

	// Mock for the object listing request issued with the "test-prefix" prefix.
	gock.New("https://storage.googleapis.com").
		Get("/storage/v1/b/test-bucket/o").
		MatchParam("alt", "json").
		MatchParam("pageToken", "").
		MatchParam("prefix", "test-prefix").
		MatchParam("projection", "full").
		Reply(200).
		JSON(resp)

	// Mock for the OAuth token endpoint.
	gock.New("https://accounts.google.com/").
		Post("/o/oauth2/token").Reply(200).JSON(map[string]string{
		"access_token":  "H3l5321N123sdI4HLY/RF39FjrCRF39FjrCRF39FjrCRF39FjrC_RF39FjrCRF39FjrC",
		"token_type":    "Bearer",
		"refresh_token": "1/smWJksmWJksmWJksmWJksmWJk_smWJksmWJksmWJksmWJksmWJk",
		"expiry_date":   "1425333671141",
	})

	ctx := context.Background()
	client, err := storage.NewClient(ctx, option.WithHTTPClient(http.DefaultClient), option.WithAPIKey("foo"))
	if err != nil {
		// t.Fatal stops the test immediately, so no explicit return is needed.
		t.Fatal(err)
	}

	service := GCSService{
		Client: client,
	}

	objects, err := service.FilterObjects(ctx, GCSFilterParams{
		Bucket:            "test-bucket",
		Prefix:            "test-prefix",
		IncludeInfoObject: false,
	})
	if err != nil {
		// Nothing meaningful can be compared after a failed listing.
		t.Fatalf("Error: %v", err)
	}

	if !reflect.DeepEqual(objects, []string{"test_directory/test-prefix"}) {
		t.Errorf("Didn't get appropriate objects back: got %v from %v", len(objects), objects)
	}
}

// TestFilterObject_IncludeInfoObject checks that FilterObjects keeps the
// ".info" object alongside the chunk objects, in listing order, when
// IncludeInfoObject is set.
func TestFilterObject_IncludeInfoObject(t *testing.T) {
	// Tear down the gock mocks once the test finishes.
	defer gock.Off()

	// The mocked listing interleaves the info object between two chunks so the
	// assertion below also pins the order of the returned names.
	resp := googleBucketResponse{[]googleObjectResponse{
		{Name: "test_directory/test-prefix_1"},
		{Name: "test_directory/test-prefix.info"},
		{Name: "test_directory/test-prefix_2"},
	}}

	// Mock for the object listing request issued with the "test-prefix" prefix.
	gock.New("https://storage.googleapis.com").
		Get("/storage/v1/b/test-bucket/o").
		MatchParam("alt", "json").
		MatchParam("pageToken", "").
		MatchParam("prefix", "test-prefix").
		MatchParam("projection", "full").
		Reply(200).
		JSON(resp)

	// Mock for the OAuth token endpoint.
	gock.New("https://accounts.google.com/").
		Post("/o/oauth2/token").Reply(200).JSON(map[string]string{
		"access_token":  "H3l5321N123sdI4HLY/RF39FjrCRF39FjrCRF39FjrCRF39FjrC_RF39FjrCRF39FjrC",
		"token_type":    "Bearer",
		"refresh_token": "1/smWJksmWJksmWJksmWJksmWJk_smWJksmWJksmWJksmWJksmWJk",
		"expiry_date":   "1425333671141",
	})

	ctx := context.Background()
	client, err := storage.NewClient(ctx, option.WithHTTPClient(http.DefaultClient), option.WithAPIKey("foo"))
	if err != nil {
		// t.Fatal stops the test immediately, so no explicit return is needed.
		t.Fatal(err)
	}

	service := GCSService{
		Client: client,
	}

	objects, err := service.FilterObjects(ctx, GCSFilterParams{
		Bucket:            "test-bucket",
		Prefix:            "test-prefix",
		IncludeInfoObject: true,
	})
	if err != nil {
		// Nothing meaningful can be compared after a failed listing.
		t.Fatalf("Error: %v", err)
	}

	if !reflect.DeepEqual(objects, []string{"test_directory/test-prefix_1", "test_directory/test-prefix.info", "test_directory/test-prefix_2"}) {
		t.Errorf("Didn't get appropriate objects back: got %v from %v", len(objects), objects)
	}
}

func TestFilterObject_IncludeInfoObject_NonChunked(t *testing.T) {
defer gock.Off()

resp := googleBucketResponse{[]googleObjectResponse{
{Name: "test_directory/test-prefix"},
{Name: "test_directory/test-prefix.info"},
}}

gock.New("https://storage.googleapis.com").
Get("/storage/v1/b/test-bucket/o").
MatchParam("alt", "json").
MatchParam("pageToken", "").
MatchParam("prefix", "test-prefix").
MatchParam("projection", "full").
Reply(200).
JSON(resp)

gock.New("https://accounts.google.com/").
Post("/o/oauth2/token").Reply(200).JSON(map[string]string{
"access_token": "H3l5321N123sdI4HLY/RF39FjrCRF39FjrCRF39FjrCRF39FjrC_RF39FjrCRF39FjrC",
"token_type": "Bearer",
"refresh_token": "1/smWJksmWJksmWJksmWJksmWJk_smWJksmWJksmWJksmWJksmWJk",
"expiry_date": "1425333671141",
})

ctx := context.Background()
client, err := storage.NewClient(ctx, option.WithHTTPClient(http.DefaultClient), option.WithAPIKey("foo"))
if err != nil {
t.Fatal(err)
return
}

service := GCSService{
Client: client,
}

objects, err := service.FilterObjects(ctx, GCSFilterParams{
Bucket: "test-bucket",
Prefix: "test-prefix",
IncludeInfoObject: true,
})

if err != nil {
t.Errorf("Error: %v", err)
return
}

if len(objects) != 2 {
t.Errorf("Didn't get appropriate amount of objects back: got %v from %v", len(objects), objects)
if !reflect.DeepEqual(objects, []string{"test_directory/test-prefix", "test_directory/test-prefix.info"}) {
t.Errorf("Didn't get appropriate objects back: got %v from %v", len(objects), objects)
}
}
20 changes: 12 additions & 8 deletions pkg/gcsstore/gcsstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,9 @@ func (upload gcsUpload) WriteChunk(ctx context.Context, offset int64, src io.Rea

prefix := fmt.Sprintf("%s_", store.keyWithPrefix(id))
filterParams := GCSFilterParams{
Bucket: store.Bucket,
Prefix: prefix,
Bucket: store.Bucket,
Prefix: prefix,
IncludeInfoObject: false,
}

names, err := store.Service.FilterObjects(ctx, filterParams)
Expand Down Expand Up @@ -165,8 +166,9 @@ func (upload gcsUpload) GetInfo(ctx context.Context) (handler.FileInfo, error) {

prefix := store.keyWithPrefix(id)
filterParams := GCSFilterParams{
Bucket: store.Bucket,
Prefix: prefix,
Bucket: store.Bucket,
Prefix: prefix,
IncludeInfoObject: false,
}

names, err := store.Service.FilterObjects(ctx, filterParams)
Expand Down Expand Up @@ -261,8 +263,9 @@ func (upload gcsUpload) FinishUpload(ctx context.Context) error {

prefix := fmt.Sprintf("%s_", store.keyWithPrefix(id))
filterParams := GCSFilterParams{
Bucket: store.Bucket,
Prefix: prefix,
Bucket: store.Bucket,
Prefix: prefix,
IncludeInfoObject: false,
}

names, err := store.Service.FilterObjects(ctx, filterParams)
Expand Down Expand Up @@ -313,8 +316,9 @@ func (upload gcsUpload) Terminate(ctx context.Context) error {
store := upload.store

filterParams := GCSFilterParams{
Bucket: store.Bucket,
Prefix: store.keyWithPrefix(id),
Bucket: store.Bucket,
Prefix: store.keyWithPrefix(id),
IncludeInfoObject: true,
}

err := store.Service.DeleteObjectsWithFilter(ctx, filterParams)
Expand Down
Loading