Skip to content

Commit

Permalink
fix(mariadb): rework pagination mechanism to fix extra results
Browse files Browse the repository at this point in the history
MariaDB imposes restrictions on how subqueries work, breaking the default pagination mechanism, which returned all the rows instead of just the matching ones based on the subquery.

This commit uses JSON array instead to get around this limitation.
  • Loading branch information
B4nan committed Apr 6, 2024
1 parent d228976 commit a57cb19
Show file tree
Hide file tree
Showing 6 changed files with 650 additions and 18 deletions.
4 changes: 2 additions & 2 deletions packages/knex/src/query/QueryBuilder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1142,7 +1142,7 @@ export class QueryBuilder<T extends object = AnyEntity> {
return prop;
}

private prepareFields<T, U extends string | Knex.Raw>(fields: Field<T>[], type: 'where' | 'groupBy' | 'sub-query' = 'where'): U[] {
protected prepareFields<T, U extends string | Knex.Raw>(fields: Field<T>[], type: 'where' | 'groupBy' | 'sub-query' = 'where'): U[] {
const ret: Field<T>[] = [];
const getFieldName = (name: string) => {
if (type === 'groupBy') {
Expand Down Expand Up @@ -1475,7 +1475,7 @@ export class QueryBuilder<T extends object = AnyEntity> {
});
}

private wrapPaginateSubQuery(meta: EntityMetadata): void {
protected wrapPaginateSubQuery(meta: EntityMetadata): void {
const pks = this.prepareFields(meta.primaryKeys, 'sub-query') as string[];
const subQuery = this.clone(['_orderBy', '_fields']).select(pks).groupBy(pks).limit(this._limit!);
// revert the on conditions added via populateWhere, we want to apply those only once
Expand Down
27 changes: 25 additions & 2 deletions packages/mariadb/src/MariaDbDriver.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,18 @@
import type { Configuration, EntityDictionary, NativeInsertUpdateManyOptions, QueryResult, Transaction } from '@mikro-orm/core';
import { AbstractSqlDriver } from '@mikro-orm/knex';
import {
type AnyEntity,
type Configuration,
type ConnectionType,
type EntityDictionary,
type LoggingOptions,
type NativeInsertUpdateManyOptions,
type QueryResult,
type Transaction,
QueryFlag,
} from '@mikro-orm/core';
import { AbstractSqlDriver, type Knex, type SqlEntityManager } from '@mikro-orm/knex';
import { MariaDbConnection } from './MariaDbConnection';
import { MariaDbPlatform } from './MariaDbPlatform';
import { MariaDbQueryBuilder } from './MariaDbQueryBuilder';

export class MariaDbDriver extends AbstractSqlDriver<MariaDbConnection, MariaDbPlatform> {

Expand Down Expand Up @@ -40,4 +51,16 @@ export class MariaDbDriver extends AbstractSqlDriver<MariaDbConnection, MariaDbP
return res;
}

override createQueryBuilder<T extends AnyEntity<T>>(entityName: string, ctx?: Transaction<Knex.Transaction>, preferredConnectionType?: ConnectionType, convertCustomTypes?: boolean, loggerContext?: LoggingOptions, alias?: string, em?: SqlEntityManager): MariaDbQueryBuilder<T> {
// do not compute the connectionType if EM is provided as it will be computed from it in the QB later on
const connectionType = em ? preferredConnectionType : this.resolveConnectionType({ ctx, connectionType: preferredConnectionType });
const qb = new MariaDbQueryBuilder<T>(entityName, this.metadata, this, ctx, alias, connectionType, em, loggerContext);

if (!convertCustomTypes) {
qb.unsetFlag(QueryFlag.CONVERT_CUSTOM_TYPES);
}

return qb;
}

}
128 changes: 128 additions & 0 deletions packages/mariadb/src/MariaDbQueryBuilder.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
import {
type AnyEntity,
type Dictionary,
type EntityKey,
type EntityMetadata,
type PopulateOptions,
raw,
RawQueryFragment,
} from '@mikro-orm/core';
import { QueryBuilder } from '@mikro-orm/knex';

/**
* @inheritDoc
*/
export class MariaDbQueryBuilder<T extends object = AnyEntity> extends QueryBuilder<T> {

protected override wrapPaginateSubQuery(meta: EntityMetadata): void {
const pks = this.prepareFields(meta.primaryKeys, 'sub-query') as string[];
const quotedPKs = pks.map(pk => this.platform.quoteIdentifier(pk));
const subQuery = this.clone(['_orderBy', '_fields']).select(pks).groupBy(pks).limit(this._limit!);
// revert the on conditions added via populateWhere, we want to apply those only once
// @ts-ignore
Object.values(subQuery._joins).forEach(join => join.cond = join.cond_ ?? {});

if (this._offset) {
subQuery.offset(this._offset);
}

const addToSelect = [];

if (this._orderBy.length > 0) {
const orderBy = [];

for (const orderMap of this._orderBy) {
for (const [field, direction] of Object.entries(orderMap)) {
if (RawQueryFragment.isKnownFragment(field)) {
const rawField = RawQueryFragment.getKnownFragment(field, false)!;
this.rawFragments.add(field);
orderBy.push({ [rawField.clone() as any]: direction });
continue;
}

const [a, f] = this.helper.splitField(field as EntityKey<T>);
const prop = this.helper.getProperty(f, a);
const type = this.platform.castColumn(prop);
const fieldName = this.helper.mapper(field, this.type, undefined, null);

if (!prop?.persist && !prop?.formula && !pks.includes(fieldName)) {
addToSelect.push(fieldName);
}

const key = raw(`min(${this.knex.ref(fieldName)}${type})`);
orderBy.push({ [key]: direction });
}
}

subQuery.orderBy(orderBy);
}

// @ts-ignore
subQuery.finalized = true;
const knexQuery = subQuery.as(this.mainAlias.aliasName).clearSelect().select(pks);

if (addToSelect.length > 0) {
addToSelect.forEach(prop => {
const field = this._fields!.find(field => {
if (typeof field === 'object' && field && '__as' in field) {
return field.__as === prop;
}

if (field instanceof RawQueryFragment) {
// not perfect, but should work most of the time, ideally we should check only the alias (`... as alias`)
return field.sql.includes(prop);
}

return false;
});

if (field instanceof RawQueryFragment) {
knexQuery.select(this.platform.formatQuery(field.sql, field.params));
} else if (field) {
knexQuery.select(field as string);
}
});
}

// multiple sub-queries are needed to get around mysql limitations with order by + limit + where in + group by (o.O)
// https://stackoverflow.com/questions/17892762/mysql-this-version-of-mysql-doesnt-yet-support-limit-in-all-any-some-subqu
const subSubQuery = this.getKnex().select(this.knex.raw(`json_arrayagg(${quotedPKs.join(', ')})`)).from(knexQuery);
(subSubQuery as Dictionary).__raw = true; // tag it as there is now way to check via `instanceof`
this._limit = undefined;
this._offset = undefined;

// remove joins that are not used for population or ordering to improve performance
const populate = new Set<string>();
const orderByAliases = this._orderBy
.flatMap(hint => Object.keys(hint))
.map(k => k.split('.')[0]);

function addPath(hints: PopulateOptions<any>[], prefix = '') {
for (const hint of hints) {
const field = hint.field.split(':')[0];
populate.add((prefix ? prefix + '.' : '') + field);

if (hint.children) {
addPath(hint.children, (prefix ? prefix + '.' : '') + field);
}
}
}

addPath(this._populate);

for (const [key, join] of Object.entries(this._joins)) {
const path = join.path?.replace(/\[populate]|\[pivot]|:ref/g, '').replace(new RegExp(`^${meta.className}.`), '');

if (!populate.has(path ?? '') && !orderByAliases.includes(join.alias)) {
delete this._joins[key];
}
}

const subquerySql = subSubQuery.toString();
const key = meta.getPrimaryProps()[0].runtimeType === 'string' ? `concat('"', ${quotedPKs.join(', ')}, '"')` : quotedPKs.join(', ');
const sql = `json_contains((${subquerySql}), ${key})`;
this._cond = {};
this.select(this._fields!).where(sql);
}

}
78 changes: 74 additions & 4 deletions tests/EntityManager.mariadb.test.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
import { v4 } from 'uuid';
import { Collection, Configuration, EntityManager, MikroORM, QueryOrder, Reference, wrap } from '@mikro-orm/core';
import { Collection, Configuration, EntityManager, MikroORM, QueryFlag, QueryOrder, Reference, wrap } from '@mikro-orm/core';
import { MariaDbDriver } from '@mikro-orm/mariadb';
import { Author2, Book2, BookTag2, Publisher2, PublisherType } from './entities-sql';
import { initORMMySql } from './bootstrap';
import { MySqlDriver } from '@mikro-orm/mysql';
import { initORMMySql, mockLogger } from './bootstrap';

describe('EntityManagerMariaDb', () => {

Expand Down Expand Up @@ -38,7 +37,7 @@ describe('EntityManagerMariaDb', () => {

test('getConnectionOptions()', async () => {
const config = new Configuration({
driver: MySqlDriver,
driver: MariaDbDriver,
clientUrl: 'mysql://root@127.0.0.1:3308/db_name',
host: '127.0.0.10',
password: 'secret',
Expand Down Expand Up @@ -242,4 +241,75 @@ describe('EntityManagerMariaDb', () => {
await orm.em.remove(lastBook[0]).flush();
});

test('pagination', async () => {
for (let i = 1; i <= 10; i++) {
const num = `${i}`.padStart(2, '0');
const god = new Author2(`God ${num}`, `hello${num}@heaven.god`);
const b1 = new Book2(`Bible ${num}.1`, god);
const b2 = new Book2(`Bible ${num}.2`, god);
const b3 = new Book2(`Bible ${num}.3`, god);
await orm.em.persistAndFlush([b1, b2, b3]);
orm.em.persist(god);
}

await orm.em.flush();
orm.em.clear();

const mock = mockLogger(orm, ['query']);

// without paginate flag it fails to get only 2 records (we need to explicitly disable it)
const res1 = await orm.em.find(Author2, { books: { title: /^Bible/ } }, {
orderBy: { name: QueryOrder.ASC, books: { title: QueryOrder.ASC } },
limit: 5,
groupBy: ['id', 'name', 'b1.title'],
having: { $or: [{ age: { $gt: 0 } }, { age: { $lte: 0 } }, { age: null }] }, // no-op just for testing purposes
strategy: 'select-in',
});

expect(res1).toHaveLength(2);
expect(res1.map(a => a.name)).toEqual(['God 01', 'God 02']);
expect(mock.mock.calls[0][0]).toMatch('select `a0`.*, `a2`.`author_id` as `a2__author_id` ' +
'from `author2` as `a0` ' +
'left join `book2` as `b1` on `a0`.`id` = `b1`.`author_id` ' +
'left join `address2` as `a2` on `a0`.`id` = `a2`.`author_id` ' +
'where `b1`.`title` like ? ' +
'group by `a0`.`id`, `a0`.`name`, `b1`.`title` ' +
'having (`a0`.`age` > ? or `a0`.`age` <= ? or `a0`.`age` is null) ' +
'order by `a0`.`name` asc, `b1`.`title` asc ' +
'limit ?');

// with paginate flag (and a bit of dark sql magic) we get what we want
const res2 = await orm.em.find(Author2, { books: { title: /^Bible/ } }, {
orderBy: { name: QueryOrder.ASC, books: { title: QueryOrder.ASC } },
offset: 3,
limit: 5,
flags: [QueryFlag.PAGINATE],
strategy: 'select-in',
});

expect(res2).toHaveLength(5);
expect(res2.map(a => a.name)).toEqual(['God 04', 'God 05', 'God 06', 'God 07', 'God 08']);
expect(mock.mock.calls[1][0]).toMatch('select `a0`.*, `a2`.`author_id` as `a2__author_id` ' +
'from `author2` as `a0` ' +
'left join `book2` as `b1` on `a0`.`id` = `b1`.`author_id` ' +
'left join `address2` as `a2` on `a0`.`id` = `a2`.`author_id` ' +
'where (json_contains((select json_arrayagg(`a0`.`id`) from (select `a0`.`id` from `author2` as `a0` ' +
'left join `book2` as `b1` on `a0`.`id` = `b1`.`author_id` ' +
'left join `address2` as `a2` on `a0`.`id` = `a2`.`author_id` ' +
'where `b1`.`title` like \'Bible%\' ' +
'group by `a0`.`id` ' +
'order by min(`a0`.`name`) asc, min(`b1`.`title`) asc limit 5 offset 3) as `a0`), `a0`.`id`)) ' +
'order by `a0`.`name` asc, `b1`.`title` asc');

// with paginate flag without offset
const res3 = await orm.em.find(Author2, { books: { title: /^Bible/ } }, {
orderBy: { name: QueryOrder.ASC, books: { title: QueryOrder.ASC } },
limit: 5,
flags: [QueryFlag.PAGINATE],
});

expect(res3).toHaveLength(5);
expect(res3.map(a => a.name)).toEqual(['God 01', 'God 02', 'God 03', 'God 04', 'God 05']);
});

});
10 changes: 0 additions & 10 deletions tests/EntityManager.mysql.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2499,16 +2499,6 @@ describe('EntityManagerMySql', () => {

const mock = mockLogger(orm, ['query']);

// select `a0`.*, `a1`.`author_id` as `a1__author_id`
// from `author2` as `a0`
// left join `address2` as `a1` on `a0`.`id` = `a1`.`author_id`
// left join `book2` as `b2` on `a0`.`id` = `b2`.`author_id`
// where `b2`.`title` like 'Bible%'
// group by `a0`.`id`, `a0`.`name`, `b1`.`title`
// having (`a0`.`age` > 0 or `a0`.`age` <= 0 or `a0`.`age` is null)
// order by `a0`.`name` asc, `b2`.`title` asc limit 5
// Unknown column 'b1.title' in 'group statement'

// without paginate flag it fails to get only 2 records (we need to explicitly disable it)
const res1 = await orm.em.find(Author2, { books: { title: /^Bible/ } }, {
orderBy: { name: QueryOrder.ASC, books: { title: QueryOrder.ASC } },
Expand Down
Loading

0 comments on commit a57cb19

Please sign in to comment.