Skip to content

Commit

Permalink
Add ability to listen for iterator completion
Browse files Browse the repository at this point in the history
  • Loading branch information
kriszyp committed Nov 17, 2022
1 parent 5942536 commit 24762d0
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 21 deletions.
1 change: 1 addition & 0 deletions index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,7 @@ declare namespace lmdb {
filter(callback: (entry: T) => any): RangeIterable<T>
[Symbol.iterator]() : Iterator<T>
forEach(callback: (entry: T) => any): void
onDone?: Function
asArray: T[]
}
class Transaction {
Expand Down
23 changes: 11 additions & 12 deletions read.js
Original file line number Diff line number Diff line change
Expand Up @@ -420,9 +420,11 @@ export function addReadMethods(LMDBStore, {
function finishCursor() {
if (txn.isDone)
return;
if (iterable.onDone)
iterable.onDone()
if (cursorRenewId)
txn.renewingRefCount--;
if (--txn.refCount <= 0 && txn.onlyCursor) {
if (--txn.refCount <= 0 && txn.notCurrent) {
cursor.close();
txn.abort(); // this is no longer main read txn, abort it now that we are done
txn.isDone = true;
Expand Down Expand Up @@ -618,13 +620,7 @@ export function addReadMethods(LMDBStore, {
},
useReadTransaction() {
let txn = readTxnRenewed ? readTxn : renewReadTxn(this);
txn.refCount = (txn.refCount || 0) + 1;
txn.done = function() {
txn.refCount--;
if (txn.refCount === 0 && readTxn !== txn) {
txn.abort();
}
};
txn.use();
return txn;
},
close(callback) {
Expand Down Expand Up @@ -693,8 +689,8 @@ export function addReadMethods(LMDBStore, {
readTxn = new Txn(env, 0x20000, lastReadTxn && !lastReadTxn.isDone && lastReadTxn);
if (readTxn.address == 0) {
readTxn = lastReadTxn;
if (readTxn.onlyCursor)
readTxn.onlyCursor = false;
if (readTxn.notCurrent)
readTxn.notCurrent = false;
}
break;
} catch (error) {
Expand All @@ -718,7 +714,7 @@ export function addReadMethods(LMDBStore, {
if (readTxnRenewed) {
readTxnRenewed = null;
if (readTxn.refCount - (readTxn.renewingRefCount || 0) > 0) {
readTxn.onlyCursor = true;
readTxn.notCurrent = true;
lastReadTxnRef = new WeakRef(readTxn);
readTxn = null;
} else {
Expand All @@ -736,11 +732,14 @@ export function makeReusableBuffer(size) {

Txn.prototype.done = function() {
this.refCount--;
if (this.refCount == 0 && this.onlyCursor) {
if (this.refCount === 0 && this.notCurrent) {
this.abort();
this.isDone = true;
}
}
Txn.prototype.use = function() {
this.refCount = (this.refCount || 0) + 1;
}


let readInstructions, readCallbacks, uint32Instructions, instructionsDataView = { setFloat64() {}, setUint32() {} }, instructionsAddress;
Expand Down
33 changes: 24 additions & 9 deletions util/RangeIterable.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ export class RangeIterable {
}
map(func) {
let source = this;
let result = new RangeIterable();
result.iterate = (async) => {
let iterable = new RangeIterable();
iterable.iterate = (async) => {
let iterator = source[Symbol.iterator](async);
let i = 0;
return {
Expand All @@ -35,6 +35,7 @@ export class RangeIterable {
}
if (iteratorResult.done === true) {
this.done = true;
if (iterable.onDone) iterable.onDone();
return iteratorResult;
}
result = func(iteratorResult.value, i++);
Expand All @@ -46,23 +47,26 @@ export class RangeIterable {
value: result
});
}
} while(result === SKIP)
} while(result === SKIP);
if (result === DONE) {
if (iterable.onDone) iterable.onDone();
return result;
}
return {
value: result
};
},
return() {
if (iterable.onDone) iterable.onDone();
return iterator.return();
},
throw() {
if (iterable.onDone) iterable.onDone();
return iterator.throw();
}
};
};
return result;
return iterable;
}
[Symbol.asyncIterator]() {
return this.iterator = this.iterate();
Expand All @@ -89,17 +93,24 @@ export class RangeIterable {
return {
next() {
let result = iterator.next();
if (isFirst && result.done) {
isFirst = false;
iterator = secondIterable[Symbol.iterator](async);
return iterator.next();
if (result.done) {
if (isFirst) {
isFirst = false;
iterator = secondIterable[Symbol.iterator](async);
result = iterator.next();
if (result.done && concatIterable.onDone) iterable.onDone();
} else {
if (concatIterable.onDone) concatIterable.onDone();
}
}
return result;
},
return() {
if (concatIterable.onDone) concatIterable.onDone();
return iterator.return();
},
throw() {
if (concatIterable.onDone) concatIterable.onDone();
return iterator.throw();
}
};
Expand All @@ -123,8 +134,10 @@ export class RangeIterable {
}
}
let result = iterator.next();
if (result.done)
if (result.done) {
if (mappedIterable.onDone) mappedIterable.onDone();
return result;
}
let value = callback(result.value);
if (Array.isArray(value) || value instanceof RangeIterable)
currentSubIterator = value[Symbol.iterator]();
Expand All @@ -135,11 +148,13 @@ export class RangeIterable {
} while(true);
},
return() {
if (mappedIterable.onDone) mappedIterable.onDone();
if (currentSubIterator)
currentSubIterator.return();
return iterator.return();
},
throw() {
if (mappedIterable.onDone) mappedIterable.onDone();
if (currentSubIterator)
currentSubIterator.throw();
return iterator.throw();
Expand Down

0 comments on commit 24762d0

Please sign in to comment.