Permalink
Browse files

Merge pull request #123 from nearbuy/micro_time

Use microsecond precision time for insert/remove
  • Loading branch information...
devdazed committed Oct 25, 2013
2 parents b05fe24 + 21230b7 commit d43538a81a77c5eb15fcb04a918819bda91cf343
Showing with 139 additions and 14 deletions.
  1. +18 −6 lib/column.js
  2. +14 −6 lib/column_family.js
  3. +2 −2 lib/row.js
  4. +12 −0 lib/time.js
  5. +3 −0 package.json
  6. +90 −0 test/thrift.js
View
@@ -1,11 +1,12 @@
var util = require('util'),
- ttypes = require('./cassandra/cassandra_types');
+ ttypes = require('./cassandra/cassandra_types'),
+ time = require('./time');
/**
* Cassandra Column object representation
* @param {Object} name The name of the column, can be any type, for composites use Array
* @param {Object} value The value of the column
- * @param {Date} timestamp The timestamp of the value
+ * @param {Date} timestamp The timestamp of the value, can be Date or time in microseconds
* @param {Number} ttl The ttl for the column
* @constructor
*/
@@ -20,11 +21,22 @@ var Column = function(name, value, timestamp, ttl){
*/
this.value = value;
+ if (timestamp == null) {
+ /**
+ * The timestamp in microseconds of the value
+ * @default {Number} current time in microseconds
+ */
+ this.timestamp_micro = time.microtime();
+ } else if (this.timestamp instanceof Date) {
+ this.timestamp_micro = timestamp.getTime() * 1000;
+ } else {
+ this.timestamp_micro = timestamp;
+ }
/**
* The timestamp of the value
- * @default {Date} new Date();
+ * @default {Date} current time with ms precision
*/
- this.timestamp = timestamp || new Date();
+ this.timestamp = new Date(this.timestamp_micro / 1000);
/**
* The ttl for the column
@@ -42,8 +54,8 @@ Column.prototype.toThrift = function(nameMarshaller, valueMarshaller){
return new ttypes.Column({
name: nameMarshaller.serialize(this.name),
value: valueMarshaller.serialize(this.value),
- timestamp: this.timestamp.getTime() * 1000,
- ttl:this.ttl
+ timestamp: this.timestamp_micro,
+ ttl: this.ttl
});
};
View
@@ -3,7 +3,8 @@ var util = require('util'),
Column = require('./column'),
CounterColumn = require('./counter_column'),
Row = require('./row'),
- ttype = require('./cassandra/cassandra_types');
+ ttype = require('./cassandra/cassandra_types'),
+ time = require('./time');
/**
* NO-Operation for deault callbacks
@@ -120,8 +121,7 @@ function getColumns(columns, options, type){
type = Column;
}
- var keys = Object.keys(columns), len = keys.length, i = 0, key, value, arr = [],
- ts = new Date();
+ var keys = Object.keys(columns), len = keys.length, i = 0, key, value, arr = [];
for(; i < len; i += 1){
key = keys[i];
@@ -131,7 +131,7 @@ function getColumns(columns, options, type){
value = '';
}
- arr.push(new type(key, value, ts, options.ttl));
+ arr.push(new type(key, value, null, options.ttl));
}
return arr;
}
@@ -284,10 +284,18 @@ ColumnFamily.prototype.remove = function() {
var self = this;
var marshalledKey = this.keyMarshaller.serialize(args.key).toString('binary'),
path = columnPath(self, args.column, args.subcolumn),
- timestamp = args.options.timestamp || new Date(),
consistency = args.options.consistency || args.options.consistencyLevel || DEFAULT_WRITE_CONSISTENCY;
- this.connection.execute('remove', marshalledKey, path, timestamp * 1000, consistency, args.callback);
+ var timestamp;
+ if (args.options.timestamp == null) {
+ timestamp = time.microtime();
+ } else if (args.options.timestamp instanceof Date) {
+ timestamp = args.options.timestamp.getTime() * 1000;
+ } else {
+ timestamp = args.options.timestamp;
+ }
+
+ this.connection.execute('remove', marshalledKey, path, timestamp, consistency, args.callback);
};
/**
View
@@ -45,9 +45,9 @@ var Row = function(data, schema){
// Only deserialize if specified
if(schema.noDeserialize) {
- this.push(new Column(name,item.value, new Date(item.timestamp / 1000), item.ttl));
+ this.push(new Column(name,item.value, item.timestamp, item.ttl));
} else {
- this.push(new Column(name,deserializeValue(item.value), new Date(item.timestamp / 1000), item.ttl));
+ this.push(new Column(name,deserializeValue(item.value), item.timestamp, item.ttl));
}
this._map[name] = this.length - 1;
View
@@ -0,0 +1,12 @@
+var time = {};
+
+try {
+ var microtime = require('microtime');
+ time.microtime = microtime.now;
+} catch (e) {
+ time.microtime = function() {
+ return Date.now() * 1000;
+ };
+}
+
+module.exports = time;
View
@@ -16,6 +16,9 @@
"helenus-thrift": "0.7.3"
, "node-uuid": "1.3.3"
}
+ , "optionalDependencies": {
+ "microtime": "0.4.0"
+ }
, "devDependencies": {
"whiskey": "git://github.com/cloudkick/whiskey.git#b3c5bc23e30c95e46083bc7628c2557c1c15ec95"
, "JSDoc": "git://github.com/jsdoc3/jsdoc.git"
View
@@ -4,6 +4,12 @@ var config = require('./helpers/thrift'),
Helenus, conn, ks, cf_standard, row_standard, cf_composite, cf_counter,
cf_reversed, cf_composite_nested_reversed;
+var has_microtime = false;
+try {
+ require('microtime');
+ has_microtime = true;
+} catch(e) { }
+
module.exports = {
'setUp':function(test, assert){
Helenus = require('helenus');
@@ -206,6 +212,48 @@ module.exports = {
});
},
+ 'test cf.insert default microsecond timestamp':function(test, assert){
+ if (!has_microtime) {
+ test.finish();
+ return;
+ }
+
+ //try to tease out same-ms collision with 50 attempts
+ var finished = 0, ok = true;
+ var callback = function() {
+ finished++;
+ if (finished % 2 == 0) {
+ cf_standard.get(config.standard_row_key, function(err, row){
+ assert.ifError(err);
+
+ ok = ok && (row.get("one").value === "a");
+
+ if (finished == 100) {
+ assert.ok(ok);
+ assert.ifError(err);
+ test.finish();
+ } else {
+ setTimeout(try_race, 0);
+ }
+ });
+ }
+ };
+
+ var try_race = function() {
+ cf_standard.insert(config.standard_row_key, {"one": "b"}, function(err, results){
+ assert.ifError(err);
+ callback();
+ });
+
+ cf_standard.insert(config.standard_row_key, {"one": "a"}, function(err, results){
+ assert.ifError(err);
+ callback();
+ });
+ };
+
+ try_race();
+ },
+
'test counter cf.incr':function(test, assert){
var key = 'åbcd';
@@ -662,6 +710,48 @@ module.exports = {
});
},
+ 'test standard cf remove default microsecond timestamp':function(test, assert) {
+ if (!has_microtime) {
+ test.finish();
+ return;
+ }
+
+ //try to tease out same-ms collision with 50 attempts
+ var finished = 0, ok = true;
+ var callback = function() {
+ finished++;
+ if (finished % 2 == 0) {
+ cf_standard.get(config.standard_row_key, function(err, row){
+ assert.ifError(err);
+
+ ok = ok && (row.count === 3);
+
+ if (finished == 100) {
+ assert.ok(ok);
+ assert.ifError(err);
+ test.finish();
+ } else {
+ setTimeout(try_race, 0);
+ }
+ });
+ }
+ };
+
+ var try_race = function() {
+ cf_standard.insert(config.standard_row_key, {"one": "a"}, function(err, results){
+ assert.ifError(err);
+ callback();
+ });
+
+ cf_standard.remove(config.standard_row_key, "one", function(err, results){
+ assert.ifError(err);
+ callback();
+ });
+ };
+
+ try_race();
+ },
+
'test composite cf remove column':function(test, assert) {
var key = [ 'åbcd', new Helenus.UUID('e491d6ac-b124-4795-9ab3-c8a0cf92615c') ],
col = [12345678912345, new Date(1326400762701)];

0 comments on commit d43538a

Please sign in to comment.