Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GCS paths; retry on error #3461

Merged
merged 1 commit into from
Oct 8, 2017
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
234 changes: 155 additions & 79 deletions util/pkg/vfs/gsfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,14 @@ import (
"google.golang.org/api/googleapi"
storage "google.golang.org/api/storage/v1"
"io/ioutil"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/kops/util/pkg/hashing"
"net/http"
"os"
"path"
"strings"
"sync"
"time"
)

// GSPath is a vfs path for Google Cloud Storage
Expand All @@ -45,6 +47,22 @@ type GSPath struct {
var _ Path = &GSPath{}
var _ HasHash = &GSPath{}

// Retry policies for GCS operations. Reads are retried up to 4 times,
// writes up to 5, both with exponential backoff (1s initial delay,
// factor 1.5, 10% jitter).
var (
	// gcsReadBackoff is the backoff strategy for GCS read retries
	gcsReadBackoff = wait.Backoff{
		Duration: time.Second,
		Factor:   1.5,
		Jitter:   0.1,
		Steps:    4,
	}

	// gcsWriteBackoff is the backoff strategy for GCS write retries
	gcsWriteBackoff = wait.Backoff{
		Duration: time.Second,
		Factor:   1.5,
		Jitter:   0.1,
		Steps:    5,
	}
)

func NewGSPath(client *storage.Service, bucket string, key string) *GSPath {
bucket = strings.TrimSuffix(bucket, "/")
key = strings.TrimPrefix(key, "/")
Expand All @@ -69,14 +87,24 @@ func (p *GSPath) String() string {
}

func (p *GSPath) Remove() error {
err := p.client.Objects.Delete(p.bucket, p.key).Do()
if err != nil {
// TODO: Check for not-exists, return os.NotExist
done, err := RetryWithBackoff(gcsWriteBackoff, func() (bool, error) {
err := p.client.Objects.Delete(p.bucket, p.key).Do()
if err != nil {
// TODO: Check for not-exists, return os.NotExist

return fmt.Errorf("error deleting %s: %v", p, err)
}
return false, fmt.Errorf("error deleting %s: %v", p, err)
}

return nil
return true, nil
})
if err != nil {
return err
} else if done {
return nil
} else {
// Shouldn't happen - we always return a non-nil error with false
return wait.ErrWaitTimeout
}
}

func (p *GSPath) Join(relativePath ...string) Path {
Expand All @@ -91,24 +119,34 @@ func (p *GSPath) Join(relativePath ...string) Path {
}

// WriteFile uploads data to the object at this path, retrying transient
// failures according to gcsWriteBackoff. An MD5 hash of the payload is
// attached so GCS can verify the upload end-to-end; retries are safe
// because the whole payload is re-sent from a fresh reader each attempt.
func (p *GSPath) WriteFile(data []byte) error {
	done, err := RetryWithBackoff(gcsWriteBackoff, func() (bool, error) {
		glog.V(4).Infof("Writing file %q", p)

		md5Hash, err := hashing.HashAlgorithmMD5.Hash(bytes.NewReader(data))
		if err != nil {
			// Hashing is local and deterministic; retrying won't help,
			// but returning the error aborts the loop either way
			return false, err
		}

		obj := &storage.Object{
			Name:    p.key,
			Md5Hash: base64.StdEncoding.EncodeToString(md5Hash.HashValue),
		}
		r := bytes.NewReader(data)
		_, err = p.client.Objects.Insert(p.bucket, obj).Media(r).Do()
		if err != nil {
			// false => not done; the retry loop will try again
			return false, fmt.Errorf("error writing %s: %v", p, err)
		}

		return true, nil
	})
	if err != nil {
		return err
	}
	if done {
		return nil
	}
	// Shouldn't happen - we always return a non-nil error with false
	return wait.ErrWaitTimeout
}

// To prevent concurrent creates on the same file while maintaining atomicity of writes,
Expand All @@ -134,88 +172,126 @@ func (p *GSPath) CreateFile(data []byte) error {
return p.WriteFile(data)
}

// ReadFile implements Path::ReadFile
func (p *GSPath) ReadFile() ([]byte, error) {
glog.V(4).Infof("Reading file %q", p)

response, err := p.client.Objects.Get(p.bucket, p.key).Download()
if err != nil {
if isGCSNotFound(err) {
return nil, os.ErrNotExist
var ret []byte
done, err := RetryWithBackoff(gcsReadBackoff, func() (bool, error) {
glog.V(4).Infof("Reading file %q", p)

response, err := p.client.Objects.Get(p.bucket, p.key).Download()
if err != nil {
if isGCSNotFound(err) {
return true, os.ErrNotExist
}
return false, fmt.Errorf("error reading %s: %v", p, err)
}
return nil, fmt.Errorf("error reading %s: %v", p, err)
}
if response == nil {
return nil, fmt.Errorf("no response returned from reading %s", p)
}
defer response.Body.Close()
if response == nil {
return false, fmt.Errorf("no response returned from reading %s", p)
}
defer response.Body.Close()

d, err := ioutil.ReadAll(response.Body)
data, err := ioutil.ReadAll(response.Body)
if err != nil {
return false, fmt.Errorf("error reading %s: %v", p, err)
}
ret = data
return true, nil
})
if err != nil {
return nil, fmt.Errorf("error reading %s: %v", p, err)
return nil, err
} else if done {
return ret, nil
} else {
// Shouldn't happen - we always return a non-nil error with false
return nil, wait.ErrWaitTimeout
}
return d, nil
}

// ReadDir implements Path::ReadDir
func (p *GSPath) ReadDir() ([]Path, error) {
prefix := p.key
if !strings.HasSuffix(prefix, "/") {
prefix += "/"
}
var ret []Path
done, err := RetryWithBackoff(gcsReadBackoff, func() (bool, error) {
prefix := p.key
if !strings.HasSuffix(prefix, "/") {
prefix += "/"
}

ctx := context.Background()
var paths []Path
err := p.client.Objects.List(p.bucket).Delimiter("/").Prefix(prefix).Pages(ctx, func(page *storage.Objects) error {
for _, o := range page.Items {
child := &GSPath{
client: p.client,
bucket: p.bucket,
key: o.Name,
md5Hash: o.Md5Hash,
ctx := context.Background()
var paths []Path
err := p.client.Objects.List(p.bucket).Delimiter("/").Prefix(prefix).Pages(ctx, func(page *storage.Objects) error {
for _, o := range page.Items {
child := &GSPath{
client: p.client,
bucket: p.bucket,
key: o.Name,
md5Hash: o.Md5Hash,
}
paths = append(paths, child)
}
return nil
})
if err != nil {
if isGCSNotFound(err) {
return true, os.ErrNotExist
}
paths = append(paths, child)
return false, fmt.Errorf("error listing %s: %v", p, err)
}
return nil
glog.V(8).Infof("Listed files in %v: %v", p, paths)
ret = paths
return true, nil
})
if err != nil {
if isGCSNotFound(err) {
return nil, os.ErrNotExist
}
return nil, fmt.Errorf("error listing %s: %v", p, err)
return nil, err
} else if done {
return ret, nil
} else {
// Shouldn't happen - we always return a non-nil error with false
return nil, wait.ErrWaitTimeout
}
glog.V(8).Infof("Listed files in %v: %v", p, paths)
return paths, nil
}

// ReadTree implements Path::ReadTree
func (p *GSPath) ReadTree() ([]Path, error) {
// No delimiter for recursive search

prefix := p.key
if prefix != "" && !strings.HasSuffix(prefix, "/") {
prefix += "/"
}
var ret []Path
done, err := RetryWithBackoff(gcsReadBackoff, func() (bool, error) {
// No delimiter for recursive search
prefix := p.key
if prefix != "" && !strings.HasSuffix(prefix, "/") {
prefix += "/"
}

ctx := context.Background()
var paths []Path
err := p.client.Objects.List(p.bucket).Prefix(prefix).Pages(ctx, func(page *storage.Objects) error {
for _, o := range page.Items {
key := o.Name
child := &GSPath{
client: p.client,
bucket: p.bucket,
key: key,
md5Hash: o.Md5Hash,
ctx := context.Background()
var paths []Path
err := p.client.Objects.List(p.bucket).Prefix(prefix).Pages(ctx, func(page *storage.Objects) error {
for _, o := range page.Items {
key := o.Name
child := &GSPath{
client: p.client,
bucket: p.bucket,
key: key,
md5Hash: o.Md5Hash,
}
paths = append(paths, child)
}
return nil
})
if err != nil {
if isGCSNotFound(err) {
return true, os.ErrNotExist
}
paths = append(paths, child)
return false, fmt.Errorf("error listing tree %s: %v", p, err)
}
return nil
ret = paths
return true, nil
})
if err != nil {
if isGCSNotFound(err) {
return nil, os.ErrNotExist
}
return nil, fmt.Errorf("error listing tree %s: %v", p, err)
return nil, err
} else if done {
return ret, nil
} else {
// Shouldn't happen - we always return a non-nil error with false
return nil, wait.ErrWaitTimeout
}
return paths, nil
}

func (p *GSPath) Base() string {
Expand Down