Permalink
Browse files

Revise design: send md5 with GetAttr rpc.

  • Loading branch information...
1 parent e66c3ae commit f60864f5e3c4c057c045ef50176a8b2fee47fca0 @hanwen hanwen committed Jun 30, 2011
Showing with 379 additions and 266 deletions.
  1. +10 −0 .gitignore
  2. +2 −2 rpcfs/Makefile
  3. +0 −65 rpcfs/chunkedfile.go
  4. +135 −0 rpcfs/diskcache.go
  5. +32 −0 rpcfs/diskcache_test.go
  6. +23 −16 rpcfs/executor.go
  7. +58 −54 rpcfs/fsserver.go
  8. +0 −46 rpcfs/fsserver_test.go
  9. +93 −65 rpcfs/rpcfs.go
  10. +5 −7 termite/fsserver/main.go
  11. +14 −6 termite/rpcfs/main.go
  12. +7 −5 termite/worker/worker.go
View
@@ -0,0 +1,10 @@
+*~
+*.6
+8.out
+.nfs*
+_*
+6.out
+termite/chroot/chroot
+termite/worker/worker
+termite/rpcfs/rpcfs
+termite/fsserver/fsserver
View
@@ -2,12 +2,12 @@
include $(GOROOT)/src/Make.inc
TARG=github.com/hanwen/go-fuse/rpcfs
-DEPS=../fuse
+DEPS=../fuse ../unionfs
GOFILES=fsserver.go \
rpcfs.go \
executor.go \
- chunkedfile.go
+ diskcache.go
include $(GOROOT)/src/Make.pkg
View
@@ -1,65 +0,0 @@
-package rpcfs
-
-import (
- "fmt"
- "log"
- "github.com/hanwen/go-fuse/fuse"
- )
-var _ = fmt.Println
-
-// ReadOnlyFile is for implementing read-only filesystems. This
-// assumes we already have the data in memory.
-type ChunkedFile struct {
- chunks [][]byte
- chunkSize uint32
-
- fuse.DefaultFile
-}
-
-func NewChunkedFile(data [][]byte) *ChunkedFile {
- f := new(ChunkedFile)
- f.chunks = data
-
- if len(data) > 0 {
- f.chunkSize = uint32(len(data[0]))
- }
-
- for i, v := range data {
- if i < len(data)-1 && uint32(len(v)) != f.chunkSize {
- log.Fatal("all chunks should be equal")
- }
- }
- return f
-}
-
-func (me *ChunkedFile) Read(input *fuse.ReadIn, bp fuse.BufferPool) ([]byte, fuse.Status) {
- if me.chunkSize == 0 {
- return []byte{}, fuse.OK
- }
-
- out := bp.AllocBuffer(input.Size)[:0]
-
- i := int(input.Offset / uint64(me.chunkSize))
- off := uint32(input.Offset % uint64(me.chunkSize))
-
- if off + input.Size < me.chunkSize {
- end := off + input.Size
- if end > uint32(len(me.chunks[i])) {
- end = uint32(len(me.chunks[i]))
- }
- return me.chunks[i][off:end], fuse.OK
- }
-
- for ; uint32(len(out)) < input.Size && i < len(me.chunks); i++ {
- end := len(me.chunks[i])
- if end - int(off) > (int(input.Size) - len(out)) {
- end = int(off) + int(input.Size) - len(out)
- }
- oldLen := len(out)
- out = out[:oldLen + end - int(off)]
- copy(out[oldLen:], me.chunks[i][off:end])
- off = 0
- }
-
- return out, fuse.OK
-}
View
@@ -0,0 +1,135 @@
+package rpcfs
+
+import (
+ "os"
+ "fmt"
+ "crypto"
+ "hash"
+ "path/filepath"
+ "log"
+ "io"
+ "io/ioutil"
+
+)
+
+type DiskFileCache struct {
+ dir string
+}
+
+func NewDiskFileCache(d string) *DiskFileCache {
+ if fi, _ := os.Lstat(d); fi == nil {
+ err := os.MkdirAll(d, 0700)
+ if err != nil {
+ panic(err)
+ }
+ }
+ return &DiskFileCache{dir: d}
+}
+
+func HashPath(dir string, md5 []byte) string {
+ s := fmt.Sprintf("%x", md5)
+ prefix := s[:2]
+ name := s[2:]
+ dst := filepath.Join(dir, prefix, name)
+ prefixDir, _ := filepath.Split(dst)
+ if err := os.MkdirAll(prefixDir, 0700); err != nil {
+ log.Fatal("MkdirAll error:", err)
+ }
+ return dst
+}
+
+
+func (me *DiskFileCache) HasHash(hash []byte) bool {
+ p := HashPath(me.dir, hash)
+ _, err := os.Lstat(p)
+ return err == nil
+}
+
+func (me *DiskFileCache) Path(hash []byte) string {
+ return HashPath(me.dir, hash)
+}
+
+type HashWriter struct {
+ hasher hash.Hash
+ dest *os.File
+ hash []byte
+}
+
+func NewHashWriter(dir string, hashfunc crypto.Hash) *HashWriter {
+ me := &HashWriter{}
+ tmp, err := ioutil.TempFile(dir, ".md5temp")
+ if err != nil {
+ panic(err)
+ log.Fatal(err)
+ }
+
+ me.dest = tmp
+ me.hasher = hashfunc.New()
+ return me
+}
+
+func (me *HashWriter) Write(p []byte) (n int, err os.Error) {
+ n, err = me.dest.Write(p)
+ me.hasher.Write(p)
+ return n, err
+}
+
+func (me *HashWriter) Close() os.Error {
+ err := me.dest.Close()
+
+ if err != nil {
+ return err
+ }
+ src := me.dest.Name()
+ dir, _ := filepath.Split(src)
+ sumpath := HashPath(dir, me.hasher.Sum())
+ if fi, _ := os.Lstat(sumpath); fi == nil {
+ err = os.Rename(src, sumpath)
+ } else {
+ os.Remove(src)
+ }
+ return err
+}
+
+
+func (me *DiskFileCache) SavePath(path string) (md5 []byte) {
+ f, err := os.Open(path)
+ if err != nil {
+ log.Fatal(err)
+ }
+
+ dup := NewHashWriter(me.dir, crypto.MD5)
+ _, err = io.Copy(dup, f)
+ if err != nil {
+ log.Fatal(err)
+ }
+ err = dup.Close()
+ if err != nil {
+ log.Fatal(err)
+ }
+ return dup.hasher.Sum()
+}
+
+func (me *DiskFileCache) Save(content []byte) (md5 []byte) {
+ h := crypto.MD5.New()
+
+ // TODO: make atomic.
+ h.Write(content)
+ sum := h.Sum()
+ name := me.Path(sum)
+ fi, err := os.Lstat(name)
+ if fi != nil {
+ return sum
+ }
+
+ f, err := os.Create(name)
+ if err != nil {
+ log.Fatal("Create err:", err)
+ }
+ f.Write(content)
+ f.Close()
+
+ log.Printf("saved Hash %x\n", sum)
+ return sum
+}
+
View
@@ -0,0 +1,32 @@
+package rpcfs
+
+import (
+// "os"
+ "crypto"
+ "testing"
+ "io/ioutil"
+)
+
+func TestDiskCache(t *testing.T) {
+ content := []byte("hello")
+
+ d, _ := ioutil.TempDir("", "")
+
+ cache := NewDiskFileCache(d)
+
+ h := crypto.MD5.New()
+ h.Write(content)
+ checksum := h.Sum()
+
+ f, _ := ioutil.TempFile("", "")
+ f.Write(content)
+ f.Close()
+
+ savedSum := cache.SavePath(f.Name())
+ if string(savedSum) != string(checksum) {
+ t.Fatal("mismatch")
+ }
+ if !cache.HasHash(checksum) {
+ t.Fatal("path gone")
+ }
+}
View
@@ -2,6 +2,7 @@ package rpcfs
import (
"fmt"
+ "path/filepath"
"os"
"log"
"io/ioutil"
@@ -21,6 +22,7 @@ type WorkerTask struct {
fileServer *rpc.Client
mount string
rwDir string
+ cacheDir string
tmpDir string
*Task
@@ -48,7 +50,7 @@ func (me *WorkerTask) Run() os.Error {
}
// TODO - configurable.
- bin := "/tmp/chroot"
+ bin := "termite/chroot/chroot"
cmd := []string{bin, "-dir", me.Task.Dir,
"-uid", fmt.Sprintf("%d", nobody.Uid), "-gid", fmt.Sprintf("%d", nobody.Gid),
me.mount}
@@ -78,28 +80,33 @@ func (me *WorkerTask) Run() os.Error {
return err
}
-func NewWorkerTask(server *rpc.Client, task *Task) (*WorkerTask, os.Error) {
- w := &WorkerTask{}
+
+func NewWorkerTask(server *rpc.Client, task *Task, cacheDir string) (*WorkerTask, os.Error) {
+ w := &WorkerTask{
+ cacheDir: cacheDir,
+ }
tmpDir, err := ioutil.TempDir("", "rpcfs-tmp")
- w.tmpDir = tmpDir
- if err != nil {
- return nil, err
+ type dirInit struct {
+ dst *string
+ val string
}
- w.rwDir = w.tmpDir + "/rw"
- err = os.Mkdir(w.rwDir, 0700)
- if err != nil {
- return nil, err
- }
- w.mount = w.tmpDir + "/mnt"
- err = os.Mkdir(w.mount, 0700)
- if err != nil {
- return nil, err
+
+ for _, v := range []dirInit{
+ dirInit{&w.rwDir, "rw"},
+ dirInit{&w.mount, "mnt"},
+ } {
+ *v.dst = filepath.Join(tmpDir, v.val)
+ err = os.Mkdir(*v.dst, 0700)
+ if err != nil {
+ return nil, err
+ }
}
+
w.Task = task
fs := fuse.NewLoopbackFileSystem(w.rwDir)
- roFs := NewRpcFs(server)
+ roFs := NewRpcFs(server, w.cacheDir)
// High ttl, since all writes come through fuse.
ttl := 100.0
Oops, something went wrong. (The rest of the diff failed to load: the changes to files 7–12 in the list above are not shown.)

0 comments on commit f60864f

Please sign in to comment.