Skip to content

Commit

Permalink
Merge branch 'master' into 639_hikaricp
Browse files Browse the repository at this point in the history
  • Loading branch information
jdegoes committed Aug 13, 2022
2 parents e07707f + 34829cf commit 36ca79a
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 10 deletions.
37 changes: 28 additions & 9 deletions jdbc/src/main/scala/zio/sql/ConnectionPool.scala
Original file line number Diff line number Diff line change
Expand Up @@ -132,15 +132,25 @@ final case class ConnectionPoolLive(

private def release(connection: ResettableConnection): UIO[Any] =
ZIO.uninterruptible {
tryRelease(connection).commit.flatMap {
case Some(handle) =>
handle.interrupted.get.tap { interrupted =>
ZSTM.when(!interrupted)(handle.promise.succeed(connection))
}.commit.flatMap { interrupted =>
ZIO.when(interrupted)(release(connection))
ZIO
.whenZIO(connection.isValid.map(!_)) {
ZIO.attempt(connection.connection.close).zipParRight(addFreshConnection).orDie
}
.flatMap { opt =>
val conn = opt match {
case Some(c) => c
case None => connection
}
case None => ZIO.unit
}
tryRelease(conn).commit.flatMap {
case Some(handle) =>
handle.interrupted.get.tap { interrupted =>
ZSTM.when(!interrupted)(handle.promise.succeed(conn))
}.commit.flatMap { interrupted =>
ZIO.when(interrupted)(release(conn))
}
case None => ZIO.unit
}
}
}

private def tryRelease(
Expand Down Expand Up @@ -171,5 +181,14 @@ final case class ConnectionPoolLive(
}

private[sql] final class ResettableConnection(val connection: Connection, resetter: Connection => Unit) {
def reset: UIO[Any] = ZIO.succeed(resetter(connection))
def reset: UIO[Any] = ZIO.succeed(resetter(connection))
def isValid: UIO[Boolean] =
ZIO
.when(!connection.isClosed) {
ZIO.succeed(connection.prepareStatement("SELECT 1"))
}
.map {
case Some(stmt) => stmt != null
case None => false
}
}
10 changes: 9 additions & 1 deletion jdbc/src/test/scala/zio/sql/ConnectionPoolSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,14 @@ object ConnectionPoolSpec extends ZIOSpecDefault {
_ <- promise.complete(ZIO.unit)
_ <- ZIO.scoped(cp.connection)
} yield assert("")(Assertion.anything)
} @@ timeout(10.seconds) @@ withLiveClock
} @@ timeout(10.seconds) @@ withLiveClock +

test("Invalid or closed fibers should be reacquired") {
for {
cp <- ZIO.service[ConnectionPool]
_ <- ZIO.replicateZIO(poolSize)(ZIO.scoped(cp.connection.map(_.close)))
conn <- ZIO.scoped(cp.connection)
} yield assert(conn.isValid(10))(Assertion.isTrue)
}
) @@ sequential
}

0 comments on commit 36ca79a

Please sign in to comment.