Skip to content

Commit

Permalink
Merge pull request #698 from pachyderm/better_putfile
Browse files Browse the repository at this point in the history
Better putfile
  • Loading branch information
jdoliner committed Aug 4, 2016
2 parents 9f3a311 + fd6337f commit 2bd83e6
Show file tree
Hide file tree
Showing 10 changed files with 414 additions and 163 deletions.
22 changes: 22 additions & 0 deletions src/client/pfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -395,6 +395,28 @@ func (c APIClient) PutFileWithDelimiter(repoName string, commitID string, path s
return int(written), err
}

// PutFileURL puts a file using the content found at a URL.
// The URL is sent to the server which performs the request.
func (c APIClient) PutFileURL(repoName string, commitID string, path string, url string) (retErr error) {
putFileClient, err := c.PfsAPIClient.PutFile(context.Background())
if err != nil {
return sanitizeErr(err)
}
defer func() {
if _, err := putFileClient.CloseAndRecv(); err != nil && retErr == nil {
retErr = sanitizeErr(err)
}
}()
if err := putFileClient.Send(&pfs.PutFileRequest{
File: NewFile(repoName, commitID, path),
FileType: pfs.FileType_FILE_TYPE_REGULAR,
Url: url,
}); err != nil {
return sanitizeErr(err)
}
return nil
}

// GetFile returns the contents of a file at a specific Commit.
// offset specifies a number of bytes that should be skipped in the beginning of the file.
// size limits the total amount of data returned, note you will get fewer bytes
Expand Down
272 changes: 137 additions & 135 deletions src/client/pfs/pfs.pb.go

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions src/client/pfs/pfs.proto
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,7 @@ message PutFileRequest {
bytes value = 3;
string handle = 4;
Delimiter delimiter = 5;
string url = 6;
}

message InspectFileRequest {
Expand Down
17 changes: 17 additions & 0 deletions src/server/pachyderm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2321,6 +2321,23 @@ func TestPipelineWithFullObjects(t *testing.T) {
require.Equal(t, "foo\nbar\n", buffer.String())
}

func TestPutFileURL(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration tests in short mode")
}
t.Parallel()
c := getPachClient(t)
repo := uniqueString("TestPutFileURL")
require.NoError(t, c.CreateRepo(repo))
_, err := c.StartCommit(repo, "", "master")
require.NoError(t, err)
require.NoError(t, c.PutFileURL(repo, "master", "readme", "https://raw.githubusercontent.com/pachyderm/pachyderm/master/README.md"))
require.NoError(t, c.FinishCommit(repo, "master"))
fileInfo, err := c.InspectFile(repo, "master", "readme", "", false, nil)
require.NoError(t, err)
require.True(t, fileInfo.SizeBytes > 0)
}

func getPachClient(t *testing.T) *client.APIClient {
client, err := client.NewFromAddress("0.0.0.0:30650")
require.NoError(t, err)
Expand Down
102 changes: 81 additions & 21 deletions src/server/pfs/cmds/cmds.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,15 @@ package cmds
import (
"errors"
"fmt"
"net/url"
"os"
"path"
"path/filepath"
"strings"
"text/tabwriter"

"golang.org/x/sync/errgroup"

"github.com/pachyderm/pachyderm/src/client"
pfsclient "github.com/pachyderm/pachyderm/src/client/pfs"
"github.com/pachyderm/pachyderm/src/server/pfs/fuse"
Expand Down Expand Up @@ -349,44 +352,87 @@ Files can be read from finished commits with get-file.`,
}

var filePath string
var recursive bool
var commitFlag bool
putFile := &cobra.Command{
Use: "put-file repo-name commit-id path/to/file/in/pfs",
Short: "Put a file",
Long: "Put a file. If the -f flag is not used, the data is read from stdin. If the -f flag is used and a path is not provided, the base name of the file is used as the path. commit-id must be an open commit.",
Short: "Put a file into the filesystem.",
Long: `Put-file supports a number of ways to insert data into pfs:
Put data from stdin as repo/commit/path :
echo "data" | pachctl put-file repo commit path
Put a file from the local filesystem as repo/commit/path:
pachctl put-file repo commit path -f file
Put a file from the local filesystem as repo/commit/file:
pachctl put-file repo commit -f file
Put the contents of a directory as repo/commit/path/dir/file:
pachctl put-file -r repo commit path -f dir
Put the contents of a directory as repo/commit/dir/file:
pachctl put-file -r repo commit -f dir
Put the data from a URL as repo/commit/path:
pachctl put-file repo commit path -f http://host/url_path
Put the data from a URL as repo/commit/url_path:
pachctl put-file repo commit -f http://host/url_path
`,
Run: cmd.RunBoundedArgs(2, 3, func(args []string) (retErr error) {
client, err := client.NewFromAddress(address)
if err != nil {
return err
}
if filePath == "" || filePath == "-" {
if commitFlag {
commit, err := client.StartCommit(args[0],
"", args[1])
if err != nil {
return err
}
defer func() {
if err := client.FinishCommit(commit.Repo.Name, commit.ID); err != nil && retErr == nil {
retErr = err
}
}()
}
if filePath == "-" {
if len(args) < 3 {
return errors.New("either a path or the -f flag needs to be provided")
}
_, err = client.PutFile(args[0], args[1], args[2], os.Stdin)
return err
}
f, err := os.Open(filePath)
if err != nil {
return err
// try parsing the filename as a url, if it is one do a PutFileURL
if url, err := url.Parse(filePath); err == nil && url.Scheme != "" {
if len(args) < 3 {
return client.PutFileURL(args[0], args[1], url.Path, url.String())
}
return client.PutFileURL(args[0], args[1], args[2], url.String())
}
defer func() {
if err := f.Close(); err != nil && retErr == nil {
retErr = err
if !recursive {
if len(args) == 3 {
return cpFile(client, args[0], args[1], args[2], filePath)
}
}()
var p string
if len(args) == 3 {
p = args[2]
} else {
// If a path is not provided,
// use the basename of the file as the path
p = filepath.Base(filePath)
}
_, err = client.PutFile(args[0], args[1], p, f)
return err
return cpFile(client, args[0], args[1], filePath, filePath)
}
var eg errgroup.Group
filepath.Walk(filePath, func(path string, info os.FileInfo, err error) error {
if info.IsDir() {
return nil
}
if len(args) == 3 {
eg.Go(func() error { return cpFile(client, args[0], args[1], filepath.Join(args[2], path), path) })
}
eg.Go(func() error { return cpFile(client, args[0], args[1], path, path) })
return nil
})
return eg.Wait()
}),
}
putFile.Flags().StringVarP(&filePath, "file", "f", "", "The file to be put")
putFile.Flags().StringVarP(&filePath, "file", "f", "-", "The file to be put, it can be a local file or a URL.")
putFile.Flags().BoolVarP(&recursive, "recursive", "r", false, "Recursively put the files in a directory.")
putFile.Flags().BoolVarP(&commitFlag, "commit", "c", false, "Start and finish the commit in addition to putting data.")

var fromCommitID string
var fullFile bool
Expand Down Expand Up @@ -536,3 +582,17 @@ func parseCommitMounts(args []string) []*fuse.CommitMount {
}
return result
}

func cpFile(client *client.APIClient, repo string, commit string, path string, filePath string) (retErr error) {
f, err := os.Open(filePath)
if err != nil {
return err
}
defer func() {
if err := f.Close(); err != nil && retErr == nil {
retErr = err
}
}()
_, err = client.PutFile(repo, commit, path, f)
return err
}
41 changes: 34 additions & 7 deletions src/server/pfs/server/internal_api_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package server
import (
"bytes"
"fmt"
"io"
"net/http"
"strconv"
"strings"
"sync"
Expand Down Expand Up @@ -364,14 +366,39 @@ func (a *internalAPIServer) PutFile(putFileServer pfs.InternalAPI_PutFileServer)
return err
}
} else {
reader := putFileReader{
server: putFileServer,
}
_, err = reader.buffer.Write(request.Value)
if err != nil {
return err
var r io.Reader
var delimiter pfs.Delimiter
if request.Url != "" {
resp, err := http.Get(request.Url)
if err != nil {
return err
}
defer func() {
if err := resp.Body.Close(); err != nil && retErr == nil {
retErr = err
}
}()
r = resp.Body
switch resp.Header.Get("Content-Type") {
case "application/json":
delimiter = pfs.Delimiter_JSON
case "application/text":
delimiter = pfs.Delimiter_LINE
default:
delimiter = pfs.Delimiter_NONE
}
} else {
reader := putFileReader{
server: putFileServer,
}
_, err = reader.buffer.Write(request.Value)
if err != nil {
return err
}
r = &reader
delimiter = request.Delimiter
}
if err := a.driver.PutFile(request.File, request.Handle, request.Delimiter, shard, &reader); err != nil {
if err := a.driver.PutFile(request.File, request.Handle, delimiter, shard, r); err != nil {
return err
}
}
Expand Down
27 changes: 27 additions & 0 deletions src/server/vendor/golang.org/x/sync/LICENSE

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

22 changes: 22 additions & 0 deletions src/server/vendor/golang.org/x/sync/PATENTS

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

67 changes: 67 additions & 0 deletions src/server/vendor/golang.org/x/sync/errgroup/errgroup.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions src/server/vendor/vendor.json
Original file line number Diff line number Diff line change
Expand Up @@ -955,6 +955,12 @@
"revision": "71d9edd725fe4ce4c692fcb20765be558df45ad3",
"revisionTime": "2016-06-01T00:25:10Z"
},
{
"checksumSHA1": "S0DP7Pn7sZUmXc55IzZnNvERu6s=",
"path": "golang.org/x/sync/errgroup",
"revision": "316e794f7b5e3df4e95175a45a5fb8b12f85cb4f",
"revisionTime": "2016-07-15T18:54:39Z"
},
{
"checksumSHA1": "QcDlr1sWBDt6Dn/v7W/MRgccAi4=",
"path": "golang.org/x/sys/unix",
Expand Down

0 comments on commit 2bd83e6

Please sign in to comment.