Commit

Merge branch 'main' into improve-error-handling
dai-chen committed Jun 27, 2024
2 parents a4b4fe7 + 2d46cff commit 2a88ca9
Showing 9 changed files with 541 additions and 43 deletions.
docs/opensearch-table.md: 60 additions, 0 deletions
@@ -0,0 +1,60 @@
# OpenSearch Table

## Overview

The `OpenSearchCatalog` class integrates Apache Spark with OpenSearch, allowing Spark to interact with OpenSearch indices as tables. The catalog currently exposes read-only tables (see Limitations below), enabling seamless data processing and querying across Spark and OpenSearch.

## Configuration Parameters

To configure the `OpenSearchCatalog`, set the following parameters in your Spark session:

- **`opensearch.port`**: The port to connect to OpenSearch. Default is `9200`.
- **`opensearch.scheme`**: The scheme to use for the connection. Default is `http`. Valid values are `[http, https]`.
- **`opensearch.auth`**: The authentication method to use. Default is `noauth`. Valid values are `[noauth, sigv4, basic]`.
- **`opensearch.auth.username`**: The username for basic authentication.
- **`opensearch.auth.password`**: The password for basic authentication.
- **`opensearch.region`**: The AWS region to use for SigV4 authentication. Default is `us-west-2`. Used only when `auth` is `sigv4`.

## Usage

### Initializing the Catalog

To configure and initialize the catalog in your Spark session, set the following configurations:

```scala
spark.conf.set("spark.sql.catalog.dev", "org.apache.spark.opensearch.catalog.OpenSearchCatalog")
spark.conf.set("spark.sql.catalog.dev.opensearch.port", "9200")
spark.conf.set("spark.sql.catalog.dev.opensearch.scheme", "http")
spark.conf.set("spark.sql.catalog.dev.opensearch.auth", "noauth")
```
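
The snippet above uses `noauth`. If your cluster requires authentication, the same documented parameters can be combined as follows. This is a hedged sketch: the username, password, and region values are placeholders, not taken from this repository.

```scala
// Basic authentication (placeholder credentials).
spark.conf.set("spark.sql.catalog.dev", "org.apache.spark.opensearch.catalog.OpenSearchCatalog")
spark.conf.set("spark.sql.catalog.dev.opensearch.port", "9200")
spark.conf.set("spark.sql.catalog.dev.opensearch.scheme", "https")
spark.conf.set("spark.sql.catalog.dev.opensearch.auth", "basic")
spark.conf.set("spark.sql.catalog.dev.opensearch.auth.username", "admin")
spark.conf.set("spark.sql.catalog.dev.opensearch.auth.password", "admin_password")

// SigV4 authentication against an AWS-managed domain (example region).
spark.conf.set("spark.sql.catalog.dev.opensearch.auth", "sigv4")
spark.conf.set("spark.sql.catalog.dev.opensearch.region", "us-west-2")
```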

### Querying Data

Once the catalog is configured, you can use Spark SQL to query OpenSearch indices as tables:

- The namespace **MUST** be `default`.
- When using a wildcard index name, it **MUST** be wrapped in backticks.

Example:

```scala
val df = spark.sql("SELECT * FROM dev.default.my_index")
df.show()
```

Using a wildcard index name:
```scala
val df = spark.sql("SELECT * FROM dev.default.`my_index*`")
df.show()
```
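
The catalog also splits a table name on commas (see `OpenSearchCatalog.indexNames` in this commit), which suggests several indices can be scanned together. A hedged sketch, assuming comma-separated names are accepted and wrapped in backticks like a wildcard name:

```scala
val df = spark.sql("SELECT * FROM dev.default.`my_index1,my_index2`")
df.show()
```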

## Limitations

### Catalog operations
- List Tables: Not supported.
- Create Table: Not supported.
- Alter Table: Not supported.
- Drop Table: Not supported.
- Rename Table: Not supported.

### Table operations
- Tables support only read operations, such as SELECT and DESCRIBE (see the sketch below).
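
A minimal metadata query, as a hedged sketch (the index name is a placeholder):

```scala
spark.sql("DESCRIBE TABLE dev.default.my_index").show()
```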
@@ -40,7 +40,7 @@ public interface FlintClient {
* @return map where the keys are the matched index names, and the values are
* corresponding index metadata
*/
Map<String, FlintMetadata> getAllIndexMetadata(String indexNamePattern);
Map<String, FlintMetadata> getAllIndexMetadata(String... indexNamePattern);

/**
* Retrieve metadata in a Flint index.
@@ -101,11 +101,12 @@ public boolean exists(String indexName) {
}

@Override
public Map<String, FlintMetadata> getAllIndexMetadata(String indexNamePattern) {
LOG.info("Fetching all Flint index metadata for pattern " + indexNamePattern);
String osIndexNamePattern = sanitizeIndexName(indexNamePattern);
public Map<String, FlintMetadata> getAllIndexMetadata(String... indexNamePattern) {
LOG.info("Fetching all Flint index metadata for pattern " + String.join(",", indexNamePattern));
String[] indexNames =
Arrays.stream(indexNamePattern).map(this::sanitizeIndexName).toArray(String[]::new);
try (IRestHighLevelClient client = createClient()) {
GetIndexRequest request = new GetIndexRequest(osIndexNamePattern);
GetIndexRequest request = new GetIndexRequest(indexNames);
GetIndexResponse response = client.getIndex(request, RequestOptions.DEFAULT);

return Arrays.stream(response.getIndices())
@@ -117,7 +118,8 @@ public Map<String, FlintMetadata> getAllIndexMetadata(String indexNamePattern) {
)
));
} catch (Exception e) {
throw new IllegalStateException("Failed to get Flint index metadata for " + osIndexNamePattern, e);
throw new IllegalStateException("Failed to get Flint index metadata for " +
String.join(",", indexNames), e);
}
}
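
As an aside, a hedged sketch of calling the new varargs `getAllIndexMetadata` from Scala. The option keys and index names below are assumptions for illustration, not taken from this commit:

```scala
import scala.collection.JavaConverters._

import org.opensearch.flint.core.FlintClientBuilder

import org.apache.spark.sql.flint.config.FlintSparkConf

// Connection settings are assumptions; substitute your cluster's values and option keys.
val options = Map("host" -> "localhost", "port" -> "9200", "scheme" -> "http").asJava
val client = FlintClientBuilder.build(FlintSparkConf(options).flintOptions())

// The varargs overload accepts several index name patterns in a single call.
val metadata = client.getAllIndexMetadata("flint_index_one", "flint_index_two*")
metadata.asScala.foreach { case (name, meta) => println(s"$name -> ${meta.getContent}") }
```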

@@ -0,0 +1,132 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.apache.spark.opensearch.catalog

import org.apache.spark.internal.Logging
import org.apache.spark.opensearch.catalog.OpenSearchCatalog.OPENSEARCH_PREFIX
import org.apache.spark.sql.catalyst.analysis.{NoSuchNamespaceException, NoSuchTableException}
import org.apache.spark.sql.connector.catalog._
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.flint.FlintReadOnlyTable
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap

/**
 * A Spark TableCatalog implementation that wraps an OpenSearch domain as a catalog.
 *
 * <p> Configuration parameters for OpenSearchCatalog:
 *
 * <ul>
 * <li><code>opensearch.port</code>: Default is 9200.</li>
 * <li><code>opensearch.scheme</code>: Default is http. Valid values are [http, https].</li>
 * <li><code>opensearch.auth</code>: Default is noauth. Valid values are [noauth, sigv4, basic].</li>
 * <li><code>opensearch.auth.username</code>: Basic auth username.</li>
 * <li><code>opensearch.auth.password</code>: Basic auth password.</li>
 * <li><code>opensearch.region</code>: Default is us-west-2. Only used when auth is sigv4.</li>
 * </ul>
 */
class OpenSearchCatalog extends CatalogPlugin with TableCatalog with Logging {

private var catalogName: String = _
private var options: CaseInsensitiveStringMap = _

override def initialize(name: String, options: CaseInsensitiveStringMap): Unit = {
this.catalogName = name
this.options = options
}

override def name(): String = catalogName

@throws[NoSuchNamespaceException]
override def listTables(namespace: Array[String]): Array[Identifier] = {
throw new UnsupportedOperationException("OpenSearchCatalog does not support listTables")
}

@throws[NoSuchTableException]
override def loadTable(ident: Identifier): Table = {
logInfo(s"Loading table ${ident.name()}")
if (!ident.namespace().exists(n => OpenSearchCatalog.isDefaultNamespace(n))) {
throw new NoSuchTableException(ident.namespace().mkString("."), ident.name())
}

val conf = new java.util.HashMap[String, String](
removePrefixFromMap(options.asCaseSensitiveMap(), OPENSEARCH_PREFIX))
conf.put("path", ident.name())

new FlintReadOnlyTable(conf, Option.empty)
}

override def createTable(
ident: Identifier,
schema: StructType,
partitions: Array[Transform],
properties: java.util.Map[String, String]): Table = {
throw new UnsupportedOperationException("OpenSearchCatalog does not support createTable")
}

override def alterTable(ident: Identifier, changes: TableChange*): Table = {
throw new UnsupportedOperationException("OpenSearchCatalog does not support alterTable")
}

override def dropTable(ident: Identifier): Boolean = {
throw new UnsupportedOperationException("OpenSearchCatalog does not support dropTable")
}

override def renameTable(oldIdent: Identifier, newIdent: Identifier): Unit = {
throw new UnsupportedOperationException("OpenSearchCatalog does not support renameTable")
}

private def removePrefixFromMap(
map: java.util.Map[String, String],
prefix: String): java.util.Map[String, String] = {
val result = new java.util.HashMap[String, String]()
map.forEach { (key, value) =>
if (key.startsWith(prefix)) {
val newKey = key.substring(prefix.length)
result.put(newKey, value)
} else {
result.put(key, value)
}
}
result
}
}

object OpenSearchCatalog {

/**
* The reserved namespace.
*/
val RESERVED_DEFAULT_NAMESPACE: String = "default"

/**
* The prefix for OpenSearch-related configuration keys.
*/
val OPENSEARCH_PREFIX: String = "opensearch."

/**
* Checks if the given namespace is the reserved default namespace.
*
* @param namespace
* The namespace to check.
* @return
* True if the namespace is the reserved default namespace, false otherwise.
*/
def isDefaultNamespace(namespace: String): Boolean = {
RESERVED_DEFAULT_NAMESPACE.equalsIgnoreCase(namespace)
}

/**
* Splits the table name into index names.
*
* @param tableName
* The name of the table, potentially containing comma-separated index names.
* @return
* An array of index names.
*/
def indexNames(tableName: String): Array[String] = {
tableName.split(",")
}
}
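
A hedged illustration of how options flow through the catalog: a Spark setting such as `spark.sql.catalog.dev.opensearch.port` reaches `initialize` as `opensearch.port`, and `loadTable` strips the `opensearch.` prefix and injects the identifier name under `path`. Roughly, for `dev.default.my_index` configured as in the docs above, the conf handed to `FlintReadOnlyTable` would look like this (values are placeholders):

```scala
// Sketch of the resulting table conf; not code that appears in this commit.
val tableConf = new java.util.HashMap[String, String]()
tableConf.put("port", "9200")     // from spark.sql.catalog.dev.opensearch.port
tableConf.put("scheme", "http")   // from spark.sql.catalog.dev.opensearch.scheme
tableConf.put("auth", "noauth")   // from spark.sql.catalog.dev.opensearch.auth
tableConf.put("path", "my_index") // added by loadTable from the table identifier
```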
@@ -0,0 +1,65 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.apache.spark.sql.flint

import java.util

import scala.collection.JavaConverters._

import org.opensearch.flint.core.FlintClientBuilder

import org.apache.spark.opensearch.catalog.OpenSearchCatalog
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.connector.catalog.{SupportsRead, Table, TableCapability}
import org.apache.spark.sql.connector.catalog.TableCapability.{BATCH_READ, BATCH_WRITE, STREAMING_WRITE, TRUNCATE}
import org.apache.spark.sql.connector.read.ScanBuilder
import org.apache.spark.sql.flint.config.FlintSparkConf
import org.apache.spark.sql.flint.datatype.FlintDataType
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap

/**
* FlintReadOnlyTable.
*
 * @param conf
 *   table configuration map
 * @param userSpecifiedSchema
 *   the user-specified schema, if provided
*/
class FlintReadOnlyTable(
val conf: util.Map[String, String],
val userSpecifiedSchema: Option[StructType])
extends Table
with SupportsRead {

lazy val sparkSession = SparkSession.active

lazy val flintSparkConf: FlintSparkConf = FlintSparkConf(conf)

lazy val name: String = flintSparkConf.tableName()

// TODO: currently we use the first index's schema when multiple indices match;
// we should merge the StructTypes to widen the field types.
lazy val schema: StructType = {
userSpecifiedSchema.getOrElse {
FlintClientBuilder
.build(flintSparkConf.flintOptions())
.getAllIndexMetadata(OpenSearchCatalog.indexNames(name): _*)
.values()
.asScala
.headOption
.map(m => FlintDataType.deserialize(m.getContent))
.getOrElse(StructType(Nil))
}
}

override def capabilities(): util.Set[TableCapability] =
util.EnumSet.of(BATCH_READ)

override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = {
FlintScanBuilder(name, schema, flintSparkConf)
}
}
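
The TODO above notes that only the first matching index's schema is used. One possible widening strategy, shown here purely as a hedged sketch and not as this commit's implementation, is a union of fields by name:

```scala
import scala.collection.mutable.LinkedHashMap

import org.apache.spark.sql.types.{StructField, StructType}

// Hypothetical helper: union fields across schemas by name, keeping the first-seen type.
// A real implementation would also have to reconcile conflicting field types.
def mergeSchemas(schemas: Seq[StructType]): StructType = {
  val merged = LinkedHashMap.empty[String, StructField]
  schemas.flatten.foreach(field => merged.getOrElseUpdate(field.name, field))
  StructType(merged.values.toSeq)
}
```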
@@ -7,17 +7,10 @@ package org.apache.spark.sql.flint

import java.util

import org.opensearch.flint.core.FlintClientBuilder

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.connector.catalog.{SupportsRead, SupportsWrite, Table, TableCapability}
import org.apache.spark.sql.connector.catalog.{SupportsWrite, TableCapability}
import org.apache.spark.sql.connector.catalog.TableCapability.{BATCH_READ, BATCH_WRITE, STREAMING_WRITE, TRUNCATE}
import org.apache.spark.sql.connector.read.ScanBuilder
import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
import org.apache.spark.sql.flint.config.FlintSparkConf
import org.apache.spark.sql.flint.datatype.FlintDataType
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap

/**
* FlintTable.
@@ -26,39 +19,15 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap
* @param userSpecifiedSchema
* userSpecifiedSchema
*/
case class FlintTable(conf: util.Map[String, String], userSpecifiedSchema: Option[StructType])
extends Table
with SupportsRead
case class FlintTable(
override val conf: util.Map[String, String],
override val userSpecifiedSchema: Option[StructType])
extends FlintReadOnlyTable(conf, userSpecifiedSchema)
with SupportsWrite {

lazy val sparkSession = SparkSession.active

lazy val flintSparkConf: FlintSparkConf = FlintSparkConf(conf)

lazy val name: String = flintSparkConf.tableName()

var schema: StructType = {
if (schema == null) {
schema = if (userSpecifiedSchema.isDefined) {
userSpecifiedSchema.get
} else {
FlintDataType.deserialize(
FlintClientBuilder
.build(flintSparkConf.flintOptions())
.getIndexMetadata(name)
.getContent)
}
}
schema
}

override def capabilities(): util.Set[TableCapability] =
util.EnumSet.of(BATCH_READ, BATCH_WRITE, TRUNCATE, STREAMING_WRITE)

override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = {
FlintScanBuilder(name, schema, flintSparkConf)
}

override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = {
FlintWriteBuilder(name, info, flintSparkConf)
}
