Skip to content

Commit

Permalink
Loader: Do not attempt rollback when connection is already closed (close
Browse files Browse the repository at this point in the history
  • Loading branch information
istreeter committed May 2, 2023
1 parent 85d6a67 commit 45b9751
Show file tree
Hide file tree
Showing 8 changed files with 34 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ object DatabricksSpec {
Config.Monitoring(None, None, Config.Metrics(None, None, 1.minute), None, None, None),
None,
Config.Schedules(Nil),
Config.Timeouts(1.minute, 1.minute, 1.minute, 1.minute),
Config.Timeouts(1.minute, 1.minute, 1.minute, 1.minute, 30.seconds),
Config.Retries(Config.Strategy.Constant, None, 1.minute, None),
Config.Retries(Config.Strategy.Constant, None, 1.minute, None),
Config.Retries(Config.Strategy.Constant, None, 1.minute, None),
Expand Down
3 changes: 2 additions & 1 deletion modules/loader/src/main/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@
"loading": "45 minutes",
"nonLoading": "10 minutes",
"sqsVisibility": "5 minutes",
"rollbackCommit": "20 minutes"
"rollbackCommit": "20 minutes",
"connectionIsValid": "30 seconds"
},
"featureFlags": {
"addLoadTstampColumn": true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,11 @@ object Config {
loading: FiniteDuration,
nonLoading: FiniteDuration,
sqsVisibility: FiniteDuration,
rollbackCommit: FiniteDuration
)
rollbackCommit: FiniteDuration,
connectionIsValid: FiniteDuration
) {
/** Total time budget for rolling back: the `rollbackCommit` timeout plus the `connectionIsValid` timeout. */
def totalTimeToRollBack: FiniteDuration = rollbackCommit + connectionIsValid
}
final case class Retries(
strategy: Strategy,
attempts: Option[Int],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ sealed trait StorageTarget extends Product with Serializable {
def password: StorageTarget.PasswordConfig
def sshTunnel: Option[StorageTarget.TunnelConfig]

def doobieCommitStrategy(rollbackCommitTimeout: FiniteDuration): Strategy = Transaction.defaultStrategy(rollbackCommitTimeout)
def doobieCommitStrategy(timeouts: Config.Timeouts): Strategy = Transaction.defaultStrategy(timeouts)

/**
* Surprisingly, for statements disallowed in transaction block we need to set autocommit
Expand Down Expand Up @@ -126,7 +126,7 @@ object StorageTarget {

override def connectionUrl: String = s"jdbc:databricks://$host:$port"

override def doobieCommitStrategy(t: FiniteDuration): Strategy = Strategy.void
override def doobieCommitStrategy(t: Config.Timeouts): Strategy = Strategy.void
override def doobieNoCommitStrategy: Strategy = Strategy.void
override def withAutoCommit = true

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,6 @@ object StateMonitoring {
val passed = (now.toEpochMilli - lastUpdated.toEpochMilli).milli
val timeout = if (isLoading(state.loading)) timeouts.loading else timeouts.nonLoading
// Rollback/Commit can be used in all statements; therefore, their timeout should be added in all cases
passed > (timeout + timeouts.rollbackCommit)
passed > (timeout + timeouts.totalTimeToRollBack)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,15 @@
*/
package com.snowplowanalytics.snowplow.rdbloader.dsl

import scala.concurrent.duration.FiniteDuration
import cats.~>
import cats.arrow.FunctionK
import cats.implicits._

import cats.effect.{Async, Outcome, Resource, Sync}
import cats.effect.implicits._
import cats.effect.{Async, Resource, Sync}
import cats.effect.kernel.Spawn
import cats.effect.std.Dispatcher
import doobie._
import doobie.free.connection.{isValid => cxnIsValid, unit => cxnUnit}
import doobie.implicits._
import doobie.util.transactor.Strategy
import doobie.hikari._
Expand Down Expand Up @@ -135,13 +134,23 @@ object Transaction {
Transaction.jdbcRealInterpreter[F](target, timeouts, xa)
}

def defaultStrategy(rollbackCommitTimeout: FiniteDuration): Strategy = {
val timeoutSeconds = rollbackCommitTimeout.toSeconds.toInt
val rollback = fr"ROLLBACK".execWith {
HPS.setQueryTimeout(timeoutSeconds).flatMap(_ => HPS.executeUpdate)
}.void
/**
 * Builds the doobie `Strategy` for committing transactions: a copy of `Strategy.default` whose
 * `after` step is an explicit COMMIT and whose `oops` step is a ROLLBACK that is only issued
 * when the connection still reports itself as valid.
 *
 * @param timeouts supplies the `connectionIsValid`, `rollbackCommit` and `nonLoading` durations,
 *                 each converted to whole seconds below
 */
def defaultStrategy(timeouts: Config.Timeouts): Strategy = {
// NOTE(review): `toSeconds.toInt` truncates, so any sub-second duration becomes 0. JDBC documents
// 0 as "no timeout" for both `Connection.isValid` and `Statement.setQueryTimeout`; presumably
// `cxnIsValid` and `HPS.setQueryTimeout` pass the value straight through -- confirm that
// configured durations are always at least one second.
val isValidTimeout = timeouts.connectionIsValid.toSeconds.toInt
val rollbackTimeout = timeouts.rollbackCommit.toSeconds.toInt
// The COMMIT statement is bounded by the `nonLoading` duration, not by `rollbackCommit`.
// NOTE(review): confirm this choice is intentional.
val commitTimeout = timeouts.nonLoading.toSeconds.toInt
// Validity check first; the ROLLBACK statement is only executed in the `true` branch.
val rollback = cxnIsValid(isValidTimeout).flatMap {
case true =>
// An exception happened, but the connection is still valid. Roll back.
fr"ROLLBACK".execWith {
HPS.setQueryTimeout(rollbackTimeout).flatMap(_ => HPS.executeUpdate)
}.void
case false =>
// false is expected if Hikari has evicted the connection due to a fatal exception.
// No point in doing a rollback if we don't have a valid connection.
cxnUnit
}
val commit = fr"COMMIT".execWith {
// NOTE(review): `timeoutSeconds` is not defined in this version of the method; the next line
// looks like the pre-change counterpart of the `commitTimeout` line that follows it.
HPS.setQueryTimeout(timeoutSeconds).flatMap(_ => HPS.executeUpdate)
HPS.setQueryTimeout(commitTimeout).flatMap(_ => HPS.executeUpdate)
}.void
// Only the `after` (commit) and `oops` (rollback) steps are overridden; the remaining steps of
// `Strategy.default` are kept.
Strategy.default.copy(after = commit, oops = rollback)
}
Expand All @@ -157,28 +166,15 @@ object Transaction {
conn.copy(strategy0 = target.doobieNoCommitStrategy)

val DefaultTransactor: Transactor[F] =
conn.copy(strategy0 = target.doobieCommitStrategy(timeouts.rollbackCommit))
conn.copy(strategy0 = target.doobieCommitStrategy(timeouts))

new Transaction[F, ConnectionIO] {

// The start/join is a trick to fix stack traces in exceptions raised by the transaction
// See https://github.com/snowplow/snowplow-rdb-loader/issues/1045
//
// Adds `withErrorAdaption` to any `F[A]`.
implicit class ErrorAdaption[A](f: F[A]) {
// Starts `f` on its own fiber and joins that fiber inside `bracketCase`. The
// "Transaction cancelled" error is the fallback effect handed to `joinWith`.
def withErrorAdaption: F[A] =
f.start
.bracketCase(_.joinWith(Async[F].raiseError(new Exception("Transaction cancelled")))) {
// The join completed, with either a value or an error: nothing to clean up.
case (_, Outcome.Succeeded(_) | Outcome.Errored(_)) =>
Async[F].unit
// The join itself was canceled: cancel the started fiber as well, bounding that
// cancellation with the `rollbackCommit` timeout.
case (fiber, Outcome.Canceled()) =>
fiber.cancel.timeout(timeouts.rollbackCommit)
}
}

def transact[A](io: ConnectionIO[A]): F[A] =
DefaultTransactor.trans.apply(io).withErrorAdaption
DefaultTransactor.trans.apply(io)

def run[A](io: ConnectionIO[A]): F[A] =
NoCommitTransactor.trans.apply(io).withErrorAdaption
NoCommitTransactor.trans.apply(io)

def arrowBack: F ~> ConnectionIO =
new FunctionK[F, ConnectionIO] {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ object ConfigSpec {
)
val defaultSchedules: Config.Schedules =
Config.Schedules(Nil, Some(Cron.unsafeParse("0 0 0 ? * *")), Some(Cron.unsafeParse("0 0 5 ? * *")))
val exampleTimeouts: Config.Timeouts = Config.Timeouts(45.minutes, 10.minutes, 5.minutes, 20.minutes)
val exampleTimeouts: Config.Timeouts = Config.Timeouts(45.minutes, 10.minutes, 5.minutes, 20.minutes, 30.seconds)
val exampleRetries: Config.Retries = Config.Retries(Config.Strategy.Exponential, Some(3), 30.seconds, Some(1.hour))
val exampleReadyCheck: Config.Retries = Config.Retries(Config.Strategy.Constant, None, 15.seconds, Some(10.minutes))
val exampleTempCreds = StorageTarget.LoadAuthMethod.TempCreds(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ class StateMonitoringSpec extends Specification {

object StateMonitoringSpec {

val Timeouts = Config.Timeouts(1.hour, 10.minutes, 5.minutes, 20.minutes)
val Timeouts = Config.Timeouts(1.hour, 10.minutes, 5.minutes, 20.minutes, 30.seconds)

def checkRun(init: State => State): (Option[String], List[String]) = {

Expand Down

0 comments on commit 45b9751

Please sign in to comment.