Skip to content

Commit

Permalink
rdar://111235765 (Create Table and Alter Table with Distribution Port…
Browse files Browse the repository at this point in the history
… to 3.4) (apache#1808)

* rdar://111235765 ALTER TABLE ... WRITE command (apache#1231) (apache#1466)

This PR cherry-picks the command to set write distribution and ordering in a table.

These changes are needed to allow customers to control the required distribution and ordering in Iceberg.

It adds a new command that will only be supported by the Iceberg data source.

This PR comes with tests.

* rdar://84102488 Support ordering and distribution during table creation (apache#1485)

This PR cherry-picks the ordering and distribution during table creation to 3.2. The same syntax is supported in 3.0 and 3.1.

These changes are needed to define a sort key and distribution in Iceberg tables.

User-facing change: yes, but the changes won't affect anyone except Iceberg users.

This PR comes with tests.

Co-authored-by: Russell Spitzer <russell.spitzer@gmail.com>
Co-authored-by: Anton Okolnychyi <aokolnychyi@apple.com>
  • Loading branch information
3 people authored and GitHub Enterprise committed Jul 6, 2023
1 parent a60962a commit a8b91d5
Show file tree
Hide file tree
Showing 16 changed files with 1,017 additions and 49 deletions.
5 changes: 5 additions & 0 deletions docs/sql-ref-ansi-compliance.md
Original file line number Diff line number Diff line change
Expand Up @@ -425,6 +425,7 @@ Below is a list of all the keywords in Spark SQL.
|DIRECTORY|non-reserved|non-reserved|non-reserved|
|DISTINCT|reserved|non-reserved|reserved|
|DISTRIBUTE|non-reserved|non-reserved|non-reserved|
|DISTRIBUTED|non-reserved|non-reserved|non-reserved|
|DIV|non-reserved|non-reserved|not a keyword|
|DROP|non-reserved|non-reserved|reserved|
|ELSE|reserved|non-reserved|reserved|
Expand Down Expand Up @@ -493,6 +494,7 @@ Below is a list of all the keywords in Spark SQL.
|LIST|non-reserved|non-reserved|non-reserved|
|LOAD|non-reserved|non-reserved|non-reserved|
|LOCAL|non-reserved|non-reserved|reserved|
|LOCALLY|non-reserved|non-reserved|non-reserved|
|LOCATION|non-reserved|non-reserved|non-reserved|
|LOCK|non-reserved|non-reserved|non-reserved|
|LOCKS|non-reserved|non-reserved|non-reserved|
Expand Down Expand Up @@ -528,6 +530,7 @@ Below is a list of all the keywords in Spark SQL.
|OPTIONS|non-reserved|non-reserved|non-reserved|
|OR|reserved|non-reserved|reserved|
|ORDER|reserved|non-reserved|reserved|
|ORDERED|non-reserved|non-reserved|non-reserved|
|OUT|non-reserved|non-reserved|reserved|
|OUTER|reserved|non-reserved|reserved|
|OUTPUTFORMAT|non-reserved|non-reserved|non-reserved|
Expand Down Expand Up @@ -634,6 +637,7 @@ Below is a list of all the keywords in Spark SQL.
|UNIQUE|reserved|non-reserved|reserved|
|UNKNOWN|reserved|non-reserved|reserved|
|UNLOCK|non-reserved|non-reserved|non-reserved|
|UNORDERED|non-reserved|non-reserved|non-reserved|
|UNPIVOT|non-reserved|non-reserved|non-reserved|
|UNSET|non-reserved|non-reserved|non-reserved|
|UPDATE|non-reserved|non-reserved|reserved|
Expand All @@ -651,6 +655,7 @@ Below is a list of all the keywords in Spark SQL.
|WINDOW|non-reserved|non-reserved|reserved|
|WITH|reserved|non-reserved|reserved|
|WITHIN|reserved|non-reserved|reserved|
|WRITE|non-reserved|non-reserved|non-reserved|
|YEAR|non-reserved|non-reserved|non-reserved|
|YEARS|non-reserved|non-reserved|non-reserved|
|ZONE|non-reserved|non-reserved|non-reserved|
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ DIRECTORIES: 'DIRECTORIES';
DIRECTORY: 'DIRECTORY';
DISTINCT: 'DISTINCT';
DISTRIBUTE: 'DISTRIBUTE';
DISTRIBUTED: 'DISTRIBUTED';
DIV: 'DIV';
DROP: 'DROP';
ELSE: 'ELSE';
Expand Down Expand Up @@ -229,6 +230,7 @@ LINES: 'LINES';
LIST: 'LIST';
LOAD: 'LOAD';
LOCAL: 'LOCAL';
LOCALLY: 'LOCALLY';
LOCATION: 'LOCATION';
LOCK: 'LOCK';
LOCKS: 'LOCKS';
Expand Down Expand Up @@ -263,6 +265,7 @@ OPTION: 'OPTION';
OPTIONS: 'OPTIONS';
OR: 'OR';
ORDER: 'ORDER';
ORDERED: 'ORDERED';
OUT: 'OUT';
OUTER: 'OUTER';
OUTPUTFORMAT: 'OUTPUTFORMAT';
Expand Down Expand Up @@ -368,6 +371,7 @@ UNION: 'UNION';
UNIQUE: 'UNIQUE';
UNKNOWN: 'UNKNOWN';
UNLOCK: 'UNLOCK';
UNORDERED: 'UNORDERED';
UNPIVOT: 'UNPIVOT';
UNSET: 'UNSET';
UPDATE: 'UPDATE';
Expand All @@ -385,6 +389,7 @@ WHERE: 'WHERE';
WINDOW: 'WINDOW';
WITH: 'WITH';
WITHIN: 'WITHIN';
WRITE: 'WRITE';
YEAR: 'YEAR';
YEARS: 'YEARS';
ZONE: 'ZONE';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ statement
| ALTER TABLE multipartIdentifier
(partitionSpec)? SET locationSpec #setTableLocation
| ALTER TABLE multipartIdentifier RECOVER PARTITIONS #recoverPartitions
| ALTER TABLE multipartIdentifier WRITE writeSpec #setWriteDistributionAndOrdering
| DROP TABLE (IF EXISTS)? multipartIdentifier PURGE? #dropTable
| DROP VIEW (IF EXISTS)? multipartIdentifier #dropView
| CREATE (OR REPLACE)? (GLOBAL? TEMPORARY)?
Expand Down Expand Up @@ -380,6 +381,7 @@ createTableClauses
(PARTITIONED BY partitioning=partitionFieldList) |
skewSpec |
bucketSpec |
writeSpec |
rowFormat |
createFileFormat |
locationSpec |
Expand Down Expand Up @@ -1174,11 +1176,31 @@ comment
: stringLit
| NULL
;
// Write requirements for a table: one or more distribution and/or ordering clauses, in any
// order. Referenced by `ALTER TABLE ... WRITE` and by createTableClauses.
// NOTE(review): the grammar itself does not stop a clause kind from repeating; presumably the
// AST builder rejects duplicates -- confirm.
writeSpec
: (writeDistributionSpec | writeOrderingSpec)+
;

// Distribution clause; the only accepted form is the keyword sequence DISTRIBUTED BY PARTITION.
writeDistributionSpec
: DISTRIBUTED BY PARTITION
;

// A version written either as an integer literal or as a string literal.
version
: INTEGER_VALUE
| stringLit
;
// Ordering clause: either `[LOCALLY] ORDERED BY <writeOrder>` or the single keyword UNORDERED.
// NOTE(review): what LOCALLY changes is decided outside the grammar -- confirm in the AST builder.
writeOrderingSpec
: LOCALLY? ORDERED BY writeOrder
| UNORDERED
;

// Comma-separated list of one or more order fields, written bare or wrapped in parentheses.
// Both alternatives collect the fields under the same `fields` label.
writeOrder
: fields+=writeOrderField (',' fields+=writeOrderField)*
| '(' fields+=writeOrderField (',' fields+=writeOrderField)* ')'
;

// One order field: a transform, optionally followed by a direction (ASC or DESC) and by a
// null ordering (NULLS FIRST or NULLS LAST).
writeOrderField
: transform direction=(ASC | DESC)? (NULLS nullOrder=(FIRST | LAST))?
;

// When `SQL_standard_keyword_behavior=true`, there are 2 kinds of keywords in Spark SQL.
// - Reserved keywords:
Expand Down Expand Up @@ -1246,6 +1268,7 @@ ansiNonReserved
| DIRECTORIES
| DIRECTORY
| DISTRIBUTE
| DISTRIBUTED
| DIV
| DROP
| ESCAPED
Expand Down Expand Up @@ -1291,6 +1314,7 @@ ansiNonReserved
| LIST
| LOAD
| LOCAL
| LOCALLY
| LOCATION
| LOCK
| LOCKS
Expand All @@ -1317,6 +1341,7 @@ ansiNonReserved
| OF
| OPTION
| OPTIONS
| ORDERED
| OUT
| OUTPUTFORMAT
| OVER
Expand Down Expand Up @@ -1404,6 +1429,7 @@ ansiNonReserved
| UNBOUNDED
| UNCACHE
| UNLOCK
| UNORDERED
| UNPIVOT
| UNSET
| UPDATE
Expand All @@ -1415,6 +1441,7 @@ ansiNonReserved
| WEEK
| WEEKS
| WINDOW
| WRITE
| YEAR
| YEARS
| ZONE
Expand Down Expand Up @@ -1523,6 +1550,7 @@ nonReserved
| DIRECTORY
| DISTINCT
| DISTRIBUTE
| DISTRIBUTED
| DIV
| DROP
| ELSE
Expand Down Expand Up @@ -1584,6 +1612,7 @@ nonReserved
| LIST
| LOAD
| LOCAL
| LOCALLY
| LOCATION
| LOCK
| LOCKS
Expand Down Expand Up @@ -1616,6 +1645,7 @@ nonReserved
| OPTIONS
| OR
| ORDER
| ORDERED
| OUT
| OUTER
| OUTPUTFORMAT
Expand Down Expand Up @@ -1718,6 +1748,7 @@ nonReserved
| UNKNOWN
| UNLOCK
| UNPIVOT
| UNORDERED
| UNSET
| UPDATE
| USE
Expand All @@ -1733,6 +1764,7 @@ nonReserved
| WINDOW
| WITH
| WITHIN
| WRITE
| YEAR
| YEARS
| ZONE
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@

import java.util.Map;

import com.google.common.base.Preconditions;

import org.apache.spark.annotation.Evolving;
import org.apache.spark.sql.connector.expressions.SortOrder;
import org.apache.spark.sql.connector.expressions.Transform;
import org.apache.spark.sql.connector.write.LogicalWriteInfo;
import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException;
Expand Down Expand Up @@ -106,6 +109,30 @@ StagedTable stageReplace(
Transform[] partitions,
Map<String, String> properties) throws NoSuchNamespaceException, NoSuchTableException;

/**
 * Stage the creation of a table with a specific distribution and ordering.
 * <p>
 * This default implementation does not support a distribution or an ordering. It accepts only
 * the {@code "none"} distribution mode (or {@code null}, treated as not specified) together
 * with an empty (or {@code null}) ordering, and then delegates to the {@code stageCreate}
 * overload without distribution and ordering, converting the columns with
 * {@code CatalogV2Util.v2ColumnsToStructType}.
 *
 * @param ident identifier of the table to stage
 * @param columns columns of the table
 * @param partitions partition transforms, passed through to the delegated overload
 * @param properties table properties, passed through to the delegated overload
 * @param distributionMode requested distribution mode; only {@code "none"} is accepted here
 * @param ordering requested ordering; must be empty here
 * @return the staged table produced by the delegated overload
 * @throws IllegalArgumentException if a distribution other than {@code "none"} or a non-empty
 *                                  ordering is requested
 * @throws TableAlreadyExistsException propagated from the delegated overload
 * @throws NoSuchNamespaceException propagated from the delegated overload
 */
default StagedTable stageCreate(
    Identifier ident,
    Column[] columns,
    Transform[] partitions,
    Map<String, String> properties,
    String distributionMode,
    SortOrder[] ordering) throws TableAlreadyExistsException, NoSuchNamespaceException {

  // Null-safe: a missing mode means "no specific distribution" rather than a bare NPE.
  Preconditions.checkArgument(
      distributionMode == null || "none".equals(distributionMode),
      "%s does not support tables with a specific distribution",
      this.getClass().getName());

  // Null-safe: a missing ordering array is handled like an empty one.
  Preconditions.checkArgument(
      ordering == null || ordering.length == 0,
      "%s does not support tables with a specific ordering",
      this.getClass().getName());

  return stageCreate(
      ident, CatalogV2Util.v2ColumnsToStructType(columns), partitions, properties);
}

/**
* Stage the replacement of a table, preparing it to be committed into the metastore when the
* returned table's {@link StagedTable#commitStagedChanges()} is called.
Expand Down Expand Up @@ -155,6 +182,30 @@ StagedTable stageCreateOrReplace(
Transform[] partitions,
Map<String, String> properties) throws NoSuchNamespaceException;

/**
 * Stage the replacement of a table with a specific distribution and ordering.
 * <p>
 * This default implementation does not support a distribution or an ordering. It accepts only
 * the {@code "none"} distribution mode (or {@code null}, treated as not specified) together
 * with an empty (or {@code null}) ordering, and then delegates to the {@code stageReplace}
 * overload without distribution and ordering, converting the columns with
 * {@code CatalogV2Util.v2ColumnsToStructType}.
 *
 * @param ident identifier of the table to stage
 * @param columns columns of the table
 * @param partitions partition transforms, passed through to the delegated overload
 * @param properties table properties, passed through to the delegated overload
 * @param distributionMode requested distribution mode; only {@code "none"} is accepted here
 * @param ordering requested ordering; must be empty here
 * @return the staged table produced by the delegated overload
 * @throws IllegalArgumentException if a distribution other than {@code "none"} or a non-empty
 *                                  ordering is requested
 * @throws NoSuchNamespaceException propagated from the delegated overload
 * @throws NoSuchTableException propagated from the delegated overload
 */
default StagedTable stageReplace(
    Identifier ident,
    Column[] columns,
    Transform[] partitions,
    Map<String, String> properties,
    String distributionMode,
    SortOrder[] ordering) throws NoSuchNamespaceException, NoSuchTableException {

  // Null-safe: a missing mode means "no specific distribution" rather than a bare NPE.
  Preconditions.checkArgument(
      distributionMode == null || "none".equals(distributionMode),
      "%s does not support tables with a specific distribution",
      this.getClass().getName());

  // Null-safe: a missing ordering array is handled like an empty one.
  Preconditions.checkArgument(
      ordering == null || ordering.length == 0,
      "%s does not support tables with a specific ordering",
      this.getClass().getName());

  return stageReplace(
      ident, CatalogV2Util.v2ColumnsToStructType(columns), partitions, properties);
}

/**
* Stage the creation or replacement of a table, preparing it to be committed into the metastore
* when the returned table's {@link StagedTable#commitStagedChanges()} is called.
Expand Down Expand Up @@ -188,4 +239,28 @@ default StagedTable stageCreateOrReplace(
return stageCreateOrReplace(
ident, CatalogV2Util.v2ColumnsToStructType(columns), partitions, properties);
}

/**
 * Stage the creation or replacement of a table with a specific distribution and ordering.
 * <p>
 * This default implementation does not support a distribution or an ordering. It accepts only
 * the {@code "none"} distribution mode (or {@code null}, treated as not specified) together
 * with an empty (or {@code null}) ordering, and then delegates to the
 * {@code stageCreateOrReplace} overload without distribution and ordering, converting the
 * columns with {@code CatalogV2Util.v2ColumnsToStructType}.
 *
 * @param ident identifier of the table to stage
 * @param columns columns of the table
 * @param partitions partition transforms, passed through to the delegated overload
 * @param properties table properties, passed through to the delegated overload
 * @param distributionMode requested distribution mode; only {@code "none"} is accepted here
 * @param ordering requested ordering; must be empty here
 * @return the staged table produced by the delegated overload
 * @throws IllegalArgumentException if a distribution other than {@code "none"} or a non-empty
 *                                  ordering is requested
 * @throws NoSuchNamespaceException propagated from the delegated overload
 */
default StagedTable stageCreateOrReplace(
    Identifier ident,
    Column[] columns,
    Transform[] partitions,
    Map<String, String> properties,
    String distributionMode,
    SortOrder[] ordering) throws NoSuchNamespaceException {

  // Null-safe: a missing mode means "no specific distribution" rather than a bare NPE.
  Preconditions.checkArgument(
      distributionMode == null || "none".equals(distributionMode),
      "%s does not support tables with a specific distribution",
      this.getClass().getName());

  // Null-safe: a missing ordering array is handled like an empty one.
  Preconditions.checkArgument(
      ordering == null || ordering.length == 0,
      "%s does not support tables with a specific ordering",
      this.getClass().getName());

  return stageCreateOrReplace(
      ident, CatalogV2Util.v2ColumnsToStructType(columns), partitions, properties);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.spark.sql.connector.catalog;

import org.apache.spark.annotation.Evolving;
import org.apache.spark.sql.connector.expressions.SortOrder;
import org.apache.spark.sql.connector.expressions.Transform;
import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException;
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
Expand All @@ -29,6 +30,8 @@
import java.util.Map;
import java.util.Set;

import com.google.common.base.Preconditions;

/**
* Catalog methods for working with Tables.
* <p>
Expand Down Expand Up @@ -199,6 +202,30 @@ default Table createTable(
return createTable(ident, CatalogV2Util.v2ColumnsToStructType(columns), partitions, properties);
}

/**
 * Create a table with a specific distribution and ordering.
 * <p>
 * This default implementation does not support a distribution or an ordering. It accepts only
 * the {@code "none"} distribution mode (or {@code null}, treated as not specified) together
 * with an empty (or {@code null}) ordering, and then delegates to the {@code createTable}
 * overload without distribution and ordering, converting the columns with
 * {@code CatalogV2Util.v2ColumnsToStructType}.
 *
 * @param ident identifier of the table to create
 * @param columns columns of the table
 * @param partitions partition transforms, passed through to the delegated overload
 * @param properties table properties, passed through to the delegated overload
 * @param distributionMode requested distribution mode; only {@code "none"} is accepted here
 * @param ordering requested ordering; must be empty here
 * @return the table produced by the delegated overload
 * @throws IllegalArgumentException if a distribution other than {@code "none"} or a non-empty
 *                                  ordering is requested
 * @throws TableAlreadyExistsException propagated from the delegated overload
 * @throws NoSuchNamespaceException propagated from the delegated overload
 */
default Table createTable(
    Identifier ident,
    Column[] columns,
    Transform[] partitions,
    Map<String, String> properties,
    String distributionMode,
    SortOrder[] ordering) throws TableAlreadyExistsException, NoSuchNamespaceException {

  // Null-safe: a missing mode means "no specific distribution" rather than a bare NPE.
  Preconditions.checkArgument(
      distributionMode == null || "none".equals(distributionMode),
      "%s does not support tables with a specific distribution",
      this.getClass().getName());

  // Null-safe: a missing ordering array is handled like an empty one.
  Preconditions.checkArgument(
      ordering == null || ordering.length == 0,
      "%s does not support tables with a specific ordering",
      this.getClass().getName());

  return createTable(
      ident, CatalogV2Util.v2ColumnsToStructType(columns), partitions, properties);
}

/**
* Apply a set of {@link TableChange changes} to a table in the catalog.
* <p>
Expand Down
Loading

0 comments on commit a8b91d5

Please sign in to comment.