From d89ebd82a9ed21db20feb600d61ecefdc29f2af3 Mon Sep 17 00:00:00 2001 From: Yash Mittal Date: Tue, 27 Feb 2024 15:59:54 +0530 Subject: [PATCH 1/5] feat(wip): figure out get set --- client.js | 2 +- proxy.js | 19 +++---------------- server.js | 37 +++++-------------------------------- 3 files changed, 9 insertions(+), 49 deletions(-) diff --git a/client.js b/client.js index 50b04cc..7764ac4 100644 --- a/client.js +++ b/client.js @@ -7,7 +7,7 @@ var axon = require("axon"); var sockPush = axon.socket("req"); let port = +argv.port || 8081; // read the port from command line arguments -sockPush.bind(port + 100); +sockPush.connect(port + 1000); // send a message to the raft every 5 seconds setInterval(async () => { diff --git a/proxy.js b/proxy.js index dcb5ac8..064cd1c 100644 --- a/proxy.js +++ b/proxy.js @@ -4,11 +4,6 @@ const net = require("net"); const { EventEmitter } = require("events"); -const { - executeSELECTQuery, - executeINSERTQuery, - executeDELETEQuery, -} = require("./queryExecuter"); class QueryQueue extends EventEmitter { constructor() { @@ -38,15 +33,7 @@ class QueryQueue extends EventEmitter { } async execute(query) { - if (query.toLowerCase().startsWith("select")) { - return await executeSELECTQuery(query); - } else if (query.toLowerCase().startsWith("insert into")) { - return await executeINSERTQuery(query); - } else if (query.toLowerCase().startsWith("delete from")) { - return await executeDELETEQuery(query); - } else { - throw new Error("Unsupported command"); - } + // TODO: Replace with KV GET SET Logic } } @@ -83,6 +70,6 @@ server.on("connection", (socket) => { }); }); -server.listen(5432, () => { - console.log("Server listening on port 5432"); +server.listen(6767, () => { + console.log("Server listening on port 6767"); }); diff --git a/server.js b/server.js index 9e7bb01..6117417 100644 --- a/server.js +++ b/server.js @@ -101,7 +101,8 @@ server.on("connection", (socket) => { }); }); -server.listen(port + 1000, () => { +let raftNodeServerPort = port + 1000; +server.listen(raftNodeServerPort, () => { // raftNode is initialised whenever the server starts to listen // initiaise the node raftNode = registerNode(port, { @@ -116,35 +117,7 @@ server.listen(port + 1000, () => { raftNode.join("tcp://0.0.0.0:" + nr); }); - console.log(`Initialsied raft node and server on port ${port}`); - - // send a message to the raft every 5 seconds - // setInterval(async () => { - // if (raftNode.state === MsgRaft.LEADER) { - // for (var i = 0; i < 5000; i++) { - // const command = { - // command: "SET", - // data: { - // key: i.toString(), - // value: i.toString(), - // }, - // }; - // await raftNode.command(command); - // } - // // sockPush.send('SET', { - // // 'key': i.toString(), 'value': i.toString() - // // }, function (res) { - // // console.log(`ack for SET: ${res}`); - // // }); - // } - - // // for (var i = 0; i < 10; i++) { - // // sockPush.send('GET', { 'key': i.toString() }, function (res) { - // // console.log(`Response for GET: ${res}`); - // // }); - // // } - // // raft.message(MsgRaft.LEADER, { foo: 'bar' }, () => { - // // console.log('message sent'); - // // }); - // }, 5000); + console.log( + `Initialsied raft node at socket ${port} and raft node server on port ${raftNodeServerPort}` + ); }); From 074f22c485d4a9d60b847245cb021f106748aba6 Mon Sep 17 00:00:00 2001 From: Rahul Shrimali Date: Tue, 27 Feb 2024 22:19:14 +0530 Subject: [PATCH 2/5] Initial configuration of set --- client.js | 59 ++++++++++++++++++++++++++++++-------------------- server.js | 65 ++++++++++++++++++++++++++++++++++++++++++++++++------- 2 files changed, 93 insertions(+), 31 deletions(-) diff --git a/client.js b/client.js index 7764ac4..dc336e9 100644 --- a/client.js +++ b/client.js @@ -3,32 +3,45 @@ */ const argv = require("argh").argv; var axon = require("axon"); +const net = require('net'); -var sockPush = axon.socket("req"); +// var sockPush = axon.socket("req"); let port = +argv.port || 8081; // read the port from command line arguments +var netSocket = net.createConnection({port: port + 1000}, ()=>{ + console.log("connected to server at port", port + 1000); +}); -sockPush.connect(port + 1000); +// sockPush.connect(port + 1200); + +const data = { + task: "SET", + data: [{ + command: "SET a = 26" + }] +}; +const st = JSON.stringify(data); +netSocket.write(st); // send a message to the raft every 5 seconds -setInterval(async () => { - for (var i = 0; i < 5000; i++) { - console.log("sending to socket with i: ", i); - const data = { - op: "SET", - data: { - key: i.toString(), - value: i.toString(), - }, - }; - sockPush.send(data, function (res) { - console.log(`ack for SET: ${res}`); - }); - } +// setInterval(async () => { +// for (var i = 0; i < 5000; i++) { +// console.log("sending to socket with i: ", i); +// const data = { +// op: "SET", +// data: { +// key: i.toString(), +// value: i.toString(), +// }, +// }; +// sockPush.send(data, function (res) { +// console.log(`ack for SET: ${res}`); +// }); +// } - // send 10 GET Commands - for (var i = 0; i < 10; i++) { - sockPush.send("GET", { key: i.toString() }, function (res) { - console.log(`Response for GET: ${res}`); - }); - } -}, 5000); +// // send 10 GET Commands +// for (var i = 0; i < 10; i++) { +// sockPush.send("GET", { key: i.toString() }, function (res) { +// console.log(`Response for GET: ${res}`); +// }); +// } +// }, 5000); diff --git a/server.js b/server.js index 6117417..9fbb23a 100644 --- a/server.js +++ b/server.js @@ -33,11 +33,59 @@ server.on("connection", (socket) => { socket.write("Connected\n"); - socket.on("data", (data) => { - const { type, args } = data; - console.log(`Received request of type ${type} with arguments ${args}`); - raftNode.command(data); + socket.on("data", async (pkt) => { + pkt = JSON.parse(pkt); + const { task, data } = pkt; + + if (raftNode.state === MsgRaft.LEADER) { + switch (task) { + case "SET": + // TODO: Test for async + try { + await raftNode.command(data); + socket.write(`${JSON.stringify(data)} - ack, ${raftNode.log.length}`); + } catch (e) { + console.log(e); + socket.write("error 2"); + } + break; + default: + socket.write("error 46"); + } + } else { + + switch (task) { + case "SET": + let packet = await raftNode.packet("append ack", data); + // forward to leader + raftNode.message( + MsgRaft.LEADER, + packet, + () => { + console.log( + "Forwarded the set command to leader since I am a follower." + ); + } + ); + break; + case "GET": + // TODO: Test for async + console.log("Received a GET event on socket"); + // Implement round robin here based on current state of Raft + reply(raftNode.db.get(data.key)); + // if (raft.state !== MsgRaft.LEADER) { + // reply(raft.db.get(data.key)); + // } + break; + default: + console.log("in default and task: ", task); + reply("error 90"); + break; + } + } + }); + // TODO: Figure out a way to connect sockPull to the socket received with server connection sockPull.connect(port + 100); @@ -54,7 +102,7 @@ server.on("connection", (socket) => { console.log("Nodes", raftNode.nodes); try { console.log("Inside SET"); - await raft.command(data); + await raftNode.command(data); reply(`${JSON.stringify(data)} - ack, ${raftNode.log.length}`); } catch (e) { console.log(e); @@ -67,20 +115,21 @@ server.on("connection", (socket) => { } else { switch (task) { case "SET": - debug("Received a SET event on socket"); + console.log("Received a SET event on socket"); // forward to leader raftNode.message( MsgRaft.LEADER, - MsgRaft.packet("append ack", JSON.stringify(task)), + raftNode.packet("append ack", JSON.stringify(task)), () => { console.log( "Forwarded the set command to leader since I am a follower." ); } ); + break; case "GET": // TODO: Test for async - debug("Received a GET event on socket"); + console.log("Received a GET event on socket"); // Implement round robin here based on current state of Raft reply(raftNode.db.get(data.key)); // if (raft.state !== MsgRaft.LEADER) { From 29025c3878cef3b334a1f2e109b0b296c6579f46 Mon Sep 17 00:00:00 2001 From: Rahul Shrimali Date: Wed, 28 Feb 2024 11:02:35 +0530 Subject: [PATCH 3/5] set resolve errors --- client.js | 37 ++++++++++++++++++++++++++++++++++++- raft-node.js | 4 ++-- raft/index.js | 1 + server.js | 13 ++++++++++--- 4 files changed, 49 insertions(+), 6 deletions(-) diff --git a/client.js b/client.js index dc336e9..14ebd4d 100644 --- a/client.js +++ b/client.js @@ -16,11 +16,25 @@ var netSocket = net.createConnection({port: port + 1000}, ()=>{ const data = { task: "SET", data: [{ - command: "SET a = 26" + command: { + key : "a", + value: "26" + } + }] +}; +const data2 = { + task: "SET", + data: [{ + command: { + key : "b", + value: "27" + } }] }; const st = JSON.stringify(data); +const st2 = JSON.stringify(data2); netSocket.write(st); +netSocket.write(st2); // send a message to the raft every 5 seconds // setInterval(async () => { @@ -45,3 +59,24 @@ netSocket.write(st); // }); // } // }, 5000); + +const g1 = { + task: "GET", + data: [{ + command: { + key : "a" + } + }] +}; +const g2 = { + task: "GET", + data: [{ + command: { + key : "b" + } + }] +}; +const gt1 = JSON.stringify(data); +const gt2 = JSON.stringify(data2); +netSocket.write(gt1); +netSocket.write(gt2) \ No newline at end of file diff --git a/raft-node.js b/raft-node.js index 7262094..6e4fc91 100644 --- a/raft-node.js +++ b/raft-node.js @@ -47,7 +47,7 @@ function candidate() { console.log("----------------------------------"); } -function onData(data) { +async function onData(data) { // TODO: Edit this to make perfect console.log( "From Raft 'on' data method", @@ -64,7 +64,7 @@ function onData(data) { try { // send acknowledgement if (arr && arr.length > 0) { - const ackPacket = raft.packet("append ack", arr[0].command); + const ackPacket = await raft.packet("append ack", arr[0].command); raft.message(MsgRaft.LEADER, ackPacket); } } catch (err) { diff --git a/raft/index.js b/raft/index.js index 01d075a..799af43 100644 --- a/raft/index.js +++ b/raft/index.js @@ -948,6 +948,7 @@ class Raft extends EventEmitter { async commitEntries(entries) { entries.forEach(async (entry) => { await this.log.commit(entry.index) + console.log("RAHUL entries commited in commit entries method", entry); this.emit('commit', entry.command); }); } diff --git a/server.js b/server.js index 9fbb23a..4a4fb87 100644 --- a/server.js +++ b/server.js @@ -42,8 +42,14 @@ server.on("connection", (socket) => { case "SET": // TODO: Test for async try { - await raftNode.command(data); - socket.write(`${JSON.stringify(data)} - ack, ${raftNode.log.length}`); + if (data && data.length > 0){ + let cmd = data[0].command + await raftNode.command(cmd); + socket.write(`${JSON.stringify(cmd)} - ack, ${raftNode.log.length}`); + }else { + throw new Error("Invalid data format"); + } + } catch (e) { console.log(e); socket.write("error 2"); @@ -56,7 +62,8 @@ server.on("connection", (socket) => { switch (task) { case "SET": - let packet = await raftNode.packet("append ack", data); + let packet = await raftNode.packet("append", data); + console.log("RAHUL received packet", packet, "as not leader in set"); // forward to leader raftNode.message( MsgRaft.LEADER, From d9cc3755fc2d1a5ed4bf0fd0efd999f251b0dfdd Mon Sep 17 00:00:00 2001 From: Rahul Shrimali Date: Wed, 28 Feb 2024 11:37:55 +0530 Subject: [PATCH 4/5] Closing the db on EXIT command --- client.js | 90 +++++++++++++++++++++++++++++++++---------------------- server.js | 35 ++++++++++++---------- 2 files changed, 74 insertions(+), 51 deletions(-) diff --git a/client.js b/client.js index 14ebd4d..0d825c2 100644 --- a/client.js +++ b/client.js @@ -3,11 +3,11 @@ */ const argv = require("argh").argv; var axon = require("axon"); -const net = require('net'); +const net = require("net"); // var sockPush = axon.socket("req"); let port = +argv.port || 8081; // read the port from command line arguments -var netSocket = net.createConnection({port: port + 1000}, ()=>{ +var netSocket = net.createConnection({ port: port + 1000 }, () => { console.log("connected to server at port", port + 1000); }); @@ -15,26 +15,46 @@ var netSocket = net.createConnection({port: port + 1000}, ()=>{ const data = { task: "SET", - data: [{ - command: { - key : "a", - value: "26" - } - }] + data: [ + { + command: { + key: "a", + value: "26", + }, + }, + ], }; const data2 = { task: "SET", - data: [{ - command: { - key : "b", - value: "27" - } - }] + data: [ + { + command: { + key: "b", + value: "27", + }, + }, + ], +}; + +const ex = { + task: "EXIT", + data: [ + { + command: { + key: "0", + value: "7", + }, + }, + ], }; const st = JSON.stringify(data); const st2 = JSON.stringify(data2); +const ext = JSON.stringify(ex); netSocket.write(st); -netSocket.write(st2); +// netSocket.write(st2, 'utf-8'); +setTimeout(() => { + netSocket.write(ext); +}, 1000); // send a message to the raft every 5 seconds // setInterval(async () => { @@ -60,23 +80,23 @@ netSocket.write(st2); // } // }, 5000); -const g1 = { - task: "GET", - data: [{ - command: { - key : "a" - } - }] -}; -const g2 = { - task: "GET", - data: [{ - command: { - key : "b" - } - }] -}; -const gt1 = JSON.stringify(data); -const gt2 = JSON.stringify(data2); -netSocket.write(gt1); -netSocket.write(gt2) \ No newline at end of file +// const g1 = { +// task: "GET", +// data: [{ +// command: { +// key : "a" +// } +// }] +// }; +// const g2 = { +// task: "GET", +// data: [{ +// command: { +// key : "b" +// } +// }] +// }; +// const gt1 = JSON.stringify(data); +// const gt2 = JSON.stringify(data2); +// netSocket.write(gt1); +// netSocket.write(gt2) diff --git a/server.js b/server.js index 4a4fb87..3ed1451 100644 --- a/server.js +++ b/server.js @@ -42,38 +42,39 @@ server.on("connection", (socket) => { case "SET": // TODO: Test for async try { - if (data && data.length > 0){ - let cmd = data[0].command + if (data && data.length > 0) { + let cmd = data[0].command; await raftNode.command(cmd); - socket.write(`${JSON.stringify(cmd)} - ack, ${raftNode.log.length}`); - }else { + socket.write( + `${JSON.stringify(cmd)} - ack, ${raftNode.log.length}` + ); + } else { throw new Error("Invalid data format"); } - } catch (e) { console.log(e); socket.write("error 2"); } break; + case "GET": + break; + case "EXIT": + raftNode.db.closeDb(); + break; default: socket.write("error 46"); } } else { - switch (task) { case "SET": let packet = await raftNode.packet("append", data); console.log("RAHUL received packet", packet, "as not leader in set"); // forward to leader - raftNode.message( - MsgRaft.LEADER, - packet, - () => { - console.log( - "Forwarded the set command to leader since I am a follower." - ); - } - ); + raftNode.message(MsgRaft.LEADER, packet, () => { + console.log( + "Forwarded the set command to leader since I am a follower." + ); + }); break; case "GET": // TODO: Test for async @@ -84,13 +85,15 @@ server.on("connection", (socket) => { // reply(raft.db.get(data.key)); // } break; + case "EXIT": + raftNode.db.closeDb(); + break; default: console.log("in default and task: ", task); reply("error 90"); break; } } - }); // TODO: Figure out a way to connect sockPull to the socket received with server connection From f00c25426700784a5845380c0582bca087a93962 Mon Sep 17 00:00:00 2001 From: Rahul Shrimali Date: Wed, 28 Feb 2024 12:28:07 +0530 Subject: [PATCH 5/5] sequential calls for set --- client.js | 85 ++++++++++++++++++++++++++++--------------------------- 1 file changed, 44 insertions(+), 41 deletions(-) diff --git a/client.js b/client.js index 0d825c2..a369978 100644 --- a/client.js +++ b/client.js @@ -13,48 +13,51 @@ var netSocket = net.createConnection({ port: port + 1000 }, () => { // sockPush.connect(port + 1200); -const data = { - task: "SET", - data: [ - { - command: { - key: "a", - value: "26", - }, - }, - ], -}; -const data2 = { - task: "SET", - data: [ - { - command: { - key: "b", - value: "27", - }, - }, - ], -}; -const ex = { - task: "EXIT", - data: [ - { - command: { - key: "0", - value: "7", - }, - }, - ], -}; -const st = JSON.stringify(data); -const st2 = JSON.stringify(data2); -const ext = JSON.stringify(ex); -netSocket.write(st); -// netSocket.write(st2, 'utf-8'); -setTimeout(() => { - netSocket.write(ext); -}, 1000); +// const ex = { +// task: "EXIT", +// data: [ +// { +// command: { +// key: "0", +// value: "7", +// }, +// }, +// ], +// }; +function sequentialCalls(times){ + let i = 1; + + function makeCall() { + // Your function or code that needs to be called sequentially + let data = { + task: "SET", + data: [ + { + command: { + key: "a" + i, + value: "" + i, + }, + }, + ], + }; + let st = JSON.stringify(data) + netSocket.write(st); + + i++; + + // Check if there are more calls to make + if (i <= times) { + // Set a timeout for 1 second before making the next call + setTimeout(makeCall, 1000); + } + } + + // Start the first call + makeCall(); +} + +sequentialCalls(5); // send a message to the raft every 5 seconds // setInterval(async () => {