Skip to content

Commit

Permalink
perf(core): use dedicated identity maps for each entity
Browse files Browse the repository at this point in the history
Related: #732
  • Loading branch information
B4nan committed Oct 12, 2020
1 parent 1089c57 commit 84667f9
Show file tree
Hide file tree
Showing 16 changed files with 138 additions and 63 deletions.
4 changes: 2 additions & 2 deletions packages/core/src/drivers/DatabaseDriver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,11 @@ export abstract class DatabaseDriver<C extends Connection> implements IDatabaseD

abstract async nativeInsert<T extends AnyEntity<T>>(entityName: string, data: EntityData<T>, ctx?: Transaction): Promise<QueryResult>;

abstract async nativeInsertMany<T extends AnyEntity<T>>(entityName: string, data: EntityData<T>[], ctx?: Transaction): Promise<QueryResult>;
abstract async nativeInsertMany<T extends AnyEntity<T>>(entityName: string, data: EntityData<T>[], ctx?: Transaction, processCollections?: boolean): Promise<QueryResult>;

abstract async nativeUpdate<T extends AnyEntity<T>>(entityName: string, where: FilterQuery<T>, data: EntityData<T>, ctx?: Transaction): Promise<QueryResult>;

async nativeUpdateMany<T extends AnyEntity<T>>(entityName: string, where: FilterQuery<T>[], data: EntityData<T>[], ctx?: Transaction): Promise<QueryResult> {
async nativeUpdateMany<T extends AnyEntity<T>>(entityName: string, where: FilterQuery<T>[], data: EntityData<T>[], ctx?: Transaction, processCollections?: boolean): Promise<QueryResult> {
throw new Error(`Batch updates are not supported by ${this.constructor.name} driver`);
}

Expand Down
4 changes: 2 additions & 2 deletions packages/core/src/drivers/IDatabaseDriver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,11 @@ export interface IDatabaseDriver<C extends Connection = Connection> {

nativeInsert<T extends AnyEntity<T>>(entityName: string, data: EntityData<T>, ctx?: Transaction): Promise<QueryResult>;

nativeInsertMany<T extends AnyEntity<T>>(entityName: string, data: EntityData<T>[], ctx?: Transaction): Promise<QueryResult>;
nativeInsertMany<T extends AnyEntity<T>>(entityName: string, data: EntityData<T>[], ctx?: Transaction, processCollections?: boolean): Promise<QueryResult>;

nativeUpdate<T extends AnyEntity<T>>(entityName: string, where: FilterQuery<T>, data: EntityData<T>, ctx?: Transaction): Promise<QueryResult>;

nativeUpdateMany<T extends AnyEntity<T>>(entityName: string, where: FilterQuery<T>[], data: EntityData<T>[], ctx?: Transaction): Promise<QueryResult>;
nativeUpdateMany<T extends AnyEntity<T>>(entityName: string, where: FilterQuery<T>[], data: EntityData<T>[], ctx?: Transaction, processCollections?: boolean): Promise<QueryResult>;

nativeDelete<T extends AnyEntity<T>>(entityName: string, where: FilterQuery<T>, ctx?: Transaction): Promise<QueryResult>;

Expand Down
4 changes: 3 additions & 1 deletion packages/core/src/entity/EntityFactory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,9 @@ export class EntityFactory {
this.unitOfWork.registerManaged(entity, data, options.refresh && options.initialized, options.newEntity);
}

this.eventManager.dispatchEvent(EventType.onInit, { entity, em: this.em });
if (this.eventManager.hasListeners(EventType.onInit, meta2)) {
this.eventManager.dispatchEvent(EventType.onInit, { entity, em: this.em });
}

return entity as New<T, P>;
}
Expand Down
8 changes: 4 additions & 4 deletions packages/core/src/events/EventManager.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { AnyEntity } from '../typings';
import { AnyEntity, EntityMetadata } from '../typings';
import { EventArgs, EventSubscriber, FlushEventArgs } from './EventSubscriber';
import { Utils } from '../utils';
import { EventType } from '../enums';
Expand Down Expand Up @@ -47,9 +47,9 @@ export class EventManager {
return Utils.runSerial(listeners, listener => listener[1][listener[0]]!(args as (EventArgs<T> & FlushEventArgs)) as Promise<void>);
}

hasListeners<T extends AnyEntity<T>>(event: EventType, entity?: T): boolean {
hasListeners<T extends AnyEntity<T>>(event: EventType, meta: EntityMetadata<T>): boolean {
/* istanbul ignore next */
const hasHooks = entity?.__meta!.hooks[event]?.length;
const hasHooks = meta.hooks[event]?.length;

if (hasHooks) {
return true;
Expand All @@ -58,7 +58,7 @@ export class EventManager {
for (const listener of this.listeners[event] ?? []) {
const entities = this.entities.get(listener)!;

if (entities.length === 0 || !entity || entities.includes(entity.constructor.name)) {
if (entities.length === 0 || entities.includes(meta.className)) {
return true;
}
}
Expand Down
4 changes: 2 additions & 2 deletions packages/core/src/unit-of-work/ChangeSetPersister.ts
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ export class ChangeSetPersister {
}

private async persistNewEntitiesBatch<T extends AnyEntity<T>>(meta: EntityMetadata<T>, changeSets: ChangeSet<T>[], ctx?: Transaction): Promise<void> {
const res = await this.driver.nativeInsertMany(meta.className, changeSets.map(cs => cs.payload), ctx);
const res = await this.driver.nativeInsertMany(meta.className, changeSets.map(cs => cs.payload), ctx, false);

for (let i = 0; i < changeSets.length; i++) {
const changeSet = changeSets[i];
Expand Down Expand Up @@ -131,7 +131,7 @@ export class ChangeSetPersister {

private async persistManagedEntitiesBatch<T extends AnyEntity<T>>(meta: EntityMetadata<T>, changeSets: ChangeSet<T>[], ctx?: Transaction): Promise<void> {
await this.checkOptimisticLocks(meta, changeSets, ctx);
await this.driver.nativeUpdateMany(meta.className, changeSets.map(cs => cs.entity.__helper!.getPrimaryKey() as Dictionary), changeSets.map(cs => cs.payload), ctx);
await this.driver.nativeUpdateMany(meta.className, changeSets.map(cs => cs.entity.__helper!.getPrimaryKey() as Dictionary), changeSets.map(cs => cs.payload), ctx, false);
changeSets.forEach(cs => cs.persisted = true);
}

Expand Down
72 changes: 72 additions & 0 deletions packages/core/src/unit-of-work/IdentityMap.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
import { AnyEntity, Constructor, EntityMetadata } from '../typings';

export class IdentityMap {

private readonly registry = new Map<Constructor<AnyEntity>, Map<string, AnyEntity>>();

store<T extends AnyEntity<T>>(item: T) {
this.getStore(item.__meta!.root).set(item.__helper!.getSerializedPrimaryKey(), item);
}

delete<T extends AnyEntity<T>>(item: T) {
this.getStore(item.__meta!.root).delete(item.__helper!.getSerializedPrimaryKey());
}

getByHash<T>(meta: EntityMetadata<T>, hash: string): T | undefined {
const store = this.getStore(meta);
return store.has(hash) ? store.get(hash) : undefined;
}

getStore<T extends AnyEntity<T>>(meta: EntityMetadata<T>): Map<string, T> {
const store = this.registry.get(meta.class) as Map<string, T>;

if (store) {
return store;
}

const newStore = new Map();
this.registry.set(meta.class, newStore);

return newStore;
}

clear() {
this.registry.clear();
}

values(): AnyEntity[] {
const ret: AnyEntity[] = [];

for (const store of this.registry.values()) {
ret.push(...store.values());
}

return ret;
}

keys(): string[] {
const ret: string[] = [];

for (const [cls, store] of this.registry) {
ret.push(...[...store.keys()].map(hash => `${cls.name}-${hash}`));
}

return ret;
}

/**
* For back compatibility only.
*/
get<T>(hash: string): T | undefined {
const [name, id] = hash.split('-', 2);
const cls = [...this.registry.keys()].find(k => k.name === name);

if (!cls) {
return undefined;
}

const store = this.registry.get(cls) as Map<string, T>;
return store.has(id) ? store.get(id) : undefined;
}

}
31 changes: 16 additions & 15 deletions packages/core/src/unit-of-work/UnitOfWork.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,12 @@ import { EntityManager } from '../EntityManager';
import { Cascade, EventType, LockMode, ReferenceType } from '../enums';
import { OptimisticLockError, ValidationError } from '../errors';
import { Transaction } from '../connections';
import { IdentityMap } from './IdentityMap';

export class UnitOfWork {

/** map of references to managed entities */
private readonly identityMap = new Map<string, AnyEntity>();
private readonly identityMap = new IdentityMap();

private readonly persistStack = new Set<AnyEntity>();
private readonly removeStack = new Set<AnyEntity>();
Expand All @@ -32,7 +33,6 @@ export class UnitOfWork {
constructor(private readonly em: EntityManager) { }

merge<T extends AnyEntity<T>>(entity: T, visited = new WeakSet<AnyEntity>()): void {
const meta = entity.__meta!;
const wrapped = entity.__helper!;
wrapped.__em = this.em;

Expand All @@ -45,7 +45,7 @@ export class UnitOfWork {
return;
}

this.identityMap.set(`${meta.root.name}-${wrapped.getSerializedPrimaryKey()}`, entity);
this.identityMap.store(entity);
entity.__helper!.__originalEntityData = this.comparator.prepareEntity(entity);

this.cascade(entity, Cascade.MERGE, visited);
Expand All @@ -55,16 +55,17 @@ export class UnitOfWork {
* @internal
*/
registerManaged<T extends AnyEntity<T>>(entity: T, data?: EntityData<T>, refresh?: boolean, newEntity?: boolean): T {
this.identityMap.set(`${entity.__meta!.root.name}-${entity.__helper!.getSerializedPrimaryKey()}`, entity);
this.identityMap.store(entity);

if (newEntity) {
return entity;
}

entity.__helper!.__em = this.em;
const helper = entity.__helper!;
helper!.__em = this.em;

if (data && entity.__helper!.__initialized && (refresh || !entity.__helper!.__originalEntityData)) {
entity.__helper!.__originalEntityData = data;
if (data && helper!.__initialized && (refresh || !helper!.__originalEntityData)) {
helper!.__originalEntityData = data;
}

return entity;
Expand All @@ -75,10 +76,9 @@ export class UnitOfWork {
*/
getById<T extends AnyEntity<T>>(entityName: string, id: Primary<T> | Primary<T>[]): T {
const root = this.metadata.find(entityName)!.root;
const hash = Array.isArray(id) ? Utils.getPrimaryKeyHash(id as string[]) : id;
const token = `${root.name}-${hash}`;
const hash = Array.isArray(id) ? Utils.getPrimaryKeyHash(id as string[]) : '' + id;

return this.identityMap.get(token) as T;
return this.identityMap.getByHash(root, hash);
}

tryGetById<T extends AnyEntity<T>>(entityName: string, where: FilterQuery<T>, strict = true): T | null {
Expand All @@ -94,7 +94,7 @@ export class UnitOfWork {
/**
* Returns map of all managed entities.
*/
getIdentityMap(): Map<string, AnyEntity> {
getIdentityMap(): IdentityMap {
return this.identityMap;
}

Expand All @@ -113,7 +113,7 @@ export class UnitOfWork {
*/
getOriginalEntityData<T extends AnyEntity<T>>(entity?: T): EntityData<AnyEntity>[] | EntityData<T> | undefined {
if (!entity) {
return [...this.identityMap.values()].map(e => {
return this.identityMap.values().map(e => {
return e.__helper!.__originalEntityData!;
});
}
Expand Down Expand Up @@ -251,8 +251,8 @@ export class UnitOfWork {
}

unsetIdentity(entity: AnyEntity): void {
this.identityMap.delete(entity);
const wrapped = entity.__helper!;
this.identityMap.delete(`${entity.__meta!.root.name}-${wrapped.getSerializedPrimaryKey()}`);
delete wrapped.__identifier;
delete wrapped.__originalEntityData;
}
Expand Down Expand Up @@ -374,7 +374,7 @@ export class UnitOfWork {
}

private async runHooks<T extends AnyEntity<T>>(type: EventType, changeSet: ChangeSet<T>, sync = false): Promise<unknown> {
const hasListeners = this.eventManager.hasListeners(type, changeSet.entity);
const hasListeners = this.eventManager.hasListeners(type, changeSet.entity.__meta!);

if (!hasListeners) {
return;
Expand Down Expand Up @@ -552,7 +552,8 @@ export class UnitOfWork {
return;
}

const props = changeSets[0].entity.__meta!.relations.filter(prop => {
const meta = changeSets[0].entity.__meta!;
const props = meta.relations.filter(prop => {
return (prop.reference === ReferenceType.ONE_TO_ONE && prop.owner) || prop.reference === ReferenceType.MANY_TO_ONE;
});

Expand Down
1 change: 1 addition & 0 deletions packages/core/src/unit-of-work/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@ export * from './ChangeSetComputer';
export * from './ChangeSetPersister';
export * from './CommitOrderCalculator';
export * from './UnitOfWork';
export * from './IdentityMap';
31 changes: 13 additions & 18 deletions packages/knex/src/AbstractSqlDriver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -200,9 +200,9 @@ export abstract class AbstractSqlDriver<C extends AbstractSqlConnection = Abstra
return res;
}

async nativeInsertMany<T extends AnyEntity<T>>(entityName: string, data: EntityData<T>[], ctx?: Transaction<KnexTransaction>): Promise<QueryResult> {
async nativeInsertMany<T extends AnyEntity<T>>(entityName: string, data: EntityData<T>[], ctx?: Transaction<KnexTransaction>, processCollections = true): Promise<QueryResult> {
const meta = this.metadata.get<T>(entityName);
const collections = data.map(d => this.extractManyToMany(entityName, d));
const collections = processCollections ? data.map(d => this.extractManyToMany(entityName, d)) : [];
const pks = this.getPrimaryKeyFields(entityName);
const set = new Set<string>();
data.forEach(row => Object.keys(row).forEach(k => set.add(k)));
Expand Down Expand Up @@ -281,11 +281,9 @@ export abstract class AbstractSqlDriver<C extends AbstractSqlConnection = Abstra
return res;
}

async nativeUpdateMany<T extends AnyEntity<T>>(entityName: string, where: FilterQuery<T>[], data: EntityData<T>[], ctx?: Transaction<KnexTransaction>): Promise<QueryResult> {
async nativeUpdateMany<T extends AnyEntity<T>>(entityName: string, where: FilterQuery<T>[], data: EntityData<T>[], ctx?: Transaction<KnexTransaction>, processCollections = true): Promise<QueryResult> {
const meta = this.metadata.get<T>(entityName);
const collections = data.map(d => this.extractManyToMany(entityName, d));
const values: Dictionary<Raw> = {};
const knex = this.connection.getKnex();
const collections = processCollections ? data.map(d => this.extractManyToMany(entityName, d)) : [];
const keys = new Set<string>();
data.forEach(row => Object.keys(row).forEach(k => keys.add(k)));
const pkCond = Utils.flatten(meta.primaryKeys.map(pk => meta.properties[pk].fieldNames)).map(pk => `${this.platform.quoteIdentifier(pk)} = ?`).join(' and ');
Expand Down Expand Up @@ -495,17 +493,14 @@ export abstract class AbstractSqlDriver<C extends AbstractSqlConnection = Abstra
return {};
}

const props = this.metadata.find(entityName)!.properties;
const ret: EntityData<T> = {};

for (const k of Object.keys(data)) {
const prop = props[k];

if (prop && prop.reference === ReferenceType.MANY_TO_MANY) {
ret[k as keyof T] = data[k].map((item: Primary<T>) => Utils.asArray(item));
delete data[k];
this.metadata.find(entityName)!.relations.forEach(prop => {
if (prop.reference === ReferenceType.MANY_TO_MANY && data[prop.name]) {
ret[prop.name as keyof T] = data[prop.name].map((item: Primary<T>) => Utils.asArray(item));
delete data[prop.name];
}
}
});

return ret;
}
Expand All @@ -515,10 +510,10 @@ export abstract class AbstractSqlDriver<C extends AbstractSqlConnection = Abstra
return;
}

const props = meta.properties;

for (const k of Object.keys(collections)) {
await this.rethrow(this.updateCollectionDiff(meta, props[k], pks, clear, collections[k], ctx));
for (const prop of meta.relations) {
if (collections[prop.name]) {
await this.rethrow(this.updateCollectionDiff(meta, prop, pks, clear, collections[prop.name], ctx));
}
}
}

Expand Down
4 changes: 2 additions & 2 deletions packages/mariadb/src/MariaDbDriver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ export class MariaDbDriver extends AbstractSqlDriver<MariaDbConnection> {
super(config, new MySqlPlatform(), MariaDbConnection, ['knex', 'mariadb']);
}

async nativeInsertMany<T extends AnyEntity<T>>(entityName: string, data: EntityData<T>[], ctx?: Transaction<Knex.Transaction>): Promise<QueryResult> {
const res = await super.nativeInsertMany(entityName, data, ctx);
async nativeInsertMany<T extends AnyEntity<T>>(entityName: string, data: EntityData<T>[], ctx?: Transaction<Knex.Transaction>, processCollections = true): Promise<QueryResult> {
const res = await super.nativeInsertMany(entityName, data, ctx, processCollections);
const pks = this.getPrimaryKeyFields(entityName);
data.forEach((item, idx) => res.rows![idx] = { [pks[0]]: item[pks[0]] ?? res.insertId + idx });
res.row = res.rows![0];
Expand Down
4 changes: 2 additions & 2 deletions packages/mongodb/src/MongoDriver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ export class MongoDriver extends DatabaseDriver<MongoConnection> {
return this.rethrow(this.getConnection('write').insertOne(entityName, data as { _id: any }, ctx));
}

async nativeInsertMany<T extends AnyEntity<T>>(entityName: string, data: EntityData<T>[], ctx?: Transaction<ClientSession>): Promise<QueryResult> {
async nativeInsertMany<T extends AnyEntity<T>>(entityName: string, data: EntityData<T>[], ctx?: Transaction<ClientSession>, processCollections = true): Promise<QueryResult> {
data = data.map(d => this.renameFields(entityName, d));
return this.rethrow(this.getConnection('write').insertMany(entityName, data as any[], ctx));
}
Expand All @@ -68,7 +68,7 @@ export class MongoDriver extends DatabaseDriver<MongoConnection> {
return this.rethrow(this.getConnection('write').updateMany(entityName, where as FilterQuery<T>, data as { _id: any }, ctx));
}

async nativeUpdateMany<T extends AnyEntity<T>>(entityName: string, where: FilterQuery<T>[], data: EntityData<T>[], ctx?: Transaction<ClientSession>): Promise<QueryResult> {
async nativeUpdateMany<T extends AnyEntity<T>>(entityName: string, where: FilterQuery<T>[], data: EntityData<T>[], ctx?: Transaction<ClientSession>, processCollections?: boolean): Promise<QueryResult> {
data = data.map(row => this.renameFields(entityName, row));
return this.rethrow(this.getConnection('write').bulkUpdateMany(entityName, where, data as { _id: any }[], ctx));
}
Expand Down
4 changes: 2 additions & 2 deletions packages/mysql-base/src/MySqlDriver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ export class MySqlDriver extends AbstractSqlDriver<MySqlConnection> {
super(config, new MySqlPlatform(), MySqlConnection, ['knex', 'mysql2']);
}

async nativeInsertMany<T extends AnyEntity<T>>(entityName: string, data: EntityData<T>[], ctx?: Transaction<Knex.Transaction>): Promise<QueryResult> {
const res = await super.nativeInsertMany(entityName, data, ctx);
async nativeInsertMany<T extends AnyEntity<T>>(entityName: string, data: EntityData<T>[], ctx?: Transaction<Knex.Transaction>, processCollections = true): Promise<QueryResult> {
const res = await super.nativeInsertMany(entityName, data, ctx, processCollections);
const pks = this.getPrimaryKeyFields(entityName);
data.forEach((item, idx) => res.rows![idx] = { [pks[0]]: item[pks[0]] ?? res.insertId + idx });
res.row = res.rows![0];
Expand Down
4 changes: 2 additions & 2 deletions packages/sqlite/src/SqliteDriver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ export class SqliteDriver extends AbstractSqlDriver<SqliteConnection> {
super(config, new SqlitePlatform(), SqliteConnection, ['knex', 'sqlite3']);
}

async nativeInsertMany<T extends AnyEntity<T>>(entityName: string, data: EntityData<T>[], ctx?: Transaction<Knex.Transaction>): Promise<QueryResult> {
const res = await super.nativeInsertMany(entityName, data, ctx);
async nativeInsertMany<T extends AnyEntity<T>>(entityName: string, data: EntityData<T>[], ctx?: Transaction<Knex.Transaction>, processCollections = true): Promise<QueryResult> {
const res = await super.nativeInsertMany(entityName, data, ctx, processCollections);
const pks = this.getPrimaryKeyFields(entityName);
const first = res.insertId - data.length + 1;
res.rows = res.rows ?? [];
Expand Down
Loading

0 comments on commit 84667f9

Please sign in to comment.