Permalink
Browse files

Merge pull request #199 from shijiayun/b_sjy_up_prefop

up支持向mq推送异步nfop消息和up的一些优化调整
  • Loading branch information...
2 parents 26a87e6 + 376e40c commit ae52b60a84ef1227f4b88ab4426ab8ce771df8b9 @carter2000 carter2000 committed Dec 5, 2012
Showing with 343 additions and 328 deletions.
  1. +1 −1 io/src/qbox.us/store/simplestg.go
  2. +214 −174 io/src/qbox.us/up/up_svr.go
  3. +128 −153 io/src/qbox.us/up/up_svr_test.go
View
2 io/src/qbox.us/store/simplestg.go
@@ -69,7 +69,7 @@ func (p *SimpleStorage) Get(key []byte, w io.Writer, from, to int, bds [4]uint16
func (p *SimpleStorage) Put(key []byte, r io.Reader, n int) error {
val := make([]byte, n)
- n2, err := r.Read(val)
+ n2, err := io.ReadFull(r, val)
if n != n2 || err != nil || !bytes.Equal(key, sha1.Hash(val)) {
return EVerifyFailed
}
View
388 io/src/qbox.us/up/up_svr.go
@@ -19,7 +19,6 @@ import (
"qbox.us/api/up"
"qbox.us/audit/logh"
"qbox.us/auditlog"
- "qbox.us/cc"
"qbox.us/errors"
betag "qbox.us/etag"
"qbox.us/log"
@@ -33,6 +32,7 @@ import (
"qbox.us/timeio"
"qbox.us/sstore"
"qbox.us/store"
+ "qbox.us/cc"
upadmin "qbox.us/admin_api/up"
upSha1 "qbox.us/up/sha1"
)
@@ -100,6 +100,7 @@ type Config struct {
Clients map[string]Client // {"rs": Client{"http://rs.qbox.me", RS_AuthScope}}
MyIdc uint16
Csid uint16
+ AsyncAct func(owner uint32, entryURI string, fh []byte, fsize int64, asyncOps string) error
MyHost string
IgnoreDeadline bool
}
@@ -375,40 +376,50 @@ func (r *Service) mkfile(w http.ResponseWriter, req *http.Request) {
}
begin = time.Now().UnixNano()
- done := r.onFilePutted(
- w, int64(fsize), entryURI, otherQueryStr, user, nil, &auth, keys, cli, (auth.CallbackUrl == ""))
+ etag, _, err := r.onFilePutted(int64(fsize), entryURI, otherQueryStr, user, &auth, keys, cli)
+ if err != nil {
+ err = errors.Info(err, "mkfile: onFilePutted failed").Detail(err)
+ httputil.Error(w, err)
+ return
+ }
log.Info("UP mkfile onFilePutted", time.Now().UnixNano() - begin)
- if done {
- var ret interface{}
- var params string
- for i := 4; i+1 < len(query); i += 2 {
- if query[i] == "params" {
- params2, err2 := rpc.DecodeURI(query[i+1])
- if err2 != nil {
- err = errors.Info(api.EInvalidArgs, "mkfile: invalid params -", query[i+1]).Detail(err2)
- httputil.Error(w, err)
- return
- }
- params = string(params2)
- break
+
+ if auth.CallbackUrl == "" {
+ data := map[string]string{"hash": etag}
+ httputil.Reply(w, api.OK, data)
+ return
+ }
+
+ // app callback
+ var ret interface{}
+ var params string
+ for i := 4; i+1 < len(query); i += 2 {
+ if query[i] == "params" {
+ params2, err2 := rpc.DecodeURI(query[i+1])
+ if err2 != nil {
+ err = errors.Info(api.EInvalidArgs, "mkfile: invalid params -", query[i+1]).Detail(err2)
+ httputil.Error(w, err)
+ return
}
+ params = string(params2)
+ break
}
- if auth.CallbackBodyType == "" {
- auth.CallbackBodyType = "application/x-www-form-urlencoded"
- }
- begin := time.Now().UnixNano()
- code, err := rpc.DefaultClient.CallWithBinaryEx(
- &ret, auth.CallbackUrl, auth.CallbackBodyType, strings.NewReader(params), len(params))
- logh.Info(w, "Appcall", logh.M{
- "callback": auth.CallbackUrl,
- "data": ret,
- })
- log.Debug("Appcall:", auth.CallbackUrl, params, time.Now().UnixNano() - begin)
- if err != nil {
- httputil.ReplyError(w, err.Error(), code)
- } else {
- httputil.Reply(w, code, ret)
- }
+ }
+ if auth.CallbackBodyType == "" {
+ auth.CallbackBodyType = "application/x-www-form-urlencoded"
+ }
+ begin = time.Now().UnixNano()
+ code, err := rpc.DefaultClient.CallWithBinaryEx(
+ &ret, auth.CallbackUrl, auth.CallbackBodyType, strings.NewReader(params), len(params))
+ logh.Info(w, "Appcall", logh.M{
+ "callback": auth.CallbackUrl,
+ "data": ret,
+ })
+ log.Debug("Appcall:", auth.CallbackUrl, params, time.Now().UnixNano() - begin)
+ if err != nil {
+ httputil.ReplyError(w, err.Error(), code)
+ } else {
+ httputil.Reply(w, code, ret)
}
}
@@ -425,21 +436,31 @@ func (r *Service) put(w http.ResponseWriter, req *http.Request) {
fsize := req.ContentLength
if fsize < 0 {
- err = errors.Info(api.EInvalidArgs, "Service.put: Content-Length not found")
+ err = errors.Info(api.EInvalidArgs, "rs-put: Content-Length not found")
httputil.Error(w, err)
return
}
user, err := account.GetAuthExt(r.Account, req)
if err != nil {
- err = errors.Info(api.EBadToken, "Service.put: auth failed -", user).Detail(err)
+ err = errors.Info(api.EBadToken, "rs-put: GetAuthExt failed", user).Detail(err)
httputil.Error(w, err)
return
}
queryStr := req.URL.Path[1:]
- r.putFile(w, req.Body, fsize, queryStr, user, nil, nil, true)
+ begin := time.Now().UnixNano()
+ etag, _, err := r.putFile(req.Body, fsize, queryStr, user, nil)
+ if err != nil {
+ err = errors.Info(err, "rs-put: putFile failed").Detail(err)
+ httputil.Error(w, err)
+ return
+ }
+ log.Info("rs-put: putFile cost time ->", time.Now().UnixNano()-begin)
+
+ data := map[string]string{"hash": etag}
+ httputil.Reply(w, api.OK, data)
}
// -----------------------------------------------------------
@@ -470,7 +491,7 @@ func (r *Service) upload(w http.ResponseWriter, req *http.Request) {
var err error
if err = req.ParseMultipartForm(16 * 1024); err != nil || req.MultipartForm == nil {
- err = errors.Info(errors.EINVAL, "Service.upload ParseMultipartForm failed").Detail(err)
+ err = errors.Info(api.EInvalidArgs, "upload: req.ParseMultipartForm failed").Detail(err)
httputil.Error(w, err)
return
}
@@ -484,48 +505,53 @@ func (r *Service) upload(w http.ResponseWriter, req *http.Request) {
uptoken := multiForm.Value["auth"]
if uptoken == nil || uptoken[0] == "" {
- err = errors.Info(api.EInvalidArgs, "Service.upload: uptoken not specified")
+ err = errors.Info(api.EInvalidArgs, "upload: uptoken not specified")
httputil.Error(w, err)
return
}
var auth authPolicy
user, err := GetUpAuth(&auth, r.Account, uptoken[0])
- log.Debug("auth info:", auth, auth.Customer)
if err != nil {
- err = errors.Info(api.EBadToken, "Service.upload: auth failed -", user).Detail(err)
+ err = errors.Info(api.EBadToken, "upload: GetUpAuth failed", user).Detail(err)
httputil.Error(w, err)
return
}
+ log.Debug("authPolicy:", auth)
- if file == nil {
- err = errors.Info(api.EInvalidArgs, "Service.upload: file not specified")
+ if file == nil || file[0] == nil {
+ err = errors.Info(api.EInvalidArgs, "upload: file not specified")
httputil.Error(w, err)
return
}
if action == nil || action[0] == "" {
- err = errors.Info(api.EInvalidArgs, "Service.upload: action not specified")
+ err = errors.Info(api.EInvalidArgs, "upload: action not specified")
httputil.Error(w, err)
return
}
if auth.CallbackUrl != "" && (params == nil || params[0] == "") {
- err = errors.Info(api.EInvalidArgs, "Service.upload: params required")
+ err = errors.Info(api.EInvalidArgs, "upload: params required")
httputil.Error(w, err)
return
}
- var hasCrc32, hasRotate bool
- var crc32Data uint32
- var rotate int
queryStr := action[0][1:]
query := strings.Split(queryStr, "/")
+ if len(query) < 2 {
+ httputil.ReplyWithCode(w, api.InvalidArgs)
+ return
+ }
log.Debug("query:", query)
+
+ var hasCrc32, hasRotate bool
+ var crc32Data uint32
+ var rotate int
for i := 1; i < len(query); i += 2 {
switch query[i-1] {
case "crc32":
hasCrc32 = true
u, err := strconv.ParseUint(query[i], 10, 32)
if err != nil {
- err = errors.Info(api.EInvalidArgs, "Service.upload: invalid crc32")
+ err = errors.Info(api.EInvalidArgs, "upload: invalid crc32")
httputil.Error(w, err)
return
}
@@ -534,7 +560,7 @@ func (r *Service) upload(w http.ResponseWriter, req *http.Request) {
hasRotate = true
r, err := strconv.Atoi(query[i])
if err != nil || uint(r) >= 4 {
- err = errors.Info(api.EInvalidArgs, "Service.upload: invalid rotate")
+ err = errors.Info(api.EInvalidArgs, "upload: invalid rotate")
httputil.Error(w, err)
return
}
@@ -548,7 +574,7 @@ func (r *Service) upload(w http.ResponseWriter, req *http.Request) {
f, err := file[0].Open()
if err != nil {
- err = errors.Info(api.EFunctionFail, "Service.upload: file open failed").Detail(err)
+ err = errors.Info(err, "upload: file.Open failed").Detail(err)
httputil.Error(w, err)
return
}
@@ -557,11 +583,11 @@ func (r *Service) upload(w http.ResponseWriter, req *http.Request) {
// crc32
if hasCrc32 {
- log.Debug("Service.upload: check crc32")
+ log.Debug("upload: check crc32")
h := crc32.NewIEEE()
fsize, err = io.Copy(h, f)
if crc32Data != h.Sum32() {
- err = errors.Info(api.EDataVerificationFail, "Service.upload: crc32 check error")
+ err = errors.Info(api.EDataVerificationFail, "upload: crc32 check error")
httputil.Error(w, err)
return
}
@@ -576,163 +602,172 @@ func (r *Service) upload(w http.ResponseWriter, req *http.Request) {
// rotate
if hasRotate {
- log.Debug("Service.upload: rotate image")
+ log.Debug("upload: rotate image")
+ begin := time.Now().UnixNano()
fname, err := rotateImage(f, fsize, rotate)
if err != nil {
- err = errors.Info(err, "Service.rotateImage failed").Detail(err)
+ err = errors.Info(err, "rotateImage failed").Detail(err)
httputil.Error(w, err)
return
}
defer os.Remove(fname)
+
imageFile, err := os.Open(fname)
if err != nil {
- err = errors.Info(err, "Service.upload: open rotated image failed").Detail(err)
+ err = errors.Info(err, "upload: os.Open rotated image failed").Detail(err)
httputil.Error(w, err)
return
}
defer imageFile.Close()
fi, err := imageFile.Stat()
if err != nil {
- err = errors.Info(err, "Service.upload: imageFile.Stat failed").Detail(err)
+ err = errors.Info(err, "upload: imageFile.Stat failed").Detail(err)
httputil.Error(w, err)
return
}
fileToUpload = imageFile
fsize = fi.Size()
+ log.Info("upload: rotateImage cost time ->", time.Now().UnixNano()-begin)
}
log.Debug("fileToUpload:", fileToUpload, "fsize:", fsize)
- var etag string
+ // upload
+ begin := time.Now().UnixNano()
fileToUpload.Seek(0, 0)
- if r.putFile(w, fileToUpload, fsize, queryStr, user, &etag, &auth, auth.CallbackUrl == "") {
- var ret interface{}
- msg := params[0]
- callbackUrl := auth.CallbackUrl
- callbackBodyType := auth.CallbackBodyType
- if callbackBodyType == "" {
- callbackBodyType = "application/x-www-form-urlencoded"
- }
+ etag, fh, err := r.putFile(fileToUpload, fsize, queryStr, user, &auth)
+ if err != nil {
+ err = errors.Info(err, "upload: putFile failed").Detail(err)
+ httputil.Error(w, err)
+ return
+ }
+ log.Info("upload: putFile cost time ->", time.Now().UnixNano()-begin)
- if auth.Escape == 1 {
- msg, err = escapeCustomData(fileToUpload, fsize, etag, msg, callbackBodyType)
- if err != nil {
- httputil.Error(w, err)
- return
- }
+ // async action
+ if auth.AsyncOps != "" {
+ log.Info("upload: async action -", auth.AsyncOps)
+ entryURI, _ := rpc.DecodeURI(query[1])
+ err = r.AsyncAct(user.Uid, entryURI, fh, fsize, auth.AsyncOps)
+ if err != nil {
+ errors.Info(err, "upload: AsyncAct").Detail(err).Warn()
}
+ }
- body := strings.NewReader(msg)
- log.Info("AppCallback:", callbackUrl, callbackBodyType, msg)
- code, err := rpc.DefaultClient.CallWithBinaryEx(&ret, callbackUrl, callbackBodyType, body, len(msg))
+ if auth.CallbackUrl == "" {
+ data := map[string]string{"hash": etag}
+ httputil.Reply(w, api.OK, data)
+ return
+ }
+
+ // app callback
+ msg := params[0]
+ callbackUrl := auth.CallbackUrl
+ callbackBodyType := auth.CallbackBodyType
+ if callbackBodyType == "" {
+ callbackBodyType = "application/x-www-form-urlencoded"
+ }
+
+ if auth.Escape == 1 {
+ msg, err = escapeCustomData(fileToUpload, fsize, etag, msg, callbackBodyType)
if err != nil {
- httputil.ReplyError(w, err.Error(), code)
- } else {
- httputil.Reply(w, code, ret)
+ err = errors.Info(err, "upload: escapeCustomData failed").Detail(err)
+ httputil.Error(w, err)
+ return
}
}
-}
-func (r *Service) putFile(w http.ResponseWriter, stream io.Reader, fsize int64,
- queryStr string, user account.UserInfo, etag *string, auth *authPolicy, replyIfOk bool) (notReplyed bool) {
+ log.Info("AppCallback:", callbackUrl, callbackBodyType, msg)
+ var ret interface{}
+ begin = time.Now().UnixNano()
+ body := strings.NewReader(msg)
+ code, err := rpc.DefaultClient.CallWithBinaryEx(&ret, callbackUrl, callbackBodyType, body, len(msg))
+ log.Info("AppCallback: time cost ->", time.Now().UnixNano()-begin, "ret ->", code, ret, err)
+ if err != nil {
+ httputil.ReplyError(w, err.Error(), code)
+ } else {
+ httputil.Reply(w, code, ret)
+ }
+ return
+}
- var err error
+func (r *Service) putFile(f io.Reader, fsize int64,
+ queryStr string, user account.UserInfo, auth *authPolicy) (etag string, fh []byte, err error) {
- query := strings.Split(queryStr, "/")
+ query := strings.SplitN(queryStr, "/", 3)
if len(query) < 2 {
- httputil.ReplyWithCode(w, api.InvalidArgs)
+ err = api.EInvalidArgs
return
}
cmd := query[0]
if !strings.HasSuffix(cmd, "-put") {
- err = errors.Info(api.EInvalidArgs, "Service.upload: invalid action cmd -", query[0])
- httputil.Error(w, err)
+ err = errors.Info(api.EInvalidArgs, "putFile: invalid action cmd -", query[0])
return
}
client := cmd[:len(cmd)-4]
cli, ok := r.Clients[client]
if !ok {
- err = errors.Info(api.EInvalidArgs, "Service.upload: unknown client -", client)
- httputil.Error(w, err)
+ err = errors.Info(api.EInvalidArgs, "putFile: unknown client -", client)
return
}
entryURI, err := rpc.DecodeURI(query[1])
if err != nil {
- err = errors.Info(api.EInvalidArgs, "Service.upload: invalid action encodedEntryURI -", query[1]).Detail(err)
- httputil.Error(w, err)
+ err = errors.Info(api.EInvalidArgs, "putFile: invalid entryURI -", query[1]).Detail(err)
return
}
- query = strings.SplitN(queryStr, "/", 3)
var otherQueryStr string
if len(query) == 3 {
otherQueryStr = query[2]
}
- blockCnt := int((fsize + (ChunkSize - 1)) >> ChunkBits)
- last := blockCnt - 1
- blockSize := int64(ChunkSize)
- keys := make([]byte, blockCnt*20)
- kwriter := cc.NewBytesWriter(keys)
+ chunkCnt := int((fsize + (ChunkSize - 1)) >> ChunkBits)
+ chunkSize := int64(ChunkSize)
+ keys := make([]byte, chunkCnt*20)
- n := int64(0)
- for blockIdx := 0; blockIdx < blockCnt; blockIdx++ {
- if blockIdx == last {
- offbase := int64(blockIdx) << ChunkBits
- blockSize = fsize - offbase
+ var offset int64
+ for i := 0; i < chunkCnt; i++ {
+ if i == chunkCnt-1 {
+ chunkSize = fsize - int64(i)<<ChunkBits
}
- cnt, key, err := r.putBlock(stream, blockSize)
- if err != nil {
- err = errors.Info(err, "Service.putFile").Detail(err)
- httputil.Error(w, err)
+ key, er := r.putBlock(f, offset, chunkSize)
+ if er != nil {
+ err = errors.Info(er, "putFile: putBlock failed", offset, chunkSize).Detail(er)
return
}
- n += cnt
- kwriter.Write(key)
- }
-
- if n != fsize {
- err := errors.Info(api.EFunctionFail, "Service.putFile: put len != fsize")
- httputil.Error(w, err)
- return
+ copy(keys[i*20: (i+1)*20], key)
+ offset += chunkSize
}
- return r.onFilePutted(w, fsize, entryURI, otherQueryStr, user, etag, auth, kwriter.Bytes(), cli, replyIfOk)
+ return r.onFilePutted(fsize, entryURI, otherQueryStr, user, auth, keys, cli)
}
-func (r *Service) onFilePutted(
- w http.ResponseWriter, fsize int64, entryURI, otherQueryStr string, user account.UserInfo,
- etag *string, auth *authPolicy, keys []byte, cli Client, replyIfOk bool) (notReplyed bool) {
+func (r *Service) onFilePutted(fsize int64, entryURI, otherQueryStr string,
+ user account.UserInfo, auth *authPolicy, keys []byte, cli Client) (etag string, fh []byte, err error) {
- var err error
nkeys := int((fsize + (ChunkSize - 1)) >> ChunkBits)
+ if nkeys*20 != len(keys) {
+ err = errors.Info(api.EInvalidArgs, "onFilePutted: invalid keys length")
+ return
+ }
method := "/put/"
if auth != nil && auth.fromClient {
method, err = cli.AuthScope(auth.Scope, entryURI)
if err != nil {
- httputil.Error(w, err)
+ err = errors.Info(err, "onFilePutted: cli.AuthScope failed").Detail(err)
return
}
}
- if nkeys*20 != len(keys) {
- err := errors.Info(errors.New("invalid keys length"), "Servece.onFilePutted")
- httputil.Error(w, err)
- return
- }
-
- var fh []byte
if len(keys) > (FileHandleLenMax - 3) {
h := sha1.New()
h.Write(keys)
digest := h.Sum(nil)
- err = r.Put(digest, cc.NewBytesReader(keys), len(keys), false)
+ err = r.Put(digest, bytes.NewReader(keys), len(keys), false)
if err != nil {
- err = errors.Info(err, "Servece.onFilePutted: put keys failed").Detail(err)
- httputil.Error(w, err)
+ err = errors.Info(err, "onFilePutted: put key to stg failed").Detail(err)
return
}
fh = make([]byte, 23)
@@ -749,41 +784,35 @@ func (r *Service) onFilePutted(
}
hashStr := betag.GenString(fh)
- if etag != nil {
- *etag = hashStr
- }
token := r.Account.MakeAccessToken(user)
conn := rpc.Client{oauth.NewClient(token, nil)}
callback := cli.Host + method + rpc.EncodeURI(entryURI)
callback += "/fsize/" + strconv.FormatInt(fsize, 10)
callback += "/hash/" + hashStr
- log.Debug("otherQueryStr: ", otherQueryStr)
if auth != nil && auth.fromClient && auth.Customer != "" {
err = checkQueryStr(otherQueryStr)
if err != nil {
- httputil.Error(w, err)
+ err = errors.Info(err, "onFilePutted: checkQueryStr failed", otherQueryStr).Detail(err)
+ return
}
callback += "/customer/" + auth.Customer
}
if otherQueryStr != "" {
callback += "/" + otherQueryStr
}
- log.Debug("Call callback:", callback, fh)
+ log.Info("Call RS Service:", callback, fh)
+ begin := time.Now().UnixNano()
var data interface{}
code, err := conn.CallWithParam(&data, callback, "application/octet-stream", fh)
- log.Debug("Call callback ret:", code, data, err)
+ log.Info("Call RS Service: cost time ->", time.Now().UnixNano()-begin, "ret ->", code, data, err)
if err != nil {
- err = errors.Info(err, "Servece.onFilePutted, callback failed").Detail(err)
- httputil.Error(w, err)
- return
- }
- if replyIfOk {
- httputil.Reply(w, code, data)
+ err = errors.Info(err, "onFilePutted: call rs failed").Detail(err)
return
}
- return true
+ etag = data.(map[string]interface{})["hash"].(string)
+ return
}
func checkQueryStr(s string) error {
@@ -797,56 +826,67 @@ func checkQueryStr(s string) error {
return nil
}
-func (r *Service) putBlock(stream io.Reader, size int64) (n int64, digest []byte, err error) {
+func (r *Service) putBlock(f io.Reader, offset, size int64) (key []byte, err error) {
- if size <= int64(r.BlockInMemLimit) {
- buf := make([]byte, size)
- nr, er := io.ReadFull(stream, buf)
- n = int64(nr)
- if er != nil {
- err = errors.Info(er, "Service.putBlock.Read").Detail(er)
+ log.Info("putBlock:", offset, size)
+
+ // upload: file is ReaderAt
+ if rt, ok := f.(io.ReaderAt); ok {
+ h := sha1.New()
+ rr := &cc.Reader{rt, offset}
+ _, err = io.CopyN(h, rr, size)
+ if err != nil {
+ err = errors.Info(err, "putBlock(upload): io.CopyN to sha1 failed").Detail(err)
return
}
- if n != size {
- err = errors.Info(errors.New("Service.putBlock.Read: not read all"), size, nr)
- return
+ key = h.Sum(nil)
+
+ rr.Offset = offset
+ err = r.Put(key, &io.LimitedReader{rr, size}, int(size), true)
+ if err != nil {
+ err = errors.Info(err, "putBlock(upload): put to stg failed").Detail(err)
}
- sha1Hash := sha1.New()
- sha1Hash.Write(buf)
- digest = sha1Hash.Sum(nil)
+ return
+ }
+
+ // rs-put: file is Reader
+ var sr io.Reader
+ h := sha1.New()
+ tee := io.TeeReader(f, h)
- sr := bytes.NewReader(buf)
- err = r.Put(digest, sr, int(n), true)
+ if size <= int64(r.BlockInMemLimit) {
+ buf := make([]byte, size)
+ _, err = io.ReadFull(tee, buf)
if err != nil {
- err = errors.Info(err, "Service.putBlock.PutFormMem").Detail(err)
+ err = errors.Info(err, "putBlock(rs-put): io.ReadFull failed").Detail(err)
return
}
+ key = h.Sum(nil)
+ sr = bytes.NewReader(buf)
} else {
- index, checkId := r.AllocAndLock()
- if index == -1 {
- err = errors.Info(errors.New("AllocAndLock failed"), "Service.putBlock")
- return
- }
- defer r.Free(index, checkId)
-
- ctx := newFirstBlockCtx(r.File, index, int(size), true)
- block := io.LimitedReader{stream, size}
- n, err = io.Copy(ctx, &block)
- if err != nil {
- err = errors.Info(err, "Service.putBlock.Copy").Detail(err)
+ tmp, er := ioutil.TempFile("", "tmp-chunk")
+ if er != nil {
+ err = errors.Info(er, "putBlock(rs-put): ioutil.TempFile failed").Detail(er)
return
}
- digest = ctx.digest.Sum(nil)
+ defer os.Remove(tmp.Name())
+ defer tmp.Close()
- offset := int64(index) << ChunkBits
- sr := io.NewSectionReader(r.File, offset, int64(n))
- err = r.Put(digest, sr, int(n), true)
+ _, err = io.CopyN(tmp, tee, size)
if err != nil {
- err = errors.Info(err, "Service.putBlock.PutFormFile").Detail(err)
+ err = errors.Info(err, "putBlock(rs-put): io.CopyN to tmpFile failed").Detail(err)
return
}
+
+ key = h.Sum(nil)
+ tmp.Seek(0, 0)
+ sr = tmp
}
+ err = r.Put(key, sr, int(size), true)
+ if err != nil {
+ err = errors.Info(err, "putBlock(rs-put): put to stg failed").Detail(err)
+ }
return
}
View
281 io/src/qbox.us/up/up_svr_test.go
@@ -4,12 +4,12 @@ import (
"fmt"
"net/http"
"os"
- "strings"
"testing"
"time"
"bytes"
"io/ioutil"
"io"
+ "errors"
"crypto/sha1"
"hash/crc32"
"qbox.us/api/up"
@@ -33,10 +33,12 @@ const (
RS_ADDR = ":2399"
)
+// var BIG_FILE = os.Getenv("HOME") + "/Work/code/big.mp4"
var UPMGR_HOST string
-var BIG_FILE = "/Users/qiniu/Work/code/go1.0.2.src.tar.gz"
+var BIG_FILE = os.Getenv("HOME") + "/Work/code/go1.0.2.src.tar.gz"
var LIT_FILE = "up_svr.go"
var IMAGE_FILE = "testfiles/test.jpg"
+var FILE_NAME string
var TestBig = false
func init() {
@@ -45,68 +47,93 @@ func init() {
UP_HOST = "http://localhost" + UP_ADDR
UPMGR_HOST = "http://localhost" + UPMGR_ADDR
RS_HOST = "http://localhost" + RS_ADDR
+
+ if TestBig {
+ FILE_NAME = BIG_FILE
+ } else {
+ FILE_NAME = LIT_FILE
+ }
}
func doTestRsPut(rsapi rs.Service, t *testing.T) {
fmt.Println("\ndoTestRsPut ===>\n")
- if TestBig {
- f, err := os.Open(BIG_FILE)
- if err != nil {
- ts.Fatal(t, err)
- }
- defer f.Close()
- fi, _ := f.Stat()
- fsize := fi.Size()
- ret, code, err := rsapi.PutEx("foo:a", "", f, fsize, "", "", "", "1234put")
- if err != nil || code != 200 {
- ts.Fatal(t, "rs.Put failed:", code, err, ret)
- }
- fmt.Println(ret)
+ f, err := os.Open(FILE_NAME)
+ if err != nil {
+ ts.Fatal(t, err)
}
+ defer f.Close()
- {
- f, err := os.Open(LIT_FILE)
- if err != nil {
- ts.Fatal(t, err)
- }
- defer f.Close()
+ fi, _ := f.Stat()
+ fsize := fi.Size()
- fi, _ := f.Stat()
- fsize := fi.Size()
- ret, code, err := rsapi.PutEx("foo:a", "", f, fsize, "", "", "", "1234put")
- if err != nil || code != 200 {
- ts.Fatal(t, "rs.Put failed:", code, err, ret)
- }
- fmt.Println(ret)
+ h := sha1.New()
+ tee := io.TeeReader(f, h)
+ ret, code, err := rsapi.PutEx("foo:a", "", tee, fsize, "", "", "", "1234")
+ if err != nil || code != 200 {
+ ts.Fatal(t, "rs.Put failed:", code, err, ret)
+ }
+ digeset1 := h.Sum(nil)
+
+ fh, err := rpc.DecodeURI(ret.Hash)
+ if err != nil {
+ ts.Fatal(t, "DecodeURI failed:", err, fh)
+ }
+ digeset2 := download([]byte(fh), fsize, t)
+
+ if !bytes.Equal(digeset1, digeset2) {
+ ts.Fatal(t, "rsput file diff with download file")
}
}
func doTestUpload(rsapi rs.Service, t *testing.T) {
fmt.Println("\ndoTestUpload ===>\n")
+
deadline := qtime.Seconds() + 36000
authPolicy := up.AuthPolicy{
- Scope: "foo:a",
+ Scope: "foo",
Customer: "1234",
Deadline: uint32(deadline),
}
upToken := up.MakeAuthTokenString(CLIENT_ID, CLIENT_SECRET, &authPolicy)
- // big file
- if TestBig {
- ret, code, err := rs.Upload("foo:a", BIG_FILE, "", "", "", upToken)
+ // upload
+ {
+ fmt.Println("\n==>upload file\n")
+
+ f, err := os.Open(FILE_NAME)
+ if err != nil {
+ t.Fatal(err)
+ }
+ defer f.Close()
+ h := sha1.New()
+ io.Copy(h, f)
+ digeset1 := h.Sum(nil)
+
+ fi, _ := f.Stat()
+ fsize := fi.Size()
+ ret, code, err := rs.Upload("foo:a", FILE_NAME, "", "", "", upToken)
if err != nil || code != 200 {
ts.Fatal(t, "rs.Upload failed:", code, err, ret)
}
- fmt.Println(ret)
+ log.Info(ret, code)
+
+ fh, err := rpc.DecodeURI(ret.Hash)
+ if err != nil {
+ ts.Fatal(t, "DecodeURI failed:", err, fh)
+ }
+ digeset2 := download([]byte(fh), fsize, t)
+
+ if !bytes.Equal(digeset1, digeset2) {
+ ts.Fatal(t, "upload file diff with download file")
+ }
}
- // upload and crc32 & rotate
+ // crc32
{
- // calc crc32
- f, err := os.Open(IMAGE_FILE)
+ f, err := os.Open(FILE_NAME)
if err != nil {
t.Fatal(err)
}
@@ -116,31 +143,34 @@ func doTestUpload(rsapi rs.Service, t *testing.T) {
crc32Data := h.Sum32()
fmt.Println("\n==>upload: right crc32\n")
- ret, code, err := rs.UploadEx(upToken, IMAGE_FILE, "foo:a", "", "", "", int64(crc32Data), -1)
+ ret, code, err := rs.UploadEx(upToken, FILE_NAME, "foo:a", "", "", "", int64(crc32Data), -1)
if err != nil || code != 200 {
ts.Fatal(t, "rs.UploadEx failed:", code, err, ret)
}
log.Info(ret, code)
fmt.Println("\n==>upload: wrong crc32\n")
crc32ErrorData := 0
- ret, code, err = rs.UploadEx(upToken, IMAGE_FILE, "foo:a", "", "", "", int64(crc32ErrorData), -1)
+ ret, code, err = rs.UploadEx(upToken, FILE_NAME, "foo:a", "", "", "", int64(crc32ErrorData), -1)
if err == nil {
- ts.Fatal(t, "rs.UploadEx should failed:")
+ ts.Fatal(t, "rs.UploadEx should failed")
}
log.Info(ret, code)
+ }
- fmt.Println("\n==>upload: rotate 0-3\n")
+ // rotate
+ {
for rotate := 0; rotate < 4; rotate++ {
- ret, code, err = rs.UploadEx(upToken, IMAGE_FILE, "foo:a", "", "", "", int64(crc32Data), rotate)
+ fmt.Println("\n==>upload: rotate", rotate, "\n")
+ ret, code, err := rs.UploadEx(upToken, IMAGE_FILE, "foo:a", "", "", "", -1, rotate)
if err != nil || code != 200 {
ts.Fatal(t, "rs.UploadEx failed:", code, err, ret)
}
log.Info(ret, code)
}
fmt.Println("\n==>upload: rotate file which is not image\n")
- ret, code, err = rs.UploadEx(upToken, LIT_FILE, "foo:a", "", "", "", -1, 0)
+ ret, code, err := rs.UploadEx(upToken, FILE_NAME, "foo:a", "", "", "", -1, 0)
if err != nil {
ts.Fatal(t, "rs.UploadEx failed: not image can upload success")
}
@@ -154,115 +184,67 @@ func doTestUpload(rsapi rs.Service, t *testing.T) {
log.Info(ret, code, err)
}
- // TestInject
+ // async action && callback
{
- queryStr := "mimeType/111/meta/222/crc32/333/customer/444"
- err := checkQueryStr(queryStr)
- if err == nil {
- t.Fatal("TestInject:", err)
- }
- queryStr = "mimeType/111/meta/222"
- err = checkQueryStr(queryStr)
- if err != nil {
- t.Fatal("TestInject:", err)
+ fmt.Println("\n==>upload and callback\n")
+ authPolicy.AsyncOps = "imageView/0/w/300/h/300;imagePreview/25"
+ authPolicy.CallbackUrl = RS_HOST + "/callback"
+ upToken := up.MakeAuthTokenString(CLIENT_ID, CLIENT_SECRET, &authPolicy)
+ ret, code, err := rs.Upload("foo:a", FILE_NAME, "", "", "helloworld", upToken)
+ if err != nil || code != 222 {
+ ts.Fatal(t, "rs.Upload callback failed:", code, err, ret)
}
+ fmt.Println(ret)
}
}
func doTestResumablePut(upapi up.Service, rsapi rs.Service, t *testing.T) {
fmt.Println("\ndoTestResumablePut ===>\n")
- // little file
- {
- fsize := int64(3)
- n := up.BlockCount(fsize)
- if n != 1 {
- ts.Fatal(t, "ChunkCount failed:", n)
- }
-
- checksums := make([]string, n)
- progs := make([]up.BlockputProgress, n)
- blockNotify := func(blockIdx int, checksum string) {
- fmt.Println("Notify:", blockIdx, checksum)
- }
- chunkNotify := func(blockIdx int, prog *up.BlockputProgress) {
- fmt.Println("Blockput:", blockIdx, prog.Offset, prog.RestSize)
- }
- ret, code, err := rsapi.ResumablePut(
- upapi, checksums, progs, blockNotify, chunkNotify, "foo:a", "", strings.NewReader("foo"), fsize, "", "")
- if err != nil || code != 200 {
- ts.Fatal(t, "rs.ResumablePut failed:", code, err, progs)
- }
- fmt.Println(ret)
- }
-
- // big file
- if TestBig {
- f, err := os.Open(BIG_FILE)
- if err != nil {
- ts.Fatal(t, err)
- }
- defer f.Close()
-
- fi, _ := f.Stat()
- fsize := fi.Size()
- n := up.BlockCount(fsize)
- if n != 3 {
- ts.Fatal(t, "ChunkCount failed:", n)
- }
-
- checksums := make([]string, n)
- progs := make([]up.BlockputProgress, n)
- blockNotify := func(blockIdx int, checksum string) {
- fmt.Println("Notify:", blockIdx, checksum)
- }
- chunkNotify := func(blockIdx int, prog *up.BlockputProgress) {
- fmt.Println("Blockput:", blockIdx, prog.Offset, prog.RestSize)
- }
- ret, code, err := rsapi.ResumablePut(
- upapi, checksums, progs, blockNotify, chunkNotify, "foo:a", "", f, fsize, "", "<params>")
- if err != nil || code != 200 {
- ts.Fatal(t, "rs.ResumablePut failed:", code, err, progs)
- }
- fmt.Println(ret)
- }
-}
-
-func doTestFile(rsapi rs.Service, t *testing.T) {
-
- fmt.Println("\ndoTestFile ===>\n")
-
- var FILE_NAME string
- if TestBig {
- FILE_NAME = BIG_FILE
- } else {
- FILE_NAME = LIT_FILE
- }
-
- // first put file
-
+
f, err := os.Open(FILE_NAME)
if err != nil {
ts.Fatal(t, err)
}
defer f.Close()
+ h := sha1.New()
+ io.Copy(h, f)
+ digeset1 := h.Sum(nil)
+ f.Seek(0, 0)
+
fi, _ := f.Stat()
fsize := fi.Size()
- ret, code, err := rsapi.Put("foo:a", "", f, fsize, "", "", "")
- if err != nil || code != 200 {
- ts.Fatal(t, "rs.Put failed:", code, err, ret)
+ n := up.BlockCount(fsize)
+ checksums := make([]string, n)
+ progs := make([]up.BlockputProgress, n)
+ blockNotify := func(blockIdx int, checksum string) {
+ fmt.Println("Notify:", blockIdx, checksum)
+ }
+ chunkNotify := func(blockIdx int, prog *up.BlockputProgress) {
+ fmt.Println("Blockput:", blockIdx, prog.Offset, prog.RestSize)
}
- fmt.Println(ret)
-
- // then get file
- fh_, err := rpc.DecodeURI(ret.Hash)
+ ret, code, err := rsapi.ResumablePut(
+ upapi, checksums, progs, blockNotify, chunkNotify, "foo:a", "", f, fsize, "", "<params>")
+ if err != nil || code != 200 {
+ ts.Fatal(t, "rs.ResumablePut failed:", code, err, progs)
+ }
+ log.Info(ret, code)
+
+ fh, err := rpc.DecodeURI(ret.Hash)
if err != nil {
- ts.Fatal(t, "DecodeURI failed:", err, fh_)
+ ts.Fatal(t, "DecodeURI failed:", err, fh)
}
- fh := []byte(fh_)
- fmt.Println("fh: ", fh)
+ digeset2 := download([]byte(fh), fsize, t)
+
+ if !bytes.Equal(digeset1, digeset2) {
+ ts.Fatal(t, "ResumablePut file diff with download file")
+ }
+}
+
+func download(fh []byte, fsize int64, t *testing.T) (digeset []byte) {
+
fhi := &sstore.FhandleInfo{
Fhandle: fh,
MimeType: "application/octet-stream",
@@ -280,26 +262,14 @@ func doTestFile(rsapi rs.Service, t *testing.T) {
ts.Fatal(t, "rs.file failed:", err)
}
defer resp.Body.Close()
- code = resp.StatusCode
+ code := resp.StatusCode
if code/100 != 2 {
ts.Fatal(t, "rs.file failed:", code)
}
- sha1Hash := sha1.New()
- n1, err := io.Copy(sha1Hash, resp.Body)
- digeset1 := sha1Hash.Sum(nil)
-
- f.Close()
- f, err = os.Open(FILE_NAME)
- sha1Hash.Reset()
- n2, err := io.Copy(sha1Hash, f)
- digeset2 := sha1Hash.Sum(nil)
-
- fmt.Println("n1 & n2", n1, n2)
-
- if !bytes.Equal(digeset1, digeset2) {
- ts.Fatal(t, "upload file diff with download file")
- }
+ h := sha1.New()
+ _, err = io.Copy(h, resp.Body)
+ return h.Sum(nil)
}
func rsRun(t *testing.T) {
@@ -331,7 +301,7 @@ func rsRun(t *testing.T) {
mux.HandleFunc("/callback", func(w http.ResponseWriter, req *http.Request) {
fmt.Println("[RS] callback:", req)
- httputil.Reply(w, 200, map[string]string{
+ httputil.Reply(w, 222, map[string]string{
"ret": "<CallbackRet>",
})
})
@@ -342,12 +312,11 @@ func rsRun(t *testing.T) {
}
}
+var root = os.Getenv("HOME") + "/upTestRoot"
+
func upRun(t *testing.T) {
- home := os.Getenv("HOME")
- root := home + "/upTestRoot"
fmt.Println("ROOT:", root)
-
os.RemoveAll(root)
err := os.Mkdir(root, 0777)
if err != nil {
@@ -363,6 +332,11 @@ func upRun(t *testing.T) {
ss := store.NewSimpleStorage2()
bp := NewBlockPool(128)
+ act := func(owner uint32, entryURI string, fh []byte, fsize int64, asyncOps string) (err error) {
+ fmt.Println("owner:", owner, "entryURI:", entryURI, "asyncOps:", asyncOps, "fsize:", fsize, "fh:", fh)
+ return errors.New("return error for test")
+ }
+
cfg := &Config{
Account: acc,
File: f,
@@ -378,6 +352,7 @@ func upRun(t *testing.T) {
MyIdc: 1,
MyHost: "http://localhost" + UP_ADDR,
IgnoreDeadline: true,
+ AsyncAct: act,
}
p, err := New(cfg)
@@ -406,13 +381,13 @@ func doTestUpsvr(t *testing.T) {
doTestResumablePut(upapi, rsapi, t)
doTestRsPut(rsapi, t)
doTestUpload(rsapi, t)
- doTestFile(rsapi, t)
}
func TestUpsvr(t *testing.T) {
go upRun(t)
go rsRun(t)
- time.Sleep(1e9)
+ time.Sleep(2e9)
doTestUpsvr(t)
+ os.RemoveAll(root)
}

0 comments on commit ae52b60

Please sign in to comment.