diff --git a/client.js b/client.js index 75c0af2..498b788 100644 --- a/client.js +++ b/client.js @@ -1,12 +1,10 @@ /** * this file serves as client to connect to the server raft nodes */ -const argv = require("argh").argv; const net = require("net"); -let port = +argv.port || 8081; // read the port from command line arguments -var netSocket = net.createConnection({ port: port + 1000 }, () => { - console.log("connected to server at port", port + 1000); +var netSocket = net.createConnection({ port: 6767 }, () => { + console.log("connected to server at port", 6767); }); const sendEvent = (op, data) => { diff --git a/events.json b/events.json new file mode 100644 index 0000000..60f95c7 --- /dev/null +++ b/events.json @@ -0,0 +1 @@ +[{"task":"SET","data":[{"command":{"key":"key_1","value":"1"}}]},{"task":"SET","data":[{"command":{"key":"key_2","value":"2"}}]},{"task":"SET","data":[{"command":{"key":"key_3","value":"3"}}]},{"task":"SET","data":[{"command":{"key":"key_4","value":"4"}}]},{"task":"SET","data":[{"command":{"key":"key_5","value":"5"}}]},{"task":"SET","data":[{"command":{"key":"key_6","value":"6"}}]},{"task":"SET","data":[{"command":{"key":"key_7","value":"7"}}]},{"task":"SET","data":[{"command":{"key":"key_8","value":"8"}}]},{"task":"SET","data":[{"command":{"key":"key_9","value":"9"}}]},{"task":"SET","data":[{"command":{"key":"key_10","value":"10"}}]},{"task":"SET","data":[{"command":{"key":"key_11","value":"11"}}]},{"task":"SET","data":[{"command":{"key":"key_12","value":"12"}}]},{"task":"SET","data":[{"command":{"key":"key_13","value":"13"}}]},{"task":"SET","data":[{"command":{"key":"key_14","value":"14"}}]},{"task":"SET","data":[{"command":{"key":"key_15","value":"15"}}]},{"task":"SET","data":[{"command":{"key":"key_16","value":"16"}}]},{"task":"SET","data":[{"command":{"key":"key_17","value":"17"}}]},{"task":"SET","data":[{"command":{"key":"key_18","value":"18"}}]},{"task":"SET","data":[{"command":{"key":"key_19","value":"19"}}]},{"task":"SET","data":[{"command":{"key":"key_20","value":"20"}}]},{"task":"SET","data":[{"command":{"key":"key_21","value":"21"}}]},{"task":"SET","data":[{"command":{"key":"key_22","value":"22"}}]},{"task":"SET","data":[{"command":{"key":"key_23","value":"23"}}]},{"task":"SET","data":[{"command":{"key":"key_24","value":"24"}}]},{"task":"SET","data":[{"command":{"key":"key_25","value":"25"}}]},{"task":"SET","data":[{"command":{"key":"key_26","value":"26"}}]},{"task":"SET","data":[{"command":{"key":"key_27","value":"27"}}]},{"task":"SET","data":[{"command":{"key":"key_28","value":"28"}}]},{"task":"SET","data":[{"command":{"key":"key_29","value":"29"}}]},{"task":"SET","data":[{"command":{"key":"key_30","value":"30"}}]},{"task":"SET","data":[{"command":{"key":"key_31","value":"31"}}]},{"task":"SET","data":[{"command":{"key":"key_32","value":"32"}}]},{"task":"SET","data":[{"command":{"key":"key_33","value":"33"}}]},{"task":"SET","data":[{"command":{"key":"key_34","value":"34"}}]},{"task":"SET","data":[{"command":{"key":"key_35","value":"35"}}]},{"task":"SET","data":[{"command":{"key":"key_36","value":"36"}}]},{"task":"SET","data":[{"command":{"key":"key_37","value":"37"}}]},{"task":"SET","data":[{"command":{"key":"key_38","value":"38"}}]},{"task":"SET","data":[{"command":{"key":"key_39","value":"39"}}]},{"task":"SET","data":[{"command":{"key":"key_40","value":"40"}}]},{"task":"SET","data":[{"command":{"key":"key_41","value":"41"}}]},{"task":"SET","data":[{"command":{"key":"key_42","value":"42"}}]},{"task":"SET","data":[{"command":{"key":"key_43","value":"43"}}]},{"task":"SET","data":[{"command":{"key":"key_44","value":"44"}}]},{"task":"SET","data":[{"command":{"key":"key_45","value":"45"}}]},{"task":"SET","data":[{"command":{"key":"key_46","value":"46"}}]},{"task":"SET","data":[{"command":{"key":"key_47","value":"47"}}]},{"task":"SET","data":[{"command":{"key":"key_48","value":"48"}}]},{"task":"SET","data":[{"command":{"key":"key_49","value":"49"}}]},{"task":"SET","data":[{"command":{"key":"key_50","value":"50"}}]},{"task":"SET","data":[{"command":{"key":"key_51","value":"51"}}]},{"task":"SET","data":[{"command":{"key":"key_52","value":"52"}}]},{"task":"SET","data":[{"command":{"key":"key_53","value":"53"}}]},{"task":"SET","data":[{"command":{"key":"key_54","value":"54"}}]},{"task":"SET","data":[{"command":{"key":"key_55","value":"55"}}]},{"task":"SET","data":[{"command":{"key":"key_56","value":"56"}}]},{"task":"SET","data":[{"command":{"key":"key_57","value":"57"}}]},{"task":"SET","data":[{"command":{"key":"key_58","value":"58"}}]},{"task":"SET","data":[{"command":{"key":"key_59","value":"59"}}]},{"task":"SET","data":[{"command":{"key":"key_60","value":"60"}}]},{"task":"SET","data":[{"command":{"key":"key_61","value":"61"}}]},{"task":"SET","data":[{"command":{"key":"key_62","value":"62"}}]},{"task":"SET","data":[{"command":{"key":"key_63","value":"63"}}]},{"task":"SET","data":[{"command":{"key":"key_64","value":"64"}}]},{"task":"SET","data":[{"command":{"key":"key_65","value":"65"}}]},{"task":"SET","data":[{"command":{"key":"key_66","value":"66"}}]},{"task":"SET","data":[{"command":{"key":"key_67","value":"67"}}]},{"task":"SET","data":[{"command":{"key":"key_68","value":"68"}}]},{"task":"SET","data":[{"command":{"key":"key_69","value":"69"}}]},{"task":"SET","data":[{"command":{"key":"key_70","value":"70"}}]},{"task":"SET","data":[{"command":{"key":"key_71","value":"71"}}]},{"task":"SET","data":[{"command":{"key":"key_72","value":"72"}}]},{"task":"SET","data":[{"command":{"key":"key_73","value":"73"}}]},{"task":"SET","data":[{"command":{"key":"key_74","value":"74"}}]},{"task":"SET","data":[{"command":{"key":"key_75","value":"75"}}]},{"task":"SET","data":[{"command":{"key":"key_76","value":"76"}}]},{"task":"SET","data":[{"command":{"key":"key_77","value":"77"}}]},{"task":"SET","data":[{"command":{"key":"key_78","value":"78"}}]},{"task":"SET","data":[{"command":{"key":"key_79","value":"79"}}]},{"task":"SET","data":[{"command":{"key":"key_80","value":"80"}}]},{"task":"SET","data":[{"command":{"key":"key_81","value":"81"}}]},{"task":"SET","data":[{"command":{"key":"key_82","value":"82"}}]},{"task":"SET","data":[{"command":{"key":"key_83","value":"83"}}]},{"task":"SET","data":[{"command":{"key":"key_84","value":"84"}}]},{"task":"SET","data":[{"command":{"key":"key_85","value":"85"}}]},{"task":"SET","data":[{"command":{"key":"key_86","value":"86"}}]},{"task":"SET","data":[{"command":{"key":"key_87","value":"87"}}]},{"task":"SET","data":[{"command":{"key":"key_88","value":"88"}}]},{"task":"SET","data":[{"command":{"key":"key_89","value":"89"}}]},{"task":"SET","data":[{"command":{"key":"key_90","value":"90"}}]},{"task":"SET","data":[{"command":{"key":"key_91","value":"91"}}]},{"task":"SET","data":[{"command":{"key":"key_92","value":"92"}}]},{"task":"SET","data":[{"command":{"key":"key_93","value":"93"}}]},{"task":"SET","data":[{"command":{"key":"key_94","value":"94"}}]},{"task":"SET","data":[{"command":{"key":"key_95","value":"95"}}]},{"task":"SET","data":[{"command":{"key":"key_96","value":"96"}}]},{"task":"SET","data":[{"command":{"key":"key_97","value":"97"}}]},{"task":"SET","data":[{"command":{"key":"key_98","value":"98"}}]},{"task":"SET","data":[{"command":{"key":"key_99","value":"99"}}]},{"task":"SET","data":[{"command":{"key":"key_100","value":"100"}}]},{"task":"GET","data":[{"command":{"key":"key_1","value":"1"}}]},{"task":"GET","data":[{"command":{"key":"key_2","value":"2"}}]},{"task":"GET","data":[{"command":{"key":"key_3","value":"3"}}]},{"task":"GET","data":[{"command":{"key":"key_4","value":"4"}}]},{"task":"GET","data":[{"command":{"key":"key_5","value":"5"}}]},{"task":"GET","data":[{"command":{"key":"key_6","value":"6"}}]},{"task":"GET","data":[{"command":{"key":"key_7","value":"7"}}]},{"task":"GET","data":[{"command":{"key":"key_8","value":"8"}}]},{"task":"GET","data":[{"command":{"key":"key_9","value":"9"}}]},{"task":"GET","data":[{"command":{"key":"key_10","value":"10"}}]},{"task":"GET","data":[{"command":{"key":"key_11","value":"11"}}]},{"task":"GET","data":[{"command":{"key":"key_12","value":"12"}}]},{"task":"GET","data":[{"command":{"key":"key_13","value":"13"}}]},{"task":"GET","data":[{"command":{"key":"key_14","value":"14"}}]},{"task":"GET","data":[{"command":{"key":"key_15","value":"15"}}]},{"task":"GET","data":[{"command":{"key":"key_16","value":"16"}}]},{"task":"GET","data":[{"command":{"key":"key_17","value":"17"}}]},{"task":"GET","data":[{"command":{"key":"key_18","value":"18"}}]},{"task":"GET","data":[{"command":{"key":"key_19","value":"19"}}]},{"task":"GET","data":[{"command":{"key":"key_20","value":"20"}}]},{"task":"GET","data":[{"command":{"key":"key_21","value":"21"}}]},{"task":"GET","data":[{"command":{"key":"key_22","value":"22"}}]},{"task":"GET","data":[{"command":{"key":"key_23","value":"23"}}]},{"task":"GET","data":[{"command":{"key":"key_24","value":"24"}}]},{"task":"GET","data":[{"command":{"key":"key_25","value":"25"}}]},{"task":"GET","data":[{"command":{"key":"key_26","value":"26"}}]},{"task":"GET","data":[{"command":{"key":"key_27","value":"27"}}]},{"task":"GET","data":[{"command":{"key":"key_28","value":"28"}}]},{"task":"GET","data":[{"command":{"key":"key_29","value":"29"}}]},{"task":"GET","data":[{"command":{"key":"key_30","value":"30"}}]},{"task":"GET","data":[{"command":{"key":"key_31","value":"31"}}]},{"task":"GET","data":[{"command":{"key":"key_32","value":"32"}}]},{"task":"GET","data":[{"command":{"key":"key_33","value":"33"}}]},{"task":"GET","data":[{"command":{"key":"key_34","value":"34"}}]},{"task":"GET","data":[{"command":{"key":"key_35","value":"35"}}]},{"task":"GET","data":[{"command":{"key":"key_36","value":"36"}}]},{"task":"GET","data":[{"command":{"key":"key_37","value":"37"}}]},{"task":"GET","data":[{"command":{"key":"key_38","value":"38"}}]},{"task":"GET","data":[{"command":{"key":"key_39","value":"39"}}]},{"task":"GET","data":[{"command":{"key":"key_40","value":"40"}}]},{"task":"GET","data":[{"command":{"key":"key_41","value":"41"}}]},{"task":"GET","data":[{"command":{"key":"key_42","value":"42"}}]},{"task":"GET","data":[{"command":{"key":"key_43","value":"43"}}]},{"task":"GET","data":[{"command":{"key":"key_44","value":"44"}}]},{"task":"GET","data":[{"command":{"key":"key_45","value":"45"}}]},{"task":"GET","data":[{"command":{"key":"key_46","value":"46"}}]},{"task":"GET","data":[{"command":{"key":"key_47","value":"47"}}]},{"task":"GET","data":[{"command":{"key":"key_48","value":"48"}}]},{"task":"GET","data":[{"command":{"key":"key_49","value":"49"}}]},{"task":"GET","data":[{"command":{"key":"key_50","value":"50"}}]},{"task":"GET","data":[{"command":{"key":"key_51","value":"51"}}]},{"task":"GET","data":[{"command":{"key":"key_52","value":"52"}}]},{"task":"GET","data":[{"command":{"key":"key_53","value":"53"}}]},{"task":"GET","data":[{"command":{"key":"key_54","value":"54"}}]},{"task":"GET","data":[{"command":{"key":"key_55","value":"55"}}]},{"task":"GET","data":[{"command":{"key":"key_56","value":"56"}}]},{"task":"GET","data":[{"command":{"key":"key_57","value":"57"}}]},{"task":"GET","data":[{"command":{"key":"key_58","value":"58"}}]},{"task":"GET","data":[{"command":{"key":"key_59","value":"59"}}]},{"task":"GET","data":[{"command":{"key":"key_60","value":"60"}}]},{"task":"GET","data":[{"command":{"key":"key_61","value":"61"}}]},{"task":"GET","data":[{"command":{"key":"key_62","value":"62"}}]},{"task":"GET","data":[{"command":{"key":"key_63","value":"63"}}]},{"task":"GET","data":[{"command":{"key":"key_64","value":"64"}}]},{"task":"GET","data":[{"command":{"key":"key_65","value":"65"}}]},{"task":"GET","data":[{"command":{"key":"key_66","value":"66"}}]},{"task":"GET","data":[{"command":{"key":"key_67","value":"67"}}]},{"task":"GET","data":[{"command":{"key":"key_68","value":"68"}}]},{"task":"GET","data":[{"command":{"key":"key_69","value":"69"}}]},{"task":"GET","data":[{"command":{"key":"key_70","value":"70"}}]},{"task":"GET","data":[{"command":{"key":"key_71","value":"71"}}]},{"task":"GET","data":[{"command":{"key":"key_72","value":"72"}}]},{"task":"GET","data":[{"command":{"key":"key_73","value":"73"}}]},{"task":"GET","data":[{"command":{"key":"key_74","value":"74"}}]},{"task":"GET","data":[{"command":{"key":"key_75","value":"75"}}]},{"task":"GET","data":[{"command":{"key":"key_76","value":"76"}}]},{"task":"GET","data":[{"command":{"key":"key_77","value":"77"}}]},{"task":"GET","data":[{"command":{"key":"key_78","value":"78"}}]},{"task":"GET","data":[{"command":{"key":"key_79","value":"79"}}]},{"task":"GET","data":[{"command":{"key":"key_80","value":"80"}}]},{"task":"GET","data":[{"command":{"key":"key_81","value":"81"}}]},{"task":"GET","data":[{"command":{"key":"key_82","value":"82"}}]},{"task":"GET","data":[{"command":{"key":"key_83","value":"83"}}]},{"task":"GET","data":[{"command":{"key":"key_84","value":"84"}}]},{"task":"GET","data":[{"command":{"key":"key_85","value":"85"}}]},{"task":"GET","data":[{"command":{"key":"key_86","value":"86"}}]},{"task":"GET","data":[{"command":{"key":"key_87","value":"87"}}]},{"task":"GET","data":[{"command":{"key":"key_88","value":"88"}}]},{"task":"GET","data":[{"command":{"key":"key_89","value":"89"}}]},{"task":"GET","data":[{"command":{"key":"key_90","value":"90"}}]},{"task":"GET","data":[{"command":{"key":"key_91","value":"91"}}]},{"task":"GET","data":[{"command":{"key":"key_92","value":"92"}}]},{"task":"GET","data":[{"command":{"key":"key_93","value":"93"}}]},{"task":"GET","data":[{"command":{"key":"key_94","value":"94"}}]},{"task":"GET","data":[{"command":{"key":"key_95","value":"95"}}]},{"task":"GET","data":[{"command":{"key":"key_96","value":"96"}}]},{"task":"GET","data":[{"command":{"key":"key_97","value":"97"}}]},{"task":"GET","data":[{"command":{"key":"key_98","value":"98"}}]},{"task":"GET","data":[{"command":{"key":"key_99","value":"99"}}]},{"task":"GET","data":[{"command":{"key":"key_100","value":"100"}}]}] \ No newline at end of file diff --git a/package-lock.json b/package-lock.json index 96df31f..856556f 100644 --- a/package-lock.json +++ b/package-lock.json @@ -28,7 +28,8 @@ "one-time": "^1.0.0", "promise-queue": "^2.2.5", "rimraf": "^5.0.5", - "tick-tock": "^1.0.0" + "tick-tock": "^1.0.0", + "uuid": "^9.0.1" } }, "node_modules/@ampproject/remapping": { @@ -4260,6 +4261,18 @@ "resolved": "https://registry.npmjs.org/util-deprecate/-/util-deprecate-1.0.2.tgz", "integrity": "sha512-EPD5q1uXyFxJpCrLnCc1nHnq3gOa6DZBocAIiI2TaSCA7VCJ1UJDMagCzIkXNsUYfD1daK//LTEQ8xiIbrHtcw==" }, + "node_modules/uuid": { + "version": "9.0.1", + "resolved": "https://registry.npmjs.org/uuid/-/uuid-9.0.1.tgz", + "integrity": "sha512-b+1eJOlsR9K8HJpow9Ok3fiWOWSIcIzXodvv0rQjVoOVNpWMpxf1wZNpt4y9h10odCNrqnYp1OBzRktckBe3sA==", + "funding": [ + "https://github.com/sponsors/broofa", + "https://github.com/sponsors/ctavan" + ], + "bin": { + "uuid": "dist/bin/uuid" + } + }, "node_modules/v8-to-istanbul": { "version": "9.2.0", "resolved": "https://registry.npmjs.org/v8-to-istanbul/-/v8-to-istanbul-9.2.0.tgz", @@ -7462,6 +7475,11 @@ "resolved": "https://registry.npmjs.org/util-deprecate/-/util-deprecate-1.0.2.tgz", "integrity": "sha512-EPD5q1uXyFxJpCrLnCc1nHnq3gOa6DZBocAIiI2TaSCA7VCJ1UJDMagCzIkXNsUYfD1daK//LTEQ8xiIbrHtcw==" }, + "uuid": { + "version": "9.0.1", + "resolved": "https://registry.npmjs.org/uuid/-/uuid-9.0.1.tgz", + "integrity": "sha512-b+1eJOlsR9K8HJpow9Ok3fiWOWSIcIzXodvv0rQjVoOVNpWMpxf1wZNpt4y9h10odCNrqnYp1OBzRktckBe3sA==" + }, "v8-to-istanbul": { "version": "9.2.0", "resolved": "https://registry.npmjs.org/v8-to-istanbul/-/v8-to-istanbul-9.2.0.tgz", diff --git a/package.json b/package.json index e6086c9..4d4c228 100644 --- a/package.json +++ b/package.json @@ -36,6 +36,7 @@ "one-time": "^1.0.0", "promise-queue": "^2.2.5", "rimraf": "^5.0.5", - "tick-tock": "^1.0.0" + "tick-tock": "^1.0.0", + "uuid": "^9.0.1" } } diff --git a/proxy.js b/proxy.js index 064cd1c..016945b 100644 --- a/proxy.js +++ b/proxy.js @@ -1,10 +1,15 @@ /** * @description This file serves as the basis for the event emitting proxy server that talks to the raft cluster + * @link https://stackoverflow.com/questions/35054868/sending-socket-data-separately-in-node-js -- since this is a TCP stream hence we have to separate out the stuff. + * @link https://stackoverflow.com/questions/12872563/issues-when-reading-a-string-from-tcp-socket-in-node-js */ +const argv = require("argh").argv; const net = require("net"); const { EventEmitter } = require("events"); +let port = +argv.port || 8081; // read the port from command line arguments + class QueryQueue extends EventEmitter { constructor() { super(); @@ -17,24 +22,66 @@ class QueryQueue extends EventEmitter { this.emit("newQuery"); } + async execute(query) { + console.log("inside execute"); + return new Promise((resolve, reject) => { + var client = net.createConnection({ port: port + 1000 }, () => { + console.log("connected to server at port", port + 1000); + }); + // write this query to socket + console.log("query: ", query); + client.write(JSON.stringify(query) + "\n"); + client.on("data", (data) => { + if (data === undefined) { + console.log("*****************"); + console.log("data is undefined"); + console.log("*****************"); + } + if (data.toString().trim() != "Connected") { + console.log( + "data in response of proxy socket write:", + data.toString() + ); + resolve(data.toString()); + client.destroy(); + } else { + console.log("data: ", data.toString()); + } + }); + + client.on("close", () => {}); + + client.on("error", reject); + }); + } + processQueue() { if (this.isProcessing || this.queue.length === 0) { return; } this.isProcessing = true; - const { query, callback, queryId } = this.queue.shift(); + const { query, callback, queryId } = this.queue[0]; + this.execute(query) - .then((result) => callback(null, queryId, result)) - .catch((error) => callback(error, queryId)) + .then((result) => { + console.log("result: ", result); + console.log("result.toString(): ", result.toString()); + if (result.toString().trim() === "error 8") { + let e = new Error("Error 8 Another connection active trying again"); + callback(e, queryId); + } else this.queue.shift(); + return callback(null, queryId, result); + }) + .catch((error) => { + console.log("error: ", error); + console.log("error.toString(): ", error.toString()); + return callback(error, queryId); + }) .finally(() => { this.isProcessing = false; this.processQueue(); }); } - - async execute(query) { - // TODO: Replace with KV GET SET Logic - } } const queryQueue = new QueryQueue(); @@ -51,18 +98,38 @@ server.on("connection", (socket) => { activeConnection = true; socket.write("Connected\n"); - + let chunk = ""; // stores data from the stream socket.on("data", (data) => { - const [queryId, query] = data.toString().trim().split(":", 2); - queryQueue.addQuery(queryId, query, (error, queryId, result) => { - let response; - if (error) { - response = `${queryId}<|>Error: ${error.message}`; - } else { - response = `${queryId}<|>${JSON.stringify(result)}`; - } - socket.write(response + "\n"); - }); + console.log("data: ", data.toString()); + chunk += data.toString(); + d_index = chunk.indexOf("\n"); + while (d_index > -1) { + const element = chunk.substring(0, d_index); + console.log("element: ", element); + const [queryId, query] = element.trim().split("|", 2); + console.log("queryId ", queryId, "query ", query); + queryQueue.addQuery(queryId, query, (error, queryId, result) => { + let response; + if (error) { + response = `${queryId}<|>Error: ${error.message}`; + } else { + console.log("result in final formatter: ", result); + response = `${queryId}<|>${JSON.stringify(result)}`; + } + socket.write(response + "\n"); + }); + chunk = chunk.substring(d_index + 1); // Cuts off the processed chunk + d_index = chunk.indexOf("\n"); + } + // data = data + // .toString() + // .split("\n") + // .map((str) => { + // console.log("str: ", str); + // return str.trim(); + // }) + // .filter((str) => str !== "") + // .forEach((element) => {}); }); socket.on("close", () => { diff --git a/raft-node.js b/raft-node.js index 701bf8f..946f650 100644 --- a/raft-node.js +++ b/raft-node.js @@ -59,18 +59,6 @@ async function onData(data) { const arr = data?.data; if (arr && arr.length > 0) console.log("in data.on('data'): ", arr[0].command); - // send this data to leader - if (raft.state !== MsgRaft.LEADER) { - try { - // send acknowledgement - if (arr && arr.length > 0) { - const ackPacket = await raft.packet("append ack", arr[0].command); - raft.message(MsgRaft.LEADER, ackPacket); - } - } catch (err) { - console.error("error while forwarding response to leader: ", err); - } - } } function onCommit(command) { @@ -90,17 +78,23 @@ function onCommit(command) { // TODO: Turn this into a Promise returning thing const registerNode = function (port, config = {}) { // return new Promise((resolve, reject) => { - raft = new MsgRaft("tcp://0.0.0.0:" + port, { - "election min": config.min, - "election max": config.max, - heartbeat: config.heartbeat, - adapter: require("leveldown"), - path: `./log/${port}/`, - Log: new Log(this, { + try { + let log = new Log({ + adapter: require("leveldown"), + path: `./log/${port}/`, + }) + raft = new MsgRaft("tcp://0.0.0.0:" + port, { + "election min": config.min, + "election max": config.max, + heartbeat: config.heartbeat, adapter: require("leveldown"), path: `./log/${port}/`, - }), - }); + Log: log, + }); + log.setNode(raft) + } catch (err) { + console.log("error creating msgraft node: ", err); + } // registering callbacks on the instance raft diff --git a/raft/log.js b/raft/log.js index c2a53b7..a34e481 100644 --- a/raft/log.js +++ b/raft/log.js @@ -1,14 +1,14 @@ -const encode = require('encoding-down'); -const levelup = require('levelup'); -const PromiseQueue = require('promise-queue') +const encode = require("encoding-down"); +const levelup = require("levelup"); +const PromiseQueue = require("promise-queue"); const keyEncoding = { - decode: function (raw) { - return parseInt(raw); - }, - encode: function (key) { - return key.toString(16).padStart(8, '0'); - } + decode: function (raw) { + return parseInt(raw); + }, + encode: function (key) { + return key.toString(16).padStart(8, "0"); + }, }; /** @@ -20,345 +20,388 @@ const keyEncoding = { * @property {object} command The command to be used in the raft state machine */ class Log { - /** - * @class - * @param {object} node The raft node using this log - * @param {object} Options Options object - * @param {object} Options.[adapter= require('leveldown')] Leveldown adapter, defaults to leveldown - * @param {string} Options.[path='./'] Path to save the log db to - * @return {Log} - */ - constructor(node, { adapter = require('leveldown'), path = '' }) { - this.node = node; - this.committedIndex = 0; - // this.db = levelup(encode(adapter(path), { valueEncoding: 'json', keyEncoding: 'binary' })); - this.db = levelup(encode(adapter(path), { valueEncoding: 'json', keyEncoding }), { - cacheSize: 100 * 1024 * 1024 - }); - this.commandAckQueue = new PromiseQueue(1, Infinity); + /** + * @class + * @param {object} node The raft node using this log + * @param {object} Options Options object + * @param {object} Options.[adapter= require('leveldown')] Leveldown adapter, defaults to leveldown + * @param {string} Options.[path='./'] Path to save the log db to + * @return {Log} + */ + constructor({ adapter = require("leveldown"), path = "" }) { + // this.node = node; + this.committedIndex = 0; +// this.db = levelup(encode(adapter(path), { valueEncoding: 'json', keyEncoding: 'binary' })); + try { + this.db = levelup( + encode(adapter(path), { valueEncoding: "json", keyEncoding }), + { + cacheSize: 100 * 1024 * 1024, + } + ); + } catch (err) { + console.log("error initialising db adapter for logs in raft: ", err); } - - /** - * saveCommand - Saves a command to the log - * Initially the command is uncommitted. Once a majority - * of follower nodes have saved the log entry it will be - * committed. - * - * A follow node will also use this method to save a received command to - * its log - * - * @async - * @param {object} command A json object to save to the log - * @param {number} term Term to save with the log entry - * @param {number} [index] Index to save the entry with. This is used by the followers - * @return {Promise} Description - */ - async saveCommand(command, term, index) { - - if (!index) { - const { - index: lastIndex, - } = await this.getLastInfo(); - - index = lastIndex + 1; - } - - const entry = { - term: term, - index, - committed: false, - responses: [{ - address: this.node.address, // start with vote from leader - ack: true - }], - command, - } - - await this.put(entry); - return entry; - } - - /** - * put - Save entry to database using the index as the key - * - * @async - * @param {Entry} entry entry to save - * @return {Promise} Resolves once entry is saved - * @public - */ - put(entry) { - return this.db.put(entry.index, entry); + this.commandAckQueue = new PromiseQueue(1, Infinity); + } + + setNode(node){ + this.node = node + } + + /** + * saveCommand - Saves a command to the log + * Initially the command is uncommitted. Once a majority + * of follower nodes have saved the log entry it will be + * committed. + * + * A follow node will also use this method to save a received command to + * its log + * + * @async + * @param {object} command A json object to save to the log + * @param {number} term Term to save with the log entry + * @param {number} [index] Index to save the entry with. This is used by the followers + * @return {Promise} Description + */ + async saveCommand(command, term, index) { + if (!index) { + const { index: lastIndex } = await this.getLastInfo(); + + index = parseInt(lastIndex) + 1; } - - /** - * getEntriesAfter - Get all the entries after a specific index - * - * @param {number} index Index that entries must be greater than - * @return {Promise} returns all entries - * @public - */ - getEntriesAfter(index) { - const entries = []; - return new Promise((resolve, reject) => { - this.db.createReadStream({ gt: index }) - .on('data', data => { - entries.push(data.value); - }) - .on('error', err => { - reject(err) - }) - .on('end', () => { - resolve(entries); - }) - }); - - } - - /** - * removeEntriesAfter - Removes all entries after a given index - * - * @async - * @param {Number} index Index to use to find all entries after - * @return {Promise} Returns once all antries are removed - * @public - */ - async removeEntriesAfter(index) { - const entries = await this.getEntriesAfter(index) - return Promise.all(entries.map(entry => { - return this.db.del(entry.index); - })); - } - - /** - * has - Checks if entry exists at index - * - * @async - * @param {number} index Index position to check if entry exists - * @return {boolean} Boolean on whether entry exists at index - * @public - */ - async has(index) { - try { - const entry = await this.db.get(index); - return true - } catch (err) { - return false; - } + + const entry = { + term: term, + index, + committed: false, + responses: [ + { + address: this.node.address, // start with vote from leader + ack: true, + }, + ], + command, + }; + + await this.put(entry); + return entry; + } + + /** + * put - Save entry to database using the index as the key + * + * @async + * @param {Entry} entry entry to save + * @return {Promise} Resolves once entry is saved + * @public + */ + async put(entry) { + return await this.db.put(entry.index, entry); + } + + /** + * getEntriesAfter - Get all the entries after a specific index + * + * @param {number} index Index that entries must be greater than + * @return {Promise} returns all entries + * @public + */ + getEntriesAfter(index) { + const entries = []; + return new Promise((resolve, reject) => { +try { + this.db + .createReadStream({ gt: index }) + .on("data", (data) => { + entries.push(data.value); + }) + .on("error", (err) => { + reject(err); + }) + .on("end", () => { + resolve(entries); + }); +} catch (err) { + console.error("error getting log entries after index: ", index); + } + }); + } + + /** + * removeEntriesAfter - Removes all entries after a given index + * + * @async + * @param {Number} index Index to use to find all entries after + * @return {Promise} Returns once all antries are removed + * @public + */ + async removeEntriesAfter(index) { + const entries = await this.getEntriesAfter(index); + return Promise.all( + entries.map((entry) => { + return this.db.del(entry.index); + }) + ); + } + + /** + * has - Checks if entry exists at index + * + * @async + * @param {number} index Index position to check if entry exists + * @return {boolean} Boolean on whether entry exists at index + * @public + */ + async has(index) { +try { + const entry = await this.db.get(index); + // if (!entry) return false; + return true; +} catch (err) { + console.log("error fetching key from db: ", err); + return false; } - - /** - * get - Gets an entry at the specified index position - * - * @param {type} index Index position of entry - * @return {Promise} Promise of found entry returns NotFoundError if does not exist - * @public - */ - get(index) { - return this.db.get(index); + } + + /** + * get - Gets an entry at the specified index position + * + * @param {type} index Index position of entry + * @return {Promise} Promise of found entry returns NotFoundError if does not exist + * @public + */ + get(index) { +try { + return this.db.get(index); +} catch (err) { + console.error("error fetching key from log db: ", err); } - - /** - * getLastInfo - Returns index, term of the last entry in the long along with - * the committedIndex - * - * @async - * @return {Promise} Last entries index, term and committedIndex - */ - async getLastInfo() { + } + + /** + * getLastInfo - Returns index, term of the last entry in the long along with + * the committedIndex + * + * @async + * @return {Promise} Last entries index, term and committedIndex + */ + async getLastInfo() { const { index, term } = await this.getLastEntry(); - - return { - index, - term, - committedIndex: this.committedIndex - }; + + return { + index, + term, + committedIndex: this.committedIndex, + }; + } + + /** + * getLastEntry - Returns last entry in the log + * + * @return {Promise} returns {index: 0, term: node.term} if there are no entries in the log + */ + getLastEntry() { + return new Promise((resolve, reject) => { + let hasResolved = false; + let entry = { + index: 0, + term: this.node.term, + }; +try { + this.db + .createReadStream({ reverse: true, limit: 1 }) + .on("data", (data) => { + hasResolved = true; + entry = data.value; + }) + .on("error", (err) => { + hasResolved = true; + reject(err); + }) + .on("end", () => { + resolve(entry); + }); +} catch (err) { + console.error("error getting last log entry in raft: ", err); + reject(err); + } + }); + } + + /** + * getEntryInfoBefore - Gets the index and term of the previous entry along with the log's committedIndex + * If there is no item before it returns {index: 0} + * + * + * @async + * @param {Entry} entry + * @return {Promise} {index, term, committedIndex} + */ + async getEntryInfoBefore(entry) { + const { index, term } = await this.getEntryBefore(entry); + + return { + index, + term, + committedIndex: this.committedIndex, + }; + } + + /** + * getEntryBefore - Get entry before the specified entry + * If there is no item before it returns {index: 0} + * + * @async + * @param {Entry} entry + * + * @return {Promise} + */ + getEntryBefore(entry) { + const defaultInfo = { + index: 0, + term: this.node.term, + }; + // We know it is the first entry, so save the query time + if (entry.index === 1) { + return Promise.resolve(defaultInfo); } - /** - * getLastEntry - Returns last entry in the log - * - * @return {Promise} returns {index: 0, term: node.term} if there are no entries in the log - */ - getLastEntry() { - return new Promise((resolve, reject) => { - let hasResolved = false; - let entry = { - index: 0, - term: this.node.term - }; - - this.db.createReadStream({ reverse: true, limit: 1 }) - .on('data', data => { - hasResolved = true; - entry = data.value; - }) - .on('error', err => { - hasResolved = true; - reject(err) - }) - .on('end', () => { - resolve(entry); - }) - }); - } - - /** - * getEntryInfoBefore - Gets the index and term of the previous entry along with the log's committedIndex - * If there is no item before it returns {index: 0} - * - * - * @async - * @param {Entry} entry - * @return {Promise} {index, term, committedIndex} - */ - async getEntryInfoBefore(entry) { - const { index, term } = await this.getEntryBefore(entry); - + return new Promise((resolve, reject) => { + let hasResolved = false; +try { + this.db + .createReadStream({ + reverse: true, + limit: 1, + lt: entry.index, + }) + .on("data", (data) => { + hasResolved = true; + resolve(data.value); + }) + .on("error", (err) => { + hasResolved = true; + reject(err); + }) + .on("end", () => { + if (!hasResolved) { + // Returns empty index if there is no items + // before entry or log is empty + resolve(defaultInfo); + } + }); +} catch (err) { + console.error("Error getting log entry before ", err); + reject(err); + } + }); + } + + /** + * commandAck - acknowledges a follow with address has stored entry at index + * This is used to determine if a quorom has been met for a log entry and + * if enough followers have stored it so that it can be committed + * + * @async + * @param {number} index Index of entry that follow has stored + * @param {string} address Address of follower that has stored log + * @return {Promise} + */ + async commandAck(index, address) { + return this.commandAckQueue.add(async () => { + let entry; +try { + entry = await this.get(index); +} catch (err) { return { - index, - term, - committedIndex: this.committedIndex - }; - } - - /** - * getEntryBefore - Get entry before the specified entry - * If there is no item before it returns {index: 0} - * - * @async - * @param {Entry} entry - * - * @return {Promise} - */ - getEntryBefore(entry) { - const defaultInfo = { - index: 0, - term: this.node.term + responses: [], }; - // We know it is the first entry, so save the query time - if (entry.index === 1) { - return Promise.resolve(defaultInfo); - } + } - return new Promise((resolve, reject) => { - let hasResolved = false; + const entryIndex = await entry.responses.findIndex( + (resp) => resp.address === address + ); - this.db.createReadStream({ - reverse: true, - limit: 1, - lt: entry.index - }) - .on('data', (data) => { - hasResolved = true; - resolve(data.value); - }) - .on('error', (err) => { - hasResolved = true; - reject(err); - }) - .on('end', () => { - if (!hasResolved) { - // Returns empty index if there is no items - // before entry or log is empty - resolve(defaultInfo); - } - }); + // node hasn't voted yet. Add response + if (entryIndex === -1) { + entry.responses.push({ + address, + ack: true, }); + } + + await this.put(entry); + + return entry; + }); + } + /** + * commit - Set the entry to committed + * + * @async + * @param {number} Index index + * + * @return {Promise} + */ + async commit(index) { +try { + const entry = await this.db.get(index); + + entry.committed = true; + this.committedIndex = entry.index; + + return this.put(entry); +} catch (err) { + console.error("error committing log: ", err); } - - /** - * commandAck - acknowledges a follow with address has stored entry at index - * This is used to determine if a quorom has been met for a log entry and - * if enough followers have stored it so that it can be committed - * - * @async - * @param {number} index Index of entry that follow has stored - * @param {string} address Address of follower that has stored log - * @return {Promise} - */ - async commandAck(index, address) { - return this.commandAckQueue.add(async () => { - let entry; - try { - entry = await this.get(index); - } catch (err) { - return { - responses: [] - } - } - - const entryIndex = await entry.responses.findIndex(resp => resp.address === address); - - // node hasn't voted yet. Add response - if (entryIndex === -1) { - entry.responses.push({ - address, - ack: true - }); + } + + /** + * getUncommittedEntriesUpToIndex - Returns all entries before index that have not been committed yet + * + * @param {number} index Index value to find all entries up to + * @return {Promise { + let hasResolved = false; + const entries = []; +try { + this.db + .createReadStream({ + gt: this.committedIndex, + lte: index, + }) + .on("data", (data) => { + if (!data.value.committed) { + entries.push(data.value); } - - await this.put(entry); - - return entry; - }) - } - /** - * commit - Set the entry to committed - * - * @async - * @param {number} Index index - * - * @return {Promise} - */ - async commit(index) { - const entry = await this.db.get(index); - - entry.committed = true; - this.committedIndex = entry.index; - - return this.put(entry); - } - - /** - * getUncommittedEntriesUpToIndex - Returns all entries before index that have not been committed yet - * - * @param {number} index Index value to find all entries up to - * @return {Promise { - let hasResolved = false; - const entries = []; - - this.db.createReadStream({ - gt: this.committedIndex, - lte: index - }) - .on('data', data => { - if (!data.value.committed) { - entries.push(data.value); - } - }) - .on('error', err => { - reject(err) - }) - .on('end', () => { - resolve(entries); - }); - }); - } - - /** - * end - Log end - * Called when the node is shutting down - * - * @return {boolean} Successful close. - * @private - */ - end() { - return this.db.close(); - } -}; + }) + .on("error", (err) => { + reject(err); + }) + .on("end", () => { + resolve(entries); + }); +} catch (err) { + console.error("error gettingUncommittedEntriesUptoIndex: ", err); + reject(err); + } + }); + } + + /** + * end - Log end + * Called when the node is shutting down + * + * @return {boolean} Successful close. + * @private + */ + end() { + return this.db.close(); + } +} module.exports = Log; diff --git a/server.js b/server.js index 6fd7115..293a177 100644 --- a/server.js +++ b/server.js @@ -26,7 +26,7 @@ server.on("connection", (socket) => { if (activeConnection) { // this limits to one connection at a time // TODO: Move to multi connections - socket.end("Another connection is already active."); + socket.end("error 8"); return; } activeConnection = true; @@ -34,6 +34,9 @@ server.on("connection", (socket) => { socket.write("Connected\n"); socket.on("data", async (pkt) => { + if (pkt === undefined) { + socket.write("undefined packet received"); + } console.log("***************************************"); console.log("***************************************"); console.log("RECEIVED PACKET IS: "); @@ -56,9 +59,9 @@ server.on("connection", (socket) => { console.log(pkt); console.log("#########################################"); console.log("#########################################"); - pkt.forEach(async (item) => { + for(const item of pkt) { console.log("item in for each: ", item); - const { task, data } = item; + const { task, data } = JSON.parse(item); if (raftNode.state === MsgRaft.LEADER) { switch (task) { case "SET": @@ -81,9 +84,10 @@ server.on("connection", (socket) => { try { if (data && data.length > 0) { let cmd = data[0].command; - cmd["type"] = "GET"; - await raftNode.command(cmd); let val = raftNode.db.get(cmd.key); + if(val == null){ + + } socket.write(`Value of key : ${cmd.key} is ${val}`); } else { throw new Error("Invalid data format"); @@ -117,14 +121,10 @@ server.on("connection", (socket) => { try { if (data && data.length > 0) { let cmd = data[0].command; - cmd["type"] = "GET"; - let packet = await raftNode.packet("rpc", cmd); - raftNode.message(MsgRaft.LEADER, packet, () => { - console.log( - "Forwarded the set command to leader since I am a follower." - ); - }); let val = raftNode.db.get(cmd.key); + if(val == null){ + + } socket.write(`Value of key : ${cmd.key} is ${val}`); } else { throw new Error("Invalid data format"); @@ -138,12 +138,15 @@ server.on("connection", (socket) => { raftNode.db.closeDb(); break; default: + if (task === undefined) { + console.warn("one of the packets was with undefined task"); + socket.write("one of the packets was with undefined task"); + } console.log("in default and task: ", task); - reply("error 90"); break; } } - }); + } }); // TODO: Figure out a way to connect sockPull to the socket received with server connection diff --git a/single-client-proxy.js b/single-client-proxy.js new file mode 100644 index 0000000..80c3c12 --- /dev/null +++ b/single-client-proxy.js @@ -0,0 +1,47 @@ +/** + * @description This file serves as the client to send out events to the proxy server + * @link https://stackoverflow.com/questions/35054868/sending-socket-data-separately-in-node-js -- since this is a TCP stream hence we have to separate out the stuff. + */ +const argv = require("argh").argv; +const net = require("net"); +const { v4: uuidv4 } = require("uuid"); + +const givePayloadSkeleton = (op, key, val = null) => { + return { + task: op, + data: [ + { + command: { + key: key, + value: val, + }, + }, + ], + }; +}; +const generatePayload = (op, len) => { + const payloads = []; + for (let i = 1; i <= len; i++) { + payloads.push(givePayloadSkeleton(op, "key_" + i, i + "")); + } + + return payloads; +}; + +const events = []; +events.push(...generatePayload("SET", 100_000)); +events.push(...generatePayload("GET", 100_000)); +require("fs").writeFileSync("./events.json", JSON.stringify(events)); +var netSocket = net.createConnection({ port: 6767 }, () => { + console.log("connected to server at port", 6767); +}); + +for (const event of events) { + console.log("event: ", event); + netSocket.write(`${uuidv4()}|` + JSON.stringify(event) + "\n"); +} + +netSocket.on("data", (buffer) => { + const data = buffer.toString("utf8"); + console.log(data); +});