Skip to content

Commit

Permalink
Merge 4ff90dc into 8b18a87
Browse files Browse the repository at this point in the history
  • Loading branch information
B4nan committed Sep 12, 2019
2 parents 8b18a87 + 4ff90dc commit 50d8278
Show file tree
Hide file tree
Showing 15 changed files with 171 additions and 49 deletions.
10 changes: 5 additions & 5 deletions lib/EntityManager.ts
@@ -1,7 +1,7 @@
import { Configuration, RequestContext, Utils, ValidationError } from './utils';
import { EntityAssigner, EntityFactory, EntityLoader, EntityRepository, EntityValidator, ReferenceType } from './entity';
import { LockMode, UnitOfWork } from './unit-of-work';
import { FilterQuery, IDatabaseDriver } from './drivers';
import { AbstractSqlDriver, FilterQuery, IDatabaseDriver } from './drivers';
import { EntityData, EntityMetadata, EntityName, IEntity, IEntityType, IPrimaryKey } from './decorators';
import { QueryBuilder, QueryOrderMap, SmartQueryHelper } from './query';
import { MetadataStorage } from './metadata';
Expand All @@ -24,8 +24,8 @@ export class EntityManager {
return this.driver as D;
}

getConnection<C extends Connection = Connection>(): C {
return this.driver.getConnection() as C;
getConnection<C extends Connection = Connection>(type?: 'read' | 'write'): C {
return this.driver.getConnection(type) as C;
}

getRepository<T extends IEntityType<T>>(entityName: EntityName<T>): EntityRepository<T> {
Expand All @@ -44,9 +44,9 @@ export class EntityManager {
return this.validator;
}

createQueryBuilder(entityName: EntityName<IEntity>, alias?: string): QueryBuilder {
createQueryBuilder(entityName: EntityName<IEntity>, alias?: string, type?: 'read' | 'write'): QueryBuilder {
entityName = Utils.className(entityName);
return new QueryBuilder(entityName, this.metadata, this.driver, this.transactionContext, alias);
return new QueryBuilder(entityName, this.metadata, this.driver as AbstractSqlDriver, this.transactionContext, alias, type);
}

async find<T extends IEntityType<T>>(entityName: EntityName<T>, where?: FilterQuery<T>, options?: FindOptions): Promise<T[]>;
Expand Down
5 changes: 2 additions & 3 deletions lib/MikroORM.ts
Expand Up @@ -45,8 +45,7 @@ export class MikroORM {
}

async connect(): Promise<IDatabaseDriver> {
const connection = this.driver.getConnection();
await connection.connect();
const connection = await this.driver.connect();
const clientUrl = connection.getClientUrl();
const dbName = this.config.get('dbName');
this.logger.log('info', `MikroORM successfully connected to database ${chalk.green(dbName)}${clientUrl ? ' on ' + chalk.green(clientUrl) : ''}`);
Expand All @@ -59,7 +58,7 @@ export class MikroORM {
}

async close(force = false): Promise<void> {
return this.driver.getConnection().close(force);
return this.driver.close(force);
}

getMetadata(): MetadataStorage {
Expand Down
31 changes: 21 additions & 10 deletions lib/connections/Connection.ts
Expand Up @@ -3,16 +3,22 @@ import { Transaction as KnexTransaction } from 'knex';
import chalk from 'chalk';
import highlight from 'cli-highlight';

import { Configuration, Utils } from '../utils';
import { Configuration, ConnectionOptions, Utils } from '../utils';
import { MetadataStorage } from '../metadata';

export abstract class Connection {

protected readonly logger = this.config.getLogger();
protected metadata: MetadataStorage;
protected abstract client: any;

constructor(protected readonly config: Configuration) { }
constructor(protected readonly config: Configuration,
protected readonly options?: ConnectionOptions,
protected readonly type: 'read' | 'write' = 'write') {
if (!this.options) {
const props = ['dbName', 'clientUrl', 'host', 'port', 'user', 'password', 'multipleStatements', 'pool'] as const;
this.options = props.reduce((o, i) => { (o[i] as any) = this.config.get(i); return o; }, {} as ConnectionOptions);
}
}

/**
* Establishes connection to database
Expand Down Expand Up @@ -42,12 +48,12 @@ export abstract class Connection {

getConnectionOptions(): ConnectionConfig {
const ret: ConnectionConfig = {};
const url = new URL(this.config.getClientUrl());
ret.host = this.config.get('host', url.hostname);
ret.port = this.config.get('port', +url.port);
ret.user = this.config.get('user', url.username);
ret.password = this.config.get('password', url.password);
ret.database = this.config.get('dbName', url.pathname.replace(/^\//, ''));
const url = new URL(this.options!.clientUrl || this.config.getClientUrl());
this.options!.host = ret.host = this.config.get('host', url.hostname);
this.options!.port = ret.port = this.config.get('port', +url.port);
this.options!.user = ret.user = this.config.get('user', url.username);
this.options!.password = ret.password = this.config.get('password', url.password);
this.options!.dbName = ret.database = this.config.get('dbName', url.pathname.replace(/^\//, ''));

return ret;
}
Expand Down Expand Up @@ -76,7 +82,12 @@ export abstract class Connection {
query = highlight(query, { language, ignoreIllegals: true, theme: this.config.getHighlightTheme() });
}

const msg = `${query}` + (Utils.isDefined(took) ? chalk.grey(` [took ${took} ms]`) : '');
let msg = query + (Utils.isDefined(took) ? chalk.grey(` [took ${chalk.grey(took)} ms]`) : '');

if (this.config.get('replicas', []).length > 0) {
msg += chalk.cyan(` (via ${this.type} connection '${this.options!.name || this.config.get('name') || this.options!.host}')`);
}

this.config.getLogger().log('query', msg);
}

Expand Down
2 changes: 1 addition & 1 deletion lib/drivers/AbstractSqlDriver.ts
Expand Up @@ -93,7 +93,7 @@ export abstract class AbstractSqlDriver<C extends AbstractSqlConnection = Abstra
}

protected createQueryBuilder(entityName: string, ctx?: Transaction): QueryBuilder {
return new QueryBuilder(entityName, this.metadata, this, ctx);
return new QueryBuilder(entityName, this.metadata, this, ctx, undefined, ctx ? 'write' : 'read');
}

protected extractManyToMany<T extends IEntityType<T>>(entityName: string, data: EntityData<T>): EntityData<T> {
Expand Down
38 changes: 35 additions & 3 deletions lib/drivers/DatabaseDriver.ts
Expand Up @@ -2,14 +2,15 @@ import { FilterQuery, IDatabaseDriver } from './IDatabaseDriver';
import { EntityData, EntityMetadata, EntityProperty, IEntity, IEntityType, IPrimaryKey } from '../decorators';
import { MetadataStorage } from '../metadata';
import { Connection, QueryResult, Transaction } from '../connections';
import { Configuration, Utils } from '../utils';
import { Configuration, ConnectionOptions, Utils } from '../utils';
import { QueryOrder, QueryOrderMap } from '../query';
import { Platform } from '../platforms';
import { LockMode } from '../unit-of-work';

export abstract class DatabaseDriver<C extends Connection> implements IDatabaseDriver<C> {

protected readonly connection: C;
protected readonly replicas: C[] = [];
protected readonly platform: Platform;
protected readonly logger = this.config.getLogger();
protected metadata: MetadataStorage;
Expand Down Expand Up @@ -74,8 +75,26 @@ export abstract class DatabaseDriver<C extends Connection> implements IDatabaseD
return ret as T;
}

getConnection(): C {
return this.connection as C;
async connect(): Promise<C> {
await this.connection.connect();
await Promise.all(this.replicas.map(replica => replica.connect()));

return this.connection;
}

getConnection(type: 'read' | 'write' = 'write'): C {
if (type === 'write' || this.replicas.length === 0) {
return this.connection as C;
}

const rand = Utils.randomInt(0, this.replicas.length - 1);

return this.replicas[rand] as C;
}

async close(force?: boolean): Promise<void> {
await Promise.all(this.replicas.map(replica => replica.close(force)));
await this.connection.close(force);
}

getPlatform(): Platform {
Expand All @@ -92,4 +111,17 @@ export abstract class DatabaseDriver<C extends Connection> implements IDatabaseD
return meta ? meta.primaryKey : this.config.getNamingStrategy().referenceColumnName();
}

protected createReplicas(cb: (c: ConnectionOptions) => C): C[] {
const replicas = this.config.get('replicas', [])!;
const ret: C[] = [];
const props = ['dbName', 'clientUrl', 'host', 'port', 'user', 'password', 'multipleStatements', 'pool', 'name'] as const;

replicas.forEach((conf: Partial<ConnectionOptions>) => {
props.forEach(prop => (conf[prop] as any) = prop in conf ? conf[prop] : this.config.get(prop));
ret.push(cb(conf as ConnectionOptions));
});

return ret;
}

}
6 changes: 5 additions & 1 deletion lib/drivers/IDatabaseDriver.ts
Expand Up @@ -7,7 +7,11 @@ import { MetadataStorage } from '../metadata';

export interface IDatabaseDriver<C extends Connection = Connection> {

getConnection(): C;
connect(): Promise<C>;

close(force?: boolean): Promise<void>;

getConnection(type?: 'read' | 'write'): C;

/**
* Finds selection of entities
Expand Down
14 changes: 7 additions & 7 deletions lib/drivers/MongoDriver.ts
Expand Up @@ -15,7 +15,7 @@ export class MongoDriver extends DatabaseDriver<MongoConnection> {

async find<T extends IEntityType<T>>(entityName: string, where: FilterQuery<T>, populate: string[], orderBy: QueryOrderMap, limit: number, offset: number): Promise<T[]> {
where = this.renameFields(entityName, where);
const res = await this.connection.find<T>(entityName, where, orderBy, limit, offset);
const res = await this.getConnection('read').find<T>(entityName, where, orderBy, limit, offset);

return res.map((r: T) => this.mapResult<T>(r, this.metadata.get(entityName))!);
}
Expand All @@ -26,19 +26,19 @@ export class MongoDriver extends DatabaseDriver<MongoConnection> {
}

where = this.renameFields(entityName, where) as FilterQuery<T>;
const res = await this.connection.find<T>(entityName, where, orderBy, 1, undefined, fields);
const res = await this.getConnection('read').find<T>(entityName, where, orderBy, 1, undefined, fields);

return this.mapResult<T>(res[0], this.metadata.get(entityName));
}

async count<T extends IEntityType<T>>(entityName: string, where: FilterQuery<T>): Promise<number> {
where = this.renameFields(entityName, where);
return this.connection.countDocuments<T>(entityName, where);
return this.getConnection('read').countDocuments<T>(entityName, where);
}

async nativeInsert<T extends IEntityType<T>>(entityName: string, data: EntityData<T>): Promise<QueryResult> {
data = this.renameFields(entityName, data);
return this.connection.insertOne<EntityData<T>>(entityName, data);
return this.getConnection('write').insertOne<EntityData<T>>(entityName, data);
}

async nativeUpdate<T extends IEntityType<T>>(entityName: string, where: FilterQuery<T> | IPrimaryKey, data: EntityData<T>): Promise<QueryResult> {
Expand All @@ -49,7 +49,7 @@ export class MongoDriver extends DatabaseDriver<MongoConnection> {
where = this.renameFields(entityName, where) as FilterQuery<T>;
data = this.renameFields(entityName, data);

return this.connection.updateMany<T>(entityName, where, data);
return this.getConnection('write').updateMany<T>(entityName, where, data);
}

async nativeDelete<T extends IEntityType<T>>(entityName: string, where: FilterQuery<T> | IPrimaryKey): Promise<QueryResult> {
Expand All @@ -59,11 +59,11 @@ export class MongoDriver extends DatabaseDriver<MongoConnection> {

where = this.renameFields(entityName, where) as FilterQuery<T>;

return this.connection.deleteMany<T>(entityName, where);
return this.getConnection('write').deleteMany<T>(entityName, where);
}

async aggregate(entityName: string, pipeline: any[]): Promise<any[]> {
return this.connection.aggregate(entityName, pipeline);
return this.getConnection('read').aggregate(entityName, pipeline);
}

private renameFields(entityName: string, data: any): any {
Expand Down
1 change: 1 addition & 0 deletions lib/drivers/MySqlDriver.ts
Expand Up @@ -5,6 +5,7 @@ import { MySqlPlatform } from '../platforms/MySqlPlatform';
export class MySqlDriver extends AbstractSqlDriver<MySqlConnection> {

protected readonly connection = new MySqlConnection(this.config);
protected readonly replicas = this.createReplicas(conf => new MySqlConnection(this.config, conf, 'read'));
protected readonly platform = new MySqlPlatform();

}
17 changes: 9 additions & 8 deletions lib/query/QueryBuilder.ts
Expand Up @@ -6,8 +6,7 @@ import { EntityProperty, IEntity } from '../decorators';
import { ReferenceType } from '../entity';
import { QueryFlag, QueryOrderMap, QueryType } from './enums';
import { LockMode } from '../unit-of-work';
import { AbstractSqlConnection } from '../connections/AbstractSqlConnection';
import { IDatabaseDriver } from '../drivers';
import { AbstractSqlDriver } from '../drivers';
import { MetadataStorage } from '../metadata';

/**
Expand All @@ -33,16 +32,16 @@ export class QueryBuilder {
private _limit: number;
private _offset: number;
private lockMode?: LockMode;
private readonly connection = this.driver.getConnection() as AbstractSqlConnection;
private readonly platform = this.driver.getPlatform();
private readonly knex = this.connection.getKnex();
private readonly knex = this.driver.getConnection(this.connectionType).getKnex();
private readonly helper = new QueryBuilderHelper(this.entityName, this.alias, this._aliasMap, this.metadata, this.knex, this.platform);

constructor(private readonly entityName: string,
private readonly metadata: MetadataStorage,
private readonly driver: IDatabaseDriver,
private readonly driver: AbstractSqlDriver,
private readonly context?: Transaction,
readonly alias = `e0`) { }
readonly alias = `e0`,
private readonly connectionType?: 'read' | 'write') { }

select(fields: string | string[], distinct = false): this {
this._fields = Utils.asArray(fields);
Expand Down Expand Up @@ -236,7 +235,8 @@ export class QueryBuilder {
}

async execute(method: 'all' | 'get' | 'run' = 'all', mapResults = true): Promise<any> {
const res = await this.connection.execute(this.getKnexQuery(), [], method);
const type = this.connectionType || (method === 'run' ? 'write' : 'read');
const res = await this.driver.getConnection(type).execute(this.getKnexQuery(), [], method);
const meta = this.metadata.get(this.entityName);

if (!mapResults) {
Expand All @@ -251,7 +251,7 @@ export class QueryBuilder {
}

clone(): QueryBuilder {
const qb = new QueryBuilder(this.entityName, this.metadata, this.driver, this.context, this.alias);
const qb = new QueryBuilder(this.entityName, this.metadata, this.driver, this.context, this.alias, this.connectionType);
Object.assign(qb, this);

// clone array/object properties
Expand Down Expand Up @@ -410,6 +410,7 @@ export class QueryBuilder {
if (this.context) {
qb.transacting(this.context);
}

return qb;
}

Expand Down
2 changes: 1 addition & 1 deletion lib/unit-of-work/UnitOfWork.ts
Expand Up @@ -102,7 +102,7 @@ export class UnitOfWork {
const promise = async (tx: Transaction) => await Utils.runSerial(this.changeSets, changeSet => this.commitChangeSet(changeSet, tx));

if (runInTransaction) {
await this.em.getConnection().transactional(trx => promise(trx));
await this.em.getConnection('write').transactional(trx => promise(trx));
} else {
await promise(this.em.getTransactionContext());
}
Expand Down
26 changes: 16 additions & 10 deletions lib/utils/Configuration.ts
Expand Up @@ -31,6 +31,7 @@ export class Configuration {
hydrator: ObjectHydrator,
tsNode: false,
debug: false,
verbose: false,
cache: {
enabled: true,
adapter: FileCacheAdapter,
Expand Down Expand Up @@ -75,8 +76,8 @@ export class Configuration {
this.init();
}

get<T extends keyof MikroORMOptions, U extends MikroORMOptions[T]>(key: T, defaultValue?: U): MikroORMOptions[T] {
return (Utils.isDefined(this.options[key]) ? this.options[key] : defaultValue) as MikroORMOptions[T];
get<T extends keyof MikroORMOptions, U extends MikroORMOptions[T]>(key: T, defaultValue?: U): U {
return (Utils.isDefined(this.options[key]) ? this.options[key] : defaultValue) as U;
}

set<T extends keyof MikroORMOptions, U extends MikroORMOptions[T]>(key: T, value: U): void {
Expand Down Expand Up @@ -172,8 +173,19 @@ export class Configuration {

}

export interface MikroORMOptions {
export interface ConnectionOptions {
name?: string;
dbName: string;
clientUrl?: string;
host?: string;
port?: number;
user?: string;
password?: string;
multipleStatements?: boolean; // for mysql driver
pool: PoolConfig;
}

export interface MikroORMOptions extends ConnectionOptions {
entities: (EntityClass<IEntity> | EntityClassGroup<IEntity>)[];
entitiesDirs: string[];
entitiesDirsTs: string[];
Expand All @@ -185,13 +197,7 @@ export interface MikroORMOptions {
namingStrategy?: { new (): NamingStrategy };
hydrator: { new (factory: EntityFactory, driver: IDatabaseDriver): Hydrator };
entityRepository: { new (em: EntityManager, entityName: EntityName<IEntity>): EntityRepository<IEntity> };
clientUrl?: string;
host?: string;
port?: number;
user?: string;
password?: string;
multipleStatements?: boolean; // for mysql driver
pool: PoolConfig;
replicas?: Partial<ConnectionOptions>[];
strict: boolean;
logger: (message: string) => void;
debug: boolean | LoggerNamespace[];
Expand Down
4 changes: 4 additions & 0 deletions lib/utils/Utils.ts
Expand Up @@ -269,4 +269,8 @@ export class Utils {
}, [] as T[]);
}

static randomInt(min: number, max: number): number {
return Math.round(Math.random() * (max - min)) + min;
}

}

0 comments on commit 50d8278

Please sign in to comment.