Skip to content

Commit

Permalink
Canceled stage in the case of result table creation failure (#83)
Browse files Browse the repository at this point in the history
  • Loading branch information
AdalbertMemSQL committed Feb 26, 2024
1 parent c37e884 commit 512bf68
Showing 1 changed file with 24 additions and 18 deletions.
@@ -1,14 +1,12 @@
package com.singlestore.spark

import java.sql.Connection
import java.sql.{Connection, SQLException}
import java.util.Properties

import com.singlestore.spark.JdbcHelpers.getDDLConnProperties
import com.singlestore.spark.SQLGen.VariableList
import org.apache.spark.SparkContext
import org.apache.spark.scheduler.{
SparkListener,
SparkListenerApplicationEnd,
SparkListenerStageCompleted,
SparkListenerStageSubmitted
}
Expand All @@ -26,7 +24,8 @@ class AggregatorParallelReadListener(applicationId: String) extends SparkListene
new mutable.HashMap[Int, SingleStoreRDDInfo]()

// SingleStoreRDDInfo is information needed to create a result table
private case class SingleStoreRDDInfo(query: String,
private case class SingleStoreRDDInfo(sc: SparkContext,
query: String,
variables: VariableList,
schema: StructType,
connectionProperties: Properties,
Expand All @@ -37,6 +36,7 @@ class AggregatorParallelReadListener(applicationId: String) extends SparkListene
def addRDDInfo(rdd: SinglestoreRDD): Unit = {
rddInfos.synchronized({
rddInfos += (rdd.id -> SingleStoreRDDInfo(
rdd.sparkContext,
rdd.query,
rdd.variables,
rdd.schema,
Expand Down Expand Up @@ -70,9 +70,10 @@ class AggregatorParallelReadListener(applicationId: String) extends SparkListene
.foreach(singleStoreRDDInfo => {
val stageId = stageSubmitted.stageInfo.stageId
val attemptNumber = stageSubmitted.stageInfo.attemptNumber()
val randHex = rddInfo.name.substring("SingleStoreRDD".size)
val randHex = rddInfo.name.substring("SingleStoreRDD".size)
val tableName =
JdbcHelpers.getResultTableName(applicationId, stageId, rddInfo.id, attemptNumber, randHex)
JdbcHelpers
.getResultTableName(applicationId, stageId, rddInfo.id, attemptNumber, randHex)

// Create connection and save it in the map
val conn =
Expand All @@ -81,17 +82,22 @@ class AggregatorParallelReadListener(applicationId: String) extends SparkListene
connectionsMap += (tableName -> conn)
)

// Create result table
JdbcHelpers.createResultTable(
conn,
tableName,
singleStoreRDDInfo.query,
singleStoreRDDInfo.schema,
singleStoreRDDInfo.variables,
singleStoreRDDInfo.materialized,
singleStoreRDDInfo.needsRepartition,
singleStoreRDDInfo.repartitionColumns
)
try {
// Create result table
JdbcHelpers.createResultTable(
conn,
tableName,
singleStoreRDDInfo.query,
singleStoreRDDInfo.schema,
singleStoreRDDInfo.variables,
singleStoreRDDInfo.materialized,
singleStoreRDDInfo.needsRepartition,
singleStoreRDDInfo.repartitionColumns
)
} catch {
// Cancel execution if we failed to create a result table
case _: SQLException => singleStoreRDDInfo.sc.cancelStage(stageId)
}
})
}
})
Expand All @@ -102,7 +108,7 @@ class AggregatorParallelReadListener(applicationId: String) extends SparkListene
if (rddInfo.name.startsWith("SingleStoreRDD")) {
val stageId = stageCompleted.stageInfo.stageId
val attemptNumber = stageCompleted.stageInfo.attemptNumber()
val randHex = rddInfo.name.substring("SingleStoreRDD".size)
val randHex = rddInfo.name.substring("SingleStoreRDD".size)
val tableName =
JdbcHelpers.getResultTableName(applicationId, stageId, rddInfo.id, attemptNumber, randHex)

Expand Down

0 comments on commit 512bf68

Please sign in to comment.