Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion Qiniu.Test/IO/Resumable/ResumablePutTest.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System;
using System.Collections.Specialized;
using System.IO;
using NUnit.Framework;
using Qiniu.IO.Resumable;
Expand All @@ -25,6 +26,9 @@ public void ResumablePutFileTest()
Settings putSetting = new Settings(); // TODO: 初始化为适当的值
string key=NewKey;
ResumablePutExtra extra = new ResumablePutExtra();
NameValueCollection nc = new NameValueCollection ();
nc.Add("x:username","qiniu");
extra.CallbackParams = nc;
extra.Notify += new EventHandler<PutNotifyEvent>(extra_Notify);
extra.NotifyErr += new EventHandler<PutNotifyErrorEvent>(extra_NotifyErr);
extra.Bucket = Bucket;
Expand All @@ -39,7 +43,7 @@ public void ResumablePutFileTest()
RSHelper.RSDel (Bucket, key);
}
});
target.PutFile (upToken, file.FileName, key);
CallRet ret =target.PutFile (upToken, file.FileName, key);

//Action a = new Action (() =>
//{
Expand Down
69 changes: 45 additions & 24 deletions Qiniu/IO/Resumable/ResumablePut.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,24 @@ public class ResumablePut
private const int blockMashk = (1 << blockBits) - 1;
private static int BLOCKSIZE = 4 * 1024 * 1024;
private const string UNDEFINED_KEY = "?";

#region 记录总文件大小,用于计算上传百分比

private long fsize;
private float chunks;
private float uploadedChunks = 0;

#endregion

/// <summary>
/// 上传完成事件
/// </summary>
public event EventHandler<CallRet> PutFinished;
/// <summary>
/// 上传Failure事件
/// </summary>
public event EventHandler<CallRet> PutFailure;
/// <summary>
/// 进度提示事件
/// </summary>
public event Action<float> Progress;
Expand Down Expand Up @@ -71,7 +79,7 @@ public ResumablePut (Settings putSetting, ResumablePutExtra extra)
/// <param name="upToken">上传Token</param>
/// <param name="key">key</param>
/// <param name="localFile">本地文件名</param>
public void PutFile (string upToken, string localFile, string key)
public CallRet PutFile (string upToken, string localFile, string key)
{
PutAuthClient client = new PutAuthClient (upToken);
using (FileStream fs = File.OpenRead(localFile)) {
Expand All @@ -80,36 +88,42 @@ public void PutFile (string upToken, string localFile, string key)
chunks = fsize / extra.chunkSize + 1;
extra.Progresses = new BlkputRet[block_cnt];
//并行上传
Parallel.For (0, block_cnt, (i) =>
{
Parallel.For (0, block_cnt, (i) => {
int readLen = BLOCKSIZE;
if ((i + 1) * BLOCKSIZE > fsize)
readLen = (int)(fsize - i * BLOCKSIZE);
byte[] byteBuf = new byte[readLen];
lock (fs) {
fs.Seek (i * BLOCKSIZE, SeekOrigin.Begin);
fs.Read (byteBuf, 0, readLen);
BlkputRet blkRet = ResumableBlockPut (client, byteBuf, i, readLen);
if (blkRet == null) {
extra.OnNotifyErr (new PutNotifyErrorEvent (i, readLen, "Make Block Error"));
} else {
extra.OnNotify (new PutNotifyEvent (i, readLen, extra.Progresses [i]));
}
}
BlkputRet blkRet = ResumableBlockPut (client, byteBuf, i, readLen);

if (blkRet == null) {
extra.OnNotifyErr (new PutNotifyErrorEvent (i, readLen, "Make Block Error"));
} else {
extra.OnNotify (new PutNotifyEvent (i, readLen, extra.Progresses [i]));
}

});
if (string.IsNullOrEmpty (key)) {
key = UNDEFINED_KEY;
}
CallRet ret = Mkfile (client, key, fs.Length);
if (Progress != null) {
Progress (1.0f);
}
if (PutFinished != null) {
PutFinished (this, ret);
if (ret.OK) {
if (Progress != null) {
Progress (1.0f);
}
if (PutFinished != null) {
PutFinished (this, ret);
}
} else {
if (PutFailure != null) {
PutFailure (this, ret);
}
}
return ret;
}

}

/// <summary>
Expand Down Expand Up @@ -147,8 +161,9 @@ private BlkputRet ResumableBlockPut (Client client, byte[] body, int blkIdex, in
}
}
#endregion

#region PutBlock
while (extra.Progresses[blkIdex].offset < blkSize) {
while (extra.Progresses [blkIdex].offset < blkSize) {
bodyLength = (chunkSize < (blkSize - extra.Progresses [blkIdex].offset)) ? chunkSize : (int)(blkSize - extra.Progresses [blkIdex].offset);
byte[] chunk = new byte[bodyLength];
Array.Copy (body, extra.Progresses [blkIdex].offset, chunk, 0, bodyLength);
Expand Down Expand Up @@ -194,17 +209,23 @@ private BlkputRet BlockPut (Client client, BlkputRet ret, Stream body, long leng

private CallRet Mkfile (Client client, string key, long fsize)
{
EntryPath entry = new EntryPath (extra.Bucket, key);
string url = string.Format ("{0}/rs-mkfile/{1}/fsize/{2}/", Config.UP_HOST, entry.Base64EncodedURI, fsize);
StringBuilder urlBuilder = new StringBuilder ();

urlBuilder.AppendFormat ("{0}/mkfile/{1}/key/{2}", Config.UP_HOST, fsize, key.ToBase64URLSafe ());
if (!string.IsNullOrEmpty (extra.MimeType)) {
url += (string.Format ("/mimeType/{0}", extra.MimeType.ToBase64URLSafe ()));
urlBuilder.AppendFormat ("/mimeType/{0}", extra.MimeType.ToBase64URLSafe ());
}
if (!string.IsNullOrEmpty (extra.CustomMeta)) {
url += (string.Format ("/meta/{0}", extra.CustomMeta.ToBase64URLSafe ()));
urlBuilder.AppendFormat ("/meta/{0}", extra.CustomMeta.ToBase64URLSafe ());
}
if (!string.IsNullOrEmpty (extra.CallbackParams)) {
url += (string.Format ("/params/{0}", extra.CallbackParams.ToBase64URLSafe ()));
if (extra.CallbackParams != null && extra.CallbackParams.Count > 0) {
StringBuilder sb = new StringBuilder ();
foreach (string _key in extra.CallbackParams.Keys) {
sb.AppendFormat ("/{0}/{1}", _key, extra.CallbackParams [_key].ToBase64URLSafe ());
}
urlBuilder.Append (sb.ToString ());
}

int proCount = extra.Progresses.Length;
using (Stream body = new MemoryStream()) {
for (int i = 0; i < proCount; i++) {
Expand All @@ -215,7 +236,7 @@ private CallRet Mkfile (Client client, string key, long fsize)
}
}
body.Seek (0, SeekOrigin.Begin);
return client.CallWithBinary (url, "text/plain", body, body.Length);
return client.CallWithBinary (urlBuilder.ToString (), "text/plain", body, body.Length);
}
}

Expand Down
4 changes: 3 additions & 1 deletion Qiniu/IO/Resumable/ResumablePutExtra.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System;
using System.Collections.Specialized;

namespace Qiniu.IO.Resumable
{
Expand Down Expand Up @@ -69,7 +70,8 @@ public PutNotifyErrorEvent (int blkIdx, int blkSize, string error)
/// </summary>
public class ResumablePutExtra
{
public string CallbackParams;
//key format as: "x:var"
public NameValueCollection CallbackParams;
public string Bucket;
public string CustomMeta;
public string MimeType;
Expand Down