Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
119 changes: 95 additions & 24 deletions client.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,32 +3,103 @@
*/
const argv = require("argh").argv;
var axon = require("axon");
const net = require("net");

var sockPush = axon.socket("req");
// var sockPush = axon.socket("req");
let port = +argv.port || 8081; // read the port from command line arguments
var netSocket = net.createConnection({ port: port + 1000 }, () => {
console.log("connected to server at port", port + 1000);
});

sockPush.bind(port + 100);
// sockPush.connect(port + 1200);


// const ex = {
// task: "EXIT",
// data: [
// {
// command: {
// key: "0",
// value: "7",
// },
// },
// ],
// };
function sequentialCalls(times){
let i = 1;

function makeCall() {
// Your function or code that needs to be called sequentially
let data = {
task: "SET",
data: [
{
command: {
key: "a" + i,
value: "" + i,
},
},
],
};
let st = JSON.stringify(data)
netSocket.write(st);

i++;

// Check if there are more calls to make
if (i <= times) {
// Set a timeout for 1 second before making the next call
setTimeout(makeCall, 1000);
}
}

// Start the first call
makeCall();
}

sequentialCalls(5);

// send a message to the raft every 5 seconds
setInterval(async () => {
for (var i = 0; i < 5000; i++) {
console.log("sending to socket with i: ", i);
const data = {
op: "SET",
data: {
key: i.toString(),
value: i.toString(),
},
};
sockPush.send(data, function (res) {
console.log(`ack for SET: ${res}`);
});
}

// send 10 GET Commands
for (var i = 0; i < 10; i++) {
sockPush.send("GET", { key: i.toString() }, function (res) {
console.log(`Response for GET: ${res}`);
});
}
}, 5000);
// setInterval(async () => {
// for (var i = 0; i < 5000; i++) {
// console.log("sending to socket with i: ", i);
// const data = {
// op: "SET",
// data: {
// key: i.toString(),
// value: i.toString(),
// },
// };
// sockPush.send(data, function (res) {
// console.log(`ack for SET: ${res}`);
// });
// }

// // send 10 GET Commands
// for (var i = 0; i < 10; i++) {
// sockPush.send("GET", { key: i.toString() }, function (res) {
// console.log(`Response for GET: ${res}`);
// });
// }
// }, 5000);

// const g1 = {
// task: "GET",
// data: [{
// command: {
// key : "a"
// }
// }]
// };
// const g2 = {
// task: "GET",
// data: [{
// command: {
// key : "b"
// }
// }]
// };
// const gt1 = JSON.stringify(data);
// const gt2 = JSON.stringify(data2);
// netSocket.write(gt1);
// netSocket.write(gt2)
19 changes: 3 additions & 16 deletions proxy.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,6 @@

const net = require("net");
const { EventEmitter } = require("events");
const {
executeSELECTQuery,
executeINSERTQuery,
executeDELETEQuery,
} = require("./queryExecuter");

class QueryQueue extends EventEmitter {
constructor() {
Expand Down Expand Up @@ -38,15 +33,7 @@ class QueryQueue extends EventEmitter {
}

async execute(query) {
if (query.toLowerCase().startsWith("select")) {
return await executeSELECTQuery(query);
} else if (query.toLowerCase().startsWith("insert into")) {
return await executeINSERTQuery(query);
} else if (query.toLowerCase().startsWith("delete from")) {
return await executeDELETEQuery(query);
} else {
throw new Error("Unsupported command");
}
// TODO: Replace with KV GET SET Logic
}
}

Expand Down Expand Up @@ -83,6 +70,6 @@ server.on("connection", (socket) => {
});
});

server.listen(5432, () => {
console.log("Server listening on port 5432");
server.listen(6767, () => {
console.log("Server listening on port 6767");
});
4 changes: 2 additions & 2 deletions raft-node.js
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ function candidate() {
console.log("----------------------------------");
}

function onData(data) {
async function onData(data) {
// TODO: Edit this to make perfect
console.log(
"From Raft 'on' data method",
Expand All @@ -64,7 +64,7 @@ function onData(data) {
try {
// send acknowledgement
if (arr && arr.length > 0) {
const ackPacket = raft.packet("append ack", arr[0].command);
const ackPacket = await raft.packet("append ack", arr[0].command);
raft.message(MsgRaft.LEADER, ackPacket);
}
} catch (err) {
Expand Down
1 change: 1 addition & 0 deletions raft/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -948,6 +948,7 @@ class Raft extends EventEmitter {
async commitEntries(entries) {
entries.forEach(async (entry) => {
await this.log.commit(entry.index)
console.log("RAHUL entries commited in commit entries method", entry);
this.emit('commit', entry.command);
});
}
Expand Down
112 changes: 72 additions & 40 deletions server.js
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,69 @@ server.on("connection", (socket) => {

socket.write("Connected\n");

socket.on("data", (data) => {
const { type, args } = data;
console.log(`Received request of type ${type} with arguments ${args}`);
raftNode.command(data);
socket.on("data", async (pkt) => {
pkt = JSON.parse(pkt);
const { task, data } = pkt;

if (raftNode.state === MsgRaft.LEADER) {
switch (task) {
case "SET":
// TODO: Test for async
try {
if (data && data.length > 0) {
let cmd = data[0].command;
await raftNode.command(cmd);
socket.write(
`${JSON.stringify(cmd)} - ack, ${raftNode.log.length}`
);
} else {
throw new Error("Invalid data format");
}
} catch (e) {
console.log(e);
socket.write("error 2");
}
break;
case "GET":
break;
case "EXIT":
raftNode.db.closeDb();
break;
default:
socket.write("error 46");
}
} else {
switch (task) {
case "SET":
let packet = await raftNode.packet("append", data);
console.log("RAHUL received packet", packet, "as not leader in set");
// forward to leader
raftNode.message(MsgRaft.LEADER, packet, () => {
console.log(
"Forwarded the set command to leader since I am a follower."
);
});
break;
case "GET":
// TODO: Test for async
console.log("Received a GET event on socket");
// Implement round robin here based on current state of Raft
reply(raftNode.db.get(data.key));
// if (raft.state !== MsgRaft.LEADER) {
// reply(raft.db.get(data.key));
// }
break;
case "EXIT":
raftNode.db.closeDb();
break;
default:
console.log("in default and task: ", task);
reply("error 90");
break;
}
}
});

// TODO: Figure out a way to connect sockPull to the socket received with server connection
sockPull.connect(port + 100);

Expand All @@ -54,7 +112,7 @@ server.on("connection", (socket) => {
console.log("Nodes", raftNode.nodes);
try {
console.log("Inside SET");
await raft.command(data);
await raftNode.command(data);
reply(`${JSON.stringify(data)} - ack, ${raftNode.log.length}`);
} catch (e) {
console.log(e);
Expand All @@ -67,20 +125,21 @@ server.on("connection", (socket) => {
} else {
switch (task) {
case "SET":
debug("Received a SET event on socket");
console.log("Received a SET event on socket");
// forward to leader
raftNode.message(
MsgRaft.LEADER,
MsgRaft.packet("append ack", JSON.stringify(task)),
raftNode.packet("append ack", JSON.stringify(task)),
() => {
console.log(
"Forwarded the set command to leader since I am a follower."
);
}
);
break;
case "GET":
// TODO: Test for async
debug("Received a GET event on socket");
console.log("Received a GET event on socket");
// Implement round robin here based on current state of Raft
reply(raftNode.db.get(data.key));
// if (raft.state !== MsgRaft.LEADER) {
Expand All @@ -101,7 +160,8 @@ server.on("connection", (socket) => {
});
});

server.listen(port + 1000, () => {
let raftNodeServerPort = port + 1000;
server.listen(raftNodeServerPort, () => {
// raftNode is initialised whenever the server starts to listen
// initiaise the node
raftNode = registerNode(port, {
Expand All @@ -116,35 +176,7 @@ server.listen(port + 1000, () => {
raftNode.join("tcp://0.0.0.0:" + nr);
});

console.log(`Initialsied raft node and server on port ${port}`);

// send a message to the raft every 5 seconds
// setInterval(async () => {
// if (raftNode.state === MsgRaft.LEADER) {
// for (var i = 0; i < 5000; i++) {
// const command = {
// command: "SET",
// data: {
// key: i.toString(),
// value: i.toString(),
// },
// };
// await raftNode.command(command);
// }
// // sockPush.send('SET', {
// // 'key': i.toString(), 'value': i.toString()
// // }, function (res) {
// // console.log(`ack for SET: ${res}`);
// // });
// }

// // for (var i = 0; i < 10; i++) {
// // sockPush.send('GET', { 'key': i.toString() }, function (res) {
// // console.log(`Response for GET: ${res}`);
// // });
// // }
// // raft.message(MsgRaft.LEADER, { foo: 'bar' }, () => {
// // console.log('message sent');
// // });
// }, 5000);
console.log(
`Initialsied raft node at socket ${port} and raft node server on port ${raftNodeServerPort}`
);
});