Permalink
Browse files

initial

  • Loading branch information...
0 parents commit f4ac821297ec5ac9cadec9a80851372c21e5faf4 @dominictarr dominictarr committed Jul 25, 2012
Showing with 371 additions and 0 deletions.
  1. +3 −0 .gitignore
  2. +15 −0 LICENSE.APACHE2
  3. +24 −0 LICENSE.MIT
  4. +3 −0 README.markdown
  5. +141 −0 index.js
  6. +11 −0 package.json
  7. +73 −0 test/index.js
  8. +22 −0 test/integrate.js
  9. +32 −0 test/integrate2.js
  10. +47 −0 timestamp.js
@@ -0,0 +1,3 @@
+node_modules
+node_modules/*
+npm_debug.log
@@ -0,0 +1,15 @@
+Apache License, Version 2.0
+
+Copyright (c) 2012 Dominic Tarr
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
@@ -0,0 +1,24 @@
+The MIT License
+
+Copyright (c) 2012 Dominic Tarr
+
+Permission is hereby granted, free of charge,
+to any person obtaining a copy of this software and
+associated documentation files (the "Software"), to
+deal in the Software without restriction, including
+without limitation the rights to use, copy, modify,
+merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom
+the Software is furnished to do so,
+subject to the following conditions:
+
+The above copyright notice and this permission notice
+shall be included in all copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
+OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
+IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR
+ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
+TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
+SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
@@ -0,0 +1,3 @@
+# scuttlebutt
+
+Oh, it looks like I've used some wacky name, but this is science here.
141 index.js
@@ -0,0 +1,141 @@
+//really simple data replication.
+
+var EventEmitter = require('events').EventEmitter
+var i = require('iterate')
+var timestamp = require('./timestamp')
+var duplex = require('duplex')
+
+
+function createID () {
+ return i.map(3, function (i) {
+ return Math.random().toString(16).substring(2).toUpperCase()
+ }).join('')
+}
+
+module.exports = Model
+
+Model.createID = createID
+Model.timestamp = timestamp
+
+function Model (id) {
+ var emitter = new EventEmitter()
+
+ emitter.store = {}
+ emitter.timestamps = {}
+ emitter.sources = {}
+ emitter.id = id = id || createID()
+ emitter.set = function (k, v) {
+ emitter._update(k, v, id, timestamp())
+ }
+ emitter.get = function (k) {
+ if(emitter.store[k])
+ return emitter.store[k][1]
+ }
+ emitter._update = function (key, value, source, ts) {
+ var cur = emitter.timestamps[key]
+ var latest = emitter.sources[source]
+ var update = [].slice.call(arguments)
+ console.log('UPDATE', emitter.id, update)
+ //if this message is older for it's source,
+ //ignore it. it's out of order.
+ //each node must emit it's changes in order!
+ if(latest && latest >= ts)
+ return emitter.emit('old_data', update), false
+
+ emitter.sources[source] = ts
+
+ //check if this message is older than
+ //the value we already have.
+ //do nothing if so
+ //emit an 'old-data' event because i'll want to track how many
+ //unnecessary messages are sent.
+ if(cur && cur > ts)
+ return emitter.emit('old-data', [key, value, source, ts]), false
+
+ emitter.store[key] = update
+ //key, value,
+ emitter.emit('data', update)
+ emitter.emit('update', key, value, source, ts)
+
+ return true
+
+ }
+
+ function validate (data) {
+ //must be an 4 element array
+ //string, *, string, number
+ //log a message and ignore if invalid.
+ function error () {
+ console.error('invalid update', data)
+ }
+ var key = data[0], source = data[2], ts = data[3]
+ /*console.log(!Array.isArray(data)
+ , data.length !== 4
+ , 'string' !== typeof key
+ , 'string' !== typeof source
+ , 'number' !== typeof ts
+ )*/
+ if( !Array.isArray(data)
+ || data.length !== 4
+ || 'string' !== typeof key
+ || 'string' !== typeof source
+ || 'number' !== typeof ts
+ )
+ return error(), false
+
+ return true
+ }
+
+ emitter.createStream = function () {
+ //the sources for the remote end.
+ var sources = {}
+ var d = duplex()
+ .on('write', function (data) {
+ //if it's an array, it's an update.
+ //if it's an object, it's a scuttlebut digest.
+ if(Array.isArray(data) && validate(data))
+ return emitter._update.apply(emitter, data)
+ if('object' === typeof data && data) {
+ //when the digest is recieved from the other end,
+ //send the histroy.
+ //merge with the current list of sources.
+
+ sources = data
+ i.each(emitter.histroy(data), d.emitData.bind(d))
+
+ this.emit('sync')
+ }
+ }).on('ended', function () { d.emitEnd() })
+ .on('close', function () {
+ emitter.removeListener('update', onUpdate)
+ })
+
+ function onUpdate (key, value, source, ts) {
+ if(sources[source] && sources[source] >= ts)
+ return //the other end has already seen this message.
+ d.emit('data', [key, value, source, ts])
+ //update source
+ sources[source] = ts
+ }
+ d.emitData(emitter.sources)
+
+ emitter.on('update', onUpdate)
+ return d
+ }
+
+ emitter.filter = function (e, filter) {
+ var source = e[2]
+ var ts = e[3]
+ return (!filter || !filter[source] || filter[source] < ts) }
+
+ emitter.histroy = function (filter) {
+ var h = []
+ i.each(emitter.store, function (e) {
+ if(emitter.filter(e, filter))
+ h.push(e)
+ })
+ return h
+ }
+
+ return emitter
+}
@@ -0,0 +1,11 @@
+{
+ "author": "Dominic Tarr <dominic.tarr@gmail.com> (http://dominictarr.com)",
+ "name": "scuttlebutt",
+ "description": "replicate data via scuttlebutt protocol",
+ "version": "0.0.0",
+ "homepage": "https://github.com/dominictarr/scuttlebutt",
+ "repository": {
+ "type": "git",
+ "url": "git://github.com/dominictarr/scuttlebutt.git"
+ },
+}
@@ -0,0 +1,73 @@
+
+var gossip = require('..')
+var i = require('iterate')
+var assert = require('assert')
+
+function test(name, test) {
+ console.log('#', name)
+ test(gossip())
+}
+
+test('updates appear in histroy', function (g) {
+ var key = 'key'
+ var value = Math.random()
+ var source = gossip.createID()
+ var ts = gossip.timestamp()
+
+
+ assert.equal(g._update(key, value, source, ts)
+ , true
+ , '_update returns true to indicate update applied')
+
+ assert.equal(g.get(key), value)
+
+ assert.deepEqual(g.histroy(), [['key', value, source, ts]])
+
+ var value2 = Math.random()
+ //older timestamps are not appled.
+ assert.equal(g._update(key, value2, source, ts - 1)
+ , false
+ , '_update returns false to indicate update did not apply')
+
+ //the second update was older, so must not be in the histroy
+ assert.deepEqual(g.histroy(), [['key', value, source, ts]])
+
+ assert.equal(g.get(key), value)
+})
+
+test('can filter histroy with {sources: timestamps}', function (g) {
+ var A = gossip.createID()
+ var B = gossip.createID()
+ var C = gossip.createID()
+ var ts = gossip.timestamp()
+ g._update('A', 'aaa', A, ts)
+ g._update('B', 'bbb', B, ts)
+ g._update('C', 'ccc', C, ts)
+
+ //filter should only return timestamps that are after
+ //the given timestamps.
+ var filter = {}
+ filter[A] = ts
+ filter[B] = ts
+ filter[C] = ts
+
+ assert.deepEqual(
+ g.histroy(filter)
+ , [])
+
+ filter[B] = ts - 1
+
+ assert.deepEqual(
+ g.histroy(filter)
+ , [['B', 'bbb', B, ts]])
+
+ //if an item is not available, it
+
+ filter[C] = null
+ assert.deepEqual(
+ g.histroy(filter)
+ , [ ['B', 'bbb', B, ts]
+ , ['C', 'ccc', C, ts]])
+
+})
+
@@ -0,0 +1,22 @@
+var gossip = require('..')
+var assert = require('assert')
+
+var g1 = gossip()
+var g2 = gossip()
+var s1, s2
+(s1 = g1.createStream())
+ .pipe(s2 = g2.createStream()).pipe(s1)
+
+s1.on('data', console.log)
+
+//I like to have streams that work sync.
+//if you can do that, you know it's tight.
+s1.resume()
+s2.resume()
+
+var value = Math.random()
+
+g1.set('key', value)
+
+assert.equal(g2.get('key'), g1.get('key'))
+
@@ -0,0 +1,32 @@
+var gossip = require('..')
+var assert = require('assert')
+
+var g1 = gossip()
+var g2 = gossip()
+var g3 = gossip()
+
+function sync(g, h) {
+ var s = g.createStream()
+ var r = h.createStream()
+ g.on('old_data', function (d) {
+ console.log('old_data', d)
+ })
+ s.pipe(r).pipe(s)
+ s.resume()
+ r.resume()
+}
+
+sync(g1, g2)
+sync(g2, g3)
+sync(g3, g1)
+
+//I like to have streams that work sync.
+//if you can do that, you know it's tight.
+
+var value = Math.random()
+
+g1.set('key', value)
+
+assert.equal(g3.get('key'), g1.get('key'))
+assert.equal(g2.get('key'), g1.get('key'))
+
@@ -0,0 +1,47 @@
+/*
+this is just copied from crdt
+
+this is NOT a wholly accurate representation of the time.
+since js only measures time as ms, if you call Date.now()
+twice quickly, it's possible to get two identical time stamps.
+
+subsequent calls to timestamp() are ALWAYS strictly ordered.
+
+which is the important part.
+
+maybe call this something other than timestamp?
+
+what about 'close-enough' since that's what it is.
+
+also, it may be a very good idea to add something to syncronize
+network time.
+
+I'm guessing you ping your time stamps back in time, and make the minimal adjustment so that all messages are measured to
+arrive on one machine after the time they claim to make left the other machine.
+
+will need to spin up a cluster to test this.
+*/
+
+module.exports = timestamp
+
+var _last = 0
+var _count = 1
+var LAST
+function timestamp () {
+ var t = Date.now()
+ var _t = t
+ if(_last == t) {
+// while(_last == _t)
+ _t += ((_count++)/1000)
+ }
+ else _count = 1
+
+ _last = t
+
+ if(_t === LAST)
+ throw new Error('LAST:' + LAST + ',' + _t)
+ LAST = _t
+ return _t
+}
+
+

0 comments on commit f4ac821

Please sign in to comment.