diff --git a/client.js b/client.js index 50b04cc..a369978 100644 --- a/client.js +++ b/client.js @@ -3,32 +3,103 @@ */ const argv = require("argh").argv; var axon = require("axon"); +const net = require("net"); -var sockPush = axon.socket("req"); +// var sockPush = axon.socket("req"); let port = +argv.port || 8081; // read the port from command line arguments +var netSocket = net.createConnection({ port: port + 1000 }, () => { + console.log("connected to server at port", port + 1000); +}); -sockPush.bind(port + 100); +// sockPush.connect(port + 1200); + + +// const ex = { +// task: "EXIT", +// data: [ +// { +// command: { +// key: "0", +// value: "7", +// }, +// }, +// ], +// }; +function sequentialCalls(times){ + let i = 1; + + function makeCall() { + // Your function or code that needs to be called sequentially + let data = { + task: "SET", + data: [ + { + command: { + key: "a" + i, + value: "" + i, + }, + }, + ], + }; + let st = JSON.stringify(data) + netSocket.write(st); + + i++; + + // Check if there are more calls to make + if (i <= times) { + // Set a timeout for 1 second before making the next call + setTimeout(makeCall, 1000); + } + } + + // Start the first call + makeCall(); +} + +sequentialCalls(5); // send a message to the raft every 5 seconds -setInterval(async () => { - for (var i = 0; i < 5000; i++) { - console.log("sending to socket with i: ", i); - const data = { - op: "SET", - data: { - key: i.toString(), - value: i.toString(), - }, - }; - sockPush.send(data, function (res) { - console.log(`ack for SET: ${res}`); - }); - } - - // send 10 GET Commands - for (var i = 0; i < 10; i++) { - sockPush.send("GET", { key: i.toString() }, function (res) { - console.log(`Response for GET: ${res}`); - }); - } -}, 5000); +// setInterval(async () => { +// for (var i = 0; i < 5000; i++) { +// console.log("sending to socket with i: ", i); +// const data = { +// op: "SET", +// data: { +// key: i.toString(), +// value: i.toString(), +// }, +// }; +// sockPush.send(data, function (res) { +// console.log(`ack for SET: ${res}`); +// }); +// } + +// // send 10 GET Commands +// for (var i = 0; i < 10; i++) { +// sockPush.send("GET", { key: i.toString() }, function (res) { +// console.log(`Response for GET: ${res}`); +// }); +// } +// }, 5000); + +// const g1 = { +// task: "GET", +// data: [{ +// command: { +// key : "a" +// } +// }] +// }; +// const g2 = { +// task: "GET", +// data: [{ +// command: { +// key : "b" +// } +// }] +// }; +// const gt1 = JSON.stringify(data); +// const gt2 = JSON.stringify(data2); +// netSocket.write(gt1); +// netSocket.write(gt2) diff --git a/proxy.js b/proxy.js index dcb5ac8..064cd1c 100644 --- a/proxy.js +++ b/proxy.js @@ -4,11 +4,6 @@ const net = require("net"); const { EventEmitter } = require("events"); -const { - executeSELECTQuery, - executeINSERTQuery, - executeDELETEQuery, -} = require("./queryExecuter"); class QueryQueue extends EventEmitter { constructor() { @@ -38,15 +33,7 @@ class QueryQueue extends EventEmitter { } async execute(query) { - if (query.toLowerCase().startsWith("select")) { - return await executeSELECTQuery(query); - } else if (query.toLowerCase().startsWith("insert into")) { - return await executeINSERTQuery(query); - } else if (query.toLowerCase().startsWith("delete from")) { - return await executeDELETEQuery(query); - } else { - throw new Error("Unsupported command"); - } + // TODO: Replace with KV GET SET Logic } } @@ -83,6 +70,6 @@ server.on("connection", (socket) => { }); }); -server.listen(5432, () => { - console.log("Server listening on port 5432"); +server.listen(6767, () => { + console.log("Server listening on port 6767"); }); diff --git a/raft-node.js b/raft-node.js index 7262094..6e4fc91 100644 --- a/raft-node.js +++ b/raft-node.js @@ -47,7 +47,7 @@ function candidate() { console.log("----------------------------------"); } -function onData(data) { +async function onData(data) { // TODO: Edit this to make perfect console.log( "From Raft 'on' data method", @@ -64,7 +64,7 @@ function onData(data) { try { // send acknowledgement if (arr && arr.length > 0) { - const ackPacket = raft.packet("append ack", arr[0].command); + const ackPacket = await raft.packet("append ack", arr[0].command); raft.message(MsgRaft.LEADER, ackPacket); } } catch (err) { diff --git a/raft/index.js b/raft/index.js index 01d075a..799af43 100644 --- a/raft/index.js +++ b/raft/index.js @@ -948,6 +948,7 @@ class Raft extends EventEmitter { async commitEntries(entries) { entries.forEach(async (entry) => { await this.log.commit(entry.index) + console.log("RAHUL entries commited in commit entries method", entry); this.emit('commit', entry.command); }); } diff --git a/server.js b/server.js index 9e7bb01..3ed1451 100644 --- a/server.js +++ b/server.js @@ -33,11 +33,69 @@ server.on("connection", (socket) => { socket.write("Connected\n"); - socket.on("data", (data) => { - const { type, args } = data; - console.log(`Received request of type ${type} with arguments ${args}`); - raftNode.command(data); + socket.on("data", async (pkt) => { + pkt = JSON.parse(pkt); + const { task, data } = pkt; + + if (raftNode.state === MsgRaft.LEADER) { + switch (task) { + case "SET": + // TODO: Test for async + try { + if (data && data.length > 0) { + let cmd = data[0].command; + await raftNode.command(cmd); + socket.write( + `${JSON.stringify(cmd)} - ack, ${raftNode.log.length}` + ); + } else { + throw new Error("Invalid data format"); + } + } catch (e) { + console.log(e); + socket.write("error 2"); + } + break; + case "GET": + break; + case "EXIT": + raftNode.db.closeDb(); + break; + default: + socket.write("error 46"); + } + } else { + switch (task) { + case "SET": + let packet = await raftNode.packet("append", data); + console.log("RAHUL received packet", packet, "as not leader in set"); + // forward to leader + raftNode.message(MsgRaft.LEADER, packet, () => { + console.log( + "Forwarded the set command to leader since I am a follower." + ); + }); + break; + case "GET": + // TODO: Test for async + console.log("Received a GET event on socket"); + // Implement round robin here based on current state of Raft + reply(raftNode.db.get(data.key)); + // if (raft.state !== MsgRaft.LEADER) { + // reply(raft.db.get(data.key)); + // } + break; + case "EXIT": + raftNode.db.closeDb(); + break; + default: + console.log("in default and task: ", task); + reply("error 90"); + break; + } + } }); + // TODO: Figure out a way to connect sockPull to the socket received with server connection sockPull.connect(port + 100); @@ -54,7 +112,7 @@ server.on("connection", (socket) => { console.log("Nodes", raftNode.nodes); try { console.log("Inside SET"); - await raft.command(data); + await raftNode.command(data); reply(`${JSON.stringify(data)} - ack, ${raftNode.log.length}`); } catch (e) { console.log(e); @@ -67,20 +125,21 @@ server.on("connection", (socket) => { } else { switch (task) { case "SET": - debug("Received a SET event on socket"); + console.log("Received a SET event on socket"); // forward to leader raftNode.message( MsgRaft.LEADER, - MsgRaft.packet("append ack", JSON.stringify(task)), + raftNode.packet("append ack", JSON.stringify(task)), () => { console.log( "Forwarded the set command to leader since I am a follower." ); } ); + break; case "GET": // TODO: Test for async - debug("Received a GET event on socket"); + console.log("Received a GET event on socket"); // Implement round robin here based on current state of Raft reply(raftNode.db.get(data.key)); // if (raft.state !== MsgRaft.LEADER) { @@ -101,7 +160,8 @@ server.on("connection", (socket) => { }); }); -server.listen(port + 1000, () => { +let raftNodeServerPort = port + 1000; +server.listen(raftNodeServerPort, () => { // raftNode is initialised whenever the server starts to listen // initiaise the node raftNode = registerNode(port, { @@ -116,35 +176,7 @@ server.listen(port + 1000, () => { raftNode.join("tcp://0.0.0.0:" + nr); }); - console.log(`Initialsied raft node and server on port ${port}`); - - // send a message to the raft every 5 seconds - // setInterval(async () => { - // if (raftNode.state === MsgRaft.LEADER) { - // for (var i = 0; i < 5000; i++) { - // const command = { - // command: "SET", - // data: { - // key: i.toString(), - // value: i.toString(), - // }, - // }; - // await raftNode.command(command); - // } - // // sockPush.send('SET', { - // // 'key': i.toString(), 'value': i.toString() - // // }, function (res) { - // // console.log(`ack for SET: ${res}`); - // // }); - // } - - // // for (var i = 0; i < 10; i++) { - // // sockPush.send('GET', { 'key': i.toString() }, function (res) { - // // console.log(`Response for GET: ${res}`); - // // }); - // // } - // // raft.message(MsgRaft.LEADER, { foo: 'bar' }, () => { - // // console.log('message sent'); - // // }); - // }, 5000); + console.log( + `Initialsied raft node at socket ${port} and raft node server on port ${raftNodeServerPort}` + ); });