Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add distributed locking capability #5

Merged
merged 3 commits into from
Apr 15, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
"typings": "lib/src/index.d.ts",
"scripts": {
"test": "npm-run-all --parallel test:lint test:unit",
"test:unit": "mocha --compilers ts:ts-node/register --timeout 20000 -r test/_setup.ts test/*.test.ts",
"test:unit": "TS_NODE_COMPILER_OPTIONS='{\"target\":\"es6\"}' mocha --compilers ts:ts-node/register --timeout 20000 -r test/_setup.ts test/*.test.ts",
"test:lint": "tslint --type-check --project tsconfig.json '{src,test}/**/*.ts'",
"update-proto": "node ./bin/update-proto ./proto && node bin/generate-methods.js ./proto/rpc.proto > src/rpc.ts",
"build:doc": "rm -rf docs && typedoc --exclude \"**/test/*\" --excludePrivate --out ./docs ./src/index.ts && node bin/tame-typedoc",
Expand Down
9 changes: 7 additions & 2 deletions src/builder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -485,15 +485,20 @@ export class ComparatorBuilder {
* Adds a new clause to the transaction.
*/
public and(key: string | Buffer, column: keyof typeof compareTarget,
cmp: keyof typeof comparator, value: string | Buffer): this {
cmp: keyof typeof comparator, value: string | Buffer | number): this {
assertWithin(compareTarget, column, 'comparison target in client.and(...)');
assertWithin(comparator, cmp, 'comparator in client.and(...)');

if (column === 'value') {
value = toBuffer(<string | Buffer> value);
}

this.request.compare = this.request.compare || [];
this.request.compare.push({
key: toBuffer(key),
result: comparator[cmp],
target: compareTarget[column].value,
[compareTarget[column].key]: toBuffer(value),
[compareTarget[column].key]: typeof value === 'number' ? value : toBuffer(value),
});
return this;
}
Expand Down
48 changes: 11 additions & 37 deletions src/errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,63 +3,33 @@
* A GRPCGenericError is rejected via the connection when some error occurs
* that we can't be more specific about.
*/
export class GRPCGenericError extends Error {
constructor(message: string) {
super(message);
Object.setPrototypeOf(this, GRPCGenericError.prototype);
}
}
export class GRPCGenericError extends Error {}

/**
* GRPCConnectFailed is thrown when connecting to GRPC fails.
*/
export class GRPCConnectFailedError extends GRPCGenericError {
constructor(message: string) {
super(message);
Object.setPrototypeOf(this, GRPCConnectFailedError.prototype);
}
}
export class GRPCConnectFailedError extends GRPCGenericError {}

/**
* GRPCProtocolError is thrown when a protocol error occurs on the other end,
* indicating that the external implementation is incorrect or incompatible.
*/
export class GRPCProtocolError extends GRPCGenericError {
constructor(message: string) {
super(message);
Object.setPrototypeOf(this, GRPCProtocolError.prototype);
}
}
export class GRPCProtocolError extends GRPCGenericError {}

/**
* GRPCInternalError is thrown when a internal error occurs on either end.
*/
export class GRPCInternalError extends GRPCGenericError {
constructor(message: string) {
super(message);
Object.setPrototypeOf(this, GRPCInternalError.prototype);
}
}
export class GRPCInternalError extends GRPCGenericError {}

/**
* GRPCCancelledError is emitted when an ongoing call is cancelled.
*/
export class GRPCCancelledError extends GRPCGenericError {
constructor(message: string) {
super(message);
Object.setPrototypeOf(this, GRPCCancelledError.prototype);
}
}
export class GRPCCancelledError extends GRPCGenericError {}

/**
* EtcdError is an application error returned by etcd.
*/
export class EtcdError extends Error {
constructor(message: string) {
super(message);
Object.setPrototypeOf(this, EtcdError.prototype);
}
}
export class EtcdError extends Error {}

/**
* EtcdLeaseTimeoutError is thrown when trying to renew a lease that's
Expand All @@ -68,10 +38,14 @@ export class EtcdError extends Error {
export class EtcdLeaseInvalidError extends Error {
constructor(leaseID: string) {
super(`Lease ${leaseID} is expired or revoked`);
Object.setPrototypeOf(this, EtcdLeaseInvalidError.prototype);
}
}

/**
* EtcdLockFailedError is thrown when we fail to aquire a lock.
*/
export class EtcdLockFailedError extends Error {}

interface IErrorCtor {
new (message: string): Error;
}
Expand Down
11 changes: 10 additions & 1 deletion src/index.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import * as Builder from './builder';
import { ConnectionPool } from './connection-pool';
import { Lease } from './lease';
import { Lock } from './lock';
import { IOptions } from './options';
import * as RPC from './rpc';

Expand Down Expand Up @@ -76,13 +77,21 @@ export class Etcd3 {
return new Lease(this.pool, ttl);
}

/**
* `lock()` is a helper to provide distributed locking capability. See
* the documentation on the Lock class for more information and examples.
*/
public lock(key: string | Buffer): Lock {
return new Lock(this.pool, key);
}

/**
* `if()` starts a new etcd transaction, which allows you to execute complex
* statements atomically. See documentation on the ComparatorBuilder for
* more information.
*/
public if(key: string | Buffer, column: keyof typeof Builder.compareTarget,
cmp: keyof typeof Builder.comparator, value: string | Buffer): Builder.ComparatorBuilder {
cmp: keyof typeof Builder.comparator, value: string | Buffer | number): Builder.ComparatorBuilder {
return new Builder.ComparatorBuilder(this.kv).and(key, column, cmp, value);
}

Expand Down
95 changes: 95 additions & 0 deletions src/lock.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
import { EtcdLockFailedError } from './';
import { ComparatorBuilder, PutBuilder } from './builder';
import { ConnectionPool } from './connection-pool';
import { Lease } from './lease';
import * as RPC from './rpc';

/**
* A Lock can be used for distributed locking to create atomic operations
* across multiple systems. An EtcdLockFailedError is thrown if the lock
* can't be acquired.
*
* Under the hood, the Lock uses a lease on a key which is revoked when the
* the lock is released. If the server the lock is running on dies, or the
* network is disconnected, etcd will time out the lock.
*
* Bear in mind that this means that in certain rare situations (a network
* disconnect or wholesale etcd failure), the caller may lose the lock while
* operations may still be running.
*
* A quick example:
*
* ```
* const { Etcd3 } = require('etcd3');
* const client = new Etcd3();
*
* client.lock('my_resource').do(() => {
* // The lock will automatically be released when this promise returns
* return doMyAtomicAction();
* });
* ```
*/
export class Lock {

private leaseTTL = 30;
private lease: Lease | null;

constructor(private pool: ConnectionPool, private key: string | Buffer) {}

/**
* Sets the TTL of the lease underlying the lock. The lease TTL defaults
* to 30 seconds.
*/
public ttl(seconds: number): this {
if (!this.lease) {
throw new Error('Cannot set a lock TTL after acquiring the lock');
}

this.leaseTTL = seconds;
return this;
}

/**
* Acquire attempts to acquire the lock, rejecting if it's unable to.
*/
public acquire(): Promise<void> {
const lease = this.lease = new Lease(this.pool, this.leaseTTL);
const kv = new RPC.KVClient(this.pool);

return lease.grant().then(leaseID => {
return new ComparatorBuilder(kv)
.and(this.key, 'createdAt', '==', 0)
.then(new PutBuilder(kv, this.key).value('').lease(leaseID))
.commit()
.then(res => {
if (!res.succeeded) {
this.release();
throw new EtcdLockFailedError(`Failed to acquire a lock on ${this.key}`);
}
});
});
}

/**
* Release frees the lock.
*/
public release(): Promise<void> {
if (!this.lease) {
throw new Error('Attempted to release a lock which was not acquired');
}

return this.lease.revoke();
}

/**
* `do()` wraps the inner function. It acquires the lock before running
* the function, and releases the lock after any promise the function
* returns resolves or throws.
*/
public do<T>(fn: () => T | Promise<T>): Promise<T> {
return this.acquire()
.then(fn)
.then(value => this.release().then(() => value))
.catch(err => this.release().then(() => { throw err; }));
}
}
31 changes: 31 additions & 0 deletions test/kv.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import * as sinon from 'sinon';
import {
Etcd3,
EtcdLeaseInvalidError,
EtcdLockFailedError,
GRPCConnectFailedError,
Lease,
} from '../src';
Expand Down Expand Up @@ -258,5 +259,35 @@ describe('connection pool', () => {
expect(await client.get('foo1').string()).to.equal('bar3');
});
});

describe('lock()', () => {
const assertCantLock = () => {
return client.lock('resource')
.acquire()
.then(() => { throw new Error('expected to throw'); })
.catch(err => expect(err).to.be.an.instanceof(EtcdLockFailedError));
};

const assertAbleToLock = async () => {
const lock = client.lock('resource');
await lock.acquire();
await lock.release();
};

it('locks exclusively around a resource', async () => {
const lock1 = client.lock('resource');
await lock1.acquire();

await assertCantLock();
await lock1.release();

await assertAbleToLock();
});

it('provides locking around functions', async () => {
await client.lock('resource').do(assertCantLock);
await assertAbleToLock();
});
});
});
});