Skip to content

Commit

Permalink
Fix uploading with collection - ttl (#6)
Browse files Browse the repository at this point in the history
  • Loading branch information
linxGnu committed Nov 18, 2019
1 parent 358976b commit 9bf4b43
Show file tree
Hide file tree
Showing 5 changed files with 79 additions and 74 deletions.
38 changes: 23 additions & 15 deletions filer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package goseaweedfs
import (
"encoding/json"
"io"
"io/ioutil"
"net/http"
"net/url"
)
Expand Down Expand Up @@ -57,11 +58,8 @@ func (f *Filer) Close() (err error) {
func (f *Filer) Upload(localFilePath, newPath, collection, ttl string) (result *FilerUploadResult, err error) {
fp, err := NewFilePart(localFilePath)
if err == nil {
fp.Collection = collection
fp.TTL = ttl

var data []byte
data, _, err = f.client.upload(f.fullpath(newPath), localFilePath, fp.Reader, fp.MimeType)
data, _, err = f.client.upload(encodeURI(*f.base, newPath, normalize(nil, collection, ttl)), localFilePath, fp.Reader, fp.MimeType)
if err == nil {
result = &FilerUploadResult{}
err = json.Unmarshal(data, result)
Expand All @@ -72,26 +70,36 @@ func (f *Filer) Upload(localFilePath, newPath, collection, ttl string) (result *
return
}

// UploadFile content.
func (f *Filer) UploadFile(content io.Reader, fileSize int64, newPath, collection, ttl string) (result *FilerUploadResult, err error) {
fp := NewFilePartFromReader(ioutil.NopCloser(content), newPath, fileSize)

var data []byte
data, _, err = f.client.upload(encodeURI(*f.base, newPath, normalize(nil, collection, ttl)), newPath, ioutil.NopCloser(content), "")
if err == nil {
result = &FilerUploadResult{}
err = json.Unmarshal(data, result)
}

_ = fp.Close()

return
}

// Get response data from filer.
func (f *Filer) Get(path string, args url.Values, header map[string]string) (data []byte, statusCode int, err error) {
data, statusCode, err = f.client.get(f.base, path, args, header)
data, statusCode, err = f.client.get(encodeURI(*f.base, path, args), header)
return
}

// Download a file.
func (f *Filer) Download(path string, callback func(io.Reader) error) (err error) {
_, err = f.client.Download(f.fullpath(path), callback)
func (f *Filer) Download(path string, args url.Values, callback func(io.Reader) error) (err error) {
_, err = f.client.download(encodeURI(*f.base, path, args), callback)
return
}

// Delete a file/dir.
func (f *Filer) Delete(path string, recursive bool) (err error) {
_, err = f.client.delete(f.fullpath(path), recursive)
func (f *Filer) Delete(path string, args url.Values) (err error) {
_, err = f.client.delete(encodeURI(*f.base, path, args))
return
}

func (f *Filer) fullpath(path string) string {
u := *f.base
u.Path = path
return u.String()
}
27 changes: 8 additions & 19 deletions http_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"mime/multipart"
"net/http"
"net/textproto"
"net/url"
"path/filepath"
"strings"

Expand All @@ -35,8 +34,8 @@ func (c *httpClient) Close() (err error) {
return
}

func (c *httpClient) get(base *url.URL, path string, args url.Values, header map[string]string) (body []byte, statusCode int, err error) {
req, err := http.NewRequest(http.MethodGet, encodeURI(*base, path, args), nil)
func (c *httpClient) get(url string, header map[string]string) (body []byte, statusCode int, err error) {
req, err := http.NewRequest(http.MethodGet, url, nil)
if err == nil {
for k, v := range header {
req.Header.Set(k, v)
Expand All @@ -52,21 +51,12 @@ func (c *httpClient) get(base *url.URL, path string, args url.Values, header map
return
}

func (c *httpClient) delete(url string, recursive bool) (statusCode int, err error) {
func (c *httpClient) delete(url string) (statusCode int, err error) {
req, err := http.NewRequest(http.MethodDelete, url, nil)
if err != nil {
return
}

if recursive {
query := req.URL.Query()
query.Set("recursive", "true")
req.URL.RawQuery = query.Encode()

// trigger to use req.URL
req.Host = ""
}

r, err := c.client.Do(req)
if err != nil {
return
Expand Down Expand Up @@ -94,13 +84,12 @@ func (c *httpClient) delete(url string, recursive bool) (statusCode int, err err
return
}

// Download file from url.
func (c *httpClient) Download(fileURL string, callback func(io.Reader) error) (filename string, err error) {
r, err := c.client.Get(fileURL)
func (c *httpClient) download(url string, callback func(io.Reader) error) (filename string, err error) {
r, err := c.client.Get(url)
if err == nil {
if r.StatusCode != http.StatusOK {
drainAndClose(r.Body)
err = fmt.Errorf("Download %s but error. Status:%s", fileURL, r.Status)
err = fmt.Errorf("Download %s but error. Status:%s", url, r.Status)
return
}

Expand All @@ -122,7 +111,7 @@ func (c *httpClient) Download(fileURL string, callback func(io.Reader) error) (f
return
}

func (c *httpClient) upload(uploadURL string, filename string, fileReader io.Reader, mtype string) (respBody []byte, statusCode int, err error) {
func (c *httpClient) upload(url string, filename string, fileReader io.Reader, mtype string) (respBody []byte, statusCode int, err error) {
r, w := io.Pipe()

// create multipart writer
Expand Down Expand Up @@ -159,7 +148,7 @@ func (c *httpClient) upload(uploadURL string, filename string, fileReader io.Rea
c.workers.Do(task)

var resp *http.Response
if resp, err = c.client.Post(uploadURL, mw.FormDataContentType(), r); err == nil {
if resp, err = c.client.Post(url, mw.FormDataContentType(), r); err == nil {
if respBody, statusCode, err = readAll(resp); err == nil {
result := <-task.Result()
err = result.Err
Expand Down
60 changes: 26 additions & 34 deletions seaweed.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,13 +145,10 @@ func (c *Seaweed) Filers() []*Filer {

// Grow pre-Allocate Volumes.
func (c *Seaweed) Grow(count int, collection, replication, dataCenter string) error {
args := make(url.Values)
args := normalize(nil, collection, "")
if count > 0 {
args.Set(ParamGrowCount, strconv.Itoa(count))
}
if collection != "" {
args.Set(ParamGrowCollection, collection)
}
if replication != "" {
args.Set(ParamGrowReplication, replication)
}
Expand All @@ -163,7 +160,7 @@ func (c *Seaweed) Grow(count int, collection, replication, dataCenter string) er

// GrowArgs pre-Allocate volumes with args.
func (c *Seaweed) GrowArgs(args url.Values) (err error) {
_, _, err = c.client.get(c.master, "/vol/grow", args, nil)
_, _, err = c.client.get(encodeURI(*c.master, "/vol/grow", args), nil)
return
}

Expand All @@ -174,12 +171,10 @@ func (c *Seaweed) Lookup(volID string, args url.Values) (result *LookupResult, e
}

func (c *Seaweed) doLookup(volID string, args url.Values) (result *LookupResult, err error) {
if args == nil {
args = make(url.Values)
}
args = normalize(args, "", "")
args.Set(ParamLookupVolumeID, volID)

jsonBlob, _, err := c.client.get(c.master, "/dir/lookup", args, nil)
jsonBlob, _, err := c.client.get(encodeURI(*c.master, "/dir/lookup", args), nil)
if err == nil {
result = &LookupResult{}
if err = json.Unmarshal(jsonBlob, result); err == nil {
Expand Down Expand Up @@ -240,13 +235,13 @@ func (c *Seaweed) GC(threshold float64) (err error) {
args := url.Values{
"garbageThreshold": []string{strconv.FormatFloat(threshold, 'f', -1, 64)},
}
_, _, err = c.client.get(c.master, "/vol/vacuum", args, nil)
_, _, err = c.client.get(encodeURI(*c.master, "/vol/vacuum", args), nil)
return
}

// Status check System Status.
func (c *Seaweed) Status() (result *SystemStatus, err error) {
data, _, err := c.client.get(c.master, "/dir/status", nil, nil)
data, _, err := c.client.get(encodeURI(*c.master, "/dir/status", nil), nil)
if err == nil {
result = &SystemStatus{}
err = json.Unmarshal(data, result)
Expand All @@ -256,7 +251,7 @@ func (c *Seaweed) Status() (result *SystemStatus, err error) {

// ClusterStatus get cluster status.
func (c *Seaweed) ClusterStatus() (result *ClusterStatus, err error) {
data, _, err := c.client.get(c.master, "/cluster/status", nil, nil)
data, _, err := c.client.get(encodeURI(*c.master, "/cluster/status", nil), nil)
if err == nil {
result = &ClusterStatus{}
err = json.Unmarshal(data, result)
Expand All @@ -265,8 +260,8 @@ func (c *Seaweed) ClusterStatus() (result *ClusterStatus, err error) {
}

// Assign do assign api.
func (c *Seaweed) Assign() (result *AssignResult, err error) {
jsonBlob, _, err := c.client.get(c.master, "/dir/assign", nil, nil)
func (c *Seaweed) Assign(args url.Values) (result *AssignResult, err error) {
jsonBlob, _, err := c.client.get(encodeURI(*c.master, "/dir/assign", args), nil)
if err == nil {
result = &AssignResult{}
if err = json.Unmarshal(jsonBlob, result); err != nil {
Expand All @@ -283,9 +278,7 @@ func (c *Seaweed) Assign() (result *AssignResult, err error) {
func (c *Seaweed) Submit(filePath string, collection, ttl string) (result *SubmitResult, err error) {
fp, err := NewFilePart(filePath)
if err == nil {
fp.Collection = collection
fp.TTL = ttl
result, err = c.SubmitFilePart(fp, nil)
result, err = c.SubmitFilePart(fp, normalize(nil, collection, ttl))
_ = fp.Close()
}
return
Expand Down Expand Up @@ -323,20 +316,21 @@ func (c *Seaweed) UploadFile(filePath string, collection, ttl string) (cm *Chunk
// UploadFilePart uploads a file part.
func (c *Seaweed) UploadFilePart(f *FilePart) (cm *ChunkManifest, fileID string, err error) {
if f.FileID == "" {
res, err := c.Assign()
res, err := c.Assign(normalize(nil, f.Collection, f.TTL))
if err != nil {
return nil, "", err
}
f.Server, f.FileID = res.URL, res.FileID
}

if f.Server == "" {
if f.Server, err = c.LookupServerByFileID(f.FileID, url.Values{ParamCollection: []string{f.Collection}}, false); err != nil {
if f.Server, err = c.LookupServerByFileID(f.FileID, normalize(nil, f.Collection, ""), false); err != nil {
return
}
}

baseName := path.Base(f.FileName)

if c.chunkSize > 0 && f.FileSize > c.chunkSize {
chunks := f.FileSize/c.chunkSize + 1

Expand All @@ -346,13 +340,11 @@ func (c *Seaweed) UploadFilePart(f *FilePart) (cm *ChunkManifest, fileID string,
Mime: f.MimeType,
Chunks: make([]*ChunkInfo, chunks),
}
args := url.Values{ParamCollection: []string{f.Collection}}
args.Set("Content-Type", "multipart/form-data")

for i := int64(0); i < chunks; i++ {
_, id, count, e := c.uploadChunk(f, baseName+"_"+strconv.FormatInt(i+1, 10))
if e != nil { // delete all uploaded chunks
_ = c.DeleteChunks(cm, args)
_ = c.DeleteChunks(cm, normalize(nil, f.Collection, ""))
return nil, "", e
}

Expand All @@ -364,14 +356,14 @@ func (c *Seaweed) UploadFilePart(f *FilePart) (cm *ChunkManifest, fileID string,
}

if err = c.uploadManifest(f, cm); err != nil { // delete all uploaded chunks
_ = c.DeleteChunks(cm, args)
_ = c.DeleteChunks(cm, normalize(nil, f.Collection, ""))
}
} else {
args := make(url.Values)
args := normalize(nil, f.Collection, f.TTL)
args.Set("Content-Type", "multipart/form-data")
if f.ModTime != 0 {
args.Set("ts", strconv.FormatInt(f.ModTime, 10))
}
args.Set("Content-Type", "multipart/form-data")

_, _, err = c.client.upload(encodeURI(*c.master, f.FileID, args), baseName, f.Reader, f.MimeType)
}
Expand Down Expand Up @@ -402,7 +394,7 @@ func (c *Seaweed) BatchUploadFileParts(files []*FilePart, collection string, ttl
}
}

assigned, err := c.Assign()
assigned, err := c.Assign(normalize(nil, collection, ttl))
if err != nil {
for i := range files {
results[i].Error = err.Error()
Expand All @@ -418,6 +410,7 @@ func (c *Seaweed) BatchUploadFileParts(files []*FilePart, collection string, ttl
}
file.Server = assigned.URL
file.Collection = collection
file.TTL = ttl

results[i].Size = file.FileSize
results[i].FileID = file.FileID
Expand Down Expand Up @@ -469,17 +462,16 @@ func (c *Seaweed) ReplaceFile(fileID, localFilePath string, deleteFirst bool) (e
// ReplaceFilePart replaces file part.
func (c *Seaweed) ReplaceFilePart(f *FilePart, deleteFirst bool) (fileID string, err error) {
if deleteFirst && f.FileID != "" {
if err = c.DeleteFile(f.FileID, url.Values{ParamCollection: []string{f.Collection}}); err == nil {
_, fileID, err = c.UploadFilePart(f)
}
_ = c.DeleteFile(f.FileID, nil)
}

_, fileID, err = c.UploadFilePart(f)
return
}

func (c *Seaweed) uploadChunk(f *FilePart, filename string) (assignResult *AssignResult, fileID string, size int64, err error) {
// Assign first to get file id and url for uploading
assignResult, err = c.Assign()

assignResult, err = c.Assign(normalize(nil, f.Collection, f.TTL))
if err == nil {
fileID = assignResult.FileID

Expand Down Expand Up @@ -509,7 +501,7 @@ func (c *Seaweed) uploadManifest(f *FilePart, manifest *ChunkManifest) (err erro
if err == nil {
bufReader := bytes.NewReader(buf)

args := make(url.Values)
args := normalize(nil, f.Collection, f.TTL)
if f.ModTime != 0 {
args.Set("ts", strconv.FormatInt(f.ModTime, 10))
}
Expand All @@ -524,7 +516,7 @@ func (c *Seaweed) uploadManifest(f *FilePart, manifest *ChunkManifest) (err erro
func (c *Seaweed) Download(fileID string, args url.Values, callback func(io.Reader) error) (fileName string, err error) {
fileURL, err := c.LookupFileID(fileID, args, true)
if err == nil {
fileName, err = c.client.Download(fileURL, callback)
fileName, err = c.client.download(fileURL, callback)
}
return
}
Expand Down Expand Up @@ -562,7 +554,7 @@ func (c *Seaweed) deleteFileTask(fileID string, args url.Values) *workerpool.Tas
func (c *Seaweed) DeleteFile(fileID string, args url.Values) (err error) {
fileURL, err := c.LookupFileID(fileID, args, false)
if err == nil {
_, err = c.client.delete(fileURL, false)
_, err = c.client.delete(fileURL)
}
return
}
8 changes: 4 additions & 4 deletions seaweed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,13 +163,13 @@ func TestFiler(t *testing.T) {
require.Nil(t, err)

// try to download
err = filer.Download("/js/test.txt", func(r io.Reader) error {
err = filer.Download("/js/test.txt", nil, func(r io.Reader) error {
return nil
})
require.Nil(t, err)

// try to delete this file
err = filer.Delete("/js/test.txt", false)
err = filer.Delete("/js/test.txt", nil)
require.Nil(t, err)

// test with non prefix
Expand All @@ -181,13 +181,13 @@ func TestFiler(t *testing.T) {
require.NotZero(t, len(data))

// try to download
err = filer.Download("js/test1.jsx", func(r io.Reader) error {
err = filer.Download("js/test1.jsx", nil, func(r io.Reader) error {
return fmt.Errorf("Fake error")
})
require.NotNil(t, err)

// try to delete this file
err = filer.Delete("js/test1.jsx", true)
err = filer.Delete("js/test1.jsx", nil)
require.Nil(t, err)
}

Expand Down

0 comments on commit 9bf4b43

Please sign in to comment.