Skip to content

Commit

Permalink
add copy to adapter
Browse files Browse the repository at this point in the history
  • Loading branch information
guy-har committed Oct 11, 2020
1 parent 8f8824d commit 87c82b6
Show file tree
Hide file tree
Showing 7 changed files with 82 additions and 1 deletion.
1 change: 1 addition & 0 deletions block/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ type Adapter interface {
GetRange(obj ObjectPointer, startPosition int64, endPosition int64) (io.ReadCloser, error)
GetProperties(obj ObjectPointer) (Properties, error)
Remove(obj ObjectPointer) error
Copy(sourceObj, destinationObj ObjectPointer) error
CreateMultiPartUpload(obj ObjectPointer, r *http.Request, opts CreateMultiPartUploadOpts) (string, error)
UploadPart(obj ObjectPointer, sizeBytes int64, reader io.Reader, uploadID string, partNumber int64) (string, error)
AbortMultiPartUpload(obj ObjectPointer, uploadID string) error
Expand Down
19 changes: 19 additions & 0 deletions block/gs/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,25 @@ func (a *Adapter) Remove(obj block.ObjectPointer) error {
return nil
}

func (a *Adapter) Copy(sourceObj, destinationObj block.ObjectPointer) error {
var err error
defer reportMetrics("Copy", time.Now(), nil, &err)
qualifiedDestinationKey, err := resolveNamespace(destinationObj)
if err != nil {
return err
}
qualifiedSourceKey, err := resolveNamespace(sourceObj)
if err != nil {
return err
}
destinationObjectHandle := a.client.Bucket(qualifiedDestinationKey.StorageNamespace).Object(qualifiedDestinationKey.Key)
sourceObjectHandle := a.client.Bucket(qualifiedSourceKey.StorageNamespace).Object(qualifiedSourceKey.Key)
_, err = destinationObjectHandle.CopierFrom(sourceObjectHandle).Run(a.ctx)
if err != nil {
return fmt.Errorf("Copy: %w", err)
}
return nil
}
func (a *Adapter) CreateMultiPartUpload(obj block.ObjectPointer, r *http.Request, opts block.CreateMultiPartUploadOpts) (string, error) {
var err error
defer reportMetrics("CreateMultiPartUpload", time.Now(), nil, &err)
Expand Down
21 changes: 21 additions & 0 deletions block/local/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,27 @@ func (l *Adapter) Remove(obj block.ObjectPointer) error {
return os.Remove(p)
}

func (l *Adapter) Copy(sourceObj, destinationObj block.ObjectPointer) error {
dest := l.getPath(destinationObj.Identifier)
destinationFile, err := os.Create(dest)
if err != nil {
return err
}
defer func() {
_ = destinationFile.Close()
}()
source := l.getPath(sourceObj.Identifier)
sourceFile, err := os.OpenFile(source, os.O_RDONLY, 0755)
defer func() {
_ = sourceFile.Close()
}()
if err != nil {
return err
}
_, err = io.Copy(destinationFile, sourceFile)
return err
}

func (l *Adapter) Get(obj block.ObjectPointer, _ int64) (reader io.ReadCloser, err error) {
p := l.getPath(obj.Identifier)
f, err := os.OpenFile(p, os.O_RDONLY, 0755)
Expand Down
10 changes: 10 additions & 0 deletions block/mem/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,16 @@ func (a *Adapter) Remove(obj block.ObjectPointer) error {
return nil
}

func (a *Adapter) Copy(sourceObj, destinationObj block.ObjectPointer) error {
a.mutex.Lock()
defer a.mutex.Unlock()
destinationKey := getKey(destinationObj)
sourceKey := getKey(sourceObj)
a.data[destinationKey] = a.data[sourceKey]
a.properties[destinationKey] = a.properties[sourceKey]
return nil
}

func (a *Adapter) CreateMultiPartUpload(obj block.ObjectPointer, r *http.Request, opts block.CreateMultiPartUploadOpts) (string, error) {
a.mutex.Lock()
defer a.mutex.Unlock()
Expand Down
24 changes: 24 additions & 0 deletions block/s3/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,30 @@ func (a *Adapter) Remove(obj block.ObjectPointer) error {
return err
}

func (a *Adapter) Copy(sourceObj, destinationObj block.ObjectPointer) error {
var err error
defer reportMetrics("Copy", time.Now(), nil, &err)

qualifiedDestinationKey, err := resolveNamespace(destinationObj)
if err != nil {
return err
}
qualifiedSourceKey, err := resolveNamespace(sourceObj)
if err != nil {
return err
}
copyObjectParams := &s3.CopyObjectInput{
Bucket: aws.String(qualifiedDestinationKey.StorageNamespace),
Key: aws.String(qualifiedDestinationKey.Key),
CopySource: aws.String(qualifiedSourceKey.StorageNamespace + "/" + qualifiedSourceKey.Key),
}
_, err = a.s3.CopyObject(copyObjectParams)
if err != nil {
a.log().WithError(err).Error("failed to copy S3 object")
}
return err
}

func (a *Adapter) CreateMultiPartUpload(obj block.ObjectPointer, r *http.Request, opts block.CreateMultiPartUploadOpts) (string, error) {
var err error
defer reportMetrics("CreateMultiPartUpload", time.Now(), nil, &err)
Expand Down
4 changes: 4 additions & 0 deletions block/transient/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,10 @@ func (a *Adapter) Remove(_ block.ObjectPointer) error {
return nil
}

func (a *Adapter) Copy(_, _ block.ObjectPointer) error {
return nil
}

func (a *Adapter) CreateMultiPartUpload(obj block.ObjectPointer, r *http.Request, opts block.CreateMultiPartUploadOpts) (string, error) {
uid := uuid.New()
uploadID := hex.EncodeToString(uid[:])
Expand Down
4 changes: 3 additions & 1 deletion gateway/operations/mock_adapter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,9 @@ func (a *mockAdapter) GetProperties(_ block.ObjectPointer) (block.Properties, er
func (a *mockAdapter) Remove(_ block.ObjectPointer) error {
return errors.New(" remove method not implemented in mock adapter")
}

func (a *mockAdapter) Copy(_, _ block.ObjectPointer) error {
return errors.New(" copy method not implemented in mock adapter")
}
func (a *mockAdapter) CreateMultiPartUpload(_ block.ObjectPointer, r *http.Request, _ block.CreateMultiPartUploadOpts) (string, error) {
panic("try to create multipart in mock adaptor")
}
Expand Down

0 comments on commit 87c82b6

Please sign in to comment.