Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions examples/upload_simple.js → examples/form_upload_simple.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,11 @@ var putPolicy = new qiniu.rs.PutPolicy(options);

var uploadToken = putPolicy.uploadToken(mac);
var config = new qiniu.conf.Config();
var localFile = "/Users/jemy/Documents/github.png";
var localFile = "/Users/jemy/Documents/qiniu.mp4";
//config.zone = qiniu.zone.Zone_z0;
var formUploader = new qiniu.form_io.FormUploader(config);
var putExtra = new qiniu.form_io.PutExtra();


//bytes
formUploader.put(uploadToken, null, "hello", null, function(respErr,
respBody, respInfo) {
Expand Down
39 changes: 39 additions & 0 deletions examples/resume_upload_simple.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
const qiniu = require("../index.js");
const proc = require("process");

var bucket = 'if-pbl';
var accessKey = proc.env.QINIU_ACCESS_KEY;
var secretKey = proc.env.QINIU_SECRET_KEY;
var mac = new qiniu.auth.digest.Mac(accessKey, secretKey);
var options = {
scope: bucket,
}
var putPolicy = new qiniu.rs.PutPolicy(options);

var uploadToken = putPolicy.uploadToken(mac);
var config = new qiniu.conf.Config();
var localFile = "/Users/jemy/Documents/qiniu.mp4";
config.zone = qiniu.zone.Zone_z0;
config.useCdnDomain = true;
var resumeUploader = new qiniu.resume_io.ResumeUploader(config);
var putExtra = new qiniu.resume_io.PutExtra();
putExtra.params = {
"x:name": "",
"x:age": 27,
}
putExtra.fname = 'testfile.mp4';

//file
resumeUploader.putFile(uploadToken, null, localFile, putExtra, function(respErr,
respBody, respInfo) {
if (respErr) {
throw respErr;
}

if (respInfo.statusCode == 200) {
console.log(respBody);
} else {
console.log(respInfo.statusCode);
console.log(respBody);
}
});
1 change: 1 addition & 0 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ module.exports = {
},
cdn: require(libPath + "/cdn.js"),
form_io: require(libPath + '/io/form.js'),
resume_io: require(libPath + '/io/resume.js'),
rs: require(libPath + '/rs.js'),
fop: require(libPath + '/fop.js'),
conf: require(libPath + '/conf.js'),
Expand Down
8 changes: 4 additions & 4 deletions qiniu/conf.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,14 @@ var defaultUserAgent = function() {
}

exports.USER_AGENT = defaultUserAgent();
const BLOCK_SIZE = 4 * 1024 * 1024; //4MB, never change
exports.BLOCK_SIZE = 4 * 1024 * 1024; //4MB, never change

//define api form mime type
exports.FormMimeUrl = "application/x-www-form-urlencoded";
exports.FormMimeJson = "application/json";
exports.FormMimeRaw = "application/octet-stream";
exports.RS_HOST = "http://rs.qiniu.com";
exports.RPC_TIMEOUT = 30000; //30s
exports.RPC_TIMEOUT = 60000; //60s

exports.Config = function Config(options) {
options = options || {};
Expand All @@ -28,11 +28,11 @@ exports.Config = function Config(options) {
//response timeout, in seconds
this.responseTimeout = options.responseTimeout || 30;
//put threshold, in bytes
this.putThreshold = options.putThreshold || BLOCK_SIZE;
this.putThreshold = options.putThreshold || exports.BLOCK_SIZE;
//use http or https protocol
this.useHttpsDomain = options.useHttpsDomain || false;
//use cdn accerlated domains
this.useCdnDomain = options.useCdnDomain || false;
this.useCdnDomain = options.useCdnDomain || true;
//max retry times for chunk upload
this.maxRetryTimes = options.maxRetryTimes || 3;
//zone of the bucket
Expand Down
252 changes: 252 additions & 0 deletions qiniu/io/resume.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,252 @@
const conf = require('../conf');
const zone = require('../zone');
const util = require('../util');
const rpc = require('../rpc');
const path = require('path');
const mime = require('mime');
const fs = require('fs');

exports.ResumeUploader = ResumeUploader;
exports.PutExtra = PutExtra;

function ResumeUploader(config) {
this.config = config || new conf.Config();
}

// 上传可选参数
// @params fname 请求体中的文件的名称
// @params params 额外参数设置,参数名称必须以x:开头
// @param mimeType 指定文件的mimeType
// @param progressCallback(BlkputRet) 上传进度回调
// @param ctxList 断点续传的已上传的文件ctx列表,
// 在上传的过程中可以在progressCallback中写入本地文件
function PutExtra(fname, params, mimeType, blkputRets, progressCallback) {
this.fname = fname || '';
this.params = params || {};
this.mimeType = mimeType || null;
this.blkputRets = blkputRets || [];
this.progressCallback = progressCallback || null;
}

function BlkputRet(options) {
this.ctx = options.ctx || null;
this.checksum = options.checksum || null;
this.crc32 = options.crc32 || null;
this.offset = options.offset || null;
this.host = options.host || null;
this.expired_at = options.expired_at || null;
}

ResumeUploader.prototype.putStream = function(uploadToken, key, rsStream,
rsStreamLen, putExtra, callbackFunc) {
putExtra = putExtra || new PutExtra();
if (!putExtra.mimeType) {
putExtra.mimeType = 'application/octet-stream';
}

if (!putExtra.fname) {
putExtra.fname = key ? key : '?';
}

rsStream.on("error", function(err) {
//callbackFunc
callbackFunc(err, null, null);
return;
});

var useCache = false;
var that = this;
if (this.config.zone) {
if (this.config.zoneExpire == -1) {
useCache = true;
} else {
if (!util.isTimestampExpired(this.config.zoneExpire)) {
useCache = true;
}
}
}

var accessKey = util.getAKFromUptoken(uploadToken);
var bucket = util.getBucketFromUptoken(uploadToken);
if (useCache) {
putReq(this.config, uploadToken, key, rsStream, rsStreamLen, putExtra,
callbackFunc);
} else {
zone.getZoneInfo(accessKey, bucket, function(err, cZoneInfo,
cZoneExpire) {
if (err) {
callbackFunc(err, null, null);
return;
}

//update object
that.config.zone = cZoneInfo;
that.config.zoneExpire = cZoneExpire;

//req
putReq(that.config, uploadToken, key, rsStream, rsStreamLen,
putExtra,
callbackFunc);
});
}
}

function putReq(config, uploadToken, key, rsStream, rsStreamLen, putExtra,
callbackFunc) {
//set up hosts order
var upHosts = [];

if (config.useCdnDomain) {
if (config.zone.cdnUpHosts) {
config.zone.cdnUpHosts.forEach(function(host) {
upHosts.push(host);
});
}
config.zone.srcUpHosts.forEach(function(host) {
upHosts.push(host);
});
} else {
config.zone.srcUpHosts.forEach(function(host) {
upHosts.push(host);
});
config.zone.cdnUpHosts.forEach(function(host) {
upHosts.push(host);
});
}

var scheme = config.useHttpsDomain ? "https://" : "http://";
var upDomain = scheme + upHosts[0];
// block upload

var fileSize = rsStreamLen;
//console.log("file size:" + fileSize);
var blockCnt = fileSize / conf.BLOCK_SIZE
var totalBlockNum = (fileSize % conf.BLOCK_SIZE == 0) ? blockCnt : (blockCnt +
1);
var finishedBlock = 0;
var curBlock = 0;
var readLen = 0;
var readBuffers = [];
var finishedCtxList = [];

//check putExtra.blkputRets
if (putExtra.blkputRets) {
for (var index = 0; index < putExtra.blkputRets.length; index++) {
//check ctx expired or not
var blkputRet = putExtra.blkputRets[index];
var expiredAt = blkputRet.expired_at;
//make sure the ctx at least has one day expiration
expiredAt += 3600 * 24;
if (util.isTimestampExpired(expiredAt)) {
//discard these ctxs
break;
}

finishedBlock += 1;
finishedCtxList.push(blkputRet.ctx);
}
}

//check when to mkblk
rsStream.on('data', function(chunk) {
readLen += chunk.length;
readBuffers.push(chunk);

if (readLen % conf.BLOCK_SIZE == 0 || readLen == fileSize) {
//console.log(readLen);
var readData = Buffer.concat(readBuffers);
readBuffers = []; //reset read buffer
curBlock += 1; //set current block
if (curBlock > finishedBlock) {
rsStream.pause();
mkblkReq(upDomain, uploadToken, readData, function(respErr,
respBody,
respInfo) {
if (respInfo.statusCode != 200) {
callbackFunc(respErr, respBody, respInfo);
return;
} else {
finishedBlock += 1;
rsStream.resume();
var blkputRet = respBody;
finishedCtxList.push(blkputRet.ctx);
if (putExtra.progressCallback) {
putExtra.progressCallback(blkputRet);
}
}
});
}
}
});

//check when to mkfile
rsStream.on('end', function() {
//console.log("end");
mkfileReq(upDomain, uploadToken, fileSize, finishedCtxList, key,
putExtra, callbackFunc);
});
}

function mkblkReq(upDomain, uploadToken, blkData, callbackFunc) {
//console.log("mkblk");
var requestURI = upDomain + "/mkblk/" + blkData.length;
var auth = 'UpToken ' + uploadToken;
var headers = {
'Authorization': auth,
'Content-Type': 'application/octet-stream'
}
rpc.post(requestURI, blkData, headers, callbackFunc);
}

function mkfileReq(upDomain, uploadToken, fileSize, ctxList, key, putExtra,
callbackFunc) {
//console.log("mkfile");
var requestURI = upDomain + "/mkfile/" + fileSize;
if (key) {
requestURI += "/key/" + util.urlsafeBase64Encode(key);
}
if (putExtra.mimeType) {
requestURI += "/mimeType/" + util.urlsafeBase64Encode(putExtra.mimeType);
}
if (putExtra.fname) {
requestURI += "/fname/" + util.urlsafeBase64Encode(putExtra.fname);
}
if (putExtra.params) {
//putExtra params
for (var k in putExtra.params) {
if (k.startsWith("x:") && putExtra.params[k]) {
requestURI += "/" + k + "/" + util.urlsafeBase64Encode(putExtra.params[
k].toString());
}
}
}
var auth = 'UpToken ' + uploadToken;
var headers = {
'Authorization': auth,
'Content-Type': 'application/octet-stream'
}
var postBody = ctxList.join(",");
rpc.post(requestURI, postBody, headers, callbackFunc);
}

ResumeUploader.prototype.putFile = function(uploadToken, key, localFile,
putExtra, callbackFunc) {
putExtra = putExtra || new PutExtra();
var rsStream = fs.createReadStream(localFile);
var rsStreamLen = fs.statSync(localFile).size;
if (!putExtra.mimeType) {
putExtra.mimeType = mime.lookup(localFile);
}

if (!putExtra.fname) {
putExtra.fname = path.basename(localFile);
}

return this.putStream(uploadToken, key, rsStream, rsStreamLen, putExtra,
callbackFunc);
}

ResumeUploader.prototype.putFileWithoutKey = function(uploadToken, localFile,
putExtra, callbackFunc) {
return this.putFile(uploadToken, null, localFile, putExtra, callbackFunc);
}
14 changes: 14 additions & 0 deletions qiniu/rpc.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ var urllib = require('urllib');
var util = require('./util');
var conf = require('./conf');

exports.post = post;
exports.postMultipart = postMultipart;
exports.postWithForm = postWithForm;
exports.postWithoutForm = postWithoutForm;
Expand Down Expand Up @@ -31,14 +32,18 @@ function postWithoutForm(requestURI, token, callbackFunc) {
}

function post(requestURI, requestForm, headers, callbackFunc) {
//var start = parseInt(Date.now() / 1000);
headers = headers || {};
headers['User-Agent'] = headers['User-Agent'] || conf.USER_AGENT;
headers['Connection'] = 'keep-alive';

var data = {
headers: headers,
method: 'POST',
dataType: 'json',
timeout: conf.RPC_TIMEOUT,
gzip: true,
// timing: true,
};

if (Buffer.isBuffer(requestForm) || typeof requestForm === 'string') {
Expand All @@ -51,6 +56,15 @@ function post(requestURI, requestForm, headers, callbackFunc) {

var req = urllib.request(requestURI, data, function(respErr, respBody,
respInfo) {
//var end = parseInt(Date.now() / 1000);
// console.log((end - start) + " seconds");
// console.log("queuing:\t" + respInfo.timing.queuing);
// console.log("dnslookup:\t" + respInfo.timing.dnslookup);
// console.log("connected:\t" + respInfo.timing.connected);
// console.log("requestSent:\t" + respInfo.timing.requestSent);
// console.log("waiting:\t" + respInfo.timing.waiting);
// console.log("contentDownload:\t" + respInfo.timing.contentDownload);

callbackFunc(respErr, respBody, respInfo);
});

Expand Down
Loading