Archiver flows cleanup (#2539)
* clean up flows a little

* turn on tests

* fix tests

* start to publish to topic

* Apply auto-formatting rules

* send sns notification [wip]

* Apply auto-formatting rules

* Apply auto-formatting rules

* cleaning up a bit

* asserting publish to topic

* passing notification test

* topic config

* update sns publish policy

* simplify tests and update queue permissions

* fmt

* Apply auto-formatting rules

* allow dupe messages with set uniqueness
kenoir committed Aug 14, 2018
1 parent 58ab580 commit d9d5cf7
Showing 33 changed files with 682 additions and 308 deletions.
5 changes: 5 additions & 0 deletions .travis.yml
@@ -58,6 +58,8 @@ cache:
- sierra_adapter/sierra_bib_merger/target
- sierra_adapter/sierra_item_merger/target

- archive/archiver/target

# Based on instructions from
# https://www.scala-sbt.org/1.0/docs/Travis-CI-with-sbt.html#Caching
before_cache:
@@ -154,6 +156,9 @@ jobs:
- env: TASK=sierra_bib_merger-test
- env: TASK=sierra_item_merger-test

# Archive
- env: TASK=archiver-test

# (not under active development)

# nginx stack
@@ -5,17 +5,22 @@ import akka.event.{Logging, LoggingAdapter}
import akka.stream.ActorMaterializer
import akka.stream.alpakka.s3.scaladsl.S3Client
import akka.stream.scaladsl.Flow
import com.amazonaws.services.sns.AmazonSNSAsync
import com.google.inject.Injector
import grizzled.slf4j.Logging
import uk.ac.wellcome.messaging.sns.NotificationMessage
import uk.ac.wellcome.json.JsonUtil._
import uk.ac.wellcome.messaging.sns.{NotificationMessage, SNSConfig}
import uk.ac.wellcome.platform.archiver.flow.{
BagLocationFromNotificationFlow,
BagArchiveCompleteFlow,
DownloadZipFileFlow,
UploadAndVerifyBagFlow
}
import uk.ac.wellcome.platform.archiver.messaging.MessageStream
import uk.ac.wellcome.platform.archiver.models.BagUploaderConfig
import uk.ac.wellcome.json.JsonUtil._
import uk.ac.wellcome.storage.ObjectLocation

import scala.concurrent.ExecutionContext
import scala.util.{Failure, Success}

trait Archiver extends Logging {
val injector: Injector
@@ -26,21 +31,38 @@ trait Archiver extends Logging {
val bagUploaderConfig = injector.getInstance(classOf[BagUploaderConfig])

implicit val s3Client: S3Client = injector.getInstance(classOf[S3Client])
implicit val snsClient: AmazonSNSAsync =
injector.getInstance(classOf[AmazonSNSAsync])
implicit val actorSystem: ActorSystem =
injector.getInstance(classOf[ActorSystem])
implicit val executionContext: ExecutionContext = actorSystem.dispatcher
implicit val materializer: ActorMaterializer = ActorMaterializer()
implicit val adapter: LoggingAdapter =
Logging(actorSystem.eventStream, "customLogger")

val workFlow = Flow[NotificationMessage]
.log("notification")
.via(BagLocationFromNotificationFlow())
.log("download notice")
.via(DownloadZipFileFlow())
.log("download zip")
.via(UploadAndVerifyBagFlow(bagUploaderConfig))
.log("archive verified")
val snsConfig = injector.getInstance(classOf[SNSConfig])

val workFlow =
Flow[NotificationMessage]
.log("notification message")
.map(getObjectLocation)
.log("download location")
.via(DownloadZipFileFlow())
.log("download zip")
.via(UploadAndVerifyBagFlow(bagUploaderConfig))
.log("archive verified")
.via(BagArchiveCompleteFlow(snsConfig.topicArn))

messageStream.run("archiver", workFlow)
}

private def getObjectLocation(message: NotificationMessage) = {
fromJson[ObjectLocation](message.Message) match {
case Success(location) => location
case Failure(e) =>
throw new RuntimeException(
s"Failed to get object location from notification: ${e.getMessage}"
)
}
}
}
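
For reference, a minimal sketch of the notification body that getObjectLocation expects. The bucket and key values are hypothetical, and it assumes ObjectLocation is the platform's usual (namespace, key) pair with a circe decoder supplied by JsonUtil, exactly as imported in the trait above:

import scala.util.{Failure, Success}
import uk.ac.wellcome.json.JsonUtil._
import uk.ac.wellcome.storage.ObjectLocation

// Hypothetical notification body: a JSON-encoded ObjectLocation pointing at the bag zip.
val body = """{"namespace": "ingest-bucket", "key": "bags/b12345678.zip"}"""

fromJson[ObjectLocation](body) match {
  case Success(location) => println(s"would download $location")
  case Failure(e)        => println(s"malformed notification: ${e.getMessage}")
}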
@@ -14,6 +14,7 @@ object Main extends App with Archiver {
AkkaS3ClientModule,
CloudWatchClientModule,
SQSClientModule,
SNSAsyncClientModule,
MessageStreamModule
)

@@ -1,6 +1,7 @@
package uk.ac.wellcome.platform.archiver.config

import org.rogach.scallop.{ScallopConf, ScallopOption}
import uk.ac.wellcome.messaging.sns.SNSConfig
import uk.ac.wellcome.messaging.sqs.SQSConfig
import uk.ac.wellcome.monitoring.MetricsConfig
import uk.ac.wellcome.platform.archiver.models._
@@ -19,9 +20,15 @@ class ArgsConfigurator(arguments: Seq[String]) extends ScallopConf(arguments) {
val awsSqsRegion = opt[String](default = Some("eu-west-1"))
val awsSqsEndpoint = opt[String]()

val awsSnsAccessKey = opt[String]()
val awsSnsSecretKey = opt[String]()
val awsSnsRegion = opt[String](default = Some("eu-west-1"))
val awsSnsEndpoint = opt[String]()

val awsCloudwatchRegion = opt[String](default = Some("eu-west-1"))
val awsCloudwatchEndpoint = opt[String]()

val topicArn: ScallopOption[String] = opt[String](required = true)
val sqsQueueUrl: ScallopOption[String] = opt[String](required = true)
val sqsWaitTimeSeconds = opt[Int](required = true, default = Some(20))
val sqsMaxMessages = opt[Int](required = true, default = Some(10))
@@ -38,9 +45,13 @@ class ArgsConfigurator(arguments: Seq[String]) extends ScallopConf(arguments) {
verify()

val bagUploaderConfig = BagUploaderConfig(
uploadNamespace = uploadNamespace(),
uploadPrefix = uploadPrefix(),
digestDelimiterRegexp = digestDelimiterRegexp()
uploadConfig = UploadConfig(
uploadNamespace = uploadNamespace(),
uploadPrefix = uploadPrefix()
),
bagItConfig = BagItConfig(
digestDelimiterRegexp = digestDelimiterRegexp()
)
)

val s3ClientConfig = S3ClientConfig(
@@ -69,6 +80,17 @@ class ArgsConfigurator(arguments: Seq[String]) extends ScallopConf(arguments) {
sqsParallelism()
)

val snsClientConfig = SnsClientConfig(
accessKey = awsSnsAccessKey.toOption,
secretKey = awsSnsSecretKey.toOption,
region = awsSnsRegion(),
endpoint = awsSnsEndpoint.toOption
)

val snsConfig = SNSConfig(
topicArn(),
)

val metricsConfig = MetricsConfig(
namespace = metricsNamespace(),
flushInterval = metricsFlushIntervalSeconds() seconds
@@ -80,6 +102,8 @@ class ArgsConfigurator(arguments: Seq[String]) extends ScallopConf(arguments) {
cloudwatchClientConfig,
sqsClientConfig,
sqsConfig,
snsClientConfig,
snsConfig,
metricsConfig
)
}
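
For orientation, a rough sketch of how the new SNS options surface on the command line. It assumes scallop's default kebab-case flag names derived from the val names above; the values are hypothetical and the pre-existing required options (queue URL, upload settings, metrics namespace) are elided:

// Hypothetical flag values; only the SNS-related options added here are shown.
val snsArgs = Seq(
  "--topic-arn", "arn:aws:sns:eu-west-1:123456789012:archive-complete",
  "--aws-sns-region", "eu-west-1",
  "--aws-sns-endpoint", "http://localhost:9292" // e.g. a local SNS stand-in in tests
)
// new ArgsConfigurator(snsArgs ++ otherRequiredArgs).snsConfig.topicArn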
@@ -0,0 +1,39 @@
package uk.ac.wellcome.platform.archiver.flow

import java.util.zip.ZipFile

import akka.stream.ActorMaterializer
import akka.stream.alpakka.s3.scaladsl.S3Client
import akka.stream.scaladsl.Source
import uk.ac.wellcome.platform.archiver.models.BagItConfig
import uk.ac.wellcome.storage.ObjectLocation

object ArchiveBagFlow {

def apply(zipFile: ZipFile, bagLocation: BagLocation, config: BagItConfig)(
implicit
materializer: ActorMaterializer,
s3Client: S3Client
) = {

val bagDigestItemFlow = BagDigestItemFlow(config.digestDelimiterRegexp)
val archiveItemFlow = ArchiveItemFlow()
val digestLocations = digestNames(bagLocation.bagName, config.digestNames)

Source
.fromIterator(() => digestLocations)
.log("digest location")
.map((_, bagLocation.bagName, zipFile))
.via(bagDigestItemFlow)
.log("bag digest item")
.map((bagLocation, _, zipFile))
.via(archiveItemFlow)
}

private def digestNames(bagName: BagName, digestNames: List[String]) =
digestNames
.map(digestName => {
ObjectLocation(bagName.value, digestName)
})
.toIterator
}
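
A minimal usage sketch: it assumes a ZipFile, BagLocation and BagItConfig built elsewhere, plus the implicit S3 client and materializer in scope, and simply drains the source so the caller can wait for one bag to finish archiving:

import akka.Done
import akka.stream.scaladsl.Sink
import scala.concurrent.Future

// Hypothetical wiring: archive every item listed in the bag's digest manifests.
val archived: Future[Done] =
  ArchiveBagFlow(zipFile, bagLocation, bagItConfig).runWith(Sink.ignore)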
@@ -7,32 +7,29 @@ import akka.stream.alpakka.s3.scaladsl.{MultipartUploadResult, S3Client}
import akka.stream.scaladsl.{Flow, Source}
import akka.{Done, NotUsed}
import grizzled.slf4j.Logging
import uk.ac.wellcome.platform.archiver.models.BagUploaderConfig
import uk.ac.wellcome.storage.ObjectLocation

object ArchiveItemFlow extends Logging {
def apply(config: BagUploaderConfig)(
def apply()(
implicit s3Client: S3Client,
materializer: ActorMaterializer
): Flow[(BagDigestItem, ZipFile), Done, NotUsed] = {
): Flow[(BagLocation, BagDigestItem, ZipFile), Done, NotUsed] = {

val uploadVerificationFlow = UploadVerificationFlow(config)
val uploadVerificationFlow = UploadVerificationFlow()
val downloadVerification = DownloadVerificationFlow()

Flow[(BagDigestItem, ZipFile)].flatMapConcat {
case (bagDigestItem, zipFile) =>
Flow[(BagLocation, BagDigestItem, ZipFile)].flatMapConcat {
case (bagLocation, bagDigestItem, zipFile) =>
Source
.single((bagDigestItem, zipFile))
.single((bagLocation, bagDigestItem, zipFile))
.log("uploading and verifying")
.via(uploadVerificationFlow)
.log("upload verified")
.map {
case MultipartUploadResult(_, bucket, key, _, _) =>
ObjectLocation(bucket, key)
(ObjectLocation(bucket, key), bagDigestItem.checksum)
}
.log("upload location")
.map(objectLocation => (objectLocation, bagDigestItem.checksum))
.log("downloading to complete verification")
.via(downloadVerification)
.log("download verified")
}
@@ -0,0 +1,31 @@
package uk.ac.wellcome.platform.archiver.flow

import akka.stream.alpakka.sns.scaladsl.SnsPublisher
import akka.stream.scaladsl.Flow
import com.amazonaws.services.sns.AmazonSNSAsync
import uk.ac.wellcome.json.JsonUtil._

import scala.util.{Failure, Success}

object BagArchiveCompleteFlow {
def apply(topicArn: String)(implicit snsClient: AmazonSNSAsync) =
Flow[BagLocation]
.map(createNotification)
.log("created notification")
.map(toJson(_))
.map {
case Success(json) => json
case Failure(e) => throw e
}
.log("notification serialised")
.via(SnsPublisher.flow(topicArn))
.log("published notification")

def createNotification(bagLocation: BagLocation) =
BagArchiveCompleteNotification(bagLocation)

}

case class BagArchiveCompleteNotification(
bagLocation: BagLocation
)
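
A minimal sketch of exercising this flow, assuming an implicit AmazonSNSAsync client (for instance one pointed at a local SNS endpoint in tests), a materializer, and a bagLocation value built elsewhere; the topic ARN is hypothetical:

import akka.stream.scaladsl.{Sink, Source}

// Publishes one BagArchiveCompleteNotification for the given bag location.
Source
  .single(bagLocation)
  .via(BagArchiveCompleteFlow("arn:aws:sns:eu-west-1:123456789012:archive-complete"))
  .runWith(Sink.ignore)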
@@ -2,36 +2,53 @@ package uk.ac.wellcome.platform.archiver.flow

import java.util.zip.ZipFile

import akka.NotUsed
import akka.stream.scaladsl.{Flow, Source}
import akka.stream.scaladsl.{Flow, Framing, Source}
import akka.util.ByteString
import grizzled.slf4j.Logging
import uk.ac.wellcome.platform.archiver.models.BagUploaderConfig
import uk.ac.wellcome.storage.ObjectLocation

object BagDigestItemFlow extends Logging {
def apply(config: BagUploaderConfig)
: Flow[(ObjectLocation, BagName, ZipFile), BagDigestItem, NotUsed] = {

val fileSplitterFlow = FileSplitterFlow(config)
val framingDelimiter = Framing.delimiter(
ByteString("\n"),
maximumFrameLength = 1024,
allowTruncation = true
)

def apply(digestDelimiter: String) =
Flow[(ObjectLocation, BagName, ZipFile)]
.log("digest location")
.flatMapConcat {
case (objectLocation, bagName, zipFile) =>
Source
.single((objectLocation, zipFile))
.via(fileSplitterFlow)
.map(stringArray => (stringArray, bagName))
.map {
case (Array(checksum: String, key: String), bag) =>
BagDigestItem(checksum, ObjectLocation(bag.value, key))
case (default, bag) =>
throw MalformedBagDigestException(
default.mkString(config.digestDelimiterRegexp),
bag)
}
.via(FileExtractorFlow())
.via(framingDelimiter)
.map(_.utf8String)
.filter(_.nonEmpty)
.map(createBagDigestItems(_, bagName, digestDelimiter))
}
.log("bag digest item")

private def createBagDigestItems(
fileChunk: String,
bagName: BagName,
delimiter: String
) = {
val splitChunk = fileChunk.split(delimiter).map(_.trim)

splitChunk match {
case Array(checksum: String, key: String) =>
BagDigestItem(
checksum,
ObjectLocation(bagName.value, key)
)
case default =>
throw MalformedBagDigestException(
default.mkString(delimiter),
bagName
)
}
}
}
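
A worked example of the split performed by createBagDigestItems, using a hypothetical checksum, file path and bag name (and assuming BagName wraps a single string value, as bagName.value above suggests): a whitespace-delimited manifest line becomes a BagDigestItem keyed into the bag's namespace, and anything that does not split into exactly two fields raises MalformedBagDigestException:

// Hypothetical line from a manifest such as manifest-sha256.txt inside the bag zip.
val line      = "2cf24dba5fb0a30e26e83b2ac5b9e29e1b161e5c1fa7425e73043362938b9824  data/b12345678.xml"
val delimiter = " +"

line.split(delimiter).map(_.trim) match {
  case Array(checksum, key) =>
    BagDigestItem(checksum, ObjectLocation("b12345678", key))
  case other =>
    throw MalformedBagDigestException(other.mkString(delimiter), BagName("b12345678"))
}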


This file was deleted.

This file was deleted.
