Skip to content

Commit

Permalink
Archive registrar skeleton (#2532)
Browse files Browse the repository at this point in the history
* playing around with verification and upload in a worksheet

* rearrange, add skeleton test

* Make dependencies a bit more sensible

* wip

* adds missing files

* fixed downloader

* working with streams for upload/download

* end to end stream for single file

* move flows out to own files

* clean up a bit

* adds zip downloader

* flows everywhere

* add dockerfile and empty config

* thinking about config

* simple example using scallop for config

* slowly getting di working with config

* struggling towards running app

* connect flow to app

* update feature test

* adds gitignore excluded files

* simple end to end

* sort config

* add infra

* use new libs

* file prefix too short fix

* md5 -> sha256

* [ci skip] fix config

* drop some fat logs

* [ci skip] ignore .

* permissions and logging around download verification

* dont log success erroneously

* [ci skip] sha 256 everywhere

* [ci skip] better feature test

* removed graph dsl where possible (misuse of zip)

* script to trigger archive

* test with local zip

* better logs & fixed missing bits

* add models

* Apply auto-formatting rules

* use local paths

* default digest delimeter to ' +'

* split complex flow, test for continuation

* files

* Apply auto-formatting rules

* delete moved files

* delete moved files

* Apply auto-formatting rules

* bagname

* cleaning up a bit

* wip

* [ci skip] large bag test

* update for S3 dependency

* Apply auto-formatting rules

* log message processing

* Apply auto-formatting rules

* separate archiver

* Apply auto-formatting rules

* rename some flows

* Apply auto-formatting rules

* vhs terraform

* Simplify some flows

* (merged asset-lookup) integrating api gw terraform

* Apply auto-formatting rules

* reduce logging in lambda

* adding role policy for archive-asset-lookup lambda to access dynamo table.

* removing clipboard error.

* removed unused dynamo gsi

* pylint corrections for trigger_archive_bag script

* extending permission of archive-asset-lookup lambda to read items from storage manifest bucket.

* tidy up tests

* Apply auto-formatting rules

* troubleshooting test

* Apply auto-formatting rules

* configuration using regexp

* Apply auto-formatting rules

* share configuration

* reset for merge

* Adds skeleton registrar service

* clean up flows a little

* turn on tests

* fix tests

* start to publish to topic

* Apply auto-formatting rules

* send sns notifiction [wip]

* Apply auto-formatting rules

* Apply auto-formatting rules

* cleaning up a bit

* asserting publish to topic

* passing notiication test

* topic config

* update sns publish policy

* simplify tests and update queue permissions

* Apply auto-formatting rules

* fmt

* Apply auto-formatting rules
  • Loading branch information
kenoir committed Aug 14, 2018
1 parent d9d5cf7 commit 9d431ba
Show file tree
Hide file tree
Showing 65 changed files with 17,539 additions and 166 deletions.
10 changes: 8 additions & 2 deletions .travis.yml
Expand Up @@ -58,7 +58,10 @@ cache:
- sierra_adapter/sierra_bib_merger/target
- sierra_adapter/sierra_item_merger/target

- archive/archiver/target
- archive/commmon/target
- archive/archivist/target
- archive/registrar/target


# Based on instructions from
# https://www.scala-sbt.org/1.0/docs/Travis-CI-with-sbt.html#Caching
Expand Down Expand Up @@ -157,7 +160,10 @@ jobs:
- env: TASK=sierra_item_merger-test

# Archive
- env: TASK=archiver-test
- env: TASK=archivist-test
- env: TASK=registrar-test
- env: TASK=archivist-test


# (not under active development)

Expand Down
4 changes: 2 additions & 2 deletions archive/Makefile
Expand Up @@ -3,8 +3,8 @@ include $(ROOT)/functions.Makefile

STACK_ROOT = archive

SBT_APPS = archiver
SBT_DOCKER_LIBRARIES =
SBT_APPS = archivist registrar
SBT_DOCKER_LIBRARIES = archive_common
SBT_NO_DOCKER_LIBRARIES =

ECS_TASKS =
Expand Down
Expand Up @@ -5,7 +5,7 @@ ADD target/universal/stage /opt/docker
USER root
RUN chown -R daemon:daemon /opt/docker

ENV project=archiver
ENV project=archivist

USER daemon
CMD ["/run.sh"]
File renamed without changes.
@@ -1,4 +1,4 @@
package uk.ac.wellcome.platform.archiver
package uk.ac.wellcome.platform.archive.archivist

import akka.actor.ActorSystem
import akka.event.{Logging, LoggingAdapter}
Expand All @@ -10,19 +10,19 @@ import com.google.inject.Injector
import grizzled.slf4j.Logging
import uk.ac.wellcome.json.JsonUtil._
import uk.ac.wellcome.messaging.sns.{NotificationMessage, SNSConfig}
import uk.ac.wellcome.platform.archiver.flow.{
import uk.ac.wellcome.platform.archive.archivist.flow.{
BagArchiveCompleteFlow,
DownloadZipFileFlow,
UploadAndVerifyBagFlow
}
import uk.ac.wellcome.platform.archiver.messaging.MessageStream
import uk.ac.wellcome.platform.archiver.models.BagUploaderConfig
import uk.ac.wellcome.platform.archive.archivist.models.BagUploaderConfig
import uk.ac.wellcome.platform.archive.common.messaging.MessageStream
import uk.ac.wellcome.storage.ObjectLocation

import scala.concurrent.ExecutionContext
import scala.util.{Failure, Success}

trait Archiver extends Logging {
trait Archivist extends Logging {
val injector: Injector

def run() = {
Expand Down Expand Up @@ -53,7 +53,7 @@ trait Archiver extends Logging {
.log("archive verified")
.via(BagArchiveCompleteFlow(snsConfig.topicArn))

messageStream.run("archiver", workFlow)
messageStream.run("archivist", workFlow)
}

private def getObjectLocation(message: NotificationMessage) = {
Expand Down
@@ -1,12 +1,16 @@
package uk.ac.wellcome.platform.archiver
package uk.ac.wellcome.platform.archive.archivist

import com.google.inject.{Guice, Injector}
import uk.ac.wellcome.platform.archiver.modules._
import uk.ac.wellcome.platform.archive.archivist.modules.{
AppConfigModule,
ConfigModule
}
import uk.ac.wellcome.platform.archive.common.modules._

import scala.concurrent.Await
import scala.concurrent.duration._

object Main extends App with Archiver {
object Main extends App with Archivist {
override val injector: Injector = Guice.createInjector(
new AppConfigModule(args),
ConfigModule,
Expand Down
@@ -1,10 +1,16 @@
package uk.ac.wellcome.platform.archiver.config
package uk.ac.wellcome.platform.archive.archivist.config

import org.rogach.scallop.{ScallopConf, ScallopOption}
import uk.ac.wellcome.messaging.sns.SNSConfig
import uk.ac.wellcome.messaging.sqs.SQSConfig
import uk.ac.wellcome.monitoring.MetricsConfig
import uk.ac.wellcome.platform.archiver.models._
import uk.ac.wellcome.platform.archive.archivist.models._
import uk.ac.wellcome.platform.archive.common.modules.{
CloudwatchClientConfig,
S3ClientConfig,
SQSClientConfig,
SnsClientConfig
}

import scala.concurrent.duration._

Expand Down
@@ -1,11 +1,11 @@
package uk.ac.wellcome.platform.archiver.flow
package uk.ac.wellcome.platform.archive.archivist.flow

import java.util.zip.ZipFile

import akka.stream.ActorMaterializer
import akka.stream.alpakka.s3.scaladsl.S3Client
import akka.stream.scaladsl.Source
import uk.ac.wellcome.platform.archiver.models.BagItConfig
import uk.ac.wellcome.platform.archive.archivist.models.BagItConfig
import uk.ac.wellcome.storage.ObjectLocation

object ArchiveBagFlow {
Expand Down
@@ -1,4 +1,4 @@
package uk.ac.wellcome.platform.archiver.flow
package uk.ac.wellcome.platform.archive.archivist.flow

import java.util.zip.ZipFile

Expand Down
@@ -1,4 +1,4 @@
package uk.ac.wellcome.platform.archiver.flow
package uk.ac.wellcome.platform.archive.archivist.flow

import akka.stream.alpakka.sns.scaladsl.SnsPublisher
import akka.stream.scaladsl.Flow
Expand Down
@@ -1,4 +1,4 @@
package uk.ac.wellcome.platform.archiver.flow
package uk.ac.wellcome.platform.archive.archivist.flow

import java.util.zip.ZipFile

Expand Down
@@ -1,4 +1,4 @@
package uk.ac.wellcome.platform.archiver.flow
package uk.ac.wellcome.platform.archive.archivist.flow

import java.security.MessageDigest

Expand Down
@@ -1,4 +1,4 @@
package uk.ac.wellcome.platform.archiver.flow
package uk.ac.wellcome.platform.archive.archivist.flow

import akka.stream.Materializer
import akka.stream.alpakka.s3.scaladsl.S3Client
Expand Down
@@ -1,4 +1,4 @@
package uk.ac.wellcome.platform.archiver.flow
package uk.ac.wellcome.platform.archive.archivist.flow

import java.io.File
import java.util.zip.ZipFile
Expand Down Expand Up @@ -26,7 +26,7 @@ object DownloadZipFileFlow extends Logging {
objectLocation.key
)

val tmpFile = File.createTempFile("archiver", ".tmp")
val tmpFile = File.createTempFile("archivist", ".tmp")
val fileSink = FileIO.toPath(tmpFile.toPath)

Source
Expand Down
@@ -1,4 +1,4 @@
package uk.ac.wellcome.platform.archiver.flow
package uk.ac.wellcome.platform.archive.archivist.flow

import java.util.zip.ZipFile

Expand Down
@@ -1,4 +1,4 @@
package uk.ac.wellcome.platform.archiver.flow
package uk.ac.wellcome.platform.archive.archivist.flow

import java.util.zip.ZipFile

Expand All @@ -7,7 +7,10 @@ import akka.stream.ActorMaterializer
import akka.stream.alpakka.s3.scaladsl.S3Client
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
import grizzled.slf4j.Logging
import uk.ac.wellcome.platform.archiver.models.{BagUploaderConfig, UploadConfig}
import uk.ac.wellcome.platform.archive.archivist.models.{
BagUploaderConfig,
UploadConfig
}

import scala.concurrent.ExecutionContext
import scala.util.{Failure, Success}
Expand Down
@@ -1,4 +1,4 @@
package uk.ac.wellcome.platform.archiver.flow
package uk.ac.wellcome.platform.archive.archivist.flow

import java.util.zip.ZipFile

Expand Down
@@ -1,8 +1,14 @@
package uk.ac.wellcome.platform.archiver.models
package uk.ac.wellcome.platform.archive.archivist.models

import uk.ac.wellcome.messaging.sns.SNSConfig
import uk.ac.wellcome.messaging.sqs.SQSConfig
import uk.ac.wellcome.monitoring.MetricsConfig
import uk.ac.wellcome.platform.archive.common.modules.{
CloudwatchClientConfig,
S3ClientConfig,
SQSClientConfig,
SnsClientConfig
}

case class AppConfig(
s3ClientConfig: S3ClientConfig,
Expand All @@ -15,32 +21,6 @@ case class AppConfig(
metricsConfig: MetricsConfig
)

case class S3ClientConfig(
accessKey: Option[String],
secretKey: Option[String],
endpoint: Option[String],
region: String
)

case class SQSClientConfig(
accessKey: Option[String],
secretKey: Option[String],
endpoint: Option[String],
region: String
)

case class SnsClientConfig(
accessKey: Option[String],
secretKey: Option[String],
endpoint: Option[String],
region: String
)

case class CloudwatchClientConfig(
endpoint: Option[String],
region: String
)

case class UploadConfig(uploadNamespace: String,
uploadPrefix: String = "archive")

Expand Down
@@ -1,8 +1,8 @@
package uk.ac.wellcome.platform.archiver.modules
package uk.ac.wellcome.platform.archive.archivist.modules

import com.google.inject.{AbstractModule, Provides}
import grizzled.slf4j.Logging
import uk.ac.wellcome.platform.archiver.config.ArgsConfigurator
import uk.ac.wellcome.platform.archive.archivist.config.ArgsConfigurator

class AppConfigModule(val args: Array[String])
extends AbstractModule
Expand Down
@@ -1,7 +1,7 @@
package uk.ac.wellcome.platform.archiver.modules
package uk.ac.wellcome.platform.archive.archivist.modules

import com.google.inject.{AbstractModule, Provides}
import uk.ac.wellcome.platform.archiver.models.AppConfig
import uk.ac.wellcome.platform.archive.archivist.models.AppConfig

object ConfigModule extends AbstractModule {
@Provides
Expand Down
@@ -1,31 +1,36 @@
package uk.ac.wellcome.platform.archiver
package uk.ac.wellcome.platform.archive.archivist

import org.scalatest.concurrent.ScalaFutures
import org.scalatest.{FunSpec, Matchers}
import uk.ac.wellcome.json.JsonUtil._
import uk.ac.wellcome.monitoring.fixtures.MetricsSenderFixture
import uk.ac.wellcome.platform.archiver.flow.{
import uk.ac.wellcome.platform.archive.archivist.flow.{
BagArchiveCompleteNotification,
BagLocation
}
import uk.ac.wellcome.storage.utils.ExtendedPatience

import uk.ac.wellcome.platform.archive.archivist.fixtures.{
Archivist => ArchivistFixture
}

import uk.ac.wellcome.json.JsonUtil._

// TODO: Test file boundaries
// TODO: Test shutdown mid-stream does not succeed

class ArchiverFeatureTest
class ArchivistFeatureTest
extends FunSpec
with Matchers
with ScalaFutures
with fixtures.Archiver
with MetricsSenderFixture
with ArchivistFixture
with ExtendedPatience {

it("downloads, uploads and verifies a BagIt bag") {
withArchiver {
case (ingestBucket, storageBucket, queuePair, topic, archiver) =>
withArchivist {
case (ingestBucket, storageBucket, queuePair, topic, archivist) =>
sendFakeBag(ingestBucket, queuePair) { validBag =>
archiver.run()
archivist.run()
eventually {
listKeysInBucket(storageBucket) should have size 27

Expand All @@ -43,10 +48,10 @@ class ArchiverFeatureTest
}

it("fails when ingesting an invalid bag") {
withArchiver {
case (ingestBucket, storageBucket, queuePair, topic, archiver) =>
withArchivist {
case (ingestBucket, storageBucket, queuePair, topic, archivist) =>
sendFakeBag(ingestBucket, queuePair, false) { invalidBag =>
archiver.run()
archivist.run()
eventually {

assertQueuePairSizes(queuePair, 0, 1)
Expand All @@ -57,10 +62,10 @@ class ArchiverFeatureTest
}

it("continues after failure") {
withArchiver {
case (ingestBucket, storageBucket, queuePair, topic, archiver) =>
withArchivist {
case (ingestBucket, storageBucket, queuePair, topic, archivist) =>
sendFakeBag(ingestBucket, queuePair) { validBag1 =>
archiver.run()
archivist.run()
sendFakeBag(ingestBucket, queuePair, false) { invalidBag1 =>
sendFakeBag(ingestBucket, queuePair) { validBag2 =>
sendFakeBag(ingestBucket, queuePair, false) { invalidBag2 =>
Expand All @@ -71,10 +76,12 @@ class ArchiverFeatureTest
assertSnsReceives(
Set(
BagArchiveCompleteNotification(
BagLocation(storageBucket.name, "archive", validBag1)
flow
.BagLocation(storageBucket.name, "archive", validBag1)
),
BagArchiveCompleteNotification(
BagLocation(storageBucket.name, "archive", validBag2)
flow
.BagLocation(storageBucket.name, "archive", validBag2)
)
),
topic
Expand Down
@@ -1,4 +1,4 @@
package uk.ac.wellcome.platform.archiver
package uk.ac.wellcome.platform.archive.archivist

import java.util.zip.ZipFile

Expand All @@ -7,17 +7,21 @@ import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.{FunSpec, Matchers}
import uk.ac.wellcome.platform.archiver.flow.DownloadZipFileFlow
import uk.ac.wellcome.platform.archive.archivist.flow.DownloadZipFileFlow
import uk.ac.wellcome.storage.ObjectLocation

import scala.collection.JavaConverters._
import scala.concurrent.Future

import uk.ac.wellcome.platform.archive.archivist.fixtures.{
Archivist => ArchivistFixture
}

class DownloadZipFileFlowTest
extends FunSpec
with Matchers
with ScalaFutures
with fixtures.Archiver {
with ArchivistFixture {

implicit val system = ActorSystem("test")
implicit val materializer = ActorMaterializer()
Expand Down

0 comments on commit 9d431ba

Please sign in to comment.