Add more product tests coverage for native filesystem GCS and Azure #21958

Merged
@@ -49,6 +49,7 @@ public final class TestGroups
public static final String PROFILE_SPECIFIC_TESTS = "profile_specific_tests";
public static final String HDFS_IMPERSONATION = "hdfs_impersonation";
public static final String HDFS_NO_IMPERSONATION = "hdfs_no_impersonation";
public static final String HIVE_GCS = "hive_gcs";
public static final String HIVE_SPARK = "hive_spark";
public static final String HIVE_SPARK_NO_STATS_FALLBACK = "hive_spark_no_stats_fallback";
public static final String HIVE_COMPRESSION = "hive_compression";
@@ -75,6 +76,8 @@ public final class TestGroups
public static final String KAFKA_CONFLUENT_LICENSE = "kafka_confluent_license";
public static final String TWO_HIVES = "two_hives";
public static final String ICEBERG = "iceberg";
public static final String ICEBERG_GCS = "iceberg_gcs";
public static final String ICEBERG_AZURE = "iceberg_azure";
public static final String ICEBERG_ALLUXIO_CACHING = "iceberg_alluxio_caching";
public static final String ICEBERG_FORMAT_VERSION_COMPATIBILITY = "iceberg_format_version_compatibility";
public static final String ICEBERG_REST = "iceberg_rest";
@@ -88,6 +91,7 @@ public final class TestGroups
public static final String DELTA_LAKE_OSS = "delta-lake-oss";
public static final String DELTA_LAKE_HDFS = "delta-lake-hdfs";
public static final String DELTA_LAKE_MINIO = "delta-lake-minio";
public static final String DELTA_LAKE_AZURE = "delta-lake-azure";
public static final String DELTA_LAKE_GCS = "delta-lake-gcs";
public static final String DELTA_LAKE_DATABRICKS = "delta-lake-databricks";
public static final String DELTA_LAKE_DATABRICKS_104 = "delta-lake-databricks-104";
@@ -16,12 +16,16 @@
import com.google.common.collect.ImmutableList;
import com.google.inject.Inject;
import io.trino.tests.product.launcher.docker.DockerFiles;
import io.trino.tests.product.launcher.env.DockerContainer;
import io.trino.tests.product.launcher.env.Environment;
import io.trino.tests.product.launcher.env.EnvironmentConfig;
import io.trino.tests.product.launcher.env.EnvironmentProvider;
import io.trino.tests.product.launcher.env.common.Hadoop;
import io.trino.tests.product.launcher.env.common.StandardMultinode;
import io.trino.tests.product.launcher.env.common.TestsEnvironment;
import io.trino.tests.product.launcher.testcontainers.PortBinder;
import org.testcontainers.containers.BindMode;
import org.testcontainers.containers.startupcheck.IsRunningStartupCheckStrategy;

import java.io.File;
import java.io.IOException;
@@ -31,11 +35,14 @@
import java.nio.file.attribute.PosixFilePermissions;

import static io.trino.testing.TestingNames.randomNameSuffix;
import static io.trino.tests.product.launcher.docker.ContainerUtil.forSelectedPorts;
import static io.trino.tests.product.launcher.env.EnvironmentContainers.COORDINATOR;
import static io.trino.tests.product.launcher.env.EnvironmentContainers.HADOOP;
import static io.trino.tests.product.launcher.env.EnvironmentContainers.TESTS;
import static io.trino.tests.product.launcher.env.EnvironmentContainers.WORKER;
import static io.trino.tests.product.launcher.env.common.Hadoop.CONTAINER_HADOOP_INIT_D;
import static io.trino.tests.product.launcher.env.common.Hadoop.CONTAINER_TRINO_HIVE_PROPERTIES;
import static io.trino.tests.product.launcher.env.common.Standard.CONTAINER_TRINO_ETC;
import static java.nio.file.attribute.PosixFilePermissions.fromString;
import static java.util.Objects.requireNonNull;
import static org.testcontainers.utility.MountableFile.forHostPath;
@@ -44,16 +51,23 @@
public class EnvMultinodeAzure
extends EnvironmentProvider
{
private static final int SPARK_THRIFT_PORT = 10213;
private static final File HIVE_JDBC_PROVIDER = new File("testing/trino-product-tests-launcher/target/hive-jdbc.jar");

private final DockerFiles.ResourceProvider configDir;
private final DockerFiles dockerFiles;
private final PortBinder portBinder;
private final String hadoopBaseImage;
private final String hadoopImagesVersion;

@Inject
public EnvMultinodeAzure(DockerFiles dockerFiles, StandardMultinode standardMultinode, Hadoop hadoop, EnvironmentConfig environmentConfig)
public EnvMultinodeAzure(DockerFiles dockerFiles, StandardMultinode standardMultinode, Hadoop hadoop, EnvironmentConfig environmentConfig, PortBinder portBinder)
{
super(ImmutableList.of(standardMultinode, hadoop));
configDir = dockerFiles.getDockerFilesHostDirectory("conf/environment/multinode-azure");
requireNonNull(environmentConfig, "environmentConfig is null");
this.dockerFiles = requireNonNull(dockerFiles, "dockerFiles is null");
this.portBinder = requireNonNull(portBinder, "portBinder is null");
hadoopBaseImage = environmentConfig.getHadoopBaseImage();
hadoopImagesVersion = environmentConfig.getHadoopImagesVersion();
}
@@ -64,6 +78,8 @@ public void extendEnvironment(Environment.Builder builder)
String dockerImageName = hadoopBaseImage + ":" + hadoopImagesVersion;
String schema = "test_" + randomNameSuffix();

String abfsContainer = requireEnv("ABFS_CONTAINER");
String abfsAccount = requireEnv("ABFS_ACCOUNT");
builder.configureContainer(HADOOP, container -> {
container.setDockerImageName(dockerImageName);
container.withCopyFileToContainer(
@@ -73,35 +89,84 @@ public void extendEnvironment(Environment.Builder builder)
forHostPath(configDir.getPath("apply-azure-config.sh")),
CONTAINER_HADOOP_INIT_D + "apply-azure-config.sh");
container
.withEnv("ABFS_CONTAINER", requireEnv("ABFS_CONTAINER"))
.withEnv("ABFS_ACCOUNT", requireEnv("ABFS_ACCOUNT"))
.withEnv("ABFS_CONTAINER", abfsContainer)
.withEnv("ABFS_ACCOUNT", abfsAccount)
.withEnv("ABFS_SCHEMA", schema);
container.withCopyFileToContainer(
forHostPath(configDir.getPath("update-location.sh")),
CONTAINER_HADOOP_INIT_D + "update-location.sh");
});

String abfsAccessKey = requireEnv("ABFS_ACCESS_KEY");
builder.configureContainer(COORDINATOR, container -> container
.withEnv("ABFS_ACCOUNT", requireEnv("ABFS_ACCOUNT"))
.withEnv("ABFS_ACCESS_KEY", requireEnv("ABFS_ACCESS_KEY")));
.withEnv("ABFS_ACCOUNT", abfsAccount)
.withEnv("ABFS_ACCESS_KEY", abfsAccessKey));

builder.configureContainer(WORKER, container -> container
.withEnv("ABFS_ACCOUNT", requireEnv("ABFS_ACCOUNT"))
.withEnv("ABFS_ACCESS_KEY", requireEnv("ABFS_ACCESS_KEY")));
.withEnv("ABFS_ACCOUNT", abfsAccount)
.withEnv("ABFS_ACCESS_KEY", abfsAccessKey));

String temptoConfig = "/docker/presto-product-tests/conf/tempto/tempto-configuration-abfs.yaml";
builder.configureContainer(TESTS, container -> container
.withEnv("ABFS_CONTAINER", requireEnv("ABFS_CONTAINER"))
.withEnv("ABFS_ACCOUNT", requireEnv("ABFS_ACCOUNT"))
.withEnv("ABFS_CONTAINER", abfsContainer)
.withEnv("ABFS_ACCOUNT", abfsAccount)
.withCopyFileToContainer(
forHostPath(getTemptoConfiguration(schema)),
temptoConfig)
.withEnv("TEMPTO_CONFIG_FILES", temptoConfigFiles ->
temptoConfigFiles
.map(files -> files + "," + temptoConfig)
.orElse(temptoConfig)));
.orElse(temptoConfig))
.withFileSystemBind(HIVE_JDBC_PROVIDER.getParent(), "/docker/jdbc", BindMode.READ_ONLY));

builder.addConnector("hive", forHostPath(configDir.getPath("hive.properties")), CONTAINER_TRINO_HIVE_PROPERTIES);
builder.addConnector("delta_lake", forHostPath(configDir.getPath("delta.properties")), CONTAINER_TRINO_ETC + "/catalog/delta.properties");
builder.addConnector("iceberg", forHostPath(configDir.getPath("iceberg.properties")), CONTAINER_TRINO_ETC + "/catalog/iceberg.properties");

builder.addContainer(createSpark())
.containerDependsOn("spark", HADOOP);
}

private DockerContainer createSpark()
{
DockerContainer container = new DockerContainer("ghcr.io/trinodb/testing/spark3-iceberg:" + hadoopImagesVersion, "spark")
.withEnv("HADOOP_USER_NAME", "hive")
.withCopyFileToContainer(
forHostPath(getSparkConf()),
"/spark/conf/spark-defaults.conf")
.withCopyFileToContainer(
forHostPath(dockerFiles.getDockerFilesHostPath("common/spark/log4j2.properties")),
"/spark/conf/log4j2.properties")
.withCommand(
"spark-submit",
"--master", "local[*]",
"--class", "org.apache.spark.sql.hive.thriftserver.HiveThriftServer2",
"--name", "Thrift JDBC/ODBC Server",
"--packages", "org.apache.spark:spark-avro_2.12:3.2.1",
"--conf", "spark.hive.server2.thrift.port=" + SPARK_THRIFT_PORT,
"spark-internal")
.withStartupCheckStrategy(new IsRunningStartupCheckStrategy())
.waitingFor(forSelectedPorts(SPARK_THRIFT_PORT));

builder.addConnector("hive", forHostPath(configDir.getPath("hive.properties")));
portBinder.exposePort(container, SPARK_THRIFT_PORT);

return container;
}
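
For context, the Spark Thrift server started here speaks the HiveServer2 protocol, and the hive-jdbc.jar bind-mounted into the tests container at /docker/jdbc lets the product tests drive Spark over JDBC. A minimal sketch of such a connection follows (not code from this PR; the URL, credentials, and table name are assumptions):

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class SparkThriftConnectionSketch
{
    public static void main(String[] args)
            throws Exception
    {
        // "spark" is the container name registered above; 10213 matches SPARK_THRIFT_PORT.
        // The Hive JDBC driver is assumed to be on the classpath via the jar mounted at /docker/jdbc.
        try (Connection connection = DriverManager.getConnection("jdbc:hive2://spark:10213/default", "hive", "");
                Statement statement = connection.createStatement()) {
            // Create a table through Spark so the product tests can verify Trino reads it back.
            statement.execute("CREATE TABLE default.spark_written_table (id BIGINT) USING iceberg");
        }
    }
}
```
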

private Path getSparkConf()
{
try {
String sparkConf = Files.readString(configDir.getPath("spark-defaults.conf"))
.replace("%ABFS_ACCOUNT%", requireEnv("ABFS_ACCOUNT"))
.replace("%ABFS_ACCESS_KEY%", requireEnv("ABFS_ACCESS_KEY"));
File sparkConfFile = Files.createTempFile("spark-defaults", ".conf", PosixFilePermissions.asFileAttribute(fromString("rwxrwxrwx"))).toFile();
sparkConfFile.deleteOnExit();
Files.writeString(sparkConfFile.toPath(), sparkConf);
return sparkConfFile.toPath();
}
catch (IOException e) {
throw new UncheckedIOException(e);
}
}

private Path getCoreSiteOverrideXml()
@@ -16,11 +16,17 @@
import com.google.common.collect.ImmutableList;
import com.google.inject.Inject;
import io.trino.tests.product.launcher.docker.DockerFiles;
import io.trino.tests.product.launcher.env.DockerContainer;
import io.trino.tests.product.launcher.env.Environment;
import io.trino.tests.product.launcher.env.EnvironmentConfig;
import io.trino.tests.product.launcher.env.EnvironmentProvider;
import io.trino.tests.product.launcher.env.common.Hadoop;
import io.trino.tests.product.launcher.env.common.StandardMultinode;
import io.trino.tests.product.launcher.env.common.TestsEnvironment;
import io.trino.tests.product.launcher.testcontainers.PortBinder;
import org.testcontainers.containers.BindMode;
import org.testcontainers.containers.startupcheck.IsRunningStartupCheckStrategy;
import org.testcontainers.utility.MountableFile;

import java.io.File;
import java.io.IOException;
@@ -31,6 +37,7 @@
import java.util.Base64;
import java.util.UUID;

import static io.trino.tests.product.launcher.docker.ContainerUtil.forSelectedPorts;
import static io.trino.tests.product.launcher.env.EnvironmentContainers.COORDINATOR;
import static io.trino.tests.product.launcher.env.EnvironmentContainers.HADOOP;
import static io.trino.tests.product.launcher.env.EnvironmentContainers.TESTS;
@@ -54,14 +61,20 @@
public class EnvMultinodeGcs
extends EnvironmentProvider
{
private static final int SPARK_THRIFT_PORT = 10213;
private static final File HIVE_JDBC_PROVIDER = new File("testing/trino-product-tests-launcher/target/hive-jdbc.jar");
private final String gcsTestDirectory = "env_multinode_gcs_" + UUID.randomUUID();
private final DockerFiles dockerFiles;
private final PortBinder portBinder;
private final String hadoopImagesVersion;

@Inject
public EnvMultinodeGcs(DockerFiles dockerFiles, StandardMultinode multinode, Hadoop hadoop)
public EnvMultinodeGcs(DockerFiles dockerFiles, StandardMultinode multinode, Hadoop hadoop, EnvironmentConfig environmentConfig, PortBinder portBinder)
{
super(ImmutableList.of(multinode, hadoop));
this.dockerFiles = requireNonNull(dockerFiles, "dockerFiles is null");
this.portBinder = requireNonNull(portBinder, "portBinder is null");
hadoopImagesVersion = environmentConfig.getHadoopImagesVersion();
}

@Override
@@ -104,13 +117,45 @@ public void extendEnvironment(Environment.Builder builder)

builder.configureContainer(TESTS, container -> container
.withEnv("GCP_STORAGE_BUCKET", gcpStorageBucket)
.withEnv("GCP_TEST_DIRECTORY", gcsTestDirectory));
.withEnv("GCP_TEST_DIRECTORY", gcsTestDirectory)
.withFileSystemBind(HIVE_JDBC_PROVIDER.getParent(), "/docker/jdbc", BindMode.READ_ONLY));

builder.addConnector("hive", forHostPath(dockerFiles.getDockerFilesHostPath("conf/environment/multinode-gcs/hive.properties")), CONTAINER_TRINO_HIVE_PROPERTIES);
builder.addConnector("delta_lake", forHostPath(dockerFiles.getDockerFilesHostPath("conf/environment/multinode-gcs/delta.properties")), CONTAINER_TRINO_ETC + "/catalog/delta.properties");
builder.addConnector("iceberg", forHostPath(dockerFiles.getDockerFilesHostPath("conf/environment/multinode-gcs/iceberg.properties")), CONTAINER_TRINO_ETC + "/catalog/iceberg.properties");

configureTempto(builder, dockerFiles.getDockerFilesHostDirectory("conf/environment/multinode-gcs/"));

builder.addContainer(createSpark(forHostPath(gcpCredentialsFile.toPath())))
.containerDependsOn("spark", HADOOP);
}

private DockerContainer createSpark(MountableFile credentialsFile)
{
String containerGcpCredentialsFile = "/spark/conf/gcp-credentials.json";
DockerContainer container = new DockerContainer("ghcr.io/trinodb/testing/spark3-iceberg:" + hadoopImagesVersion, "spark")
.withEnv("HADOOP_USER_NAME", "hive")
.withCopyFileToContainer(
forHostPath(dockerFiles.getDockerFilesHostPath("conf/environment/multinode-gcs/spark-defaults.conf")),
"/spark/conf/spark-defaults.conf")
.withCopyFileToContainer(
forHostPath(dockerFiles.getDockerFilesHostPath("common/spark/log4j2.properties")),
"/spark/conf/log4j2.properties")
.withCommand(
"spark-submit",
"--master", "local[*]",
"--class", "org.apache.spark.sql.hive.thriftserver.HiveThriftServer2",
"--name", "Thrift JDBC/ODBC Server",
"--packages", "org.apache.spark:spark-avro_2.12:3.2.1",
"--conf", "spark.hive.server2.thrift.port=" + SPARK_THRIFT_PORT,
"spark-internal")
.withCopyFileToContainer(credentialsFile, containerGcpCredentialsFile)
.withStartupCheckStrategy(new IsRunningStartupCheckStrategy())
.waitingFor(forSelectedPorts(SPARK_THRIFT_PORT));

portBinder.exposePort(container, SPARK_THRIFT_PORT);

return container;
}

private Path getCoreSiteOverrideXml(String containerGcpCredentialsFilePath)
@@ -23,6 +23,8 @@

import static io.trino.tests.product.TestGroups.AZURE;
import static io.trino.tests.product.TestGroups.CONFIGURED_FEATURES;
import static io.trino.tests.product.TestGroups.DELTA_LAKE_AZURE;
import static io.trino.tests.product.TestGroups.ICEBERG_AZURE;
import static io.trino.tests.product.launcher.suite.SuiteTestRun.testOnEnvironment;

public class SuiteAzure
@@ -34,6 +36,12 @@ public List<SuiteTestRun> getTestRuns(EnvironmentConfig config)
return ImmutableList.of(
testOnEnvironment(EnvMultinodeAzure.class)
.withGroups(CONFIGURED_FEATURES, AZURE)
.build(),
testOnEnvironment(EnvMultinodeAzure.class)
.withGroups(DELTA_LAKE_AZURE, CONFIGURED_FEATURES)
.build(),
testOnEnvironment(EnvMultinodeAzure.class)
.withGroups(ICEBERG_AZURE, CONFIGURED_FEATURES)
.build());
}
}
@@ -23,6 +23,8 @@

import static io.trino.tests.product.TestGroups.CONFIGURED_FEATURES;
import static io.trino.tests.product.TestGroups.DELTA_LAKE_GCS;
import static io.trino.tests.product.TestGroups.HIVE_GCS;
import static io.trino.tests.product.TestGroups.ICEBERG_GCS;
import static io.trino.tests.product.launcher.suite.SuiteTestRun.testOnEnvironment;

public class SuiteGcs
@@ -34,6 +36,12 @@ public List<SuiteTestRun> getTestRuns(EnvironmentConfig config)
return ImmutableList.of(
testOnEnvironment(EnvMultinodeGcs.class)
.withGroups(DELTA_LAKE_GCS, CONFIGURED_FEATURES)
.build(),
testOnEnvironment(EnvMultinodeGcs.class)
.withGroups(ICEBERG_GCS, CONFIGURED_FEATURES)
.build(),
testOnEnvironment(EnvMultinodeGcs.class)
.withGroups(HIVE_GCS, CONFIGURED_FEATURES)
.build());
}
}
@@ -0,0 +1,6 @@
connector.name=delta_lake
hive.metastore.uri=thrift://hadoop-master:9083
fs.hadoop.enabled=false
fs.native-azure.enabled=true
azure.auth-type=ACCESS_KEY
azure.access-key=${ENV:ABFS_ACCESS_KEY}
@@ -0,0 +1,7 @@
connector.name=iceberg
hive.metastore.uri=thrift://hadoop-master:9083
iceberg.file-format=PARQUET
fs.hadoop.enabled=false
fs.native-azure.enabled=true
azure.auth-type=ACCESS_KEY
azure.access-key=${ENV:ABFS_ACCESS_KEY}
@@ -0,0 +1,20 @@
spark.sql.catalogImplementation=hive
spark.sql.warehouse.dir=hdfs://hadoop-master:9000/user/hive/warehouse
spark.sql.hive.thriftServer.singleSession=false

spark.sql.catalog.spark_catalog = org.apache.iceberg.spark.SparkSessionCatalog
spark.sql.catalog.spark_catalog.type = hive

spark.sql.catalog.iceberg_test=org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.iceberg_test.type=hive
spark.sql.catalog.iceberg_test.uri=thrift://hadoop-master:9083
; disabling caching allows us to run spark queries interchangeably with trino's
spark.sql.catalog.iceberg_test.cache-enabled=false
spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions

spark.hadoop.fs.defaultFS=hdfs://hadoop-master:9000
spark.hive.metastore.uris=thrift://hadoop-master:9083
spark.hive.metastore.warehouse.dir=hdfs://hadoop-master:9000/user/hive/warehouse
spark.hive.metastore.schema.verification=false

spark.hadoop.fs.azure.account.key.%ABFS_ACCOUNT%.dfs.core.windows.net=%ABFS_ACCESS_KEY%
@@ -5,6 +5,11 @@
<value>false</value>
</property>

<property>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happened when this parameter was not specified?

Copy link
Member Author

@anusudarsan anusudarsan May 21, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the with space test in Hive GCS failed. Caused by: io.trino.spi.TrinoException: java.lang.IllegalArgumentException: Invalid bucket name (....) or object name (env_multinode_gcs_..../test_path_special_character7ehvy1chkk/part=with space) This is true for legacy fs tests as well.

<name>fs.gs.path.encoding</name>
<value>uri-path</value>
</property>

<!-- Google Cloud Storage properties -->
<property>
<name>fs.gs.impl</name>