Skip to content

Commit

Permalink
[Native] Add CTE support
Browse files Browse the repository at this point in the history
  • Loading branch information
aditi-pandit committed Jun 19, 2024
1 parent 1ab45c2 commit 2f3f268
Show file tree
Hide file tree
Showing 6 changed files with 196 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -863,6 +863,11 @@ connector::hive::LocationHandle::TableType toTableType(
return connector::hive::LocationHandle::TableType::kNew;
case protocol::TableType::EXISTING:
return connector::hive::LocationHandle::TableType::kExisting;
// Temporary tables are written and read by the SPI within a single pipeline,
// so they can be treated as new tables. They do not need the append or
// overwrite semantics that apply to regular tables.
case protocol::TableType::TEMPORARY:
return connector::hive::LocationHandle::TableType::kNew;
default:
VELOX_UNSUPPORTED("Unsupported table type: {}.", toJsonString(tableType));
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.nativeworker;

import com.facebook.presto.hive.TestCteExecution;
import com.facebook.presto.testing.QueryRunner;
import org.testng.annotations.Test;

/**
 * Variant of {@link TestCteExecution} in the native-worker test package.
 *
 * <p>It overrides table creation so that date columns keep their original type,
 * and disables the inherited tests that the comments below identify as
 * unsupported (Prestissimo / Velox Parquet / PAGEFILE limitations).
 */
public abstract class AbstractTestNativeCteExecution
        extends TestCteExecution
{
    /**
     * Creates the test tables through the expected query runner without casting
     * date columns to VARCHAR.
     */
    @Override
    protected void createTables()
    {
        QueryRunner queryRunner = (QueryRunner) getExpectedQueryRunner();
        // This avoids casting date fields to VARCHAR (required for DWRF). The change
        // is needed because several CTE tests use EXTRACT functions from date columns.
        NativeQueryRunnerUtils.createAllTables(queryRunner, false);
    }

    // Char type is not supported in Prestissimo.
    @Override
    @Test(enabled = false)
    public void testPersistentCteWithChar() {}

    // Unsupported nested encoding in Velox Parquet
    // Error : VeloxRuntimeError: vec.valueVector() == nullptr || vec.wrappedVector()->isFlatEncoding()
    // An unsupported nested encoding was found. Operator: TableWrite(1)
    @Override
    @Test(enabled = false)
    public void testPersistentCteWithStructTypes() {}

    // Unsupported nested encoding in Velox Parquet
    // Error : VeloxRuntimeError: vec.valueVector() == nullptr || vec.wrappedVector()->isFlatEncoding()
    // An unsupported nested encoding was found. Operator: TableWrite(1)
    @Override
    @Test(enabled = false)
    public void testPersistentCteWithMap() {}

    // Unsupported nested encoding in Velox Parquet
    // Error : VeloxRuntimeError: vec.valueVector() == nullptr || vec.wrappedVector()->isFlatEncoding()
    // An unsupported nested encoding was found. Operator: TableWrite(1)
    @Override
    @Test(enabled = false)
    public void testPersistentCteWithArrayWhereInnerTypeIsNotSupported() {}

    // Unsupported nested encoding in Velox Parquet
    // Error : VeloxRuntimeError: vec.valueVector() == nullptr || vec.wrappedVector()->isFlatEncoding()
    // An unsupported nested encoding was found. Operator: TableWrite(1)
    @Override
    @Test(enabled = false)
    public void testPersistentCteWithArrayWhereInnerTypeSupported() {}

    // Native engine does not support PAGEFILE which is needed for serializing Hive non-native types.
    // NOTE(review): @Override added for consistency with the sibling overrides; it makes the
    // compiler verify that the parent still declares this test, so a rename upstream cannot
    // silently re-enable it.
    @Override
    @Test(enabled = false)
    public void testPersistentCteWithTimeStampWithTimeZoneType() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,13 @@ public static Map<String, String> getNativeWorkerSystemProperties()
*/
public static void createAllTables(QueryRunner queryRunner)
{
    // Delegates to the two-argument overload with castDateToVarchar = true.
    // The explicit createLineitem/createOrders calls that used to precede this
    // delegation were redundant: the overload creates the same tables itself,
    // and each creator is guarded by a tableExists check.
    createAllTables(queryRunner, true);
}

public static void createAllTables(QueryRunner queryRunner, boolean castDateToVarchar)
{
createLineitem(queryRunner, castDateToVarchar);
createOrders(queryRunner, castDateToVarchar);
createOrdersEx(queryRunner);
createOrdersHll(queryRunner);
createNation(queryRunner);
Expand Down Expand Up @@ -96,12 +101,20 @@ public static void createAllIcebergTables(QueryRunner queryRunner)
}

/**
 * Creates the {@code lineitem} table by delegating to the two-argument
 * overload with {@code castDateToVarchar} set to {@code true}.
 *
 * @param queryRunner runner passed through to the overload
 */
public static void createLineitem(QueryRunner queryRunner)
{
    final boolean castDateToVarchar = true;
    createLineitem(queryRunner, castDateToVarchar);
}

public static void createLineitem(QueryRunner queryRunner, boolean castDateToVarchar)
{
if (!queryRunner.tableExists(queryRunner.getDefaultSession(), "lineitem")) {
String shipDate = castDateToVarchar ? "cast(shipdate as varchar) as shipdate" : "shipdate";
String commitDate = castDateToVarchar ? "cast(commitdate as varchar) as commitdate" : "commitdate";
String receiptDate = castDateToVarchar ? "cast(receiptdate as varchar) as receiptdate" : "receiptdate";
queryRunner.execute("CREATE TABLE lineitem AS " +
"SELECT orderkey, partkey, suppkey, linenumber, quantity, extendedprice, discount, tax, " +
" returnflag, linestatus, cast(shipdate as varchar) as shipdate, cast(commitdate as varchar) as commitdate, " +
" cast(receiptdate as varchar) as receiptdate, shipinstruct, shipmode, comment, " +
" returnflag, linestatus, " + shipDate + ", " + commitDate + ", " + receiptDate + ", " +
" shipinstruct, shipmode, comment, " +
" linestatus = 'O' as is_open, returnflag = 'R' as is_returned, " +
" cast(tax as real) as tax_as_real, cast(discount as real) as discount_as_real, " +
" cast(linenumber as smallint) as linenumber_as_smallint, " +
Expand All @@ -122,10 +135,16 @@ public static void createLineitemForIceberg(QueryRunner queryRunner)
}

/**
 * Creates the {@code orders} table by delegating to the two-argument
 * overload with {@code castDateToVarchar} set to {@code true}.
 *
 * @param queryRunner runner passed through to the overload
 */
public static void createOrders(QueryRunner queryRunner)
{
    final boolean castDateToVarchar = true;
    createOrders(queryRunner, castDateToVarchar);
}

public static void createOrders(QueryRunner queryRunner, boolean castDateToVarchar)
{
if (!queryRunner.tableExists(queryRunner.getDefaultSession(), "orders")) {
String orderDate = castDateToVarchar ? "cast(orderdate as varchar) as orderdate" : "orderdate";
queryRunner.execute("CREATE TABLE orders AS " +
"SELECT orderkey, custkey, orderstatus, totalprice, cast(orderdate as varchar) as orderdate, " +
"SELECT orderkey, custkey, orderstatus, totalprice, " + orderDate + ", " +
" orderpriority, clerk, shippriority, comment " +
"FROM tpch.tiny.orders");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,43 @@ public static QueryRunner createNativeQueryRunner(
getExternalWorkerLauncher("hive", prestoServerPath, cacheMaxSize, remoteFunctionServerUds));
}

/**
 * Builds a Hive query runner backed by external workers, configured for CTE tests.
 *
 * <p>The Hive catalog uses {@code storageFormat} for temporary tables as well
 * (with compression codec NONE), and the coordinator is configured with
 * {@code query.cte-partitioning-provider-catalog=hive}.
 *
 * @param useThrift value written to the internal-communication thrift-transport property
 * @param storageFormat storage format for Hive properties, temporary tables,
 *        and the data sub-directory name
 * @return the configured query runner
 * @throws Exception if the underlying query runner cannot be created
 */
public static QueryRunner createNativeCteQueryRunner(boolean useThrift, String storageFormat)
        throws Exception
{
    final int workerCacheMaxSize = 0;

    NativeQueryRunnerParameters runnerParameters = getNativeQueryRunnerParameters();
    String dataRoot = runnerParameters.dataDirectory.toString();
    String serverBinaryPath = runnerParameters.serverBinary.toString();
    Optional<Integer> workers = runnerParameters.workerCount;

    // The property "hive.allow-drop-table" needs to be set to true because security is always "legacy" in NativeQueryRunner.
    ImmutableMap.Builder<String, String> hivePropertiesBuilder = ImmutableMap.builder();
    hivePropertiesBuilder.putAll(getNativeWorkerHiveProperties(storageFormat));
    hivePropertiesBuilder.put("hive.allow-drop-table", "true");
    hivePropertiesBuilder.put("hive.enable-parquet-dereference-pushdown", "true");
    hivePropertiesBuilder.put("hive.temporary-table-compression-codec", "NONE");
    hivePropertiesBuilder.put("hive.temporary-table-storage-format", storageFormat);
    ImmutableMap<String, String> hiveCatalogProperties = hivePropertiesBuilder.build();

    ImmutableMap.Builder<String, String> coordinatorPropertiesBuilder = ImmutableMap.builder();
    coordinatorPropertiesBuilder.put("http-server.http.port", "8081");
    coordinatorPropertiesBuilder.put("experimental.internal-communication.thrift-transport-enabled", String.valueOf(useThrift));
    coordinatorPropertiesBuilder.putAll(getNativeWorkerSystemProperties());
    coordinatorPropertiesBuilder.put("query.cte-partitioning-provider-catalog", "hive");
    ImmutableMap<String, String> coordinatorProperties = coordinatorPropertiesBuilder.build();

    // Make query runner with external workers for tests
    return HiveQueryRunner.createQueryRunner(
            ImmutableList.of(),
            ImmutableList.of(),
            coordinatorProperties,
            ImmutableMap.of(),
            "legacy",
            hiveCatalogProperties,
            workers,
            Optional.of(Paths.get(dataRoot + "/" + storageFormat)),
            getExternalWorkerLauncher("hive", serverBinaryPath, workerCacheMaxSize, Optional.empty()));
}

public static QueryRunner createNativeQueryRunner(String remoteFunctionServerUds)
throws Exception
{
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.nativeworker;

import com.facebook.presto.Session;
import com.facebook.presto.testing.QueryRunner;
import org.testng.annotations.Test;

import static com.facebook.presto.SystemSessionProperties.PUSHDOWN_SUBFIELDS_ENABLED;
import static com.facebook.presto.SystemSessionProperties.CTE_MATERIALIZATION_STRATEGY;
import static com.facebook.presto.SystemSessionProperties.PARTITIONING_PROVIDER_CATALOG;
import static com.facebook.presto.SystemSessionProperties.CTE_FILTER_AND_PROJECTION_PUSHDOWN_ENABLED;

/**
 * Runs the CTE execution suite from {@link AbstractTestNativeCteExecution}
 * with PARQUET as the storage format.
 */
@Test(groups = {"parquet"})
public class TestPrestoNativeCteExecutionParquet
        extends AbstractTestNativeCteExecution
{
    /** Creates the CTE query runner with Thrift transport enabled and PARQUET storage. */
    @Override
    protected QueryRunner createQueryRunner()
            throws Exception
    {
        final boolean useThrift = true;
        return PrestoNativeQueryRunnerUtils.createNativeCteQueryRunner(useThrift, "PARQUET");
    }

    /** Creates the Java query runner that produces expected results, using PARQUET storage. */
    @Override
    protected QueryRunner createExpectedQueryRunner()
            throws Exception
    {
        QueryRunner expectedRunner = PrestoNativeQueryRunnerUtils.createJavaQueryRunner("PARQUET");
        return expectedRunner;
    }

    /** Session with subfield pushdown enabled and CTE materialization strategy NONE. */
    @Override
    protected Session getSession()
    {
        Session parentSession = super.getSession();
        Session inlinedCteSession = Session.builder(parentSession)
                .setSystemProperty(PUSHDOWN_SUBFIELDS_ENABLED, "true")
                .setSystemProperty(CTE_MATERIALIZATION_STRATEGY, "NONE")
                .build();
        return inlinedCteSession;
    }

    /**
     * Session with CTE materialization strategy ALL, using the "hive" partitioning
     * provider catalog, with CTE filter/projection pushdown and subfield pushdown enabled.
     */
    @Override
    protected Session getMaterializedSession()
    {
        Session parentSession = super.getSession();
        Session materializedCteSession = Session.builder(parentSession)
                .setSystemProperty(PUSHDOWN_SUBFIELDS_ENABLED, "true")
                .setSystemProperty(PARTITIONING_PROVIDER_CATALOG, "hive")
                .setSystemProperty(CTE_MATERIALIZATION_STRATEGY, "ALL")
                .setSystemProperty(CTE_FILTER_AND_PROJECTION_PUSHDOWN_ENABLED, "true")
                .build();
        return materializedCteSession;
    }
}
2 changes: 1 addition & 1 deletion presto-native-execution/velox
Submodule velox updated 148 files

0 comments on commit 2f3f268

Please sign in to comment.