-
-
Notifications
You must be signed in to change notification settings - Fork 65
/
client.ts
153 lines (142 loc) · 3.7 KB
/
client.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
import { Connection, ExecuteResult } from "./connection.ts";
import { ResponseTimeoutError, WriteError } from "./constant/errors.ts";
import { DeferredStack } from "./deferred.ts";
import { log } from "./logger.ts";
/**
* Client configuration accepted by `Client.connect()`.
*
* Every field is optional. `Client.connect()` spreads the supplied object over
* its own defaults (hostname, username, port, poolSize) and then freezes the
* result.
*/
export interface ClientConfig {
/** Database hostname; `Client.connect()` defaults it to "127.0.0.1" */
hostname?: string;
/** Database username; `Client.connect()` defaults it to "root" */
username?: string;
/** Database password; no default is applied by `Client.connect()` */
password?: string;
/** Database port; `Client.connect()` defaults it to 3306 */
port?: number;
/** Database name; no default is applied by `Client.connect()` */
db?: string;
/** Whether to display packet debugging information */
debug?: boolean;
/** Connect timeout. NOTE(review): the unit is not visible in this file - confirm against the connection code */
timeout?: number;
/**
* Connection pool size; `Client.connect()` defaults it to 1.
* NOTE(review): an explicit `undefined` or `0` makes `Client.connect()` fall
* back to 10 instead (`poolSize || 10`).
*/
poolSize?: number;
/** Character set. NOTE(review): accepted values are not visible in this file - confirm */
charset?: string;
}
/**
* Transaction processor: the callback passed to `Client.transaction()`.
*
* `Client.transaction()` calls it with the connection on which "BEGIN" was
* just executed. If the returned promise resolves, "COMMIT" is executed and the
* resolved value becomes the result of the transaction; if it rejects,
* "ROLLBACK" is executed and the rejection is re-thrown.
*
* @param connection the connection the transaction is running on; statements
* meant to be part of the transaction should be issued through it
* @returns a promise of the transaction's result
*/
export interface TransactionProcessor<T> {
(connection: Connection): Promise<T>;
}
/**
 * MySQL client.
 *
 * Owns the (frozen) configuration and a `DeferredStack` of `Connection`s.
 * Every statement goes through `useConnection()`, which borrows a connection
 * from the stack, runs the callback, and then either hands the connection back
 * or shrinks the stack, depending on how the callback failed.
 */
export class Client {
  /** Effective configuration; replaced and frozen by `connect()`. */
  config: ClientConfig = {};
  /** Connection stack; stays undefined until `connect()` has been called. */
  private _pool?: DeferredStack<Connection>;
  /** Array handed to the stack in `connect()`; `close()` iterates over it. */
  private _connections: Connection[] = [];

  /**
   * Factory passed to the stack: builds a `Connection` bound to this client
   * and resolves once `connection.connect()` has completed.
   */
  private async createConnection(): Promise<Connection> {
    const connection = new Connection(this);
    await connection.connect();
    return connection;
  }

  /**
   * Get pool info.
   * @returns a snapshot of the stack's `size`, `maxSize` and `available`, or
   * `undefined` when `connect()` has not been called yet
   */
  get pool() {
    if (this._pool) {
      return {
        size: this._pool.size,
        maxSize: this._pool.maxSize,
        available: this._pool.available,
      };
    }
    return undefined;
  }

  /**
   * Connect to database.
   *
   * Merges `config` over the defaults, freezes the result and creates a fresh
   * connection stack. This method does not itself open a connection; it only
   * hands the `createConnection` factory to the stack.
   *
   * @param config config for client
   * @returns Client instance (this)
   */
  async connect(config: ClientConfig): Promise<Client> {
    this.config = {
      hostname: "127.0.0.1",
      username: "root",
      port: 3306,
      poolSize: 1,
      ...config,
    };
    Object.freeze(this.config);
    this._connections = [];
    this._pool = new DeferredStack<Connection>(
      // The defaults above already supply 1, so this fallback is only reached
      // when the caller explicitly passed `poolSize: undefined` or `0`.
      // NOTE(review): 10 disagrees with the documented default of 1; it is
      // kept as-is for backward compatibility.
      this.config.poolSize || 10,
      this._connections,
      this.createConnection.bind(this),
    );
    return this;
  }

  /**
   * Execute query sql.
   * @param sql query sql string
   * @param params query params
   * @returns whatever `connection.query()` resolves to
   * @throws Error("Unconnected") when `connect()` has not been called
   */
  async query(sql: string, params?: any[]): Promise<any> {
    return await this.useConnection(async (connection) => {
      return await connection.query(sql, params);
    });
  }

  /**
   * Execute sql.
   * @param sql sql string
   * @param params query params
   * @returns the `ExecuteResult` produced by `connection.execute()`
   * @throws Error("Unconnected") when `connect()` has not been called
   */
  async execute(sql: string, params?: any[]): Promise<ExecuteResult> {
    return await this.useConnection(async (connection) => {
      return await connection.execute(sql, params);
    });
  }

  /**
   * Borrow a connection from the stack, run `fn` with it and release it.
   *
   * On success, and on any failure other than `WriteError` /
   * `ResponseTimeoutError`, the connection is pushed back onto the stack. For
   * those two error types the connection is NOT pushed back and the stack's
   * `reduceSize()` is called instead (presumably because the connection is no
   * longer usable - confirm against `DeferredStack`).
   *
   * @param fn callback that receives the borrowed connection
   * @returns the value `fn` resolves to
   * @throws Error("Unconnected") when `connect()` has not been called;
   * otherwise re-throws whatever `fn` rejected with
   */
  async useConnection<T>(fn: (conn: Connection) => Promise<T>): Promise<T> {
    if (!this._pool) {
      throw new Error("Unconnected");
    }
    const connection = await this._pool.pop();
    try {
      const result = await fn(connection);
      this._pool.push(connection);
      return result;
    } catch (error) {
      if (
        error instanceof WriteError ||
        error instanceof ResponseTimeoutError
      ) {
        this._pool.reduceSize();
      } else {
        this._pool.push(connection);
      }
      throw error;
    }
  }

  /**
   * Execute a transaction process, and the transaction successfully
   * returns the return value of the transaction process.
   *
   * Runs "BEGIN", the processor and "COMMIT" on one borrowed connection. If any
   * of the three fails, "ROLLBACK" is executed on the same connection and the
   * original failure is re-thrown. If "ROLLBACK" itself fails, that failure
   * propagates instead.
   *
   * @param processor transaction processor
   * @returns the value the processor resolved to
   */
  async transaction<T = any>(processor: TransactionProcessor<T>): Promise<T> {
    return await this.useConnection(async (connection) => {
      try {
        await connection.execute("BEGIN");
        const result = await processor(connection);
        await connection.execute("COMMIT");
        return result;
      } catch (error) {
        // Anything can be thrown, including `null`/`undefined`. Reading
        // `.message` off those would raise a TypeError right here, skipping
        // the ROLLBACK below and replacing the caller's original error.
        const reason = typeof error === "object" && error !== null
          ? (error as { message?: unknown }).message
          : error;
        log.info(`ROLLBACK: ${String(reason)}`);
        await connection.execute("ROLLBACK");
        throw error;
      }
    });
  }

  /**
   * Close connection.
   *
   * Calls `close()` on every connection currently held in the array shared
   * with the stack and waits for all of them. The stack itself is left in
   * place.
   */
  async close(): Promise<void> {
    await Promise.all(this._connections.map((conn) => conn.close()));
  }
}