Skip to content

Commit

Permalink
[misc] always using promise.
Browse files Browse the repository at this point in the history
Callback can still be used for compatibility when option 'useCallback' is set
  • Loading branch information
rusher committed Jun 5, 2018
1 parent 95f403b commit 2a1361a
Show file tree
Hide file tree
Showing 20 changed files with 616 additions and 476 deletions.
2 changes: 1 addition & 1 deletion lib/config/connection-options.js
Expand Up @@ -70,10 +70,10 @@ class ConnectionOptions {
if (this.typeCast != undefined && typeof this.typeCast !== "function") {
this.typeCast = undefined;
}
this.useCallback = opts.useCallback || false;
this.user = opts.user || process.env.USERNAME;
this.nestTables = opts.nestTables === undefined ? undefined : opts.nestTables;
this.rowsAsArray = opts.rowsAsArray || false;

this.ssl = opts.ssl;
if (opts.ssl) {
if (typeof opts.ssl === "string") {
Expand Down
180 changes: 96 additions & 84 deletions lib/connection.js
Expand Up @@ -47,51 +47,57 @@ function Connection(options) {
//*****************************************************************

/**
* Connect event with callback.
* Connect event
*
* @param callback(error)
* @returns {Promise} promise if no callback
* @returns {Promise} promise
*/
this.connect = callback => {
this.connect = arg => {
if (arg) {
return Promise.reject(
Errors.createError(
"No parameters allowed on Connection.connect(). " +
"If wanting to use callback, set option 'useCallback'",
true,
info,
"08S01",
Errors.ER_WRONG_PARAMETERS
)
);
}

switch (_status) {
case Status.NOT_CONNECTED:
_status = Status.CONNECTING;
if (callback) {
_registerHandshakeCmd(callback, callback);
break;
}
return new Promise(function(resolve, reject) {
_registerHandshakeCmd(resolve, reject);
});

break;

case Status.CLOSING:
case Status.CLOSED:
const err = Errors.createError(
"Connection closed",
true,
info,
"08S01",
Errors.ER_CONNECTION_ALREADY_CLOSED
return Promise.reject(
Errors.createError(
"Connection closed",
true,
info,
"08S01",
Errors.ER_CONNECTION_ALREADY_CLOSED
)
);
if (callback) return callback(err);
return Promise.reject(err);

case Status.CONNECTING:
case Status.AUTHENTICATING:
const errConnecting = Errors.createError(
"Connection is already connecting",
true,
info,
"08S01",
Errors.ER_ALREADY_CONNECTING
return Promise.reject(
Errors.createError(
"Connection is already connecting",
true,
info,
"08S01",
Errors.ER_ALREADY_CONNECTING
)
);
if (callback) return callback(errConnecting);
return Promise.reject(errConnecting);

case Status.CONNECTED:
if (callback) callback();
default:
//status Connected
return Promise.resolve();
}
};
Expand All @@ -102,46 +108,25 @@ function Connection(options) {
* !!! mysql has a bug when CONNECT_ATTRS capability is set, that is default !!!!
*
* @param options connection options
* @param callback callback function
* @returns {Promise} promise
*/
this.changeUser = (options, callback) => {
this.changeUser = options => {
if (!this.isMariaDB()) {
const err = Errors.createError(
"method changeUser not available for MySQL server due to Bug #83472",
false,
info,
"0A000",
Errors.ER_MYSQL_CHANGE_USER_BUG
);
if (callback) {
callback(err);
return;
}
throw err;
}
let _options, _cb;
if (typeof options === "function") {
_cb = options;
_options = undefined;
} else {
_options = options;
_cb = callback;
}

if (_cb) {
const _changeUserEnd = err => {
process.nextTick(_cb, err);
if (err) _fatalError(err, true);
};
return _addCommand(
new ChangeUser(_options, _changeUserEnd, _changeUserEnd, _addCommand.bind(this))
return Promise.reject(
Errors.createError(
"method changeUser not available for MySQL server due to Bug #83472",
false,
info,
"0A000",
Errors.ER_MYSQL_CHANGE_USER_BUG
)
);
}

return new Promise(function(resolve, reject) {
_addCommand(
new ChangeUser(
_options,
options,
resolve,
_authFailHandler.bind(this, reject),
_addCommand.bind(this)
Expand Down Expand Up @@ -263,37 +248,25 @@ function Connection(options) {
* @param callback when done
* @returns {Promise} promise when no callback
*/
this.end = callback => {
this.end = () => {
_addCommand = _addCommandDisabled;
if (_status === Status.CONNECTING || _status === Status.CONNECTED) {
_status = Status.CLOSING;
let quitCmd;
let promise;
if (callback) {
quitCmd = new Quit(() => {
return new Promise(function(resolve, reject) {
//TODO handle socket error
const quitCmd = new Quit(() => {
let sock = _socket;
_clear();
_status = Status.CLOSED;
setImmediate(callback);
setImmediate(resolve);
sock.destroy();
});
} else {
promise = new Promise(function(resolve, reject) {
quitCmd = new Quit(() => {
let sock = _socket;
_clear();
_status = Status.CLOSED;
setImmediate(resolve);
sock.destroy();
});
});
}
_sendQueue.push(quitCmd);
_receiveQueue.push(quitCmd);
if (_sendQueue.length === 1) {
process.nextTick(_nextSendCmd.bind(this));
}
return promise;
_sendQueue.push(quitCmd);
_receiveQueue.push(quitCmd);
if (_sendQueue.length === 1) {
process.nextTick(_nextSendCmd.bind(this));
}
});
}
return Promise.resolve();
};
Expand All @@ -311,7 +284,7 @@ function Connection(options) {
//only possibility is to kill process by another thread
//TODO reuse a pool connection to avoid connection creation
const killCon = new Connection(opts);
killCon.connect(() => {
killCon.connect().then(() => {
killCon.query("KILL " + info.threadId, () => {
const err = Errors.createError(
"Connection destroyed, command was killed",
Expand Down Expand Up @@ -440,6 +413,42 @@ function Connection(options) {
// internal methods
//*****************************************************************

const _useCallbackFct = () => {
const connectWithPromise = this.connect;
this.connect = callback => {
const connectPromise = connectWithPromise();
if (callback) {
connectPromise.then(rows => callback(null, null, null)).catch(callback);
}
};

const changeUserWithPromise = this.changeUser;
this.changeUser = (options, callback) => {
let _options, _cb;
if (typeof options === "function") {
_cb = options;
_options = undefined;
} else {
_options = options;
_cb = callback;
}

const promise = changeUserWithPromise(_options);

if (_cb) {
promise.then(_cb).catch(_cb);
}
};

const endWithPromise = this.end;
this.end = callback => {
const promise = endWithPromise();
if (callback) {
promise.then(callback).catch(callback);
}
};
};

/**
* Add handshake command to queue.
*
Expand Down Expand Up @@ -910,6 +919,9 @@ function Connection(options) {
let _out = new PacketOutputStream(opts, info);
let _in = new PacketInputStream(_unexpectedPacket.bind(this), _receiveQueue, _out, opts, info);

//overload methods with callback if option
if (opts.useCallback) _useCallbackFct();

//add alias threadId for mysql/mysql2 compatibility
Object.defineProperty(this, "threadId", {
get() {
Expand Down
1 change: 1 addition & 0 deletions lib/misc/errors.js
Expand Up @@ -75,6 +75,7 @@ module.exports.ER_SERVER_SSL_DISABLED = 45023;
module.exports.ER_AUTHENTICATION_BAD_PACKET = 45024;
module.exports.ER_AUTHENTICATION_PLUGIN_NOT_SUPPORTED = 45025;
module.exports.ER_SOCKET_TIMEOUT = 45026;
module.exports.ER_WRONG_PARAMETERS = 45027;

const keys = Object.keys(module.exports);
const errByNo = {};
Expand Down
16 changes: 5 additions & 11 deletions test/base.js
Expand Up @@ -13,24 +13,18 @@ before("share initialization", done => {
if (global.shareConn) {
done();
} else {
// console.log("connecting share connection : ");
// console.log(connOptions);
// console.log(" ");
let conn = new Connection(connOptions);
conn.connect(err => {
if (err) {
done(err);
} else {
done();
}
});
conn
.connect()
.then(done)
.catch(done);
global.shareConn = conn;
}
});

after("share destroy", () => {
if (shareConn) {
shareConn.end(() => (global.shareConn = undefined));
shareConn.end().then(() => (global.shareConn = undefined));
}
});

Expand Down
46 changes: 24 additions & 22 deletions test/integration/datatype/test-datetime.js
Expand Up @@ -90,27 +90,29 @@ describe("datetime", () => {
profileSql: true
});

conn1.connect(function(err) {
if (err) return done(err);
conn1.query("select * from table_date", (err, rows) => {
if (err) throw err;
assert.equal(rows[0].t0, "2001-12-31");
assert.equal(rows[0].t1, "2001-12-31 23:59:58.123");
//microsecond doesn't work in javascript date
assert.equal(rows[0].t2, "2001-12-31 23:59:59.123000");

assert.isNull(rows[1].t0);
assert.isNull(rows[1].t1);
assert.isNull(rows[1].t2);

if (shareConn.isMariaDB() || !shareConn.hasMinVersion(5, 7)) {
assert.equal(rows[2].t0, "0000-00-00");
assert.equal(rows[2].t1, "0000-00-00 00:00:00.000");
assert.equal(rows[2].t2, "0000-00-00 00:00:00.000000");
}
conn1.end();
done();
});
});
conn1
.connect()
.then(() => {
conn1.query("select * from table_date", (err, rows) => {
if (err) throw err;
assert.equal(rows[0].t0, "2001-12-31");
assert.equal(rows[0].t1, "2001-12-31 23:59:58.123");
//microsecond doesn't work in javascript date
assert.equal(rows[0].t2, "2001-12-31 23:59:59.123000");

assert.isNull(rows[1].t0);
assert.isNull(rows[1].t1);
assert.isNull(rows[1].t2);

if (shareConn.isMariaDB() || !shareConn.hasMinVersion(5, 7)) {
assert.equal(rows[2].t0, "0000-00-00");
assert.equal(rows[2].t1, "0000-00-00 00:00:00.000");
assert.equal(rows[2].t2, "0000-00-00 00:00:00.000000");
}
conn1.end();
done();
});
})
.catch(done);
});
});
20 changes: 11 additions & 9 deletions test/integration/datatype/test-string.js
Expand Up @@ -68,15 +68,17 @@ describe("string", () => {
const encodings = ["KOI8R_GENERAL_CI", "UTF8_GENERAL_CI", "CP850_BIN", "CP1251_GENERAL_CI"];
for (let i = 0; i < encodings.length; i++) {
const conn = base.createConnection({ charset: encodings[i] });
conn.connect(err => {
if (err) return done(err);
conn.query("select ? as t", value, (err, res) => assert.strictEqual(res[0].t, value));
conn.execute("select ? as t", value, (err, res) => {
assert.strictEqual(res[0].t, value);
conn.end();
if (i === encodings.length - 1) done();
});
});
conn
.connect()
.then(() => {
conn.query("select ? as t", value, (err, res) => assert.strictEqual(res[0].t, value));
conn.execute("select ? as t", value, (err, res) => {
assert.strictEqual(res[0].t, value);
conn.end();
if (i === encodings.length - 1) done();
});
})
.catch(done);
}
});

Expand Down

0 comments on commit 2a1361a

Please sign in to comment.