Skip to content

Commit

Permalink
another tmp
Browse files Browse the repository at this point in the history
  • Loading branch information
oguzhanunlu committed Jul 2, 2021
1 parent da4c32e commit 64327a0
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 7 deletions.
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
* See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
*/

- ThisBuild / version := "1.2.0-oguzhan-206488f-8"
+ ThisBuild / version := "1.2.0-oguzhan-206488f-10"

lazy val root = project.in(file("."))
.aggregate(common, aws, loader, shredder, streamShredder)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,16 +35,17 @@ object Alerting {

implicit val folderRead: Read[S3.Folder] = Read[String].map(S3.Folder.coerce)

- def putAlertScanToS3[F[_]: ConcurrentEffect: Sync: Timer](s3Store: S3Store[F], shredderOutput: S3.Folder): F[S3.Key] = {
+ def putAlertScanToS3[F[_]: AWS: Sync](s3Store: S3Store[F], shredderOutput: S3.Folder): F[S3.Key] = {
val scanOutKey: S3.Key = shredderOutput.up(1).withKey("alert/scan.out")
Sync[F].delay(println("putAlertScanToS3 starting")) >>
- AWS
-   .s3Interpreter[F](s3Store)
+ AWS[F]
.listS3(shredderOutput, recursive = false)
.map(_.key)
.evalTap(_ => Sync[F].delay(println("listing is successful")))
.intersperse(AlertingScanResultSeparator)
.through(utf8Encode[F])
.through(s3Store.put(S3Path(S3.Key.bucketName(scanOutKey), S3.Key.keyName(scanOutKey), None), overwrite = true))
+ .evalTap(_ => Sync[F].delay(println("putting to s3 is successful")))
.compile
.drain
.as(scanOutKey)
Expand All @@ -53,19 +54,21 @@ object Alerting {
def checkShreddingComplete[F[_]: Applicative](aws: AWS[F], folders: List[S3.Folder]): F[List[(S3.Folder, Boolean)]] =
folders.traverse(folder => aws.keyExists(folder.withKey("shredding_complete.json")).map(b => (folder, b)))

- def check[F[_]: AWS: JDBC: Sync](s3Store: S3Store[F], shredderOutput: S3.Folder, redshiftConfig: StorageTarget.Redshift,
-           tags: Map[String, String]): LoaderAction[F, List[AlertPayload]] =
+ def check[F[_]: ConcurrentEffect: JDBC: Sync: Timer](s3Store: S3Store[F], shredderOutput: S3.Folder, redshiftConfig: StorageTarget.Redshift,
+           tags: Map[String, String]): LoaderAction[F, List[AlertPayload]] = {
+ implicit val aws: AWS[F] = AWS.s3Interpreter(s3Store)
for {
s3Key <- LoaderAction.liftF(putAlertScanToS3(s3Store, shredderOutput))
+ _ = Sync[F].delay(println(s"putAlertScanToS3 finished with s3Key: ${s3Key}"))
_ <- JDBC[F].executeUpdate(CopyFromS3ToAlertingTemp(s3Key, redshiftConfig.roleArn))
onlyS3Batches <- JDBC[F].executeQueryList[S3.Folder](AlertingTempMinusManifest(redshiftConfig.schema))
- foldersWithChecks <- LoaderAction.liftF(checkShreddingComplete(implicitly[AWS[F]], onlyS3Batches))
+ foldersWithChecks <- LoaderAction.liftF(checkShreddingComplete(aws, onlyS3Batches))
alerts = foldersWithChecks.map{ case (folder: S3.Folder, exists: Boolean) =>
if (exists) createAlertPayload(folder, "Unloaded Batch", tags)
else createAlertPayload(folder, "Corrupted Batch", tags)
}
} yield alerts
+ }

def alertStream[F[_]: Async: AWS: ConcurrentEffect: ContextShift: Logging: Monitoring: Parallel: Timer](cli: CliConfig, env: Environment[F]): Stream[F, Unit] =
cli.config.monitoring.webhook match {
Expand Down

0 comments on commit 64327a0

Please sign in to comment.