diff --git a/docs/opensearch-table.md b/docs/opensearch-table.md
new file mode 100644
index 00000000..2f617eb1
--- /dev/null
+++ b/docs/opensearch-table.md
@@ -0,0 +1,60 @@
+# OpenSearch Table
+
+## Overview
+
+The `OpenSearchCatalog` class integrates Apache Spark with OpenSearch, allowing Spark to interact with OpenSearch indices as tables. The integration currently supports read operations, enabling seamless querying of OpenSearch data from Spark.
+
+## Configuration Parameters
+
+To configure the `OpenSearchCatalog`, set the following parameters in your Spark session:
+
+- **`opensearch.port`**: The port to connect to OpenSearch. Default is `9200`.
+- **`opensearch.scheme`**: The scheme to use for the connection. Default is `http`. Valid values are `[http, https]`.
+- **`opensearch.auth`**: The authentication method to use. Default is `noauth`. Valid values are `[noauth, sigv4, basic]`.
+- **`opensearch.auth.username`**: The username for basic authentication.
+- **`opensearch.auth.password`**: The password for basic authentication.
+- **`opensearch.region`**: The AWS region to use for SigV4 authentication. Default is `us-west-2`. Used only when `auth` is `sigv4`.
+
+## Usage
+
+### Initializing the Catalog
+
+To configure and initialize the catalog in your Spark session, set the following configurations:
+
+```scala
+spark.conf.set("spark.sql.catalog.dev", "org.apache.spark.opensearch.catalog.OpenSearchCatalog")
+spark.conf.set("spark.sql.catalog.dev.opensearch.port", "9200")
+spark.conf.set("spark.sql.catalog.dev.opensearch.scheme", "http")
+spark.conf.set("spark.sql.catalog.dev.opensearch.auth", "noauth")
+```
+
+### Querying Data
+
+Once the catalog is configured, you can use Spark SQL to query OpenSearch indices as tables:
+
+- The namespace **MUST** be `default`.
+- When using a wildcard index name, it **MUST** be wrapped in backticks.
+
+Example:
+
+```scala
+val df = spark.sql("SELECT * FROM dev.default.my_index")
+df.show()
+```
+
+Using a wildcard index name:
+```scala
+val df = spark.sql("SELECT * FROM dev.default.`my_index*`")
+df.show()
+```
+
+## Limitations
+### Catalog Operations
+- List Tables: Not supported.
+- Create Table: Not supported.
+- Alter Table: Not supported.
+- Drop Table: Not supported.
+- Rename Table: Not supported.
+
+### Table Operations
+- Tables only support read operations, for example SELECT and DESCRIBE.
diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/FlintClient.java b/flint-core/src/main/scala/org/opensearch/flint/core/FlintClient.java
index b9ef0585..e5e18f12 100644
--- a/flint-core/src/main/scala/org/opensearch/flint/core/FlintClient.java
+++ b/flint-core/src/main/scala/org/opensearch/flint/core/FlintClient.java
@@ -40,7 +40,7 @@ public interface FlintClient {
    * @return map where the keys are the matched index names, and the values are
    *         corresponding index metadata
    */
-  Map<String, FlintMetadata> getAllIndexMetadata(String indexNamePattern);
+  Map<String, FlintMetadata> getAllIndexMetadata(String... indexNamePattern);
 
   /**
    * Retrieve metadata in a Flint index.
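The interface change above turns `getAllIndexMetadata` into a varargs method so several index name patterns can be resolved in one call. A minimal sketch of how a caller might use it, assuming a client built through `FlintClientBuilder` (as `FlintReadOnlyTable` does later in this change); the `options` value and the index patterns are illustrative, not values taken from this change:

```scala
import org.opensearch.flint.core.FlintClientBuilder

// `options` is a FlintOptions instance assumed to be configured elsewhere (hypothetical).
val client = FlintClientBuilder.build(options)

// One call resolves every pattern; keys are the matched index names,
// values are the corresponding FlintMetadata.
val metadata = client.getAllIndexMetadata("t0001", "logs-*")
metadata.forEach((indexName, meta) => println(s"$indexName -> ${meta.getContent}"))
```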
diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java
index 36db4a04..2a3bf2da 100644
--- a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java
+++ b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java
@@ -101,11 +101,12 @@ public boolean exists(String indexName) {
   }
 
   @Override
-  public Map<String, FlintMetadata> getAllIndexMetadata(String indexNamePattern) {
-    LOG.info("Fetching all Flint index metadata for pattern " + indexNamePattern);
-    String osIndexNamePattern = sanitizeIndexName(indexNamePattern);
+  public Map<String, FlintMetadata> getAllIndexMetadata(String... indexNamePattern) {
+    LOG.info("Fetching all Flint index metadata for pattern " + String.join(",", indexNamePattern));
+    String[] indexNames =
+        Arrays.stream(indexNamePattern).map(this::sanitizeIndexName).toArray(String[]::new);
     try (IRestHighLevelClient client = createClient()) {
-      GetIndexRequest request = new GetIndexRequest(osIndexNamePattern);
+      GetIndexRequest request = new GetIndexRequest(indexNames);
       GetIndexResponse response = client.getIndex(request, RequestOptions.DEFAULT);
 
       return Arrays.stream(response.getIndices())
@@ -117,7 +118,8 @@ public Map<String, FlintMetadata> getAllIndexMetadata(String indexNamePattern) {
               )
           ));
     } catch (Exception e) {
-      throw new IllegalStateException("Failed to get Flint index metadata for " + osIndexNamePattern, e);
+      throw new IllegalStateException("Failed to get Flint index metadata for "
+          + String.join(",", indexNames), e);
     }
   }
diff --git a/flint-spark-integration/src/main/scala/org/apache/spark/opensearch/catalog/OpenSearchCatalog.scala b/flint-spark-integration/src/main/scala/org/apache/spark/opensearch/catalog/OpenSearchCatalog.scala
new file mode 100644
index 00000000..3594f41d
--- /dev/null
+++ b/flint-spark-integration/src/main/scala/org/apache/spark/opensearch/catalog/OpenSearchCatalog.scala
@@ -0,0 +1,132 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.apache.spark.opensearch.catalog
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.opensearch.catalog.OpenSearchCatalog.OPENSEARCH_PREFIX
+import org.apache.spark.sql.catalyst.analysis.{NoSuchNamespaceException, NoSuchTableException}
+import org.apache.spark.sql.connector.catalog._
+import org.apache.spark.sql.connector.expressions.Transform
+import org.apache.spark.sql.flint.FlintReadOnlyTable
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+
+/**
+ * A Spark TableCatalog implementation that wraps an OpenSearch domain as a catalog.
+ *
+ * Configuration parameters for OpenSearchCatalog:
+ *
+ * <ul>
+ *   <li>opensearch.port: Default is 9200.</li>
+ *   <li>opensearch.scheme: Default is http. Valid values are [http, https].</li>
+ *   <li>opensearch.auth: Default is noauth. Valid values are [noauth, sigv4, basic].</li>
+ *   <li>opensearch.auth.username: Basic auth username.</li>
+ *   <li>opensearch.auth.password: Basic auth password.</li>
+ *   <li>opensearch.region: Default is us-west-2. Only used when auth is sigv4.</li>
+ * </ul>
+ */ +class OpenSearchCatalog extends CatalogPlugin with TableCatalog with Logging { + + private var catalogName: String = _ + private var options: CaseInsensitiveStringMap = _ + + override def initialize(name: String, options: CaseInsensitiveStringMap): Unit = { + this.catalogName = name + this.options = options + } + + override def name(): String = catalogName + + @throws[NoSuchNamespaceException] + override def listTables(namespace: Array[String]): Array[Identifier] = { + throw new UnsupportedOperationException("OpenSearchCatalog does not support listTables") + } + + @throws[NoSuchTableException] + override def loadTable(ident: Identifier): Table = { + logInfo(s"Loading table ${ident.name()}") + if (!ident.namespace().exists(n => OpenSearchCatalog.isDefaultNamespace(n))) { + throw new NoSuchTableException(ident.namespace().mkString("."), ident.name()) + } + + val conf = new java.util.HashMap[String, String]( + removePrefixFromMap(options.asCaseSensitiveMap(), OPENSEARCH_PREFIX)) + conf.put("path", ident.name()) + + new FlintReadOnlyTable(conf, Option.empty) + } + + override def createTable( + ident: Identifier, + schema: StructType, + partitions: Array[Transform], + properties: java.util.Map[String, String]): Table = { + throw new UnsupportedOperationException("OpenSearchCatalog does not support createTable") + } + + override def alterTable(ident: Identifier, changes: TableChange*): Table = { + throw new UnsupportedOperationException("OpenSearchCatalog does not support alterTable") + } + + override def dropTable(ident: Identifier): Boolean = { + throw new UnsupportedOperationException("OpenSearchCatalog does not support dropTable") + } + + override def renameTable(oldIdent: Identifier, newIdent: Identifier): Unit = { + throw new UnsupportedOperationException("OpenSearchCatalog does not support renameTable") + } + + private def removePrefixFromMap( + map: java.util.Map[String, String], + prefix: String): java.util.Map[String, String] = { + val result = new java.util.HashMap[String, String]() + map.forEach { (key, value) => + if (key.startsWith(prefix)) { + val newKey = key.substring(prefix.length) + result.put(newKey, value) + } else { + result.put(key, value) + } + } + result + } +} + +object OpenSearchCatalog { + + /** + * The reserved namespace. + */ + val RESERVED_DEFAULT_NAMESPACE: String = "default" + + /** + * The prefix for OpenSearch-related configuration keys. + */ + val OPENSEARCH_PREFIX: String = "opensearch." + + /** + * Checks if the given namespace is the reserved default namespace. + * + * @param namespace + * The namespace to check. + * @return + * True if the namespace is the reserved default namespace, false otherwise. + */ + def isDefaultNamespace(namespace: String): Boolean = { + RESERVED_DEFAULT_NAMESPACE.equalsIgnoreCase(namespace) + } + + /** + * Splits the table name into index names. + * + * @param tableName + * The name of the table, potentially containing comma-separated index names. + * @return + * An array of index names. 
+ */ + def indexNames(tableName: String): Array[String] = { + tableName.split(",") + } +} diff --git a/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/FlintReadOnlyTable.scala b/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/FlintReadOnlyTable.scala new file mode 100644 index 00000000..ac83d2ef --- /dev/null +++ b/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/FlintReadOnlyTable.scala @@ -0,0 +1,65 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.apache.spark.sql.flint + +import java.util + +import scala.collection.JavaConverters._ + +import org.opensearch.flint.core.FlintClientBuilder + +import org.apache.spark.opensearch.catalog.OpenSearchCatalog +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.connector.catalog.{SupportsRead, Table, TableCapability} +import org.apache.spark.sql.connector.catalog.TableCapability.{BATCH_READ, BATCH_WRITE, STREAMING_WRITE, TRUNCATE} +import org.apache.spark.sql.connector.read.ScanBuilder +import org.apache.spark.sql.flint.config.FlintSparkConf +import org.apache.spark.sql.flint.datatype.FlintDataType +import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.util.CaseInsensitiveStringMap + +/** + * FlintReadOnlyTable. + * + * @param conf + * configuration + * @param userSpecifiedSchema + * userSpecifiedSchema + */ +class FlintReadOnlyTable( + val conf: util.Map[String, String], + val userSpecifiedSchema: Option[StructType]) + extends Table + with SupportsRead { + + lazy val sparkSession = SparkSession.active + + lazy val flintSparkConf: FlintSparkConf = FlintSparkConf(conf) + + lazy val name: String = flintSparkConf.tableName() + + // todo. currently, we use first index schema in multiple indices. 
we should merge StructType + // to widen type + lazy val schema: StructType = { + userSpecifiedSchema.getOrElse { + FlintClientBuilder + .build(flintSparkConf.flintOptions()) + .getAllIndexMetadata(OpenSearchCatalog.indexNames(name): _*) + .values() + .asScala + .headOption + .map(m => FlintDataType.deserialize(m.getContent)) + .getOrElse(StructType(Nil)) + } + } + + override def capabilities(): util.Set[TableCapability] = + util.EnumSet.of(BATCH_READ) + + override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = { + FlintScanBuilder(name, schema, flintSparkConf) + } +} diff --git a/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/FlintTable.scala b/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/FlintTable.scala index c078f7fb..d86702b0 100644 --- a/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/FlintTable.scala +++ b/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/FlintTable.scala @@ -7,17 +7,10 @@ package org.apache.spark.sql.flint import java.util -import org.opensearch.flint.core.FlintClientBuilder - -import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.connector.catalog.{SupportsRead, SupportsWrite, Table, TableCapability} +import org.apache.spark.sql.connector.catalog.{SupportsWrite, TableCapability} import org.apache.spark.sql.connector.catalog.TableCapability.{BATCH_READ, BATCH_WRITE, STREAMING_WRITE, TRUNCATE} -import org.apache.spark.sql.connector.read.ScanBuilder import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder} -import org.apache.spark.sql.flint.config.FlintSparkConf -import org.apache.spark.sql.flint.datatype.FlintDataType import org.apache.spark.sql.types.StructType -import org.apache.spark.sql.util.CaseInsensitiveStringMap /** * FlintTable. 
@@ -26,39 +19,15 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap * @param userSpecifiedSchema * userSpecifiedSchema */ -case class FlintTable(conf: util.Map[String, String], userSpecifiedSchema: Option[StructType]) - extends Table - with SupportsRead +case class FlintTable( + override val conf: util.Map[String, String], + override val userSpecifiedSchema: Option[StructType]) + extends FlintReadOnlyTable(conf, userSpecifiedSchema) with SupportsWrite { - lazy val sparkSession = SparkSession.active - - lazy val flintSparkConf: FlintSparkConf = FlintSparkConf(conf) - - lazy val name: String = flintSparkConf.tableName() - - var schema: StructType = { - if (schema == null) { - schema = if (userSpecifiedSchema.isDefined) { - userSpecifiedSchema.get - } else { - FlintDataType.deserialize( - FlintClientBuilder - .build(flintSparkConf.flintOptions()) - .getIndexMetadata(name) - .getContent) - } - } - schema - } - override def capabilities(): util.Set[TableCapability] = util.EnumSet.of(BATCH_READ, BATCH_WRITE, TRUNCATE, STREAMING_WRITE) - override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = { - FlintScanBuilder(name, schema, flintSparkConf) - } - override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = { FlintWriteBuilder(name, info, flintSparkConf) } diff --git a/flint-spark-integration/src/test/scala/org/apache/spark/opensearch/catalog/OpenSearchCatalogTest.scala b/flint-spark-integration/src/test/scala/org/apache/spark/opensearch/catalog/OpenSearchCatalogTest.scala new file mode 100644 index 00000000..7c578ecf --- /dev/null +++ b/flint-spark-integration/src/test/scala/org/apache/spark/opensearch/catalog/OpenSearchCatalogTest.scala @@ -0,0 +1,134 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.apache.spark.opensearch.catalog + +import scala.collection.JavaConverters._ + +import org.mockito.Mockito._ +import org.scalatest.BeforeAndAfterEach +import org.scalatest.matchers.should.Matchers +import org.scalatestplus.mockito.MockitoSugar + +import org.apache.spark.FlintSuite +import org.apache.spark.sql.catalyst.analysis.NoSuchTableException +import org.apache.spark.sql.connector.catalog.{Identifier, TableChange} +import org.apache.spark.sql.connector.expressions.Transform +import org.apache.spark.sql.flint.FlintReadOnlyTable +import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.util.CaseInsensitiveStringMap + +class OpenSearchCatalogTest + extends FlintSuite + with Matchers + with BeforeAndAfterEach + with MockitoSugar { + + private var catalog: OpenSearchCatalog = _ + private val catalogName = "dev" + private val optionsMap = Map( + "opensearch.port" -> "9200", + "opensearch.scheme" -> "http", + "opensearch.auth" -> "noauth", + "some.other.config" -> "value") + private val options = new CaseInsensitiveStringMap(optionsMap.asJava) + + override def beforeEach(): Unit = { + catalog = new OpenSearchCatalog() + catalog.initialize(catalogName, options) + } + + test("Catalog should initialize with given name and options") { + catalog.name() should be(catalogName) + } + + test("listTables should throw UnsupportedOperationException") { + val namespace = Array("default") + + intercept[UnsupportedOperationException] { + catalog.listTables(namespace) + } + } + + test("loadTable should throw NoSuchTableException if namespace is not default") { + val identifier = mock[Identifier] + when(identifier.namespace()).thenReturn(Array("non-default")) + when(identifier.name()).thenReturn("table1") + 
+ intercept[NoSuchTableException] { + catalog.loadTable(identifier) + } + } + + test("loadTable should load table if namespace is default") { + val identifier = mock[Identifier] + when(identifier.namespace()).thenReturn(Array("default")) + when(identifier.name()).thenReturn("table1") + + val table = catalog.loadTable(identifier) + table should not be null + table.name() should be("table1") + + val flintTableConf = table.asInstanceOf[FlintReadOnlyTable].conf + flintTableConf.get("port") should be("9200") + flintTableConf.get("scheme") should be("http") + flintTableConf.get("auth") should be("noauth") + flintTableConf.get("some.other.config") should be("value") + flintTableConf.containsKey("opensearch.port") should be(false) + flintTableConf.containsKey("opensearch.scheme") should be(false) + flintTableConf.containsKey("opensearch.auth") should be(false) + } + + test("createTable should throw UnsupportedOperationException") { + val identifier = mock[Identifier] + val schema = mock[StructType] + val partitions = Array.empty[Transform] + val properties = Map.empty[String, String].asJava + + intercept[UnsupportedOperationException] { + catalog.createTable(identifier, schema, partitions, properties) + } + } + + test("alterTable should throw UnsupportedOperationException") { + val identifier = mock[Identifier] + val changes = Array.empty[TableChange] + + intercept[UnsupportedOperationException] { + catalog.alterTable(identifier, changes: _*) + } + } + + test("dropTable should throw UnsupportedOperationException") { + val identifier = mock[Identifier] + + intercept[UnsupportedOperationException] { + catalog.dropTable(identifier) + } + } + + test("renameTable should throw UnsupportedOperationException") { + val oldIdentifier = mock[Identifier] + val newIdentifier = mock[Identifier] + + intercept[UnsupportedOperationException] { + catalog.renameTable(oldIdentifier, newIdentifier) + } + } + + test("isDefaultNamespace should return true for default namespace") { + OpenSearchCatalog.isDefaultNamespace("default") should be(true) + } + + test("isDefaultNamespace should return false for non-default namespace") { + OpenSearchCatalog.isDefaultNamespace("non-default") should be(false) + } + + test("indexNames should split table name by comma") { + val tableName = "index-1,index-2" + val result = OpenSearchCatalog.indexNames(tableName) + result should contain theSameElementsAs Array("index-1", "index-2") + } +} diff --git a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/covering/ApplyFlintSparkCoveringIndexSuite.scala b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/covering/ApplyFlintSparkCoveringIndexSuite.scala index cadb6c93..c07a443b 100644 --- a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/covering/ApplyFlintSparkCoveringIndexSuite.scala +++ b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/covering/ApplyFlintSparkCoveringIndexSuite.scala @@ -5,6 +5,8 @@ package org.opensearch.flint.spark.covering +import scala.collection.JavaConverters._ + import org.mockito.ArgumentMatchers.any import org.mockito.Mockito.{mockStatic, when, RETURNS_DEEP_STUBS} import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry @@ -198,7 +200,8 @@ class ApplyFlintSparkCoveringIndexSuite extends FlintSuite with Matchers { }) indexes.foreach { index => - when(client.getIndexMetadata(index.name())).thenReturn(index.metadata()) + when(client.getAllIndexMetadata(index.name())) + .thenReturn(Map.apply(index.name() -> index.metadata()).asJava) } 
rule.apply(plan) } diff --git a/integ-test/src/test/scala/org/apache/spark/opensearch/catalog/OpenSearchCatalogITSuite.scala b/integ-test/src/test/scala/org/apache/spark/opensearch/catalog/OpenSearchCatalogITSuite.scala new file mode 100644 index 00000000..1d5229d9 --- /dev/null +++ b/integ-test/src/test/scala/org/apache/spark/opensearch/catalog/OpenSearchCatalogITSuite.scala @@ -0,0 +1,133 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.apache.spark.opensearch.catalog + +import org.opensearch.flint.OpenSearchSuite +import org.opensearch.flint.spark.FlintSparkSuite + +import org.apache.spark.FlintSuite +import org.apache.spark.sql.{AnalysisException, QueryTest, Row} +import org.apache.spark.sql.streaming.StreamTest + +class OpenSearchCatalogITSuite extends FlintSparkSuite { + + private val catalogName = "dev" + + override def beforeAll(): Unit = { + super.beforeAll() + + spark.conf.set( + s"spark.sql.catalog.${catalogName}", + "org.apache.spark.opensearch.catalog.OpenSearchCatalog") + spark.conf.set(s"spark.sql.catalog.${catalogName}.opensearch.port", s"$openSearchPort") + spark.conf.set(s"spark.sql.catalog.${catalogName}.opensearch.host", openSearchHost) + spark.conf.set( + s"spark.sql.catalog.${catalogName}.opensearch.write.refresh_policy", + "wait_for") + } + + test("Load single index as table") { + val indexName = "t0001" + withIndexName(indexName) { + simpleIndex(indexName) + val df = spark.sql(s""" + SELECT accountId, eventName, eventSource + FROM ${catalogName}.default.$indexName""") + + assert(df.count() == 1) + checkAnswer(df, Row("123", "event", "source")) + } + } + + test("Describe single index as table") { + val indexName = "t0001" + withIndexName(indexName) { + simpleIndex(indexName) + val df = spark.sql(s""" + DESC ${catalogName}.default.$indexName""") + + assert(df.count() == 6) + checkAnswer( + df, + Seq( + Row("# Partitioning", "", ""), + Row("", "", ""), + Row("Not partitioned", "", ""), + Row("accountId", "string", ""), + Row("eventName", "string", ""), + Row("eventSource", "string", ""))) + } + } + + test("Failed to write value to readonly table") { + val indexName = "t0001" + withIndexName(indexName) { + simpleIndex(indexName) + val exception = intercept[AnalysisException] { + spark.sql(s""" + INSERT INTO ${catalogName}.default.$indexName VALUES ('234', 'event-1', 'source-1')""") + } + assert( + exception.getMessage.contains(s"Table $indexName does not support append in batch mode.")) + } + } + + test("Failed to delete value from readonly table") { + val indexName = "t0001" + withIndexName(indexName) { + simpleIndex(indexName) + val exception = intercept[AnalysisException] { + spark.sql(s"DELETE FROM ${catalogName}.default.$indexName WHERE accountId = '234'") + } + assert(exception.getMessage.contains(s"Table does not support deletes: $indexName")) + } + } + + test("Failed to overwrite value of readonly table") { + val indexName = "t0001" + withIndexName(indexName) { + simpleIndex(indexName) + val exception = intercept[AnalysisException] { + spark.sql(s""" + INSERT OVERWRITE TABLE ${catalogName}.default.$indexName VALUES ('234', 'event-1', 'source-1')""") + } + assert( + exception.getMessage.contains( + s"Table $indexName does not support truncate in batch mode.")) + } + } + + test("Load index wildcard expression as table") { + val indexName = "t0001" + withIndexName(indexName) { + simpleIndex(indexName) + val df = spark.sql(s""" + SELECT accountId, eventName, eventSource + FROM ${catalogName}.default.`t*`""") + 
+      assert(df.count() == 1)
+      checkAnswer(df, Row("123", "event", "source"))
+    }
+  }
+
+  // FIXME: enable this test once partition info is added to OpenSearchTable.
+  ignore("Load comma separated index expression as table") {
+    val indexName1 = "t0001"
+    val indexName2 = "t0002"
+    withIndexName(indexName1) {
+      withIndexName(indexName2) {
+        simpleIndex(indexName1)
+        simpleIndex(indexName2)
+        val df = spark.sql(s"""
+            SELECT accountId, eventName, eventSource
+            FROM ${catalogName}.default.`t0001,t0002`""")
+
+        assert(df.count() == 2)
+        checkAnswer(df, Seq(Row("123", "event", "source"), Row("123", "event", "source")))
+      }
+    }
+  }
+}
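The integration tests above configure the catalog with the same parameters documented in `docs/opensearch-table.md`. For reference, an illustrative sketch of a basic-auth setup; the catalog name `dev`, endpoint settings, and credentials are placeholders rather than values taken from this change:

```scala
// Illustrative only: catalog configuration for basic authentication over HTTPS.
spark.conf.set("spark.sql.catalog.dev", "org.apache.spark.opensearch.catalog.OpenSearchCatalog")
spark.conf.set("spark.sql.catalog.dev.opensearch.port", "9200")
spark.conf.set("spark.sql.catalog.dev.opensearch.scheme", "https")
spark.conf.set("spark.sql.catalog.dev.opensearch.auth", "basic")
spark.conf.set("spark.sql.catalog.dev.opensearch.auth.username", "admin")
spark.conf.set("spark.sql.catalog.dev.opensearch.auth.password", "admin")

// The namespace must be `default`, and wildcard index names must be wrapped in backticks.
spark.sql("SELECT * FROM dev.default.`t000*`").show()
```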