fix & improve scanning
oguzhanunlu committed Jun 28, 2021
1 parent 2edaf84 commit 2ed3273
Showing 3 changed files with 20 additions and 6 deletions.
@@ -42,6 +42,14 @@ object S3 {

def bucketName: String =
f.split("://").last.split("/").head

+ def keyName: String =
+ f.split("://").last.split("/").tail.mkString("/")

+ def up(n: Int): Folder = {
+ val newKey = f.split("://").last.split("/").tail.dropRight(n).mkString("/")
+ Folder.coerce(s"s3://${f.bucketName}/${newKey}")
+ }
}
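
For illustration only (not part of the diff), the new Folder helpers behave roughly as follows on a hypothetical folder, assuming the implicit Folder ops in object S3 are in scope and that Folder.coerce normalizes folders to a trailing slash:

    // Hypothetical bucket/prefix; expected values follow the split-based implementations above.
    val folder = S3.Folder.coerce("s3://my-bucket/archive/shredded/run=2021-06-28/")

    folder.bucketName  // "my-bucket"
    folder.keyName     // "archive/shredded/run=2021-06-28"
    folder.up(1)       // Folder "s3://my-bucket/archive/shredded/"
    folder.up(2)       // Folder "s3://my-bucket/archive/"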


@@ -98,6 +106,12 @@ object S3 {
def join(folder: Folder, name: String): Key =
coerce(folder + name)

+ def bucketName(key: Key): String =
+ key.split("://").last.split("/").head

+ def keyName(key: Key): String =
+ key.split("://").last.split("/").tail.mkString("/")

def getParent(key: Key): Folder = {
val string = key.split("/").dropRight(1).mkString("/")
Folder.coerce(string)
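
Again purely illustrative (hypothetical key, assuming the Key object's coerce is usable as below), the Key-side counterparts behave like this:

    val key = S3.Key.coerce("s3://my-bucket/archive/alert/scan.out")

    S3.Key.bucketName(key)  // "my-bucket"
    S3.Key.keyName(key)     // "archive/alert/scan.out"
    S3.Key.getParent(key)   // Folder for "s3://my-bucket/archive/alert"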
@@ -102,7 +102,7 @@ object Main extends IOApp {
}
}
}
- case None => Stream.empty
+ case None => Stream.empty.covary[IO]
}
)
}
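
The covary change is needed because fs2's Stream.empty infers as an effect-polymorphic Stream[Pure, Nothing]; when the other match arms produce a Stream[IO, A], the branches may not unify without explicitly widening the effect type. A minimal standalone sketch (illustrative names, not Main's actual code):

    import cats.effect.IO
    import fs2.Stream

    // One branch produces data in IO; the None branch must still be a Stream in IO.
    def discovered(prefix: Option[String]): Stream[IO, String] =
      prefix match {
        case Some(p) => Stream.eval(IO.pure(p))   // stand-in for a real effectful listing
        case None    => Stream.empty.covary[IO]   // widen Stream[Pure, Nothing] to Stream[IO, Nothing]
      }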
@@ -25,25 +25,25 @@ import com.snowplowanalytics.snowplow.rdbloader.dsl.Monitoring.AlertPayload
import com.snowplowanalytics.snowplow.rdbloader.dsl.{AWS, JDBC}
import com.snowplowanalytics.snowplow.rdbloader.dsl.alerts.Alerter.createAlertPayload
import doobie.util.Read
- import fs2.compression.gzip
import fs2.text.utf8Encode


object Alerting {

implicit val folderRead: Read[S3.Folder] = Read[String].map(S3.Folder.coerce)

- def putAlertScanToS3[F[_]: AWS: Sync](s3Store: S3Store[F], shredderOutput: S3.Folder): F[S3.Key] =
+ def putAlertScanToS3[F[_]: AWS: Sync](s3Store: S3Store[F], shredderOutput: S3.Folder): F[S3.Key] = {
+ val scanOutKey: S3.Key = shredderOutput.up(1).withKey("alert/scan.out")
AWS[F]
.listS3(shredderOutput, recursive = false)
.map(_.key)
.intersperse(AlertingScanResultSeparator)
.through(utf8Encode[F])
- .through(gzip())
- .through(s3Store.put(S3Path(shredderOutput, "alert/scan.txt.gz", None)))
+ .through(s3Store.put(S3Path(S3.Key.bucketName(scanOutKey), S3.Key.keyName(scanOutKey), None), overwrite = true))
.compile
.drain
- .as(S3.Key.join(shredderOutput,"alert/scan.txt.gz" ))
+ .as(scanOutKey)
+ }

def checkShreddingComplete[F[_]: Applicative](aws: AWS[F], folders: List[S3.Folder]): F[List[(S3.Folder, Boolean)]] =
folders.traverse( folder => aws.keyExists(folder.withKey("/shredding_complete.json")).map(b => (folder, b)))
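
A worked trace of the new scan-output key (hypothetical bucket and prefix, assuming Folder.coerce keeps a trailing slash and withKey appends a relative key), showing why S3Path now receives a bare bucket name plus object key instead of the whole folder URI:

    val shredderOutput = S3.Folder.coerce("s3://my-bucket/shredded/good/")  // hypothetical input
    val scanOutKey     = shredderOutput.up(1).withKey("alert/scan.out")     // "s3://my-bucket/shredded/alert/scan.out"

    S3.Key.bucketName(scanOutKey)  // "my-bucket"               -> bucket passed to S3Path
    S3.Key.keyName(scanOutKey)     // "shredded/alert/scan.out" -> key passed to S3Path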