Skip to content
Permalink
Browse files

ref(locks): Improve distributed locks with fencing

  • Loading branch information...
ex3ndr committed Jun 20, 2019
1 parent 129f7fd commit bbba6be863ee965c060a66ceda3558a0eabee310
@@ -0,0 +1,58 @@
import { Context } from '@openland/context';
import { LockLayer } from './LockLayer';
import { Database, transactional } from '@openland/foundationdb';
import { uniqueSeed } from '@openland/foundationdb-utils';

export class DistributedLock {

readonly db: Database;
readonly layer: LockLayer;
readonly key: string;
readonly version: number;

private readonly seed = uniqueSeed();

private lockIndex = 0;
private isLocked = false;

constructor(key: string, db: Database, version: number = 0) {
this.db = db;
this.key = key;
this.version = version;
this.layer = this.db.get(LockLayer);
}

@transactional
async tryLock(ctx: Context, timeout: number = 30000) {
let res = await this.layer.tryLock(ctx, this.key, this.seed, { timeout, version: this.version });
if (res !== false) {
this.lockIndex = res;
this.isLocked = true;
return true;
} else {
this.lockIndex = -1;
this.isLocked = false;
return false;
}
}

@transactional
async refresh(ctx: Context, timeout: number = 30000) {
if (!this.isLocked) {
return false;
}
let res = await this.layer.tryLock(ctx, this.key, this.seed, { timeout, version: this.version, refreshIndex: this.lockIndex });
if (res !== false) {
return true;
} else {
return false;
}
}

async releaseLock(ctx: Context) {
if (!this.isLocked) {
return;
}
return await this.layer.releaseLock(ctx, this.key, this.seed);
}
}
@@ -1,6 +1,8 @@
import { LockLayer, DistributedLock } from './LockLayer';
import { LockLayer } from './LockLayer';
import { DistributedLock } from './DistributedLock';
import { Database } from '@openland/foundationdb';
import { createNamedContext } from '@openland/context';
import { delay } from '@openland/foundationdb-utils';

describe('LockLayer', () => {
it('should lock', async () => {
@@ -15,4 +17,17 @@ describe('LockLayer', () => {
res = await lock1.tryLock(ctx);
expect(res).toBe(true);
});
it('should expire lock', async () => {
let ctx = createNamedContext('test');
let db = await Database.openTest({ layers: [new LockLayer()] });
let lock1 = new DistributedLock('lockkey1', db);
let lock2 = new DistributedLock('lockkey1', db);
let res = await lock1.tryLock(ctx, 10);
expect(res).toBe(true);
await delay(50);
res = await lock2.tryLock(ctx, 100);
expect(res).toBe(true);
res = await lock1.tryLock(ctx);
expect(res).toBe(false);
});
});
@@ -5,6 +5,8 @@ import { uniqueSeed } from '@openland/foundationdb-utils';
interface LockRecord {
version: number;
minVersion: number;

index: number;
seed: string;
timeout: number;
}
@@ -14,7 +16,7 @@ export class LockLayer extends BaseLayer {
private locksSubspace!: Subspace<Tuple[], LockRecord>;

@transactional
async tryLock(ctx: Context, key: string, seed: string, opts?: { version?: number, timeout?: number }) {
async tryLock(ctx: Context, key: string, seed: string, opts?: { version?: number, timeout?: number, refreshIndex?: number }): Promise<false | number> {

let version = 0;
if (opts && opts.version) {
@@ -27,31 +29,40 @@ export class LockLayer extends BaseLayer {

let existing = await this.locksSubspace.get(ctx, [key]);
let now = Date.now();
let currentTimeout = now + 30 * 1000;
let currentTimeout = now + timeout;
if (existing !== null) {
// If current version is less than current required minimum
if (existing.minVersion > version) {
return false;
}

// Fence token
if (opts && opts.refreshIndex && existing.index !== opts.refreshIndex) {
return false;
}

// Locking
if (existing.seed === seed || existing.timeout < now) {
existing.seed = seed;
existing.timeout = currentTimeout;
existing.version = version;
existing.minVersion = version;
if (existing.seed !== seed) {
existing.index++;
}
this.locksSubspace.set(ctx, [key], existing);
return true;
return existing.index;
} else {
// Bump minumum version if needed
// Bump minumum version if needed (why?)
if (version > existing.minVersion!!) {
existing.minVersion = version;
this.locksSubspace.set(ctx, [key], existing);
}
return false;
}
} else {
this.locksSubspace.set(ctx, [key], { version: version, minVersion: version, seed: seed, timeout: currentTimeout });
return true;
this.locksSubspace.set(ctx, [key], { version: version, minVersion: version, seed: seed, timeout: currentTimeout, index: 1 });
return 1;
}
}

@@ -77,28 +88,4 @@ export class LockLayer extends BaseLayer {
.withKeyEncoding(encoders.tuple)
.withValueEncoding(encoders.json);
}
}

export class DistributedLock {
readonly db: Database;
readonly layer: LockLayer;
readonly key: string;
readonly version: number;

private readonly seed = uniqueSeed();

constructor(key: string, db: Database, version: number = 0) {
this.db = db;
this.key = key;
this.version = version;
this.layer = this.db.get(LockLayer);
}

async tryLock(ctx: Context, timeout: number = 30000) {
return await this.layer.tryLock(ctx, this.key, this.seed, { timeout, version: this.version });
}

async releaseLock(ctx: Context) {
return await this.layer.releaseLock(ctx, this.key, this.seed);
}
}
@@ -1,2 +1,2 @@
export { DistributedLock } from './LockLayer';
export { DistributedLock } from './DistributedLock';
export { LockLayer } from './LockLayer';
@@ -10,9 +10,8 @@ export function singletonWorker(config: { db: Database, name: string, version?:
let wasStarted = false;
let layer = config.db.get(SingletonWorkerLayer);
let ctx = createNamedContext('singleton-' + config.name);
let lock = new DistributedLock('worker_' + config.name, config.db, config.version);
let timeout = 30000;
let refreshInterval = 15000;
let refreshInterval = 5000;
let awaiter: (() => void) | undefined;

let workLoop = foreverBreakable(async () => {
@@ -23,34 +22,40 @@ export function singletonWorker(config: { db: Database, name: string, version?:
let res = await (async () => {

// Trying to acquire lock
let lock = new DistributedLock('worker_' + config.name, config.db, config.version);
if (!(await lock.tryLock(ctx, timeout))) {
return false;
}

let locked = true;

// Update lock loop
// Refresh lock loop
// tslint:disable-next-line:no-floating-promises
(async () => {
while (locked) {
if (!(await lock.tryLock(ctx, timeout))) {
if (!(await lock.refresh(ctx, timeout))) {
locked = false;
break;
}
await delay(refreshInterval);
}
})();

// Working
// Work loop
while (locked && working) {
try {
await worker(ctx);
} catch (e) {
locked = false;
await lock.releaseLock(ctx);
throw e;
}
await delay(100);
}

// Release Lock
await lock.releaseLock(ctx);

return true;
})();
if (!working) {
@@ -75,7 +80,6 @@ export function singletonWorker(config: { db: Database, name: string, version?:
awaiter = undefined;
}
await workLoop.stop();
await lock.releaseLock(sctx);
};

layer.registerWorker(shutdown);
@@ -29,8 +29,8 @@
},
"exclude": [
"node_modules/**",
"packages/*/lib/**",
"packages/*/node_modules/**",
"**/lib/**",
"**/node_modules/**",
"**/*.spec.ts"
]
}
@@ -22,7 +22,6 @@
"label-position": true,
"max-line-length": [ false, 120 ],
"member-ordering": [
true,
"public-before-private",
"static-before-instance",
"variables-before-functions"

0 comments on commit bbba6be

Please sign in to comment.
You can’t perform that action at this time.