Skip to content

Commit

Permalink
[SPARK-42398][SQL] Refine default column value DS v2 interface
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?

The current default value DS V2 API is a bit inconsistent. The `createTable` API only takes `StructType`, so implementations must know the special metadata key of the default value to access it. The `TableChange` API has the default value as an individual field.

This PR adds a new `Column` interface, which holds both the current default (as a SQL string) and the exist default (as a v2 literal). The `createTable` API now takes `Column`. This avoids the need for a special metadata key and is also more extensible when adding more special columns, such as generated columns. It is also type-safe and makes sure the exist default is a literal. The implementation is free to decide how to encode and store default values. Note: backward compatibility is taken care of.

### Why are the changes needed?

This provides a better, more consistent DS v2 API for default column values.

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

existing tests

Closes apache#40049 from cloud-fan/table2.

Lead-authored-by: Wenchen Fan <wenchen@databricks.com>
Co-authored-by: Wenchen Fan <cloud0fan@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
  • Loading branch information
cloud-fan and cloud-fan committed Feb 20, 2023
1 parent 5fc44da commit 70a098c
Show file tree
Hide file tree
Showing 43 changed files with 670 additions and 229 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import org.apache.spark.sql.catalyst.analysis.{caseSensitiveResolution, Analyzer
import org.apache.spark.sql.catalyst.catalog.SessionCatalog
import org.apache.spark.sql.connect.planner.SparkConnectPlanner
import org.apache.spark.sql.connector.catalog.{CatalogManager, Identifier, InMemoryCatalog}
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap
Expand Down Expand Up @@ -67,16 +68,17 @@ class ProtoToParsedPlanTestSuite extends SparkFunSuite with SharedSparkSession {

protected val inputFilePath: Path = baseResourcePath.resolve("queries")
protected val goldenFilePath: Path = baseResourcePath.resolve("explain-results")
private val emptyProps: util.Map[String, String] = util.Collections.emptyMap()

private val analyzer = {
val inMemoryCatalog = new InMemoryCatalog
inMemoryCatalog.initialize("primary", CaseInsensitiveStringMap.empty())
inMemoryCatalog.createNamespace(Array("tempdb"), util.Collections.emptyMap())
inMemoryCatalog.createNamespace(Array("tempdb"), emptyProps)
inMemoryCatalog.createTable(
Identifier.of(Array("tempdb"), "myTable"),
new StructType().add("id", "long"),
Array.empty,
util.Collections.emptyMap())
Array.empty[Transform],
emptyProps)

val catalogManager = new CatalogManager(
inMemoryCatalog,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import org.apache.logging.log4j.Level
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.analysis.NonEmptyNamespaceException
import org.apache.spark.sql.connector.catalog.{Identifier, NamespaceChange}
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog
import org.apache.spark.sql.jdbc.DockerIntegrationFunSuite
import org.apache.spark.sql.test.SharedSparkSession
Expand Down Expand Up @@ -118,7 +119,7 @@ private[v2] trait V2JDBCNamespaceTest extends SharedSparkSession with DockerInte
// Drop non empty namespace without cascade
catalog.createNamespace(Array("foo"), commentMap.asJava)
assert(catalog.namespaceExists(Array("foo")) === true)
catalog.createTable(ident1, schema, Array.empty, emptyProps)
catalog.createTable(ident1, schema, Array.empty[Transform], emptyProps)
if (supportsDropSchemaRestrict) {
intercept[NonEmptyNamespaceException] {
catalog.dropNamespace(Array("foo"), cascade = false)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.connector.catalog;

import java.util.Map;
import javax.annotation.Nullable;

import org.apache.spark.annotation.Evolving;
import org.apache.spark.sql.connector.expressions.Transform;
import org.apache.spark.sql.internal.connector.ColumnImpl;
import org.apache.spark.sql.types.DataType;

/**
 * An interface representing a column of a {@link Table}. It captures the basic properties of a
 * column (name, data type, nullability) together with advanced ones such as the default value.
 * <p>
 * Data Sources are not expected to implement this interface themselves. They consume it in APIs
 * like {@link TableCatalog#createTable(Identifier, Column[], Transform[], Map)}, and report it
 * from {@link Table#columns()} by calling the static {@code create} factory methods of this
 * interface.
 */
@Evolving
public interface Column {

  /**
   * Returns the name of this table column.
   */
  String name();

  /**
   * Returns the data type of this table column.
   */
  DataType dataType();

  /**
   * Returns true if this column may produce null values.
   */
  boolean nullable();

  /**
   * Returns the comment of this table column. Null means no comment.
   */
  @Nullable
  String comment();

  /**
   * Returns the default value of this table column. Null means no default value.
   */
  @Nullable
  ColumnDefaultValue defaultValue();

  /**
   * Returns the column metadata in JSON format. Null means no metadata.
   */
  @Nullable
  String metadataInJSON();

  /**
   * Creates a nullable column with no comment, no default value and no metadata.
   */
  static Column create(String name, DataType dataType) {
    return create(name, dataType, true, null, null, null);
  }

  /**
   * Creates a column with the given nullability, and no comment, default value or metadata.
   */
  static Column create(String name, DataType dataType, boolean nullable) {
    return create(name, dataType, nullable, null, null, null);
  }

  /**
   * Creates a fully-specified column. The {@code comment}, {@code defaultValue} and
   * {@code metadataInJSON} arguments may each be null.
   */
  static Column create(
      String name,
      DataType dataType,
      boolean nullable,
      String comment,
      ColumnDefaultValue defaultValue,
      String metadataInJSON) {
    return new ColumnImpl(name, dataType, nullable, comment, defaultValue, metadataInJSON);
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.connector.catalog;

import java.util.Objects;
import javax.annotation.Nonnull;

import org.apache.spark.annotation.Evolving;
import org.apache.spark.sql.connector.expressions.Literal;

/**
 * A class representing the default value of a column. It contains both the SQL string and literal
 * value of the user-specified default value expression. The SQL string should be re-evaluated for
 * each table writing command, which may produce different values if the default value expression is
 * something like {@code CURRENT_DATE()}. The literal value is used to back-fill existing data if
 * new columns with default value are added. Note: the back-fill can be lazy. The data sources can
 * remember the column default value and let the reader fill the column value when reading existing
 * data that do not have these new columns.
 */
@Evolving
public class ColumnDefaultValue {
  // Both fields are required to be non-null (see the @Nonnull getters), and final so that
  // instances are immutable and safe to use as hash keys (equals/hashCode depend on them).
  private final String sql;
  private final Literal<?> value;

  /**
   * @param sql the default value expression as a Spark SQL string, must not be null
   * @param value the literal value corresponding to {@code sql}, must not be null
   * @throws NullPointerException if either argument is null
   */
  public ColumnDefaultValue(String sql, Literal<?> value) {
    // Fail fast at construction instead of surfacing the null later in equals/getters.
    this.sql = Objects.requireNonNull(sql, "sql must not be null");
    this.value = Objects.requireNonNull(value, "value must not be null");
  }

  /**
   * Returns the SQL string (Spark SQL dialect) of the default value expression. This is the
   * original string contents of the SQL expression specified at the time the column was created in
   * a CREATE TABLE, REPLACE TABLE, or ADD COLUMN command. For example, for
   * "CREATE TABLE t (col INT DEFAULT 40 + 2)", this returns the string literal "40 + 2" (without
   * quotation marks).
   */
  @Nonnull
  public String getSql() {
    return sql;
  }

  /**
   * Returns the default value literal. This is the literal value corresponding to
   * {@link #getSql()}. For the example in the doc of {@link #getSql()}, this returns a literal
   * integer with a value of 42.
   */
  @Nonnull
  public Literal<?> getValue() {
    return value;
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) return true;
    if (!(o instanceof ColumnDefaultValue)) return false;
    ColumnDefaultValue that = (ColumnDefaultValue) o;
    return sql.equals(that.sql) && value.equals(that.value);
  }

  @Override
  public int hashCode() {
    return Objects.hash(sql, value);
  }

  @Override
  public String toString() {
    return "ColumnDefaultValue{sql='" + sql + "', value=" + value + '}';
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,19 @@
@Evolving
public interface StagingTableCatalog extends TableCatalog {

/**
* Stage the creation of a table, preparing it to be committed into the metastore.
* <p>
* This is deprecated. Please override
* {@link #stageCreate(Identifier, Column[], Transform[], Map)} instead.
*/
@Deprecated
StagedTable stageCreate(
Identifier ident,
StructType schema,
Transform[] partitions,
Map<String, String> properties) throws TableAlreadyExistsException, NoSuchNamespaceException;

/**
* Stage the creation of a table, preparing it to be committed into the metastore.
* <p>
Expand All @@ -64,19 +77,34 @@ public interface StagingTableCatalog extends TableCatalog {
* committed, an exception should be thrown by {@link StagedTable#commitStagedChanges()}.
*
* @param ident a table identifier
* @param schema the schema of the new table, as a struct type
* @param columns the columns of the new table
* @param partitions transforms to use for partitioning data in the table
* @param properties a string map of table properties
* @return metadata for the new table
* @throws TableAlreadyExistsException If a table or view already exists for the identifier
* @throws UnsupportedOperationException If a requested partition transform is not supported
* @throws NoSuchNamespaceException If the identifier namespace does not exist (optional)
*/
StagedTable stageCreate(
default StagedTable stageCreate(
Identifier ident,
Column[] columns,
Transform[] partitions,
Map<String, String> properties) throws TableAlreadyExistsException, NoSuchNamespaceException {
return stageCreate(ident, CatalogV2Util.v2ColumnsToStructType(columns), partitions, properties);
}

/**
* Stage the replacement of a table, preparing it to be committed into the metastore when the
* returned table's {@link StagedTable#commitStagedChanges()} is called.
* <p>
* This is deprecated, please override
* {@link #stageReplace(Identifier, Column[], Transform[], Map)} instead.
*/
StagedTable stageReplace(
Identifier ident,
StructType schema,
Transform[] partitions,
Map<String, String> properties) throws TableAlreadyExistsException, NoSuchNamespaceException;
Map<String, String> properties) throws NoSuchNamespaceException, NoSuchTableException;

/**
* Stage the replacement of a table, preparing it to be committed into the metastore when the
Expand All @@ -97,19 +125,35 @@ StagedTable stageCreate(
* operation.
*
* @param ident a table identifier
* @param schema the schema of the new table, as a struct type
* @param columns the columns of the new table
* @param partitions transforms to use for partitioning data in the table
* @param properties a string map of table properties
* @return metadata for the new table
* @throws UnsupportedOperationException If a requested partition transform is not supported
* @throws NoSuchNamespaceException If the identifier namespace does not exist (optional)
* @throws NoSuchTableException If the table does not exist
*/
StagedTable stageReplace(
default StagedTable stageReplace(
Identifier ident,
Column[] columns,
Transform[] partitions,
Map<String, String> properties) throws NoSuchNamespaceException, NoSuchTableException {
return stageReplace(
ident, CatalogV2Util.v2ColumnsToStructType(columns), partitions, properties);
}

/**
* Stage the creation or replacement of a table, preparing it to be committed into the metastore
* when the returned table's {@link StagedTable#commitStagedChanges()} is called.
* <p>
* This is deprecated, please override
* {@link #stageCreateOrReplace(Identifier, Column[], Transform[], Map)} instead.
*/
StagedTable stageCreateOrReplace(
Identifier ident,
StructType schema,
Transform[] partitions,
Map<String, String> properties) throws NoSuchNamespaceException, NoSuchTableException;
Map<String, String> properties) throws NoSuchNamespaceException;

/**
* Stage the creation or replacement of a table, preparing it to be committed into the metastore
Expand All @@ -129,16 +173,19 @@ StagedTable stageReplace(
* the staged changes are committed but the table doesn't exist at commit time.
*
* @param ident a table identifier
* @param schema the schema of the new table, as a struct type
* @param columns the columns of the new table
* @param partitions transforms to use for partitioning data in the table
* @param properties a string map of table properties
* @return metadata for the new table
* @throws UnsupportedOperationException If a requested partition transform is not supported
* @throws NoSuchNamespaceException If the identifier namespace does not exist (optional)
*/
StagedTable stageCreateOrReplace(
default StagedTable stageCreateOrReplace(
Identifier ident,
StructType schema,
Column[] columns,
Transform[] partitions,
Map<String, String> properties) throws NoSuchNamespaceException;
Map<String, String> properties) throws NoSuchNamespaceException {
return stageCreateOrReplace(
ident, CatalogV2Util.v2ColumnsToStructType(columns), partitions, properties);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,20 @@ public interface Table {
/**
* Returns the schema of this table. If the table is not readable and doesn't have a schema, an
* empty schema can be returned here.
* <p>
* This is deprecated. Please override {@link #columns} instead.
*/
@Deprecated
StructType schema();

/**
* Returns the columns of this table. If the table is not readable and doesn't have a schema, an
* empty array can be returned here.
*/
default Column[] columns() {
return CatalogV2Util.structTypeToV2Columns(schema());
}

/**
* Returns the physical partitioning of this table.
*/
Expand Down
Loading

0 comments on commit 70a098c

Please sign in to comment.