Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 57 additions & 0 deletions axon-client.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
const axon = require("axon");
const argv = require("argh").argv;

// Create a Push socket
const socket = axon.socket("push");

let port = +argv.port || 8081;
// Connect to the socket
socket.connect(port); // Replace 8000 with the port you want to connect to

// Data to send
const setMessages = createEventObjects(5, "SET");
const getMessages = createEventObjects(5, "GET");

// Iterate through the messages and send each one
for (const message of setMessages) {
console.log("sending message: ", message);
socket.send(JSON.stringify(message));
}

// WAIT for a second and send GET messages
setTimeout(() => {
for (const message of getMessages) {
console.log("sending message: ", message);
socket.send(JSON.stringify(message));
}
}, 1000);

// Close the socket when done (optional)
socket.close();

/**
*
* @param {number} len length of the array required
* @param {string} op operation to be performed
* @returns
*/
function createEventObjects(len, op) {
const items = [];
for (let i = 1; i <= len; i++) {
let data = {
task: op,
data: [
{
command: {
key: "key_" + i,
value: i + "",
},
},
],
};
if (op === "GET") delete data.data[0].command.value;
items.push(data);
}

return items;
}
169 changes: 84 additions & 85 deletions client.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,104 +2,103 @@
* this file serves as client to connect to the server raft nodes
*/
const argv = require("argh").argv;
var axon = require("axon");
const net = require("net");

// var sockPush = axon.socket("req");
let port = +argv.port || 8081; // read the port from command line arguments
var netSocket = net.createConnection({ port: port + 1000 }, () => {
console.log("connected to server at port", port + 1000);
});

// sockPush.connect(port + 1200);
const sendEvent = (op, data) => {
let data = {
task: op,
data: data,
};

netSocket.write(JSON.stringify(data));
};

// const ex = {
// task: "EXIT",
// data: [
// {
// command: {
// key: "0",
// value: "7",
// },
// },
// ],
// };
function sequentialCalls(times){
let i = 1;

function makeCall() {
// Your function or code that needs to be called sequentially
let data = {
task: "SET",
data: [
{
command: {
key: "a" + i,
value: "" + i,
},
let i = 1;
(() => {
setInterval(() => {
// SEND 5 SEQUENTIAL SETS
setTimeout(
sendEvent("GET", [
{
command: {
key: "a" + i,
value: "" + i,
},
],
};
let st = JSON.stringify(data)
netSocket.write(st);

i++;

// Check if there are more calls to make
if (i <= times) {
// Set a timeout for 1 second before making the next call
setTimeout(makeCall, 1000);
}
},
]),
500
);
// SEND 5 SEQUENTIAL GETS
i++;
}, 1000);
})();

function sequentialGETCalls(times) {
let i = 1;

function makeCall() {
// Your function or code that needs to be called sequentially
let data = {
task: "GET",
data: [
{
command: {
key: "a" + i,
},
},
],
};
let st = JSON.stringify(data);
netSocket.write(st);

i++;

// Check if there are more calls to make
if (i <= times) {
// Set a timeout for 1 second before making the next call
setTimeout(makeCall, 1000);
}

// Start the first call
makeCall();
}

// Start the first call
makeCall();
}

sequentialCalls(5);
function sequentialSETCalls(times) {
let i = 1;
for (let i = 0; i < times; i++) {
let data = {
task: "SET",
data: [
{
command: {
key: "a" + i,
value: "" + i,
},
},
],
};

// send a message to the raft every 5 seconds
// setInterval(async () => {
// for (var i = 0; i < 5000; i++) {
// console.log("sending to socket with i: ", i);
// const data = {
// op: "SET",
// data: {
// key: i.toString(),
// value: i.toString(),
// },
// };
// sockPush.send(data, function (res) {
// console.log(`ack for SET: ${res}`);
// });
// }
let st = JSON.stringify(data);
netSocket.write(st);
setTimeout(() => {
console.log("waiting 1 second", 1000);
});
}
//make get calls after set calls
setTimeout(() => {
sequentialGETCalls(5);
}, 1000);
}

// // send 10 GET Commands
// for (var i = 0; i < 10; i++) {
// sockPush.send("GET", { key: i.toString() }, function (res) {
// console.log(`Response for GET: ${res}`);
// });
// }
// }, 5000);
sequentialSETCalls(5);

// const g1 = {
// task: "GET",
// data: [{
// command: {
// key : "a"
// }
// }]
// };
// const g2 = {
// task: "GET",
// data: [{
// command: {
// key : "b"
// }
// }]
// };
// const gt1 = JSON.stringify(data);
// const gt2 = JSON.stringify(data2);
// netSocket.write(gt1);
// netSocket.write(gt2)
netSocket.on("data", (buffer) => {
const data = buffer.toString("utf8");
console.log(data);
});
98 changes: 51 additions & 47 deletions db.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,62 +2,66 @@
* @description This file serves as the interface for storage engine of the KV Store
*/

const lmdb = require('node-lmdb');
const fs = require('fs')
const lmdb = require("node-lmdb");
const fs = require("fs");
// TODO: Turn this into an interface to different storage engine -- try RocksDB on the lines of TiKV
class LMDBManager {
constructor(path, mapSize, maxDbs) {
if(!fs.existsSync(path)) {
fs.mkdirSync(path, {recursive: true});
}
this.env = new lmdb.Env();
this.env.open({
path: path || "./db",
mapSize: mapSize || 2 * 1024 * 1024 * 1024, // 2 GB by default
maxDbs: maxDbs || 10
});
this.writeTxn = null;
constructor(path, mapSize, maxDbs) {
if (!fs.existsSync(path)) {
fs.mkdirSync(path, { recursive: true });
}
this.env = new lmdb.Env();
this.env.open({
path: path || "./db",
mapSize: mapSize || 2 * 1024 * 1024 * 1024, // 2 GB by default
maxDbs: maxDbs || 10,
});
this.writeTxn = null;
}

openDb(dbName) {
this.dbi = this.env.openDbi({
name: dbName || "mydb",
create: true
});
}

closeDb() {
if(this.writeTxn) this.writeTxn.commit();
this.dbi.close();
}
openDb(dbName) {
this.dbi = this.env.openDbi({
name: dbName || "mydb",
create: true,
});
}

closeEnv() {
this.env.close();
}
closeDb() {
if (this.writeTxn) this.writeTxn.commit();
this.dbi.close();
}

set(key, value) {
try {
if (!this.writeTxn) {
const txn = this.env.beginTxn();
this.writeTxn = txn;
}
this.writeTxn.putString(this.dbi, key, value);
console.log('wrote', key, value);
} catch (e) {
console.error("Not a valid key", key, value);
}
closeEnv() {
this.env.close();
}

set(key, value) {
try {
if (!this.writeTxn) {
const txn = this.env.beginTxn();
this.writeTxn = txn;
}
this.writeTxn.putString(this.dbi, key, value);
console.log("wrote", key, value);
} catch (e) {
console.error("Not a valid key", key, value);
}
}

get(key) {
// const txn = this.env.beginTxn({ readOnly: true });
if (!this.writeTxn) {
const txn = this.env.beginTxn();
this.writeTxn = txn;
}
const value = this.writeTxn.getString(this.dbi, key);
return value;
get(key) {
// const txn = this.env.beginTxn({ readOnly: true });
try {
if (!this.writeTxn) {
const txn = this.env.beginTxn();
this.writeTxn = txn;
}
const value = this.writeTxn.getString(this.dbi, key);
return value;
} catch (err) {
console.error(err);
return "error finding key";
}
}
}

module.exports = LMDBManager;
Expand All @@ -77,4 +81,4 @@ console.log(dbManager.get(1)); // Output: Hello world!

dbManager.closeDb();
dbManager.closeEnv();
*/
*/
10 changes: 8 additions & 2 deletions raft-node.js
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,14 @@ async function onData(data) {
function onCommit(command) {
// TODO: Edit this to make perfect
console.log("Inside commit", command);
raft.db.set(command.key, command.value);
console.log("Committed", command.key, command.value);
let val;
if (command.type === "SET") {
raft.db.set(command.key, command.value);
console.log("Committed", command.key, command.value);
} else if (command.type === "GET") {
val = raft.db.get(command.key);
console.log("got val in GET: ", val);
}
}

// main function to create and return the instance
Expand Down
1 change: 0 additions & 1 deletion raft/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -948,7 +948,6 @@ class Raft extends EventEmitter {
async commitEntries(entries) {
entries.forEach(async (entry) => {
await this.log.commit(entry.index)
console.log("RAHUL entries commited in commit entries method", entry);
this.emit('commit', entry.command);
});
}
Expand Down
Loading