-
-
Notifications
You must be signed in to change notification settings - Fork 4.2k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat(dialect): snowflake dialect support
- Loading branch information
Jesse Peng
committed
Aug 13, 2021
1 parent
56bb1d6
commit 56a0957
Showing
33 changed files
with
2,768 additions
and
311 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,151 @@ | ||
'use strict'; | ||
|
||
const AbstractConnectionManager = require('../abstract/connection-manager'); | ||
const SequelizeErrors = require('../../errors'); | ||
const { logger } = require('../../utils/logger'); | ||
const DataTypes = require('../../data-types').snowflake; | ||
const debug = logger.debugContext('connection:snowflake'); | ||
const parserStore = require('../parserStore')('snowflake'); | ||
|
||
/** | ||
* Snowflake Connection Manager | ||
* | ||
* Get connections, validate and disconnect them. | ||
* | ||
* @private | ||
*/ | ||
class ConnectionManager extends AbstractConnectionManager { | ||
constructor(dialect, sequelize) { | ||
sequelize.config.port = sequelize.config.port || 3306; | ||
super(dialect, sequelize); | ||
this.lib = this._loadDialectModule('snowflake-sdk'); | ||
this.refreshTypeParser(DataTypes); | ||
} | ||
|
||
_refreshTypeParser(dataType) { | ||
parserStore.refresh(dataType); | ||
} | ||
|
||
_clearTypeParser() { | ||
parserStore.clear(); | ||
} | ||
|
||
static _typecast(field, next) { | ||
if (parserStore.get(field.type)) { | ||
return parserStore.get(field.type)(field, this.sequelize.options, next); | ||
} | ||
return next(); | ||
} | ||
|
||
/** | ||
* Connect with a snowflake database based on config, Handle any errors in connection | ||
* Set the pool handlers on connection.error | ||
* Also set proper timezone once connection is connected. | ||
* | ||
* @param {object} config | ||
* @returns {Promise<Connection>} | ||
* @private | ||
*/ | ||
async connect(config) { | ||
const connectionConfig = { | ||
account: config.host, | ||
username: config.username, | ||
password: config.password, | ||
database: config.database, | ||
warehouse: config.warehouse, | ||
role: config.role, | ||
/* | ||
flags: '-FOUND_ROWS', | ||
timezone: this.sequelize.options.timezone, | ||
typeCast: ConnectionManager._typecast.bind(this), | ||
bigNumberStrings: false, | ||
supportBigNumbers: true, | ||
*/ | ||
...config.dialectOptions | ||
}; | ||
|
||
try { | ||
|
||
const connection = await new Promise((resolve, reject) => { | ||
this.lib.createConnection(connectionConfig).connect((err, conn) => { | ||
if (err) { | ||
console.log(err); | ||
reject(err); | ||
} else { | ||
resolve(conn); | ||
} | ||
}); | ||
}); | ||
|
||
debug('connection acquired'); | ||
|
||
if (!this.sequelize.config.keepDefaultTimezone) { | ||
// default value is '+00:00', put a quick workaround for it. | ||
const tzOffset = this.sequelize.options.timezone === '+00:00' ? 'Etc/UTC' : this.sequelize.options.timezone; | ||
const isNamedTzOffset = /\//.test(tzOffset); | ||
if ( isNamedTzOffset ) { | ||
await new Promise((resolve, reject) => { | ||
connection.execute({ | ||
sqlText: `ALTER SESSION SET timezone = '${tzOffset}'`, | ||
complete(err) { | ||
if (err) { | ||
console.log(err); | ||
reject(err); | ||
} else { | ||
resolve(); | ||
} | ||
} | ||
}); | ||
}); | ||
} else { | ||
throw Error('only support time zone name for snowflake!'); | ||
} | ||
} | ||
|
||
return connection; | ||
} catch (err) { | ||
switch (err.code) { | ||
case 'ECONNREFUSED': | ||
throw new SequelizeErrors.ConnectionRefusedError(err); | ||
case 'ER_ACCESS_DENIED_ERROR': | ||
throw new SequelizeErrors.AccessDeniedError(err); | ||
case 'ENOTFOUND': | ||
throw new SequelizeErrors.HostNotFoundError(err); | ||
case 'EHOSTUNREACH': | ||
throw new SequelizeErrors.HostNotReachableError(err); | ||
case 'EINVAL': | ||
throw new SequelizeErrors.InvalidConnectionError(err); | ||
default: | ||
throw new SequelizeErrors.ConnectionError(err); | ||
} | ||
} | ||
} | ||
|
||
async disconnect(connection) { | ||
// Don't disconnect connections with CLOSED state | ||
if (connection._closing) { | ||
debug('connection tried to disconnect but was already at CLOSED state'); | ||
return; | ||
} | ||
|
||
return new Promise((resolve, reject) => { | ||
connection.destroy(err => { | ||
if (err) { | ||
console.error(`Unable to disconnect: ${err.message}`); | ||
reject(err); | ||
} else { | ||
console.log(`Disconnected connection with id: ${connection.getId()}`); | ||
resolve(connection.getId()); | ||
} | ||
}); | ||
}); | ||
} | ||
|
||
validate(connection) { | ||
return connection.isUp(); | ||
} | ||
} | ||
|
||
module.exports = ConnectionManager; | ||
module.exports.ConnectionManager = ConnectionManager; | ||
module.exports.default = ConnectionManager; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,102 @@ | ||
'use strict'; | ||
|
||
const moment = require('moment-timezone'); | ||
module.exports = BaseTypes => { | ||
BaseTypes.ABSTRACT.prototype.dialectTypes = 'https://dev.snowflake.com/doc/refman/5.7/en/data-types.html'; | ||
|
||
/** | ||
* types: [buffer_type, ...] | ||
* | ||
* @see buffer_type here https://dev.snowflake.com/doc/refman/5.7/en/c-api-prepared-statement-type-codes.html | ||
* @see hex here https://github.com/sidorares/node-mysql2/blob/master/lib/constants/types.js | ||
*/ | ||
|
||
BaseTypes.DATE.types.snowflake = ['DATETIME']; | ||
BaseTypes.STRING.types.snowflake = ['VAR_STRING']; | ||
BaseTypes.CHAR.types.snowflake = ['STRING']; | ||
BaseTypes.TEXT.types.snowflake = ['BLOB']; | ||
BaseTypes.TINYINT.types.snowflake = ['TINY']; | ||
BaseTypes.SMALLINT.types.snowflake = ['SHORT']; | ||
BaseTypes.MEDIUMINT.types.snowflake = ['INT24']; | ||
BaseTypes.INTEGER.types.snowflake = ['LONG']; | ||
BaseTypes.BIGINT.types.snowflake = ['LONGLONG']; | ||
BaseTypes.FLOAT.types.snowflake = ['FLOAT']; | ||
BaseTypes.TIME.types.snowflake = ['TIME']; | ||
BaseTypes.DATEONLY.types.snowflake = ['DATE']; | ||
BaseTypes.BOOLEAN.types.snowflake = ['TINY']; | ||
BaseTypes.BLOB.types.snowflake = ['TINYBLOB', 'BLOB', 'LONGBLOB']; | ||
BaseTypes.DECIMAL.types.snowflake = ['NEWDECIMAL']; | ||
BaseTypes.UUID.types.snowflake = false; | ||
// Enum is not supported | ||
// https://docs.snowflake.com/en/sql-reference/data-types-unsupported.html | ||
BaseTypes.ENUM.types.snowflake = false; | ||
BaseTypes.REAL.types.snowflake = ['DOUBLE']; | ||
BaseTypes.DOUBLE.types.snowflake = ['DOUBLE']; | ||
BaseTypes.GEOMETRY.types.snowflake = ['GEOMETRY']; | ||
BaseTypes.JSON.types.snowflake = ['JSON']; | ||
|
||
class DATE extends BaseTypes.DATE { | ||
toSql() { | ||
return 'TIMESTAMP'; | ||
} | ||
_stringify(date, options) { | ||
date = this._applyTimezone(date, options); | ||
if (this._length) { | ||
return date.format('YYYY-MM-DD HH:mm:ss.SSS'); | ||
} | ||
return date.format('YYYY-MM-DD HH:mm:ss'); | ||
} | ||
static parse(value, options) { | ||
value = value.string(); | ||
if (value === null) { | ||
return value; | ||
} | ||
if (moment.tz.zone(options.timezone)) { | ||
value = moment.tz(value, options.timezone).toDate(); | ||
} | ||
else { | ||
value = new Date(`${value} ${options.timezone}`); | ||
} | ||
return value; | ||
} | ||
} | ||
|
||
class DATEONLY extends BaseTypes.DATEONLY { | ||
static parse(value) { | ||
return value.string(); | ||
} | ||
} | ||
class UUID extends BaseTypes.UUID { | ||
toSql() { | ||
// https://community.snowflake.com/s/question/0D50Z00009LH2fl/what-is-the-best-way-to-store-uuids | ||
return 'VARCHAR(36)'; | ||
} | ||
} | ||
|
||
class TEXT extends BaseTypes.TEXT { | ||
toSql() { | ||
return 'TEXT'; | ||
} | ||
} | ||
|
||
class BOOLEAN extends BaseTypes.BOOLEAN { | ||
toSql() { | ||
return 'BOOLEAN'; | ||
} | ||
} | ||
|
||
class JSONTYPE extends BaseTypes.JSON { | ||
_stringify(value, options) { | ||
return options.operation === 'where' && typeof value === 'string' ? value : JSON.stringify(value); | ||
} | ||
} | ||
|
||
return { | ||
TEXT, | ||
DATE, | ||
BOOLEAN, | ||
DATEONLY, | ||
UUID, | ||
JSON: JSONTYPE | ||
}; | ||
}; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,66 @@ | ||
'use strict'; | ||
|
||
const _ = require('lodash'); | ||
const AbstractDialect = require('../abstract'); | ||
const ConnectionManager = require('./connection-manager'); | ||
const Query = require('./query'); | ||
const QueryGenerator = require('./query-generator'); | ||
const DataTypes = require('../../data-types').snowflake; | ||
const { SnowflakeQueryInterface } = require('./query-interface'); | ||
|
||
class SnowflakeDialect extends AbstractDialect { | ||
constructor(sequelize) { | ||
super(); | ||
this.sequelize = sequelize; | ||
this.connectionManager = new ConnectionManager(this, sequelize); | ||
this.queryGenerator = new QueryGenerator({ | ||
_dialect: this, | ||
sequelize | ||
}); | ||
this.queryInterface = new SnowflakeQueryInterface(sequelize, this.queryGenerator); | ||
} | ||
} | ||
|
||
SnowflakeDialect.prototype.supports = _.merge(_.cloneDeep(AbstractDialect.prototype.supports), { | ||
'VALUES ()': true, | ||
'LIMIT ON UPDATE': true, | ||
lock: true, | ||
forShare: 'LOCK IN SHARE MODE', | ||
settingIsolationLevelDuringTransaction: false, | ||
inserts: { | ||
ignoreDuplicates: ' IGNORE', | ||
// disable for now, but could be enable by approach below | ||
// https://stackoverflow.com/questions/54828745/how-to-migrate-on-conflict-do-nothing-from-postgresql-to-snowflake | ||
updateOnDuplicate: false | ||
}, | ||
index: { | ||
collate: false, | ||
length: true, | ||
parser: true, | ||
type: true, | ||
using: 1 | ||
}, | ||
constraints: { | ||
dropConstraint: false, | ||
check: false | ||
}, | ||
indexViaAlter: true, | ||
indexHints: true, | ||
NUMERIC: true, | ||
// disable for now, need more work to enable the GEOGRAPHY MAPPING | ||
GEOMETRY: false, | ||
JSON: false, | ||
REGEXP: true, | ||
schemas: true | ||
}); | ||
|
||
SnowflakeDialect.prototype.defaultVersion = '5.7.0'; | ||
SnowflakeDialect.prototype.Query = Query; | ||
SnowflakeDialect.prototype.QueryGenerator = QueryGenerator; | ||
SnowflakeDialect.prototype.DataTypes = DataTypes; | ||
SnowflakeDialect.prototype.name = 'snowflake'; | ||
SnowflakeDialect.prototype.TICK_CHAR = '"'; | ||
SnowflakeDialect.prototype.TICK_CHAR_LEFT = SnowflakeDialect.prototype.TICK_CHAR; | ||
SnowflakeDialect.prototype.TICK_CHAR_RIGHT = SnowflakeDialect.prototype.TICK_CHAR; | ||
|
||
module.exports = SnowflakeDialect; |
Oops, something went wrong.