Skip to content
Permalink
Browse files

filetree/fs: rename/cleanup the FUSE FS

  • Loading branch information
tsileo committed Nov 17, 2019
1 parent 15600af commit b388a9f3f33fab84db4a16d3b73b4d1a9985e4b7
@@ -0,0 +1,7 @@
package main

import "a4.io/blobstash/pkg/filetree/fs"

func main() {
fs.Main()
}
@@ -1,4 +1,4 @@
package main
package fs

import (
"context"
@@ -1,26 +1,19 @@
package main
package fs

import (
"bytes"
"fmt"
"sync"

"a4.io/blobstash/pkg/backend/s3/s3util"
"a4.io/blobstash/pkg/blob"
bcache "a4.io/blobstash/pkg/cache"
"a4.io/blobstash/pkg/client/blobstore"
"a4.io/blobstash/pkg/hashutil"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/s3"
"golang.org/x/net/context"
)

// cache implements the blobStore interface with a local disk-backed LRU cache
type cache struct {
fs *FS
bs *blobstore.BlobStore
mu sync.Mutex
remoteRefs map[string]string
fs *FS
bs *blobstore.BlobStore
mu sync.Mutex

blobsCache *bcache.Cache
}
@@ -36,16 +29,9 @@ func newCache(fs *FS, bs *blobstore.BlobStore, path string) (*cache, error) {
fs: fs,
bs: bs,
blobsCache: blobsCache,
remoteRefs: map[string]string{},
}, nil
}

func (c *cache) RemoteRefs() map[string]string {
c.mu.Lock()
defer c.mu.Unlock()
return c.remoteRefs
}

// Close implements the io.Closer interface
func (c *cache) Close() error {
return c.blobsCache.Close()
@@ -56,11 +42,6 @@ func (c *cache) Stat(ctx context.Context, hash string) (bool, error) {
c.mu.Lock()
defer c.mu.Unlock()

// Check if the blob has already been uploaded to the remote storage
if _, ok := c.remoteRefs[hash]; ok {
return true, nil
}

stat, err := c.bs.Stat(context.TODO(), hash)
if err != nil {
return false, err
@@ -84,51 +65,6 @@ func (c *cache) Put(ctx context.Context, hash string, data []byte) error {
return nil
}

// Get implements the BlobStore interface for filereader.File
func (c *cache) PutRemote(ctx context.Context, hash string, data []byte) error {
if !c.fs.useRemote {
return c.Put(ctx, hash, data)
}
logger.Printf("[remote] uploading %s (%d bytes) to remote", hash, len(data))

c.mu.Lock()
defer c.mu.Unlock()
var err error

if _, ok := c.remoteRefs[hash]; ok {
return nil
}

if err := c.blobsCache.Add(hash, data); err != nil {
return err
}

// Encrypt
data, err = s3util.Seal(c.fs.key, &blob.Blob{Hash: hash, Data: data})
if err != nil {
return err
}
// Re-compute the hash
ehash := hashutil.Compute(data)

// Prepare the upload request
params := &s3.PutObjectInput{
Bucket: aws.String(c.fs.profile.RemoteConfig.Bucket),
Key: aws.String("tmp/" + ehash),
Body: bytes.NewReader(data),
Metadata: map[string]*string{},
}

// Actually upload the blob
if _, err := c.fs.s3.PutObject(params); err != nil {
return err
}

c.remoteRefs[hash] = "tmp/" + ehash

return nil
}

// Get implements the blobStore interface for filereader.File
func (c *cache) Get(ctx context.Context, hash string) ([]byte, error) {
logger.Printf("Cache.Get(%q)\n", hash)
@@ -1,4 +1,4 @@
package main
package fs

import (
"fmt"
@@ -1,4 +1,4 @@
package main
package fs

import (
"bytes"
@@ -1,4 +1,4 @@
package main
package fs // import "a4.io/blobstash/pkg/filetree/fs"

import (
"flag"
@@ -41,7 +41,7 @@ var startedAt = time.Now()

func usage() {
fmt.Fprintf(os.Stderr, "Usage of %s:\n", os.Args[0])
fmt.Fprintf(os.Stderr, " %s MOUNTPOINT\n", os.Args[0])
fmt.Fprintf(os.Stderr, " %s MOUNTPOINT FSNAME\n", os.Args[0])
flag.PrintDefaults()
}

@@ -61,13 +61,11 @@ const (
otherExecute
)

func main() {
func Main() {
// Scans the arg list and sets up flags
//debug := flag.Bool("debug", false, "print debugging messages.")
resetCache := flag.Bool("reset-cache", false, "remove the local cache before starting.")
syncDelay := flag.Duration("sync-delay", 5*time.Minute, "delay to wait after the last modification to initate a sync")
forceRemote := flag.Bool("force-remote", false, "force fetching data blobs from object storage")
disableRemote := flag.Bool("disable-remote", false, "disable fetching data blobs from object storage")
configFile := flag.String("config-file", filepath.Join(pathutil.ConfigDir(), "fs_client.yaml"), "confg file path")
configProfile := flag.String("config-profile", "default", "config profile name")

@@ -157,18 +155,7 @@ func main() {
fmt.Printf("invalid BLOBS_API_HOST")
os.Exit(1)
}

var useRemote bool
switch {
case *disableRemote, !caps.ReplicationEnabled:
if *forceRemote {
logger.Printf("WARNING: disabling remote as server does not support it\n")
}
case *forceRemote:
useRemote = true
case isHostLocal:
useRemote = isHostLocal
}
fmt.Printf("isHostLocal=%v\n", isHostLocal)

c, err := fuse.Mount(
mountpoint,
@@ -205,15 +192,14 @@ func main() {
freaderCache: freaderCache,
atCache: atCache,
caps: caps,
useRemote: useRemote,
}
blobfs.bs, err = newCache(blobfs, bs, cacheDir)
if err != nil {
log.Fatal(err)
}
defer blobfs.bs.(*cache).Close()

logger.Printf("caps=%+v use_remote=%v\n", caps, useRemote)
logger.Printf("caps=%+v\n", caps)

go func() {
ticker := time.NewTicker(45 * time.Second)
@@ -279,8 +265,7 @@ type FS struct {
caps *clientutil.Caps

// config profile
profile *profile
useRemote bool
profile *profile

// S3 client and key
s3 *s3.S3
@@ -1321,14 +1306,7 @@ func (f *file) Reader() (fileReader, error) {
}

// Instanciate the filereader
var fr preloadableFileReader
logger.Printf("use_remote=%v remote_refs=%+v\n", f.fs.useRemote, n.RemoteRefs)
if f.fs.useRemote && n.RemoteRefs != nil {
logger.Println("opening file with remote")
fr = filereader.NewFileRemote(context.Background(), f.fs.bs, meta, n.RemoteRefs, f.fs.freaderCache)
} else {
fr = filereader.NewFile(context.Background(), f.fs.bs, meta, f.fs.freaderCache)
}
fr := filereader.NewFile(context.Background(), f.fs.bs, meta, f.fs.freaderCache)

// FIXME(tsileo): test if preloading is worth it
// fr.PreloadChunks()
@@ -1,4 +1,4 @@
package main
package fs

import (
"time"
@@ -1,4 +1,4 @@
package main
package fs

import (
"context"
@@ -1,4 +1,4 @@
package main
package fs

import (
"context"

0 comments on commit b388a9f

Please sign in to comment.
You can’t perform that action at this time.