Skip to content

Commit

Permalink
Merge 6908519 into 77148e5
Browse files Browse the repository at this point in the history
  • Loading branch information
magik6k committed Dec 6, 2018
2 parents 77148e5 + 6908519 commit c106b73
Show file tree
Hide file tree
Showing 7 changed files with 91 additions and 81 deletions.
37 changes: 13 additions & 24 deletions adder/adder.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ package adder
import (
"context"
"fmt"
"io"
"mime/multipart"
"strings"

Expand Down Expand Up @@ -84,17 +83,17 @@ func (a *Adder) setContext(ctx context.Context) {
func (a *Adder) FromMultipart(ctx context.Context, r *multipart.Reader) (cid.Cid, error) {
logger.Debugf("adding from multipart with params: %+v", a.params)

f := &files.MultipartFile{
Mediatype: "multipart/form-data",
Reader: r,
f, err := files.NewFileFromPartReader(r, "multipart/form-data")
if err != nil {
return cid.Undef, err
}
defer f.Close()
return a.FromFiles(ctx, f)
}

// FromFiles adds content from a files.File. The adder will no longer
// FromFiles adds content from a files.Directory. The adder will no longer
// be usable after calling this method.
func (a *Adder) FromFiles(ctx context.Context, f files.File) (cid.Cid, error) {
func (a *Adder) FromFiles(ctx context.Context, f files.Directory) (cid.Cid, error) {
logger.Debugf("adding from files")
a.setContext(ctx)

Expand Down Expand Up @@ -133,23 +132,24 @@ func (a *Adder) FromFiles(ctx context.Context, f files.File) (cid.Cid, error) {
prefix.MhLength = -1
ipfsAdder.CidBuilder = &prefix

for {
it := f.Entries()
for it.Next() {
select {
case <-a.ctx.Done():
return cid.Undef, a.ctx.Err()
default:
err := addFile(f, ipfsAdder)
if err == io.EOF {
goto FINALIZE
}
if err != nil {
logger.Debugf("ipfsAdder AddFile(%s)", it.Name())

if err := ipfsAdder.AddFile(it.Name(), it.Node()); err != nil {
logger.Error("error adding to cluster: ", err)
return cid.Undef, err
}
}
}
if it.Err() != nil {
return cid.Undef, it.Err()
}

FINALIZE:
adderRoot, err := ipfsAdder.Finalize()
if err != nil {
return cid.Undef, err
Expand All @@ -162,14 +162,3 @@ FINALIZE:
logger.Infof("%s successfully added to cluster", clusterRoot)
return clusterRoot, nil
}

// addFile pulls the next file out of fs and hands it to ipfsAdder,
// closing it when done. Errors from NextFile (including io.EOF once
// fs is exhausted — the caller uses io.EOF as its loop-termination
// signal) are returned unchanged.
func addFile(fs files.File, ipfsAdder *ipfsadd.Adder) error {
	f, err := fs.NextFile()
	if err != nil {
		return err
	}
	// Close the file once it has been fully added.
	defer f.Close()

	logger.Debugf("ipfsAdder AddFile(%s)", f.FullPath())
	return ipfsAdder.AddFile(f)
}
12 changes: 11 additions & 1 deletion adder/adder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/ipfs/ipfs-cluster/test"

cid "github.com/ipfs/go-cid"
files "github.com/ipfs/go-ipfs-files"
ipld "github.com/ipfs/go-ipld-format"
)

Expand Down Expand Up @@ -105,8 +106,17 @@ func TestAdder_ContextCancelled(t *testing.T) {
sth := test.NewShardingTestHelper()
defer sth.Clean(t)

mr, closer := sth.GetRandFileMultiReader(t, 50000) // 50 MB
lg, closer := sth.GetRandFileReader(t, 50000) // 50 MB
st := sth.GetTreeSerialFile(t)
defer closer.Close()
defer st.Close()

slf := files.DirFrom(map[string]files.Node{
"a": lg,
"b": st,
})
mr := files.NewMultiFileReader(slf, true)

r := multipart.NewReader(mr, mr.Boundary())

p := api.DefaultAddParams()
Expand Down
62 changes: 32 additions & 30 deletions adder/ipfsadd/add.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package ipfsadd

import (
"context"
"errors"
"fmt"
"io"
gopath "path"
Expand Down Expand Up @@ -278,11 +279,12 @@ func (adder *Adder) addNode(node ipld.Node, path string) error {
}

// AddFile adds the given file while respecting the adder.
func (adder *Adder) AddFile(file files.File) error {
return adder.addFile(file)
func (adder *Adder) AddFile(path string, file files.Node) error {
return adder.addFile(path, file)
}

func (adder *Adder) addFile(file files.File) error {
func (adder *Adder) addFile(path string, nd files.Node) error {
defer nd.Close()
if adder.liveNodes >= liveCacheSize {
// TODO: A smarter cache that uses some sort of lru cache with an eviction handler
mr, err := adder.mfsRoot()
Expand All @@ -297,12 +299,12 @@ func (adder *Adder) addFile(file files.File) error {
}
adder.liveNodes++

if file.IsDirectory() {
return adder.addDir(file)
if dir := files.ToDir(nd); dir != nil {
return adder.addDir(path, dir)
}

// case for symlink
if s, ok := file.(*files.Symlink); ok {
if s := files.ToSymlink(nd); s != nil {
sdata, err := unixfs.SymlinkData(s.Target)
if err != nil {
return err
Expand All @@ -315,15 +317,19 @@ func (adder *Adder) addFile(file files.File) error {
return err
}

return adder.addNode(dagnode, s.FileName())
return adder.addNode(dagnode, path)
}
file := files.ToFile(nd)
if file == nil {
return errors.New("expected a regular file")
}

// case for regular file
// if the progress flag was specified, wrap the file so that we can send
// progress updates to the client (over the output channel)
var reader io.Reader = file
if adder.Progress {
rdr := &progressReader{file: file, out: adder.Out}
rdr := &progressReader{file: file, path: path, out: adder.Out}
if fi, ok := file.(files.FileInfo); ok {
reader = &progressReader2{rdr, fi}
} else {
Expand All @@ -337,17 +343,17 @@ func (adder *Adder) addFile(file files.File) error {
}

// patch it into the root
return adder.addNode(dagnode, file.FileName())
return adder.addNode(dagnode, path)
}

func (adder *Adder) addDir(dir files.File) error {
log.Infof("adding directory: %s", dir.FileName())
func (adder *Adder) addDir(path string, dir files.Directory) error {
log.Infof("adding directory: %s", path)

mr, err := adder.mfsRoot()
if err != nil {
return err
}
err = mfs.Mkdir(mr, dir.FileName(), mfs.MkdirOpts{
err = mfs.Mkdir(mr, path, mfs.MkdirOpts{
Mkparents: true,
Flush: false,
CidBuilder: adder.CidBuilder,
Expand All @@ -356,27 +362,18 @@ func (adder *Adder) addDir(dir files.File) error {
return err
}

for {
file, err := dir.NextFile()
if err != nil && err != io.EOF {
return err
}
if file == nil {
break
}

// Skip hidden files when adding recursively, unless Hidden is enabled.
if files.IsHidden(file) && !adder.Hidden {
log.Infof("%s is hidden, skipping", file.FileName())
it := dir.Entries()
for it.Next() {
fpath := gopath.Join(path, it.Name())
if files.IsHidden(fpath, it.Node()) && !adder.Hidden {
log.Infof("%s is hidden, skipping", fpath)
continue
}
err = adder.addFile(file)
if err != nil {
if err := adder.addFile(fpath, it.Node()); err != nil {
return err
}
}

return nil
return it.Err()
}

// outputDagnode sends dagnode info over the output channel
Expand All @@ -400,7 +397,8 @@ func outputDagnode(out chan *api.AddedOutput, name string, dn ipld.Node) error {
}

type progressReader struct {
file files.File
file io.Reader
path string
out chan *api.AddedOutput
bytes int64
lastProgress int64
Expand All @@ -413,7 +411,7 @@ func (i *progressReader) Read(p []byte) (int, error) {
if i.bytes-i.lastProgress >= progressReaderIncrement || err == io.EOF {
i.lastProgress = i.bytes
i.out <- &api.AddedOutput{
Name: i.file.FileName(),
Name: i.path,
Bytes: uint64(i.bytes),
}
}
Expand All @@ -425,3 +423,7 @@ type progressReader2 struct {
*progressReader
files.FileInfo
}

// Read delegates explicitly to the embedded progressReader so its
// progress-tracking Read is used. NOTE(review): presumably this exists
// because files.FileInfo also promotes a Read method, which would make
// the bare selector ambiguous — confirm against go-ipfs-files v2.
func (i *progressReader2) Read(p []byte) (int, error) {
	return i.progressReader.Read(p)
}
23 changes: 12 additions & 11 deletions api/rest/client/methods.go
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,7 @@ func statusReached(target api.TrackerStatus, gblPinInfo api.GlobalPinInfo) (bool
}

// logic drawn from go-ipfs-cmds/cli/parse.go: appendFile
func makeSerialFile(fpath string, params *api.AddParams) (files.File, error) {
func makeSerialFile(fpath string, params *api.AddParams) (files.Node, error) {
if fpath == "." {
cwd, err := os.Getwd()
if err != nil {
Expand All @@ -373,7 +373,7 @@ func makeSerialFile(fpath string, params *api.AddParams) (files.File, error) {
}
}

return files.NewSerialFile(path.Base(fpath), fpath, params.Hidden, stat)
return files.NewSerialFile(fpath, params.Hidden, stat)
}

// Add imports files to the cluster from the given paths. A path can
Expand All @@ -389,32 +389,33 @@ func (c *defaultClient) Add(
out chan<- *api.AddedOutput,
) error {

addFiles := make([]files.File, len(paths), len(paths))
for i, path := range paths {
u, err := url.Parse(path)
addFiles := make([]files.DirEntry, len(paths), len(paths))
for i, p := range paths {
u, err := url.Parse(p)
if err != nil {
close(out)
return fmt.Errorf("error parsing path: %s", err)
}
var addFile files.File
name := path.Base(p)
var addFile files.Node
if strings.HasPrefix(u.Scheme, "http") {
addFile = files.NewWebFile(u)
name = path.Base(u.Path)
} else {
addFile, err = makeSerialFile(path, params)
addFile, err = makeSerialFile(p, params)
if err != nil {
close(out)
return err
}
}
addFiles[i] = addFile
addFiles[i] = files.FileEntry(name, addFile)
}

sliceFile := files.NewSliceFile("", "", addFiles)
sliceFile := files.NewSliceFile(addFiles)
// If `form` is set to true, the multipart data will have
// a Content-Type of 'multipart/form-data', if `form` is false,
// the Content-Type will be 'multipart/mixed'.
mfr := files.NewMultiFileReader(sliceFile, true)
return c.AddMultiFile(mfr, params, out)
return c.AddMultiFile(files.NewMultiFileReader(sliceFile, true), params, out)
}

// AddMultiFile imports new files from a MultiFileReader. See Add().
Expand Down
8 changes: 3 additions & 5 deletions ipfsconn/ipfshttp/ipfshttp.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
package ipfshttp

import (
"bytes"
"context"
"encoding/json"
"errors"
Expand Down Expand Up @@ -909,10 +908,9 @@ func (ipfs *Connector) BlockPut(b api.NodeWithMeta) error {
defer cancel()
defer ipfs.updateInformerMetric()

r := ioutil.NopCloser(bytes.NewReader(b.Data))
rFile := files.NewReaderFile("", "", r, nil)
sliceFile := files.NewSliceFile("", "", []files.File{rFile}) // IPFS reqs require a wrapping directory
multiFileR := files.NewMultiFileReader(sliceFile, true)
multiFileR := files.NewMultiFileReader(files.DirFrom(map[string]files.Node{ // IPFS reqs require a wrapping directory
"": files.FileFrom(b.Data),
}), true)
if b.Format == "" {
b.Format = "v0"
}
Expand Down
4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -140,9 +140,9 @@
},
{
"author": "magik6k",
"hash": "QmZMWMvWMVKCbHetJ4RgndbuEF1io2UpUxwQwtNjtYPzSC",
"hash": "QmPhx9B9cuaXc4vuw62567BF5NxfpsdD1AVE9HbTn7t1Y6",
"name": "go-ipfs-files",
"version": "1.0.1"
"version": "2.0.1"
},
{
"author": "lanzafame",
Expand Down
26 changes: 18 additions & 8 deletions test/sharding.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,32 +97,42 @@ func NewShardingTestHelper() *ShardingTestHelper {
// The total size in ext4 is ~3420160 Bytes = ~3340 kB = ~3.4MB
func (sth *ShardingTestHelper) GetTreeMultiReader(t *testing.T) (*files.MultiFileReader, io.Closer) {
sf := sth.GetTreeSerialFile(t)
slf := files.NewSliceFile("", "", []files.File{sf})
return files.NewMultiFileReader(slf, true), sf

return files.NewMultiFileReader(files.DirFrom(map[string]files.Node{
shardingTestTree: sf,
}), true), sf
}

// GetTreeSerialFile returns a files.SerialFile pointing to the testing
// directory tree (see GetTreeMultiReader).
func (sth *ShardingTestHelper) GetTreeSerialFile(t *testing.T) files.File {
func (sth *ShardingTestHelper) GetTreeSerialFile(t *testing.T) files.Directory {
st := sth.makeTree(t)
sf, err := files.NewSerialFile(shardingTestTree, sth.path(shardingTestTree), false, st)
sf, err := files.NewSerialFile(sth.path(shardingTestTree), false, st)

if err != nil {
t.Fatal(err)
}
return sf
return sf.(files.Directory)
}

// GetRandFileMultiReader builds a MultiFileReader wrapping a freshly
// generated random test file of the given size in kilobytes. Each call
// produces different random content. The returned io.Closer must be
// closed by the caller to release the underlying file.
func (sth *ShardingTestHelper) GetRandFileMultiReader(t *testing.T, kbs int) (*files.MultiFileReader, io.Closer) {
	dir, closer := sth.GetRandFileReader(t, kbs)
	mfr := files.NewMultiFileReader(dir, true)
	return mfr, closer
}

// GetRandFileReader creates and returns a directory containing a testing
// random file of the given size (in kbs)
func (sth *ShardingTestHelper) GetRandFileReader(t *testing.T, kbs int) (files.Directory, io.Closer) {
st := sth.makeRandFile(t, kbs)
sf, err := files.NewSerialFile("randomfile", sth.path(shardingTestFile), false, st)
sf, err := files.NewSerialFile(sth.path(shardingTestFile), false, st)
if err != nil {
t.Fatal(err)
}
slf := files.NewSliceFile("", "", []files.File{sf})
return files.NewMultiFileReader(slf, true), sf
slf := files.DirFrom(map[string]files.Node{"randomfile": sf})
return slf, sf
}

// Clean deletes any folder and file generated by this helper.
Expand Down

0 comments on commit c106b73

Please sign in to comment.