Skip to content
Abstract File Storage
Go
Branch: master
Clone or download
Fetching latest commit…
Cannot retrieve the latest commit at this time.
Permalink
Type Name Latest commit message Commit time
Failed to load latest commit information.
archive
asset added cache service Dec 2, 2019
base
cache
example
file renamed Matcher func to Match, introduced matcher interface Oct 1, 2019
http reformatted Sep 20, 2019
matcher streamline exists, object operations Oct 9, 2019
mem added cache service Dec 2, 2019
modifier
object updated doc Aug 22, 2019
option added option.Proxy Dec 20, 2019
scp updated storager interface Nov 24, 2019
ssh map ssh to scp provider, spelling fix Oct 1, 2019
storage updated storager interface Nov 24, 2019
tar updated storager interface Nov 24, 2019
url added url.JoinUNC Jan 19, 2020
walker renamed checksum to skipchecksum Nov 1, 2019
zip updated storager interface Nov 24, 2019
.gitignore reformatted Oct 4, 2019
CHANGELOG.md added url.JoinUNC Jan 19, 2020
LICENSE.txt
NOTICE.txt initial commit Aug 19, 2019
README.md updated doc, Oct 29, 2019
TODO.md
copy.go
copy_test.go reformatted Oct 2, 2019
coverage.out updated test Sep 4, 2019
coverage_badge.png updated tests Sep 4, 2019
doc.go initial commit Aug 19, 2019
example_test.go rename afs.Service variable in examples Oct 15, 2019
faker.go initial commit Aug 19, 2019
faker_test.go
init.go
list.go
list_test.go added recursive list operation support Oct 4, 2019
move.go added auth tracker Oct 25, 2019
move_test.go patched cross manager single asset move Aug 27, 2019
registry.go map ssh to scp provider, spelling fix Oct 1, 2019
service.go
service_test.go added recursive list operation support Oct 4, 2019
uploader.go
uploader_test.go updated gocard issues Sep 20, 2019
walker.go added cache service Dec 2, 2019
walker_test.go reformatted Aug 22, 2019

README.md

afs - abstract file storage

GoReportCard GoDoc goversion-image gopherbadger-tag-do-not-edit

Please refer to CHANGELOG.md if you encounter breaking changes.

Motivation

When dealing with various storage systems, such as cloud storage, SCP, containers or the local file system, a shared API for typical storage operations is an excellent simplification. What's more, the ability to simulate storage-related errors like Auth or EOF allows you to test your app's error handling.

Introduction

This library uses a storage manager abstraction to provide an implementation for a specific storage system with the following:

  • CRUD Operation:
List(ctx context.Context, URL string, options ...Option) ([]Object, error)

Walk(ctx context.Context, URL string, handler OnVisit, options ...Option) error

Download(ctx context.Context, object Object, options ...Option) (io.ReadCloser, error)

DownloadWithURL(ctx context.Context, URL string, options ...Option) (io.ReadCloser, error)

Upload(ctx context.Context, URL string, mode os.FileMode, reader io.Reader, options ...Option) error

Create(ctx context.Context, URL string, mode os.FileMode, isContainer bool, options ...Option) error

Delete(ctx context.Context, URL string, options ...Option) error
  • Batch uploader:
type Upload func(ctx context.Context, parent string, info os.FileInfo, reader io.Reader) error
 
Uploader(ctx context.Context, URL string, options ...Option) (Upload, io.Closer, error)
  • Utilities:
Copy(ctx context.Context, sourceURL, destURL string, options ...Option) error

Move(ctx context.Context, sourceURL, destURL string, options ...Option) error

The URL scheme is used to identify the storage system; alternatively, a relative or absolute path can be used for local file storage. By default all operations using the same baseURL share the same corresponding storage manager instance. For example, instead of supplying SCP auth details for every operation, the auth option can be supplied only once.

func main() {

    ctx := context.Background()
    {
        //auth with first call 
        fs := afs.New()
        defer fs.Close()
        keyAuth, err := scp.LocalhostKeyAuth("")
        if err != nil {
           log.Fatal(err)
        }
        reader1, err := fs.DownloadWithURL(ctx, "scp://host1:22/myfolder/asset.txt", keyAuth)
        if err != nil {
               log.Fatal(err)
        }
        ...
        reader2, err := fs.DownloadWithURL(ctx, "scp://host1:22/myfolder/asset.txt", keyAuth)
    }
    
    {
        //auth per baseURL 
        fs := afs.New()
        err = fs.Init(ctx, "scp://host1:22/", keyAuth)
        if err != nil {
            log.Fatal(err)
        }
        defer fs.Destroy("scp://host1:22/")
        reader, err := fs.DownloadWithURL(ctx, "scp://host1:22/myfolder/asset.txt")
     }
}

Usage

  • Downloading location content
// main lists /tmp/folder and prints the name, URL and content of every
// non-directory object found there.
func main() {

	fs := afs.New()
	ctx := context.Background()
	objects, err := fs.List(ctx, "/tmp/folder")
	if err != nil {
		log.Fatal(err)
	}
	for _, object := range objects {
		fmt.Printf("%v %v\n", object.Name(), object.URL())
		if object.IsDir() {
			continue
		}
		reader, err := fs.Download(ctx, object)
		if err != nil {
			log.Fatal(err)
		}
		data, err := ioutil.ReadAll(reader)
		// Close immediately instead of deferring: a defer inside the loop
		// would keep every reader open until main returns.
		_ = reader.Close()
		if err != nil {
			log.Fatal(err)
		}
		fmt.Printf("%s\n", data)
	}
}
  • Uploading Content
// main uploads a small text asset over SCP, reports whether it exists and
// finally deletes it.
func main() {
	const (
		baseURL  = "scp://127.0.0.1:22/"
		assetURL = "scp://127.0.0.1:22/folder/asset.txt"
	)

	ctx := context.Background()
	fs := afs.New()
	keyAuth, err := scp.LocalhostKeyAuth("")
	if err != nil {
		log.Fatal(err)
	}
	// Register the credentials for the base URL before the first operation.
	if err = fs.Init(ctx, baseURL, keyAuth); err != nil {
		log.Fatal(err)
	}
	if err = fs.Upload(ctx, assetURL, 0644, strings.NewReader("test me")); err != nil {
		log.Fatal(err)
	}
	ok, err := fs.Exists(ctx, assetURL)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("has file: %v\n", ok)
	// Best-effort cleanup; the delete error is intentionally ignored.
	_ = fs.Delete(ctx, assetURL)
}
  • Data Copy
// main copies a folder from S3 to an SCP host, passing the SCP key auth as a
// destination-only option.
func main() {
	ctx := context.Background()
	fs := afs.New()
	keyAuth, err := scp.LocalhostKeyAuth("")
	if err != nil {
		log.Fatal(err)
	}
	sourceOptions := option.NewSource()
	destOptions := option.NewDest(keyAuth)
	if err = fs.Copy(ctx, "s3://mybucket/myfolder", "scp://127.0.0.1/tmp", sourceOptions, destOptions); err != nil {
		log.Fatal(err)
	}
}
  • Archiving content
// main copies a local folder into a directory inside a zip archive stored on
// Google Storage, authenticating with a JWT config loaded from a secret file.
func main() {

	secretPath := path.Join(os.Getenv("HOME"), ".secret", "gcp-e2e.json")
	auth, err := gs.NewJwtConfig(option.NewLocation(secretPath))
	if err != nil {
		// Report the failure instead of returning silently, as the other
		// examples do.
		log.Fatal(err)
	}
	sourceURL := "mylocalPath/"
	destURL := "gs:mybucket/test.zip/zip://localhost/dir1"
	fs := afs.New()
	ctx := context.Background()
	err = fs.Copy(ctx, sourceURL, destURL, option.NewDest(auth))
	if err != nil {
		log.Fatal(err)
	}

}
  • Archive Walker

Walker can be created for tar or zip archive.

func main() {
	
    ctx := context.Background()
	fs := afs.New()
	walker := tar.NewWalker(s3afs.New())
	err := fs.Copy(ctx, "/tmp/test.tar", "s3:///dest/folder/test", walker)
	if err != nil {
		log.Fatal(err)
	}
  • Archive Uploader

Uploader can be created for tar or zip archive.

// main copies data from Google Storage into a local zip archive by passing a
// zip batch uploader as a Copy option.
func main() {
	fs := afs.New()
	uploader := zip.NewBatchUploader(gsafs.New())
	if err := fs.Copy(context.Background(), "gs:///tmp/test/data", "/tmp/data.zip", uploader); err != nil {
		log.Fatal(err)
	}
}
  • Data Move
// main moves a local folder to an SCP host, passing the SCP key auth as a
// destination-only option.
func main() {
	ctx := context.Background()
	fs := afs.New()
	keyAuth, err := scp.LocalhostKeyAuth("")
	if err != nil {
		log.Fatal(err)
	}
	sourceOptions := option.NewSource()
	destOptions := option.NewDest(keyAuth)
	if err = fs.Move(ctx, "/tmp/transient/app", "scp://127.0.0.1/tmp", sourceOptions, destOptions); err != nil {
		log.Fatal(err)
	}
}
  • Batch Upload
// main uploads a handful of in-memory files and one directory to /tmp/clone
// through a single batch uploader.
func main() {
	ctx := context.Background()
	fs := afs.New()
	upload, closer, err := fs.Uploader(ctx, "/tmp/clone")
	if err != nil {
		log.Fatal(err)
	}
	defer closer.Close()

	resources := []*asset.Resource{
		asset.NewFile("asset1.txt", []byte("test 1"), 0644),
		asset.NewFile("asset2.txt", []byte("test 2"), 0644),
		asset.NewDir("folder1", file.DefaultDirOsMode),
		asset.NewFile("folder1/asset1.txt", []byte("test 3"), 0644),
		asset.NewFile("folder1/asset2.txt", []byte("test 4"), 0644),
	}
	for _, resource := range resources {
		// path.Split yields an empty directory part for names without a
		// slash, so top-level assets get an empty parent.
		parent, _ := path.Split(resource.Name)

		// Directories are uploaded with a nil reader; only files carry data.
		var content io.Reader
		if !resource.Dir {
			content = bytes.NewReader(resource.Data)
		}
		if err = upload(ctx, parent, resource.Info(), content); err != nil {
			log.Fatal(err)
		}
	}
}

Matchers

To filter source content you can use the Matcher option. The following matchers have been implemented.

Basic Matcher

func main() {
	
    matcher, err := NewBasic("/data", ".avro", nil)
    fs := afs.New()
    ctx := context.Background()
    err := fs.Copy(ctx, "/tmp/data", "s3://mybucket/data/", matcher.Match)
    if err != nil {
        log.Fatal(err)
    }
}

Filepath matcher

OS style filepath match, with the following terms:

  • '*' matches any sequence of non-Separator characters
  • '?' matches any single non-Separator character
  • '[' [ '^' ] { character-range } ']'
func main() {
	
    matcher := matcher.Filepath("*.avro")
    fs := afs.New()
    ctx := context.Background()
    err := fs.Copy(ctx, "/tmp/data", "gs://mybucket/data/", matcher)
    if err != nil {
        log.Fatal(err)
    }
}	
		

Ignore Matcher

The ignore matcher matches files that are not covered by the ignore rules. The syntax of the ignore rules borrows heavily from that of .gitignore; see https://git-scm.com/docs/gitignore or man gitignore for a full reference.

// main lists /tmp/folder with an ignore matcher built from inline rules and
// prints the name and URL of every object returned.
func main() {

	ignoreMatcher, err := matcher.NewIgnore([]string{"*.txt", ".ignore"})
	//or matcher.NewIgnore(option.NewLocation(".cloudignore"))
	if err != nil {
		log.Fatal(err)
	}
	fs := afs.New()
	ctx := context.Background()
	objects, err := fs.List(ctx, "/tmp/folder", ignoreMatcher.Match)
	if err != nil {
		log.Fatal(err)
	}
	for _, object := range objects {
		fmt.Printf("%v %v\n", object.Name(), object.URL())
		if object.IsDir() {
			continue
		}
		// ... handle regular files here
	}
}

Modification Time Matcher

The modification time matcher matches files that were modified either before or after a specified time.

// main lists /tmp/folder with a modification time matcher whose "before"
// bound is two days ago, and prints the name and URL of every object returned.
func main() {

	before, err := toolbox.TimeAt("2 days ago in UTC")
	if err != nil {
		log.Fatal(err)
	}
	modTimeMatcher, err := matcher.NewModification(before, nil)
	if err != nil {
		log.Fatal(err)
	}
	fs := afs.New()
	ctx := context.Background()
	objects, err := fs.List(ctx, "/tmp/folder", modTimeMatcher.Match)
	if err != nil {
		log.Fatal(err)
	}
	for _, object := range objects {
		fmt.Printf("%v %v\n", object.Name(), object.URL())
		if object.IsDir() {
			continue
		}
		// ... handle regular files here
	}
}

Content modifiers

To modify resource content on the fly you can use Modifier option.

// main copies a properties file that lives inside a war archive back into the
// same archive, passing a Replace modifier that maps "${changeMe}" to the
// USER environment variable.
func main() {
	sourceURL := "file:/tmp/app.war/zip://localhost/WEB-INF/classes/config.properties"
	destURL := "file:/tmp/app.war/zip://localhost/"
	replacements := map[string]string{
		"${changeMe}": os.Getenv("USER"),
	}

	fs := afs.New()
	ctx := context.Background()
	if err := fs.Copy(ctx, sourceURL, destURL, modifier.Replace(replacements)); err != nil {
		log.Fatal(err)
	}
}
package main

import (
	"bytes"
	"context"
	"log"
	"github.com/viant/afs"
	"io"
	"fmt"
	"io/ioutil"
	"os"
	"strings"
)

func modifyContent(info os.FileInfo, reader io.ReadCloser) (closer io.ReadCloser, e error) {
   if strings.HasSuffix(info.Name() ,".info") {
       data, err := ioutil.ReadAll(reader)
       if err != nil {
           return nil, err
       }
       _ = reader.Close()
       expanded := strings.Replace(string(data), "$os.User", os.Getenv("USER"), 1)
       reader = ioutil.NopCloser(strings.NewReader(expanded))
   }
   return reader, nil
}                           

// main downloads an asset from S3 with modifyContent passed as a download
// option and prints the resulting content.
func main() {
	const metaURL = "s3://mybucket/meta.info"

	fs := afs.New()
	reader, err := fs.DownloadWithURL(context.Background(), metaURL, modifyContent)
	if err != nil {
		log.Fatal(err)
	}
	defer reader.Close()

	content, err := ioutil.ReadAll(reader)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("content: %s\n", content)
}

Streaming data

Streaming data allows data reading and uploading in chunks with small memory footprint.

    // Stream an asset from Google Storage straight into S3.
    jwtConfig, err := gs.NewJwtConfig()
    if err != nil {
        log.Fatal(err)
    }

    ctx := context.Background()
    fs := afs.New()
    sourceURL := "gs://myBucket/path/myasset.gz"
    // option.NewStream makes the download reader fetch data in parts.
    reader, err := fs.DownloadWithURL(ctx, sourceURL, jwtConfig, option.NewStream(64*1024*1024, 0))
    if err != nil {
        log.Fatal(err)
    }
    // Release the download stream once the upload has consumed it.
    defer reader.Close()

    _ = os.Setenv("AWS_SDK_LOAD_CONFIG", "true")
    destURL := "s3://myBucket/path/myasset.gz"
    // Skipping the checksum allows the upload to be streamed in chunks.
    err = fs.Upload(ctx, destURL, 0644, reader, &option.Checksum{Skip: true})
    if err != nil {
        log.Fatal(err)
    }

Options

To control the number and position of listed resources you can use the page option.

Provider specific timeout.

Provides user/password auth.

  • Source & Dest Options

Groups options into source or destination options. These options work with Copy or Move operations.

// main copies data from Google Storage to S3, grouping the credentials for
// each side into source and destination options.
func main() {

	ctx := context.Background()
	fs := afs.New()
	secretPath := path.Join(os.Getenv("HOME"), ".secret", "gcp.json")
	jwtConfig, err := gs.NewJwtConfig(option.NewLocation(secretPath))
	if err != nil {
		log.Fatal(err)
	}
	sourceOptions := option.NewSource(jwtConfig)
	authConfig, err := s3.NewAuthConfig(option.NewLocation("aws.json"))
	if err != nil {
		log.Fatal(err)
	}
	destOptions := option.NewDest(authConfig)
	err = fs.Copy(ctx, "gs://mybucket/data", "s3://mybucket/data", sourceOptions, destOptions)
	if err != nil {
		log.Fatal(err)
	}
}
  • option.Checksum: skips computing the checksum if Skip is set; this option allows streaming upload in chunks
  • option.Stream: download reader reads data with specified stream PartSize

Check out storage manager for additional options.

Storage Implementations

Testing fs

To unit test all storage operations entirely in memory you can use the faker fs.

In addition you can use error options to test exception handling.

  • DownloadError
// main uploads to a faker fs with an upload error option carrying io.EOF and
// logs the error returned by Upload.
// NOTE(review): this example sits under the "DownloadError" heading but
// injects an upload error, exactly like the "UploadError" example — confirm
// which option was intended here.
func main() {
	fs := afs.NewFaker()
	ctx := context.Background()
	err := fs.Upload(ctx, "gs://myBucket/folder/asset.txt", 0, strings.NewReader("some data"), option.NewUploadError(io.EOF))
	if err != nil {
		log.Fatalf("expect upload error: %v", err)
	}
}
  • ReaderError
// main uploads an asset to a faker fs with a download error option carrying
// io.EOF, then downloads it and logs the resulting error.
// NOTE(review): this example sits under the "ReaderError" heading but uses
// option.NewDownloadError — confirm whether a reader-specific error option
// was intended.
func main() {
	fs := afs.NewFaker()
	ctx := context.Background()
	err := fs.Upload(ctx, "gs://myBucket/folder/asset.txt", 0, strings.NewReader("some data"), option.NewDownloadError(io.EOF))
	if err != nil {
		log.Fatal(err)
	}
	_, err = fs.DownloadWithURL(ctx, "gs://myBucket/folder/asset.txt")
	if err != nil {
		log.Fatalf("expect download error: %v", err)
	}
}
  • UploadError
// main uploads to a faker fs with an upload error option carrying io.EOF and
// logs the error returned by Upload.
func main() {
	fs := afs.NewFaker()
	ctx := context.Background()
	err := fs.Upload(ctx, "gs://myBucket/folder/asset.txt", 0, strings.NewReader("some data"), option.NewUploadError(io.EOF))
	if err != nil {
		log.Fatalf("expect upload error: %v", err)
	}
}

Test setup utilities

Package asset defines basic utilities to quickly manage asset related unit tests.

// Test_XXX is a table-driven test skeleton built on the asset package: for
// every use case it creates the listed assets at the location, leaves room
// for the logic under test, loads the assets back, compares them with the
// expectations and finally cleans the location up.
func Test_XXX(t *testing.T) {

	var useCases = []struct {
		description string
		location    string
		options     []storage.Option
		assets      []*asset.Resource
	}{}

	ctx := context.Background()
	for _, useCase := range useCases {
		fs := afs.New()
		mgr, err := afs.Manager(useCase.location, useCase.options...)
		if err != nil {
			// t.Fatal reports through the testing framework; log.Fatal would
			// exit the whole test binary.
			t.Fatal(err)
		}
		err = asset.Create(mgr, useCase.location, useCase.assets)
		if err != nil {
			t.Fatal(err)
		}

		//... actual app logic
		// ctx and fs are meant for the logic above; this blank assignment
		// keeps the skeleton compiling until real code uses them.
		_, _ = ctx, fs

		actuals, err := asset.Load(mgr, useCase.location)
		if err != nil {
			t.Fatal(err)
		}
		for _, expect := range useCase.assets {
			actual, ok := actuals[expect.Name]
			if !assert.True(t, ok, useCase.description+": "+expect.Name+fmt.Sprintf(" - actuals: %v", actuals)) {
				continue
			}
			assert.EqualValues(t, expect.Name, actual.Name, useCase.description+" "+expect.Name)
			assert.EqualValues(t, expect.Mode, actual.Mode, useCase.description+" "+expect.Name)
			assert.EqualValues(t, expect.Dir, actual.Dir, useCase.description+" "+expect.Name)
			assert.EqualValues(t, expect.Data, actual.Data, useCase.description+" "+expect.Name)
		}

		// Best-effort cleanup; the error is intentionally ignored.
		_ = asset.Cleanup(mgr, useCase.location)

	}
}

GoCover

GoCover

License

The source code is made available under the terms of the Apache License, Version 2, as stated in the file LICENSE.

Individual files may be made available under their own specific license, all compatible with Apache License, Version 2. Please see individual files for details.

Credits and Acknowledgements

Library Author: Adrian Witas

You can’t perform that action at this time.