Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Implement "skipLocked()" and "noWait()" #2961

Merged
merged 20 commits into from Jul 6, 2019
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
150fb08
Implement "skip locked" and "nowait" for Postgres dialect
ricmzn Dec 14, 2018
b9dd2c6
Fix error with duplicate skipLocked or noWait call
ricmzn Dec 19, 2018
21775f9
Fix noWait test swallowing errors on non-postgres databases
ricmzn Dec 19, 2018
0f13016
Implement skipLocked and noWait for mysql-based clients
ricmzn Dec 19, 2018
d73663a
Disable skipLocked test on MySQL
ricmzn Jan 8, 2019
dbac68e
Restrict noWait test to supported databases
ricmzn Jan 8, 2019
71fd14b
Update knex.d.ts with skipLocked and noWait
ricmzn Jan 8, 2019
de3b4cc
Adjust noWait test for MySQL 8.0
ricmzn Jan 8, 2019
94de977
Disable noWait test on MySQL
ricmzn Jan 8, 2019
7db84fa
Increase query builder test coverage for MySQL
ricmzn Jan 8, 2019
e49c4b7
Remove MySQL support for skipLocked() and noWait()
ricmzn Mar 10, 2019
f32488f
Add errors for databases that don't support skipLocked() and noWait()
ricmzn Mar 12, 2019
6b44e37
Merge remote-tracking branch 'upstream/master' into feature/skip-locked
ricmzn Jun 20, 2019
dcaff9e
Update test comments
ricmzn Jun 21, 2019
18cccab
Restore MySQL tests for #2961
ricmzn Jun 21, 2019
e9b152d
Simplify query builder checks on first(), noWait() and skipLocked()
ricmzn Jun 21, 2019
2a92c9d
Add some failing tests for PR #2961
ricmzn Jun 21, 2019
6eb290a
Replace broken test on MySQL with an extra test for #2961
ricmzn Jun 21, 2019
b22cd7b
Adjust the #2961 tests to use async/await
ricmzn Jun 29, 2019
c0b1cd4
Move some magic query builder values into constants
ricmzn Jul 3, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
10 changes: 10 additions & 0 deletions src/dialects/mysql/query/compiler.js
Expand Up @@ -48,6 +48,16 @@ assign(QueryCompiler_MySQL.prototype, {
return 'lock in share mode';
},

// Only supported on MySQL 8.0+
skipLocked() {
return 'skip locked';
},

// Supported on MySQL 8.0+ and MariaDB 10.3.0+
noWait() {
return 'nowait';
},

// Compiles a `columnInfo` query.
columnInfo() {
const column = this.single.columnInfo;
Expand Down
8 changes: 8 additions & 0 deletions src/dialects/postgres/query/compiler.js
Expand Up @@ -103,6 +103,14 @@ assign(QueryCompiler_PG.prototype, {
);
},

skipLocked() {
return 'skip locked';
},

noWait() {
return 'nowait';
},

// Compiles a columnInfo query
columnInfo() {
const column = this.single.columnInfo;
Expand Down
50 changes: 46 additions & 4 deletions src/query/builder.js
Expand Up @@ -950,10 +950,8 @@ assign(Builder.prototype, {
// Sets the values for a `select` query, informing that only the first
// row should be returned (limit 1).
first() {
const { _method } = this;

if (!includes(['pluck', 'first', 'select'], _method)) {
throw new Error(`Cannot chain .first() on "${_method}" query!`);
if (!this._isSelectQuery()) {
throw new Error(`Cannot chain .first() on "${this._method}" query!`);
}

const args = new Array(arguments.length);
Expand Down Expand Up @@ -1093,6 +1091,40 @@ assign(Builder.prototype, {
return this;
},

// Skips locked rows when using a lock constraint.
skipLocked() {
if (!this._isSelectQuery()) {
throw new Error(`Cannot chain .skipLocked() on "${this._method}" query!`);
}
if (!this._hasLockMode()) {
throw new Error(
'.skipLocked() can only be used after a call to .forShare() or .forUpdate()!'
);
}
if (this._single.waitMode === 'noWait') {
throw new Error('.skipLocked() cannot be used together with .noWait()!');
}
this._single.waitMode = 'skipLocked';
return this;
},

// Causes error when acessing a locked row instead of waiting for it to be released.
noWait() {
if (!this._isSelectQuery()) {
throw new Error(`Cannot chain .noWait() on "${this._method}" query!`);
}
if (!this._hasLockMode()) {
throw new Error(
'.noWait() can only be used after a call to .forShare() or .forUpdate()!'
);
}
if (this._single.waitMode === 'skipLocked') {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

probably these could be constants?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree, but I can't find a good place for query builder constants. Is it worth it to create one now, or would this be more fitting for a future refactor?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't get much refactoring PRs, as people have no good reason for submitting these :D. Would be awesome if you created something for this.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, I'll try

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Made it a separate module in c0b1cd4, how do you like it?

throw new Error('.noWait() cannot be used together with .skipLocked()!');
}
this._single.waitMode = 'noWait';
return this;
},

// Takes a JS object of methods to call and calls them
fromJS(obj) {
each(obj, (val, key) => {
Expand Down Expand Up @@ -1179,6 +1211,16 @@ assign(Builder.prototype, {
_clearGrouping(grouping) {
this._statements = reject(this._statements, { grouping });
},

// Helper function that checks if the builder will emit a select query
_isSelectQuery() {
return includes(['pluck', 'first', 'select'], this._method);
},

// Helper function that checks if the query has a lock mode set
_hasLockMode() {
return includes(['forShare', 'forUpdate'], this._single.lock);
},
});

Object.defineProperty(Builder.prototype, 'or', {
Expand Down
22 changes: 22 additions & 0 deletions src/query/compiler.js
Expand Up @@ -48,6 +48,7 @@ const components = [
'limit',
'offset',
'lock',
'waitMode',
];

assign(QueryCompiler.prototype, {
Expand Down Expand Up @@ -542,6 +543,27 @@ assign(QueryCompiler.prototype, {
}
},

// Compiles the wait mode on the locks.
waitMode() {
if (this.single.waitMode) {
return this[this.single.waitMode]();
}
},

// Fail on unsupported databases
skipLocked() {
throw new Error(
'.skipLocked() is currently only supported on MySQL 8.0+ and PostgreSQL 9.5+'
);
},

// Fail on unsupported databases
noWait() {
throw new Error(
'.noWait() is currently only supported on MySQL 8.0+, MariaDB 10.3.0+ and PostgreSQL 9.5+'
);
},

// On Clause
// ------

Expand Down
136 changes: 136 additions & 0 deletions test/integration/builder/selects.js
Expand Up @@ -1247,5 +1247,141 @@ module.exports = function(knex) {
});
});
});

it('forUpdate().skipLocked() with order by should return the first non-locked row', function() {
// Note: this test doesn't work properly on MySQL - see https://bugs.mysql.com/bug.php?id=67745
if (knex.client.driverName !== 'pg') {
return;
}

const rowName = 'row for skipLocked() test #1';
return knex('test_default_table')
.insert([
{ string: rowName, tinyint: 1 },
{ string: rowName, tinyint: 2 },
])
.then(() => {
return knex
.transaction((trx) => {
// lock only the first row from this test
return trx('test_default_table')
.where({ string: rowName })
.orderBy('tinyint', 'asc')
.first()
.forUpdate()
.then((res) => {
// try to lock the next available row
return knex('test_default_table')
.where({ string: rowName })
.orderBy('tinyint', 'asc')
.forUpdate()
.skipLocked()
.first();
});
})
.then((res) => {
expect(res.tinyint).to.equal(2);
});
});
});

it('forUpdate().skipLocked() should return an empty set when all rows are locked', function() {
if (
knex.client.driverName !== 'pg' &&
knex.client.driverName !== 'mysql'
) {
return;
}

const rowName = 'row for skipLocked() test #2';
return knex('test_default_table')
.insert([
{ string: rowName, tinyint: 1 },
{ string: rowName, tinyint: 2 },
])
.then(() => {
return knex
.transaction((trx) => {
// lock all of the test rows
return trx('test_default_table')
.where({ string: rowName })
.forUpdate()
.then((res) => {
// try to aquire the lock on at least one more row (which isn't available)
return knex('test_default_table')
.where({ string: rowName })
.forUpdate()
.skipLocked()
.limit(1);
});
})
.then((res) => {
expect(res).to.be.empty;
});
});
});

it('forUpdate().noWait() should reject immediately when a row is locked', function() {
if (
knex.client.driverName !== 'pg' &&
ricmzn marked this conversation as resolved.
Show resolved Hide resolved
knex.client.driverName !== 'mysql'
) {
return;
}

const rowName = 'row for noWait() test';
return knex('test_default_table')
.insert([
{ string: rowName, tinyint: 1 },
{ string: rowName, tinyint: 2 },
])
.then(() => {
ricmzn marked this conversation as resolved.
Show resolved Hide resolved
return knex
.transaction((trx) => {
// select and lock only the first row from this test
return trx('test_default_table')
.where({ string: rowName })
.orderBy('tinyint', 'asc')
.first()
.forUpdate()
.then((res) => {
// try to lock the second row
return knex('test_default_table')
.where({ string: rowName })
.orderBy('tinyint', 'asc')
.forUpdate()
.noWait()
.first();
});
})
.then((res) => {
expect(
'The query should have been cancelled when trying to select a locked row'
).to.be.false;
})
.catch((err) => {
switch (knex.client.driverName) {
case 'pg':
expect(err.message).to.contain(
'could not obtain lock on row'
);
break;
case 'mysql':
ricmzn marked this conversation as resolved.
Show resolved Hide resolved
case 'mysql2':
// mysql
expect(err.message).to.contain(
'lock(s) could not be acquired immediately'
);
// mariadb
// TODO: detect if test is being run on mysql or mariadb to check for the correct error message
// expect(err.message).to.contain('Lock wait timeout exceeded');
break;
default:
// unsupported database
throw err;
}
});
});
});
});
};
80 changes: 80 additions & 0 deletions test/unit/query/builder.js
Expand Up @@ -6998,6 +6998,86 @@ describe('QueryBuilder', function() {
);
});

it('lock for update with skip locked #1937', function() {
testsql(
qb()
.select('*')
.from('foo')
.first()
.forUpdate()
.skipLocked(),
{
mysql: {
sql: 'select * from `foo` limit ? for update skip locked',
bindings: [1],
},
pg: {
sql: 'select * from "foo" limit ? for update skip locked',
bindings: [1],
},
}
);
});

it('lock for update with nowait #1937', function() {
testsql(
qb()
.select('*')
.from('foo')
.first()
.forUpdate()
.noWait(),
{
mysql: {
sql: 'select * from `foo` limit ? for update nowait',
bindings: [1],
},
pg: {
sql: 'select * from "foo" limit ? for update nowait',
bindings: [1],
},
}
);
});

it('noWait and skipLocked require a lock mode to be set', function() {
expect(function() {
qb()
.select('*')
.noWait()
.toString();
}).to.throw(
'.noWait() can only be used after a call to .forShare() or .forUpdate()!'
);
expect(function() {
qb()
.select('*')
.skipLocked()
.toString();
}).to.throw(
'.skipLocked() can only be used after a call to .forShare() or .forUpdate()!'
);
});

it('skipLocked conflicts with noWait and vice-versa', function() {
expect(function() {
qb()
.select('*')
.forUpdate()
.noWait()
.skipLocked()
.toString();
}).to.throw('.skipLocked() cannot be used together with .noWait()!');
expect(function() {
qb()
.select('*')
.forUpdate()
.skipLocked()
.noWait()
.toString();
}).to.throw('.noWait() cannot be used together with .skipLocked()!');
});

it('allows insert values of sub-select, #121', function() {
testsql(
qb()
Expand Down
3 changes: 3 additions & 0 deletions types/index.d.ts
Expand Up @@ -1356,6 +1356,9 @@ declare namespace Knex {
forShare(...tableNames: string[]): QueryBuilder<TRecord, TResult>;
forShare(tableNames: string[]): QueryBuilder<TRecord, TResult>;

skipLocked(): QueryBuilder<TRecord, TResult>;
noWait(): QueryBuilder<TRecord, TResult>;

toSQL(): Sql;

on(event: string, callback: Function): QueryBuilder<TRecord, TResult>;
Expand Down