Add Spark compatibility tests for GCS and Azure
Also with native filesystem enabled
anusudarsan committed May 24, 2024
1 parent 5cc8a8c commit faa0d12
Showing 9 changed files with 231 additions and 12 deletions.
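
For local verification, these environments are normally exercised through the product-test launcher. The sketch below is an assumed invocation: the testing/bin/ptl test run syntax follows the launcher's documented usage, while the environment name (multinode-azure) and the tempto group (azure) are inferred from the classes and config directories in this commit, and the ABFS_* values are placeholders for real credentials.

  # Assumed invocation; environment and group names are inferred from this commit,
  # and the ABFS_* values stand in for real Azure storage credentials.
  export ABFS_CONTAINER=<container> ABFS_ACCOUNT=<account> ABFS_ACCESS_KEY=<access-key>
  testing/bin/ptl test run --environment multinode-azure -- -g azure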
@@ -16,12 +16,16 @@
import com.google.common.collect.ImmutableList;
import com.google.inject.Inject;
import io.trino.tests.product.launcher.docker.DockerFiles;
import io.trino.tests.product.launcher.env.DockerContainer;
import io.trino.tests.product.launcher.env.Environment;
import io.trino.tests.product.launcher.env.EnvironmentConfig;
import io.trino.tests.product.launcher.env.EnvironmentProvider;
import io.trino.tests.product.launcher.env.common.Hadoop;
import io.trino.tests.product.launcher.env.common.StandardMultinode;
import io.trino.tests.product.launcher.env.common.TestsEnvironment;
import io.trino.tests.product.launcher.testcontainers.PortBinder;
import org.testcontainers.containers.BindMode;
import org.testcontainers.containers.startupcheck.IsRunningStartupCheckStrategy;

import java.io.File;
import java.io.IOException;
@@ -31,6 +35,7 @@
import java.nio.file.attribute.PosixFilePermissions;

import static io.trino.testing.TestingNames.randomNameSuffix;
import static io.trino.tests.product.launcher.docker.ContainerUtil.forSelectedPorts;
import static io.trino.tests.product.launcher.env.EnvironmentContainers.COORDINATOR;
import static io.trino.tests.product.launcher.env.EnvironmentContainers.HADOOP;
import static io.trino.tests.product.launcher.env.EnvironmentContainers.TESTS;
@@ -46,16 +51,23 @@
public class EnvMultinodeAzure
extends EnvironmentProvider
{
private static final int SPARK_THRIFT_PORT = 10213;
private static final File HIVE_JDBC_PROVIDER = new File("testing/trino-product-tests-launcher/target/hive-jdbc.jar");

private final DockerFiles.ResourceProvider configDir;
private final DockerFiles dockerFiles;
private final PortBinder portBinder;
private final String hadoopBaseImage;
private final String hadoopImagesVersion;

@Inject
-public EnvMultinodeAzure(DockerFiles dockerFiles, StandardMultinode standardMultinode, Hadoop hadoop, EnvironmentConfig environmentConfig)
+public EnvMultinodeAzure(DockerFiles dockerFiles, StandardMultinode standardMultinode, Hadoop hadoop, EnvironmentConfig environmentConfig, PortBinder portBinder)
{
super(ImmutableList.of(standardMultinode, hadoop));
configDir = dockerFiles.getDockerFilesHostDirectory("conf/environment/multinode-azure");
requireNonNull(environmentConfig, "environmentConfig is null");
this.dockerFiles = requireNonNull(dockerFiles, "dockerFiles is null");
this.portBinder = requireNonNull(portBinder, "portBinder is null");
hadoopBaseImage = environmentConfig.getHadoopBaseImage();
hadoopImagesVersion = environmentConfig.getHadoopImagesVersion();
}
@@ -66,6 +78,8 @@ public void extendEnvironment(Environment.Builder builder)
String dockerImageName = hadoopBaseImage + ":" + hadoopImagesVersion;
String schema = "test_" + randomNameSuffix();

String abfsContainer = requireEnv("ABFS_CONTAINER");
String abfsAccount = requireEnv("ABFS_ACCOUNT");
builder.configureContainer(HADOOP, container -> {
container.setDockerImageName(dockerImageName);
container.withCopyFileToContainer(
@@ -75,37 +89,84 @@ public void extendEnvironment(Environment.Builder builder)
forHostPath(configDir.getPath("apply-azure-config.sh")),
CONTAINER_HADOOP_INIT_D + "apply-azure-config.sh");
container
.withEnv("ABFS_CONTAINER", requireEnv("ABFS_CONTAINER"))
.withEnv("ABFS_ACCOUNT", requireEnv("ABFS_ACCOUNT"))
.withEnv("ABFS_CONTAINER", abfsContainer)
.withEnv("ABFS_ACCOUNT", abfsAccount)
.withEnv("ABFS_SCHEMA", schema);
container.withCopyFileToContainer(
forHostPath(configDir.getPath("update-location.sh")),
CONTAINER_HADOOP_INIT_D + "update-location.sh");
});

String abfsAccessKey = requireEnv("ABFS_ACCESS_KEY");
builder.configureContainer(COORDINATOR, container -> container
.withEnv("ABFS_ACCOUNT", requireEnv("ABFS_ACCOUNT"))
.withEnv("ABFS_ACCESS_KEY", requireEnv("ABFS_ACCESS_KEY")));
.withEnv("ABFS_ACCOUNT", abfsAccount)
.withEnv("ABFS_ACCESS_KEY", abfsAccessKey));

builder.configureContainer(WORKER, container -> container
.withEnv("ABFS_ACCOUNT", requireEnv("ABFS_ACCOUNT"))
.withEnv("ABFS_ACCESS_KEY", requireEnv("ABFS_ACCESS_KEY")));
.withEnv("ABFS_ACCOUNT", abfsAccount)
.withEnv("ABFS_ACCESS_KEY", abfsAccessKey));

String temptoConfig = "/docker/presto-product-tests/conf/tempto/tempto-configuration-abfs.yaml";
builder.configureContainer(TESTS, container -> container
.withEnv("ABFS_CONTAINER", requireEnv("ABFS_CONTAINER"))
.withEnv("ABFS_ACCOUNT", requireEnv("ABFS_ACCOUNT"))
.withEnv("ABFS_CONTAINER", abfsContainer)
.withEnv("ABFS_ACCOUNT", abfsAccount)
.withCopyFileToContainer(
forHostPath(getTemptoConfiguration(schema)),
temptoConfig)
.withEnv("TEMPTO_CONFIG_FILES", temptoConfigFiles ->
temptoConfigFiles
.map(files -> files + "," + temptoConfig)
-.orElse(temptoConfig)));
+.orElse(temptoConfig))
+.withFileSystemBind(HIVE_JDBC_PROVIDER.getParent(), "/docker/jdbc", BindMode.READ_ONLY));

builder.addConnector("hive", forHostPath(configDir.getPath("hive.properties")), CONTAINER_TRINO_HIVE_PROPERTIES);
builder.addConnector("delta_lake", forHostPath(configDir.getPath("delta.properties")), CONTAINER_TRINO_ETC + "/catalog/delta.properties");
builder.addConnector("iceberg", forHostPath(configDir.getPath("iceberg.properties")), CONTAINER_TRINO_ETC + "/catalog/iceberg.properties");

builder.addContainer(createSpark())
.containerDependsOn("spark", HADOOP);
}

private DockerContainer createSpark()
{
DockerContainer container = new DockerContainer("ghcr.io/trinodb/testing/spark3-iceberg:" + hadoopImagesVersion, "spark")
.withEnv("HADOOP_USER_NAME", "hive")
.withCopyFileToContainer(
forHostPath(getSparkConf()),
"/spark/conf/spark-defaults.conf")
.withCopyFileToContainer(
forHostPath(dockerFiles.getDockerFilesHostPath("common/spark/log4j2.properties")),
"/spark/conf/log4j2.properties")
.withCommand(
"spark-submit",
"--master", "local[*]",
"--class", "org.apache.spark.sql.hive.thriftserver.HiveThriftServer2",
"--name", "Thrift JDBC/ODBC Server",
"--packages", "org.apache.spark:spark-avro_2.12:3.2.1",
"--conf", "spark.hive.server2.thrift.port=" + SPARK_THRIFT_PORT,
"spark-internal")
.withStartupCheckStrategy(new IsRunningStartupCheckStrategy())
.waitingFor(forSelectedPorts(SPARK_THRIFT_PORT));

portBinder.exposePort(container, SPARK_THRIFT_PORT);

return container;
}

private Path getSparkConf()
{
try {
String sparkConf = Files.readString(configDir.getPath("spark-defaults.conf"))
.replace("%ABFS_ACCOUNT%", requireEnv("ABFS_ACCOUNT"))
.replace("%ABFS_ACCESS_KEY%", requireEnv("ABFS_ACCESS_KEY"));
File sparkConfFile = Files.createTempFile("spark-defaults", ".conf", PosixFilePermissions.asFileAttribute(fromString("rwxrwxrwx"))).toFile();
sparkConfFile.deleteOnExit();
Files.writeString(sparkConfFile.toPath(), sparkConf);
return sparkConfFile.toPath();
}
catch (IOException e) {
throw new UncheckedIOException(e);
}
}

private Path getCoreSiteOverrideXml()
@@ -16,11 +16,17 @@
import com.google.common.collect.ImmutableList;
import com.google.inject.Inject;
import io.trino.tests.product.launcher.docker.DockerFiles;
import io.trino.tests.product.launcher.env.DockerContainer;
import io.trino.tests.product.launcher.env.Environment;
import io.trino.tests.product.launcher.env.EnvironmentConfig;
import io.trino.tests.product.launcher.env.EnvironmentProvider;
import io.trino.tests.product.launcher.env.common.Hadoop;
import io.trino.tests.product.launcher.env.common.StandardMultinode;
import io.trino.tests.product.launcher.env.common.TestsEnvironment;
import io.trino.tests.product.launcher.testcontainers.PortBinder;
import org.testcontainers.containers.BindMode;
import org.testcontainers.containers.startupcheck.IsRunningStartupCheckStrategy;
import org.testcontainers.utility.MountableFile;

import java.io.File;
import java.io.IOException;
@@ -31,6 +37,7 @@
import java.util.Base64;
import java.util.UUID;

import static io.trino.tests.product.launcher.docker.ContainerUtil.forSelectedPorts;
import static io.trino.tests.product.launcher.env.EnvironmentContainers.COORDINATOR;
import static io.trino.tests.product.launcher.env.EnvironmentContainers.HADOOP;
import static io.trino.tests.product.launcher.env.EnvironmentContainers.TESTS;
@@ -54,14 +61,20 @@
public class EnvMultinodeGcs
extends EnvironmentProvider
{
private static final int SPARK_THRIFT_PORT = 10213;
private static final File HIVE_JDBC_PROVIDER = new File("testing/trino-product-tests-launcher/target/hive-jdbc.jar");
private final String gcsTestDirectory = "env_multinode_gcs_" + UUID.randomUUID();
private final DockerFiles dockerFiles;
private final PortBinder portBinder;
private final String hadoopImagesVersion;

@Inject
-public EnvMultinodeGcs(DockerFiles dockerFiles, StandardMultinode multinode, Hadoop hadoop)
+public EnvMultinodeGcs(DockerFiles dockerFiles, StandardMultinode multinode, Hadoop hadoop, EnvironmentConfig environmentConfig, PortBinder portBinder)
{
super(ImmutableList.of(multinode, hadoop));
this.dockerFiles = requireNonNull(dockerFiles, "dockerFiles is null");
this.portBinder = requireNonNull(portBinder, "portBinder is null");
hadoopImagesVersion = environmentConfig.getHadoopImagesVersion();
}

@Override
@@ -104,13 +117,45 @@ public void extendEnvironment(Environment.Builder builder)

builder.configureContainer(TESTS, container -> container
.withEnv("GCP_STORAGE_BUCKET", gcpStorageBucket)
.withEnv("GCP_TEST_DIRECTORY", gcsTestDirectory));
.withEnv("GCP_TEST_DIRECTORY", gcsTestDirectory)
.withFileSystemBind(HIVE_JDBC_PROVIDER.getParent(), "/docker/jdbc", BindMode.READ_ONLY));

builder.addConnector("hive", forHostPath(dockerFiles.getDockerFilesHostPath("conf/environment/multinode-gcs/hive.properties")), CONTAINER_TRINO_HIVE_PROPERTIES);
builder.addConnector("delta_lake", forHostPath(dockerFiles.getDockerFilesHostPath("conf/environment/multinode-gcs/delta.properties")), CONTAINER_TRINO_ETC + "/catalog/delta.properties");
builder.addConnector("iceberg", forHostPath(dockerFiles.getDockerFilesHostPath("conf/environment/multinode-gcs/iceberg.properties")), CONTAINER_TRINO_ETC + "/catalog/iceberg.properties");

configureTempto(builder, dockerFiles.getDockerFilesHostDirectory("conf/environment/multinode-gcs/"));

builder.addContainer(createSpark(forHostPath(gcpCredentialsFile.toPath())))
.containerDependsOn("spark", HADOOP);
}

private DockerContainer createSpark(MountableFile credentialsFile)
{
String containerGcpCredentialsFile = "/spark/conf/gcp-credentials.json";
DockerContainer container = new DockerContainer("ghcr.io/trinodb/testing/spark3-iceberg:" + hadoopImagesVersion, "spark")
.withEnv("HADOOP_USER_NAME", "hive")
.withCopyFileToContainer(
forHostPath(dockerFiles.getDockerFilesHostPath("conf/environment/multinode-gcs/spark-defaults.conf")),
"/spark/conf/spark-defaults.conf")
.withCopyFileToContainer(
forHostPath(dockerFiles.getDockerFilesHostPath("common/spark/log4j2.properties")),
"/spark/conf/log4j2.properties")
.withCommand(
"spark-submit",
"--master", "local[*]",
"--class", "org.apache.spark.sql.hive.thriftserver.HiveThriftServer2",
"--name", "Thrift JDBC/ODBC Server",
"--packages", "org.apache.spark:spark-avro_2.12:3.2.1",
"--conf", "spark.hive.server2.thrift.port=" + SPARK_THRIFT_PORT,
"spark-internal")
.withCopyFileToContainer(credentialsFile, containerGcpCredentialsFile)
.withStartupCheckStrategy(new IsRunningStartupCheckStrategy())
.waitingFor(forSelectedPorts(SPARK_THRIFT_PORT));

portBinder.exposePort(container, SPARK_THRIFT_PORT);

return container;
}

private Path getCoreSiteOverrideXml(String containerGcpCredentialsFilePath)
@@ -0,0 +1,20 @@
spark.sql.catalogImplementation=hive
spark.sql.warehouse.dir=hdfs://hadoop-master:9000/user/hive/warehouse
spark.sql.hive.thriftServer.singleSession=false

spark.sql.catalog.spark_catalog = org.apache.iceberg.spark.SparkSessionCatalog
spark.sql.catalog.spark_catalog.type = hive

spark.sql.catalog.iceberg_test=org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.iceberg_test.type=hive
spark.sql.catalog.iceberg_test.uri=thrift://hadoop-master:9083
; disabling caching allows us to run spark queries interchangeably with trino's
spark.sql.catalog.iceberg_test.cache-enabled=false
spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions

spark.hadoop.fs.defaultFS=hdfs://hadoop-master:9000
spark.hive.metastore.uris=thrift://hadoop-master:9083
spark.hive.metastore.warehouse.dir=hdfs://hadoop-master:9000/user/hive/warehouse
spark.hive.metastore.schema.verification=false

spark.hadoop.fs.azure.account.key.%ABFS_ACCOUNT%.dfs.core.windows.net=%ABFS_ACCESS_KEY%
@@ -0,0 +1,21 @@
spark.sql.catalogImplementation=hive
spark.sql.warehouse.dir=hdfs://hadoop-master:9000/user/hive/warehouse
spark.sql.hive.thriftServer.singleSession=false

spark.sql.catalog.spark_catalog = org.apache.iceberg.spark.SparkSessionCatalog
spark.sql.catalog.spark_catalog.type = hive

spark.sql.catalog.iceberg_test=org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.iceberg_test.type=hive
spark.sql.catalog.iceberg_test.uri=thrift://hadoop-master:9083
; disabling caching allows us to run spark queries interchangeably with trino's
spark.sql.catalog.iceberg_test.cache-enabled=false
spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions

spark.hadoop.fs.defaultFS=hdfs://hadoop-master:9000
spark.hive.metastore.uris=thrift://hadoop-master:9083
spark.hive.metastore.warehouse.dir=hdfs://hadoop-master:9000/user/hive/warehouse
spark.hive.metastore.schema.verification=false
spark.hive.enforce.bucketing=false

spark.hadoop.fs.gs.auth.service.account.json.keyfile=/spark/conf/gcp-credentials.json
@@ -21,6 +21,7 @@

import static io.trino.tempto.assertions.QueryAssert.Row.row;
import static io.trino.testing.TestingNames.randomNameSuffix;
import static io.trino.tests.product.utils.QueryExecutors.onSpark;
import static io.trino.tests.product.utils.QueryExecutors.onTrino;
import static java.lang.String.format;
import static org.assertj.core.api.Assertions.assertThat;
@@ -127,4 +128,39 @@ protected void testPathContainsSpecialCharacter(String schemaLocation, String pa
onTrino().executeQuery("DROP SCHEMA %1$s.test".formatted(getCatalogName()));
}
}

protected void testSparkCompatibilityOnTrinoCreatedTable(String schemaLocation)
{
String baseTableName = "trino_created_table_using_parquet_" + randomNameSuffix();
String sparkTableName = format("%s.test_compat.%s", getSparkCatalog(), baseTableName);
String trinoTableName = format("%s.test_compat.%s", getCatalogName(), baseTableName);
try {
onTrino().executeQuery(format("CREATE SCHEMA %1$s.test_compat WITH (location = '%2$s')", getCatalogName(), schemaLocation));

onTrino().executeQuery(format(
"CREATE TABLE %s ( " +
" a_boolean boolean, " +
" a_dummy varchar " +
") " +
"WITH ( " +
" format = 'PARQUET' " +
")",
trinoTableName));

onTrino().executeQuery("INSERT INTO " + trinoTableName + " VALUES (true, 'dummy')");

List<QueryAssert.Row> expected = List.of(row(true, "dummy"));
assertThat(onTrino().executeQuery("SELECT * FROM " + trinoTableName)).containsOnly(expected);
assertThat(onSpark().executeQuery("SELECT * FROM " + sparkTableName)).containsOnly(expected);
}
finally {
onTrino().executeQuery("DROP TABLE IF EXISTS %s".formatted(trinoTableName));
onTrino().executeQuery("DROP SCHEMA IF EXISTS %s.test_compat".formatted(getCatalogName()));
}
}

protected String getSparkCatalog()
{
return "spark_catalog";
}
}
@@ -57,4 +57,10 @@ public void testPathContainsSpecialCharacter()
{
super.testPathContainsSpecialCharacter(schemaLocation, "partitioned_by");
}

@Test(groups = {AZURE, PROFILE_SPECIFIC_TESTS})
public void testSparkReadingTrinoData()
{
super.testSparkCompatibilityOnTrinoCreatedTable(schemaLocation);
}
}
@@ -40,6 +40,12 @@ public void testPathContainsSpecialCharacter()
super.testPathContainsSpecialCharacter(warehouseDirectory, "partitioned_by");
}

@Test(groups = {HIVE_GCS, PROFILE_SPECIFIC_TESTS})
public void testSparkReadingTrinoData()
{
super.testSparkCompatibilityOnTrinoCreatedTable(warehouseDirectory);
}

@Override
protected String getCatalogName()
{
@@ -63,4 +63,16 @@ public void testPathContainsSpecialCharacter()
{
super.testPathContainsSpecialCharacter(schemaLocation, "partitioning");
}

@Test(groups = {ICEBERG_AZURE, PROFILE_SPECIFIC_TESTS})
public void testSparkReadingTrinoData()
{
super.testSparkCompatibilityOnTrinoCreatedTable(schemaLocation);
}

@Override
protected String getSparkCatalog()
{
return "iceberg_test";
}
}