Skip to content

Commit

Permalink
feat(postgres, sqlite): add conflictWhere option to Model.bulkCreate (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
wbourne0 committed Mar 24, 2023
1 parent 338ae6a commit 295c297
Show file tree
Hide file tree
Showing 4 changed files with 356 additions and 2 deletions.
25 changes: 24 additions & 1 deletion src/dialects/abstract/query-generator.js
Expand Up @@ -364,8 +364,31 @@ class QueryGenerator {
// If no conflict target columns were specified, use the primary key names from options.upsertKeys
const conflictKeys = options.upsertKeys.map(attr => this.quoteIdentifier(attr));
const updateKeys = options.updateOnDuplicate.map(attr => `${this.quoteIdentifier(attr)}=EXCLUDED.${this.quoteIdentifier(attr)}`);
onDuplicateKeyUpdate = ` ON CONFLICT (${conflictKeys.join(',')}) DO UPDATE SET ${updateKeys.join(',')}`;

let whereClause = false;
if (options.conflictWhere) {
if (!this._dialect.supports.inserts.onConflictWhere) {
throw new Error(`conflictWhere not supported for dialect ${this._dialect.name}`);
}

whereClause = this.whereQuery(options.conflictWhere, options);
}

// The Utils.joinSQLFragments later on will join this as it handles nested arrays.
onDuplicateKeyUpdate = [
'ON CONFLICT',
'(',
conflictKeys.join(','),
')',
whereClause,
'DO UPDATE SET',
updateKeys.join(',')
];
} else { // mysql / maria
if (options.conflictWhere) {
throw new Error(`conflictWhere not supported for dialect ${this._dialect.name}`);
}

const valueKeys = options.updateOnDuplicate.map(attr => `${this.quoteIdentifier(attr)}=VALUES(${this.quoteIdentifier(attr)})`);
onDuplicateKeyUpdate = `${this._dialect.supports.inserts.updateOnDuplicate} ${valueKeys.join(',')}`;
}
Expand Down
7 changes: 6 additions & 1 deletion src/model.d.ts
Expand Up @@ -1078,7 +1078,12 @@ export interface BulkCreateOptions<TAttributes = any> extends Logging, Transacti
* Return all columns or only the specified columns for the affected rows (only for postgres)
*/
returning?: boolean | (keyof TAttributes)[];

/**
* An optional parameter to specify a where clause for partial unique indexes
* (note: `ON CONFLICT WHERE` not `ON CONFLICT DO UPDATE WHERE`).
* Only supported in Postgres >= 9.5 and sqlite >= 9.5
*/
conflictWhere?: WhereOptions<TAttributes>;
/**
* Optional override for the conflict fields in the ON CONFLICT part of the query.
* Only supported in Postgres >= 9.5 and SQLite >= 3.24.0
Expand Down
260 changes: 260 additions & 0 deletions test/integration/model/bulk-create.test.js
Expand Up @@ -829,6 +829,266 @@ describe(Support.getTestDialectTeaser('Model'), () => {
expect(result.permissions).to.eql(exp.permissions);
}
});

describe('conflictWhere', () => {
const Memberships = current.define(
'memberships',
{
// ID of the member (no foreign key constraint for testing purposes)
user_id: DataTypes.INTEGER,
// ID of what the member is a member of
foreign_id: DataTypes.INTEGER,
time_deleted: DataTypes.DATE
},
{
createdAt: false,
updatedAt: false,
deletedAt: 'time_deleted',
indexes: [
{
fields: ['user_id', 'foreign_id'],
unique: true,
where: { time_deleted: null }
}
]
}
);

const options = {
conflictWhere: { time_deleted: null },
conflictAttributes: ['user_id', 'foreign_id'],
updateOnDuplicate: ['user_id', 'foreign_id', 'time_deleted']
};

beforeEach(() => Memberships.sync({ force: true }));

it('should insert items with conflictWhere', async () => {
const memberships = new Array(10).fill().map((_, i) => ({
user_id: i + 1,
foreign_id: i + 20,
time_deleted: null
}));

const results = await Memberships.bulkCreate(
memberships,
options
);

for (let i = 0; i < 10; i++) {
expect(results[i].user_id).to.eq(memberships[i].user_id);
expect(results[i].team_id).to.eq(memberships[i].team_id);
expect(results[i].time_deleted).to.eq(null);
}
});

it('should not conflict with soft deleted memberships', async () => {
const memberships = new Array(10).fill().map((_, i) => ({
user_id: i + 1,
foreign_id: i + 20,
time_deleted: new Date()
}));

let results = await Memberships.bulkCreate(memberships, options);

for (let i = 0; i < 10; i++) {
expect(results[i].user_id).to.eq(memberships[i].user_id);
expect(results[i].team_id).to.eq(memberships[i].team_id);
expect(results[i].time_deleted).to.not.eq(null);
}

results = await Memberships.bulkCreate(
memberships.map(membership => ({
...membership,
time_deleted: null
})),
options
);

for (let i = 0; i < 10; i++) {
expect(results[i].user_id).to.eq(memberships[i].user_id);
expect(results[i].team_id).to.eq(memberships[i].team_id);
expect(results[i].time_deleted).to.eq(null);
}

const count = await Memberships.count();

expect(count).to.eq(20);
});

it('should upsert existing memberships', async () => {
const memberships = new Array(10).fill().map((_, i) => ({
user_id: i + 1,
foreign_id: i + 20,
time_deleted: i % 2 ? new Date() : null
}));

let results = await Memberships.bulkCreate(memberships, options);

for (let i = 0; i < 10; i++) {
expect(results[i].user_id).to.eq(memberships[i].user_id);
expect(results[i].team_id).to.eq(memberships[i].team_id);
if (i % 2) {
expect(results[i].time_deleted).to.not.eq(null);
} else {
expect(results[i].time_deleted).to.eq(null);
}
}

for (const membership of memberships) {
membership.time_deleted;
}

results = await Memberships.bulkCreate(
memberships.map(membership => ({
...membership,
time_deleted: null
})),
options
);

for (let i = 0; i < 10; i++) {
expect(results[i].user_id).to.eq(memberships[i].user_id);
expect(results[i].team_id).to.eq(memberships[i].team_id);
expect(results[i].time_deleted).to.eq(null);
}

const count = await Memberships.count({ paranoid: false });

expect(count).to.eq(15);
});
});

if (
current.dialect.supports.inserts.onConflictWhere
) {
describe('conflictWhere', () => {
const Memberships = current.define(
'memberships',
{
// ID of the member (no foreign key constraint for testing purposes)
user_id: DataTypes.INTEGER,
// ID of what the member is a member of
foreign_id: DataTypes.INTEGER,
time_deleted: DataTypes.DATE
},
{
createdAt: false,
updatedAt: false,
deletedAt: 'time_deleted',
indexes: [
{
fields: ['user_id', 'foreign_id'],
unique: true,
where: { time_deleted: null }
}
]
}
);

const options = {
conflictWhere: { time_deleted: null },
conflictAttributes: ['user_id', 'foreign_id'],
updateOnDuplicate: ['user_id', 'foreign_id', 'time_deleted']
};

beforeEach(() => Memberships.sync({ force: true }));

it('should insert items with conflictWhere', async () => {
const memberships = new Array(10).fill().map((_, i) => ({
user_id: i + 1,
foreign_id: i + 20,
time_deleted: null
}));

const results = await Memberships.bulkCreate(
memberships,
options
);

for (let i = 0; i < 10; i++) {
expect(results[i].user_id).to.eq(memberships[i].user_id);
expect(results[i].team_id).to.eq(memberships[i].team_id);
expect(results[i].time_deleted).to.eq(null);
}
});

it('should not conflict with soft deleted memberships', async () => {
const memberships = new Array(10).fill().map((_, i) => ({
user_id: i + 1,
foreign_id: i + 20,
time_deleted: new Date()
}));

let results = await Memberships.bulkCreate(memberships, options);

for (let i = 0; i < 10; i++) {
expect(results[i].user_id).to.eq(memberships[i].user_id);
expect(results[i].team_id).to.eq(memberships[i].team_id);
expect(results[i].time_deleted).to.not.eq(null);
}

results = await Memberships.bulkCreate(
memberships.map(membership => ({
...membership,
time_deleted: null
})),
options
);

for (let i = 0; i < 10; i++) {
expect(results[i].user_id).to.eq(memberships[i].user_id);
expect(results[i].team_id).to.eq(memberships[i].team_id);
expect(results[i].time_deleted).to.eq(null);
}

const count = await Memberships.count();

expect(count).to.eq(20);
});

it('should upsert existing memberships', async () => {
const memberships = new Array(10).fill().map((_, i) => ({
user_id: i + 1,
foreign_id: i + 20,
time_deleted: i % 2 ? new Date() : null
}));

let results = await Memberships.bulkCreate(memberships, options);

for (let i = 0; i < 10; i++) {
expect(results[i].user_id).to.eq(memberships[i].user_id);
expect(results[i].team_id).to.eq(memberships[i].team_id);
if (i % 2) {
expect(results[i].time_deleted).to.not.eq(null);
} else {
expect(results[i].time_deleted).to.eq(null);
}
}

for (const membership of memberships) {
membership.time_deleted;
}

results = await Memberships.bulkCreate(
memberships.map(membership => ({
...membership,
time_deleted: null
})),
options
);

for (let i = 0; i < 10; i++) {
expect(results[i].user_id).to.eq(memberships[i].user_id);
expect(results[i].team_id).to.eq(memberships[i].team_id);
expect(results[i].time_deleted).to.eq(null);
}

const count = await Memberships.count({ paranoid: false });

expect(count).to.eq(15);
});
});
}
}
});
}
Expand Down
66 changes: 66 additions & 0 deletions test/unit/sql/insert.test.js
Expand Up @@ -291,5 +291,71 @@ describe(Support.getTestDialectTeaser('SQL'), () => {
}
});
});

if (
current.dialect.supports.inserts.updateOnDuplicate
) {
it('correctly generates SQL for conflictWhere', () => {
const User = Support.sequelize.define(
'user',
{
username: {
type: DataTypes.STRING,
field: 'user_name',
primaryKey: true
},
password: {
type: DataTypes.STRING,
field: 'pass_word'
},
createdAt: {
type: DataTypes.DATE,
field: 'created_at'
},
updatedAt: {
type: DataTypes.DATE,
field: 'updated_at'
},
deletedAt: {
type: DataTypes.DATE,
field: 'deleted_at'
}
},
{
timestamps: true
}
);

// mapping primary keys to their "field" override values
const primaryKeys = User.primaryKeyAttributes.map(attr => User.getAttributes()[attr].field || attr);

let result;

try {
result = sql.bulkInsertQuery(
User.tableName,
[{ user_name: 'testuser', pass_word: '12345' }],
{
updateOnDuplicate: ['user_name', 'pass_word', 'updated_at'],
upsertKeys: primaryKeys,
conflictWhere: { deleted_at: null }
},
User.fieldRawAttributesMap
);
} catch (error) {
result = error;
}

expectsql(result, {
default: new Error(
`conflictWhere not supported for dialect ${current.dialect.name}`
),
'postgres':
'INSERT INTO "users" ("user_name","pass_word") VALUES (\'testuser\',\'12345\') ON CONFLICT ("user_name") WHERE "deleted_at" IS NULL DO UPDATE SET "user_name"=EXCLUDED."user_name","pass_word"=EXCLUDED."pass_word","updated_at"=EXCLUDED."updated_at";',
'sqlite':
'INSERT INTO `users` (`user_name`,`pass_word`) VALUES (\'testuser\',\'12345\') ON CONFLICT (`user_name`) WHERE `deleted_at` IS NULL DO UPDATE SET `user_name`=EXCLUDED.`user_name`,`pass_word`=EXCLUDED.`pass_word`,`updated_at`=EXCLUDED.`updated_at`;'
});
});
}
});
});

0 comments on commit 295c297

Please sign in to comment.