diff --git a/examples/upload_simple.js b/examples/form_upload_simple.js similarity index 95% rename from examples/upload_simple.js rename to examples/form_upload_simple.js index 0265f208..763df57d 100644 --- a/examples/upload_simple.js +++ b/examples/form_upload_simple.js @@ -12,12 +12,11 @@ var putPolicy = new qiniu.rs.PutPolicy(options); var uploadToken = putPolicy.uploadToken(mac); var config = new qiniu.conf.Config(); -var localFile = "/Users/jemy/Documents/github.png"; +var localFile = "/Users/jemy/Documents/qiniu.mp4"; //config.zone = qiniu.zone.Zone_z0; var formUploader = new qiniu.form_io.FormUploader(config); var putExtra = new qiniu.form_io.PutExtra(); - //bytes formUploader.put(uploadToken, null, "hello", null, function(respErr, respBody, respInfo) { diff --git a/examples/resume_upload_simple.js b/examples/resume_upload_simple.js new file mode 100644 index 00000000..48241be1 --- /dev/null +++ b/examples/resume_upload_simple.js @@ -0,0 +1,39 @@ +const qiniu = require("../index.js"); +const proc = require("process"); + +var bucket = 'if-pbl'; +var accessKey = proc.env.QINIU_ACCESS_KEY; +var secretKey = proc.env.QINIU_SECRET_KEY; +var mac = new qiniu.auth.digest.Mac(accessKey, secretKey); +var options = { + scope: bucket, +} +var putPolicy = new qiniu.rs.PutPolicy(options); + +var uploadToken = putPolicy.uploadToken(mac); +var config = new qiniu.conf.Config(); +var localFile = "/Users/jemy/Documents/qiniu.mp4"; +config.zone = qiniu.zone.Zone_z0; +config.useCdnDomain = true; +var resumeUploader = new qiniu.resume_io.ResumeUploader(config); +var putExtra = new qiniu.resume_io.PutExtra(); +putExtra.params = { + "x:name": "", + "x:age": 27, +} +putExtra.fname = 'testfile.mp4'; + +//file +resumeUploader.putFile(uploadToken, null, localFile, putExtra, function(respErr, + respBody, respInfo) { + if (respErr) { + throw respErr; + } + + if (respInfo.statusCode == 200) { + console.log(respBody); + } else { + console.log(respInfo.statusCode); + console.log(respBody); + } +}); diff --git a/index.js b/index.js index 85f9a0a6..ee610d1a 100644 --- a/index.js +++ b/index.js @@ -6,6 +6,7 @@ module.exports = { }, cdn: require(libPath + "/cdn.js"), form_io: require(libPath + '/io/form.js'), + resume_io: require(libPath + '/io/resume.js'), rs: require(libPath + '/rs.js'), fop: require(libPath + '/fop.js'), conf: require(libPath + '/conf.js'), diff --git a/qiniu/conf.js b/qiniu/conf.js index cc748855..159cd68c 100644 --- a/qiniu/conf.js +++ b/qiniu/conf.js @@ -12,14 +12,14 @@ var defaultUserAgent = function() { } exports.USER_AGENT = defaultUserAgent(); -const BLOCK_SIZE = 4 * 1024 * 1024; //4MB, never change +exports.BLOCK_SIZE = 4 * 1024 * 1024; //4MB, never change //define api form mime type exports.FormMimeUrl = "application/x-www-form-urlencoded"; exports.FormMimeJson = "application/json"; exports.FormMimeRaw = "application/octet-stream"; exports.RS_HOST = "http://rs.qiniu.com"; -exports.RPC_TIMEOUT = 30000; //30s +exports.RPC_TIMEOUT = 60000; //60s exports.Config = function Config(options) { options = options || {}; @@ -28,11 +28,11 @@ exports.Config = function Config(options) { //response timeout, in seconds this.responseTimeout = options.responseTimeout || 30; //put threshold, in bytes - this.putThreshold = options.putThreshold || BLOCK_SIZE; + this.putThreshold = options.putThreshold || exports.BLOCK_SIZE; //use http or https protocol this.useHttpsDomain = options.useHttpsDomain || false; //use cdn accerlated domains - this.useCdnDomain = options.useCdnDomain || false; + this.useCdnDomain = options.useCdnDomain || true; //max retry times for chunk upload this.maxRetryTimes = options.maxRetryTimes || 3; //zone of the bucket diff --git a/qiniu/io/resume.js b/qiniu/io/resume.js new file mode 100644 index 00000000..52bfcbb9 --- /dev/null +++ b/qiniu/io/resume.js @@ -0,0 +1,252 @@ +const conf = require('../conf'); +const zone = require('../zone'); +const util = require('../util'); +const rpc = require('../rpc'); +const path = require('path'); +const mime = require('mime'); +const fs = require('fs'); + +exports.ResumeUploader = ResumeUploader; +exports.PutExtra = PutExtra; + +function ResumeUploader(config) { + this.config = config || new conf.Config(); +} + +// 上传可选参数 +// @params fname 请求体中的文件的名称 +// @params params 额外参数设置,参数名称必须以x:开头 +// @param mimeType 指定文件的mimeType +// @param progressCallback(BlkputRet) 上传进度回调 +// @param ctxList 断点续传的已上传的文件ctx列表, +// 在上传的过程中可以在progressCallback中写入本地文件 +function PutExtra(fname, params, mimeType, blkputRets, progressCallback) { + this.fname = fname || ''; + this.params = params || {}; + this.mimeType = mimeType || null; + this.blkputRets = blkputRets || []; + this.progressCallback = progressCallback || null; +} + +function BlkputRet(options) { + this.ctx = options.ctx || null; + this.checksum = options.checksum || null; + this.crc32 = options.crc32 || null; + this.offset = options.offset || null; + this.host = options.host || null; + this.expired_at = options.expired_at || null; +} + +ResumeUploader.prototype.putStream = function(uploadToken, key, rsStream, + rsStreamLen, putExtra, callbackFunc) { + putExtra = putExtra || new PutExtra(); + if (!putExtra.mimeType) { + putExtra.mimeType = 'application/octet-stream'; + } + + if (!putExtra.fname) { + putExtra.fname = key ? key : '?'; + } + + rsStream.on("error", function(err) { + //callbackFunc + callbackFunc(err, null, null); + return; + }); + + var useCache = false; + var that = this; + if (this.config.zone) { + if (this.config.zoneExpire == -1) { + useCache = true; + } else { + if (!util.isTimestampExpired(this.config.zoneExpire)) { + useCache = true; + } + } + } + + var accessKey = util.getAKFromUptoken(uploadToken); + var bucket = util.getBucketFromUptoken(uploadToken); + if (useCache) { + putReq(this.config, uploadToken, key, rsStream, rsStreamLen, putExtra, + callbackFunc); + } else { + zone.getZoneInfo(accessKey, bucket, function(err, cZoneInfo, + cZoneExpire) { + if (err) { + callbackFunc(err, null, null); + return; + } + + //update object + that.config.zone = cZoneInfo; + that.config.zoneExpire = cZoneExpire; + + //req + putReq(that.config, uploadToken, key, rsStream, rsStreamLen, + putExtra, + callbackFunc); + }); + } +} + +function putReq(config, uploadToken, key, rsStream, rsStreamLen, putExtra, + callbackFunc) { + //set up hosts order + var upHosts = []; + + if (config.useCdnDomain) { + if (config.zone.cdnUpHosts) { + config.zone.cdnUpHosts.forEach(function(host) { + upHosts.push(host); + }); + } + config.zone.srcUpHosts.forEach(function(host) { + upHosts.push(host); + }); + } else { + config.zone.srcUpHosts.forEach(function(host) { + upHosts.push(host); + }); + config.zone.cdnUpHosts.forEach(function(host) { + upHosts.push(host); + }); + } + + var scheme = config.useHttpsDomain ? "https://" : "http://"; + var upDomain = scheme + upHosts[0]; + // block upload + + var fileSize = rsStreamLen; + //console.log("file size:" + fileSize); + var blockCnt = fileSize / conf.BLOCK_SIZE + var totalBlockNum = (fileSize % conf.BLOCK_SIZE == 0) ? blockCnt : (blockCnt + + 1); + var finishedBlock = 0; + var curBlock = 0; + var readLen = 0; + var readBuffers = []; + var finishedCtxList = []; + + //check putExtra.blkputRets + if (putExtra.blkputRets) { + for (var index = 0; index < putExtra.blkputRets.length; index++) { + //check ctx expired or not + var blkputRet = putExtra.blkputRets[index]; + var expiredAt = blkputRet.expired_at; + //make sure the ctx at least has one day expiration + expiredAt += 3600 * 24; + if (util.isTimestampExpired(expiredAt)) { + //discard these ctxs + break; + } + + finishedBlock += 1; + finishedCtxList.push(blkputRet.ctx); + } + } + + //check when to mkblk + rsStream.on('data', function(chunk) { + readLen += chunk.length; + readBuffers.push(chunk); + + if (readLen % conf.BLOCK_SIZE == 0 || readLen == fileSize) { + //console.log(readLen); + var readData = Buffer.concat(readBuffers); + readBuffers = []; //reset read buffer + curBlock += 1; //set current block + if (curBlock > finishedBlock) { + rsStream.pause(); + mkblkReq(upDomain, uploadToken, readData, function(respErr, + respBody, + respInfo) { + if (respInfo.statusCode != 200) { + callbackFunc(respErr, respBody, respInfo); + return; + } else { + finishedBlock += 1; + rsStream.resume(); + var blkputRet = respBody; + finishedCtxList.push(blkputRet.ctx); + if (putExtra.progressCallback) { + putExtra.progressCallback(blkputRet); + } + } + }); + } + } + }); + + //check when to mkfile + rsStream.on('end', function() { + //console.log("end"); + mkfileReq(upDomain, uploadToken, fileSize, finishedCtxList, key, + putExtra, callbackFunc); + }); +} + +function mkblkReq(upDomain, uploadToken, blkData, callbackFunc) { + //console.log("mkblk"); + var requestURI = upDomain + "/mkblk/" + blkData.length; + var auth = 'UpToken ' + uploadToken; + var headers = { + 'Authorization': auth, + 'Content-Type': 'application/octet-stream' + } + rpc.post(requestURI, blkData, headers, callbackFunc); +} + +function mkfileReq(upDomain, uploadToken, fileSize, ctxList, key, putExtra, + callbackFunc) { + //console.log("mkfile"); + var requestURI = upDomain + "/mkfile/" + fileSize; + if (key) { + requestURI += "/key/" + util.urlsafeBase64Encode(key); + } + if (putExtra.mimeType) { + requestURI += "/mimeType/" + util.urlsafeBase64Encode(putExtra.mimeType); + } + if (putExtra.fname) { + requestURI += "/fname/" + util.urlsafeBase64Encode(putExtra.fname); + } + if (putExtra.params) { + //putExtra params + for (var k in putExtra.params) { + if (k.startsWith("x:") && putExtra.params[k]) { + requestURI += "/" + k + "/" + util.urlsafeBase64Encode(putExtra.params[ + k].toString()); + } + } + } + var auth = 'UpToken ' + uploadToken; + var headers = { + 'Authorization': auth, + 'Content-Type': 'application/octet-stream' + } + var postBody = ctxList.join(","); + rpc.post(requestURI, postBody, headers, callbackFunc); +} + +ResumeUploader.prototype.putFile = function(uploadToken, key, localFile, + putExtra, callbackFunc) { + putExtra = putExtra || new PutExtra(); + var rsStream = fs.createReadStream(localFile); + var rsStreamLen = fs.statSync(localFile).size; + if (!putExtra.mimeType) { + putExtra.mimeType = mime.lookup(localFile); + } + + if (!putExtra.fname) { + putExtra.fname = path.basename(localFile); + } + + return this.putStream(uploadToken, key, rsStream, rsStreamLen, putExtra, + callbackFunc); +} + +ResumeUploader.prototype.putFileWithoutKey = function(uploadToken, localFile, + putExtra, callbackFunc) { + return this.putFile(uploadToken, null, localFile, putExtra, callbackFunc); +} diff --git a/qiniu/rpc.js b/qiniu/rpc.js index c9b2fe67..76c6578c 100644 --- a/qiniu/rpc.js +++ b/qiniu/rpc.js @@ -2,6 +2,7 @@ var urllib = require('urllib'); var util = require('./util'); var conf = require('./conf'); +exports.post = post; exports.postMultipart = postMultipart; exports.postWithForm = postWithForm; exports.postWithoutForm = postWithoutForm; @@ -31,14 +32,18 @@ function postWithoutForm(requestURI, token, callbackFunc) { } function post(requestURI, requestForm, headers, callbackFunc) { + //var start = parseInt(Date.now() / 1000); headers = headers || {}; headers['User-Agent'] = headers['User-Agent'] || conf.USER_AGENT; + headers['Connection'] = 'keep-alive'; var data = { headers: headers, method: 'POST', dataType: 'json', timeout: conf.RPC_TIMEOUT, + gzip: true, + // timing: true, }; if (Buffer.isBuffer(requestForm) || typeof requestForm === 'string') { @@ -51,6 +56,15 @@ function post(requestURI, requestForm, headers, callbackFunc) { var req = urllib.request(requestURI, data, function(respErr, respBody, respInfo) { + //var end = parseInt(Date.now() / 1000); + // console.log((end - start) + " seconds"); + // console.log("queuing:\t" + respInfo.timing.queuing); + // console.log("dnslookup:\t" + respInfo.timing.dnslookup); + // console.log("connected:\t" + respInfo.timing.connected); + // console.log("requestSent:\t" + respInfo.timing.requestSent); + // console.log("waiting:\t" + respInfo.timing.waiting); + // console.log("contentDownload:\t" + respInfo.timing.contentDownload); + callbackFunc(respErr, respBody, respInfo); }); diff --git a/test/io.test.js b/test/form_io.test.js similarity index 82% rename from test/io.test.js rename to test/form_io.test.js index fb5c9343..8c5e0432 100644 --- a/test/io.test.js +++ b/test/form_io.test.js @@ -68,7 +68,9 @@ describe('test form io', function() { //console.log(respBody); should.not.exist(respErr); respBody.should.have.keys('key', 'hash'); - keysToDelete.push(respBody.key); + if (!keysToDelete.includes(respBody.key)) { + keysToDelete.push(respBody.key); + } done(); }); }); @@ -84,7 +86,9 @@ describe('test form io', function() { //console.log(respBody); should.not.exist(respErr); respBody.should.have.keys('key', 'hash'); - keysToDelete.push(respBody.key); + if (!keysToDelete.includes(respBody.key)) { + keysToDelete.push(respBody.key); + } done(); }); }); @@ -99,7 +103,9 @@ describe('test form io', function() { //console.log(respBody); should.not.exist(respErr); respBody.should.have.keys('key', 'hash'); - keysToDelete.push(respBody.key); + if (!keysToDelete.includes(respBody.key)) { + keysToDelete.push(respBody.key); + } done(); }); }); @@ -107,14 +113,16 @@ describe('test form io', function() { describe('test form io#putWithoutKey', function() { it('test form io#putWithoutKey', function(done) { - var key = null; - formUploader.put(uploadToken, key, "hello world", putExtra, + formUploader.putWithoutKey(uploadToken, "hello world", + putExtra, function(respErr, respBody, respInfo) { //console.log(respBody); should.not.exist(respErr); respBody.should.have.keys('key', 'hash'); - keysToDelete.push(respBody.key); + if (!keysToDelete.includes(respBody.key)) { + keysToDelete.push(respBody.key); + } done(); }); }); @@ -123,14 +131,16 @@ describe('test form io', function() { describe('test form io#putFile', function() { it('test form io#putFile', function(done) { var key = 'io_putFile_test' + Math.random(1000); - formUploader.put(uploadToken, key, imageFile, putExtra, + formUploader.putFile(uploadToken, key, imageFile, putExtra, function( respErr, respBody, respInfo) { //console.log(respBody); should.not.exist(respErr); respBody.should.have.keys('key', 'hash'); - keysToDelete.push(respBody.key); + if (!keysToDelete.includes(respBody.key)) { + keysToDelete.push(respBody.key); + } done(); }); }); @@ -138,15 +148,17 @@ describe('test form io', function() { describe('test form io#putFileWithoutKey', function() { it('test form io#putFileWithoutKey', function(done) { - var key = null; - formUploader.put(uploadToken, key, imageFile, putExtra, + formUploader.putFileWithoutKey(uploadToken, imageFile, + putExtra, function( respErr, respBody, respInfo) { //console.log(respBody); should.not.exist(respErr); respBody.should.have.keys('key', 'hash'); - keysToDelete.push(respBody.key); + if (!keysToDelete.includes(respBody.key)) { + keysToDelete.push(respBody.key); + } done(); }); }); diff --git a/test/resume_io.test.js b/test/resume_io.test.js new file mode 100644 index 00000000..8cdf978d --- /dev/null +++ b/test/resume_io.test.js @@ -0,0 +1,92 @@ +const path = require('path'); +const should = require('should'); +const assert = require('assert'); +const qiniu = require("../index.js"); +const proc = require("process"); +const fs = require("fs"); + +before(function(done) { + if (!process.env.QINIU_ACCESS_KEY || !process.env.QINIU_SECRET_KEY || ! + process.env.QINIU_TEST_BUCKET || !process.env.QINIU_TEST_DOMAIN) { + console.log('should run command `source test-env.sh` first\n'); + process.exit(0); + } + done(); +}); + + +//file to upload +var imageFile = path.join(__dirname, 'logo.png'); + +describe('test resume io', function() { + var accessKey = proc.env.QINIU_ACCESS_KEY; + var secretKey = proc.env.QINIU_SECRET_KEY; + var bucket = proc.env.QINIU_TEST_BUCKET; + var mac = new qiniu.auth.digest.Mac(accessKey, secretKey); + var config = new qiniu.conf.Config(); + //config.useHttpsDomain = true; + config.zone = qiniu.zone.Zone_z0; + var bucketManager = new qiniu.rs.BucketManager(mac, config); + + //delete all the files uploaded + var keysToDelete = []; + + after(function(done) { + var deleteOps = []; + keysToDelete.forEach(function(key) { + deleteOps.push(qiniu.rs.deleteOp(bucket, key)); + }); + + bucketManager.batch(deleteOps, function(respErr, respBody, respInfo) { + //console.log(respBody); + respBody.forEach(function(ret) { + ret.should.eql({ + code: 200 + }); + }); + done(); + }); + }); + + var options = { + scope: bucket, + } + var putPolicy = new qiniu.rs.PutPolicy(options); + var uploadToken = putPolicy.uploadToken(mac); + var config = new qiniu.conf.Config(); + config.zone = qiniu.zone.Zone_z0; + var resumeUploader = new qiniu.resume_io.ResumeUploader(config); + var putExtra = new qiniu.form_io.PutExtra(); + + describe('test resume io#putFileWithoutKey', function() { + it('test resume io#putFileWithoutKey', function(done) { + resumeUploader.putFileWithoutKey(uploadToken, imageFile, + putExtra, + function( + respErr, + respBody, respInfo) { + //console.log(respBody); + should.not.exist(respErr); + respBody.should.have.keys('key', 'hash'); + keysToDelete.push(respBody.key); + done(); + }); + }); + }); + + describe('test resume io#putFile', function() { + it('test resume io#putFile', function(done) { + var key = 'io_putFile_test' + Math.random(1000); + resumeUploader.putFile(uploadToken, key, imageFile, putExtra, + function( + respErr, + respBody, respInfo) { + //console.log(respBody); + should.not.exist(respErr); + respBody.should.have.keys('key', 'hash'); + keysToDelete.push(respBody.key); + done(); + }); + }); + }); +});