Skip to content

Commit

Permalink
Loader: Avoid errors for "Connection is not available" (close #1223)
Browse files Browse the repository at this point in the history
  • Loading branch information
istreeter committed Mar 25, 2023
1 parent 6075bfd commit d0bec60
Show file tree
Hide file tree
Showing 9 changed files with 43 additions and 12 deletions.
2 changes: 2 additions & 0 deletions config/loader/aws/databricks.config.reference.hocon
Expand Up @@ -195,6 +195,8 @@
"backoff": "15 seconds"
# A strategy to use when deciding on next backoff
"strategy": "CONSTANT"
# When the cumulative backoff delay reaches this bound the Loader will stop retrying
"cumulativeBound": "10 minutes"
},

# Retries configuration for initialization block
Expand Down
2 changes: 2 additions & 0 deletions config/loader/aws/redshift.config.reference.hocon
Expand Up @@ -184,6 +184,8 @@
"backoff": "15 seconds"
# A strategy to use when deciding on next backoff
"strategy": "CONSTANT"
# When the cumulative backoff delay reaches this bound the Loader will stop retrying
"cumulativeBound": "10 minutes"
},

# Retries configuration for initialization block
Expand Down
2 changes: 2 additions & 0 deletions config/loader/aws/snowflake.config.reference.hocon
Expand Up @@ -190,6 +190,8 @@
"backoff": "15 seconds"
# A strategy to use when deciding on next backoff
"strategy": "CONSTANT"
# When the cumulative backoff delay reaches this bound the Loader will stop retrying
"cumulativeBound": "10 minutes"
},

# Retries configuration for initialization block
Expand Down
2 changes: 2 additions & 0 deletions config/loader/gcp/databricks.config.reference.hocon
Expand Up @@ -151,6 +151,8 @@
"backoff": "15 seconds"
# A strategy to use when deciding on next backoff
"strategy": "CONSTANT"
# When the cumulative backoff delay reaches this bound the Loader will stop retrying
"cumulativeBound": "10 minutes"
},

# Retries configuration for initialization block
Expand Down
2 changes: 2 additions & 0 deletions config/loader/gcp/snowflake.config.reference.hocon
Expand Up @@ -158,6 +158,8 @@
"backoff": "15 seconds"
# A strategy to use when deciding on next backoff
"strategy": "CONSTANT"
# When the cumulative backoff delay reaches this bound the Loader will stop retrying
"cumulativeBound": "10 minutes"
},

# Retries configuration for initialization block
Expand Down
1 change: 1 addition & 0 deletions modules/loader/src/main/resources/application.conf
Expand Up @@ -23,6 +23,7 @@
"readyCheck": {
"backoff": "15 seconds",
"strategy": "CONSTANT"
"cumulativeBound": "10 minutes"
},
"initRetries": {
"backoff": "30 seconds",
Expand Down
Expand Up @@ -25,6 +25,7 @@ import doobie._
import doobie.implicits._
import doobie.util.transactor.Strategy
import doobie.hikari._
import com.zaxxer.hikari.HikariConfig

import java.sql.SQLException
import com.snowplowanalytics.snowplow.rdbloader.config.{Config, StorageTarget}
Expand Down Expand Up @@ -87,6 +88,27 @@ object Transaction {

def apply[F[_], C[_]](implicit ev: Transaction[F, C]): Transaction[F, C] = ev

/**
 * Applies the loader's connection-pool settings to a Hikari configuration.
 *
 * The mutation of `ds` is wrapped in `Sync[F].delay`, so nothing is changed until the
 * returned effect is evaluated.
 *
 * @param target storage target supplying the auto-commit flag and the data source properties
 * @param ds     Hikari configuration that is mutated in place
 * @return an effect that performs the configuration when run
 */
def configureHikari[F[_]: Sync](target: StorageTarget, ds: HikariConfig): F[Unit] =
Sync[F].delay {
ds.setAutoCommit(target.withAutoCommit)
ds.setMaximumPoolSize(PoolSize)

// This disables the pool's fast failure feature. We don't need fast failure because the
// loader already handles failures to get warehouse connections.
//
// Fast failure at startup yields a whole different set of possible exceptions at startup,
// compared with failures at later stages. We disable it so that exceptions during startup
// are more consistent with exceptions encountered at later stages of running the app.
ds.setInitializationFailTimeout(-1)

// Setting this to zero prevents the pool from periodically re-connecting to the warehouse
// when a connection gets old. For Databricks, this stops the loader from re-starting the
// cluster unnecessarily when there are no events to be loaded.
ds.setMinimumIdle(0)

// Driver-level properties are taken as-is from the storage target
ds.setDataSourceProperties(target.properties)
}

def buildPool[F[_]: Async: SecretStore](
target: StorageTarget
): Resource[F, Transactor[F]] =
Expand All @@ -100,13 +122,7 @@ object Transaction {
}
xa <- HikariTransactor
.newHikariTransactor[F](target.driver, target.connectionUrl, target.username, password, ce)
_ <- Resource.eval(xa.configure { ds =>
Sync[F].delay {
ds.setAutoCommit(target.withAutoCommit)
ds.setMaximumPoolSize(PoolSize)
ds.setDataSourceProperties(target.properties)
}
})
_ <- Resource.eval(xa.configure(configureHikari[F](target, _)))
xa <- target.sshTunnel.fold(Resource.pure[F, Transactor[F]](xa))(SSH.transactor(_, xa))
} yield xa

Expand Down
Expand Up @@ -16,6 +16,8 @@ import cats.{Applicative, MonadThrow}
import cats.implicits._
import retry._

import java.sql.SQLTransientConnectionException

import com.snowplowanalytics.snowplow.rdbloader.config.Config
import com.snowplowanalytics.snowplow.rdbloader.db.Statement
import com.snowplowanalytics.snowplow.rdbloader.dsl.{DAO, Logging, Transaction}
Expand All @@ -37,15 +39,17 @@ object TargetCheck {
val onError = (e: Throwable, d: RetryDetails) => log(e, d)
val retryPolicy = Retry.getRetryPolicy[F](readyCheckConfig)
val fa: F[Unit] = Transaction[F, C].run(DAO[C].executeQuery[Unit](Statement.ReadyCheck)).void
retryingOnSomeErrors(retryPolicy, { t: Throwable => isWorth(t).pure[F] }, onError)(fa)
val isWorthF = isWorth.andThen(_.pure[F])
retryingOnSomeErrors(retryPolicy, isWorthF, onError)(fa)
}

/**
 * Reports one failed readiness check of the target.
 *
 * The retry details are written at info level and the string form of the caught exception
 * at debug level, in that order.
 *
 * @param e exception raised by the failed check
 * @param d retry details describing the current retry state
 * @return an effect that emits both log messages
 */
def log[F[_]: Logging: Applicative](e: Throwable, d: RetryDetails): F[Unit] = {
  val notReady = Logging[F].info(show"Target is not ready. $d")
  val caught = Logging[F].debug(show"Caught exception during target check: ${e.toString}")
  notReady *> caught
}

/** Check if error is worth retrying */
def isWorth(e: Throwable): Boolean =
e.toString.toLowerCase.contains("(700100) connection timeout expired. details: none") ||
e.toString.toLowerCase.contains("(500051) error processing query/statement")
def isWorth: Throwable => Boolean = {
// Retry only a transient connection failure that carries no underlying cause.
// NOTE(review): presumably this matches the pool's "Connection is not available" timeout;
// confirm against the pool implementation.
case e: SQLTransientConnectionException if Option(e.getCause).isEmpty => true
// Every other exception is treated as not worth retrying
case _ => false
}
}
Expand Up @@ -172,7 +172,7 @@ object ConfigSpec {
Config.Schedules(Nil, Some(Cron.unsafeParse("0 0 0 ? * *")), Some(Cron.unsafeParse("0 0 5 ? * *")))
val exampleTimeouts: Config.Timeouts = Config.Timeouts(45.minutes, 10.minutes, 5.minutes, 20.minutes)
val exampleRetries: Config.Retries = Config.Retries(Config.Strategy.Exponential, Some(3), 30.seconds, Some(1.hour))
val exampleReadyCheck: Config.Retries = Config.Retries(Config.Strategy.Constant, None, 15.seconds, None)
val exampleReadyCheck: Config.Retries = Config.Retries(Config.Strategy.Constant, None, 15.seconds, Some(10.minutes))
val exampleTempCreds = StorageTarget.LoadAuthMethod.TempCreds(
"test_role_arn",
"test_role_session_name",
Expand Down

0 comments on commit d0bec60

Please sign in to comment.