Lightning Stream support #267
I would suggest that most of this probably does belong at the application level. We also build this same type of functionality on top of lmdb-js, so it definitely works well to do that. At some point it might be nice to have "deleted" entry awareness in lmdb-js for record counts, but we have the exact same concept in the db software we build. I will also mention the
Thanks for the hints. That will help me greatly.
Is that open source by any chance? I would love to pillage the sources a bit. :D
I finally got time to play with this. The message packing and unpacking seems relatively straightforward, but the event listeners don't seem to work. (Not sure if I should open a new ticket.)

const lmdb = require('lmdb');
const db = lmdb.open({ path: 'test_db' });
let token_db = db.openDB('tokens');
db.on('aftercommit', ({ next, last, txnId }) => {
console.log("aftercommit", txnId)
});
db.on("beforecommit", (...args) => {
const parameters = args.join(', ');
console.log(`beforecommit event with parameters ${parameters}`);
});
token_db.on('aftercommit', ({ next, last, txnId }) => {
console.log("aftercommit", txnId)
});
token_db.on("beforecommit", (...args) => {
const parameters = args.join(', ');
console.log(`beforecommit event with parameters ${parameters}`);
});
token_db.putSync("test", "test");
token_db.get("test");
console.log("events: ", db.eventNames());
console.log("events: ", token_db.eventNames()); outputs
Also, I need the txnId before I write out data. Is there a way to access the highest txnId before a commit is made?
The commit events are only for asynchronous transactions, not synchronous ones as in the example above. I suppose they could be triggered for synchronous transactions too, but that doesn't seem that helpful, since those are already explicitly started and stopped in the main thread. Instead I added a getWriteTxnId() method.
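To make the distinction concrete, here is a minimal sketch, assuming the lmdb-js commit events and the getWriteTxnId() method mentioned above:

const lmdb = require('lmdb');
const db = lmdb.open({ path: 'test_db' });

// Commit events fire for batched asynchronous writes
db.on('aftercommit', ({ txnId }) => {
  console.log('aftercommit', txnId);
});

async function demo() {
  // Asynchronous write: committed in a batch on the write thread,
  // so 'aftercommit' fires once the batch commits
  await db.put('a', 1);

  // Synchronous transaction: committed directly on the main thread,
  // so no commit event is emitted; the txn id is available inside instead
  db.transactionSync(() => {
    console.log('write txn id:', db.getWriteTxnId());
    db.put('b', 2);
  });
}
demo();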
Cool, that's very useful. Thanks a lot. I was thinking of wrapping my code into transactions even though it's not really useful for me.
I struggle to use the encoding module to implement this, mostly because I have to reuse existing headers when updating entries. (I'm not allowed to drop unused flags and header extensions, but that context would be lost.) Is there something like putBinary(), complementary to getBinary()? Because passing a Buffer into put() puts a prefix before the data. I can of course use a null encoder, but putBinary() would be a bit clearer in the code.
Yes, I think you want to use asBinary().
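For reference, a quick sketch of that usage, assuming the documented asBinary() export of lmdb-js:

const { open, asBinary } = require('lmdb');
const db = open({ path: 'test_db' });

const raw = Buffer.from([0x01, 0x02, 0x03]);
// asBinary() marks the buffer to be stored verbatim, so put() does not
// prepend any encoding header
db.putSync('raw-key', asBinary(raw));
const back = db.getBinary('raw-key'); // same bytes as raw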
Oh, how embarrassing. I was convinced asBinary() put a two-byte header in front of my data. I did a lot of testing with zero buffers and got spurious data in the front. But today, with coffee and more sleep, I see it's working as intended. 🤦 Sorry to waste your time.
For anyone coming here via Google:

const { asBinary } = require('lmdb');
const msgpackr = require('msgpackr');

// Header offsets
// https://github.com/PowerDNS/lightningstream/blob/main/docs/schema-native.md
const LS_HEADER_SIZE = 24;
const LS_HEADER_POS_TIMESTAMP = 0;
const LS_HEADER_POS_TXN = 8;
const LS_HEADER_POS_SCHEMA = 16;
const LS_HEADER_POS_FLAGS = 17;
const LS_HEADER_POS_EXTENSION_COUNT = 22;
const LS_EXTENSION_HEADER_SIZE = 8;
const LS_FLAGS_DELETED = 1 << 0;
class LSData {
  #header;
  #value;

  constructor(data) {
    if (data) {
      // Parse an existing buffer
      this.unpack(data);
    } else {
      // Init an empty header (all fields zeroed)
      this.#header = Buffer.alloc(LS_HEADER_SIZE);
    }
  }

  unpack(data) {
    let extension_headers = data.readInt8(LS_HEADER_POS_EXTENSION_COUNT);
    let header_len =
      LS_HEADER_SIZE + extension_headers * LS_EXTENSION_HEADER_SIZE;
    let header = data.subarray(0, header_len);
    let buf = data.subarray(header.length);
    if (header.readInt8(LS_HEADER_POS_SCHEMA) !== 0) {
      throw new Error("Unsupported Lightning Stream header schema version (expected 0)");
    }
    this.#header = header;
    // Assign the private field directly: the value setter would clear
    // the deleted flag we just parsed from the header
    if (buf.length > 0) {
      this.#value = msgpackr.unpack(buf);
    } else {
      this.#value = undefined;
    }
    return this;
  }

  asBuffer(txnID) {
    this.#header.writeBigInt64BE(BigInt(txnID), LS_HEADER_POS_TXN);
    // The LS native schema expects nanoseconds since the UNIX epoch;
    // process.hrtime.bigint() is a monotonic clock with an arbitrary
    // origin, so use wall-clock time instead
    this.#header.writeBigInt64BE(BigInt(Date.now()) * 1000000n, LS_HEADER_POS_TIMESTAMP);
    let buf = [this.#header];
    if (!this.deleted) {
      buf.push(msgpackr.pack(this.#value)); // Only write data if it was set
    }
    return Buffer.concat(buf);
  }

  get deleted() {
    let flags = this.#header.readInt8(LS_HEADER_POS_FLAGS);
    return (flags & LS_FLAGS_DELETED) !== 0;
  }

  set deleted(val) {
    let delflag = val ? LS_FLAGS_DELETED : 0;
    let flags = this.#header.readInt8(LS_HEADER_POS_FLAGS);
    // Clear the deleted bit but keep all other flags, then set it if requested
    flags = (flags & ~LS_FLAGS_DELETED) | delflag;
    this.#header.writeInt8(flags, LS_HEADER_POS_FLAGS);
  }

  set value(val) {
    this.deleted = false; // setting a value implicitly un-deletes the entry
    this.#value = val;
  }

  get value() {
    return this.#value;
  }
}

// Store
// transactionSync runs the callback synchronously, so no await is needed
return db.transactionSync(() => {
  let data;
  if (db.doesExist(key)) {
    data = db.getBinary(key);
  }
  let ls_entry = new LSData(data);
  ls_entry.value = { /* your data */ }; // also clears the deleted flag
  return db.put(key, asBinary(ls_entry.asBuffer(db.getWriteTxnId())));
});
// Retrieve
let data = db.getBinary(key);
if (!data) {
return null; // No entry
}
let ls_entry = new LSData(data);
if (ls_entry.deleted) {
return null; // Entry is marked as deleted
}
let value = ls_entry.value;
// Delete
return db.transactionSync(() => {
  let data;
  if (db.doesExist(key)) {
    data = db.getBinary(key);
  }
  let ls_entry = new LSData(data);
  ls_entry.deleted = true;
  return db.put(key, asBinary(ls_entry.asBuffer(db.getWriteTxnId())));
});
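A quick round-trip sanity check of the helper class above (hypothetical usage; no database needed):

// Pack a value, then parse it back
let entry = new LSData();
entry.value = { hello: 'world' };
let packed = entry.asBuffer(42);

let parsed = new LSData(packed);
console.log(parsed.deleted); // false
console.log(parsed.value);   // { hello: 'world' }

// A tombstone is just the header with the deleted flag set and no value bytes
entry.deleted = true;
let tombstone = new LSData(entry.asBuffer(43));
console.log(tombstone.deleted); // true
console.log(tombstone.value);   // undefined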
If you don't mind, I'll pop this one open again. 😄 So I have a db that I can read and write, and I think it contains Lightning Stream compatible data. But lightningstream can't even open my db.
The first dbs are the ones from pdns. It's a big mess. The lmdb file created from lmdb-js on the
The node instance on my dev system is subtly different.
I feel suddenly transported back to the '90s. Does LMDB really have incompatible binary versions? PS: Apparently lightningstream is unhappy if you don't use
I tried with
I guess there are some non-obvious build dependencies...
Closing because it's not really related to the issue, I think.
I want to write a simple application with native Lightning Stream support.
It basically needs additional content headers: https://github.com/PowerDNS/lightningstream/blob/main/docs/schema-native.md
And values that are deleted should not actually be removed, but should have the deleted flag set (and the value set to empty).
The header seems pretty easy to do via custom encoders, but I suppose the devil is in the details, like having access to the transaction ID at that time.
And filtering out deleted values and overriding remove() would be required...
I haven't produced any code yet, but I wonder if it makes sense to support this in this library or in a wrapper library?
I mean, implementing it at the application level would be cool too. The particular application can very well be polluted with back-end specifics.