
Commit

Merge f37d1a7 into d0bec60
istreeter committed Mar 28, 2023
2 parents d0bec60 + f37d1a7 commit 5127013
Showing 15 changed files with 455 additions and 139 deletions.
@@ -81,7 +81,6 @@ object Loader {
initQueryResult <- initQuery[F, C, I](target)
_ <- FolderMonitoring.run[F, C, I](
config.monitoring.folders,
config.readyCheck,
control.isBusy,
initQueryResult,
target.prepareAlertTable
@@ -109,7 +108,7 @@ object Loader {

def initRetry(f: F[Unit]) = retryingOnAllErrors(Retry.getRetryPolicy[F](config.initRetries), initRetryLog[F])(f)

val blockUntilReady = initRetry(TargetCheck.blockUntilReady[F, C](config.readyCheck)) *>
val blockUntilReady = initRetry(TargetCheck.prepareTarget[F, C]) *>
Logging[F].info("Target check is completed")
val noOperationPrepare = NoOperation.prepare(config.schedules.noOperation, control.makePaused) *>
Logging[F].info("No operation prepare step is completed")
@@ -189,14 +188,16 @@ object Loader {

val setStageC: Stage => C[Unit] =
stage => Transaction[F, C].arrowBack(control.setStage(stage))
val incrementAttemptsC: C[Unit] =
Transaction[F, C].arrowBack(control.incrementAttempts)
val addFailure: Throwable => F[Boolean] =
control.addFailure(config.retryQueue)(folder)(_)

val loading: F[Unit] = backgroundCheck {
for {
start <- Clock[F].realTimeInstant
_ <- discovery.origin.timestamps.min.map(t => Monitoring[F].periodicMetrics.setEarliestKnownUnloadedData(t)).sequence.void
result <- Load.load[F, C, I](config, setStageC, control.incrementAttempts, discovery, initQueryResult, target)
result <- Load.load[F, C, I](setStageC, incrementAttemptsC, discovery, initQueryResult, target)
attempts <- control.getAndResetAttempts
_ <- result match {
case Load.LoadSuccess(ingested) =>
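The initRetry helper shown in the hunk above wraps each startup action in cats-retry's retryingOnAllErrors. A minimal, self-contained sketch of that pattern follows; the policy values and the prepare action are illustrative stand-ins, not the loader's real config.initRetries wiring.

import scala.concurrent.duration._
import cats.effect.{IO, IOApp}
import retry._

object InitRetrySketch extends IOApp.Simple {

  // Stand-in for a startup step (e.g. a target readiness query) that may fail transiently.
  val prepare: IO[Unit] = IO.println("preparing target")

  // Illustrative policy; the loader builds its own from config.initRetries.
  val policy: RetryPolicy[IO] =
    RetryPolicies.exponentialBackoff[IO](30.seconds) join RetryPolicies.limitRetries[IO](3)

  val logRetry: (Throwable, RetryDetails) => IO[Unit] =
    (t, d) => IO.println(s"Init step failed (${t.getMessage}); $d")

  def run: IO[Unit] =
    retryingOnAllErrors(policy, logRetry)(prepare)
}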
@@ -112,7 +112,8 @@ object Environment {
Monitoring.monitoringInterpreter[F](tracker, sentry, reporters, cli.config.monitoring.webhook, httpClient, periodicMetrics)
implicit0(secretStore: SecretStore[F]) = cloudServices.secretStore
implicit0(dispatcher: Dispatcher[F]) <- Dispatcher.parallel[F]
transaction <- Transaction.interpreter[F](cli.config.storage, cli.config.timeouts)
transaction <- Transaction.interpreter[F](cli.config.storage, cli.config.timeouts, cli.config.readyCheck)
transaction <- Resource.pure(RetryingTransaction.wrap(cli.config.retries, transaction))
telemetry <- Telemetry.build[F](
cli.config.telemetry,
appName,
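Environment now layers the retrying behaviour on top of the base interpreter as a plain decorator inside the Resource (the two transaction <- lines above). A rough sketch of that wrap-in-place idiom, with an illustrative Service trait standing in for Transaction and a logging decorator standing in for RetryingTransaction.wrap:

import cats.effect.{IO, Resource}

object DecoratorSketch {

  // Illustrative stand-in for the Transaction interpreter.
  trait Service { def call: IO[Unit] }

  // Illustrative decorator; RetryingTransaction.wrap plays this role in the commit.
  def withLogging(inner: Service): Service =
    new Service {
      def call: IO[Unit] =
        IO.println("calling service") >> inner.call
    }

  def build(base: Resource[IO, Service]): Resource[IO, Service] =
    for {
      svc     <- base
      wrapped <- Resource.pure[IO, Service](withLogging(svc))
    } yield wrapped
}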
@@ -163,8 +163,6 @@ object FolderMonitoring {
* `shredding_complete.json` and turned into corresponding `AlertPayload`
* @param loadFrom
* list shredded folders
* @param readyCheck
* config for retry logic
* @param initQueryResult
* results of the queries sent to warehouse when application is initialized
* @param prepareAlertTable
@@ -174,7 +172,6 @@
*/
def check[F[_]: MonadThrow: BlobStorage: Sleep: Transaction[*[_], C]: Logging, C[_]: DAO: Monad: LoadAuthService, I](
loadFrom: BlobStorage.Folder,
readyCheck: Config.Retries,
initQueryResult: I,
prepareAlertTable: List[Statement]
): F[List[AlertPayload]] = {
@@ -186,7 +183,7 @@
} yield onlyS3Batches

for {
_ <- TargetCheck.blockUntilReady[F, C](readyCheck)
_ <- TargetCheck.prepareTarget[F, C]
onlyS3Batches <- Transaction[F, C].transact(getBatches)
foldersWithChecks <- checkShreddingComplete[F](onlyS3Batches)
} yield foldersWithChecks.map { case (folder, exists) =>
@@ -218,14 +215,13 @@
*/
def run[F[_]: Async: BlobStorage: Transaction[*[_], C]: Logging: Monitoring: MonadThrow, C[_]: DAO: LoadAuthService: Monad, I](
foldersCheck: Option[Config.Folders],
readyCheck: Config.Retries,
isBusy: Stream[F, Boolean],
initQueryResult: I,
prepareAlertTable: List[Statement]
): Stream[F, Unit] =
foldersCheck match {
case Some(folders) =>
stream[F, C, I](folders, readyCheck, isBusy, initQueryResult, prepareAlertTable)
stream[F, C, I](folders, isBusy, initQueryResult, prepareAlertTable)
case None =>
Stream.eval[F, Unit](Logging[F].info("Configuration for monitoring.folders hasn't been provided - monitoring is disabled"))
}
@@ -237,8 +233,6 @@
*
* @param folders
* configuration for folders monitoring
* @param readyCheck
* configuration for target ready check
* @param isBusy
* discrete stream signalling when folders monitoring should not work
* @param initQueryResult
@@ -248,7 +242,6 @@
*/
def stream[F[_]: Transaction[*[_], C]: Async: BlobStorage: Logging: Monitoring: MonadThrow, C[_]: DAO: LoadAuthService: Monad, I](
folders: Config.Folders,
readyCheck: Config.Retries,
isBusy: Stream[F, Boolean],
initQueryResult: I,
prepareAlertTable: List[Statement]
@@ -263,7 +256,7 @@
Logging[F].info("Monitoring shredded folders") *>
sinkFolders[F](folders.since, folders.until, folders.transformerOutput, outputFolder).ifM(
for {
alerts <- check[F, C, I](outputFolder, readyCheck, initQueryResult, prepareAlertTable)
alerts <- check[F, C, I](outputFolder, initQueryResult, prepareAlertTable)
_ <- alerts.traverse_ { payload =>
val warn = payload.base match {
case Some(folder) => Logging[F].warning(s"${payload.message} $folder")
@@ -0,0 +1,62 @@
/*
* Copyright (c) 2012-2023 Snowplow Analytics Ltd. All rights reserved.
*
* This program is licensed to you under the Apache License Version 2.0,
* and you may not use this file except in compliance with the Apache License Version 2.0.
* You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0.
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the Apache License Version 2.0 is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
*/
package com.snowplowanalytics.snowplow.rdbloader.dsl

import cats.{MonadThrow, ~>}
import cats.implicits._
import retry._

import com.snowplowanalytics.snowplow.rdbloader.config.Config
import com.snowplowanalytics.snowplow.rdbloader.loading.Retry
import com.snowplowanalytics.snowplow.rdbloader.loading.Retry._
import com.snowplowanalytics.snowplow.rdbloader.transactors.RetryingTransactor

object RetryingTransaction {

/** A Transaction-handler that retries the io if there is an exception */
def wrap[F[_]: MonadThrow: Logging: Sleep, C[_]](
retries: Config.Retries,
inner: Transaction[F, C]
): Transaction[F, C] = {
val policy = Retry.getRetryPolicy[F](retries)
new Transaction[F, C] {

def transact[A](io: C[A]): F[A] =
withErrorAdaption(policy) {
inner.transact(io)
}

def run[A](io: C[A]): F[A] =
withErrorAdaption(policy) {
inner.run(io)
}

def arrowBack: F ~> C = inner.arrowBack
}
}

private def withErrorAdaption[F[_]: MonadThrow: Sleep: Logging, A](policy: RetryPolicy[F])(io: F[A]): F[A] =
retryingOnSomeErrors(policy, isWorthRetry.andThen(_.pure[F]), onError[F](_, _))(io)

private val isWorthRetry: Throwable => Boolean = {
case _: RetryingTransactor.ExceededRetriesException =>
// The relevant retry policy has already been applied and exceeded
false
case e =>
Retry.isWorth(e)
}

private def onError[F[_]: Logging](t: Throwable, d: RetryDetails): F[Unit] =
Logging[F].error(t)(show"Error executing transaction. $d")

}
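The new wrapper above delegates the actual retry mechanics to cats-retry's retryingOnSomeErrors, so only errors judged worth retrying trigger another attempt. A self-contained sketch of that selective-retry building block follows; the predicate and the failing action are illustrative, not the real Retry.isWorth logic.

import scala.concurrent.duration._
import cats.effect.{IO, IOApp}
import retry._

object SelectiveRetrySketch extends IOApp.Simple {

  // Stand-in for a transaction that fails with a transient error.
  val transact: IO[Unit] = IO.raiseError(new Exception("connection reset"))

  val policy: RetryPolicy[IO] =
    RetryPolicies.fibonacciBackoff[IO](1.second) join RetryPolicies.limitRetries[IO](5)

  // Retry only errors that look transient; anything else fails fast.
  val isWorthRetry: Throwable => IO[Boolean] =
    t => IO.pure(t.getMessage.contains("connection"))

  val onError: (Throwable, RetryDetails) => IO[Unit] =
    (t, d) => IO.println(s"Error executing transaction (${t.getMessage}); $d")

  def run: IO[Unit] =
    // Once the policy is exhausted the last error is re-raised; .attempt keeps the demo from crashing.
    retryingOnSomeErrors(policy, isWorthRetry, onError)(transact).attempt.void
}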
@@ -26,10 +26,11 @@ import doobie.implicits._
import doobie.util.transactor.Strategy
import doobie.hikari._
import com.zaxxer.hikari.HikariConfig
import retry.Sleep

import java.sql.SQLException
import com.snowplowanalytics.snowplow.rdbloader.config.{Config, StorageTarget}
import com.snowplowanalytics.snowplow.rdbloader.utils.SSH
import com.snowplowanalytics.snowplow.rdbloader.transactors.{RetryingTransactor, SSH}
import com.snowplowanalytics.snowplow.rdbloader.common.cloud.SecretStore

/**
@@ -62,11 +63,6 @@ trait Transaction[F[_], C[_]] {
*/
def run[A](io: C[A]): F[A]

/**
* Same as run, but narrowed down to transaction to allow migration error handling.
*/
def run_(io: C[Unit]): F[Unit]

/**
* A kind-function (`mapK`) to downcast `F` into `C` This is a very undesirable, but necessary
* hack that allows us to chain `F` effects (real side-effects) with `C` (DB) in both directions.
@@ -109,8 +105,9 @@ object Transaction {
ds.setDataSourceProperties(target.properties)
}

def buildPool[F[_]: Async: SecretStore](
target: StorageTarget
def buildPool[F[_]: Async: SecretStore: Logging: Sleep](
target: StorageTarget,
retries: Config.Retries
): Resource[F, Transactor[F]] =
for {
ce <- ExecutionContexts.fixedThreadPool[F](2)
@@ -123,6 +120,7 @@
xa <- HikariTransactor
.newHikariTransactor[F](target.driver, target.connectionUrl, target.username, password, ce)
_ <- Resource.eval(xa.configure(configureHikari[F](target, _)))
xa <- Resource.pure(RetryingTransactor.wrap(retries, xa))
xa <- target.sshTunnel.fold(Resource.pure[F, Transactor[F]](xa))(SSH.transactor(_, xa))
} yield xa

@@ -131,11 +129,14 @@
* close a JDBC connection. If connection could not be acquired, it will retry several times
* according to `retryPolicy`
*/
def interpreter[F[_]: Async: Dispatcher: Monitoring: SecretStore](
def interpreter[F[_]: Async: Dispatcher: Logging: Monitoring: SecretStore: Sleep](
target: StorageTarget,
timeouts: Config.Timeouts
timeouts: Config.Timeouts,
connectionRetries: Config.Retries
): Resource[F, Transaction[F, ConnectionIO]] =
buildPool[F](target).map(xa => Transaction.jdbcRealInterpreter[F](target, timeouts, xa))
buildPool[F](target, connectionRetries).map { xa =>
Transaction.jdbcRealInterpreter[F](target, timeouts, xa)
}

def defaultStrategy(rollbackCommitTimeout: FiniteDuration): Strategy = {
val timeoutSeconds = rollbackCommitTimeout.toSeconds.toInt
@@ -174,9 +175,8 @@ object Transaction {
case (fiber, Outcome.Canceled()) =>
fiber.cancel.timeout(timeouts.rollbackCommit)
}
.adaptError {
case e: SQLException => new TransactionException(s"${e.getMessage} - SqlState: ${e.getSQLState}", e)
case e => new TransactionException(e.getMessage, e)
.adaptError { case e: SQLException =>
new TransactionException(s"${e.getMessage} - SqlState: ${e.getSQLState}", e)
}
}

@@ -186,17 +186,6 @@
def run[A](io: ConnectionIO[A]): F[A] =
NoCommitTransactor.trans.apply(io).withErrorAdaption

val awsColumnResizeError: String =
raw"""\[Amazon\]\(500310\) Invalid operation: cannot alter column "[^\s]+" of relation "[^\s]+", target column size should be different; - SqlState: 0A000"""

// If premigration was successful, but migration failed. It would leave the columns resized.
// This recovery makes it so resizing error would be ignored.
// Note: AWS will return 500310 error code other SQL errors (i.e. COPY errors), don't use for pattern matching.
def run_(io: ConnectionIO[Unit]): F[Unit] =
run[Unit](io).recoverWith {
case e: TransactionException if e.getMessage matches awsColumnResizeError => ().pure[F]
}

def arrowBack: F ~> ConnectionIO =
new FunctionK[F, ConnectionIO] {
def apply[A](fa: F[A]): ConnectionIO[A] =
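The arrowBack kind-function left untouched above is a natural transformation from F into the database context, which is what lets the loader sequence real side effects (stage updates, attempt counters) inside a transaction. One way such a lift can look, assuming a cats-effect Dispatcher is at hand; this is a hedged sketch, not necessarily this codebase's implementation.

import cats.~>
import cats.effect.IO
import cats.effect.std.Dispatcher
import doobie.ConnectionIO
import doobie.free.connection

object ArrowBackSketch {

  // Lift IO actions into ConnectionIO by running them when the transaction
  // reaches this step. The Dispatcher-based body is an assumption for illustration.
  def ioToConnectionIO(dispatcher: Dispatcher[IO]): IO ~> ConnectionIO =
    new (IO ~> ConnectionIO) {
      def apply[A](fa: IO[A]): ConnectionIO[A] =
        connection.delay(dispatcher.unsafeRunSync(fa))
    }
}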
@@ -1,37 +1,15 @@
package com.snowplowanalytics.snowplow.rdbloader.dsl

import fs2.Stream
import retry._
import retry.syntax.all._
import cats.syntax.all._
import cats.effect.Concurrent
import cats.MonadThrow
import cats.effect.kernel.Async
import com.snowplowanalytics.snowplow.rdbloader.config.{Config, StorageTarget}
import com.snowplowanalytics.snowplow.rdbloader.db.Statement
import eu.timepit.fs2cron.cron4s.Cron4sScheduler
import retry.RetryDetails.{GivingUp, WillDelayAndRetry}

import scala.concurrent.duration._

object VacuumScheduling {

def retryPolicy[F[_]: Concurrent]: RetryPolicy[F] =
RetryPolicies.fibonacciBackoff[F](1.minute) join RetryPolicies.limitRetries[F](10)

def logError[F[_]: Logging](err: Throwable, details: RetryDetails): F[Unit] = details match {

case WillDelayAndRetry(nextDelay: FiniteDuration, retriesSoFar: Int, cumulativeDelay: FiniteDuration) =>
Logging[F].warning(
s"Failed to vacuum with ${err.getMessage}. So far we have retried $retriesSoFar times over for $cumulativeDelay. Next attempt in $nextDelay."
)

case GivingUp(totalRetries: Int, totalDelay: FiniteDuration) =>
Logging[F].error(
s"Failed to vacuum with ${err.getMessage}. Giving up after $totalRetries retries after $totalDelay."
)
}

def run[F[_]: Transaction[*[_], C]: Async: Logging, C[_]: DAO: MonadThrow: Logging](
tgt: StorageTarget,
cfg: Config.Schedules
@@ -49,8 +27,9 @@ object VacuumScheduling {
Logging[C].info("initiating events vacuum") *> DAO[C].executeQuery(Statement.VacuumEvents) *> Logging[C]
.info("vacuum events complete")
)
.retryingOnAllErrors(retryPolicy[F], logError[F])
.orElse(().pure[F])
.recoverWith { case t: Throwable =>
Logging[F].error(t)("Failed to vacuum events table")
}
}
case _ => Stream.empty[F]
}
@@ -70,8 +49,9 @@
Logging[C].info("initiating manifest vacuum") *> DAO[C].executeQuery(Statement.VacuumManifest) *> Logging[C]
.info("vacuum manifest complete")
)
.retryingOnAllErrors(retryPolicy[F], logError[F])
.orElse(().pure[F])
.recoverWith { case t: Throwable =>
Logging[F].error(t)("Failed to vacuum manifest table")
}
}
case _ => Stream.empty[F]
}
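With the dedicated vacuum retry policy removed, a failed vacuum is now just logged and the scheduled stream carries on until the next cron tick. A condensed, self-contained sketch of that recover-and-continue pattern; the failing statement is a stand-in for the transacted VACUUM.

import cats.effect.{IO, IOApp}

object VacuumRecoverSketch extends IOApp.Simple {

  // Stand-in for the transacted VACUUM statement.
  val vacuumEvents: IO[Unit] = IO.raiseError(new Exception("relation is locked"))

  def run: IO[Unit] =
    vacuumEvents.recoverWith { case t: Throwable =>
      // Log and move on; the next scheduled run will try again.
      IO.println(s"Failed to vacuum events table: ${t.getMessage}")
    }
}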