Process-to-process communication and object synchronization. Backed by node-ipc.
- ⚙️ Multiple NodeJS processes can write to and read from the shared object.
- 🤝 Multiple NodeJS processes can communicate via RPC calls or EventBus.
- ✨ No extra server host process is needed: each process acts as a host or a client. When the host process exits, another process takes over the host role.
- 🛡️ Enhanced concurrency control
1. Initialization
2. Object changes
3. Race conditions and conflict resolutions
4. RPC
5. Events
6. Object Observer
7. Mutex
8. Storage
9. Server
10. Stop
1
Initialization §
MemSync sample initialization
import { MemSync } from 'memsync'

let memsync = new MemSync({
    /* some unique/global name */
    name: 'my_shared_object',
    data: {
        foo: 1,
        bar: 2,
        qux: [ 'one', 'two' ]
    },
    rpc: {
        foo () { return 'bar'; }
    },
    options: {
        /* optional */
        server: {
            port: 80
        },
        /* optional */
        file: {
            path: './file/persistence.json'
        },
        /* optional */
        peer: {
            roles: ['writer']
        },
        /* optional */
        network: {
            maxWriters: 1
        },
        logEvents: false,
        timeout: 30_000
    }
});
await memsync.start();
start() discovers other nodes:
- When the server already exists, we connect to it and receive the current object. Our process now holds the up-to-date data, receives new patches from the network, and sends its own changes to the host. When the host exits, we rediscover the network and try to act as the server, since we now hold the up-to-date object.
- When the process-to-process network does not exist, we create a server and start accepting new processes; the current data is then shared with the clients that join.
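The join and takeover behavior described above can be sketched with a small in-memory simulation. This is only an illustration: the `Network` class, its `join`/`leave` methods, and the rule that the first remaining client becomes the host are assumptions made for this sketch, not MemSync's API or its actual election logic, and no real IPC is involved.

```javascript
// In-memory stand-in for the process network (hypothetical, no real IPC).
class Network {
    constructor() {
        this.host = null;
        this.clients = [];
    }

    // A joining node becomes the host if none exists; otherwise it becomes
    // a client and receives a copy of the host's current data.
    join(node) {
        if (this.host === null) {
            node.role = 'host';
            this.host = node;
        } else {
            node.role = 'client';
            node.data = JSON.parse(JSON.stringify(this.host.data));
            this.clients.push(node);
        }
    }

    // When the host leaves, the first remaining client takes over
    // (an assumption of this sketch; any up-to-date client would do).
    leave(node) {
        if (node === this.host) {
            this.host = this.clients.shift() ?? null;
            if (this.host !== null) this.host.role = 'host';
        } else {
            this.clients = this.clients.filter(client => client !== node);
        }
        node.role = 'stopped';
    }
}

const net = new Network();
const a = { name: 'a', data: { foo: 1 } };
const b = { name: 'b', data: {} };
net.join(a);   // no host yet: a becomes the host
net.join(b);   // host exists: b becomes a client with a copy of the data
net.leave(a);  // host exits: b takes over
console.log(b.role, b.data);
```

Because every client already holds a copy of the data, any of them can take over without losing state, which is why no dedicated server process is required.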
2
Object changes §
To read values, use it as a simple object, e.g. let foo = memsync.data.foo, but for changes use MongoDB update operators:
await memsync.patch({
    $set: {
        foo: 2
    }
})
We apply the patch to the local object immediately, and then try to:
- if client: send it to the host, which applies the patch to itself and broadcasts it to the other nodes.
- if server: broadcast the patch to the nodes, if any.
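To make the effect of a MongoDB-style `$set` patch on a plain object concrete, here is a minimal sketch. `applySetPatch` is a hypothetical helper written for this illustration, not part of MemSync's API; it handles only `$set` with dot-separated paths, and MemSync's own patch handling may differ.

```javascript
// Hypothetical helper (not MemSync's API): applies a MongoDB-style
// `$set` patch to a plain object, supporting dot-separated paths.
function applySetPatch(target, patch) {
    const fields = patch.$set ?? {};
    for (const [path, value] of Object.entries(fields)) {
        const keys = path.split('.');
        let node = target;
        // Walk to the parent of the final key, creating objects as needed.
        for (const key of keys.slice(0, -1)) {
            if (typeof node[key] !== 'object' || node[key] === null) {
                node[key] = {};
            }
            node = node[key];
        }
        node[keys[keys.length - 1]] = value;
    }
    return target;
}

const data = { foo: 1, bar: 2, qux: ['one', 'two'] };
applySetPatch(data, { $set: { foo: 2, 'nested.flag': true } });
console.log(data);
```

Describing a change as a small patch rather than a whole new object is what allows the same change to be sent to the host and replayed on every other node.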
3
Race conditions and conflict resolutions §
- When multiple processes modify different parts of the object, there are no conflicts and the patch order does not matter.
- When multiple patches are made for the same data, the host acts as the source of truth: it accepts patches by timestamp (the patch creation date set by the client). If for any reason a patch reaches the host with a timestamp older than one already applied, it is rejected, and the sender (the client) is notified about the current state and current patches.
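The timestamp rule above can be sketched as follows. The `PatchLog` class, the `{ key, value, timestamp }` patch shape, and the decision to accept patches with equal timestamps are all assumptions made for this illustration; they are not MemSync's internal data structures.

```javascript
// Hypothetical host-side bookkeeping (not MemSync's API): remembers the
// timestamp of the last accepted patch per key and rejects older ones.
class PatchLog {
    constructor(data) {
        this.data = data;
        this.lastApplied = new Map(); // key -> timestamp of last accepted patch
    }

    // patch: { key, value, timestamp }, where timestamp is set by the client.
    accept(patch) {
        const last = this.lastApplied.get(patch.key) ?? -Infinity;
        if (patch.timestamp < last) {
            // Stale patch: reject it and report the current value to the sender.
            return { accepted: false, current: this.data[patch.key] };
        }
        this.data[patch.key] = patch.value;
        this.lastApplied.set(patch.key, patch.timestamp);
        return { accepted: true, current: patch.value };
    }
}

const host = new PatchLog({ foo: 1, bar: 2 });
const r1 = host.accept({ key: 'foo', value: 5, timestamp: 200 });
const r2 = host.accept({ key: 'foo', value: 3, timestamp: 100 }); // older, arrives late
const r3 = host.accept({ key: 'bar', value: 9, timestamp: 50 });  // different key
console.log(r1.accepted, r2.accepted, r3.accepted, host.data);
```

The late patch for `foo` is rejected while the patch for `bar` goes through, since patches to different parts of the object never conflict.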
4
RPC §
// process #1
let memsync = new MemSync({
    name: 'foo',
    rpc: {
        getFoo () { return 'bar'; }
    }
});
await memsync.start();

// process #2
let memsync = new MemSync({
    name: 'foo',
});
await memsync.start();
let str = await memsync.call('getFoo');
// str === 'bar'
5
Events
Each process can emit and listen to events
// process #1
memsync.events.emit('foo', 123);
// process #2
memsync.events.on('foo', num => {
    console.log('Lorem', num);
})
6
Object Observer §
Observe the object's properties:
memsync.observe('foo', (fooValue) => {
    console.log(`FooValue changed`, fooValue);
})
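As a rough picture of how property observation can work, the sketch below keeps a list of callbacks per property and calls them when the value changes. `ObservedObject` and its `set` method are hypothetical names for this illustration only; MemSync triggers observers from its own patch handling, which is not shown here.

```javascript
// Hypothetical observer registry (not MemSync's implementation).
class ObservedObject {
    constructor(data) {
        this.data = data;
        this.observers = new Map(); // property name -> array of callbacks
    }

    observe(key, callback) {
        const list = this.observers.get(key) ?? [];
        list.push(callback);
        this.observers.set(key, list);
    }

    // Writes a value and notifies observers only when it actually changed.
    set(key, value) {
        if (this.data[key] === value) return;
        this.data[key] = value;
        for (const callback of this.observers.get(key) ?? []) {
            callback(value);
        }
    }
}

const obj = new ObservedObject({ foo: 1 });
const seen = [];
obj.observe('foo', fooValue => seen.push(fooValue));
obj.set('foo', 2);
obj.set('foo', 2); // unchanged, no notification
obj.set('foo', 3);
console.log(seen);
```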
7
Mutex §
By using the writer role and setting the maxWriters property to 1 for the instance, you make sure there is only one worker at a time.
// process #1
import { MemSync } from 'memsync'

let memsync = new MemSync({
    name: 'foo',
    options: {
        peer: {
            roles: ['writer']
        },
        network: {
            maxWriters: 1
        }
    }
});
await memsync.start();

// process #2
import { MemSync, MemErrors } from 'memsync'

let memsync = new MemSync({
    name: 'foo',
    options: {
        peer: {
            roles: ['writer']
        },
        network: {
            maxWriters: 1
        }
    }
});
try {
    await memsync.start();
} catch (error) {
    // a writer is already connected, so start() is rejected for this process
    error.code === MemErrors.ERR_MAX_WRITERS // true
}
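The admission rule behind this can be pictured as a simple count of connected writers. The sketch below is an assumption about how such a check might look: the `admit` function, the peer shape, and the string value of `ERR_MAX_WRITERS` are illustrative and not taken from MemSync.

```javascript
// Hypothetical admission check (not MemSync's API): count the peers that
// hold the 'writer' role and refuse a join that would exceed maxWriters.
const ERR_MAX_WRITERS = 'ERR_MAX_WRITERS'; // illustrative value only

function admit(peers, candidate, maxWriters) {
    const writers = peers.filter(peer => peer.roles.includes('writer')).length;
    if (candidate.roles.includes('writer') && writers >= maxWriters) {
        const error = new Error('Maximum number of writers reached');
        error.code = ERR_MAX_WRITERS;
        throw error;
    }
    peers.push(candidate);
    return peers;
}

const peers = [];
admit(peers, { id: 1, roles: ['writer'] }, 1); // first writer is admitted

let code = null;
try {
    admit(peers, { id: 2, roles: ['writer'] }, 1); // second writer is refused
} catch (error) {
    code = error.code;
}
console.log(code, peers.length);
```

In this sketch a peer without the writer role would still be admitted, so the limit acts as a mutex on writers only.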
8
Storage §
You can load and save objects to the file system. We use atma-io for this, so you are not limited to the file:// protocol: any other transport that atma-io supports also works, for example atma-io-transport-s3 to load from any S3-compatible storage.
let mem = new MemSync('storage', { num: 0 }, {
    file: {
        path: 's3://my/storage.json'
    }
});
9
Server §
The process may expose a server to query the object and its current state:
let memory = new MemSync('foo-bar', { num: 0 }, {
    server: { port: 8883 }
});
// Starts the memory sync process and also exposes port 8883 for HTTP queries
await memory.start();
You can query the state of the foo-bar object externally:
curl http://localhost:8883/foo-bar
//e.g.> `{ num: 1 }`
10
Stop §
You can stop the node at any time:
- if it is a client, it just disconnects
- if it is a server, it also stops listening, and one of the remaining nodes (if any) then acts as the server
memsync.stop()