Skip to content

Commit

Permalink
[CONJS-257] permit to import sql file directly #242
Browse files Browse the repository at this point in the history
  • Loading branch information
rusher committed May 29, 2023
1 parent 11d925d commit a25d3b1
Show file tree
Hide file tree
Showing 19 changed files with 3,998 additions and 90 deletions.
3 changes: 2 additions & 1 deletion README.md
Expand Up @@ -35,7 +35,8 @@ Connector is production grade quality, with multiple features:
* easy debugging, trace pointing to code line on error
* allows data streaming without high memory consumption
* pipelining
* metadata skipping (for MariaDB server only)
* metadata skipping (for MariaDB server only)
* sql file import
* ...

see some of those features:
Expand Down
23 changes: 23 additions & 0 deletions callback.js
Expand Up @@ -10,6 +10,7 @@ const ConnOptions = require('./lib/config/connection-options');
const PoolOptions = require('./lib/config/pool-options');
const ClusterOptions = require('./lib/config/cluster-options');
const Connection = require('./lib/connection');
const CommandParameter = require('./lib/command-parameter');

module.exports.version = require('./package.json').version;
module.exports.SqlError = require('./lib/misc/errors').SqlError;
Expand Down Expand Up @@ -51,3 +52,25 @@ exports.createPoolCluster = function createPoolCluster(opts) {
const options = new ClusterOptions(opts);
return new ClusterCallback(options);
};

module.exports.importFile = function importFile(opts, callback) {
const cb = callback ? callback : () => {};
try {
const options = new ConnOptions(opts);
const conn = new Connection(options);
conn
.connect()
.then(() => {
return new Promise(function (res, rej) {
return conn.importFile(Object.assign({ skipDbCheck: true }, opts), res, rej);
});
})
.then(() => cb())
.catch((err) => cb(err))
.finally(() => {
new Promise(conn.end.bind(conn, new CommandParameter())).catch(() => {});
});
} catch (err) {
cb(err);
}
};
144 changes: 97 additions & 47 deletions documentation/callback-api.md

Large diffs are not rendered by default.

146 changes: 108 additions & 38 deletions documentation/promise-api.md

Large diffs are not rendered by default.

31 changes: 31 additions & 0 deletions lib/connection-callback.js
Expand Up @@ -229,6 +229,37 @@ class ConnectionCallback {
});
}

/**
* Import sql file.
*
* @param opts JSON array with 2 possible fields: file and database
* @param cb callback
*/
importFile(opts, cb) {
if (!opts) {
if (cb)
cb(
Errors.createError(
'SQL file parameter is mandatory',
Errors.ER_MISSING_SQL_PARAMETER,
this.#conn.info,
'HY000',
null,
false,
null
)
);
return;
}
new Promise(this.#conn.importFile.bind(this.#conn, { file: opts.file, database: opts.database }))
.then(() => {
if (cb) cb();
})
.catch((err) => {
if (cb) cb(err);
});
}

/**
* Send an empty MySQL packet to ensure connection is active, and reset @@wait_timeout
* @param timeout (optional) timeout value in ms. If reached, throw error and close connection
Expand Down
23 changes: 23 additions & 0 deletions lib/connection-promise.js
Expand Up @@ -2,6 +2,7 @@

const Stream = require('./cmd/stream');
const CommandParameter = require('./command-parameter');
const Errors = require('./misc/errors');

/**
* New Connection instance.
Expand Down Expand Up @@ -152,6 +153,28 @@ class ConnectionPromise {
return conn.batch(cmdParam);
}

/**
* Import sql file.
*
* @param opts JSON array with 2 possible fields: file and database
*/
importFile(opts) {
if (!opts) {
return Promise.reject(
Errors.createError(
'SQL file parameter is mandatory',
Errors.ER_MISSING_SQL_PARAMETER,
this.#conn.info,
'HY000',
null,
false,
null
)
);
}
return new Promise(this.#conn.importFile.bind(this.#conn, { file: opts.file, database: opts.database }));
}

/**
* Execute query returning a Readable Object that will emit columns/data/end/error events
* to permit streaming big result-set
Expand Down
183 changes: 181 additions & 2 deletions lib/connection.js
Expand Up @@ -29,6 +29,8 @@ const ChangeUser = require('./cmd/change-user');
const { Status } = require('./const/connection_status');
const CommandParameter = require('./command-parameter');
const LruPrepareCache = require('./lru-prepare-cache');
const fsPromises = require('fs').promises;
const Parse = require('./misc/parse');

const convertFixedTime = function (tz, conn) {
if (tz === 'UTC' || tz === 'Etc/UTC' || tz === 'Z' || tz === 'Etc/GMT') {
Expand Down Expand Up @@ -895,6 +897,7 @@ class Connection extends EventEmitter {
* @private
*/
authFailHandler(err) {
clearTimeout(this.timeout);
if (this.connectRejectFct) {
if (this.opts.logger.error) this.opts.logger.error(err);
//remove handshake command
Expand Down Expand Up @@ -1239,13 +1242,15 @@ class Connection extends EventEmitter {
reject(err);
return;
}
this.addCommand = this.addCommandEnable;
if (this.status < Status.CLOSING) {
this.addCommand = this.addCommandEnable;
}
this.addCommand(
new ChangeUser(
cmdParam,
this.opts,
(res) => {
if (this.opts.pipelining) this.addCommand = this.addCommandEnablePipeline;
if (this.status < Status.CLOSING && this.opts.pipelining) this.addCommand = this.addCommandEnablePipeline;
if (cmdParam.opts && cmdParam.opts.collation) this.opts.collation = cmdParam.opts.collation;
resolve(res);
},
Expand Down Expand Up @@ -1304,6 +1309,180 @@ class Connection extends EventEmitter {
this.addCommand(cmd);
}

importFile(cmdParam, resolve, reject) {
const conn = this;
if (!cmdParam.file) {
return reject(
Errors.createError(
'SQL file parameter is mandatory',
Errors.ER_MISSING_SQL_PARAMETER,
conn.info,
'HY000',
null,
false,
cmdParam.stack
)
);
}

let prevDatabase = null;
return (
cmdParam.skipDbCheck
? Promise.resolve()
: new Promise(conn.query.bind(conn, new CommandParameter('SELECT DATABASE() as db', null, {})))
).then((res) => {
prevDatabase = res ? res[0].db : null;
if (
(cmdParam.skipDbCheck && !conn.opts.database) ||
(!cmdParam.skipDbCheck && !cmdParam.database && !prevDatabase)
) {
return reject(
Errors.createError(
'Database parameter is not set and no database is selected',
Errors.ER_MISSING_DATABASE_PARAMETER,
conn.info,
'HY000',
null,
false,
cmdParam.stack
)
);
}
const searchDbPromise = cmdParam.database
? new Promise(
conn.query.bind(conn, new CommandParameter(`USE \`${cmdParam.database.replace(/`/gi, '``')}\``, null, {}))
)
: Promise.resolve();
return searchDbPromise.then(() => {
return fsPromises
.open(cmdParam.file, 'r')
.then(async (fd) => {
const buf = {
buffer: Buffer.allocUnsafe(16384),
offset: 0,
end: 0
};
let endingFunction = () => {};
if (conn.status < Status.CLOSING) {
conn.addCommand = conn.addCommandEnable.bind(conn);
if (conn.status < Status.CLOSING && conn.opts.pipelining) {
endingFunction = () => {
conn.addCommand = conn.addCommandEnablePipeline;
};
}
}

const queryPromises = [];
let cmdError = null;
while (!cmdError) {
try {
const res = await fd.read(buf.buffer, buf.end, buf.buffer.length - buf.end, null);
if (res.bytesRead == 0) {
// end of file reached.
fd.close().catch(() => {});
if (cmdError) {
endingFunction();
reject(cmdError);
return;
}
Promise.allSettled(queryPromises)
.then(() => {
if (!cmdParam.skipDbCheck && cmdParam.database && cmdParam.database != prevDatabase) {
return new Promise(
conn.query.bind(
conn,
new CommandParameter(`USE \`${prevDatabase.replace(/`/gi, '``')}\``, null, {})
)
);
}
return Promise.resolve();
})
.then(() => {
endingFunction();
resolve();
})
.catch((err) => {
endingFunction();
reject(err);
});
return;
} else {
buf.end += res.bytesRead;
const queries = Parse.parseQueries(buf);
const queryIntermediatePromise = queries.flatMap((element) => {
return new Promise(conn.query.bind(conn, new CommandParameter(element, null, null))).catch(
(err) => {
cmdError = err;
}
);
});

queryPromises.push(...queryIntermediatePromise);
if (buf.offset == buf.end) {
buf.offset = 0;
buf.end = 0;
} else {
// ensure that buffer can at least read 8k bytes,
// either by copying remaining data on used part or growing buffer
if (buf.offset > 8192) {
// reuse buffer, copying remaining data begin of buffer
buf.buffer.copy(buf.buffer, 0, buf.offset, buf.end);
buf.end -= buf.offset;
buf.offset = 0;
} else if (buf.buffer.length - buf.end < 8192) {
// grow buffer
const tmpBuf = Buffer.allocUnsafe(buf.buffer.length << 1);
buf.buffer.copy(tmpBuf, 0, buf.offset, buf.end);
buf.buffer = tmpBuf;
buf.end -= buf.offset;
buf.offset = 0;
}
}
}
} catch (e) {
fd.close().catch(() => {});
endingFunction();
Promise.allSettled(queryPromises).catch(() => {});
return reject(
Errors.createError(
e.message,
Errors.ER_SQL_FILE_ERROR,
conn.info,
'HY000',
null,
false,
cmdParam.stack
)
);
}
}
if (cmdError) {
endingFunction();
reject(cmdError);
}
})
.catch((err) => {
if (err.code === 'ENOENT') {
return reject(
Errors.createError(
`SQL file parameter '${cmdParam.file}' doesn't exists`,
Errors.ER_MISSING_SQL_FILE,
conn.info,
'HY000',
null,
false,
cmdParam.stack
)
);
}
return reject(
Errors.createError(err.message, Errors.ER_SQL_FILE_ERROR, conn.info, 'HY000', null, false, cmdParam.stack)
);
});
});
});
}

/**
* Clearing connection variables when ending.
*
Expand Down
4 changes: 4 additions & 0 deletions lib/misc/errors.js
Expand Up @@ -133,6 +133,10 @@ module.exports.ER_COMPRESSION_NOT_SUPPORTED = 45048;
module.exports.ER_UNDEFINED_SQL = 45049;
module.exports.ER_PARSING_PRECISION = 45050;
module.exports.ER_PREPARE_CLOSED = 45051;
module.exports.ER_MISSING_SQL_PARAMETER = 45052;
module.exports.ER_MISSING_SQL_FILE = 45053;
module.exports.ER_SQL_FILE_ERROR = 45054;
module.exports.ER_MISSING_DATABASE_PARAMETER = 45055;

const keys = Object.keys(module.exports);
const errByNo = {};
Expand Down

0 comments on commit a25d3b1

Please sign in to comment.