Skip to content
Browse files

Added #updateAndReturnGeneratedKey

  • Loading branch information...
1 parent 664c61e commit a64a53286a9bb8acf1e82beb748181d659c72403 @seratch seratch committed Jul 26, 2013
View
2 .gitignore
@@ -64,3 +64,5 @@ logs
sandbox/db
data
+
+project/idea.sbt
View
8 README.md
@@ -60,11 +60,11 @@ val future: Future[Seq[AsyncQueryResult]] = AsyncDB.withPool { implicit session
- ConnectionPoolFactory -> Heroku
- Transaction control and returned values -> Done
- implicit ExecutionContext -> Done
-- oneToX API
-- updateAndReturnGeneratedKey API
-- AsyncTx Chain
+- updateAndReturnGeneratedKey API -> Done
- Logging -> Done
-- Examples
+- oneToX API
+- AsyncTx chain
+- More examples
### License
View
1 project/Build.scala
@@ -32,6 +32,7 @@ object ScalikeJDBCAsyncProject extends Build {
scalacOptions ++= _scalacOptions,
publishMavenStyle := true,
publishArtifact in Test := false,
+ parallelExecution in Test := false,
pomIncludeRepository := { x => false },
pomExtra := _pomExtra
)
View
12 src/main/scala/scalikejdbc/async/AsyncDBSession.scala
@@ -41,6 +41,18 @@ case class AsyncDBSession(connection: AsyncConnection) extends LogSupport {
}
}
+ def updateAndReturnGeneratedKey(statement: String, parameters: Any*)(
+ implicit cxt: ExecutionContext = ExecutionContext.Implicits.global): Future[Long] = {
+ queryLogging(statement, parameters)
+ connection.toNonSharedConnection().flatMap { conn =>
+ conn.sendPreparedStatement(statement, parameters: _*).map { result =>
+ result.generatedKey.getOrElse {
+ throw new IllegalArgumentException(ErrorMessage.FAILED_TO_RETRIEVE_GENERATED_KEY + " SQL: '" + statement + "'")
+ }
+ }
+ }
+ }
+
def traversable[A](statement: String, parameters: Any*)(extractor: WrappedResultSet => A)(
implicit cxt: ExecutionContext = ExecutionContext.Implicits.global): Future[Traversable[A]] = {
queryLogging(statement, parameters)
View
14 src/main/scala/scalikejdbc/async/AsyncQueryResult.scala
@@ -15,11 +15,17 @@
*/
package scalikejdbc.async
+import scala.concurrent.Future
+
/**
* Query Result
*/
-case class AsyncQueryResult(
- rowsAffected: Option[Long],
- statusMessage: Option[String],
- rows: Option[AsyncResultSet])
+abstract class AsyncQueryResult(
+ val rowsAffected: Option[Long],
+ val statusMessage: Option[String],
+ val rows: Option[AsyncResultSet]) {
+
+ val generatedKey: Option[Long]
+
+}
View
9 src/main/scala/scalikejdbc/async/AsyncSQLs.scala
@@ -37,6 +37,15 @@ class AsyncSQLUpdate(underlying: SQLUpdate)
}
}
+class AsyncSQLUpdateAndReturnGeneratedKey(underlying: SQLUpdateWithGeneratedKey)
+ extends SQLUpdateWithGeneratedKey(underlying.statement)(underlying.parameters: _*)(1) {
+
+ def future()(implicit session: AsyncDBSession,
+ cxt: ExecutionContext = ExecutionContext.Implicits.global): Future[Long] = {
+ session.updateAndReturnGeneratedKey(underlying.statement, underlying.parameters: _*)
+ }
+}
+
class AsyncSQLToOption[A, E <: WithExtractor](underlying: SQLToOption[A, E])
extends SQLToOption[A, E](underlying.statement)(underlying.parameters: _*)(underlying.extractor)(SQL.Output.single) {
import GeneralizedTypeConstraintsForWithExtractor._
View
29 src/main/scala/scalikejdbc/async/internal/AsyncMySQLConnection.scala
@@ -15,20 +15,23 @@
*/
package scalikejdbc.async.internal
-import com.github.mauricio.async.db._
-import scalikejdbc.async.AsyncConnection
+import com.github.mauricio.async.db.Connection
+import scala.concurrent._
+import scalikejdbc.async.{ AsyncConnection, NonSharedAsyncConnection }
-/**
- * mysql-async's DB connection
- *
- * @see https://github.com/mauricio/postgresql-async
- */
-private[scalikejdbc] case class AsyncMySQLConnection(url: String, user: String, password: String)
- extends MauricioConnectionBaseImpl
- with AsyncConnection
- with MauricioConnectionConfiguration {
+trait AsyncMySQLConnection extends AsyncConnection {
- private[scalikejdbc] val underlying: Connection = new mysql.MySQLConnection(configuration)
+ private[scalikejdbc] val underlying: Connection
-}
+ override def toNonSharedConnection()(
+ implicit cxt: ExecutionContext = ExecutionContext.Implicits.global): Future[NonSharedAsyncConnection] = {
+ if (this.isInstanceOf[MauricioPoolableAsyncConnection[_]]) {
+ val pool = this.asInstanceOf[MauricioPoolableAsyncConnection[Connection]].pool
+ pool.take.map(conn => new MauricioNonSharedAsyncConnection(conn, Some(pool)) with AsyncMySQLConnection)
+ } else {
+ future(new MauricioSharedAsyncConnection(this.underlying) with AsyncMySQLConnection)
+ }
+ }
+
+}
View
2 src/main/scala/scalikejdbc/async/internal/AsyncMySQLConnectionPool.scala
@@ -40,7 +40,7 @@ private[scalikejdbc] class AsyncMySQLConnectionPool(
private[this] val factory = new mysql.pool.MySQLConnectionFactory(config)
private[this] val pool = new ConnectionPool[MySQLConnection](factory, PoolConfiguration.Default)
- override def borrow(): AsyncConnection = MauricioPoolableAsyncConnection(pool)
+ override def borrow(): AsyncConnection = new MauricioPoolableAsyncConnection(pool) with AsyncMySQLConnection
override def close(): Unit = pool.disconnect
View
28 src/main/scala/scalikejdbc/async/internal/AsyncPostgreSQLConnection.scala
@@ -15,19 +15,23 @@
*/
package scalikejdbc.async.internal
-import com.github.mauricio.async.db._
-import scalikejdbc.async.AsyncConnection
+import com.github.mauricio.async.db.Connection
+import scala.concurrent._
+import scalikejdbc.async.{ AsyncConnection, NonSharedAsyncConnection }
-/**
- * postgresql-async's DB connection
- *
- * @see https://github.com/mauricio/postgresql-async
- */
-private[scalikejdbc] case class AsyncPostgreSQLConnection(url: String, user: String, password: String)
- extends MauricioConnectionBaseImpl
- with AsyncConnection
- with MauricioConnectionConfiguration {
+trait AsyncPostgreSQLConnection extends AsyncConnection {
+
+ private[scalikejdbc] val underlying: Connection
+
+ override def toNonSharedConnection()(
+ implicit cxt: ExecutionContext = ExecutionContext.Implicits.global): Future[NonSharedAsyncConnection] = {
- private[scalikejdbc] val underlying: Connection = new postgresql.PostgreSQLConnection(configuration)
+ if (this.isInstanceOf[MauricioPoolableAsyncConnection[_]]) {
+ val pool = this.asInstanceOf[MauricioPoolableAsyncConnection[Connection]].pool
+ pool.take.map(conn => new MauricioNonSharedAsyncConnection(conn, Some(pool)) with AsyncPostgreSQLConnection)
+ } else {
+ future(new MauricioSharedAsyncConnection(this.underlying) with AsyncPostgreSQLConnection)
+ }
+ }
}
View
2 src/main/scala/scalikejdbc/async/internal/AsyncPostgreSQLConnectionPool.scala
@@ -40,7 +40,7 @@ private[scalikejdbc] class AsyncPostgreSQLConnectionPool(
private[this] val factory = new postgresql.pool.PostgreSQLConnectionFactory(config)
private[this] val pool = new ConnectionPool[PostgreSQLConnection](factory, PoolConfiguration.Default)
- override def borrow(): AsyncConnection = MauricioPoolableAsyncConnection(pool)
+ override def borrow(): AsyncConnection = new MauricioPoolableAsyncConnection(pool) with AsyncPostgreSQLConnection
override def close(): Unit = pool.disconnect
View
60 src/main/scala/scalikejdbc/async/internal/MauricioConnectionBaseImpl.scala
@@ -19,33 +19,18 @@ import scalikejdbc.async._
import scala.concurrent._
import scala.concurrent.duration.DurationInt
import com.github.mauricio.async.db.{ QueryResult, Connection }
+import com.github.mauricio.async.db.mysql.MySQLConnection
+import com.github.mauricio.async.db.pool.ConnectionPool
/**
* Connection impl
*/
-private[scalikejdbc] trait MauricioConnectionBaseImpl { self: AsyncConnection =>
+private[scalikejdbc] trait MauricioConnectionBaseImpl extends AsyncConnection {
private[scalikejdbc] val underlying: Connection
private[scalikejdbc] val defaultTimeout = 10.seconds
/**
- * Returns a non-shared connection.
- *
- * @param cxt execution context
- * @return non-shared connection
- */
- override def toNonSharedConnection()(
- implicit cxt: ExecutionContext = ExecutionContext.Implicits.global): Future[NonSharedAsyncConnection] = {
-
- if (this.isInstanceOf[MauricioPoolableAsyncConnection[_]]) {
- val pool = this.asInstanceOf[MauricioPoolableAsyncConnection[Connection]].pool
- pool.take.map(conn => MauricioNonSharedAsyncConnection(conn, Some(pool)))
- } else {
- future(MauricioSharedAsyncConnection(this.underlying))
- }
- }
-
- /**
* Send a query.
*
* @param statement statement
@@ -56,11 +41,13 @@ private[scalikejdbc] trait MauricioConnectionBaseImpl { self: AsyncConnection =>
implicit cxt: ExecutionContext = ExecutionContext.Implicits.global): Future[AsyncQueryResult] = {
underlying.sendQuery(statement).map { queryResult =>
- AsyncQueryResult(
+ new AsyncQueryResult(
rowsAffected = Option(queryResult.rowsAffected),
statusMessage = Option(queryResult.statusMessage),
- rows = queryResult.rows.map(rows => new internal.MauricioAsyncResultSet(rows))
- )
+ rows = queryResult.rows.map(rows => new internal.MauricioAsyncResultSet(rows))) {
+
+ lazy val generatedKey = extractGeneratedKey(queryResult)
+ }
}
}
@@ -80,14 +67,35 @@ private[scalikejdbc] trait MauricioConnectionBaseImpl { self: AsyncConnection =>
else underlying.sendPreparedStatement(statement, parameters)
}
queryResultFuture.map { queryResult =>
- AsyncQueryResult(
+ new AsyncQueryResult(
rowsAffected = Option(queryResult.rowsAffected),
statusMessage = Option(queryResult.statusMessage),
- rows = queryResult.rows.flatMap { rows =>
- if (rows.isEmpty) None
- else Some(new internal.MauricioAsyncResultSet(rows))
+ rows = queryResult.rows.map(rows => new internal.MauricioAsyncResultSet(rows))) {
+
+ lazy val generatedKey = extractGeneratedKey(queryResult)
+ }
+ }
+ }
+
+ protected def extractGeneratedKey(queryResult: QueryResult)(
+ implicit cxt: ExecutionContext = ExecutionContext.Implicits.global): Option[Long] = {
+ if (!this.isInstanceOf[NonSharedAsyncConnection]) {
+ throw new IllegalStateException("This async connection must be a non-shared connenction.")
+ }
+ this match {
+ case conn: AsyncMySQLConnection =>
+ Await.result(underlying.sendQuery("SELECT LAST_INSERT_ID()").map { result =>
+ result.rows.headOption.flatMap { rows =>
+ rows.headOption.map { row => row(0).asInstanceOf[Long] }
+ }
+ }, 10.seconds)
+ case _ =>
+ queryResult.rows.headOption.flatMap { rows =>
+ rows.headOption.flatMap(row => Option(row(0)).flatMap { value =>
+ try Some(value.toString.toLong)
+ catch { case e: Exception => None }
+ })
}
- )
}
}
View
6 src/main/scala/scalikejdbc/async/internal/MauricioNonSharedAsyncConnection.scala
@@ -3,6 +3,7 @@ package scalikejdbc.async.internal
import com.github.mauricio.async.db.{ Connection => MauricioConnection }
import com.github.mauricio.async.db.pool.{ ConnectionPool => MauricioConnectionPool }
import scalikejdbc.async.NonSharedAsyncConnection
+import scala.concurrent._
/**
* Non-shared Asynchronous Connection
@@ -13,6 +14,11 @@ case class MauricioNonSharedAsyncConnection(underlying: MauricioConnection, pool
extends MauricioConnectionBaseImpl
with NonSharedAsyncConnection {
+ override def toNonSharedConnection()(
+ implicit cxt: ExecutionContext = ExecutionContext.Implicits.global): Future[NonSharedAsyncConnection] = {
+ future(this)
+ }
+
override def release(): Unit = pool.map(_.giveBack(this.underlying))
}
View
8 src/main/scala/scalikejdbc/async/internal/MauricioPoolableAsyncConnection.scala
@@ -16,8 +16,9 @@
package scalikejdbc.async.internal
import com.github.mauricio.async.db.pool.ConnectionPool
-import scalikejdbc.async.AsyncConnection
+import scalikejdbc.async.{ NonSharedAsyncConnection, AsyncConnection }
import com.github.mauricio.async.db.Connection
+import scala.concurrent._
/**
* AsyncConnection implementation which is based on Mauricio's Connection
@@ -29,6 +30,11 @@ private[scalikejdbc] case class MauricioPoolableAsyncConnection[T <: com.github.
extends MauricioConnectionBaseImpl
with AsyncConnection {
+ override def toNonSharedConnection()(
+ implicit cxt: ExecutionContext = ExecutionContext.Implicits.global): Future[NonSharedAsyncConnection] = {
+ Future.failed(new UnsupportedOperationException)
+ }
+
private[scalikejdbc] val underlying: Connection = pool
}
View
6 src/main/scala/scalikejdbc/async/internal/MauricioSharedAsyncConnection.scala
@@ -17,11 +17,17 @@ package scalikejdbc.async.internal
import scalikejdbc.async._
import com.github.mauricio.async.db.Connection
+import scala.concurrent._
case class MauricioSharedAsyncConnection(underlying: Connection)
extends NonSharedAsyncConnection
with MauricioConnectionBaseImpl {
+ override def toNonSharedConnection()(
+ implicit cxt: ExecutionContext = ExecutionContext.Implicits.global): Future[NonSharedAsyncConnection] = {
+ future(this)
+ }
+
override def release(): Unit = {}
}
View
35 src/main/scala/scalikejdbc/async/internal/SimpleAsyncMySQLConnection.scala
@@ -0,0 +1,35 @@
+/*
+ * Copyright 2013 Kazuhiro Sera
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language
+ * governing permissions and limitations under the License.
+ */
+package scalikejdbc.async.internal
+
+import com.github.mauricio.async.db._
+import scalikejdbc.async.AsyncConnection
+
+/**
+ * mysql-async's DB connection
+ *
+ * @see https://github.com/mauricio/postgresql-async
+ */
+private[scalikejdbc] case class SimpleAsyncMySQLConnection(url: String, user: String, password: String)
+ extends MauricioConnectionBaseImpl
+ with AsyncConnection
+ with AsyncMySQLConnection
+ with MauricioConnectionConfiguration {
+
+ private[scalikejdbc] val underlying: Connection = new mysql.MySQLConnection(configuration)
+
+}
+
View
35 src/main/scala/scalikejdbc/async/internal/SimpleAsyncPostgreSQLConnection.scala
@@ -0,0 +1,35 @@
+/*
+ * Copyright 2013 Kazuhiro Sera
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language
+ * governing permissions and limitations under the License.
+ */
+package scalikejdbc.async.internal
+
+import com.github.mauricio.async.db._
+import scalikejdbc.async.AsyncConnection
+
+/**
+ * mysql-async's DB connection
+ *
+ * @see https://github.com/mauricio/postgresql-async
+ */
+private[scalikejdbc] case class SimpleAsyncPostgreSQLConnection(url: String, user: String, password: String)
+ extends MauricioConnectionBaseImpl
+ with AsyncConnection
+ with AsyncPostgreSQLConnection
+ with MauricioConnectionConfiguration {
+
+ private[scalikejdbc] val underlying: Connection = new postgresql.PostgreSQLConnection(configuration)
+
+}
+
View
4 src/main/scala/scalikejdbc/async/package.scala
@@ -10,6 +10,10 @@ package object async {
new AsyncSQLUpdate(sql)
}
+ implicit def makeSQLUpdateAndReturnGeneratedKeyAsync(sql: SQLUpdateWithGeneratedKey): AsyncSQLUpdateAndReturnGeneratedKey = {
+ new AsyncSQLUpdateAndReturnGeneratedKey(sql)
+ }
+
implicit def makeSQLToOptionAsync[A, E <: WithExtractor](sql: SQLToOption[A, E]): AsyncSQLToOption[A, E] = {
new AsyncSQLToOption[A, E](sql)
}
View
35 src/test/scala/example/MySQLExample.scala
@@ -52,6 +52,41 @@ class MySQLExample extends FlatSpec with ShouldMatchers {
f.value.get.get.size should equal(2)
}
+ it should "return generated key" in {
+
+ val createdTime = DateTime.now.withMillisOfSecond(0)
+ val f: Future[Long] = NamedAsyncDB('mysql).withPool { implicit s =>
+ val column = AsyncLover.column
+ withSQL {
+ insert.into(AsyncLover).namedValues(
+ column.name -> "Eric",
+ column.rating -> 2,
+ column.isReactive -> false,
+ column.createdAt -> createdTime)
+ }.updateAndReturnGeneratedKey().future()
+ }
+ Await.result(f, 5.seconds)
+
+ f.value.get.isSuccess should be(true)
+ val generatedId: Long = f.value.get.get
+ log.info(s"MySQL generated key: ${generatedId}")
+
+ val ff: Future[Option[AsyncLover]] = NamedAsyncDB('mysql).withPool { implicit s =>
+ withSQL { select.from(AsyncLover as al).where.eq(al.id, generatedId) }.map(AsyncLover(al)).single.future()
+ }
+ Await.result(ff, 5.seconds)
+
+ ff.value.get.isSuccess should be(true)
+ ff.value.get.get.isDefined should be(true)
+
+ val asyncLover = ff.value.get.get.get
+ asyncLover.id should equal(generatedId)
+ asyncLover.name should equal("Eric")
+ asyncLover.rating should equal(2)
+ asyncLover.isReactive should be(false)
+ asyncLover.createdAt should equal(createdTime)
+ }
+
it should "update" in {
val f: Future[Int] = NamedAsyncDB('mysql).withPool { implicit s =>
val column = AsyncLover.column
View
35 src/test/scala/example/PostgreSQLExample.scala
@@ -51,6 +51,41 @@ class PostgreSQLExample extends FlatSpec with ShouldMatchers {
f.value.get.get.size should equal(2)
}
+ it should "return generated key" in {
+
+ val createdTime = DateTime.now.withMillisOfSecond(0)
+ val f: Future[Long] = AsyncDB.withPool { implicit s =>
+ val column = AsyncLover.column
+ withSQL {
+ insert.into(AsyncLover).namedValues(
+ column.name -> "Eric",
+ column.rating -> 2,
+ column.isReactive -> false,
+ column.createdAt -> createdTime).append(sqls"returning id")
+ }.updateAndReturnGeneratedKey().future()
+ }
+ Await.result(f, 5.seconds)
+
+ f.value.get.isSuccess should be(true)
+ val generatedId: Long = f.value.get.get
+ log.info(s"PostgreSQL generated key: ${generatedId}")
+
+ val ff: Future[Option[AsyncLover]] = AsyncDB.withPool { implicit s =>
+ withSQL { select.from(AsyncLover as al).where.eq(al.id, generatedId) }.map(AsyncLover(al)).single.future()
+ }
+ Await.result(ff, 5.seconds)
+
+ ff.value.get.isSuccess should be(true)
+ ff.value.get.get.isDefined should be(true)
+
+ val asyncLover = ff.value.get.get.get
+ asyncLover.id should equal(generatedId)
+ asyncLover.name should equal("Eric")
+ asyncLover.rating should equal(2)
+ asyncLover.isReactive should be(false)
+ asyncLover.createdAt should equal(createdTime)
+ }
+
it should "update" in {
val f: Future[Int] = AsyncDB.withPool { implicit s =>
val column = AsyncLover.column

0 comments on commit a64a532

Please sign in to comment.
Something went wrong with that request. Please try again.