82ff029 Oct 22, 2014

Tutorial

Optiq-csv is a fully functional adapter for Calcite that reads text files in CSV (comma-separated values) format. It is remarkable that a couple of hundred lines of Java code are sufficient to provide full SQL query capability.

Optiq-csv also serves as a template for building adapters to other data formats. Even though there are not many lines of code, it covers several important concepts:

  • user-defined schema using SchemaFactory and Schema interfaces;
  • declaring schemas in a model JSON file;
  • declaring views in a model JSON file;
  • user-defined table using the Table interface;
  • determining the record type of a table;
  • a simple implementation of Table that enumerates all rows directly;
  • an advanced implementation of Table that translates to relational operators using planner rules.

First queries

Without further ado, let's get started.

If you haven't already installed optiq-csv, follow the instructions in the README. It's just two commands: git clone followed by mvn install.

Now let's connect to Calcite using sqlline, a SQL shell that is included in the optiq-csv GitHub project.

$ ./sqlline
sqlline> !connect jdbc:calcite:model=target/test-classes/model.json admin admin

(If you are running Windows, the command is sqlline.bat.)

Execute a metadata query:

sqlline> !tables
+------------+--------------+-------------+---------------+----------+------+
| TABLE_CAT  | TABLE_SCHEM  | TABLE_NAME  |  TABLE_TYPE   | REMARKS  | TYPE |
+------------+--------------+-------------+---------------+----------+------+
| null       | SALES        | DEPTS       | TABLE         | null     | null |
| null       | SALES        | EMPS        | TABLE         | null     | null |
| null       | metadata     | COLUMNS     | SYSTEM_TABLE  | null     | null |
| null       | metadata     | TABLES      | SYSTEM_TABLE  | null     | null |
+------------+--------------+-------------+---------------+----------+------+

(JDBC experts, note: sqlline's !tables command is just executing DatabaseMetaData.getTables() behind the scenes. It has other commands to query JDBC metadata, such as !columns and !describe.)

As you can see there are 4 tables in the system: tables EMPS and DEPTS in the current SALES schema, and COLUMNS and TABLES in the system metadata schema. The system tables are always present in Calcite, but the other tables are provided by the specific implementation of the schema; in this case, the EMPS and DEPTS tables are based on the EMPS.csv and DEPTS.csv files in the target/test-classes directory.

Let's execute some queries on those tables, to show that Calcite is providing a full implementation of SQL. First, a table scan:

sqlline> SELECT * FROM emps;
+--------+--------+---------+---------+----------------+--------+-------+---+
| EMPNO  |  NAME  | DEPTNO  | GENDER  |      CITY      | EMPID  |  AGE  | S |
+--------+--------+---------+---------+----------------+--------+-------+---+
| 100    | Fred   | 10      |         |                | 30     | 25    | t |
| 110    | Eric   | 20      | M       | San Francisco  | 3      | 80    | n |
| 110    | John   | 40      | M       | Vancouver      | 2      | null  | f |
| 120    | Wilma  | 20      | F       |                | 1      | 5     | n |
| 130    | Alice  | 40      | F       | Vancouver      | 2      | null  | f |
+--------+--------+---------+---------+----------------+--------+-------+---+

Now JOIN and GROUP BY:

sqlline> SELECT d.name, COUNT(*)
. . . .> FROM emps AS e JOIN depts AS d ON e.deptno = d.deptno
. . . .> GROUP BY d.name;
+------------+---------+
|    NAME    | EXPR$1  |
+------------+---------+
| Sales      | 1       |
| Marketing  | 2       |
+------------+---------+

Last, the VALUES operator generates a single row, and is a convenient way to test expressions and SQL built-in functions:

sqlline> VALUES CHAR_LENGTH('Hello, ' || 'world!');
+---------+
| EXPR$0  |
+---------+
| 13      |
+---------+

Calcite has many other SQL features. We don't have time to cover them here. Write some more queries to experiment.

Schema discovery

Now, how did Calcite find these tables? Remember, core Calcite does not know anything about CSV files. (As a "database without a storage layer", Calcite doesn't know about any file formats.) Calcite knows about those tables because we told it to run code in the optiq-csv project.

There are a couple of steps in that chain. First, we define a schema based on a schema factory class in a model file. Then the schema factory creates a schema, and the schema creates several tables, each of which knows how to get data by scanning a CSV file. Last, after Calcite has parsed the query and planned it to use those tables, Calcite invokes the tables to read the data as the query is being executed. Now let's look at those steps in more detail.

On the JDBC connect string we gave the path of a model in JSON format. Here is the model:

{
  version: '1.0',
  defaultSchema: 'SALES',
  schemas: [
    {
      name: 'SALES',
      type: 'custom',
      factory: 'net.hydromatic.optiq.impl.csv.CsvSchemaFactory',
      operand: {
        directory: 'target/test-classes/sales'
      }
    }
  ]
}

The model defines a single schema called 'SALES'. The schema is powered by a plugin class, net.hydromatic.optiq.impl.csv.CsvSchemaFactory, which is part of the optiq-csv project and implements the Calcite interface SchemaFactory. Its create method instantiates a schema, passing in the directory argument from the model file:

public Schema create(SchemaPlus parentSchema, String name,
    Map<String, Object> operand) {
  String directory = (String) operand.get("directory");
  Boolean smart = (Boolean) operand.get("smart");
  return new CsvSchema(
      parentSchema,
      name,
      new File(directory),
      smart != null && smart);
}

Driven by the model, the schema factory instantiates a single schema called 'SALES'. The schema is an instance of net.hydromatic.optiq.impl.csv.CsvSchema and implements the Calcite interface Schema.
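To make the plugin mechanism concrete, here is a minimal sketch of how a factory class name and an operand map from a model could be turned into a schema. It is not Calcite's code: ToySchema, ToySchemaFactory, ToyCsvSchemaFactory and ToyModelLoader are hypothetical stand-ins, and loading the factory with Class.forName is an assumption about how such a plugin might be instantiated.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for a schema: a name plus the settings it was given. */
final class ToySchema {
  final String name;
  final String directory;
  final boolean smart;

  ToySchema(String name, String directory, boolean smart) {
    this.name = name;
    this.directory = directory;
    this.smart = smart;
  }
}

/** Hypothetical stand-in for the SchemaFactory plugin interface. */
interface ToySchemaFactory {
  ToySchema create(String name, Map<String, Object> operand);
}

/** Reads its settings from the operand map, in the style of CsvSchemaFactory. */
final class ToyCsvSchemaFactory implements ToySchemaFactory {
  public ToySchema create(String name, Map<String, Object> operand) {
    String directory = (String) operand.get("directory");
    Boolean smart = (Boolean) operand.get("smart"); // optional; may be absent
    return new ToySchema(name, directory, smart != null && smart);
  }
}

final class ToyModelLoader {
  /** Instantiates the named factory reflectively and asks it for a schema. */
  static ToySchema load(String factoryClassName, String schemaName,
      Map<String, Object> operand) {
    try {
      Class<?> clazz = Class.forName(factoryClassName);
      ToySchemaFactory factory =
          (ToySchemaFactory) clazz.getDeclaredConstructor().newInstance();
      return factory.create(schemaName, operand);
    } catch (ReflectiveOperationException e) {
      throw new IllegalStateException(
          "cannot instantiate factory " + factoryClassName, e);
    }
  }

  public static void main(String[] args) {
    Map<String, Object> operand = new HashMap<String, Object>();
    operand.put("directory", "target/test-classes/sales");
    ToySchema schema =
        load(ToyCsvSchemaFactory.class.getName(), "SALES", operand);
    System.out.println(schema.name + " -> " + schema.directory
        + " (smart=" + schema.smart + ")");
  }
}
```

The point of the indirection is that the loader only needs a class name and a map of operands, so a new kind of schema can be plugged in without changing the loader.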

A schema's job is to produce a list of tables. (It can also list sub-schemas and table-functions, but these are advanced features and optiq-csv does not support them.) The tables implement Calcite's Table interface. CsvSchema produces tables that are instances of CsvTable.

Here is the relevant code from CsvSchema, overriding the getTableMap() method in the AbstractSchema base class.

protected Map<String, Table> getTableMap() {
  final ImmutableMap.Builder<String, Table> builder = ImmutableMap.builder();
  File[] files = directoryFile.listFiles(
      new FilenameFilter() {
        public boolean accept(File dir, String name) {
          return name.endsWith(".csv");
        }
      });
  if (files == null) {
    System.out.println("directory " + directoryFile + " not found");
    files = new File[0];
  }
  for (File file : files) {
    String tableName = file.getName();
    if (tableName.endsWith(".csv")) {
      tableName = tableName.substring(
          0, tableName.length() - ".csv".length());
    }
    final CsvTable table;
    if (smart) {
      table = new CsvSmartTable(file, null);
    } else {
      table = new CsvTable(file, null);
    }
    builder.put(tableName, table);
  }
  return builder.build();
}

The schema scans the directory, finds all files whose names end with ".csv", and creates tables for them. In this case, the directory is target/test-classes/sales and contains files EMPS.csv and DEPTS.csv, which become the tables EMPS and DEPTS.
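The discovery step can be tried in isolation from Calcite. The following sketch uses only the JDK; CsvDirectoryScan and tableFiles are hypothetical names, and the result maps table names to files rather than to Table objects.

```java
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.Map;
import java.util.TreeMap;

final class CsvDirectoryScan {
  /**
   * Maps each table name (the file name minus ".csv") to its file.
   * Returns an empty map if the directory does not exist.
   */
  static Map<String, File> tableFiles(File directory) {
    Map<String, File> tables = new TreeMap<String, File>();
    File[] files = directory.listFiles();
    if (files == null) {
      return tables; // not a directory, or an I/O error occurred
    }
    for (File file : files) {
      String fileName = file.getName();
      if (file.isFile() && fileName.endsWith(".csv")) {
        String tableName =
            fileName.substring(0, fileName.length() - ".csv".length());
        tables.put(tableName, file);
      }
    }
    return tables;
  }

  public static void main(String[] args) throws IOException {
    File dir = Files.createTempDirectory("sales").toFile();
    new File(dir, "EMPS.csv").createNewFile();
    new File(dir, "DEPTS.csv").createNewFile();
    new File(dir, "README.txt").createNewFile(); // ignored: not a .csv file
    System.out.println(tableFiles(dir).keySet()); // prints [DEPTS, EMPS]
  }
}
```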

Tables and views in schemas

Note how we did not need to define any tables in the model; the schema generated the tables automatically.

You can define extra tables, beyond those that are created automatically, using the tables property of a schema.

Let's see how to create an important and useful type of table, namely a view.

A view looks like a table when you are writing a query, but it doesn't store data. It derives its result by executing a query. The view is expanded while the query is being planned, so the query planner can often perform optimizations like removing expressions from the SELECT clause that are not used in the final result.

Here is a schema that defines a view:

{
  version: '1.0',
  defaultSchema: 'SALES',
  schemas: [
    {
      name: 'SALES',
      type: 'custom',
      factory: 'net.hydromatic.optiq.impl.csv.CsvSchemaFactory',
      operand: {
        directory: 'target/test-classes/sales'
      },
      tables: [
        {
          name: 'FEMALE_EMPS',
          type: 'view',
          sql: 'SELECT * FROM emps WHERE gender = \'F\''
        }
      ]
    }
  ]
}

The line type: 'view' tags FEMALE_EMPS as a view, as opposed to a regular table or a custom table. Note that single-quotes within the view definition are escaped using a back-slash, in the normal way for JSON.

JSON doesn't make it easy to author long strings, so Calcite supports an alternative syntax. If your view has a long SQL statement, you can instead supply a list of lines rather than a single string:

        {
          name: 'FEMALE_EMPS',
          type: 'view',
          sql: [
            'SELECT * FROM emps',
            'WHERE gender = \'F\''
          ]
        }
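A model reader that accepts both forms has to normalize them into a single SQL string. Here is a minimal sketch of one way to do that; ViewSql and toSql are hypothetical names, and joining the lines with a newline is an assumption rather than documented behavior.

```java
import java.util.Arrays;
import java.util.List;

final class ViewSql {
  /**
   * Accepts either a String or a List of lines and returns one SQL string.
   * Lines are joined with a newline (an assumption made for this sketch).
   */
  static String toSql(Object sql) {
    if (sql instanceof String) {
      return (String) sql;
    }
    if (sql instanceof List) {
      StringBuilder buf = new StringBuilder();
      boolean first = true;
      for (Object line : (List<?>) sql) {
        if (!first) {
          buf.append('\n');
        }
        buf.append(line);
        first = false;
      }
      return buf.toString();
    }
    throw new IllegalArgumentException(
        "sql must be a string or a list of lines");
  }

  public static void main(String[] args) {
    List<String> lines =
        Arrays.asList("SELECT * FROM emps", "WHERE gender = 'F'");
    System.out.println(toSql(lines));
  }
}
```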

Now that we have defined a view, we can use it in queries just as if it were a table:

sqlline> SELECT e.name, d.name FROM female_emps AS e JOIN depts AS d on e.deptno = d.deptno;
+--------+------------+
|  NAME  |    NAME    |
+--------+------------+
| Wilma  | Marketing  |
+--------+------------+

Custom tables

Custom tables are tables whose implementation is driven by user-defined code. They don't need to live in a custom schema.

There is an example in model-with-custom-table.json:

{
  version: '1.0',
  defaultSchema: 'CUSTOM_TABLE',
  schemas: [
    {
      name: 'CUSTOM_TABLE',
      tables: [
        {
          name: 'EMPS',
          type: 'custom',
          factory: 'net.hydromatic.optiq.impl.csv.CsvTableFactory',
          operand: {
            file: 'target/test-classes/sales/EMPS.csv',
            smart: false
          }
        }
      ]
    }
  ]
}

We can query the table in the usual way:

sqlline> !connect jdbc:calcite:model=target/test-classes/model-with-custom-table.json admin admin
sqlline> SELECT empno, name FROM custom_table.emps;
+--------+--------+
| EMPNO  |  NAME  |
+--------+--------+
| 100    | Fred   |
| 110    | Eric   |
| 110    | John   |
| 120    | Wilma  |
| 130    | Alice  |
+--------+--------+

The schema is a regular one, and contains a custom table powered by net.hydromatic.optiq.impl.csv.CsvTableFactory, which implements the Calcite interface TableFactory. Its create method instantiates a table, passing in the file argument from the model file:

public CsvTable create(SchemaPlus schema, String name,
    Map<String, Object> map, RelDataType rowType) {
  String fileName = (String) map.get("file");
  Boolean smart = (Boolean) map.get("smart");
  final File file = new File(fileName);
  final RelProtoDataType protoRowType =
      rowType != null ? RelDataTypeImpl.proto(rowType) : null;
  if (smart != null && smart) {
    return new CsvSmartTable(file, protoRowType);
  } else {
    return new CsvTable(file, protoRowType);
  }
}

Implementing a custom table is often a simpler alternative to implementing a custom schema. Both approaches might end up creating a similar implementation of the Table interface, but for the custom table you don't need to implement metadata discovery. (CsvTableFactory creates a CsvTable, just as CsvSchema does, but the table implementation does not scan the filesystem for .csv files.)

Custom tables require more work for the author of the model (the author needs to specify each table and its file explicitly) but also give the author more control (say, providing different parameters for each table).

Comments in models

Models can include comments using /* ... */ and // syntax:

{
  version: '1.0',
  /* Multi-line
     comment. */
  defaultSchema: 'CUSTOM_TABLE',
  // Single-line comment.
  schemas: [
    ..
  ]
}

(Comments are not standard JSON, but are a harmless extension.)

Optimizing queries using planner rules

The table implementations we have seen so far are fine as long as the tables don't contain a great deal of data. But if your customer table has, say, a hundred columns and a million rows, you would rather that the system did not retrieve all of the data for every query. You would like Calcite to negotiate with the adapter and find a more efficient way of accessing the data.

This negotiation is a simple form of query optimization. Calcite supports query optimization by adding planner rules. Planner rules operate by looking for patterns in the query parse tree (for instance a project on top of a certain kind of table), and replacing the matched nodes in the tree with a new set of nodes that implement the optimization.

Planner rules are also extensible, like schemas and tables. So, if you have a data store that you want to access via SQL, you first define a custom table or schema, and then you define some rules to make the access efficient.

To see this in action, let's use a planner rule to access a subset of columns from a CSV file. Let's run the same query against two very similar schemas:

sqlline> !connect jdbc:calcite:model=target/test-classes/model.json admin admin
sqlline> explain plan for select name from emps;
+-----------------------------------------------------+
| PLAN                                                |
+-----------------------------------------------------+
| EnumerableCalcRel(expr#0..9=[{inputs}], NAME=[$t1]) |
|   EnumerableTableAccessRel(table=[[SALES, EMPS]])   |
+-----------------------------------------------------+
sqlline> !connect jdbc:calcite:model=target/test-classes/smart.json admin admin
sqlline> explain plan for select name from emps;
+-----------------------------------------------------+
| PLAN                                                |
+-----------------------------------------------------+
| EnumerableCalcRel(expr#0..9=[{inputs}], NAME=[$t1]) |
|   CsvTableScan(table=[[SALES, EMPS]])               |
+-----------------------------------------------------+

What causes the difference in plan? Let's follow the trail of evidence. In the smart.json model file, there is just one extra line:

smart: true

This causes a CsvSchema to be created with smart = true, and its getTableMap method creates instances of CsvSmartTable rather than CsvTable.

CsvSmartTable overrides the TranslatableTable.toRel() method to create CsvTableScan. Table scans are the leaves of a query operator tree. The usual implementation is EnumerableTableAccessRel, but we have created a distinctive sub-type that will cause rules to fire.

Here is the rule in its entirety:

public class CsvPushProjectOntoTableRule extends RelOptRule {
  public static final CsvPushProjectOntoTableRule INSTANCE =
      new CsvPushProjectOntoTableRule();

  private CsvPushProjectOntoTableRule() {
    super(
        operand(ProjectRel.class,
            operand(CsvTableScan.class, none())),
        "CsvPushProjectOntoTableRule");
  }

  @Override
  public void onMatch(RelOptRuleCall call) {
    final ProjectRel project = call.rel(0);
    final CsvTableScan scan = call.rel(1);
    int[] fields = getProjectFields(project.getProjects());
    if (fields == null) {
      // Project contains expressions more complex than just field references.
      return;
    }
    call.transformTo(
        new CsvTableScan(
            scan.getCluster(),
            scan.getTable(),
            scan.csvTable,
            fields));
  }

  private int[] getProjectFields(List<RexNode> exps) {
    final int[] fields = new int[exps.size()];
    for (int i = 0; i < exps.size(); i++) {
      final RexNode exp = exps.get(i);
      if (exp instanceof RexInputRef) {
        fields[i] = ((RexInputRef) exp).getIndex();
      } else {
        return null; // not a simple projection
      }
    }
    return fields;
  }
}

The constructor declares the pattern of relational expressions that will cause the rule to fire.

The onMatch method generates a new relational expression and calls RelOptRuleCall.transformTo() to indicate that the rule has fired successfully.
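The effect of the rule can be illustrated without the planner. The sketch below repeats the getProjectFields check on hypothetical stand-in types (ToyExpr, ToyInputRef and ToyCall are not Calcite classes) and then shows what a scan restricted to the chosen fields would return for one CSV line. The comma splitting is deliberately naive and does not handle quoted values.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical stand-in for a row expression (RexNode in the rule above). */
interface ToyExpr {}

/** A plain reference to an input column by ordinal, like RexInputRef. */
final class ToyInputRef implements ToyExpr {
  final int index;

  ToyInputRef(int index) {
    this.index = index;
  }
}

/** Anything more complex than a column reference, such as a function call. */
final class ToyCall implements ToyExpr {
  final String operator;

  ToyCall(String operator) {
    this.operator = operator;
  }
}

final class ToyProjectPushDown {
  /**
   * Returns the referenced column ordinals, or null if any expression is
   * not a plain column reference (in which case the rule would not fire).
   */
  static int[] projectFields(List<? extends ToyExpr> exps) {
    int[] fields = new int[exps.size()];
    for (int i = 0; i < fields.length; i++) {
      ToyExpr exp = exps.get(i);
      if (!(exp instanceof ToyInputRef)) {
        return null;
      }
      fields[i] = ((ToyInputRef) exp).index;
    }
    return fields;
  }

  /** Keeps only the requested columns of one CSV line (no quoting support). */
  static String[] scanLine(String line, int[] fields) {
    String[] all = line.split(",", -1);
    String[] projected = new String[fields.length];
    for (int i = 0; i < fields.length; i++) {
      projected[i] = all[fields[i]];
    }
    return projected;
  }

  public static void main(String[] args) {
    // SELECT name FROM emps: a single reference to column 1.
    int[] fields = projectFields(Arrays.asList(new ToyInputRef(1)));
    System.out.println(Arrays.toString(scanLine("100,Fred,10", fields)));
    // prints [Fred]
  }
}
```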

The query optimization process

There's a lot to say about how clever Calcite's query planner is, but we won't say it here. The cleverness is designed to take the burden off you, the writer of planner rules.

First, Calcite doesn't fire rules in a prescribed order. The query optimization process follows many branches of a branching tree, just like a chess playing program examines many possible sequences of moves. If rules A and B both match a given section of the query operator tree, then Calcite can fire both.

Second, Calcite uses cost in choosing between plans, but the cost model doesn't prevent rules from firing even when they seem to make the plan more expensive in the short term.

Many optimizers have a linear optimization scheme. Faced with a choice between rule A and rule B, as above, such an optimizer needs to choose immediately. It might have a policy such as "apply rule A to the whole tree, then apply rule B to the whole tree", or apply a cost-based policy, applying the rule that produces the cheaper result.

Calcite doesn't require such compromises. This makes it simple to combine various sets of rules. If, say, you want to combine rules to recognize materialized views with rules to read from CSV and JDBC source systems, you just give Calcite the set of all rules and tell it to go at it.

Calcite does use a cost model. The cost model decides which plan to ultimately use, and sometimes prunes the search tree to prevent the search space from exploding, but it never forces you to choose between rule A and rule B. This is important, because it avoids falling into local minima in the search space that are not actually optimal.
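A toy example shows why the difference matters. This is not Calcite's planner algorithm, only an illustration of the local-minimum problem: plans are plain strings, each rule is a map from a plan to an equivalent plan, and the costs are invented for the example. ToyPlanner and its methods are hypothetical names.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class ToyPlanner {
  /** Each rule maps the plans it matches to an equivalent rewritten plan. */
  private final List<Map<String, String>> rules;
  /** Invented cost of every plan that can be reached. */
  private final Map<String, Integer> cost;

  ToyPlanner(List<Map<String, String>> rules, Map<String, Integer> cost) {
    this.rules = rules;
    this.cost = cost;
  }

  /** Linear scheme: always take the cheapest single rewrite; stop when none helps. */
  String greedy(String plan) {
    while (true) {
      String best = plan;
      for (Map<String, String> rule : rules) {
        String next = rule.get(plan);
        if (next != null && cost.get(next) < cost.get(best)) {
          best = next;
        }
      }
      if (best.equals(plan)) {
        return plan;
      }
      plan = best;
    }
  }

  /** Fires every matching rule on every plan found, then picks the cheapest. */
  String exhaustive(String start) {
    Set<String> seen = new LinkedHashSet<String>();
    Deque<String> queue = new ArrayDeque<String>();
    seen.add(start);
    queue.add(start);
    while (!queue.isEmpty()) {
      String plan = queue.remove();
      for (Map<String, String> rule : rules) {
        String next = rule.get(plan);
        if (next != null && seen.add(next)) {
          queue.add(next);
        }
      }
    }
    String best = start;
    for (String plan : seen) {
      if (cost.get(plan) < cost.get(best)) {
        best = plan;
      }
    }
    return best;
  }

  public static void main(String[] args) {
    Map<String, String> ruleA = new HashMap<String, String>();
    ruleA.put("start", "a");
    Map<String, String> ruleB = new HashMap<String, String>();
    ruleB.put("start", "b");
    ruleB.put("a", "ab");
    Map<String, Integer> cost = new HashMap<String, Integer>();
    cost.put("start", 10);
    cost.put("a", 12); // rule A looks like a step backwards...
    cost.put("b", 8);
    cost.put("ab", 3); // ...but it enables the cheapest plan
    ToyPlanner planner = new ToyPlanner(Arrays.asList(ruleA, ruleB), cost);
    System.out.println(planner.greedy("start"));     // prints b
    System.out.println(planner.exhaustive("start")); // prints ab
  }
}
```

The greedy strategy never fires rule A, because the immediate result is more expensive, and so it never discovers the plan that rule B can build on top of it.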

Also (you guessed it) the cost model is pluggable, as are the table and query operator statistics it is based upon. But that can be a subject for later.

JDBC adapter

The JDBC adapter maps a schema in a JDBC data source as a Calcite schema.

For example, this schema reads from a MySQL "foodmart" database:

{
  version: '1.0',
  defaultSchema: 'FOODMART',
  schemas: [
    {
      name: 'FOODMART',
      type: 'custom',
      factory: 'net.hydromatic.optiq.impl.jdbc.JdbcSchema$Factory',
      operand: {
        jdbcDriver: 'com.mysql.jdbc.Driver',
        jdbcUrl: 'jdbc:mysql://localhost/foodmart',
        jdbcUser: 'foodmart',
        jdbcPassword: 'foodmart'
      }
    }
  ]
}

(The FoodMart database will be familiar to those of you who have used the Mondrian OLAP engine, because it is Mondrian's main test data set. To load the data set, follow Mondrian's installation instructions.)

Current limitations: The JDBC adapter currently only pushes down table scan operations; all other processing (filtering, joins, aggregations and so forth) occurs within Calcite. Our goal is to push down as much processing as possible to the source system, translating syntax, data types and built-in functions as we go. If a Calcite query is based on tables from a single JDBC database, in principle the whole query should go to that database. If tables are from multiple JDBC sources, or a mixture of JDBC and non-JDBC, Calcite will use the most efficient distributed query approach that it can.

The cloning JDBC adapter

The cloning JDBC adapter creates a hybrid database. The data is sourced from a JDBC database but is read into in-memory tables the first time each table is accessed. Calcite evaluates queries based on those in-memory tables, effectively a cache of the database.
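The load-on-first-access behavior can be sketched in a few lines. This is a simplified illustration, not the CloneSchema implementation: ToyTableSource and ToyCloneSchema are hypothetical names, rows are plain string arrays, and the cache is not thread-safe.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for a source that can read a whole table, e.g. over JDBC. */
interface ToyTableSource {
  List<String[]> readAll(String tableName);
}

final class ToyCloneSchema {
  private final ToyTableSource source;
  private final Map<String, List<String[]>> cache =
      new HashMap<String, List<String[]>>();

  ToyCloneSchema(ToyTableSource source) {
    this.source = source;
  }

  /** Returns the in-memory copy of a table, reading the source on first access only. */
  List<String[]> getTable(String tableName) {
    List<String[]> rows = cache.get(tableName);
    if (rows == null) {
      rows = source.readAll(tableName); // the only trip to the source system
      cache.put(tableName, rows);
    }
    return rows;
  }

  public static void main(String[] args) {
    ToyCloneSchema schema = new ToyCloneSchema(new ToyTableSource() {
      public List<String[]> readAll(String tableName) {
        System.out.println("loading " + tableName);
        return Arrays.asList(
            new String[] {"10", "Sales"},
            new String[] {"20", "Marketing"});
      }
    });
    schema.getTable("DEPTS"); // prints "loading DEPTS"
    schema.getTable("DEPTS"); // served from memory; prints nothing
  }
}
```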

For example, the following model reads tables from a MySQL "foodmart" database:

{
  version: '1.0',
  defaultSchema: 'FOODMART_CLONE',
  schemas: [
    {
      name: 'FOODMART_CLONE',
      type: 'custom',
      factory: 'net.hydromatic.optiq.impl.clone.CloneSchema$Factory',
      operand: {
        jdbcDriver: 'com.mysql.jdbc.Driver',
        jdbcUrl: 'jdbc:mysql://localhost/foodmart',
        jdbcUser: 'foodmart',
        jdbcPassword: 'foodmart'
      }
    }
  ]
}

Another technique is to build a clone schema on top of an existing schema. You use the source property to reference a schema defined earlier in the model, like this:

{
  version: '1.0',
  defaultSchema: 'FOODMART_CLONE',
  schemas: [
    {
      name: 'FOODMART',
      type: 'custom',
      factory: 'net.hydromatic.optiq.impl.jdbc.JdbcSchema$Factory',
      operand: {
        jdbcDriver: 'com.mysql.jdbc.Driver',
        jdbcUrl: 'jdbc:mysql://localhost/foodmart',
        jdbcUser: 'foodmart',
        jdbcPassword: 'foodmart'
      }
    },
    {
      name: 'FOODMART_CLONE',
      type: 'custom',
      factory: 'net.hydromatic.optiq.impl.clone.CloneSchema$Factory',
      operand: {
        source: 'FOODMART'
      }
    }
  ]
}

You can use this approach to create a clone schema on any type of schema, not just JDBC.

The cloning adapter isn't the be-all and end-all. We plan to develop more sophisticated caching strategies, and a more complete and efficient implementation of in-memory tables, but for now the cloning JDBC adapter shows what is possible and allows us to try out our initial implementations.

Further topics

Defining a custom schema

(To be written.)

Modifying data

How to enable DML operations (INSERT, UPDATE and DELETE) on your schema.

(To be written.)

Calling conventions

(To be written.)

Statistics and cost

(To be written.)

Defining and using user-defined functions

(To be written.)

Defining tables in a schema

(To be written.)

Defining custom tables

(To be written.)

Built-in SQL implementation

How does Calcite implement SQL, if an adapter does not implement all of the core relational operators?

(To be written.)

Table functions

(To be written.)

Further resources