Skip to content

Commit

Permalink
Loader: Disable warnings on incomplete shredding for the streaming tr…
Browse files Browse the repository at this point in the history
…ansformer (close #967)
  • Loading branch information
pondzix committed May 3, 2023
1 parent b60c2b0 commit 6fc63a0
Show file tree
Hide file tree
Showing 2 changed files with 107 additions and 110 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import cats.effect.kernel.{Async, Clock, Ref, Sync, Temporal}
import cats.effect.std.Semaphore
import com.snowplowanalytics.snowplow.rdbloader.cloud.LoadAuthService
import com.snowplowanalytics.snowplow.rdbloader.common.cloud.BlobStorage
import com.snowplowanalytics.snowplow.rdbloader.common.cloud.BlobStorage.Folder
import doobie.util.Get
import fs2.Stream
import fs2.text.utf8
Expand Down Expand Up @@ -142,21 +143,6 @@ object FolderMonitoring {
.drain *> ref.get.map(size => size != 0)
}

/**
 * Pair every folder with a flag telling whether it contains the `shredding_complete.json` file,
 * i.e. whether it was fully processed by shredder.
 *
 * This function uses a blocking S3 call (one `keyExists` lookup per folder) and expects `folders`
 * to be a small list (usually 0-length), because an external system (Redshift `MINUS`
 * statements) already filtered these folders as problematic, i.e. missing in `manifest` table.
 *
 * @param folders
 *   list of folders missing in `manifest` table
 * @return
 *   the same folders, each paired with `true` if the folder has `shredding_complete.json` (thus
 *   processed, but unloaded) or `false` if shredding of the folder has not completed
 */
def checkShreddingComplete[F[_]: Applicative: BlobStorage](folders: List[BlobStorage.Folder]): F[List[(BlobStorage.Folder, Boolean)]] =
  folders.traverse { folder =>
    val completionMarker = folder.withKey(ShreddingComplete)
    BlobStorage[F].keyExists(completionMarker).map(isComplete => (folder, isComplete))
  }

/**
* List all folders in `loadFrom`, load the list into temporary Redshift table and check if they
* exist in `manifest` table. Ones that don't exist are checked for existence of
Expand Down Expand Up @@ -185,13 +171,31 @@ object FolderMonitoring {
for {
_ <- TargetCheck.prepareTarget[F, C]
onlyS3Batches <- Transaction[F, C].transact(getBatches)
foldersWithChecks <- checkShreddingComplete[F](onlyS3Batches)
} yield foldersWithChecks.map { case (folder, exists) =>
if (exists) Monitoring.AlertPayload.warn("Unloaded batch", folder)
else Monitoring.AlertPayload.warn("Incomplete shredding", folder)
}
alerts <- createAlerts[F](onlyS3Batches)
} yield alerts
}

/**
 * Build warning alerts for folders that are present in blob storage but were not loaded.
 *
 * For every folder the `ShreddingComplete` key is looked up:
 *   - when it exists, an "Unloaded batch" warning is produced
 *   - when it does not, the decision is delegated to `alertForIncompleteShredding`, which may
 *     produce no alert at all
 *
 * @param folders
 *   folders to inspect
 * @return
 *   the alerts that were produced; folders without an alert are dropped from the result
 */
private def createAlerts[F[_]: Applicative: BlobStorage](folders: List[BlobStorage.Folder]): F[List[AlertPayload]] =
  folders.traverseFilter { folder =>
    shreddingCompleteExistsIn(folder).map { isComplete =>
      if (isComplete) Some(Monitoring.AlertPayload.warn("Unloaded batch", folder))
      else alertForIncompleteShredding(folder)
    }
  }

/**
 * Check whether the `ShreddingComplete` key exists inside `folder`.
 *
 * @param folder
 *   folder to inspect
 * @return
 *   result of the blob storage `keyExists` lookup for that key
 */
private def shreddingCompleteExistsIn[F[_]: BlobStorage](folder: Folder): F[Boolean] = {
  val completionMarker = folder.withKey(ShreddingComplete)
  BlobStorage[F].keyExists(completionMarker)
}

/**
 * Produce an "Incomplete shredding" warning, but only for folders that look like batch
 * transformer output; any other folder yields no alert.
 *
 * @param folder
 *   folder in which the shredding-complete marker was not found
 * @return
 *   `Some` warning when `isProducedByBatchTransformer` holds for the folder, `None` otherwise
 */
private def alertForIncompleteShredding(folder: Folder): Option[AlertPayload] =
  Some(folder)
    .filter(isProducedByBatchTransformer)
    .map(batchFolder => Monitoring.AlertPayload.warn("Incomplete shredding", batchFolder))

/**
 * Heuristic telling batch transformer output apart from other folders: the folder name must be
 * exactly as long as `run=` followed by `TimePattern`.
 *
 * Only the length is compared, not the content of the name. NOTE(review): presumably streaming
 * transformer folders carry an extra suffix (the spec uses a UUID) and are therefore longer -
 * confirm against the transformer naming scheme.
 */
private def isProducedByBatchTransformer(folder: Folder): Boolean = {
  val batchFolderNameLength = s"run=$TimePattern".length
  folder.folderName.length == batchFolderNameLength
}

/** Get stream of S3 folders emitted with configured interval */
def getOutputKeys[F[_]: Temporal: Functor](folders: Config.Folders): Stream[F, BlobStorage.Folder] = {
val getKey = Clock[F].realTimeInstant
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,100 +41,36 @@ class FolderMonitoringSpec extends Specification {
import FolderMonitoringSpec._

"check" should {
"return a single element returned by MINUS statement (shredding_complete doesn't exist)" in {
implicit val jdbc: DAO[Pure] = PureDAO.interpreter(PureDAO.custom(jdbcResults))
implicit val transaction: Transaction[Pure, Pure] = PureTransaction.interpreter
implicit val sleep: Sleep[Pure] = PureSleep.interpreter
implicit val aws: BlobStorage[Pure] = PureAWS.blobStorage(PureAWS.init)
implicit val logging: Logging[Pure] = PureLogging.interpreter()
implicit val loadAuthService: LoadAuthService[Pure] = PureLoadAuthService.interpreter
val loadFrom = BlobStorage.Folder.coerce("s3://bucket/shredded/")

val expectedState = TestState(
List(
PureTransaction.CommitMessage,
TestState.LogEntry.Sql(Statement.FoldersMinusManifest),
TestState.LogEntry.Sql(Statement.FoldersCopy(BlobStorage.Folder.coerce("s3://bucket/shredded/"), LoadAuthMethod.NoCreds, ())),
TestState.LogEntry.Sql(Statement.CreateAlertingTempTable),
TestState.LogEntry.Sql(Statement.DropAlertingTempTable),
PureTransaction.StartMessage,
TestState.LogEntry.Sql(Statement.ReadyCheck),
PureTransaction.NoTransactionMessage
),
Map()
)
val ExpectedResult = List(
Monitoring.AlertPayload(
Monitoring.Application,
Some(BlobStorage.Folder.coerce("s3://bucket/shredded/run=2021-07-09-12-30-00/")),
Severity.Warning,
"Incomplete shredding",
Map.empty
)
"return 'Incomplete shredding' alert when -> shredding_complete file doesn't exist, no UUID in folder name" in {
assertGeneratedAlerts(
unloadedFolder = "s3://bucket/shredded/run=2021-07-09-12-30-00/",
shreddingCompleteExists = false,
expectedMessages = List("Incomplete shredding")
)

val (state, result) =
FolderMonitoring
.check[Pure, Pure, Unit](
loadFrom,
(),
Target.defaultPrepareAlertTable
)
.run

state must beEqualTo(expectedState)
result must beRight.like {
case ExpectedResult => ok
case alerts => ko(s"Unexpected alerts: ${alerts.asJson.noSpaces}")
}
}

"return a single element returned by MINUS statement (shredding_complete does exist)" in {
implicit val jdbc: DAO[Pure] = PureDAO.interpreter(PureDAO.custom(jdbcResults))
implicit val transaction: Transaction[Pure, Pure] = PureTransaction.interpreter
implicit val sleep: Sleep[Pure] = PureSleep.interpreter
implicit val aws: BlobStorage[Pure] = PureAWS.blobStorage(PureAWS.init.withExistingKeys)
implicit val logging: Logging[Pure] = PureLogging.interpreter()
implicit val loadAuthService: LoadAuthService[Pure] = PureLoadAuthService.interpreter
val loadFrom = BlobStorage.Folder.coerce("s3://bucket/shredded/")

val expectedState = TestState(
List(
PureTransaction.CommitMessage,
TestState.LogEntry.Sql(Statement.FoldersMinusManifest),
TestState.LogEntry.Sql(Statement.FoldersCopy(BlobStorage.Folder.coerce("s3://bucket/shredded/"), LoadAuthMethod.NoCreds, ())),
TestState.LogEntry.Sql(Statement.CreateAlertingTempTable),
TestState.LogEntry.Sql(Statement.DropAlertingTempTable),
PureTransaction.StartMessage,
TestState.LogEntry.Sql(Statement.ReadyCheck),
PureTransaction.NoTransactionMessage
),
Map()
)
val ExpectedResult = List(
Monitoring.AlertPayload(
Monitoring.Application,
Some(BlobStorage.Folder.coerce("s3://bucket/shredded/run=2021-07-09-12-30-00/")),
Severity.Warning,
"Unloaded batch",
Map.empty
)
"not return 'Incomplete shredding' alert when -> shredding_complete file doesn't exist, UUID in folder name" in {
assertGeneratedAlerts(
unloadedFolder = "s3://bucket/shredded/run=2021-07-09-12-30-00-b4cac3e5-9948-40e3-bd68-38abcf01cdf9/",
shreddingCompleteExists = false,
expectedMessages = List.empty
)
}

val (state, result) =
FolderMonitoring
.check[Pure, Pure, Unit](
loadFrom,
(),
Target.defaultPrepareAlertTable
)
.run
"return 'Unloaded batch' alert when -> shredding_complete file exists, no UUID in folder name" in {
assertGeneratedAlerts(
unloadedFolder = "s3://bucket/shredded/run=2021-07-09-12-30-00/",
shreddingCompleteExists = true,
expectedMessages = List("Unloaded batch")
)
}

state must beEqualTo(expectedState)
result must beRight.like {
case ExpectedResult => ok
case alerts => ko(s"Unexpected alerts: ${alerts.asJson.noSpaces}")
}
"return 'Unloaded batch' alert when -> shredding_complete file exists, UUID in folder name" in {
assertGeneratedAlerts(
unloadedFolder = "s3://bucket/shredded/run=2021-07-09-12-30-00-b4cac3e5-9948-40e3-bd68-38abcf01cdf9/",
shreddingCompleteExists = true,
expectedMessages = List("Unloaded batch")
)
}
}

Expand Down Expand Up @@ -253,14 +189,71 @@ class FolderMonitoringSpec extends Specification {
}
}
}

/**
 * Run `FolderMonitoring.check` against pure interpreters and assert both the recorded state and
 * the generated alerts.
 *
 * @param unloadedFolder
 *   folder returned by the stubbed `FoldersMinusManifest` statement
 * @param shreddingCompleteExists
 *   when `true` the blob storage stub is initialised with `withExistingKeys`, otherwise with the
 *   plain initial state
 * @param expectedMessages
 *   messages of the warning alerts expected for `unloadedFolder`, in order; empty when no alert
 *   is expected
 */
def assertGeneratedAlerts(
  unloadedFolder: String,
  shreddingCompleteExists: Boolean,
  expectedMessages: List[String]
) = {
  val loadFrom = BlobStorage.Folder.coerce("s3://bucket/shredded/")
  val inputFolder = BlobStorage.Folder.coerce(unloadedFolder)
  val storageState =
    if (shreddingCompleteExists) PureAWS.init.withExistingKeys
    else PureAWS.init

  implicit val jdbc: DAO[Pure] = PureDAO.interpreter(PureDAO.custom(jdbcResults(inputFolder)))
  implicit val transaction: Transaction[Pure, Pure] = PureTransaction.interpreter
  implicit val sleep: Sleep[Pure] = PureSleep.interpreter
  implicit val blobStorage: BlobStorage[Pure] = PureAWS.blobStorage(storageState)
  implicit val logging: Logging[Pure] = PureLogging.interpreter()
  implicit val loadAuthService: LoadAuthService[Pure] = PureLoadAuthService.interpreter

  // Log entries exactly as the pure interpreters are expected to record them for a single check
  val expectedState = TestState(
    List(
      PureTransaction.CommitMessage,
      TestState.LogEntry.Sql(Statement.FoldersMinusManifest),
      TestState.LogEntry.Sql(Statement.FoldersCopy(loadFrom, LoadAuthMethod.NoCreds, ())),
      TestState.LogEntry.Sql(Statement.CreateAlertingTempTable),
      TestState.LogEntry.Sql(Statement.DropAlertingTempTable),
      PureTransaction.StartMessage,
      TestState.LogEntry.Sql(Statement.ReadyCheck),
      PureTransaction.NoTransactionMessage
    ),
    Map()
  )

  // One warning per expected message, all attached to the folder under test
  val expectedAlerts = expectedMessages.map { message =>
    Monitoring.AlertPayload(
      Monitoring.Application,
      Some(inputFolder),
      Severity.Warning,
      message,
      Map.empty
    )
  }

  val monitoringCheck =
    FolderMonitoring.check[Pure, Pure, Unit](
      loadFrom,
      (),
      Target.defaultPrepareAlertTable
    )
  val (finalState, outcome) = monitoringCheck.run

  finalState must beEqualTo(expectedState)
  outcome must beRight.like {
    // Backticks make this an equality check against the value above, not a fresh binding
    case `expectedAlerts` => ok
    case alerts => ko(s"Unexpected alerts: ${alerts.asJson.noSpaces}")
  }
}

}

object FolderMonitoringSpec {
def jdbcResults(state: TestState)(statement: Statement): Any = {
def jdbcResults(folder: String)(state: TestState)(statement: Statement): Any = {
val _ = state
statement match {
case Statement.FoldersMinusManifest =>
List(BlobStorage.Folder.coerce("s3://bucket/shredded/run=2021-07-09-12-30-00/"))
List(folder)
case Statement.ReadyCheck => 1
case _ => throw new IllegalArgumentException(s"Unexpected statement $statement with ${state.getLog}")
}
Expand Down

0 comments on commit 6fc63a0

Please sign in to comment.