Skip to content
This repository has been archived by the owner on Oct 30, 2018. It is now read-only.

Commit

Permalink
Merge pull request #403 from aleitner/master
Browse files Browse the repository at this point in the history
v3.2.0
  • Loading branch information
bookchin committed Sep 14, 2016
2 parents c54a35c + 8ae271a commit 1867930
Show file tree
Hide file tree
Showing 15 changed files with 364 additions and 40 deletions.
2 changes: 1 addition & 1 deletion bin/actions/account.js
@@ -1,10 +1,10 @@
'use strict';
var storj = require('../..');
var log = require('./../logger')().log;
var utils = require('./../utils');
var path = require('path');
var fs = require('fs');
var platform = require('os').platform();
var storj = require('../..');

var HOME = platform !== 'win32' ? process.env.HOME : process.env.USERPROFILE;
var DATADIR = path.join(HOME, '.storjcli');
Expand Down
17 changes: 17 additions & 0 deletions bin/actions/files.js
Expand Up @@ -54,6 +54,8 @@ module.exports.getInfo = function(bucketid, fileid, callback) {
return callback(file);
}
});

return callback(null);
});
};

Expand Down Expand Up @@ -89,6 +91,10 @@ module.exports.remove = function(id, fileId, env) {
module.exports.upload = function(bucket, filepath, env) {
var self = this;

if (env.concurrency > 6) {
log('warn', 'A concurrency of %s may result in issues!', env.concurrency);
}

var concurrency = env.concurrency ? parseInt(env.concurrency) : 6;

if (parseInt(env.redundancy) > 12 || parseInt(env.redundancy) < 1) {
Expand All @@ -114,11 +120,18 @@ module.exports.upload = function(bucket, filepath, env) {
var parsedFileArray = globule.find(origFilepath);
if (storj.utils.existsSync(parsedFileArray[0])) {
if (fs.statSync(parsedFileArray[0]).isFile() === true) {
try {
fs.accessSync(parsedFileArray[0], fs.R_OK);
} catch (err) {
return log('error', err.message);
}

expandedFilepaths = expandedFilepaths.concat(parsedFileArray);
}
} else {
return log('error', '%s could not be found', origFilepath);
}

callback();
}, function(err) {
if (err) {
Expand Down Expand Up @@ -328,6 +341,10 @@ module.exports.download = function(bucket, id, filepath, env) {
module.exports.getInfo.call(self, bucket, id, function(file) {
var target;

if (file === null) {
return log('error', 'file %s does not exist in bucket %s', [id, bucket]);
}

// Check if path is an existing path
if (storj.utils.existsSync(filepath) === true ) {
// Check if given path is a directory
Expand Down
17 changes: 13 additions & 4 deletions bin/storj.js
Expand Up @@ -4,7 +4,8 @@

var program = require('commander');
var fs = require('fs');
var platform = require('os').platform();
var os = require('os');
var platform = os.platform();
var path = require('path');
var prompt = require('prompt');
var colors = require('colors/safe');
Expand Down Expand Up @@ -36,6 +37,11 @@ program._storj.loglevel = function() {
};

program._storj.PrivateClient = function(options) {
if (typeof options === 'undefined') {
options = {};
}
options.blacklistFolder = DATADIR;

return storj.BridgeClient(program.url, merge({
keypair: utils.loadKeyPair(),
logger: logger(program._storj.loglevel()).log
Expand Down Expand Up @@ -178,9 +184,12 @@ program
.option('-c, --concurrency <count>', 'max shard upload concurrency')
.option('-C, --fileconcurrency <count>', 'max file upload concurrency', 1)
.option('-r, --redundancy <mirrors>', 'number of mirrors to create for file')
.description('upload a file or files to the network and track in a bucket')
.description('<filepath> can be a path with wildcard or a space separated')
.description(' list of files')
.description('upload a file or files to the network and track in a bucket.' +
'\n upload all files in a single directory using "/path/*"\n' +
' or upload recursively using "/path/**/*".\n' +
' <filepath> can be a path with wildcard or a space separated' +
' list of files.'
)
.action(actions.files.upload.bind(program));

program
Expand Down
4 changes: 2 additions & 2 deletions bin/utils.js
Expand Up @@ -3,9 +3,9 @@ var storj = require('..');
var log = require('./logger')().log;
var path = require('path');
var fs = require('fs');
var platform = require('os').platform();
var prompt = require('prompt');
var os = require('os');
var platform = os.platform();
var tmp = require('tmp');
var assert = require('assert');
var rimraf = require('rimraf');
Expand Down Expand Up @@ -46,7 +46,7 @@ module.exports.getNewPassword = function(msg, callback) {

module.exports.makeTempDir = function(callback) {
var opts = {
dir: os.tmpdir(),
dir: storj.utils.tmpdir(),
prefix: 'storj-',
// 0700.
mode: 448,
Expand Down
5 changes: 5 additions & 0 deletions doc/environment-variables.md
Expand Up @@ -32,3 +32,8 @@ testing against other bridges.
This variable will set the `--keypass` used to unlock the keyring.

Setting your password will make it so other users can't grep it with `ps -a`.

#### `STORJ_TEMP`

This variable will set the folder in which the encrypted file will be placed
when uploading a file. Shards will also be placed in this folder during upload.
83 changes: 83 additions & 0 deletions lib/bridge-client/blacklist.js
@@ -0,0 +1,83 @@
'use strict';

var assert = require('assert');
var fs = require('fs');
var utils = require('../utils');
var path = require('path');

/**
 * Manage a blacklist file containing an object with key value pairs of
 * nodeids: timestamp
 * @constructor
 * @license LGPL-3.0
 * @see https://github.com/storj/bridge
 * @param {String} folder - blacklist folder location
 */
function Blacklist(folder) {
  if (!(this instanceof Blacklist)) {
    return new Blacklist(folder);
  }

  assert.ok(utils.existsSync(folder), 'Invalid Blacklist Folder');

  this.blacklistFile = path.join(folder,'.blacklist');
  this.blacklist = this._loadFromDisk();
}

// Entries older than this many milliseconds (24 hours) are reaped
Blacklist.TTL = 86400000;

/**
 * Push node to blacklist
 * @param {String} nodeid - Node id to be added to blacklist
 */
Blacklist.prototype.push = function(nodeid) {
  this.blacklist[nodeid] = Date.now();
  this._saveToDisk();
};

/**
 * Save blacklist to disk
 * @private
 */
Blacklist.prototype._saveToDisk = function() {
  fs.writeFileSync(this.blacklistFile, JSON.stringify(this.blacklist));
};

/**
 * Read blacklist from disk and reap old nodeids
 * @private
 * @returns {Object} map of nodeid -> blacklisted-at timestamp
 */
Blacklist.prototype._loadFromDisk = function() {
  if (!utils.existsSync(this.blacklistFile)) {
    // Seed with an empty object (not an array): the blacklist is a
    // nodeid -> timestamp map
    fs.writeFileSync(this.blacklistFile, JSON.stringify({}));
  }

  return this._reap(JSON.parse(fs.readFileSync(this.blacklistFile)));
};

/**
 * Reap old nodeids from blacklist
 * @private
 * @param {Object} blacklist - map of nodeid -> blacklisted-at timestamp
 * @returns {Object} the reaped blacklist
 */
Blacklist.prototype._reap = function(blacklist) {
  var now = Date.now();

  for (var nodeid in blacklist) {
    // An entry expires once its age exceeds TTL. NB: the original compared
    // (timestamp - now), which is never positive, so nothing was ever reaped.
    if ((now - blacklist[nodeid]) > Blacklist.TTL) {
      delete blacklist[nodeid];
    }
  }

  this.blacklist = blacklist;

  return blacklist;
};

/**
 * Return list of blacklisted nodeids
 * @returns {Array.<String>} blacklisted nodeids
 */
Blacklist.prototype.toObject = function() {
  return Object.keys(this._reap(this.blacklist));
};

module.exports = Blacklist;
22 changes: 16 additions & 6 deletions lib/bridge-client/index.js
Expand Up @@ -11,16 +11,17 @@ var AuditStream = require('../audit-tools/audit-stream');
var DataChannelClient = require('../data-channels/client');
var Contact = require('../network/contact');
var crypto = require('crypto');
var tmpdir = require('os').tmpdir();
var path = require('path');
var mime = require('mime');
var uuid = require('node-uuid');
var merge = require('merge');
var Logger = require('kad-logger-json');
var EventEmitter = require('events').EventEmitter;
var UploadState = require('./upload-state');
var Blacklist = require('./blacklist');
var stream = require('readable-stream');
var async = require('async');
var os = require('os');

/**
* Represents a client interface to a given bridge server
Expand All @@ -44,6 +45,7 @@ function BridgeClient(uri, options) {
}

this._options = this._checkOptions(uri, options);
this._blacklist = new Blacklist(this._options.blacklistFolder);
this._logger = this._options.logger;
this._transferConcurrency = this._options.concurrency;
}
Expand All @@ -57,7 +59,8 @@ BridgeClient.prototype._checkOptions = function(uri, options) {
baseURI: uri || process.env.STORJ_BRIDGE || 'https://api.storj.io',
logger: new Logger(0),
concurrency: 6,
transferRetries: 3
transferRetries: 3,
blacklistFolder: os.tmpdir()
}, options);

assert.ok(utils.validateLogger(options.logger), 'Invalid logger supplied');
Expand Down Expand Up @@ -395,7 +398,12 @@ BridgeClient.prototype.storeFileInBucket = function(id, token, file, cb) {
return cb(new Error(fileSize +' bytes is not a supported file size.'));
}

var shardSize = FileDemuxer.getOptimalShardSize(fileSize);
var shardSize = FileDemuxer.getOptimalShardSize(
{
fileSize: fileSize,
shardConcurrency: this._transferConcurrency
}
);
var uploadState = new UploadState({
id: id,
file: file,
Expand Down Expand Up @@ -451,14 +459,16 @@ BridgeClient.prototype._shardUploadWorker = function(task, done) {
* @param {UploadState} state - The upload state machine
*/
BridgeClient.prototype._handleShardStream = function(shard, i, frame, state) {
var tmpdir = utils.tmpdir();

var meta = {
frame: frame,
tmpName: path.join(tmpdir, crypto.randomBytes(6).toString('hex')),
size: 0,
index: i,
hasher: crypto.createHash('sha256'),
hash: null,
excludeFarmers: [],
excludeFarmers: this._blacklist.toObject(),
transferRetries: 0
};
var tmpFile = fs.createWriteStream(meta.tmpName);
Expand Down Expand Up @@ -518,7 +528,7 @@ BridgeClient.prototype._handleShardTmpFileFinish = function(state, meta, done) {
index: meta.index,
challenges: challenges,
tree: tree,
exclude: meta.excludeFarmers
exclude: self._blacklist.toObject()
}, function(err, pointer) {
if (state.killed) {
return done();
Expand Down Expand Up @@ -570,7 +580,7 @@ BridgeClient.prototype._startTransfer = function(pointer, state, meta, done) {
meta.transferRetries
);
transferStatus.removeAllListeners();
meta.excludeFarmers.push(pointer.farmer.nodeID);
self._blacklist.push(pointer.farmer.nodeID);
meta.transferRetries = 0;
self._handleShardTmpFileFinish(state, meta, done);
}
Expand Down
4 changes: 2 additions & 2 deletions lib/bridge-client/upload-state.js
@@ -1,11 +1,11 @@
'use strict';

var fs = require('fs');
var async = require('async');
var utils = require('../utils');
var merge = require('merge');
var inherits = require('util').inherits;
var EventEmitter = require('events').EventEmitter;
var rimraf = require('rimraf');

/**
* Internal state machine used by {@link BridgeClient}
Expand Down Expand Up @@ -61,7 +61,7 @@ UploadState.prototype.cleanup = function() {

this.cleanQueue.forEach(function(tmpFilePath) {
if (utils.existsSync(tmpFilePath)) {
fs.unlinkSync(tmpFilePath);
rimraf.sync(tmpFilePath);
}
});

Expand Down
40 changes: 30 additions & 10 deletions lib/file-handling/file-demuxer.js
Expand Up @@ -7,6 +7,7 @@ var fs = require('fs');
var EventEmitter = require('events').EventEmitter;
var merge = require('merge');
var utils = require('../utils');
var os = require('os');

/**
* Takes a single file read stream and outputs several output streams, used for
Expand Down Expand Up @@ -135,24 +136,43 @@ FileDemuxer.prototype._needsNewOutputStream = function() {

/**
 * Determine the optimal shard size given an arbitrary file size in bytes
 * @param {Object} fileInfo
 * @param {Number} fileInfo.fileSize - The number of bytes in the given file
 * @param {Number} fileInfo.shardConcurrency - Num of shards uploaded at once
 * @param {Number} [acc=0] - Accumulator (number of recursions)
 * @returns {Number} shardSize
 */
FileDemuxer.getOptimalShardSize = function(fileInfo, acc) {
  var accumulator = typeof acc === 'undefined' ? 0 : acc;

  // Determine hops back by accumulator
  var hops = (accumulator - FileDemuxer.SHARD_MULTIPLES_BACK) < 0 ?
             0 :
             accumulator - FileDemuxer.SHARD_MULTIPLES_BACK;

  // Calculate bytemultiple shard size by hops back
  var shardSize = function(hops) {
    return (8 * (1024 * 1024)) * Math.pow(2, hops);
  };

  var byteMultiple = shardSize(accumulator);
  var check = fileInfo.fileSize / byteMultiple;

  // Determine if bytemultiple is highest bytemultiple that is still <= fileSize
  if (check > 0 && check <= 1) {

    // Certify the number of concurrency * shardSize doesn't exceed freemem
    while (
      hops > 0 &&
      (os.freemem() / shardSize(hops) <= fileInfo.shardConcurrency)
    ) {
      hops = hops - 1 <= 0 ? 0 : hops - 1;
    }

    return shardSize(hops);
  }

  // Not big enough yet: try the next power-of-two multiple
  return this.getOptimalShardSize(fileInfo, ++accumulator);
};

module.exports = FileDemuxer;

0 comments on commit 1867930

Please sign in to comment.