Skip to content

Commit

Permalink
Issue #1 improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
Scott Lott authored and Scott Lott committed Aug 14, 2019
1 parent 99ad569 commit 889717a
Show file tree
Hide file tree
Showing 3 changed files with 230 additions and 85 deletions.
83 changes: 81 additions & 2 deletions src/database.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,17 @@ import * as fs from "fs";
import * as path from "path";
import { SnapManifest, writeManifestUpdate, fileName, VERSION, throttle, SnapIndex, NULLBYTE, tableGenerator } from "./common";
import { BloomFilter, MurmurHash3, IbloomFilterObj } from "./bloom";
import { createRBTree, RedBlackTree } from "./rbtree";
import { createRBTree, RedBlackTree, RedBlackTreeIterator } from "./rbtree";

export const rand = () => {
var text = "";
var possible = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789";

for (var i = 0; i < 6; i++)
text += possible.charAt(Math.floor(Math.random() * possible.length));

return text;
}

export class SnapDatabase {

Expand Down Expand Up @@ -468,6 +477,55 @@ export class SnapDatabase {
this._doingTx = false;
}

public iterators: {
[key: string]: {it: RedBlackTreeIterator, r: boolean};
} = {};

public newIterator(mode: "all"|"offset"|"range", args: any[], reverse: boolean): string {
let id = rand();
while(this.iterators[id]) {
id = rand();
}
switch(mode) {
case "all":
this.iterators[id] = {it: reverse ? this._index.end() : this._index.begin(), r: reverse};
break;
case "offset":
this.iterators[id] = {it: reverse ? this._index.end() : this._index.begin(), r: reverse};
let i = args[0] || 0;
while (i-- && this.iterators[id].it.valid()) {
if (reverse) {
this.iterators[id].it.prev();
} else {
this.iterators[id].it.next();
}
}
break;
case "range":
this.iterators[id] = {it: reverse ? this._index.le(args[0]) : this._index.ge(args[1]), r: reverse};
break;
}
return id;
}

public clearIterator(id: string) {
delete this.iterators[id];
}

public nextIterator(id: string): {key: any, done: boolean} {
if (this.iterators[id].it.valid()) {
const key = this.iterators[id].it.key();
if (this.iterators[id].r) {
this.iterators[id].it.prev();
} else {
this.iterators[id].it.next();
}
return {key: key, done: false};
} else {
return {key: undefined, done: true};
}
}

public getAllKeys(onKey: (key: any) => void, complete: (err?: any) => void, reverse: boolean) {
const it = reverse ? this._index.end() : this._index.begin()
try {
Expand Down Expand Up @@ -522,7 +580,7 @@ export class SnapDatabase {

try {
let i = offset || 0;
while (i--) {
while (i-- && it.valid()) {
if (reverse) {
it.prev();
} else {
Expand Down Expand Up @@ -594,6 +652,27 @@ export class SnapDatabase {
case "do-compact":
this.flushLog(true);
break;
case "snap-new-iterator":
try {
if (process.send) process.send({ type: "snap-res-done", id: msgId, data: [undefined, this.newIterator(msg.args[0], msg.args[1], msg.args[2])] })
} catch (e) {
if (process.send) process.send({ type: "snap-res-done", id: msgId, data: ["Failed to make iterator!", ""] })
}
break;
case "snap-next-iterator":
try {
if (process.send) process.send({ type: "snap-res-done", id: msgId, data: [undefined, this.nextIterator(msg.args[0])] })
} catch (e) {
if (process.send) process.send({ type: "snap-res-done", id: msgId, data: ["Failed to get next iterator value!", ""] })
}
break;
case "snap-clear-iterator":
try {
if (process.send) process.send({ type: "snap-res-done", id: msgId, data: [undefined, this.clearIterator(msg.args[0])] })
} catch (e) {
if (process.send) process.send({ type: "snap-res-done", id: msgId, data: ["Failed to get next iterator value!", ""] })
}
break;
case "snap-get":
try {
if (process.send) process.send({ type: "snap-res-done", id: msgId, event: "get", data: [undefined, this.get(key)] })
Expand Down
212 changes: 129 additions & 83 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,13 @@ import { fork, ChildProcess } from "child_process";
import { VERSION, fileName as fNameFN } from "./common";
import { ReallySmallEvents } from "./rse";
import * as fs from "fs";
import { SnapDatabase } from "./database";
import { SnapDatabase, rand } from "./database";

const messageBuffer: {
[messageId: string]: (values: string[]) => void;
} = {};

export const rand = () => {
var text = "";
var possible = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789";

for (var i = 0; i < 6; i++)
text += possible.charAt(Math.floor(Math.random() * possible.length));

return text;
}

export interface SnapEvent {
target: SnapDB<any>,
Expand Down Expand Up @@ -420,6 +412,45 @@ export class SnapDB<K> {
});
}

private _asyncNewIterator(mode: "all" | "offset" | "range", args: any[], reverse: boolean): Promise<string> {
return new Promise((res, rej) => {
const msgId = this._msgID((data) => {
if (data[0]) {
rej(data[0]);
} else {
res(data[1]);
}
})
this._worker.send({ type: "snap-new-iterator", args: [mode, args, reverse], id: msgId });
});
}

private _asyncNextIterator(id: string): Promise<{ key: K, done: boolean }> {
return new Promise((res, rej) => {
const msgId = this._msgID((data) => {
if (data[0]) {
rej(data[0]);
} else {
res(data[1]);
}
})
this._worker.send({ type: "snap-next-iterator", args: [id], id: msgId });
});
}

private _asyncClearIteator(id: string): Promise<void> {
return new Promise((res, rej) => {
const msgId = this._msgID((data) => {
if (data[0]) {
rej(data[0]);
} else {
res();
}
})
this._worker.send({ type: "snap-clear-iterator", args: [id], id: msgId });
});
}

/**
* Async Iterable version of get all keys.
*
Expand All @@ -428,22 +459,46 @@ export class SnapDB<K> {
* @returns {Promise<AsyncIterableIterator<K>>}
* @memberof SnapDB
*/
public getAllKeysAsync(reverse?: boolean):Promise<AsyncIterableIterator<K>> {
return new Promise((res, rej) => {
let keys: any[] = [];
this.getAllKeys((key) => {
keys.push(key);
}, () => {
async function* loopKeys() {
let i = 0;
while(i < keys.length) {
yield keys[i];
i++;
public getAllKeysAsync(reverse?: boolean): Promise<AsyncIterableIterator<K>> {
return this._doWhenReady((res, rej) => {
const that = this;
res(async function* () {
if (that._worker) {
const id = await that._asyncNewIterator("all", [], reverse || false);
try {
let nextKey = await that._asyncNextIterator(id);
while (!nextKey.done) {
yield nextKey.key;
if (that._hasEvents) that._rse.trigger("get-keys", { target: that, tx: id, time: Date.now(), data: nextKey.key });
nextKey = await that._asyncNextIterator(id);
}
await that._asyncClearIteator(id);

if (that._hasEvents) that._rse.trigger("get-keys-end", { target: that, tx: id, time: Date.now(), error: undefined });
} catch (e) {
if (that._hasEvents) that._rse.trigger("get-keys-end", { target: that, tx: id, time: Date.now(), error: e });
throw e;
}

} else {
const id = that._database.newIterator("all", [], reverse || false);
try {
let nextKey = that._database.nextIterator(id);
while (!nextKey.done) {
yield nextKey.key;
if (that._hasEvents) that._rse.trigger("get-keys", { target: that, tx: id, time: Date.now(), data: nextKey.key });
nextKey = that._database.nextIterator(id)
}
that._database.clearIterator(id);

if (that._hasEvents) that._rse.trigger("get-keys-end", { target: that, tx: id, time: Date.now(), error: undefined });
} catch (e) {
if (that._hasEvents) that._rse.trigger("get-keys-end", { target: that, tx: id, time: Date.now(), error: e });
throw e;
}
}
res(loopKeys());
}, reverse);
})
});
});
}

/**
Expand Down Expand Up @@ -580,33 +635,60 @@ export class SnapDB<K> {
});
}

private _asyncKeysAndValues(mode: "all"|"offset"|"range", args: any[], reverse: boolean, progressEvent: string, doneEvent: string): Promise<AsyncIterableIterator<[K, string]>> {
return this._doWhenReady((res, rej) => {
const that = this;
res(async function* () {
if (that._worker) {
const id = await that._asyncNewIterator(mode, args, reverse || false);
try {
let nextKey = await that._asyncNextIterator(id);
let nextValue = nextKey.done ? undefined : await that.get(nextKey.key);
while (!nextKey.done) {
if (that._hasEvents) that._rse.trigger(progressEvent, { target: that, tx: id, time: Date.now(), data: { k: nextKey, v: nextValue } });
yield [nextKey.key, nextValue];
nextKey = await that._asyncNextIterator(id);
nextValue = await that.get(nextKey.key);
}
await that._asyncClearIteator(id);
if (that._hasEvents) that._rse.trigger(doneEvent, { target: that, tx: id, time: Date.now(), error: undefined });
} catch (e) {
if (that._hasEvents) that._rse.trigger(doneEvent, { target: that, tx: id, time: Date.now(), error: eval });
throw e;
}

} else {
const id = that._database.newIterator(mode, args, reverse || false);
try {
let nextKey = that._database.nextIterator(id);
let nextValue = nextKey.done ? undefined : await that.get(nextKey.key);
while (!nextKey.done) {
if (that._hasEvents) that._rse.trigger(progressEvent, { target: that, tx: id, time: Date.now(), data: { k: nextKey, v: nextValue } });
yield [nextKey.key, nextValue];
nextKey = that._database.nextIterator(id);
nextValue = await that.get(nextKey.key);
}
that._database.clearIterator(id);

if (that._hasEvents) that._rse.trigger(doneEvent, { target: that, tx: id, time: Date.now(), error: undefined });
} catch (e) {
if (that._hasEvents) that._rse.trigger(doneEvent, { target: that, tx: id, time: Date.now(), error: e });
throw e;
}
}
});
});
}

/**
* Async Iterable version of getAll.
*
* @param {boolean} [reverse]
* @returns {Promise<AsyncIterableIterator<[K, string]>>}
* @memberof SnapDB
*/
public getAllAsync(reverse?: boolean):Promise<AsyncIterableIterator<[K, string]>> {
return new Promise((res, rej) => {
let values: any[] = [];
this.getAll((key, value) => {
values.push([key, value]);
}, (err) => {
if (err) {
rej(err);
return;
}
async function* loopValues() {
let i = 0;
while(i < values.length) {
yield values[i];
i++;
}
}
res(loopValues());
}, reverse);
})
public getAllAsync(reverse?: boolean): Promise<AsyncIterableIterator<[K, string]>> {
return this._asyncKeysAndValues("all", [], reverse || false, "get-all", "get-all-end");
}

/**
Expand Down Expand Up @@ -654,26 +736,8 @@ export class SnapDB<K> {
* @returns {Promise<AsyncIterableIterator<[K, string]>>}
* @memberof SnapDB
*/
public rangeAsync(lower: K, higher: K, reverse?: boolean):Promise<AsyncIterableIterator<[K, string]>> {
return new Promise((res, rej) => {
let values: any[] = [];
this.range(lower, higher, (key, value) => {
values.push([key, value]);
}, (err) => {
if (err) {
rej(err);
return;
}
async function* loopValues() {
let i = 0;
while(i < values.length) {
yield values[i];
i++;
}
}
res(loopValues());
}, reverse);
})
public rangeAsync(lower: K, higher: K, reverse?: boolean): Promise<AsyncIterableIterator<[K, string]>> {
return this._asyncKeysAndValues("range", [lower, higher], reverse || false, "get-range", "get-range-end");
}

/**
Expand Down Expand Up @@ -723,26 +787,8 @@ export class SnapDB<K> {
* @returns {Promise<AsyncIterableIterator<[K, string]>>}
* @memberof SnapDB
*/
public offsetAsync(offset: number, limit: number, reverse?: boolean):Promise<AsyncIterableIterator<[K, string]>> {
return new Promise((res, rej) => {
let values: any[] = [];
this.offset(offset, limit, (key, value) => {
values.push([key, value]);
}, (err) => {
if (err) {
rej(err);
return;
}
async function* loopValues() {
let i = 0;
while(i < values.length) {
yield values[i];
i++;
}
}
res(loopValues());
}, reverse);
})
public offsetAsync(offset: number, limit: number, reverse?: boolean): Promise<AsyncIterableIterator<[K, string]>> {
return this._asyncKeysAndValues("offset", [offset, limit], reverse || false, "get-offset", "get-offset-end");
}

/**
Expand Down
Loading

0 comments on commit 889717a

Please sign in to comment.