From 53ee440f1f4756ed28166001b415a75424cc1adc Mon Sep 17 00:00:00 2001 From: Yash Mittal Date: Wed, 28 Feb 2024 16:24:28 +0530 Subject: [PATCH 1/5] feat(maybe unstable): get func for leader --- raft-node.js | 10 ++++++++-- server.js | 16 ++++++++++++++++ 2 files changed, 24 insertions(+), 2 deletions(-) diff --git a/raft-node.js b/raft-node.js index 6e4fc91..701bf8f 100644 --- a/raft-node.js +++ b/raft-node.js @@ -76,8 +76,14 @@ async function onData(data) { function onCommit(command) { // TODO: Edit this to make perfect console.log("Inside commit", command); - raft.db.set(command.key, command.value); - console.log("Committed", command.key, command.value); + let val; + if (command.type === "SET") { + raft.db.set(command.key, command.value); + console.log("Committed", command.key, command.value); + } else if (command.type === "GET") { + val = raft.db.get(command.key); + console.log("got val in GET: ", val); + } } // main function to create and return the instance diff --git a/server.js b/server.js index 3ed1451..9c85d78 100644 --- a/server.js +++ b/server.js @@ -44,6 +44,7 @@ server.on("connection", (socket) => { try { if (data && data.length > 0) { let cmd = data[0].command; + cmd["type"] = "SET"; await raftNode.command(cmd); socket.write( `${JSON.stringify(cmd)} - ack, ${raftNode.log.length}` @@ -57,6 +58,21 @@ server.on("connection", (socket) => { } break; case "GET": + try { + if (data && data.length > 0) { + let cmd = data[0].command; + cmd["type"] = "GET"; + await raftNode.command(cmd); + socket.write( + `${JSON.stringify(cmd)} - ack, ${raftNode.log.length}` + ); + } else { + throw new Error("Invalid data format"); + } + } catch (e) { + console.log(e); + socket.write("error 2"); + } break; case "EXIT": raftNode.db.closeDb(); From d17224f141d82b9c034ca584bcaf32d186ed6408 Mon Sep 17 00:00:00 2001 From: Rahul Shrimali Date: Wed, 28 Feb 2024 17:00:27 +0530 Subject: [PATCH 2/5] get client implementation --- client.js | 45 +++++++++++++++++++++++++++++++++++++++++++-- server.js | 7 +++---- 2 files changed, 46 insertions(+), 6 deletions(-) diff --git a/client.js b/client.js index a369978..e6ce680 100644 --- a/client.js +++ b/client.js @@ -25,7 +25,7 @@ var netSocket = net.createConnection({ port: port + 1000 }, () => { // }, // ], // }; -function sequentialCalls(times){ +function sequentialSETCalls(times){ let i = 1; function makeCall() { @@ -50,6 +50,10 @@ function sequentialCalls(times){ if (i <= times) { // Set a timeout for 1 second before making the next call setTimeout(makeCall, 1000); + }else{ + setTimeout(()=>{ + sequentialGETCalls(5); + }, 1000); } } @@ -57,7 +61,44 @@ function sequentialCalls(times){ makeCall(); } -sequentialCalls(5); +function sequentialGETCalls(times){ + let i = 1; + + function makeCall() { + // Your function or code that needs to be called sequentially + let data = { + task: "GET", + data: [ + { + command: { + key: "a" + i + }, + }, + ], + }; + let st = JSON.stringify(data) + netSocket.write(st); + + i++; + + // Check if there are more calls to make + if (i <= times) { + // Set a timeout for 1 second before making the next call + setTimeout(makeCall, 1000); + } + } + + // Start the first call + makeCall(); +} + +sequentialSETCalls(5); + + +netSocket.on("data", (buffer)=>{ + const data = buffer.toString('utf8'); + console.log(data); +}) // send a message to the raft every 5 seconds // setInterval(async () => { diff --git a/server.js b/server.js index 9c85d78..93d5071 100644 --- a/server.js +++ b/server.js @@ -47,7 +47,7 @@ server.on("connection", (socket) => { cmd["type"] = "SET"; await raftNode.command(cmd); socket.write( - `${JSON.stringify(cmd)} - ack, ${raftNode.log.length}` + `${JSON.stringify(cmd)} - ack` ); } else { throw new Error("Invalid data format"); @@ -63,9 +63,8 @@ server.on("connection", (socket) => { let cmd = data[0].command; cmd["type"] = "GET"; await raftNode.command(cmd); - socket.write( - `${JSON.stringify(cmd)} - ack, ${raftNode.log.length}` - ); + let val = raftNode.db.get(cmd.key) + socket.write(`Value of key : ${cmd.key} is ${val}`); } else { throw new Error("Invalid data format"); } From 67b661f310d107c5ad1a5b12c19d0bd38da9189c Mon Sep 17 00:00:00 2001 From: Rahul Shrimali Date: Wed, 28 Feb 2024 17:54:47 +0530 Subject: [PATCH 3/5] working get and set on all nodes --- raft/index.js | 1 - server.js | 34 +++++++++++++++++++++++++--------- 2 files changed, 25 insertions(+), 10 deletions(-) diff --git a/raft/index.js b/raft/index.js index 799af43..01d075a 100644 --- a/raft/index.js +++ b/raft/index.js @@ -948,7 +948,6 @@ class Raft extends EventEmitter { async commitEntries(entries) { entries.forEach(async (entry) => { await this.log.commit(entry.index) - console.log("RAHUL entries commited in commit entries method", entry); this.emit('commit', entry.command); }); } diff --git a/server.js b/server.js index 93d5071..cc95602 100644 --- a/server.js +++ b/server.js @@ -82,23 +82,39 @@ server.on("connection", (socket) => { } else { switch (task) { case "SET": - let packet = await raftNode.packet("append", data); - console.log("RAHUL received packet", packet, "as not leader in set"); + let cmd = data[0].command; + cmd["type"] = "SET"; + let packet = await raftNode.packet("rpc", cmd); // forward to leader raftNode.message(MsgRaft.LEADER, packet, () => { console.log( "Forwarded the set command to leader since I am a follower." ); }); + socket.write( + `${JSON.stringify(cmd)} - ack` + ); break; case "GET": - // TODO: Test for async - console.log("Received a GET event on socket"); - // Implement round robin here based on current state of Raft - reply(raftNode.db.get(data.key)); - // if (raft.state !== MsgRaft.LEADER) { - // reply(raft.db.get(data.key)); - // } + try { + if (data && data.length > 0) { + let cmd = data[0].command; + cmd["type"] = "GET"; + let packet = await raftNode.packet("rpc", cmd); + raftNode.message(MsgRaft.LEADER, packet, () => { + console.log( + "Forwarded the set command to leader since I am a follower." + ); + }); + let val = raftNode.db.get(cmd.key) + socket.write(`Value of key : ${cmd.key} is ${val}`); + } else { + throw new Error("Invalid data format"); + } + } catch (e) { + console.log(e); + socket.write("error 2"); + } break; case "EXIT": raftNode.db.closeDb(); From ad77243cf327a7b20de4429510a26b7a09ba47cc Mon Sep 17 00:00:00 2001 From: Yash Mittal Date: Tue, 12 Mar 2024 23:23:33 +0530 Subject: [PATCH 4/5] fix: allow for multiple socket writes at a time --- axon-client.js | 57 ++++++++++++++ client.js | 158 ++++++++++++++------------------------ server.js | 192 ++++++++++++++++++++++++++--------------------- single-client.js | 49 ++++++++++++ 4 files changed, 269 insertions(+), 187 deletions(-) create mode 100644 axon-client.js create mode 100644 single-client.js diff --git a/axon-client.js b/axon-client.js new file mode 100644 index 0000000..16ba417 --- /dev/null +++ b/axon-client.js @@ -0,0 +1,57 @@ +const axon = require("axon"); +const argv = require("argh").argv; + +// Create a Push socket +const socket = axon.socket("push"); + +let port = +argv.port || 8081; +// Connect to the socket +socket.connect(port); // Replace 8000 with the port you want to connect to + +// Data to send +const setMessages = createEventObjects(5, "SET"); +const getMessages = createEventObjects(5, "GET"); + +// Iterate through the messages and send each one +for (const message of setMessages) { + console.log("sending message: ", message); + socket.send(JSON.stringify(message)); +} + +// WAIT for a second and send GET messages +setTimeout(() => { + for (const message of getMessages) { + console.log("sending message: ", message); + socket.send(JSON.stringify(message)); + } +}, 1000); + +// Close the socket when done (optional) +socket.close(); + +/** + * + * @param {number} len length of the array required + * @param {string} op operation to be performed + * @returns + */ +function createEventObjects(len, op) { + const items = []; + for (let i = 1; i <= len; i++) { + let data = { + task: op, + data: [ + { + command: { + key: "key_" + i, + value: i + "", + }, + }, + ], + }; + if (op === "GET") delete data.data[0].command.value; + items.push(data); + } + + return items; +} diff --git a/client.js b/client.js index e6ce680..75c0af2 100644 --- a/client.js +++ b/client.js @@ -2,66 +2,43 @@ * this file serves as client to connect to the server raft nodes */ const argv = require("argh").argv; -var axon = require("axon"); const net = require("net"); -// var sockPush = axon.socket("req"); let port = +argv.port || 8081; // read the port from command line arguments var netSocket = net.createConnection({ port: port + 1000 }, () => { console.log("connected to server at port", port + 1000); }); -// sockPush.connect(port + 1200); +const sendEvent = (op, data) => { + let data = { + task: op, + data: data, + }; + netSocket.write(JSON.stringify(data)); +}; -// const ex = { -// task: "EXIT", -// data: [ -// { -// command: { -// key: "0", -// value: "7", -// }, -// }, -// ], -// }; -function sequentialSETCalls(times){ - let i = 1; - - function makeCall() { - // Your function or code that needs to be called sequentially - let data = { - task: "SET", - data: [ - { - command: { - key: "a" + i, - value: "" + i, - }, +let i = 1; +(() => { + setInterval(() => { + // SEND 5 SEQUENTIAL SETS + setTimeout( + sendEvent("GET", [ + { + command: { + key: "a" + i, + value: "" + i, }, - ], - }; - let st = JSON.stringify(data) - netSocket.write(st); - - i++; - - // Check if there are more calls to make - if (i <= times) { - // Set a timeout for 1 second before making the next call - setTimeout(makeCall, 1000); - }else{ - setTimeout(()=>{ - sequentialGETCalls(5); - }, 1000); - } - } - - // Start the first call - makeCall(); -} + }, + ]), + 500 + ); + // SEND 5 SEQUENTIAL GETS + i++; + }, 1000); +})(); -function sequentialGETCalls(times){ +function sequentialGETCalls(times) { let i = 1; function makeCall() { @@ -71,12 +48,12 @@ function sequentialGETCalls(times){ data: [ { command: { - key: "a" + i + key: "a" + i, }, }, ], }; - let st = JSON.stringify(data) + let st = JSON.stringify(data); netSocket.write(st); i++; @@ -92,55 +69,36 @@ function sequentialGETCalls(times){ makeCall(); } -sequentialSETCalls(5); - - -netSocket.on("data", (buffer)=>{ - const data = buffer.toString('utf8'); - console.log(data); -}) +function sequentialSETCalls(times) { + let i = 1; + for (let i = 0; i < times; i++) { + let data = { + task: "SET", + data: [ + { + command: { + key: "a" + i, + value: "" + i, + }, + }, + ], + }; -// send a message to the raft every 5 seconds -// setInterval(async () => { -// for (var i = 0; i < 5000; i++) { -// console.log("sending to socket with i: ", i); -// const data = { -// op: "SET", -// data: { -// key: i.toString(), -// value: i.toString(), -// }, -// }; -// sockPush.send(data, function (res) { -// console.log(`ack for SET: ${res}`); -// }); -// } + let st = JSON.stringify(data); + netSocket.write(st); + setTimeout(() => { + console.log("waiting 1 second", 1000); + }); + } + //make get calls after set calls + setTimeout(() => { + sequentialGETCalls(5); + }, 1000); +} -// // send 10 GET Commands -// for (var i = 0; i < 10; i++) { -// sockPush.send("GET", { key: i.toString() }, function (res) { -// console.log(`Response for GET: ${res}`); -// }); -// } -// }, 5000); +sequentialSETCalls(5); -// const g1 = { -// task: "GET", -// data: [{ -// command: { -// key : "a" -// } -// }] -// }; -// const g2 = { -// task: "GET", -// data: [{ -// command: { -// key : "b" -// } -// }] -// }; -// const gt1 = JSON.stringify(data); -// const gt2 = JSON.stringify(data2); -// netSocket.write(gt1); -// netSocket.write(gt2) +netSocket.on("data", (buffer) => { + const data = buffer.toString("utf8"); + console.log(data); +}); diff --git a/server.js b/server.js index cc95602..3bb5f03 100644 --- a/server.js +++ b/server.js @@ -34,97 +34,115 @@ server.on("connection", (socket) => { socket.write("Connected\n"); socket.on("data", async (pkt) => { - pkt = JSON.parse(pkt); - const { task, data } = pkt; - - if (raftNode.state === MsgRaft.LEADER) { - switch (task) { - case "SET": - // TODO: Test for async - try { - if (data && data.length > 0) { - let cmd = data[0].command; - cmd["type"] = "SET"; - await raftNode.command(cmd); - socket.write( - `${JSON.stringify(cmd)} - ack` - ); - } else { - throw new Error("Invalid data format"); + console.log("***************************************"); + console.log("***************************************"); + console.log("RECEIVED PACKET IS: "); + console.log(pkt.toString()); + console.log("***************************************"); + console.log("***************************************"); + pkt = pkt + .toString() + .split("\n") + .filter((str) => { + return str.trim() !== ""; + }) + .map((item) => { + return JSON.parse(item.trim()); + }); + console.log("#########################################"); + console.log("#########################################"); + console.log("RECEIVED PACKET AFTER SPLITTING IS: "); + console.log(pkt); + console.log("#########################################"); + console.log("#########################################"); + pkt.forEach(async (item) => { + console.log("item in for each: ", item); + const { task, data } = item; + if (raftNode.state === MsgRaft.LEADER) { + switch (task) { + case "SET": + // TODO: Test for async + try { + if (data && data.length > 0) { + let cmd = data[0].command; + cmd["type"] = "SET"; + await raftNode.command(cmd); + socket.write(`${JSON.stringify(cmd)} - ack`); + } else { + throw new Error("Invalid data format"); + } + } catch (e) { + console.log(e); + socket.write("error 2"); } - } catch (e) { - console.log(e); - socket.write("error 2"); - } - break; - case "GET": - try { - if (data && data.length > 0) { - let cmd = data[0].command; - cmd["type"] = "GET"; - await raftNode.command(cmd); - let val = raftNode.db.get(cmd.key) - socket.write(`Value of key : ${cmd.key} is ${val}`); - } else { - throw new Error("Invalid data format"); + break; + case "GET": + try { + if (data && data.length > 0) { + let cmd = data[0].command; + cmd["type"] = "GET"; + await raftNode.command(cmd); + let val = raftNode.db.get(cmd.key); + socket.write(`Value of key : ${cmd.key} is ${val}`); + } else { + throw new Error("Invalid data format"); + } + } catch (e) { + console.log(e); + socket.write("error 2"); } - } catch (e) { - console.log(e); - socket.write("error 2"); - } - break; - case "EXIT": - raftNode.db.closeDb(); - break; - default: - socket.write("error 46"); - } - } else { - switch (task) { - case "SET": - let cmd = data[0].command; - cmd["type"] = "SET"; - let packet = await raftNode.packet("rpc", cmd); - // forward to leader - raftNode.message(MsgRaft.LEADER, packet, () => { - console.log( - "Forwarded the set command to leader since I am a follower." - ); - }); - socket.write( - `${JSON.stringify(cmd)} - ack` - ); - break; - case "GET": - try { - if (data && data.length > 0) { - let cmd = data[0].command; - cmd["type"] = "GET"; - let packet = await raftNode.packet("rpc", cmd); - raftNode.message(MsgRaft.LEADER, packet, () => { - console.log( - "Forwarded the set command to leader since I am a follower." - ); - }); - let val = raftNode.db.get(cmd.key) - socket.write(`Value of key : ${cmd.key} is ${val}`); - } else { - throw new Error("Invalid data format"); + break; + case "EXIT": + raftNode.db.closeDb(); + break; + default: + socket.write("error 46"); + } + } else { + switch (task) { + case "SET": + let cmd = data[0].command; + cmd["type"] = "SET"; + let packet = await raftNode.packet("rpc", cmd); + // forward to leader + raftNode.message(MsgRaft.LEADER, packet, () => { + console.log( + "Forwarded the set command to leader since I am a follower." + ); + }); + socket.write(`${JSON.stringify(cmd)} - ack`); + break; + case "GET": + try { + if (data && data.length > 0) { + let cmd = data[0].command; + cmd["type"] = "GET"; + let packet = await raftNode.packet("rpc", cmd); + raftNode.message(MsgRaft.LEADER, packet, () => { + console.log( + "Forwarded the set command to leader since I am a follower." + ); + }); + let val = raftNode.db.get(cmd.key); + socket.write(`Value of key : ${cmd.key} is ${val}`); + } else { + throw new Error("Invalid data format"); + } + } catch (e) { + console.log(e); + socket.write("error 2"); } - } catch (e) { - console.log(e); - socket.write("error 2"); - } - break; - case "EXIT": - raftNode.db.closeDb(); - break; - default: - console.log("in default and task: ", task); - reply("error 90"); - break; + break; + case "EXIT": + raftNode.db.closeDb(); + break; + default: + console.log("in default and task: ", task); + reply("error 90"); + break; + } } - } + }); }); // TODO: Figure out a way to connect sockPull to the socket received with server connection diff --git a/single-client.js b/single-client.js new file mode 100644 index 0000000..1a6d97c --- /dev/null +++ b/single-client.js @@ -0,0 +1,49 @@ +const argv = require("argh").argv; +const net = require("net"); + +let port = +argv.port || 8081; // read the port from command line arguments +var netSocket = net.createConnection({ port: port + 1000 }, () => { + console.log("connected to server at port", port + 1000); +}); + +let set = { + task: "SET", + data: [ + { + command: { + key: "key", + value: 1000, + }, + }, + ], +}; + +let get = { + task: "GET", + data: [ + { + command: { + key: "key_2", + }, + }, + ], +}; + +/** + * @link https://stackoverflow.com/questions/35054868/sending-socket-data-separately-in-node-js -- since this is a TCP stream hence we have to separate out the stuff. + */ +// write to socket +// for (let i = 0; i < 5; i++) { +// set.data[0].command.key = "key_" + i; +// set.data[0].command.value = i + ""; +// netSocket.write(JSON.stringify(set) + "\n"); +// } + +// setTimeout(() => { +netSocket.write(JSON.stringify(get)); +// }, 500); + +netSocket.on("data", (buffer) => { + const data = buffer.toString("utf8"); + console.log(data); +}); From 905c24593e22c19329d8d80aa6c11e805762c81c Mon Sep 17 00:00:00 2001 From: Yash Mittal Date: Wed, 13 Mar 2024 11:32:18 +0530 Subject: [PATCH 5/5] fix: working get set and node consensus --- db.js | 98 +++++++++++++++++++++++++----------------------- server.js | 1 + single-client.js | 22 ++++++----- 3 files changed, 65 insertions(+), 56 deletions(-) diff --git a/db.js b/db.js index 7a9f325..5ad330c 100644 --- a/db.js +++ b/db.js @@ -2,62 +2,66 @@ * @description This file serves as the interface for storage engine of the KV Store */ -const lmdb = require('node-lmdb'); -const fs = require('fs') +const lmdb = require("node-lmdb"); +const fs = require("fs"); // TODO: Turn this into an interface to different storage engine -- try RocksDB on the lines of TiKV class LMDBManager { - constructor(path, mapSize, maxDbs) { - if(!fs.existsSync(path)) { - fs.mkdirSync(path, {recursive: true}); - } - this.env = new lmdb.Env(); - this.env.open({ - path: path || "./db", - mapSize: mapSize || 2 * 1024 * 1024 * 1024, // 2 GB by default - maxDbs: maxDbs || 10 - }); - this.writeTxn = null; + constructor(path, mapSize, maxDbs) { + if (!fs.existsSync(path)) { + fs.mkdirSync(path, { recursive: true }); } + this.env = new lmdb.Env(); + this.env.open({ + path: path || "./db", + mapSize: mapSize || 2 * 1024 * 1024 * 1024, // 2 GB by default + maxDbs: maxDbs || 10, + }); + this.writeTxn = null; + } - openDb(dbName) { - this.dbi = this.env.openDbi({ - name: dbName || "mydb", - create: true - }); - } - - closeDb() { - if(this.writeTxn) this.writeTxn.commit(); - this.dbi.close(); - } + openDb(dbName) { + this.dbi = this.env.openDbi({ + name: dbName || "mydb", + create: true, + }); + } - closeEnv() { - this.env.close(); - } + closeDb() { + if (this.writeTxn) this.writeTxn.commit(); + this.dbi.close(); + } - set(key, value) { - try { - if (!this.writeTxn) { - const txn = this.env.beginTxn(); - this.writeTxn = txn; - } - this.writeTxn.putString(this.dbi, key, value); - console.log('wrote', key, value); - } catch (e) { - console.error("Not a valid key", key, value); - } + closeEnv() { + this.env.close(); + } + set(key, value) { + try { + if (!this.writeTxn) { + const txn = this.env.beginTxn(); + this.writeTxn = txn; + } + this.writeTxn.putString(this.dbi, key, value); + console.log("wrote", key, value); + } catch (e) { + console.error("Not a valid key", key, value); } + } - get(key) { - // const txn = this.env.beginTxn({ readOnly: true }); - if (!this.writeTxn) { - const txn = this.env.beginTxn(); - this.writeTxn = txn; - } - const value = this.writeTxn.getString(this.dbi, key); - return value; + get(key) { + // const txn = this.env.beginTxn({ readOnly: true }); + try { + if (!this.writeTxn) { + const txn = this.env.beginTxn(); + this.writeTxn = txn; + } + const value = this.writeTxn.getString(this.dbi, key); + return value; + } catch (err) { + console.error(err); + return "error finding key"; } + } } module.exports = LMDBManager; @@ -77,4 +81,4 @@ console.log(dbManager.get(1)); // Output: Hello world! dbManager.closeDb(); dbManager.closeEnv(); -*/ \ No newline at end of file +*/ diff --git a/server.js b/server.js index 3bb5f03..6fd7115 100644 --- a/server.js +++ b/server.js @@ -43,6 +43,7 @@ server.on("connection", (socket) => { pkt = pkt .toString() .split("\n") + .map((str) => str.trim()) .filter((str) => { return str.trim() !== ""; }) diff --git a/single-client.js b/single-client.js index 1a6d97c..88b2b79 100644 --- a/single-client.js +++ b/single-client.js @@ -23,7 +23,7 @@ let get = { data: [ { command: { - key: "key_2", + key: "key_1", }, }, ], @@ -33,15 +33,19 @@ let get = { * @link https://stackoverflow.com/questions/35054868/sending-socket-data-separately-in-node-js -- since this is a TCP stream hence we have to separate out the stuff. */ // write to socket -// for (let i = 0; i < 5; i++) { -// set.data[0].command.key = "key_" + i; -// set.data[0].command.value = i + ""; -// netSocket.write(JSON.stringify(set) + "\n"); -// } +for (let i = 0; i < 5; i++) { + set.data[0].command.key = "key_" + i; + set.data[0].command.value = i + ""; + console.log("set obj: ", set); + netSocket.write(JSON.stringify(set) + "\n"); +} -// setTimeout(() => { -netSocket.write(JSON.stringify(get)); -// }, 500); +setTimeout(() => { + for (let i = 0; i < 5; i++) { + get.data[0].command.key = "key_" + i; + netSocket.write(JSON.stringify(get) + "\n"); + } +}, 1000); netSocket.on("data", (buffer) => { const data = buffer.toString("utf8");