Enhance Flint Spark API error reporting with centralized handler #348

Merged
16 commits merged on Jun 27, 2024
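This PR pulls the copy-pasted try/catch blocks out of each FlintSpark API method and funnels them through a single withTransaction handler on a new FlintSparkTransactionSupport trait. The trait itself is not among the hunks shown here, so the following Scala sketch is reconstructed from its call sites in the diff: the trait name, the withTransaction signature, and the flintMetadataLogService member are taken from the diff, while the method body, log messages, and access modifiers are assumptions.

import org.apache.spark.internal.Logging
import org.opensearch.flint.common.metadata.log.{FlintMetadataLogService, OptimisticTransaction}

trait FlintSparkTransactionSupport extends Logging {

  /** Supplied by the mixing-in class (FlintSpark overrides this below) */
  protected def flintMetadataLogService: FlintMetadataLogService

  /** Runs opBlock inside an optimistic transaction; every failure exits through one handler. */
  def withTransaction[T](indexName: String, opName: String, forceInit: Boolean = false)(
      opBlock: OptimisticTransaction[T] => T): T = {
    logInfo(s"Starting index operation [$opName] on $indexName")
    try {
      val tx: OptimisticTransaction[T] =
        flintMetadataLogService.startTransaction(indexName, forceInit)
      val result = opBlock(tx)
      logInfo(s"Index operation [$opName] complete")
      result
    } catch {
      case e: Exception =>
        logError(s"Failed to execute index operation [$opName]", e)
        // Chain the cause so the root failure survives in logs and stack traces
        throw new IllegalStateException(s"Failed to execute index operation [$opName]", e)
    }
  }
}

The diff below rewrites createIndex, refreshIndex, updateIndex, deleteIndex, vacuumIndex, and recoverIndex as withTransaction blocks and deletes their individual catch clauses.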
@@ -85,7 +85,7 @@ public T commit(Function<FlintMetadataLogEntry, T> operation) {
if (!initialCondition.test(latest)) {
LOG.warning("Initial log entry doesn't satisfy precondition " + latest);
throw new IllegalStateException(
"Transaction failed due to initial log precondition not satisfied");
String.format("Index state [%s] doesn't satisfy precondition", latest.state()));
}

// Append optional transient log
@@ -126,7 +126,7 @@ public T commit(Function<FlintMetadataLogEntry, T> operation) {
} catch (Exception ex) {
LOG.log(WARNING, "Failed to rollback transient log", ex);
}
throw new IllegalStateException("Failed to commit transaction operation");
throw new IllegalStateException("Failed to commit transaction operation", e);
}
}

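Both hunks above sharpen error reporting in the commit path: the first surfaces the offending index state in the precondition message, and the second chains the original exception as the cause. A minimal standalone illustration (not project code) of what cause chaining buys:

// Scala sketch; the simulated failure is hypothetical.
try {
  throw new RuntimeException("simulated OpenSearch failure") // hypothetical root cause
} catch {
  case e: Exception =>
    // With `e` as the second argument, the root failure shows up as
    // "Caused by: ..." in the stack trace; without it, it is swallowed.
    throw new IllegalStateException("Failed to commit transaction operation", e)
}

The hunks that follow touch the FlintSpark class and its imports.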
@@ -11,6 +11,7 @@ import org.json4s.{Formats, NoTypeHints}
import org.json4s.native.Serialization
import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry.IndexState._
import org.opensearch.flint.common.metadata.log.FlintMetadataLogService
import org.opensearch.flint.common.metadata.log.OptimisticTransaction
import org.opensearch.flint.common.metadata.log.OptimisticTransaction.NO_LOG_ENTRY
import org.opensearch.flint.core.{FlintClient, FlintClientBuilder}
import org.opensearch.flint.core.metadata.FlintMetadata
@@ -34,7 +35,7 @@ import org.apache.spark.sql.flint.config.FlintSparkConf.{DOC_ID_COLUMN_NAME, IGN
/**
* Flint Spark integration API entrypoint.
*/
class FlintSpark(val spark: SparkSession) extends Logging {
class FlintSpark(val spark: SparkSession) extends FlintSparkTransactionSupport with Logging {

/** Flint spark configuration */
private val flintSparkConf: FlintSparkConf =
@@ -46,7 +47,7 @@ class FlintSpark(val spark: SparkSession) extends Logging {
/** Flint client for low-level index operation */
private val flintClient: FlintClient = FlintClientBuilder.build(flintSparkConf.flintOptions())

private val flintMetadataLogService: FlintMetadataLogService = {
override protected val flintMetadataLogService: FlintMetadataLogService = {
FlintMetadataLogServiceBuilder.build(
flintSparkConf.flintOptions(),
spark.sparkContext.getConf)
@@ -97,18 +98,16 @@ class FlintSpark(val spark: SparkSession) extends Logging {
* @param ignoreIfExists
* Ignore existing index
*/
def createIndex(index: FlintSparkIndex, ignoreIfExists: Boolean = false): Unit = {
logInfo(s"Creating Flint index $index with ignoreIfExists $ignoreIfExists")
val indexName = index.name()
if (flintClient.exists(indexName)) {
if (!ignoreIfExists) {
throw new IllegalStateException(s"Flint index $indexName already exists")
}
} else {
val metadata = index.metadata()
try {
flintMetadataLogService
.startTransaction(indexName, true)
def createIndex(index: FlintSparkIndex, ignoreIfExists: Boolean = false): Unit =
withTransaction[Unit](index.name(), "Create Flint index", forceInit = true) { tx =>
val indexName = index.name()
if (flintClient.exists(indexName)) {
if (!ignoreIfExists) {
throw new IllegalStateException(s"Flint index $indexName already exists")
}
} else {
val metadata = index.metadata()
tx
.initialLog(latest => latest.state == EMPTY || latest.state == DELETED)
.transientLog(latest => latest.copy(state = CREATING))
.finalLog(latest => latest.copy(state = ACTIVE))
@@ -119,14 +118,8 @@ class FlintSpark(val spark: SparkSession) extends Logging {
logInfo(s"Creating index with metadata log entry ID ${latest.id}")
flintClient.createIndex(indexName, metadata.copy(latestId = Some(latest.id)))
})
logInfo("Create index complete")
} catch {
case e: Exception =>
logError("Failed to create Flint index", e)
throw new IllegalStateException("Failed to create Flint index")
}
}
}
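createIndex above, and every method below, shares the same builder shape on the transaction that withTransaction hands in. A condensed sketch of that shape, with semantics inferred from the call sites in this diff (and from the rollback logic in the Java hunks above, which undoes the transient log if the operation throws):

// `tx` is the OptimisticTransaction passed into the withTransaction block;
// the states and lambdas here are illustrative, borrowed from deleteIndex below.
tx
  .initialLog(latest => latest.state == ACTIVE)          // precondition on the current log entry
  .transientLog(latest => latest.copy(state = DELETING)) // intermediate state written up front
  .finalLog(latest => latest.copy(state = DELETED))      // written only once commit succeeds
  .commit(_ => {
    // the actual index operation; its return value becomes the API result
    true
  })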

/**
* Start refreshing index data according to the given mode.
@@ -136,15 +129,12 @@ class FlintSpark(val spark: SparkSession) extends Logging {
* @return
* refreshing job ID (empty if batch job for now)
*/
def refreshIndex(indexName: String): Option[String] = {
logInfo(s"Refreshing Flint index $indexName")
val index = describeIndex(indexName)
.getOrElse(throw new IllegalStateException(s"Index $indexName doesn't exist"))
val indexRefresh = FlintSparkIndexRefresh.create(indexName, index)

try {
flintMetadataLogService
.startTransaction(indexName)
def refreshIndex(indexName: String): Option[String] =
withTransaction[Option[String]](indexName, "Refresh Flint index") { tx =>
val index = describeIndex(indexName)
.getOrElse(throw new IllegalStateException(s"Index $indexName doesn't exist"))
val indexRefresh = FlintSparkIndexRefresh.create(indexName, index)
tx
.initialLog(latest => latest.state == ACTIVE)
.transientLog(latest =>
latest.copy(state = REFRESHING, createTime = System.currentTimeMillis()))
@@ -160,12 +150,7 @@ class FlintSpark(val spark: SparkSession) extends Logging {
}
})
.commit(_ => indexRefresh.start(spark, flintSparkConf))
} catch {
case e: Exception =>
logError("Failed to refresh Flint index", e)
throw new IllegalStateException("Failed to refresh Flint index")
}
}

/**
* Describe all Flint indexes whose name matches the given pattern.
@@ -221,25 +206,19 @@ class FlintSpark(val spark: SparkSession) extends Logging {
* refreshing job ID (empty if no job)
*/
def updateIndex(index: FlintSparkIndex): Option[String] = {
logInfo(s"Updating Flint index $index")
val indexName = index.name

val indexName = index.name()
validateUpdateAllowed(
describeIndex(indexName)
.getOrElse(throw new IllegalStateException(s"Index $indexName doesn't exist"))
.options,
index.options)

try {
withTransaction[Option[String]](indexName, "Update Flint index") { tx =>
// Relies on validation to forbid auto-to-auto and manual-to-manual updates
index.options.autoRefresh() match {
case true => updateIndexManualToAuto(index)
case false => updateIndexAutoToManual(index)
case true => updateIndexManualToAuto(index, tx)
case false => updateIndexAutoToManual(index, tx)
}
} catch {
case e: Exception =>
logError("Failed to update Flint index", e)
throw new IllegalStateException("Failed to update Flint index")
}
}

@@ -251,12 +230,10 @@ class FlintSpark(val spark: SparkSession) extends Logging {
* @return
* true if exist and deleted, otherwise false
*/
def deleteIndex(indexName: String): Boolean = {
logInfo(s"Deleting Flint index $indexName")
if (flintClient.exists(indexName)) {
try {
flintMetadataLogService
.startTransaction(indexName)
def deleteIndex(indexName: String): Boolean =
withTransaction[Boolean](indexName, "Delete Flint index") { tx =>
if (flintClient.exists(indexName)) {
tx
.initialLog(latest =>
latest.state == ACTIVE || latest.state == REFRESHING || latest.state == FAILED)
.transientLog(latest => latest.copy(state = DELETING))
@@ -267,16 +244,11 @@ class FlintSpark(val spark: SparkSession) extends Logging {
stopRefreshingJob(indexName)
true
})
} catch {
case e: Exception =>
logError("Failed to delete Flint index", e)
throw new IllegalStateException("Failed to delete Flint index")
} else {
logInfo("Flint index to be deleted doesn't exist")
false
}
} else {
logInfo("Flint index to be deleted doesn't exist")
false
}
}
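The public contract of deleteIndex is unchanged by the refactoring. A hypothetical caller (the index name is invented for illustration) still gets the Boolean result, but a thrown IllegalStateException now names the failed operation and wraps the root cause:

// Hypothetical usage; assumes a SparkSession named `spark` is in scope.
val flint = new FlintSpark(spark)
if (!flint.deleteIndex("flint_mys3_default_http_logs_skipping_index")) {
  println("Nothing to delete: index doesn't exist")
}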

/**
* Delete a Flint index physically.
@@ -286,81 +258,66 @@ class FlintSpark(val spark: SparkSession) extends Logging {
* @return
* true if exist and deleted, otherwise false
*/
def vacuumIndex(indexName: String): Boolean = {
logInfo(s"Vacuuming Flint index $indexName")
if (flintClient.exists(indexName)) {
try {
flintMetadataLogService
.startTransaction(indexName)
def vacuumIndex(indexName: String): Boolean =
withTransaction[Boolean](indexName, "Vacuum Flint index") { tx =>
if (flintClient.exists(indexName)) {
tx
.initialLog(latest => latest.state == DELETED)
.transientLog(latest => latest.copy(state = VACUUMING))
.finalLog(_ => NO_LOG_ENTRY)
.commit(_ => {
flintClient.deleteIndex(indexName)
true
})
} catch {
case e: Exception =>
logError("Failed to vacuum Flint index", e)
throw new IllegalStateException("Failed to vacuum Flint index")
} else {
logInfo("Flint index to vacuum doesn't exist")
false
}
} else {
logInfo("Flint index to vacuum doesn't exist")
false
}
}
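vacuumIndex is the only operation whose final log is NO_LOG_ENTRY which, going by the import of OptimisticTransaction.NO_LOG_ENTRY above, removes the metadata log entry outright instead of transitioning its state. Pieced together from the initialLog preconditions and the transientLog/finalLog transitions in this diff, the assumed index state lifecycle is:

// Assumed state transitions, reconstructed from this diff only:
// create:  EMPTY | DELETED              -> CREATING   -> ACTIVE
// refresh: ACTIVE                       -> REFRESHING
// delete:  ACTIVE | REFRESHING | FAILED -> DELETING   -> DELETED
// vacuum:  DELETED                      -> VACUUMING  -> (entry removed via NO_LOG_ENTRY)
// recover: ACTIVE | REFRESHING | FAILED -> RECOVERING -> REFRESHING
// update:  ACTIVE or REFRESHING         -> UPDATING   -> (final state not shown in this diff)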

/**
* Recover index job.
*
* @param indexName
* index name
*/
def recoverIndex(indexName: String): Boolean = {
logInfo(s"Recovering Flint index $indexName")
val index = describeIndex(indexName)
if (index.exists(_.options.autoRefresh())) {
try {
flintMetadataLogService
.startTransaction(indexName)
def recoverIndex(indexName: String): Boolean =
withTransaction[Boolean](indexName, "Recover Flint index") { tx =>
val index = describeIndex(indexName)
if (index.exists(_.options.autoRefresh())) {
tx
.initialLog(latest => Set(ACTIVE, REFRESHING, FAILED).contains(latest.state))
.transientLog(latest =>
latest.copy(state = RECOVERING, createTime = System.currentTimeMillis()))
.finalLog(latest => {
flintIndexMonitor.startMonitor(indexName)
latest.copy(state = REFRESHING)
})
.commit(_ =>
.commit(_ => {
FlintSparkIndexRefresh
.create(indexName, index.get)
.start(spark, flintSparkConf))

logInfo("Recovery complete")
true
} catch {
case e: Exception =>
logError("Failed to recover Flint index", e)
throw new IllegalStateException("Failed to recover Flint index")
}
} else {
logInfo("Index to be recovered either doesn't exist or not auto refreshed")
if (index.isEmpty) {
/*
* If execution reaches this point, it indicates that the Flint index is corrupted.
* In such cases, clean up the metadata log, as the index data no longer exists.
* There is a very small possibility that users may recreate the index in the
* interim, but the metadata log gets deleted by this cleanup process.
*/
logWarning("Cleaning up metadata log as index data has been deleted")
flintMetadataLogService
.startTransaction(indexName)
.initialLog(_ => true)
.finalLog(_ => NO_LOG_ENTRY)
.commit(_ => {})
.start(spark, flintSparkConf)
true
})
} else {
logInfo("Index to be recovered either doesn't exist or not auto refreshed")
if (index.isEmpty) {
/*
* If execution reaches this point, it indicates that the Flint index is corrupted.
* In such cases, clean up the metadata log, as the index data no longer exists.
* There is a very small possibility that users may recreate the index in the
* interim, but the metadata log gets deleted by this cleanup process.
*/
logWarning("Cleaning up metadata log as index data has been deleted")
tx
.initialLog(_ => true)
.finalLog(_ => NO_LOG_ENTRY)
.commit(_ => { false })
} else {
false
}
}
false
}
}

/**
* Build data frame for querying the given index. This is mostly for unit test convenience.
@@ -460,11 +417,12 @@ class FlintSpark(val spark: SparkSession) extends Logging {
}
}

private def updateIndexAutoToManual(index: FlintSparkIndex): Option[String] = {
private def updateIndexAutoToManual(
index: FlintSparkIndex,
tx: OptimisticTransaction[Option[String]]): Option[String] = {
val indexName = index.name
val indexLogEntry = index.latestLogEntry.get
flintMetadataLogService
.startTransaction(indexName)
tx
.initialLog(latest =>
latest.state == REFRESHING && latest.seqNo == indexLogEntry.seqNo && latest.primaryTerm == indexLogEntry.primaryTerm)
.transientLog(latest => latest.copy(state = UPDATING))
@@ -478,12 +436,13 @@ class FlintSpark(val spark: SparkSession) extends Logging {
})
}

private def updateIndexManualToAuto(index: FlintSparkIndex): Option[String] = {
private def updateIndexManualToAuto(
index: FlintSparkIndex,
tx: OptimisticTransaction[Option[String]]): Option[String] = {
val indexName = index.name
val indexLogEntry = index.latestLogEntry.get
val indexRefresh = FlintSparkIndexRefresh.create(indexName, index)
flintMetadataLogService
.startTransaction(indexName)
tx
.initialLog(latest =>
latest.state == ACTIVE && latest.seqNo == indexLogEntry.seqNo && latest.primaryTerm == indexLogEntry.primaryTerm)
.transientLog(latest =>