Fixed error handling in BatchInsertWriter
Summary:
If we try to abort an already-closed connection, we get a NullPointerException that hides the actual error.
Also added column names to the INSERT query.
**Design doc/spec**:
**Docs impact**: none
**Preliminary Reviewer(s)**:
**Final Reviewer**:

Test Plan: https://webapp.io/memsql/commits?query=repo%3Asinglestore-spark-connector+id%3A393

Reviewers: pmishchenko-ua

Reviewed By: pmishchenko-ua

Subscribers: engineering-list

JIRA Issues: PLAT-6543

Differential Revision: https://grizzly.internal.memcompute.com/D61847
AdalbertMemSQL committed Mar 31, 2023
1 parent 4ca9993 commit 4eae4a3
Showing 2 changed files with 22 additions and 3 deletions.
@@ -21,7 +21,8 @@ class BatchInsertWriterFactory(table: TableIdentifier, conf: SinglestoreOptions)
       attemptNumber: Int,
       isReferenceTable: Boolean,
       mode: SaveMode): DataWriter[Row] = {
-    val queryPrefix = s"INSERT INTO ${table.quotedString} VALUES "
+    val columnNames = schema.map(s => SinglestoreDialect.quoteIdentifier(s.name))
+    val queryPrefix = s"INSERT INTO ${table.quotedString} (${columnNames.mkString(", ")}) VALUES "
     val querySuffix = s" ON DUPLICATE KEY UPDATE ${conf.onDuplicateKeySQL.get}"

     val rowTemplate = "(" + schema
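
For illustration, a minimal sketch of the prefix the new code produces, assuming a hypothetical table `testdb`.`batchinsert` with columns id and name (backtick-quoting stands in for SinglestoreDialect.quoteIdentifier):

    val quotedTable = "`testdb`.`batchinsert`"
    val columnNames = Seq("id", "name").map(name => s"`$name`")
    val queryPrefix = s"INSERT INTO $quotedTable (${columnNames.mkString(", ")}) VALUES "
    // queryPrefix == "INSERT INTO `testdb`.`batchinsert` (`id`, `name`) VALUES "

Listing the column names explicitly means a DataFrame column that does not exist in the target table now fails with a clear "Unknown column" error instead of relying on positional matching; the new test below exercises exactly that.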
@@ -101,7 +102,8 @@ class BatchInsertWriter(batchSize: Int, writeBatch: ListBuffer[Row] => Long, con

   override def abort(e: Exception): Unit = {
     buff = ListBuffer.empty[Row]
-    conn.abort(ExecutionContext.global)
-    conn.close()
+    if (!conn.isClosed) {
+      conn.abort(ExecutionContext.global)
+    }
   }
 }
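
The isClosed guard keeps java.sql.Connection.abort from being called on a connection that is already closed; some drivers throw a NullPointerException in that case, which would mask the write error that triggered the abort. A self-contained sketch of the pattern (safeAbort is our name, not part of the connector):

    import java.sql.Connection
    import scala.concurrent.ExecutionContext

    // ExecutionContext.global implements java.util.concurrent.Executor,
    // so it can be passed directly to Connection.abort.
    def safeAbort(conn: Connection): Unit =
      if (!conn.isClosed) {
        conn.abort(ExecutionContext.global)
      }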
17 changes: 17 additions & 0 deletions src/test/scala/com/singlestore/spark/BatchInsertTest.scala
@@ -218,4 +218,21 @@ class BatchInsertTest extends IntegrationSuiteBase with BeforeAndAfterEach with
       )
     )
   }
+
+  it("non-existing column") {
+    executeQueryWithLog("DROP TABLE IF EXISTS batchinsert")
+    executeQueryWithLog("CREATE TABLE batchinsert(id INT, name TEXT)")
+
+    df = spark.createDF(
+      List((5, "EBCEFGRHFED" * 100, 50)),
+      List(("id", IntegerType, true), ("name", StringType, true), ("age", IntegerType, true))
+    )
+
+    try {
+      insertValues("testdb.batchinsert", df, "age = age + 1", 10)
+      fail()
+    } catch {
+      case e: Exception if e.getMessage.contains("Unknown column 'age' in 'field list'") =>
+    }
+  }
 }
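
This write path runs only when on-duplicate-key handling is requested: insertValues is a suite helper, and the write it performs presumably boils down to something like the sketch below (hedged; option names follow the connector's documented onDuplicateKeySQL and insertBatchSize settings):

    import org.apache.spark.sql.{DataFrame, SaveMode}

    // Sketch: setting onDuplicateKeySQL routes the write through
    // BatchInsertWriter (INSERT ... ON DUPLICATE KEY UPDATE) instead of
    // the default LOAD DATA path.
    def writeBatchInsert(df: DataFrame): Unit =
      df.write
        .format("singlestore")
        .option("onDuplicateKeySQL", "age = age + 1")
        .option("insertBatchSize", "10")
        .mode(SaveMode.Append)
        .save("testdb.batchinsert")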
