Skip to content
Browse files

had to internalize dedup to be able to use it for master disconnect n…

…otification but it made it simpler too
  • Loading branch information...
1 parent 5965190 commit d358a457ded08c866156db6197ddbce1148bfd3a @shimondoodkin committed Jun 30, 2014
Showing with 89 additions and 35 deletions.
  1. +6 −6 README.md
  2. +15 −21 example.js
  3. +68 −8 index.js
View
12 README.md
@@ -1,28 +1,28 @@
-#Node.js P2P Dist Components
+##Node.js p2p auto discovery distributed components
this is an attempt to create reliable data processing and insertion application where one node can replace another at runtime.
uses telepathine for p2p auto discovery and zmq for fast communication between nodes.
-##the code is quite alpha but working.
+###the code is quite alpha but working.
it is written as quick solution using globals (because it faster and easier to develop and easy for debugging using repl) and no tests. but it seems working anyway.
if you like you can rewrite this without globals. i will be happy to merge your pull request.
read the example and the code to get to know how it works.
-##it has a an idea how to have multiple announcers:
+###it has a an idea how to have multiple announcers:
send hash + the data.
the receive function only processes unique values by hash
some times data is semi-unique like it has a more or less a same timestamp.
so you have a data for hashing that is without this timestamp. or with a lower resolion time stamp to reduce the error of using same data from two anoucers.
-##it has an idea how to make sure only one component of same type is a master so only one does inserts.
+###it has an idea how to make sure only one component of same type is a master so only one does inserts.
the solution is simple. the last component that says that it is the master, it is the master.
some times like in 3 nodes scenario , master disconnects and the other two set a shared key that they are the master
the one that catches is the master.
-##telepathine synchronizes all changes from all peers to shared keys once in 2.5 seconds.
+###telepathine synchronizes all changes from all peers to shared keys once in 2.5 seconds.
-## todo
+### todo
a planed master switch. for exect switching: when peer says bye ,for each component if a master. it says to others after what number of uniques from what hash i stop inserting and you start. also maybe to emmit empty values every second and ignore them in dedup.
View
36 example.js
@@ -31,15 +31,15 @@ example_announcer_start=function()
var component_description=zmq_new_component_description('example_announcer');
//no inputs
//just clients that connect to servers that want our information
+
+
var sendclients_dbinserter=component_description.sendclients
- var clientid=Math.round(Math.random()*1000)+1;
- var dedupsendstate1= {prev_send:null,count_send:0};
// put here some event emmiter subscription code,
// that on('data') does:
- // sendclients_dbinserter( zmqdedupsend(data,dedupsendstate1,clientid) ) //clientid is not required (you can put empty string there) is used to know witch client passed the dedupreceive
+ // component_description.sendclients( data )
// the kind of simple emiter of data to insert
setInterval(function(){/// T
@@ -49,7 +49,7 @@ example_announcer_start=function()
n=Math.floor(n/3);
var messagetype='time';
var data=n;
- sendclients_dbinserter( zmqdedupsend([messagetype,data],dedupsendstate1,clientid) )
+ sendclients_dbinserter([messagetype,data])
},3000);
@@ -63,17 +63,12 @@ example_announcer_start=function()
example_dbinserter_start=function()
{
var component_description=zmq_new_component_description('example_dbinserter');
-
-
//accept example_announcer:
-
- var receivestate1={emitedh:[],emitedt:[],emitedd:[]}
- var zmqs_example_log=component_description.addportfor('example_announcer',function(str)
+ var zmqs_example_log=component_description.addportfor('example_announcer',function(re)
{
- var re=dedupreceive(str.toString(),receivestate1);
if(re!==undefined)
{
- if(!component_description.checkisportmaster(zmqs_example_log,'example_announcer')) { return;}
+ if(!component_description.checkisportmaster(zmqs_example_log)) { return;}
//var x=flat.flat(re);x['app']=appid;
//console.log('dbinsert');
//console.log('dbinsert','public.example_log',x);
@@ -82,13 +77,11 @@ example_dbinserter_start=function()
}
});
- var receivestate2={emitedh:[],emitedt:[],emitedd:[]}
- var zmqs_insert=component_description.addportfor('example_processor',function(str)
+ var zmqs_insert=component_description.addportfor('example_processor',function(re)
{
- var re=dedupreceive(str.toString(),receivestate2);
if(re!==undefined)
{
- if(!component_description.checkisportmaster(zmqs_insert,'example_processor')) { return;}
+ if(!component_description.checkisportmaster(zmqs_insert)) { return;}
//var x=flat.flat(re);x['app']=appid;
//console.log('dbinsert2');
console.log('dbinsert2',re[0],re[1]);
@@ -110,22 +103,23 @@ example_processor_start=function()
{
var component_description=zmq_new_component_description('example_processor');
+ //there is
+ //zmqs_example_log.sendclients(datatosend,datatohash=undefined)
+
var sendclients_dbinserter=component_description.sendclients;
var clientid=Math.round(Math.random()*1000)+1;
- var dedupsendstate1= {prev_send:null,count_send:0};
-
+
function ex_dbinsert(t,d)
{
- sendclients_dbinserter( zmqdedupsend([t,d],dedupsendstate1,clientid) )
+ sendclients_dbinserter([t,d])
//sendclients_dbinserter([t,d])
}
- var receivestate1={emitedh:[],emitedt:[],emitedd:[]}
- var zmqs_example_log=component_description.addportfor('example_announcer',function(str)
+
+ var zmqs_example_log=component_description.addportfor('example_announcer',function(re)
{
console.log('example processor: received data');
- var re=dedupreceive(str.toString(),receivestate1);
if(re!==undefined)
{
//console.log('example processor: data=',re);
View
76 index.js
@@ -1,6 +1,6 @@
/*
*
- * Node.js P2P Dist Components
+ * Node.js P2P Distributed Components
* License 2 Close BSD: Copyright to Shimon Doodkin helpmepro1@gmail.com
*
*/
@@ -65,7 +65,7 @@ zmqdedupsend=function(data,state,clientid,hashdata)// var state= {prev_send:null
hashstring=JSON.stringify(hashdata);
d=JSON.stringify(data);
}
- else if(data&&data.a1_otimestamp) //my case
+/* else if(data&&data.a1_otimestamp) //my case
{
var dataclone=JSON.parse(d=JSON.stringify(data))
dataclone.a1_otimestamp=Math.round(dataclone.a1_otimestamp/30000);
@@ -77,6 +77,7 @@ zmqdedupsend=function(data,state,clientid,hashdata)// var state= {prev_send:null
dataclone.timestamp=Math.round(dataclone.timestamp/30000);
hashstring=JSON.stringify(dataclone);
}
+ */
else // no relativly simular timestamps
{
d=hashstring=JSON.stringify(data);
@@ -405,7 +406,12 @@ zmq_new_component_description=function (component_name)
//listening_zmq1,listening_zmq2
];
+ var clientid=Math.round(Math.random()*1000)+1;
+ var dedupsendstate1= {prev_send:null,count_send:0};
+
Object.defineProperty(component_description, "clients", { value : clients } );
+ Object.defineProperty(component_description, "dedupsendstate", { value : dedupsendstate1 } );
+ Object.defineProperty(component_description, "clientid", { value : clientid } );//clientid is not required (you can put empty string there) is used to know witch client passed the dedupreceive
Object.defineProperty(component_description, "clients_all", { value : clients_all } );
Object.defineProperty(component_description, "clients_bypeer", { value : clients_bypeer } );
Object.defineProperty(component_description, "servers", { value : servers} );
@@ -417,23 +423,35 @@ zmq_new_component_description=function (component_name)
Object.defineProperty(component_description, "addportfor", { value : function(n,f){return addportfor(n,f)} }); //, enumerable:false is default
Object.defineProperty(component_description, "checkisportmaster", { value : function(n,f){return checkisportmaster(n,f)} }); //, enumerable:false is default
Object.defineProperty(component_description, "setportmaster", { value : function(n,f){return setportmaster(n,f)} }); //, enumerable:false is default
+ Object.defineProperty(component_description, "setportmasterfrom", { value : function(n,f){return setportmasterfrom(n,f)} }); //, enumerable:false is default
+ Object.defineProperty(component_description, "getportmasterfrom", { value : function(p){return getportmasterfrom(p)} }); //, enumerable:false is default
+
function addportfor(other_component_name,onmessage)
{
//accept bitstamp_announcer:
var dbinserterport = 'tcp://*:0';
var zmqs_bitstamp_log=zmqlisten(dbinserterport,component_description.name+' for '+other_component_name);
+ var receivestate1={emitedh:[],emitedt:[],emitedd:[]}
+ zmqs_bitstamp_log.other_component_name=other_component_name;
+ zmqs_bitstamp_log.dedup_receivestate=receivestate1;
servers.push(zmqs_bitstamp_log);
component_description.inputs[other_component_name]={'zmqport':zmqs_bitstamp_log.last_endpoint,externalip:zmq_getExternalIp()} // bitstamp_announcer can connect to this port
- if(onmessage)zmqs_bitstamp_log.on("message", onmessage); //function onmessage(str){}
+ if(!clients[other_component_name]){clients[other_component_name]=[];} var other_clients=clients[other_component_name];
+ zmqs_bitstamp_log.sendclients=function(datatosend,datatohash){return sendclients(datatosend,datatohash,other_clients)};
+ if(onmessage)zmqs_bitstamp_log.on("message", function(str){
+ var re=dedupreceive(str.toString(),receivestate1);
+ return onmessage(re);
+ }); //function onmessage(str){}
return zmqs_bitstamp_log;
- }
+ }
- function sendclients(data,selectedclients)
+ function sendclients(datatosend,datatohash,selectedclients)
{
+ var data=zmqdedupsend(datatosend,dedupsendstate1,clientid,datatohash);//sends repeated data as diferent hashes, to be able to send same data multiple times despite the de duplication
//console.log('announcer: clients_all.len',clients_all.length);
//var ct=clients.bitstamp_dbinserter;//no search fast local access
- var ct=selectedclients!==undefined?selectedclients:clients_all;
+ var ct=selectedclients!==undefined?selectedclients:clients_all;
for(var i=0;i<ct.length;i++)
{
console.log(component_description.name+': sending data to',ct[i].last_endpoint);
@@ -506,11 +524,40 @@ zmq_new_component_description=function (component_name)
var x=ct.splice(i,1)[0];
console.log('remove from clients_bypeer['+peer_name+']: ',x.last_endpoint);
c.close();
+ }
+ }
+
+ function futureclose()
+ {
+ for(var zmq1s,i=0;i<servers.length;i++)
+ {
+ zmqs1=servers[i]
+ if ( checkisportmaster(zmqs1) )
+ {
+ setportmasterfrom(zmq1,zmqs1.dedup_state.lasthash)
+ }
}
}
+ function havemasters()
+ {
+ // this function used: after futureclose if !havemasters() than can exit -> do exit;
+ // go over all zmq listening ports
+ for(var zmqs1,i=0;i<servers.length;i++)
+ {
+ zmqs1=servers[i]
+ if ( checkisportmaster(zmqs1) )
+ {
+ return true;
+ }
+ }
+ return false;
+ }
+
function close()
{
+
+
//close all clients
//console.log('announcer: clients_all.len',clients_all.length);
@@ -595,8 +642,9 @@ zmq_new_component_description=function (component_name)
//})
}
- function checkisportmaster(zmqs1,other_component_description_name)
+ function checkisportmaster(zmqs1/*,other_component_description_name*/)
{
+ var other_component_description_name=zmqs1.other_component_name;
var masterpeername= zmq_telepathine.get('last_component.'+component_description.name+'.'+other_component_description_name+'.peer');
if(!masterpeername)return false;
if(!( zmq_telepathine.peers[ masterpeername ].alive || masterpeername==zmq_telepathine_self.peer_name ) )
@@ -608,12 +656,24 @@ zmq_new_component_description=function (component_name)
return true;
}
- function setportmaster(zmqs1,other_component_description_name)
+ function setportmaster(zmqs1/*,other_component_description_name*/)
{
+ var other_component_description_name=zmqs1.other_component_name;
zmq_telepathine.set('last_component.'+component_description.name+'.'+other_component_description_name+'',zmqs1.last_endpoint);
zmq_telepathine.set('last_component.'+component_description.name+'.'+other_component_description_name+'.peer',zmq_telepathine_self.peer_name);
console.log("setting:",'last_component.'+component_description.name+'.'+other_component_description_name+'.peer',zmq_telepathine_self.peer_name);
}
+ function getportmasterfrom(zmqs1)
+ {
+ return zmq_telepathine.get('last_component.'+component_description.name+'.from_hash',from_hash);
+ }
+
+ function setportmasterfrom(zmqs1,from_hash)
+ {
+ zmq_telepathine.set('last_component.'+component_description.name+'.from_hash',from_hash);
+ console.log("setting:",'last_component.'+component_description.name+'.from_hash',from_hash);
+ }
+
return component_description;
}

0 comments on commit d358a45

Please sign in to comment.
Something went wrong with that request. Please try again.