Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(postgres): native upsert #12301

Merged
merged 4 commits into from May 24, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 0 additions & 1 deletion lib/dialects/abstract/index.js
Expand Up @@ -7,7 +7,6 @@ AbstractDialect.prototype.supports = {
'DEFAULT VALUES': false,
'VALUES ()': false,
'LIMIT ON UPDATE': false,
'ON DUPLICATE KEY': true,
sushantdhiman marked this conversation as resolved.
Show resolved Hide resolved
'ORDER NULLS': false,
'UNION': true,
'UNION ALL': true,
Expand Down
42 changes: 20 additions & 22 deletions lib/dialects/abstract/query-generator.js
Expand Up @@ -3,7 +3,6 @@
const util = require('util');
const _ = require('lodash');
const uuidv4 = require('uuid').v4;
const semver = require('semver');

const Utils = require('../../utils');
const deprecations = require('../../utils/deprecations');
Expand Down Expand Up @@ -179,6 +178,20 @@ class QueryGenerator {
}
}

let onDuplicateKeyUpdate = '';

if (this._dialect.supports.inserts.updateOnDuplicate && options.updateOnDuplicate) {
if (this._dialect.supports.inserts.updateOnDuplicate == ' ON CONFLICT DO UPDATE SET') { // postgres / sqlite
// If no conflict target columns were specified, use the primary key names from options.upsertKeys
const conflictKeys = options.upsertKeys.map(attr => this.quoteIdentifier(attr));
const updateKeys = options.updateOnDuplicate.map(attr => `${this.quoteIdentifier(attr)}=EXCLUDED.${this.quoteIdentifier(attr)}`);
SimonSchick marked this conversation as resolved.
Show resolved Hide resolved
onDuplicateKeyUpdate = ` ON CONFLICT (${conflictKeys.join(',')}) DO UPDATE SET ${updateKeys.join(',')}`;
} else {
const valueKeys = options.updateOnDuplicate.map(attr => `${this.quoteIdentifier(attr)}=VALUES(${this.quoteIdentifier(attr)})`);
onDuplicateKeyUpdate += `${this._dialect.supports.inserts.updateOnDuplicate} ${valueKeys.join(',')}`;
}
}

const replacements = {
ignoreDuplicates: options.ignoreDuplicates ? this._dialect.supports.inserts.ignoreDuplicates : '',
onConflictDoNothing: options.ignoreDuplicates ? this._dialect.supports.inserts.onConflictDoNothing : '',
Expand All @@ -188,8 +201,8 @@ class QueryGenerator {
tmpTable
};

valueQuery = `${tmpTable}INSERT${replacements.ignoreDuplicates} INTO ${quotedTable} (${replacements.attributes})${replacements.output} VALUES (${replacements.values})${replacements.onConflictDoNothing}${valueQuery}`;
emptyQuery = `${tmpTable}INSERT${replacements.ignoreDuplicates} INTO ${quotedTable}${replacements.output}${replacements.onConflictDoNothing}${emptyQuery}`;
valueQuery = `${tmpTable}INSERT${replacements.ignoreDuplicates} INTO ${quotedTable} (${replacements.attributes})${replacements.output} VALUES (${replacements.values})${onDuplicateKeyUpdate}${replacements.onConflictDoNothing}${valueQuery}`;
emptyQuery = `${tmpTable}INSERT${replacements.ignoreDuplicates} INTO ${quotedTable}${replacements.output}${onDuplicateKeyUpdate}${replacements.onConflictDoNothing}${emptyQuery}`;

// Mostly for internal use, so we expect the user to know what he's doing!
// pg_temp functions are private per connection, so we never risk this function interfering with another one.
Expand All @@ -200,31 +213,16 @@ class QueryGenerator {
returningModelAttributes.push('*');
}

if (semver.gte(this.sequelize.options.databaseVersion, '9.2.0')) {
// >= 9.2 - Use a UUID but prefix with 'func_' (numbers first not allowed)
const delimiter = `$func_${uuidv4().replace(/-/g, '')}$`;
const selectQuery = `SELECT (testfunc.response).${returningModelAttributes.join(', (testfunc.response).')}, testfunc.sequelize_caught_exception FROM pg_temp.testfunc();`;
const delimiter = `$func_${uuidv4().replace(/-/g, '')}$`;
const selectQuery = `SELECT (testfunc.response).${returningModelAttributes.join(', (testfunc.response).')}, testfunc.sequelize_caught_exception FROM pg_temp.testfunc();`;

options.exception = 'WHEN unique_violation THEN GET STACKED DIAGNOSTICS sequelize_caught_exception = PG_EXCEPTION_DETAIL;';
valueQuery = `CREATE OR REPLACE FUNCTION pg_temp.testfunc(OUT response ${quotedTable}, OUT sequelize_caught_exception text) RETURNS RECORD AS ${delimiter
} BEGIN ${valueQuery} RETURNING * INTO response; EXCEPTION ${options.exception} END ${delimiter} LANGUAGE plpgsql; ${selectQuery} ${dropFunction}`;
} else {
const selectQuery = `SELECT ${returningModelAttributes.join(', ')} FROM pg_temp.testfunc();`;

options.exception = 'WHEN unique_violation THEN NULL;';
valueQuery = `CREATE OR REPLACE FUNCTION pg_temp.testfunc() RETURNS SETOF ${quotedTable} AS $body$ BEGIN RETURN QUERY ${valueQuery
} RETURNING *; EXCEPTION ${options.exception} END; $body$ LANGUAGE plpgsql; ${selectQuery} ${dropFunction}`;
}
options.exception = 'WHEN unique_violation THEN GET STACKED DIAGNOSTICS sequelize_caught_exception = PG_EXCEPTION_DETAIL;';
valueQuery = `CREATE OR REPLACE FUNCTION pg_temp.testfunc(OUT response ${quotedTable}, OUT sequelize_caught_exception text) RETURNS RECORD AS ${delimiter} BEGIN ${valueQuery} RETURNING * INTO response; EXCEPTION ${options.exception} END ${delimiter} LANGUAGE plpgsql; ${selectQuery} ${dropFunction}`;
papb marked this conversation as resolved.
Show resolved Hide resolved
} else {
valueQuery += returningFragment;
emptyQuery += returningFragment;
}

if (this._dialect.supports['ON DUPLICATE KEY'] && options.onDuplicate) {
valueQuery += ` ON DUPLICATE KEY ${options.onDuplicate}`;
emptyQuery += ` ON DUPLICATE KEY ${options.onDuplicate}`;
}

query = `${replacements.attributes.length ? valueQuery : emptyQuery};`;
if (identityWrapperRequired && this._dialect.supports.autoIncrement.identityInsert) {
query = `SET IDENTITY_INSERT ${quotedTable} ON; ${query} SET IDENTITY_INSERT ${quotedTable} OFF;`;
Expand Down
84 changes: 31 additions & 53 deletions lib/dialects/abstract/query-interface.js
Expand Up @@ -6,7 +6,6 @@ const Utils = require('../../utils');
const DataTypes = require('../../data-types');
const Transaction = require('../../transaction');
const QueryTypes = require('../../query-types');
const Op = require('../../operators');

/**
* The interface that Sequelize uses to talk to all databases
Expand Down Expand Up @@ -744,72 +743,51 @@ class QueryInterface {
* @param {string} tableName table to upsert on
* @param {object} insertValues values to be inserted, mapped to field name
* @param {object} updateValues values to be updated, mapped to field name
* @param {object} where various conditions
* @param {Model} model Model to upsert on
* @param {object} where where conditions, which can be used for UPDATE part when INSERT fails
* @param {object} options query options
*
* @returns {Promise<boolean,?number>} Resolves an array with <created, primaryKey>
*/
async upsert(tableName, insertValues, updateValues, where, model, options) {
const wheres = [];
const attributes = Object.keys(insertValues);
let indexes = [];
let indexFields;

async upsert(tableName, insertValues, updateValues, where, options) {
options = { ...options };

if (!Utils.isWhereEmpty(where)) {
wheres.push(where);
}
const model = options.model;
const primaryKeys = Object.values(model.primaryKeys).map(item => item.field);
const uniqueKeys = Object.values(model.uniqueKeys).filter(c => c.fields.length >= 1).map(c => c.fields);
const indexKeys = Object.values(model._indexes).filter(c => c.unique && c.fields.length >= 1).map(c => c.fields);

// Lets combine unique keys and indexes into one
indexes = _.map(model.uniqueKeys, value => {
return value.fields;
});

model._indexes.forEach(value => {
if (value.unique) {
// fields in the index may both the strings or objects with an attribute property - lets sanitize that
indexFields = value.fields.map(field => {
if (_.isPlainObject(field)) {
return field.attribute;
}
return field;
});
indexes.push(indexFields);
options.type = QueryTypes.UPSERT;
options.updateOnDuplicate = Object.keys(updateValues);
options.upsertKeys = [];

// For fields in updateValues, try to find a constraint or unique index
// that includes given field. Only first matching upsert key is used.
for (const field of options.updateOnDuplicate) {
const uniqueKey = uniqueKeys.find(fields => fields.includes(field));
if (uniqueKey) {
options.upsertKeys = uniqueKey;
break;
}
});

for (const index of indexes) {
if (_.intersection(attributes, index).length === index.length) {
where = {};
for (const field of index) {
where[field] = insertValues[field];
}
wheres.push(where);
const indexKey = indexKeys.find(fields => fields.includes(field));
if (indexKey) {
options.upsertKeys = indexKey;
break;
}
}

where = { [Op.or]: wheres };

options.type = QueryTypes.UPSERT;
options.raw = true;
// Always use PK, if no constraint available OR update data contains PK
if (
options.upsertKeys.length === 0
|| _.intersection(options.updateOnDuplicate, primaryKeys).length
) {
options.upsertKeys = primaryKeys;
}

const sql = this.queryGenerator.upsertQuery(tableName, insertValues, updateValues, where, model, options);
const result = await this.sequelize.query(sql, options);
return this._convertUpsertResult(result, model);
}
options.upsertKeys = _.uniq(options.upsertKeys);

/**
* Converts raw upsert result to API contract.
*
* @param {object} result
* @param {Model} model
* @protected
*/
// eslint-disable-next-line no-unused-vars
_convertUpsertResult(result, model) {
return [result, undefined];
const sql = this.queryGenerator.insertQuery(tableName, insertValues, model.rawAttributes, options);
return await this.sequelize.query(sql, options);
}

/**
Expand Down
7 changes: 5 additions & 2 deletions lib/dialects/mariadb/query.js
Expand Up @@ -92,10 +92,12 @@ class Query extends AbstractQuery {
formatResults(data) {
let result = this.instance;

if (this.isBulkUpdateQuery() || this.isBulkDeleteQuery()
|| this.isUpsertQuery()) {
if (this.isBulkUpdateQuery() || this.isBulkDeleteQuery()) {
return data.affectedRows;
}
if (this.isUpsertQuery()) {
return [null, data.affectedRows === 1];
}
if (this.isInsertQuery(data)) {
this.handleInsertQuery(data);

Expand All @@ -116,6 +118,7 @@ class Query extends AbstractQuery {
}
return [result, data.affectedRows];
}

return [data[this.getInsertIdField()], data.affectedRows];
}
}
Expand Down
42 changes: 37 additions & 5 deletions lib/dialects/mssql/query-interface.js
@@ -1,5 +1,10 @@
'use strict';

const _ = require('lodash');

const Utils = require('../../utils');
const QueryTypes = require('../../query-types');
const Op = require('../../operators');
const { QueryInterface } = require('../abstract/query-interface');

/**
Expand Down Expand Up @@ -42,11 +47,38 @@ class MSSqlQueryInterface extends QueryInterface {
/**
* @override
*/
_convertUpsertResult(result, model) {
return [
result.$action === 'INSERT',
result[model.primaryKeyField]
];
async upsert(tableName, insertValues, updateValues, where, options) {
const model = options.model;
const wheres = [];

options = { ...options };

if (!Utils.isWhereEmpty(where)) {
wheres.push(where);
}

// Lets combine unique keys and indexes into one
let indexes = Object.values(model.uniqueKeys).map(item => item.fields);
indexes = indexes.concat(Object.values(model._indexes).filter(item => item.unique).map(item => item.fields));

const attributes = Object.keys(insertValues);
for (const index of indexes) {
if (_.intersection(attributes, index).length === index.length) {
where = {};
for (const field of index) {
where[field] = insertValues[field];
}
wheres.push(where);
}
}

where = { [Op.or]: wheres };

options.type = QueryTypes.UPSERT;
options.raw = true;

const sql = this.queryGenerator.upsertQuery(tableName, insertValues, updateValues, where, model, options);
return await this.sequelize.query(sql, options);
}
}

Expand Down
6 changes: 3 additions & 3 deletions lib/dialects/mssql/query.js
Expand Up @@ -206,9 +206,6 @@ class Query extends AbstractQuery {
if (this.isShowIndexesQuery()) {
return this.handleShowIndexesQuery(data);
}
if (this.isUpsertQuery()) {
return data[0];
}
if (this.isCallQuery()) {
return data[0];
}
Expand All @@ -224,6 +221,9 @@ class Query extends AbstractQuery {
if (this.isForeignKeysQuery()) {
return data;
}
if (this.isUpsertQuery()) {
return [result, data[0].$action === 'INSERT'];
}
if (this.isInsertQuery() || this.isUpdateQuery()) {
return [result, rowCount];
}
Expand Down
11 changes: 0 additions & 11 deletions lib/dialects/mysql/query-generator.js
Expand Up @@ -293,17 +293,6 @@ class MySQLQueryGenerator extends AbstractQueryGenerator {
return value;
}

upsertQuery(tableName, insertValues, updateValues, where, model, options) {
options.onDuplicate = 'UPDATE ';

options.onDuplicate += Object.keys(updateValues).map(key => {
key = this.quoteIdentifier(key);
return `${key}=VALUES(${key})`;
}).join(', ');

return this.insertQuery(tableName, insertValues, model.rawAttributes, options);
}

truncateTableQuery(tableName) {
return `TRUNCATE ${this.quoteTable(tableName)}`;
}
Expand Down
22 changes: 15 additions & 7 deletions lib/dialects/mysql/query-interface.js
Expand Up @@ -2,6 +2,7 @@

const sequelizeErrors = require('../../errors');
const { QueryInterface } = require('../abstract/query-interface');
const QueryTypes = require('../../query-types');

/**
* The interface that Sequelize uses to talk with MySQL/MariaDB database
Expand Down Expand Up @@ -37,6 +38,20 @@ class MySQLQueryInterface extends QueryInterface {
);
}

/**
* @override
*/
async upsert(tableName, insertValues, updateValues, where, options) {
options = { ...options };

options.type = QueryTypes.UPSERT;
options.updateOnDuplicate = Object.keys(updateValues);

const model = options.model;
const sql = this.queryGenerator.insertQuery(tableName, insertValues, model.rawAttributes, options);
return await this.sequelize.query(sql, options);
}

/**
* @override
*/
Expand Down Expand Up @@ -69,13 +84,6 @@ class MySQLQueryInterface extends QueryInterface {

return await this.sequelize.query(query, options);
}

/**
* @override
*/
_convertUpsertResult(result) {
return [result === 1, undefined];
}
}

exports.MySQLQueryInterface = MySQLQueryInterface;
5 changes: 4 additions & 1 deletion lib/dialects/mysql/query.js
Expand Up @@ -138,7 +138,7 @@ class Query extends AbstractQuery {
if (this.isCallQuery()) {
return data[0];
}
if (this.isBulkUpdateQuery() || this.isBulkDeleteQuery() || this.isUpsertQuery()) {
if (this.isBulkUpdateQuery() || this.isBulkDeleteQuery()) {
return data.affectedRows;
}
if (this.isVersionQuery()) {
Expand All @@ -147,6 +147,9 @@ class Query extends AbstractQuery {
if (this.isForeignKeysQuery()) {
return data;
}
if (this.isUpsertQuery()) {
return [result, data.affectedRows === 1];
}
if (this.isInsertQuery() || this.isUpdateQuery()) {
return [result, data.affectedRows];
}
Expand Down