diff --git a/mysql-client/src/main/scala/com/twitter/finatra/mysql/util/BUILD b/mysql-client/src/main/scala/com/twitter/finatra/mysql/util/BUILD new file mode 100644 index 000000000..cf12143c0 --- /dev/null +++ b/mysql-client/src/main/scala/com/twitter/finatra/mysql/util/BUILD @@ -0,0 +1,18 @@ +scala_library( + sources = ["*.scala"], + compiler_option_sets = ["fatal_warnings"], + platform = "java8", + provides = scala_artifact( + org = "com.twitter", + name = "finatra-mysql-util", + repo = artifactory, + ), + strict_deps = True, + tags = ["bazel-compatible"], + dependencies = [ + "3rdparty/jvm/joda-time", + "finagle/finagle-mysql/src/main/scala", + "finatra/inject/inject-core/src/main/scala/com/twitter/inject", + "util/util-stats", + ], +) diff --git a/mysql-client/src/main/scala/com/twitter/finatra/mysql/util/DAOFieldExtractors.scala b/mysql-client/src/main/scala/com/twitter/finatra/mysql/util/DAOFieldExtractors.scala new file mode 100644 index 000000000..0e7840144 --- /dev/null +++ b/mysql-client/src/main/scala/com/twitter/finatra/mysql/util/DAOFieldExtractors.scala @@ -0,0 +1,283 @@ +package com.twitter.finatra.mysql.util + +import com.twitter.finagle.mysql.Parameter.NullParameter +import com.twitter.finagle.mysql._ +import com.twitter.util.Future +import com.twitter.util.Return +import com.twitter.util.Throw +import com.twitter.util.Try +import com.twitter.util.{Time => TwitterTime} +import java.sql.Timestamp +import java.util.TimeZone +import org.joda.time.DateTime + +object DAOFieldExtractors extends DAOFieldExtractors { + + case class NoSuchColumnException(expression: String) + extends Exception(s"Column [$expression] not found in result set") + + case class NullValueException(expression: String) + extends Exception(s"NULL was unexpectedly returned from query for $expression") + + case class DataTypeException(expression: String, expected: String, actual: String = "") + extends Exception(s"Wrong type found for $expression: expected $expected, actual $actual") + + case class NoRowsException() extends Exception("Query returned no rows") + + case class TooManyRowsException() extends Exception("Query returned too many rows") + + case class RowCountAndMaxUpdateId(rowCount: Long, maxId: Option[Long]) + +} + +trait DAOFieldExtractors { + + import DAOFieldExtractors._ + + def selectCount( + client: Client, + sql: String, + expression: String, + params: Parameter* + ): Future[Long] = { + selectCount(client.prepare(sql), expression, params: _*) + } + + def selectCount(stmt: PreparedStatement, expression: String, params: Parameter*): Future[Long] = { + selectOne[Long](stmt, (row: Row) => extractLong(expression)(row), params: _*).flatMap { + case Some(cnt) => Future.value(cnt) + case _ => Future.exception(NoRowsException()) + } + } + + // Run a query that returns None for no rows found, or throws TooManyRowsException for more than 1 row. + def selectOne[T]( + client: Client, + sql: String, + f: (Row => T), + params: Parameter* + ): Future[Option[T]] = { + selectOne(client.prepare(sql), f, params: _*) + } + + def selectOne[T]( + stmt: PreparedStatement, + f: (Row => T), + params: Parameter* + ): Future[Option[T]] = { + stmt + .select(params: _*)(f) + .flatMap { + case head +: Nil => Future.value(Some(head)) + case Nil => Future.None + case _ => Future.exception(TooManyRowsException()) + } + } + + // Run a query that returns None for no rows found, or throws TooManyRowsException for more than 1 row. + // The row extractor function returns an Option, so this method is handy if you are selecting + // a value that may be legally be NULL. + def selectOneOption[T]( + client: Client, + sql: String, + f: (Row => Option[T]), + params: Parameter* + ): Future[Option[T]] = { + selectOneOption(client.prepare(sql), f, params: _*) + } + + def selectOneOption[T]( + stmt: PreparedStatement, + f: (Row => Option[T]), + params: Parameter* + ): Future[Option[T]] = { + stmt + .select(params: _*)(f) + .flatMap { + case head +: Nil => Future.value(head) + case Nil => Future.None + case _ => Future.exception(TooManyRowsException()) + } + } + + def asValue(maybeDateTime: Option[DateTime]): Option[Value] = { + maybeDateTime match { + case Some(dateTime) => Some(asValue(dateTime)) + case None => None + } + } + + def asValue(dateTime: DateTime): Value = { + timestampValueWithTimezone.apply(new Timestamp(dateTime.getMillis)) + } + + def asParameter[T](option: Option[T]): Parameter = { + option match { + case Some(t) => Parameter.unsafeWrap(t) + case None => NullParameter + } + } + + def extractString(expression: String)(row: Row): String = { + extractStringOption(expression)(row) match { + case Some(s) => s + case None => throw NullValueException(expression) + } + } + + def extractStringOption(expression: String)(row: Row): Option[String] = + extract[String](row, expression)({ + case StringValue(s) => s + case EmptyValue => "" + case other => throw DataTypeException(expression, "StringValue or EmptyValue", other.toString) + }) + + def extractInt(expression: String)(row: Row): Int = { + extractIntOption(expression)(row) match { + case Some(i) => i + case None => throw NullValueException(expression) + } + } + + def extractIntOption(expression: String)(row: Row): Option[Int] = + extract[Int](row, expression)({ + case IntValue(i) => i + case other => throw DataTypeException(expression, "IntValue", other.toString) + }) + + def extractLong(expression: String)(row: Row): Long = { + extractLongOption(expression)(row) match { + case Some(l) => l + case None => throw NullValueException(expression) + } + } + + def extractLongOption(expression: String)(row: Row): Option[Long] = + extract[Long](row, expression)({ + case LongValue(l) => l + case other => throw DataTypeException(expression, "LongValue", other.toString) + }) + + def extractBoolean(expression: String)(row: Row): Boolean = { + // Even tho it's a TINYINT in the database, finagle mysql is returning a Byte instead of an Int + extractByte(expression)(row) == 1 + } + + def extractBooleanOption(expression: String)(row: Row): Option[Boolean] = { + // Even tho it's a TINYINT in the database, finagle mysql is returning a Byte instead of an Int + extractByteOption(expression)(row) match { + case Some(b) => Some(b == 1) + case None => None + } + } + + def extractByte(expression: String)(row: Row): Byte = { + extractByteOption(expression)(row) match { + case Some(b) => b + case None => throw NullValueException(expression) + } + } + + def extractByteOption(expression: String)(row: Row): Option[Byte] = + extract[Byte](row, expression)({ + case ByteValue(b) => b + case other => throw DataTypeException(expression, "ByteValue", other.toString) + }) + + // Note: Returned DateTime will be in UTC + def extractDateTime(expression: String)(row: Row): DateTime = { + extractDateTimeOption(expression)(row) match { + case Some(dateTime) => dateTime + case None => throw NullValueException(expression) + } + } + + // Note: Returned DateTime will be in UTC + def extractDateTimeOption(expression: String)(row: Row): Option[DateTime] = + extract[DateTime](row, expression)({ v => + { + timestampValueWithTimezone.unapply(v) match { + case Some(timestamp) => new DateTime(timestamp.getTime) + case None => throw DataTypeException(expression, "TimestampValue", "None") + } + } + }) + + def extractTwitterDateTime(expression: String)(row: Row): TwitterTime = { + extractTwitterDateTimeOption(expression)(row) match { + case Some(dateTime) => dateTime + case None => throw NullValueException(expression) + } + } + + def extractTwitterDateTimeOption(expression: String)(row: Row): Option[TwitterTime] = + extract[TwitterTime](row, expression)({ v => + { + timestampValueWithTimezone.unapply(v) match { + case Some(timestamp: java.sql.Timestamp) => + TwitterTime(new java.util.Date(timestamp.getTime)) + case None => throw DataTypeException(expression, "TimestampValue", "None") + } + } + }) + + def extractInsertId(result: Result): Long = { + extractOk(result, _.insertId).apply() + } + + def extractAffectedRows(result: Result): Long = { + extractOk(result, _.affectedRows).apply() + } + + def extractOk[A](result: Result, f: OK => A): Try[A] = { + result match { + case ok: OK => new Return(f(ok)) + case e: Error => new Throw(new Exception(e.message)) + case unknown => + new Throw(new Exception(s"Unexpected response from Result match of type $unknown")) + } + } + + def extractResultSet[A](result: Result, f: ResultSet => A): Try[A] = { + result match { + case resultSet: ResultSet => Return(f(resultSet)) + case e: Error => Throw(new Exception(e.message)) + case unknown => + Throw(new Exception(s"Unexpected response from Result match of type $unknown")) + } + } + + def extractOkWithId[A](result: Result, f: OK => (A, Long)): Try[(A, Long)] = { + result match { + case ok: OK => new Return(f(ok)) + case e: Error => new Throw(new Exception(e.message)) + case unknown => + new Throw(new Exception(s"Unexpected response from Result match of type $unknown")) + } + } + + def extractAffectedRowsAsFuture(result: Result): Future[Long] = { + Future.const(extractOk(result, _.affectedRows)) + } + + def extractAffectedRowsWithIdsAsFuture(result: Result): Future[RowCountAndMaxUpdateId] = { + Future.const(extractOkWithId(result, { ok => (ok.affectedRows, ok.insertId) }).map { i => + RowCountAndMaxUpdateId(i._1, Some(i._2)) + }) + } + + private def extract[T](row: Row, expression: String)(f: Value => T): Option[T] = { + row(expression) match { + case None => throw NoSuchColumnException(expression) + case Some(NullValue) => None + case Some(value) => Some(f(value)) + } + } + + private def timestampValueWithTimezone = + new TimestampValue( + injectionTimeZone = TimeZone.getDefault, + extractionTimeZone = TimeZone.getTimeZone("UTC") + ) + +} diff --git a/mysql-client/src/main/scala/com/twitter/finatra/mysql/util/DAOLocks.scala b/mysql-client/src/main/scala/com/twitter/finatra/mysql/util/DAOLocks.scala new file mode 100644 index 000000000..6f39b1853 --- /dev/null +++ b/mysql-client/src/main/scala/com/twitter/finatra/mysql/util/DAOLocks.scala @@ -0,0 +1,137 @@ +package com.twitter.finatra.mysql.util + +import com.twitter.finagle.mysql.Client +import com.twitter.finagle.mysql.Parameter +import com.twitter.finagle.mysql.ServerError +import com.twitter.finagle.mysql.Session +import com.twitter.finagle.mysql.Transactions +import com.twitter.finagle.stats.NullStatsReceiver +import com.twitter.finagle.stats.Stat +import com.twitter.finatra.mysql.util.DAOFieldExtractors._ +import com.twitter.finatra.mysql.util.DAOLocks.AdvisoryLockException +import com.twitter.util.Future +import com.twitter.util.logging.Logging +import scala.util.control.NonFatal + +object DAOLocks extends DAOLocks { + + case object AdvisoryLockException extends Exception + +} + +trait DAOLocks extends Logging { + + def getAdvisoryLock( + client: Client, + lockName: String, + timeoutSeconds: Option[Int] = None + ): Future[Unit] = { + // timeout of 0 means fail immediately if lock is unavailable. -1 means wait forever + selectOne( + client, + "select get_lock(?,?) as value", + extractLong("value")(_), + Parameter.wrap(lockName), + Parameter.wrap(timeoutSeconds.getOrElse(0)) + ).flatMap { + case Some(1) => Future.Unit + case _ => Future.exception(AdvisoryLockException) + } + } + + def releaseAdvisoryLock(client: Client, lockName: String): Future[Unit] = { + selectOne(client, "select release_lock(?) as value", extractLong("value")(_), lockName) + .flatMap { + case Some(1) => Future.Unit + case _ => + error(s"Advisory lock $lockName cannot be released as it is not owned by the client") + Future.Unit // silently ignore this. + } + } + + def isAdvisoryLockFree(client: Client, lockName: String): Future[Boolean] = { + selectOne(client, "select is_free_lock(?) as value", extractLong("value")(_), lockName) + .flatMap { + case Some(1) => Future.value(true) + case _ => Future.value(false) + } + } + + def synchronizedTransaction[A]( + mysqlClient: Client with Transactions, + advisoryLockName: String, + timeoutSeconds: Option[Int] = None, + lockTimingStat: Stat = NullStatsReceiver.stat("nullStat") + )( + f: Client => Future[A] + ): Future[A] = { + + def discardSession(sessionClient: Client with Transactions with Session): Future[Unit] = { + sessionClient.discard() + } + + def executeTransactionInSession( + sessionClient: Client with Transactions with Session + ): Future[A] = { + sessionClient.transaction { sessionInTransaction => + f(sessionInTransaction) + .onFailure(ex => error("Failed executing f() within transaction", ex)) + } + } + + def releaseLock(sessionClient: Client with Transactions with Session): Future[Unit] = { + releaseAdvisoryLock(sessionClient, advisoryLockName) + .rescue { + case NonFatal(ex) => + // If releasing the lock fails, that is 'ok' because the inner command succeeded so we can + // return the result of the operation we actually care about, and because the release lock + // failed, we'll discard the session to ensure everything is cleaned up with respect to the + // lock. + error(s"Failed releasing lock $advisoryLockName, discarding session", ex) + discardSession(sessionClient) + } + } + + def executeWithinLock( + sessionClient: Client with Transactions with Session + )( + opInLock: Client with Transactions with Session => Future[A] + ): Future[A] = { + debug(s"Attempting to get advisory lock [$advisoryLockName]") + + val resultFuture = for { + _ <- Stat.timeFuture(lockTimingStat)( + getAdvisoryLock(sessionClient, advisoryLockName, timeoutSeconds)) + result <- opInLock(sessionClient) + _ <- releaseLock(sessionClient) + } yield result + + resultFuture.rescue { + case AdvisoryLockException => + // Failed to acquire the original lock, just log and propagate the exception + debug(s"Failed to acquire advisory lock [$advisoryLockName]") + Future.exception(AdvisoryLockException) + case ex: ServerError => + error(s"MySql command failed", ex) + releaseLock(sessionClient) + .transform(_ => Future.exception(ex)) + case NonFatal(ex) => + // Either failed because: + // - Failed to acquire lock due to network issue + // - Failed to execute transaction/query due to network issue + + error("Failed due to non-MySql server error", ex) + // Try and release the lock, and then discard the session regardless + releaseLock(sessionClient) + .flatMap(_ => discardSession(sessionClient)) + .transform(_ => Future.exception(ex)) + } + } + + mysqlClient.session { sessionClient => + executeWithinLock(sessionClient) { sessionClientInLock => + executeTransactionInSession(sessionClientInLock) + } + } + } +} diff --git a/mysql-client/src/test/scala/com/twitter/finatra/mysql/client/tests/IgnoreMysqlHarnessIfUnavailable.scala b/mysql-client/src/test/scala/com/twitter/finatra/mysql/IgnoreMysqlHarnessIfUnavailable.scala similarity index 88% rename from mysql-client/src/test/scala/com/twitter/finatra/mysql/client/tests/IgnoreMysqlHarnessIfUnavailable.scala rename to mysql-client/src/test/scala/com/twitter/finatra/mysql/IgnoreMysqlHarnessIfUnavailable.scala index a8e1123c0..3adbb8619 100644 --- a/mysql-client/src/test/scala/com/twitter/finatra/mysql/client/tests/IgnoreMysqlHarnessIfUnavailable.scala +++ b/mysql-client/src/test/scala/com/twitter/finatra/mysql/IgnoreMysqlHarnessIfUnavailable.scala @@ -1,6 +1,5 @@ -package com.twitter.finatra.mysql.client.tests +package com.twitter.finatra.mysql -import com.twitter.finatra.mysql.EmbeddedMysqlServer import com.twitter.inject.Test import org.scalactic.source.Position import org.scalatest.Tag diff --git a/mysql-client/src/test/scala/com/twitter/finatra/mysql/client/tests/MysqlHarnessTag.scala b/mysql-client/src/test/scala/com/twitter/finatra/mysql/MysqlHarnessTag.scala similarity index 62% rename from mysql-client/src/test/scala/com/twitter/finatra/mysql/client/tests/MysqlHarnessTag.scala rename to mysql-client/src/test/scala/com/twitter/finatra/mysql/MysqlHarnessTag.scala index 2771523f6..ba3a22c10 100644 --- a/mysql-client/src/test/scala/com/twitter/finatra/mysql/client/tests/MysqlHarnessTag.scala +++ b/mysql-client/src/test/scala/com/twitter/finatra/mysql/MysqlHarnessTag.scala @@ -1,4 +1,4 @@ -package com.twitter.finatra.mysql.client.tests +package com.twitter.finatra.mysql import org.scalatest.Tag diff --git a/mysql-client/src/test/scala/com/twitter/finatra/mysql/client/tests/EmbeddedMysqlServerIntegrationTest.scala b/mysql-client/src/test/scala/com/twitter/finatra/mysql/client/tests/EmbeddedMysqlServerIntegrationTest.scala index 32fa988c4..bf6d05b7d 100644 --- a/mysql-client/src/test/scala/com/twitter/finatra/mysql/client/tests/EmbeddedMysqlServerIntegrationTest.scala +++ b/mysql-client/src/test/scala/com/twitter/finatra/mysql/client/tests/EmbeddedMysqlServerIntegrationTest.scala @@ -3,6 +3,8 @@ package com.twitter.finatra.mysql.client.tests import com.twitter.finagle.mysql.ServerError import com.twitter.finagle.mysql.harness.config.User import com.twitter.finatra.mysql.EmbeddedMysqlServer +import com.twitter.finatra.mysql.IgnoreMysqlHarnessIfUnavailable +import com.twitter.finatra.mysql.MysqlHarnessTag import com.twitter.inject.Test import java.nio.file.Paths diff --git a/mysql-client/src/test/scala/com/twitter/finatra/mysql/client/tests/MysqlClientModuleTraitTest.scala b/mysql-client/src/test/scala/com/twitter/finatra/mysql/client/tests/MysqlClientModuleTraitTest.scala index 955808e86..4013422de 100644 --- a/mysql-client/src/test/scala/com/twitter/finatra/mysql/client/tests/MysqlClientModuleTraitTest.scala +++ b/mysql-client/src/test/scala/com/twitter/finatra/mysql/client/tests/MysqlClientModuleTraitTest.scala @@ -6,6 +6,8 @@ import com.google.inject.Singleton import com.twitter.finagle.mysql.Result import com.twitter.finagle.stats.StatsReceiver import com.twitter.finatra.mysql.EmbeddedMysqlServer +import com.twitter.finatra.mysql.IgnoreMysqlHarnessIfUnavailable +import com.twitter.finatra.mysql.MysqlHarnessTag import com.twitter.finatra.mysql.client.Config import com.twitter.finatra.mysql.client.Credentials import com.twitter.finatra.mysql.client.MysqlClient diff --git a/mysql-client/src/test/scala/com/twitter/finatra/mysql/client/tests/MysqlTestLifecycleCompilationTest.scala b/mysql-client/src/test/scala/com/twitter/finatra/mysql/client/tests/MysqlTestLifecycleCompilationTest.scala index 46cfbd5cb..0298a9a59 100644 --- a/mysql-client/src/test/scala/com/twitter/finatra/mysql/client/tests/MysqlTestLifecycleCompilationTest.scala +++ b/mysql-client/src/test/scala/com/twitter/finatra/mysql/client/tests/MysqlTestLifecycleCompilationTest.scala @@ -1,6 +1,8 @@ package com.twitter.finatra.mysql.client.tests import com.twitter.finatra.mysql.EmbeddedMysqlServer +import com.twitter.finatra.mysql.IgnoreMysqlHarnessIfUnavailable +import com.twitter.finatra.mysql.MysqlHarnessTag import com.twitter.finatra.mysql.MysqlTestLifecycle import com.twitter.inject.Test diff --git a/mysql-client/src/test/scala/com/twitter/finatra/mysql/util/tests/BUILD.bazel b/mysql-client/src/test/scala/com/twitter/finatra/mysql/util/tests/BUILD.bazel new file mode 100644 index 000000000..474815753 --- /dev/null +++ b/mysql-client/src/test/scala/com/twitter/finatra/mysql/util/tests/BUILD.bazel @@ -0,0 +1,26 @@ +junit_tests( + sources = ["*.scala"], + compiler_option_sets = ["fatal_warnings"], + runtime_platform = "java11", + strict_deps = True, + tags = ["bazel-compatible"], + dependencies = [ + "finagle/finagle-core/src/main", + "finagle/finagle-mysql/src/main/scala", + "finagle/finagle-mysql/src/test/scala/com/twitter/finagle/mysql/harness", + "finagle/finagle-mysql/src/test/scala/com/twitter/finagle/mysql/harness/config", + "finatra/inject/inject-app/src/test/scala/com/twitter/inject/app", + "finatra/inject/inject-core/src/main/scala/com/twitter/inject", + "finatra/inject/inject-core/src/test/scala/com/twitter/inject", + "finatra/inject/inject-modules/src/main/scala/com/twitter/inject/modules", + "finatra/inject/inject-server/src/test/scala/com/twitter/inject/server", + "finatra/mysql-client/src/main/scala/com/twitter/finatra/mysql/client", + "finatra/mysql-client/src/main/scala/com/twitter/finatra/mysql/client/modules", + "finatra/mysql-client/src/main/scala/com/twitter/finatra/mysql/util", + "finatra/mysql-client/src/test/scala/com/twitter/finatra/mysql", + "util/util-mock/src/main/scala/com/twitter/util/mock", + "util/util-stats/src/main/scala/com/twitter/finagle/stats", + ], + # the below may be required when running locally on a Mac + # extra_jvm_options= ['-Djava.io.tmpdir=/tmp/mysql/extracted/'], +) diff --git a/mysql-client/src/test/scala/com/twitter/finatra/mysql/util/tests/DAOLocksTest.scala b/mysql-client/src/test/scala/com/twitter/finatra/mysql/util/tests/DAOLocksTest.scala new file mode 100644 index 000000000..aca5c9619 --- /dev/null +++ b/mysql-client/src/test/scala/com/twitter/finatra/mysql/util/tests/DAOLocksTest.scala @@ -0,0 +1,373 @@ +package com.twitter.finatra.mysql.util + +import com.twitter.finagle.mysql._ +import com.twitter.finagle.mysql.harness.config.User +import com.twitter.finatra.mysql.EmbeddedMysqlServer +import com.twitter.finatra.mysql.IgnoreMysqlHarnessIfUnavailable +import com.twitter.finatra.mysql.MysqlHarnessTag +import com.twitter.finatra.mysql.MysqlTestLifecycle +import com.twitter.finatra.mysql.client.MysqlClient +import com.twitter.finatra.mysql.util.DAOFieldExtractors._ +import com.twitter.inject.Test +import com.twitter.util.Future +import com.twitter.util.Time +import com.twitter.util.mock.Mockito +import org.joda.time.DateTime + +object DAOLocksTest { + val LockName: String = "someLockName" +} + +class DAOLocksTest + extends Test + with DAOLocks + with DAOFieldExtractors + with MysqlTestLifecycle + with IgnoreMysqlHarnessIfUnavailable + with Mockito { + + import DAOLocksTest._ + + val embeddedMysqlServer: EmbeddedMysqlServer = EmbeddedMysqlServer + .newBuilder() + .withUsers(Seq(User.Root)) + .newServer() + + lazy val mysqlClient: MysqlClient = embeddedMysqlServer.mysqlClient + lazy val mysqlClient2: MysqlClient = embeddedMysqlServer.mysqlClient + + case class Record( + id: Int, + string1: String, + string2: Option[String], + int1: Int, + int2: Option[Int], + long1: Long, + long2: Option[Long], + bool1: Boolean, + bool2: Option[Boolean], + dateTime1: DateTime, + dateTime2: Option[DateTime]) + + val string1 = "string1" + val string2 = "string2" + val int1 = 2147483647 + val int2: Int = -2147483648 + val long1 = 9223372036854775807L + val long2: Long = -9223372036854775808L + val bool1 = true + val bool2 = false + val dateTime1 = new DateTime(0) + val dateTime2 = new DateTime(1000L) + + val row1Id = 1 + val row2Id = 2 + + val record1: Record = Record( + row1Id, + string1, + Some(string2), + int1, + Some(int2), + long1, + Some(long2), + bool1, + Some(bool2), + dateTime1, + Some(dateTime2) + ) + + val record2: Record = Record( + row2Id, + string1, + None, + int1, + None, + long1, + None, + bool1, + None, + dateTime1, + None + ) + + override def beforeEach(): Unit = { + super.beforeEach() + val sql = + """ + CREATE TABLE test ( + id int(10) NOT NULL AUTO_INCREMENT, + string_required varchar(64) CHARACTER SET ascii NOT NULL, + string_optional varchar(64) CHARACTER SET ascii NULL, + int_required int NOT NULL, + int_optional int NULL, + long_required bigint NOT NULL, + long_optional bigint NULL, + bool_required tinyint NOT NULL, + bool_optional tinyint NULL, + datetime_required datetime NOT NULL, + datetime_optional datetime NULL, + PRIMARY KEY (id) + ) ENGINE=InnoDB DEFAULT CHARSET=utf8 + """ + await(mysqlClient.query(sql)) + populate + } + + override def afterEach(): Unit = { + super.afterEach() + await(mysqlClient.query("drop table test")) + } + + test("DAOLocks#Mysql db tools should select a fully populated row correctly", MysqlHarnessTag) { + val sql = "select * from test where id=?" + val recordSeq: Seq[Record] = await(mysqlClient.prepare(sql).select(row1Id)(convertRow)) + recordSeq.size should equal(1) + recordSeq.head should equal(record1) + + val recordOpt = await(selectOne(mysqlClient, sql, convertRow(_), row1Id)) + recordOpt.get should equal(record1) + } + + test("DAOLocks#Mysql db tools should select a row with nulls correctly", MysqlHarnessTag) { + val sql = "select * from test where id=?" + val recordSeq: Seq[Record] = await(mysqlClient.prepare(sql).select(row2Id)(convertRow)) + recordSeq.size should equal(1) + recordSeq.head should equal(record2) + + val recordOpt = await(selectOne(mysqlClient, sql, convertRow(_), row2Id)) + recordOpt.get should equal(record2) + } + + test("DAOLocks#Mysql db tools should select count correctly", MysqlHarnessTag) { + var sql = "select count(*) as cnt from test where datetime_optional is null" + var result: Long = await(selectCount(mysqlClient, sql, "cnt")) + result should equal(1) + + sql = "select count(*) as cnt from test where id > ?" + result = await(selectCount(mysqlClient, sql, "cnt", 1000)) + result should equal(0) + } + + test( + "DAOLocks#Mysql db tools should fail when extracting null with non-null extractor", + MysqlHarnessTag) { + val sql = "select string_optional from test where id=?" + a[NullValueException] shouldBe thrownBy { + // Should be calling extractStringOption + await(selectOne(mysqlClient, sql, extractString("string_optional")(_), row2Id)) + } + } + + test( + "DAOLocks#Mysql db tools should fail when extracting non-null field with wrong type extractor", + MysqlHarnessTag) { + val sql = "select string_required from test where id=1" + a[DataTypeException] shouldBe thrownBy { + // Should be calling extractString + await(selectOne(mysqlClient, sql, extractInt("string_required")(_))) + } + } + + test( + "DAOLocks#Mysql db tools should fail when extracting nullable field with data using wrong type extractor", + MysqlHarnessTag) { + val sql = "select datetime_optional from test where id=1" + a[DataTypeException] shouldBe thrownBy { + // Should be calling extractDatetimeOption + await(selectOne(mysqlClient, sql, extractLong("datetime_optional")(_))) + } + } + + test("DAOLocks#Mysql db tools should fail when too many rows match", MysqlHarnessTag) { + val sql = "select * from test" // e.g. we forgot the where clause + a[TooManyRowsException] shouldBe thrownBy { + //noinspection ConvertibleToMethodValue + await(selectOne(mysqlClient, sql, convertRow(_))) + } + } + + test("DAOLocks#Mysql db tools should fail when extracting invalid column", MysqlHarnessTag) { + val sql = "select id from test where id=1" + a[NoSuchColumnException] shouldBe thrownBy { + await(selectOne(mysqlClient, sql, extractInt("ID")(_))) // it's case-sensitive! + } + } + + test("DAOLocks#be able to use advisory locks", MysqlHarnessTag) { + await(getAdvisoryLock(mysqlClient, LockName)) + + var boolResult = await(isAdvisoryLockFree(mysqlClient, LockName)) + boolResult shouldBe false + + await(releaseAdvisoryLock(mysqlClient, LockName)) + + boolResult = await(isAdvisoryLockFree(mysqlClient, LockName)) + boolResult shouldBe true + } + + test( + "DAOLocks#Failed mysql results in the transaction and lock being returned but no session discarding", + MysqlHarnessTag) { + a[ServerError] shouldBe thrownBy { + await { + synchronizedTransaction(mysqlClient, LockName, Some(1)) { client => + client.query("invalid sql command") + } + } + } + } + + test( + "DAOLocks#Non-MySQL errors result in releasing locks and discarding sessions", + MysqlHarnessTag) { + val mockMysqlClient = new MockMysqlClient + a[RuntimeException] shouldBe thrownBy { + await { + synchronizedTransaction(mockMysqlClient, LockName, Some(1)) { client => + Future.exception( + new RuntimeException("Something besides a mysql server error happened here")) + } + } + } + + mockMysqlClient.sessionDiscarded shouldBe true + } + + test( + "DAOLocks#MySQL Server errors result in releasing locks and not discarding sessions", + MysqlHarnessTag) { + val mockMysqlClient = new MockMysqlClient + a[ServerError] shouldBe thrownBy { + await { + synchronizedTransaction(mockMysqlClient, LockName, Some(1)) { client => + Future.exception(ServerError(0, "", "")) + } + } + } + + mockMysqlClient.sessionDiscarded shouldBe false + } + + test( + "DAOLocks#MySQL Server errors result in releasing locks and discarding sessions if close fails", + MysqlHarnessTag) { + val mockMysqlClient = new MockMysqlClient + a[ServerError] shouldBe thrownBy { + await { + synchronizedTransaction(mockMysqlClient, LockName, Some(1)) { client => + mockMysqlClient.failReleaseLock = true + Future.exception(ServerError(0, "", "")) + } + } + } + + mockMysqlClient.sessionDiscarded shouldBe true + } + + class MockMysqlClient extends Client with Transactions with Session { + @volatile var sessionDiscarded = false + @volatile var failReleaseLock = false + + override def discard(): Future[Unit] = { + sessionDiscarded = true + Future.Done + } + + override def session[T](f: Client with Transactions with Session => Future[T]): Future[T] = { + f(this) + } + + override def transaction[T](f: Client => Future[T]): Future[T] = { + f(this) + } + + private val rowFromAcquireLock = new Row { + override val fields: IndexedSeq[Field] = IndexedSeq( + Field("", "", "", "", "name", "origName", 0, 0, 0, 0, 0)) + override val values: IndexedSeq[Value] = IndexedSeq(LongValue(1)) + + override def indexOf(columnName: String): Option[Int] = Some(0) + } + + override def prepare(sql: String): PreparedStatement = new PreparedStatement { + override def apply(params: Parameter*): Future[Result] = { + if (failReleaseLock) { + Future.exception(new RuntimeException(s"Some exception releasing the lock: $sql")) + } else { + Future.value(ResultSet(rowFromAcquireLock.fields, Seq(rowFromAcquireLock))) + } + } + } + + override def select[T](sql: String)(f: Row => T): Future[Seq[T]] = ??? + + override def query(sql: String): Future[Result] = ??? + + override def close(deadline: Time): Future[Unit] = ??? + + override def cursor(sql: String): CursoredStatement = ??? + + override def ping(): Future[Unit] = ??? + + override def transactionWithIsolation[T]( + isolationLevel: IsolationLevel + )( + f: Client => Future[T] + ): Future[T] = ??? + } + + private def convertRow(row: Row): Record = { + Record( + id = extractInt("id")(row), + string1 = extractString("string_required")(row), + string2 = extractStringOption("string_optional")(row), + int1 = extractInt("int_required")(row), + int2 = extractIntOption("int_optional")(row), + long1 = extractLong("long_required")(row), + long2 = extractLongOption("long_optional")(row), + bool1 = extractBoolean("bool_required")(row), + bool2 = extractBooleanOption("bool_optional")(row), + dateTime1 = extractDateTime("datetime_required")(row), + dateTime2 = extractDateTimeOption("datetime_optional")(row) + ) + } + + private def populate: Result = { + val sql = + """ + insert into test (id, string_required, string_optional, int_required, int_optional, + long_required, long_optional, bool_required, bool_optional, datetime_required, datetime_optional) + values + (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?), + (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + """ + await( + mysqlClient.prepare(sql)( + record1.id, + record1.string1, + asParameter(record1.string2), + record1.int1, + asParameter(record1.int2), + record1.long1, + asParameter(record1.long2), + record1.bool1, + asParameter(record1.bool2), + asValue(record1.dateTime1), + asParameter(asValue(record1.dateTime2)), + record2.id, + record2.string1, + asParameter(record2.string2), + record2.int1, + asParameter(record2.int2), + record2.long1, + asParameter(record2.long2), + record2.bool1, + asParameter(record2.bool2), + asValue(record2.dateTime1), + asParameter(asValue(record2.dateTime2)) + ) + ) + + } +}