diff --git a/axon-client.js b/axon-client.js new file mode 100644 index 0000000..16ba417 --- /dev/null +++ b/axon-client.js @@ -0,0 +1,57 @@ +const axon = require("axon"); +const argv = require("argh").argv; + +// Create a Push socket +const socket = axon.socket("push"); + +let port = +argv.port || 8081; +// Connect to the socket +socket.connect(port); // Replace 8000 with the port you want to connect to + +// Data to send +const setMessages = createEventObjects(5, "SET"); +const getMessages = createEventObjects(5, "GET"); + +// Iterate through the messages and send each one +for (const message of setMessages) { + console.log("sending message: ", message); + socket.send(JSON.stringify(message)); +} + +// WAIT for a second and send GET messages +setTimeout(() => { + for (const message of getMessages) { + console.log("sending message: ", message); + socket.send(JSON.stringify(message)); + } +}, 1000); + +// Close the socket when done (optional) +socket.close(); + +/** + * + * @param {number} len length of the array required + * @param {string} op operation to be performed + * @returns + */ +function createEventObjects(len, op) { + const items = []; + for (let i = 1; i <= len; i++) { + let data = { + task: op, + data: [ + { + command: { + key: "key_" + i, + value: i + "", + }, + }, + ], + }; + if (op === "GET") delete data.data[0].command.value; + items.push(data); + } + + return items; +} diff --git a/client.js b/client.js index a369978..75c0af2 100644 --- a/client.js +++ b/client.js @@ -2,104 +2,103 @@ * this file serves as client to connect to the server raft nodes */ const argv = require("argh").argv; -var axon = require("axon"); const net = require("net"); -// var sockPush = axon.socket("req"); let port = +argv.port || 8081; // read the port from command line arguments var netSocket = net.createConnection({ port: port + 1000 }, () => { console.log("connected to server at port", port + 1000); }); -// sockPush.connect(port + 1200); +const sendEvent = (op, data) => { + let data = { + task: op, + data: data, + }; + netSocket.write(JSON.stringify(data)); +}; -// const ex = { -// task: "EXIT", -// data: [ -// { -// command: { -// key: "0", -// value: "7", -// }, -// }, -// ], -// }; -function sequentialCalls(times){ - let i = 1; - - function makeCall() { - // Your function or code that needs to be called sequentially - let data = { - task: "SET", - data: [ - { - command: { - key: "a" + i, - value: "" + i, - }, +let i = 1; +(() => { + setInterval(() => { + // SEND 5 SEQUENTIAL SETS + setTimeout( + sendEvent("GET", [ + { + command: { + key: "a" + i, + value: "" + i, }, - ], - }; - let st = JSON.stringify(data) - netSocket.write(st); - - i++; - - // Check if there are more calls to make - if (i <= times) { - // Set a timeout for 1 second before making the next call - setTimeout(makeCall, 1000); - } + }, + ]), + 500 + ); + // SEND 5 SEQUENTIAL GETS + i++; + }, 1000); +})(); + +function sequentialGETCalls(times) { + let i = 1; + + function makeCall() { + // Your function or code that needs to be called sequentially + let data = { + task: "GET", + data: [ + { + command: { + key: "a" + i, + }, + }, + ], + }; + let st = JSON.stringify(data); + netSocket.write(st); + + i++; + + // Check if there are more calls to make + if (i <= times) { + // Set a timeout for 1 second before making the next call + setTimeout(makeCall, 1000); } - - // Start the first call - makeCall(); + } + + // Start the first call + makeCall(); } -sequentialCalls(5); +function sequentialSETCalls(times) { + let i = 1; + for (let i = 0; i < times; i++) { + let data = { + task: "SET", + data: [ + { + command: { + key: "a" + i, + value: "" + i, + }, + }, + ], + }; -// send a message to the raft every 5 seconds -// setInterval(async () => { -// for (var i = 0; i < 5000; i++) { -// console.log("sending to socket with i: ", i); -// const data = { -// op: "SET", -// data: { -// key: i.toString(), -// value: i.toString(), -// }, -// }; -// sockPush.send(data, function (res) { -// console.log(`ack for SET: ${res}`); -// }); -// } + let st = JSON.stringify(data); + netSocket.write(st); + setTimeout(() => { + console.log("waiting 1 second", 1000); + }); + } + //make get calls after set calls + setTimeout(() => { + sequentialGETCalls(5); + }, 1000); +} -// // send 10 GET Commands -// for (var i = 0; i < 10; i++) { -// sockPush.send("GET", { key: i.toString() }, function (res) { -// console.log(`Response for GET: ${res}`); -// }); -// } -// }, 5000); +sequentialSETCalls(5); -// const g1 = { -// task: "GET", -// data: [{ -// command: { -// key : "a" -// } -// }] -// }; -// const g2 = { -// task: "GET", -// data: [{ -// command: { -// key : "b" -// } -// }] -// }; -// const gt1 = JSON.stringify(data); -// const gt2 = JSON.stringify(data2); -// netSocket.write(gt1); -// netSocket.write(gt2) +netSocket.on("data", (buffer) => { + const data = buffer.toString("utf8"); + console.log(data); +}); diff --git a/db.js b/db.js index 7a9f325..5ad330c 100644 --- a/db.js +++ b/db.js @@ -2,62 +2,66 @@ * @description This file serves as the interface for storage engine of the KV Store */ -const lmdb = require('node-lmdb'); -const fs = require('fs') +const lmdb = require("node-lmdb"); +const fs = require("fs"); // TODO: Turn this into an interface to different storage engine -- try RocksDB on the lines of TiKV class LMDBManager { - constructor(path, mapSize, maxDbs) { - if(!fs.existsSync(path)) { - fs.mkdirSync(path, {recursive: true}); - } - this.env = new lmdb.Env(); - this.env.open({ - path: path || "./db", - mapSize: mapSize || 2 * 1024 * 1024 * 1024, // 2 GB by default - maxDbs: maxDbs || 10 - }); - this.writeTxn = null; + constructor(path, mapSize, maxDbs) { + if (!fs.existsSync(path)) { + fs.mkdirSync(path, { recursive: true }); } + this.env = new lmdb.Env(); + this.env.open({ + path: path || "./db", + mapSize: mapSize || 2 * 1024 * 1024 * 1024, // 2 GB by default + maxDbs: maxDbs || 10, + }); + this.writeTxn = null; + } - openDb(dbName) { - this.dbi = this.env.openDbi({ - name: dbName || "mydb", - create: true - }); - } - - closeDb() { - if(this.writeTxn) this.writeTxn.commit(); - this.dbi.close(); - } + openDb(dbName) { + this.dbi = this.env.openDbi({ + name: dbName || "mydb", + create: true, + }); + } - closeEnv() { - this.env.close(); - } + closeDb() { + if (this.writeTxn) this.writeTxn.commit(); + this.dbi.close(); + } - set(key, value) { - try { - if (!this.writeTxn) { - const txn = this.env.beginTxn(); - this.writeTxn = txn; - } - this.writeTxn.putString(this.dbi, key, value); - console.log('wrote', key, value); - } catch (e) { - console.error("Not a valid key", key, value); - } + closeEnv() { + this.env.close(); + } + set(key, value) { + try { + if (!this.writeTxn) { + const txn = this.env.beginTxn(); + this.writeTxn = txn; + } + this.writeTxn.putString(this.dbi, key, value); + console.log("wrote", key, value); + } catch (e) { + console.error("Not a valid key", key, value); } + } - get(key) { - // const txn = this.env.beginTxn({ readOnly: true }); - if (!this.writeTxn) { - const txn = this.env.beginTxn(); - this.writeTxn = txn; - } - const value = this.writeTxn.getString(this.dbi, key); - return value; + get(key) { + // const txn = this.env.beginTxn({ readOnly: true }); + try { + if (!this.writeTxn) { + const txn = this.env.beginTxn(); + this.writeTxn = txn; + } + const value = this.writeTxn.getString(this.dbi, key); + return value; + } catch (err) { + console.error(err); + return "error finding key"; } + } } module.exports = LMDBManager; @@ -77,4 +81,4 @@ console.log(dbManager.get(1)); // Output: Hello world! dbManager.closeDb(); dbManager.closeEnv(); -*/ \ No newline at end of file +*/ diff --git a/raft-node.js b/raft-node.js index 6e4fc91..701bf8f 100644 --- a/raft-node.js +++ b/raft-node.js @@ -76,8 +76,14 @@ async function onData(data) { function onCommit(command) { // TODO: Edit this to make perfect console.log("Inside commit", command); - raft.db.set(command.key, command.value); - console.log("Committed", command.key, command.value); + let val; + if (command.type === "SET") { + raft.db.set(command.key, command.value); + console.log("Committed", command.key, command.value); + } else if (command.type === "GET") { + val = raft.db.get(command.key); + console.log("got val in GET: ", val); + } } // main function to create and return the instance diff --git a/raft/index.js b/raft/index.js index 799af43..01d075a 100644 --- a/raft/index.js +++ b/raft/index.js @@ -948,7 +948,6 @@ class Raft extends EventEmitter { async commitEntries(entries) { entries.forEach(async (entry) => { await this.log.commit(entry.index) - console.log("RAHUL entries commited in commit entries method", entry); this.emit('commit', entry.command); }); } diff --git a/server.js b/server.js index 3ed1451..6fd7115 100644 --- a/server.js +++ b/server.js @@ -34,66 +34,116 @@ server.on("connection", (socket) => { socket.write("Connected\n"); socket.on("data", async (pkt) => { - pkt = JSON.parse(pkt); - const { task, data } = pkt; - - if (raftNode.state === MsgRaft.LEADER) { - switch (task) { - case "SET": - // TODO: Test for async - try { - if (data && data.length > 0) { - let cmd = data[0].command; - await raftNode.command(cmd); - socket.write( - `${JSON.stringify(cmd)} - ack, ${raftNode.log.length}` + console.log("***************************************"); + console.log("***************************************"); + console.log("RECEIVED PACKET IS: "); + console.log(pkt.toString()); + console.log("***************************************"); + console.log("***************************************"); + pkt = pkt + .toString() + .split("\n") + .map((str) => str.trim()) + .filter((str) => { + return str.trim() !== ""; + }) + .map((item) => { + return JSON.parse(item.trim()); + }); + console.log("#########################################"); + console.log("#########################################"); + console.log("RECEIVED PACKET AFTER SPLITTING IS: "); + console.log(pkt); + console.log("#########################################"); + console.log("#########################################"); + pkt.forEach(async (item) => { + console.log("item in for each: ", item); + const { task, data } = item; + if (raftNode.state === MsgRaft.LEADER) { + switch (task) { + case "SET": + // TODO: Test for async + try { + if (data && data.length > 0) { + let cmd = data[0].command; + cmd["type"] = "SET"; + await raftNode.command(cmd); + socket.write(`${JSON.stringify(cmd)} - ack`); + } else { + throw new Error("Invalid data format"); + } + } catch (e) { + console.log(e); + socket.write("error 2"); + } + break; + case "GET": + try { + if (data && data.length > 0) { + let cmd = data[0].command; + cmd["type"] = "GET"; + await raftNode.command(cmd); + let val = raftNode.db.get(cmd.key); + socket.write(`Value of key : ${cmd.key} is ${val}`); + } else { + throw new Error("Invalid data format"); + } + } catch (e) { + console.log(e); + socket.write("error 2"); + } + break; + case "EXIT": + raftNode.db.closeDb(); + break; + default: + socket.write("error 46"); + } + } else { + switch (task) { + case "SET": + let cmd = data[0].command; + cmd["type"] = "SET"; + let packet = await raftNode.packet("rpc", cmd); + // forward to leader + raftNode.message(MsgRaft.LEADER, packet, () => { + console.log( + "Forwarded the set command to leader since I am a follower." ); - } else { - throw new Error("Invalid data format"); + }); + socket.write(`${JSON.stringify(cmd)} - ack`); + break; + case "GET": + try { + if (data && data.length > 0) { + let cmd = data[0].command; + cmd["type"] = "GET"; + let packet = await raftNode.packet("rpc", cmd); + raftNode.message(MsgRaft.LEADER, packet, () => { + console.log( + "Forwarded the set command to leader since I am a follower." + ); + }); + let val = raftNode.db.get(cmd.key); + socket.write(`Value of key : ${cmd.key} is ${val}`); + } else { + throw new Error("Invalid data format"); + } + } catch (e) { + console.log(e); + socket.write("error 2"); } - } catch (e) { - console.log(e); - socket.write("error 2"); - } - break; - case "GET": - break; - case "EXIT": - raftNode.db.closeDb(); - break; - default: - socket.write("error 46"); + break; + case "EXIT": + raftNode.db.closeDb(); + break; + default: + console.log("in default and task: ", task); + reply("error 90"); + break; + } } - } else { - switch (task) { - case "SET": - let packet = await raftNode.packet("append", data); - console.log("RAHUL received packet", packet, "as not leader in set"); - // forward to leader - raftNode.message(MsgRaft.LEADER, packet, () => { - console.log( - "Forwarded the set command to leader since I am a follower." - ); - }); - break; - case "GET": - // TODO: Test for async - console.log("Received a GET event on socket"); - // Implement round robin here based on current state of Raft - reply(raftNode.db.get(data.key)); - // if (raft.state !== MsgRaft.LEADER) { - // reply(raft.db.get(data.key)); - // } - break; - case "EXIT": - raftNode.db.closeDb(); - break; - default: - console.log("in default and task: ", task); - reply("error 90"); - break; - } - } + }); }); // TODO: Figure out a way to connect sockPull to the socket received with server connection diff --git a/single-client.js b/single-client.js new file mode 100644 index 0000000..88b2b79 --- /dev/null +++ b/single-client.js @@ -0,0 +1,53 @@ +const argv = require("argh").argv; +const net = require("net"); + +let port = +argv.port || 8081; // read the port from command line arguments +var netSocket = net.createConnection({ port: port + 1000 }, () => { + console.log("connected to server at port", port + 1000); +}); + +let set = { + task: "SET", + data: [ + { + command: { + key: "key", + value: 1000, + }, + }, + ], +}; + +let get = { + task: "GET", + data: [ + { + command: { + key: "key_1", + }, + }, + ], +}; + +/** + * @link https://stackoverflow.com/questions/35054868/sending-socket-data-separately-in-node-js -- since this is a TCP stream hence we have to separate out the stuff. + */ +// write to socket +for (let i = 0; i < 5; i++) { + set.data[0].command.key = "key_" + i; + set.data[0].command.value = i + ""; + console.log("set obj: ", set); + netSocket.write(JSON.stringify(set) + "\n"); +} + +setTimeout(() => { + for (let i = 0; i < 5; i++) { + get.data[0].command.key = "key_" + i; + netSocket.write(JSON.stringify(get) + "\n"); + } +}, 1000); + +netSocket.on("data", (buffer) => { + const data = buffer.toString("utf8"); + console.log(data); +});