Skip to content

Commit

Permalink
feat: External rollup implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
paveltiunov committed May 11, 2019
1 parent 32e3d46 commit d22a809
Show file tree
Hide file tree
Showing 12 changed files with 291 additions and 91 deletions.
4 changes: 3 additions & 1 deletion examples/hn-insights/schema/Events.js
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,8 @@ cube(`Events`, {
partitionGranularity: `day`,
refreshKey: {
sql: `select current_timestamp`
}
},
external: true
},
leaderBoard: {
type: `rollup`,
Expand Down Expand Up @@ -319,6 +320,7 @@ cube(`Events`, {
refreshKey: {
sql: `select current_timestamp`
},
external: true
}
}
});
12 changes: 12 additions & 0 deletions packages/cubejs-mysql-driver/driver/MySqlDriver.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@ const genericPool = require('generic-pool');
const { promisify } = require('util');
const BaseDriver = require('@cubejs-backend/query-orchestrator/driver/BaseDriver');

const GenericTypeToMySql = {
'string': 'varchar(255)'
};

class MySqlDriver extends BaseDriver {
constructor(config) {
super();
Expand Down Expand Up @@ -114,6 +118,14 @@ class MySqlDriver extends BaseDriver {
informationSchemaQuery() {
return `${super.informationSchemaQuery()} AND columns.table_schema = '${this.config.database}'`
}

quoteIdentifier(identifier) {
return `\`${identifier}\``;
}

fromGenericType(columnType) {
return GenericTypeToMySql[columnType] || super.fromGenericType(columnType);
}
}

module.exports = MySqlDriver;
71 changes: 70 additions & 1 deletion packages/cubejs-query-orchestrator/driver/BaseDriver.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,21 @@ const sortByKeys = (unordered) => {
return ordered;
};

const DbTypeToGenericType = {
'timestamp without time zone': 'timestamp',
'integer': 'int',
'character varying': 'text',
'varchar': 'text',
'text': 'text',
'string': 'text',
'boolean': 'boolean',
'bigint': 'bigint',
'time': 'string',
'datetime': 'timestamp',
'date': 'date',
'double precision': 'decimal'
};

class BaseDriver {
informationSchemaQuery() {
return `
Expand Down Expand Up @@ -77,12 +92,66 @@ class BaseDriver {
}

param(/* paramIndex */) {
return '?'
return '?';
}

testConnectionTimeout() {
return 10000;
}

async downloadTable(table) {
return { rows: await this.query(`SELECT * FROM ${table}`) };
}

async uploadTable(table, columns, tableData) {
if (!tableData.rows) {
throw new Error(`${this.constructor} driver supports only rows upload`);
}
await this.createTable(table, columns);
for (let i = 0; i < tableData.rows.length; i++) {
await this.query(
`INSERT INTO ${table}
(${columns.map(c => this.quoteIdentifier(c.name)).join(', ')})
VALUES (${columns.map((c, paramIndex) => this.param(paramIndex)).join(', ')})`,
columns.map(c => tableData.rows[i][c.name])
);
}
}

async tableColumnTypes(table) {
const [schema, name] = table.split('.');
const columns = await this.query(
`SELECT columns.column_name,
columns.table_name,
columns.table_schema,
columns.data_type
FROM information_schema.columns
WHERE table_name = ${this.param(0)} AND table_schema = ${this.param(1)}`,
[name, schema]
);
return columns.map(c => ({ name: c.column_name, type: this.toGenericType(c.data_type) }));
}

createTable(quotedTableName, columns) {
return this.query(this.createTableSql(quotedTableName, columns), []);
}

createTableSql(quotedTableName, columns) {
columns = columns.map(c => `${this.quoteIdentifier(c.name)} ${this.fromGenericType(c.type)}`);
return `CREATE TABLE ${quotedTableName} (${columns.join(', ')})`;
}

toGenericType(columnType) {
return DbTypeToGenericType[columnType] || columnType;
}

fromGenericType(columnType) {
return columnType;
}

quoteIdentifier(identifier) {
return `"${identifier}"`;
}
}

module.exports = BaseDriver;

0 comments on commit d22a809

Please sign in to comment.