Skip to content

Commit

Permalink
feat(core): add support for read connections (#116)
Browse files Browse the repository at this point in the history
Now users can specify multiple read connections via `replicas` option. You can provide only fields that differ from master connection, rest will be taken from it.

By default select queries will use random read connection if not inside transaction. You can specify connection type manually in `EntityManager.getConnection(type: 'read' | 'write')`.

Closes #77
  • Loading branch information
B4nan committed Sep 13, 2019
1 parent 8b18a87 commit 6144463
Show file tree
Hide file tree
Showing 15 changed files with 171 additions and 49 deletions.
10 changes: 5 additions & 5 deletions lib/EntityManager.ts
@@ -1,7 +1,7 @@
import { Configuration, RequestContext, Utils, ValidationError } from './utils';
import { EntityAssigner, EntityFactory, EntityLoader, EntityRepository, EntityValidator, ReferenceType } from './entity';
import { LockMode, UnitOfWork } from './unit-of-work';
import { FilterQuery, IDatabaseDriver } from './drivers';
import { AbstractSqlDriver, FilterQuery, IDatabaseDriver } from './drivers';
import { EntityData, EntityMetadata, EntityName, IEntity, IEntityType, IPrimaryKey } from './decorators';
import { QueryBuilder, QueryOrderMap, SmartQueryHelper } from './query';
import { MetadataStorage } from './metadata';
Expand All @@ -24,8 +24,8 @@ export class EntityManager {
return this.driver as D;
}

getConnection<C extends Connection = Connection>(): C {
return this.driver.getConnection() as C;
getConnection<C extends Connection = Connection>(type?: 'read' | 'write'): C {
return this.driver.getConnection(type) as C;
}

getRepository<T extends IEntityType<T>>(entityName: EntityName<T>): EntityRepository<T> {
Expand All @@ -44,9 +44,9 @@ export class EntityManager {
return this.validator;
}

createQueryBuilder(entityName: EntityName<IEntity>, alias?: string): QueryBuilder {
createQueryBuilder(entityName: EntityName<IEntity>, alias?: string, type?: 'read' | 'write'): QueryBuilder {
entityName = Utils.className(entityName);
return new QueryBuilder(entityName, this.metadata, this.driver, this.transactionContext, alias);
return new QueryBuilder(entityName, this.metadata, this.driver as AbstractSqlDriver, this.transactionContext, alias, type);
}

async find<T extends IEntityType<T>>(entityName: EntityName<T>, where?: FilterQuery<T>, options?: FindOptions): Promise<T[]>;
Expand Down
5 changes: 2 additions & 3 deletions lib/MikroORM.ts
Expand Up @@ -45,8 +45,7 @@ export class MikroORM {
}

async connect(): Promise<IDatabaseDriver> {
const connection = this.driver.getConnection();
await connection.connect();
const connection = await this.driver.connect();
const clientUrl = connection.getClientUrl();
const dbName = this.config.get('dbName');
this.logger.log('info', `MikroORM successfully connected to database ${chalk.green(dbName)}${clientUrl ? ' on ' + chalk.green(clientUrl) : ''}`);
Expand All @@ -59,7 +58,7 @@ export class MikroORM {
}

async close(force = false): Promise<void> {
return this.driver.getConnection().close(force);
return this.driver.close(force);
}

getMetadata(): MetadataStorage {
Expand Down
31 changes: 21 additions & 10 deletions lib/connections/Connection.ts
Expand Up @@ -3,16 +3,22 @@ import { Transaction as KnexTransaction } from 'knex';
import chalk from 'chalk';
import highlight from 'cli-highlight';

import { Configuration, Utils } from '../utils';
import { Configuration, ConnectionOptions, Utils } from '../utils';
import { MetadataStorage } from '../metadata';

export abstract class Connection {

protected readonly logger = this.config.getLogger();
protected metadata: MetadataStorage;
protected abstract client: any;

constructor(protected readonly config: Configuration) { }
constructor(protected readonly config: Configuration,
protected readonly options?: ConnectionOptions,
protected readonly type: 'read' | 'write' = 'write') {
if (!this.options) {
const props = ['dbName', 'clientUrl', 'host', 'port', 'user', 'password', 'multipleStatements', 'pool'] as const;
this.options = props.reduce((o, i) => { (o[i] as any) = this.config.get(i); return o; }, {} as ConnectionOptions);
}
}

/**
* Establishes connection to database
Expand Down Expand Up @@ -42,12 +48,12 @@ export abstract class Connection {

getConnectionOptions(): ConnectionConfig {
const ret: ConnectionConfig = {};
const url = new URL(this.config.getClientUrl());
ret.host = this.config.get('host', url.hostname);
ret.port = this.config.get('port', +url.port);
ret.user = this.config.get('user', url.username);
ret.password = this.config.get('password', url.password);
ret.database = this.config.get('dbName', url.pathname.replace(/^\//, ''));
const url = new URL(this.options!.clientUrl || this.config.getClientUrl());
this.options!.host = ret.host = this.config.get('host', url.hostname);
this.options!.port = ret.port = this.config.get('port', +url.port);
this.options!.user = ret.user = this.config.get('user', url.username);
this.options!.password = ret.password = this.config.get('password', url.password);
this.options!.dbName = ret.database = this.config.get('dbName', url.pathname.replace(/^\//, ''));

return ret;
}
Expand Down Expand Up @@ -76,7 +82,12 @@ export abstract class Connection {
query = highlight(query, { language, ignoreIllegals: true, theme: this.config.getHighlightTheme() });
}

const msg = `${query}` + (Utils.isDefined(took) ? chalk.grey(` [took ${took} ms]`) : '');
let msg = query + (Utils.isDefined(took) ? chalk.grey(` [took ${chalk.grey(took)} ms]`) : '');

if (this.config.get('replicas', []).length > 0) {
msg += chalk.cyan(` (via ${this.type} connection '${this.options!.name || this.config.get('name') || this.options!.host}')`);
}

this.config.getLogger().log('query', msg);
}

Expand Down
2 changes: 1 addition & 1 deletion lib/drivers/AbstractSqlDriver.ts
Expand Up @@ -93,7 +93,7 @@ export abstract class AbstractSqlDriver<C extends AbstractSqlConnection = Abstra
}

protected createQueryBuilder(entityName: string, ctx?: Transaction): QueryBuilder {
return new QueryBuilder(entityName, this.metadata, this, ctx);
return new QueryBuilder(entityName, this.metadata, this, ctx, undefined, ctx ? 'write' : 'read');
}

protected extractManyToMany<T extends IEntityType<T>>(entityName: string, data: EntityData<T>): EntityData<T> {
Expand Down
38 changes: 35 additions & 3 deletions lib/drivers/DatabaseDriver.ts
Expand Up @@ -2,14 +2,15 @@ import { FilterQuery, IDatabaseDriver } from './IDatabaseDriver';
import { EntityData, EntityMetadata, EntityProperty, IEntity, IEntityType, IPrimaryKey } from '../decorators';
import { MetadataStorage } from '../metadata';
import { Connection, QueryResult, Transaction } from '../connections';
import { Configuration, Utils } from '../utils';
import { Configuration, ConnectionOptions, Utils } from '../utils';
import { QueryOrder, QueryOrderMap } from '../query';
import { Platform } from '../platforms';
import { LockMode } from '../unit-of-work';

export abstract class DatabaseDriver<C extends Connection> implements IDatabaseDriver<C> {

protected readonly connection: C;
protected readonly replicas: C[] = [];
protected readonly platform: Platform;
protected readonly logger = this.config.getLogger();
protected metadata: MetadataStorage;
Expand Down Expand Up @@ -74,8 +75,26 @@ export abstract class DatabaseDriver<C extends Connection> implements IDatabaseD
return ret as T;
}

getConnection(): C {
return this.connection as C;
async connect(): Promise<C> {
await this.connection.connect();
await Promise.all(this.replicas.map(replica => replica.connect()));

return this.connection;
}

getConnection(type: 'read' | 'write' = 'write'): C {
if (type === 'write' || this.replicas.length === 0) {
return this.connection as C;
}

const rand = Utils.randomInt(0, this.replicas.length - 1);

return this.replicas[rand] as C;
}

async close(force?: boolean): Promise<void> {
await Promise.all(this.replicas.map(replica => replica.close(force)));
await this.connection.close(force);
}

getPlatform(): Platform {
Expand All @@ -92,4 +111,17 @@ export abstract class DatabaseDriver<C extends Connection> implements IDatabaseD
return meta ? meta.primaryKey : this.config.getNamingStrategy().referenceColumnName();
}

protected createReplicas(cb: (c: ConnectionOptions) => C): C[] {
const replicas = this.config.get('replicas', [])!;
const ret: C[] = [];
const props = ['dbName', 'clientUrl', 'host', 'port', 'user', 'password', 'multipleStatements', 'pool', 'name'] as const;

replicas.forEach((conf: Partial<ConnectionOptions>) => {
props.forEach(prop => (conf[prop] as any) = prop in conf ? conf[prop] : this.config.get(prop));
ret.push(cb(conf as ConnectionOptions));
});

return ret;
}

}
6 changes: 5 additions & 1 deletion lib/drivers/IDatabaseDriver.ts
Expand Up @@ -7,7 +7,11 @@ import { MetadataStorage } from '../metadata';

export interface IDatabaseDriver<C extends Connection = Connection> {

getConnection(): C;
connect(): Promise<C>;

close(force?: boolean): Promise<void>;

getConnection(type?: 'read' | 'write'): C;

/**
* Finds selection of entities
Expand Down
14 changes: 7 additions & 7 deletions lib/drivers/MongoDriver.ts
Expand Up @@ -15,7 +15,7 @@ export class MongoDriver extends DatabaseDriver<MongoConnection> {

async find<T extends IEntityType<T>>(entityName: string, where: FilterQuery<T>, populate: string[], orderBy: QueryOrderMap, limit: number, offset: number): Promise<T[]> {
where = this.renameFields(entityName, where);
const res = await this.connection.find<T>(entityName, where, orderBy, limit, offset);
const res = await this.getConnection('read').find<T>(entityName, where, orderBy, limit, offset);

return res.map((r: T) => this.mapResult<T>(r, this.metadata.get(entityName))!);
}
Expand All @@ -26,19 +26,19 @@ export class MongoDriver extends DatabaseDriver<MongoConnection> {
}

where = this.renameFields(entityName, where) as FilterQuery<T>;
const res = await this.connection.find<T>(entityName, where, orderBy, 1, undefined, fields);
const res = await this.getConnection('read').find<T>(entityName, where, orderBy, 1, undefined, fields);

return this.mapResult<T>(res[0], this.metadata.get(entityName));
}

async count<T extends IEntityType<T>>(entityName: string, where: FilterQuery<T>): Promise<number> {
where = this.renameFields(entityName, where);
return this.connection.countDocuments<T>(entityName, where);
return this.getConnection('read').countDocuments<T>(entityName, where);
}

async nativeInsert<T extends IEntityType<T>>(entityName: string, data: EntityData<T>): Promise<QueryResult> {
data = this.renameFields(entityName, data);
return this.connection.insertOne<EntityData<T>>(entityName, data);
return this.getConnection('write').insertOne<EntityData<T>>(entityName, data);
}

async nativeUpdate<T extends IEntityType<T>>(entityName: string, where: FilterQuery<T> | IPrimaryKey, data: EntityData<T>): Promise<QueryResult> {
Expand All @@ -49,7 +49,7 @@ export class MongoDriver extends DatabaseDriver<MongoConnection> {
where = this.renameFields(entityName, where) as FilterQuery<T>;
data = this.renameFields(entityName, data);

return this.connection.updateMany<T>(entityName, where, data);
return this.getConnection('write').updateMany<T>(entityName, where, data);
}

async nativeDelete<T extends IEntityType<T>>(entityName: string, where: FilterQuery<T> | IPrimaryKey): Promise<QueryResult> {
Expand All @@ -59,11 +59,11 @@ export class MongoDriver extends DatabaseDriver<MongoConnection> {

where = this.renameFields(entityName, where) as FilterQuery<T>;

return this.connection.deleteMany<T>(entityName, where);
return this.getConnection('write').deleteMany<T>(entityName, where);
}

async aggregate(entityName: string, pipeline: any[]): Promise<any[]> {
return this.connection.aggregate(entityName, pipeline);
return this.getConnection('read').aggregate(entityName, pipeline);
}

private renameFields(entityName: string, data: any): any {
Expand Down
1 change: 1 addition & 0 deletions lib/drivers/MySqlDriver.ts
Expand Up @@ -5,6 +5,7 @@ import { MySqlPlatform } from '../platforms/MySqlPlatform';
export class MySqlDriver extends AbstractSqlDriver<MySqlConnection> {

protected readonly connection = new MySqlConnection(this.config);
protected readonly replicas = this.createReplicas(conf => new MySqlConnection(this.config, conf, 'read'));
protected readonly platform = new MySqlPlatform();

}
17 changes: 9 additions & 8 deletions lib/query/QueryBuilder.ts
Expand Up @@ -6,8 +6,7 @@ import { EntityProperty, IEntity } from '../decorators';
import { ReferenceType } from '../entity';
import { QueryFlag, QueryOrderMap, QueryType } from './enums';
import { LockMode } from '../unit-of-work';
import { AbstractSqlConnection } from '../connections/AbstractSqlConnection';
import { IDatabaseDriver } from '../drivers';
import { AbstractSqlDriver } from '../drivers';
import { MetadataStorage } from '../metadata';

/**
Expand All @@ -33,16 +32,16 @@ export class QueryBuilder {
private _limit: number;
private _offset: number;
private lockMode?: LockMode;
private readonly connection = this.driver.getConnection() as AbstractSqlConnection;
private readonly platform = this.driver.getPlatform();
private readonly knex = this.connection.getKnex();
private readonly knex = this.driver.getConnection(this.connectionType).getKnex();
private readonly helper = new QueryBuilderHelper(this.entityName, this.alias, this._aliasMap, this.metadata, this.knex, this.platform);

constructor(private readonly entityName: string,
private readonly metadata: MetadataStorage,
private readonly driver: IDatabaseDriver,
private readonly driver: AbstractSqlDriver,
private readonly context?: Transaction,
readonly alias = `e0`) { }
readonly alias = `e0`,
private readonly connectionType?: 'read' | 'write') { }

select(fields: string | string[], distinct = false): this {
this._fields = Utils.asArray(fields);
Expand Down Expand Up @@ -236,7 +235,8 @@ export class QueryBuilder {
}

async execute(method: 'all' | 'get' | 'run' = 'all', mapResults = true): Promise<any> {
const res = await this.connection.execute(this.getKnexQuery(), [], method);
const type = this.connectionType || (method === 'run' ? 'write' : 'read');
const res = await this.driver.getConnection(type).execute(this.getKnexQuery(), [], method);
const meta = this.metadata.get(this.entityName);

if (!mapResults) {
Expand All @@ -251,7 +251,7 @@ export class QueryBuilder {
}

clone(): QueryBuilder {
const qb = new QueryBuilder(this.entityName, this.metadata, this.driver, this.context, this.alias);
const qb = new QueryBuilder(this.entityName, this.metadata, this.driver, this.context, this.alias, this.connectionType);
Object.assign(qb, this);

// clone array/object properties
Expand Down Expand Up @@ -410,6 +410,7 @@ export class QueryBuilder {
if (this.context) {
qb.transacting(this.context);
}

return qb;
}

Expand Down
2 changes: 1 addition & 1 deletion lib/unit-of-work/UnitOfWork.ts
Expand Up @@ -102,7 +102,7 @@ export class UnitOfWork {
const promise = async (tx: Transaction) => await Utils.runSerial(this.changeSets, changeSet => this.commitChangeSet(changeSet, tx));

if (runInTransaction) {
await this.em.getConnection().transactional(trx => promise(trx));
await this.em.getConnection('write').transactional(trx => promise(trx));
} else {
await promise(this.em.getTransactionContext());
}
Expand Down
26 changes: 16 additions & 10 deletions lib/utils/Configuration.ts
Expand Up @@ -31,6 +31,7 @@ export class Configuration {
hydrator: ObjectHydrator,
tsNode: false,
debug: false,
verbose: false,
cache: {
enabled: true,
adapter: FileCacheAdapter,
Expand Down Expand Up @@ -75,8 +76,8 @@ export class Configuration {
this.init();
}

get<T extends keyof MikroORMOptions, U extends MikroORMOptions[T]>(key: T, defaultValue?: U): MikroORMOptions[T] {
return (Utils.isDefined(this.options[key]) ? this.options[key] : defaultValue) as MikroORMOptions[T];
get<T extends keyof MikroORMOptions, U extends MikroORMOptions[T]>(key: T, defaultValue?: U): U {
return (Utils.isDefined(this.options[key]) ? this.options[key] : defaultValue) as U;
}

set<T extends keyof MikroORMOptions, U extends MikroORMOptions[T]>(key: T, value: U): void {
Expand Down Expand Up @@ -172,8 +173,19 @@ export class Configuration {

}

export interface MikroORMOptions {
export interface ConnectionOptions {
name?: string;
dbName: string;
clientUrl?: string;
host?: string;
port?: number;
user?: string;
password?: string;
multipleStatements?: boolean; // for mysql driver
pool: PoolConfig;
}

export interface MikroORMOptions extends ConnectionOptions {
entities: (EntityClass<IEntity> | EntityClassGroup<IEntity>)[];
entitiesDirs: string[];
entitiesDirsTs: string[];
Expand All @@ -185,13 +197,7 @@ export interface MikroORMOptions {
namingStrategy?: { new (): NamingStrategy };
hydrator: { new (factory: EntityFactory, driver: IDatabaseDriver): Hydrator };
entityRepository: { new (em: EntityManager, entityName: EntityName<IEntity>): EntityRepository<IEntity> };
clientUrl?: string;
host?: string;
port?: number;
user?: string;
password?: string;
multipleStatements?: boolean; // for mysql driver
pool: PoolConfig;
replicas?: Partial<ConnectionOptions>[];
strict: boolean;
logger: (message: string) => void;
debug: boolean | LoggerNamespace[];
Expand Down
4 changes: 4 additions & 0 deletions lib/utils/Utils.ts
Expand Up @@ -269,4 +269,8 @@ export class Utils {
}, [] as T[]);
}

static randomInt(min: number, max: number): number {
return Math.round(Math.random() * (max - min)) + min;
}

}

0 comments on commit 6144463

Please sign in to comment.