diff --git a/src/main/resources/db/migration/treehub/V19__static_deltas_status_change.sql b/src/main/resources/db/migration/treehub/V19__static_deltas_status_change.sql new file mode 100644 index 0000000..dad3481 --- /dev/null +++ b/src/main/resources/db/migration/treehub/V19__static_deltas_status_change.sql @@ -0,0 +1,4 @@ +ALTER TABLE `static_deltas` +CHANGE `status` `status` +ENUM('uploading', 'available', 'deleted') NOT NULL +; \ No newline at end of file diff --git a/src/main/scala/com/advancedtelematic/data/DataType.scala b/src/main/scala/com/advancedtelematic/data/DataType.scala index 1e2376f..b8356a6 100644 --- a/src/main/scala/com/advancedtelematic/data/DataType.scala +++ b/src/main/scala/com/advancedtelematic/data/DataType.scala @@ -35,6 +35,7 @@ object DataType { case object Uploading extends Status case object Available extends Status + case object Deleted extends Status } } diff --git a/src/main/scala/com/advancedtelematic/treehub/Boot.scala b/src/main/scala/com/advancedtelematic/treehub/Boot.scala index a2ff3fb..802d451 100644 --- a/src/main/scala/com/advancedtelematic/treehub/Boot.scala +++ b/src/main/scala/com/advancedtelematic/treehub/Boot.scala @@ -13,7 +13,7 @@ import com.advancedtelematic.libats.slick.db.{BootMigrations, CheckMigrations, D import com.advancedtelematic.libats.slick.monitoring.DatabaseMetrics import com.advancedtelematic.metrics.prometheus.PrometheusMetricsSupport import com.advancedtelematic.metrics.{AkkaHttpConnectionMetrics, AkkaHttpRequestMetrics, MetricsSupport} -import com.advancedtelematic.treehub.daemon.StaleObjectArchiveActor +import com.advancedtelematic.treehub.daemon.{DeletedDeltaCleanupActor, StaleObjectArchiveActor} import com.advancedtelematic.treehub.delta_store.StaticDeltas import com.advancedtelematic.treehub.http.TreeHubRoutes import com.advancedtelematic.treehub.object_store.{LocalFsBlobStore, ObjectStore, S3BlobStore} @@ -84,6 +84,8 @@ class TreehubBoot(override val globalConfig: Config, system.actorOf(StaleObjectArchiveActor.withBackOff(objectStorage, staleObjectExpireAfter, autoStart = true), "stale-objects-archiver") } + system.actorOf(DeletedDeltaCleanupActor.withBackOff(deltaBlobStorage, autoStart = true), "deleted-deltas-cleanup") + val routes: Route = (versionHeaders(version) & requestMetrics(metricRegistry) & logResponseMetrics(projectName)) { prometheusMetricsRoutes ~ diff --git a/src/main/scala/com/advancedtelematic/treehub/daemon/DeletedDeltaCleanupActor.scala b/src/main/scala/com/advancedtelematic/treehub/daemon/DeletedDeltaCleanupActor.scala new file mode 100644 index 0000000..d5ed986 --- /dev/null +++ b/src/main/scala/com/advancedtelematic/treehub/daemon/DeletedDeltaCleanupActor.scala @@ -0,0 +1,75 @@ +package com.advancedtelematic.treehub.daemon + +import akka.actor.{Actor, ActorLogging, Props} +import akka.actor.Status.Failure +import akka.pattern.{BackoffOpts, BackoffSupervisor} +import com.advancedtelematic.data.DataType.StaticDeltaMeta +import com.advancedtelematic.treehub.daemon.DeletedDeltaCleanupActor.{Done, Tick, defaultTickInterval} +import com.advancedtelematic.treehub.db.StaticDeltaMetaRepositorySupport +import com.advancedtelematic.treehub.object_store.BlobStore +import slick.jdbc.MySQLProfile.api.* + +import scala.concurrent.{ExecutionContext, Future} +import scala.concurrent.duration.{DurationInt, FiniteDuration} + +object DeletedDeltaCleanupActor { + case object Tick + case class Done(count: Long, failedCount: Long) + + private val defaultTickInterval = 10.minutes + + def props(storage: BlobStore, tickInterval: FiniteDuration = defaultTickInterval, autoStart: Boolean = false)(implicit db: Database, ec: ExecutionContext) = + Props(new DeletedDeltaCleanupActor(storage, tickInterval, autoStart)) + + def withBackOff(blobStore: BlobStore, tickInterval: FiniteDuration = defaultTickInterval, autoStart: Boolean = false)(implicit db: Database, ec: ExecutionContext) = + BackoffSupervisor.props(BackoffOpts.onFailure(props(blobStore, tickInterval, autoStart), "deleted-deltas-worker", 10.minutes, 1.hour, 0.25)) +} + +class DeletedDeltaCleanupActor(storage: BlobStore, tickInterval: FiniteDuration = defaultTickInterval, autoStart: Boolean = false)(implicit val db: Database, ec: ExecutionContext) extends Actor + with StaticDeltaMetaRepositorySupport + with ActorLogging { + + import akka.pattern.pipe + + import scala.async.Async.* + + override def preStart(): Unit = { + if(autoStart) + self ! Tick + } + + def cleanupDelta(delta: StaticDeltaMeta): Future[Boolean] = + storage.deleteObjects(delta.namespace, delta.id.asPrefixedPath) + .flatMap { _ => + staticDeltaMetaRepository.delete(delta.namespace, delta.id) + .map { _ => true } + } + .recover { case ex => + log.error(ex, s"Failed to delete delta ${delta.id}") + false + } + + def processAll: Future[Done] = async { + val deltas = await { staticDeltaMetaRepository.findByStatus(StaticDeltaMeta.Status.Deleted) } + + val failedCount = await { Future.sequence(deltas.map(cleanupDelta)) }.count(_ == false) + + Done(deltas.length, failedCount) + } + + override def receive: Receive = { + case Done(count, failedCount) => + if(count > 0) + log.info(s"Batch done, processed ${count - failedCount} objects successfully - with $failedCount failures") + else + log.debug("Tick, scheduling next execution") + + context.system.scheduler.scheduleOnce(tickInterval, self, Tick) + + case Failure(ex) => + throw ex + + case Tick => + processAll.pipeTo(sender()) + } +} \ No newline at end of file diff --git a/src/main/scala/com/advancedtelematic/treehub/db/StaticDeltaMetaRepository.scala b/src/main/scala/com/advancedtelematic/treehub/db/StaticDeltaMetaRepository.scala index 7cff177..bb85fab 100644 --- a/src/main/scala/com/advancedtelematic/treehub/db/StaticDeltaMetaRepository.scala +++ b/src/main/scala/com/advancedtelematic/treehub/db/StaticDeltaMetaRepository.scala @@ -38,11 +38,18 @@ protected class StaticDeltaMetaRepository()(implicit db: Database, ec: Execution staticDeltas .filter(_.namespace === ns) .filter(_.id === deltaId) + .filter(_.status.inSet(Seq(StaticDeltaMeta.Status.Uploading, StaticDeltaMeta.Status.Available))) // filter out if deleted? .result .headOption .failIfNone(Errors.StaticDeltaDoesNotExist) } + def findByStatus(status: StaticDeltaMeta.Status): Future[Seq[StaticDeltaMeta]] = db.run { + staticDeltas + .filter(_.status === status) + .result + } + def findAll(ns: Namespace, status: StaticDeltaMeta.Status, offset: Long, limit: Long): Future[PaginationResult[StaticDelta]] = db.run { staticDeltas .filter(_.namespace === ns) @@ -85,6 +92,14 @@ protected class StaticDeltaMetaRepository()(implicit db: Database, ec: Execution .map(_ => ()) } + def delete(ns: Namespace, id: DeltaId): Future[Unit] = db.run { + staticDeltas + .filter(_.namespace === ns) + .filter(_.id === id) + .delete + .map(_ => ()) + } + def persistIfValid(ns: Namespace, id: DeltaId, to: Commit, from: Commit, superblockHash: SuperBlockHash): Future[StaticDeltaMeta] = { val io = staticDeltas .filter(_.namespace === ns) diff --git a/src/main/scala/com/advancedtelematic/treehub/delta_store/StaticDeltas.scala b/src/main/scala/com/advancedtelematic/treehub/delta_store/StaticDeltas.scala index 2e89c32..aa1dba2 100644 --- a/src/main/scala/com/advancedtelematic/treehub/delta_store/StaticDeltas.scala +++ b/src/main/scala/com/advancedtelematic/treehub/delta_store/StaticDeltas.scala @@ -54,6 +54,15 @@ class StaticDeltas(storage: BlobStore)(implicit val db: Database, ec: ExecutionC } } + def markDeleted(ns: Namespace, deltaId: DeltaId): Future[Unit] = async { + await { + staticDeltaMetaRepository.setStatus(ns, deltaId, StaticDeltaMeta.Status.Deleted).recover { + case err => + log.error(s"could not delete object $ns/$deltaId", err) + } + } + } + def getAll(ns: Namespace, offset: Option[Long] = None, limit: Option[Long] = None): Future[PaginationResult[StaticDelta]] = staticDeltaMetaRepository.findAll(ns, StaticDeltaMeta.Status.Available, offset.orDefaultOffset, limit.orDefaultLimit) diff --git a/src/main/scala/com/advancedtelematic/treehub/http/DeltaResource.scala b/src/main/scala/com/advancedtelematic/treehub/http/DeltaResource.scala index e51c6a0..47a7c13 100644 --- a/src/main/scala/com/advancedtelematic/treehub/http/DeltaResource.scala +++ b/src/main/scala/com/advancedtelematic/treehub/http/DeltaResource.scala @@ -77,6 +77,11 @@ class DeltaResource(namespace: Directive1[Namespace], complete(f) } } ~ + (delete & path(PrefixedDeltaIdPath)) { id => + val f = staticDeltas.markDeleted(ns, id) + complete(f.map(_ => StatusCodes.Accepted)) + } + ~ (pathEnd & parameters(Symbol("from").as[Commit], Symbol("to").as[Commit])) { (from, to) => val deltaId = (from, to).toDeltaId val uri = Uri(s"/deltas/${deltaId.asPrefixedPath}") diff --git a/src/main/scala/com/advancedtelematic/treehub/object_store/BlobStore.scala b/src/main/scala/com/advancedtelematic/treehub/object_store/BlobStore.scala index 7fde79d..51eb72d 100644 --- a/src/main/scala/com/advancedtelematic/treehub/object_store/BlobStore.scala +++ b/src/main/scala/com/advancedtelematic/treehub/object_store/BlobStore.scala @@ -20,6 +20,8 @@ object BlobStore { trait BlobStore { def deleteObject(ns: Namespace, path: Path): Future[Done] + def deleteObjects(ns: Namespace, pathPrefix: Path): Future[Done] + def storeStream(namespace: Namespace, path: Path, size: Long, blob: Source[ByteString, _]): Future[Long] val supportsOutOfBandStorage: Boolean diff --git a/src/main/scala/com/advancedtelematic/treehub/object_store/LocalFsBlobStore.scala b/src/main/scala/com/advancedtelematic/treehub/object_store/LocalFsBlobStore.scala index 01c1729..1cf75a0 100644 --- a/src/main/scala/com/advancedtelematic/treehub/object_store/LocalFsBlobStore.scala +++ b/src/main/scala/com/advancedtelematic/treehub/object_store/LocalFsBlobStore.scala @@ -10,7 +10,9 @@ import com.advancedtelematic.libats.data.DataType.Namespace import com.advancedtelematic.treehub.http.Errors import org.slf4j.LoggerFactory -import java.nio.file.{Files, Path} +import java.io.IOException +import java.nio.file.attribute.BasicFileAttributes +import java.nio.file.{FileVisitResult, Files, Path, SimpleFileVisitor} import scala.async.Async.{async, await} import scala.concurrent.{ExecutionContext, Future} import scala.util.Try @@ -85,10 +87,40 @@ class LocalFsBlobStore(root: Path)(implicit ec: ExecutionContext, mat: Materiali override def deleteObject(ns: Namespace, path: Path): Future[Done] = FastFuture.successful { val p = objectPath(ns, path) + Files.delete(p) Done } + override def deleteObjects(ns: Namespace, pathPrefix: Path): Future[Done] = FastFuture.successful { + val p = objectPath(ns, pathPrefix) + + log.info(s">>>> DELETE objects in path recursively: $p") + + Files.walkFileTree( + p, + new SimpleFileVisitor[Path] { + override def visitFile( + file: Path, + attrs: BasicFileAttributes + ): FileVisitResult = { + Files.delete(file) + FileVisitResult.CONTINUE + } + + override def postVisitDirectory( + dir: Path, + exc: IOException + ): FileVisitResult = { + Files.delete(dir) + FileVisitResult.CONTINUE + } + } + ) + + Done + } + private def ensureDirExists(path: Path) = Try(Files.createDirectories(path.getParent)).failed.foreach { ex => log.warn(s"Could not create directories: $path", ex) diff --git a/src/main/scala/com/advancedtelematic/treehub/object_store/S3BlobStore.scala b/src/main/scala/com/advancedtelematic/treehub/object_store/S3BlobStore.scala index d2150d4..5bc256d 100644 --- a/src/main/scala/com/advancedtelematic/treehub/object_store/S3BlobStore.scala +++ b/src/main/scala/com/advancedtelematic/treehub/object_store/S3BlobStore.scala @@ -3,6 +3,7 @@ package com.advancedtelematic.treehub.object_store import akka.Done import akka.http.scaladsl.model.headers.Location import akka.http.scaladsl.model.{HttpResponse, StatusCodes, Uri} +import akka.http.scaladsl.util.FastFuture import akka.stream.Materializer import akka.stream.scaladsl.{Source, StreamConverters} import akka.util.ByteString @@ -14,6 +15,7 @@ import com.amazonaws.auth.{AWSCredentials, AWSCredentialsProvider} import com.amazonaws.client.builder.AwsClientBuilder import com.amazonaws.regions.Regions import com.amazonaws.services.s3.model.* +import com.amazonaws.services.s3.model.DeleteObjectsRequest.KeyVersion import com.amazonaws.services.s3.{AmazonS3, AmazonS3ClientBuilder} import org.slf4j.LoggerFactory @@ -134,6 +136,39 @@ class S3BlobStore(s3Credentials: S3Credentials, s3client: AmazonS3, allowRedirec Done } } + + import scala.jdk.CollectionConverters._ + override def deleteObjects(ns: Namespace, pathPrefix: Path): Future[Done] = { + def loop(): Future[Done] = { + val listRequest = new ListObjectsRequest() + .withBucketName(bucketId) + .withPrefix(objectFilename(ns, pathPrefix)) + .withMaxKeys(100) + + val resultF = Future { + blocking(s3client.listObjects(listRequest)) + } + + resultF.flatMap { result => + val keys = result.getObjectSummaries.asScala.map(_.getKey).toList + Future { + blocking { + val deleteRequest = new DeleteObjectsRequest(bucketId) + deleteRequest.setKeys(keys.map(new KeyVersion(_)).asJava) + + s3client.deleteObjects(deleteRequest) + } + }.flatMap { _ => + if (result.isTruncated) + loop() + else + FastFuture.successful(Done) + } + } + } + + loop() + } } object S3Client { diff --git a/src/test/scala/com/advancedtelematic/treehub/daemon/DeletedDeltaCleanupActorSpec.scala b/src/test/scala/com/advancedtelematic/treehub/daemon/DeletedDeltaCleanupActorSpec.scala new file mode 100644 index 0000000..1c9a318 --- /dev/null +++ b/src/test/scala/com/advancedtelematic/treehub/daemon/DeletedDeltaCleanupActorSpec.scala @@ -0,0 +1,241 @@ +package com.advancedtelematic.treehub.daemon + +import akka.actor.{ActorRef, ActorSystem, PoisonPill} +import akka.stream.scaladsl.Source +import akka.testkit.{ImplicitSender, TestException, TestKitBase} +import akka.util.ByteString +import com.advancedtelematic.common.DigestCalculator +import com.advancedtelematic.data.DataType.{CommitTupleOps, StaticDeltaMeta, SuperBlockHash} +import com.advancedtelematic.libats.data.DataType +import com.advancedtelematic.libats.data.RefinedUtils.RefineTry +import com.advancedtelematic.libats.messaging_datatype.DataType.{Commit, ValidCommit} +import com.advancedtelematic.treehub.daemon.DeletedDeltaCleanupActor.{Done, Tick} +import com.advancedtelematic.treehub.db.StaticDeltaMetaRepositorySupport +import com.advancedtelematic.treehub.delta_store.StaticDeltas +import com.advancedtelematic.treehub.http.Errors +import com.advancedtelematic.treehub.object_store.{BlobStore, LocalFsBlobStore} +import com.advancedtelematic.util.{DatabaseSpec, LongTest, TreeHubSpec} +import eu.timepit.refined.api.RefType +import org.scalatest.BeforeAndAfterEach +import org.scalatest.concurrent.Eventually +import org.scalatest.time.{Seconds, Span} +import slick.jdbc.MySQLProfile.api.* + +import java.nio.file.{Files, Path} +import scala.concurrent.{ExecutionContext, Future} +import scala.concurrent.duration.DurationInt +import scala.util.Random + +protected abstract class DeletedDeltaCleanupActorSpecUtil extends TreeHubSpec with TestKitBase with ImplicitSender with DatabaseSpec + with StaticDeltaMetaRepositorySupport with Eventually with LongTest with BeforeAndAfterEach { + + override implicit lazy val system: ActorSystem = ActorSystem(this.getClass.getSimpleName) + + val subject: ActorRef + + override def beforeEach() = { + db.run(sqlu"delete from `static_deltas`").futureValue + } + + override def afterAll(): Unit = { + subject ! PoisonPill + } +} + +class DeletedDeltaCleanupActorSpec extends DeletedDeltaCleanupActorSpecUtil { + + import system.dispatcher + + lazy val storage = new LocalFsBlobStore(Files.createTempDirectory("deleted-deltas-store")) + lazy val deltas = new StaticDeltas(storage) + + lazy val subject = system.actorOf(DeletedDeltaCleanupActor.withBackOff(storage, tickInterval=5.seconds, autoStart = false)) + + test("deletes deleted deltas") { + val superblockHash = RefType.applyRef[SuperBlockHash](randomHash()).toOption.get + val deltaId = (randomCommit(), randomCommit()).toDeltaId + + val bytes = ByteString("some static delta data") + + // Store delta parts + deltas.store(defaultNs, deltaId, "0", Source.single(bytes), bytes.size, superblockHash).futureValue + deltas.store(defaultNs, deltaId, "superblock", Source.single(bytes), bytes.size, superblockHash).futureValue + + // Validate delta is stored to repository (DB) + val result = staticDeltaMetaRepository.find(defaultNs, deltaId).futureValue + result.id shouldBe deltaId + result.status shouldBe StaticDeltaMeta.Status.Available + + // Validate delta parts exist in storage + storage.exists(defaultNs, deltaId.asPrefixedPath.resolve("0")).futureValue shouldBe true + storage.exists(defaultNs, deltaId.asPrefixedPath.resolve("superblock")).futureValue shouldBe true + + // Mark delta for deletion + deltas.markDeleted(defaultNs, deltaId) + + // Validate delta is not available anymore + staticDeltaMetaRepository.find(defaultNs, deltaId).failed.futureValue shouldBe Errors.StaticDeltaDoesNotExist + + // Validate delta is marked for deletion + val deleted = staticDeltaMetaRepository.findByStatus(StaticDeltaMeta.Status.Deleted).futureValue + deleted.length shouldBe 1 + deleted.head.id shouldBe deltaId + + // Trigger actor action + subject ! Tick + val done = expectMsgType[Done] + done.count shouldBe deleted.length + + // Validate delta parts were deleted from storage + storage.exists(defaultNs, deltaId.asPrefixedPath.resolve("0")).futureValue shouldBe false + storage.exists(defaultNs, deltaId.asPrefixedPath.resolve("superblock")).futureValue shouldBe false + + // Validate delta was removed from repository (DB) + staticDeltaMetaRepository.findByStatus(StaticDeltaMeta.Status.Deleted).futureValue.length shouldBe 0 + + staticDeltaMetaRepository.find(defaultNs, deltaId).failed.futureValue shouldBe Errors.StaticDeltaDoesNotExist + } + + test("schedules new run when receiving Done") { + val superblockHash = RefType.applyRef[SuperBlockHash](randomHash()).toOption.get + val deltaId = (randomCommit(), randomCommit()).toDeltaId + + val bytes = ByteString("some static delta data") + + // Store delta parts + deltas.store(defaultNs, deltaId, "0", Source.single(bytes), bytes.size, superblockHash).futureValue + deltas.store(defaultNs, deltaId, "superblock", Source.single(bytes), bytes.size, superblockHash).futureValue + + subject ! Done(0, 0) + + deltas.markDeleted(defaultNs, deltaId) + + eventually({ + // Validate delta parts were deleted from storage + storage.exists(defaultNs, deltaId.asPrefixedPath.resolve("0")).futureValue shouldBe false + storage.exists(defaultNs, deltaId.asPrefixedPath.resolve("superblock")).futureValue shouldBe false + + // Validate delta was removed from repository (DB) + staticDeltaMetaRepository.findByStatus(StaticDeltaMeta.Status.Deleted).futureValue.length shouldBe 0 + + staticDeltaMetaRepository.find(defaultNs, deltaId).failed.futureValue shouldBe Errors.StaticDeltaDoesNotExist + })(patienceConfig.copy(timeout = Span(6, Seconds)), implicitly) + } + + test("does nothing to deltas that are not deleted") { + val superblockHash = RefType.applyRef[SuperBlockHash](randomHash()).toOption.get + val deltaId = (randomCommit(), randomCommit()).toDeltaId + + val bytes = ByteString("some static delta data") + + // Store delta parts + deltas.store(defaultNs, deltaId, "0", Source.single(bytes), bytes.size, superblockHash).futureValue + deltas.store(defaultNs, deltaId, "superblock", Source.single(bytes), bytes.size, superblockHash).futureValue + + // Validate delta is stored to repository (DB) + val result = staticDeltaMetaRepository.find(defaultNs, deltaId).futureValue + result.id shouldBe deltaId + result.status shouldBe StaticDeltaMeta.Status.Available + + // Validate delta parts exist in storage + storage.exists(defaultNs, deltaId.asPrefixedPath.resolve("0")).futureValue shouldBe true + storage.exists(defaultNs, deltaId.asPrefixedPath.resolve("superblock")).futureValue shouldBe true + + subject ! Tick + val done = expectMsgType[Done] + done.count shouldBe 0 + + val result1 = staticDeltaMetaRepository.find(defaultNs, deltaId).futureValue + result1.id shouldBe deltaId + result1.status shouldBe StaticDeltaMeta.Status.Available + + // Validate delta parts exist in storage + storage.exists(defaultNs, deltaId.asPrefixedPath.resolve("0")).futureValue shouldBe true + storage.exists(defaultNs, deltaId.asPrefixedPath.resolve("superblock")).futureValue shouldBe true + } + + def randomCommit(): Commit = + randomHash().refineTry[ValidCommit].get + + def randomHash() = + DigestCalculator.digest()(new Random().nextString(10)) +} + +class DeletedDeltaCleanupActorMockStorageSpec extends DeletedDeltaCleanupActorSpecUtil { + + import system.dispatcher + + lazy val mockStorage = new MockBlobStore() + lazy val deltas = new StaticDeltas(mockStorage) + + lazy val subject = system.actorOf(DeletedDeltaCleanupActor.withBackOff(mockStorage, tickInterval = 5.seconds, autoStart = false)) + + def randomCommit(): Commit = + randomHash().refineTry[ValidCommit].get + + def randomHash() = + DigestCalculator.digest()(new Random().nextString(10)) + + test("fails to delete deleted deltas when exception from storage") { + val superblockHash1 = RefType.applyRef[SuperBlockHash](randomHash()).toOption.get + val deltaId1 = (randomCommit(), randomCommit()).toDeltaId + + val superblockHash2 = RefType.applyRef[SuperBlockHash](randomHash()).toOption.get + val deltaId2 = (randomCommit(), randomCommit()).toDeltaId + + val bytes = ByteString("some static delta data") + + // Store delta parts + deltas.store(defaultNs, deltaId1, "0", Source.single(bytes), bytes.size, superblockHash1).futureValue + deltas.store(defaultNs, deltaId1, "superblock", Source.single(bytes), bytes.size, superblockHash1).futureValue + + deltas.store(defaultNs, deltaId2, "1", Source.single(bytes), bytes.size, superblockHash2).futureValue + deltas.store(defaultNs, deltaId2, "superblock", Source.single(bytes), bytes.size, superblockHash2).futureValue + + // Validate deltas are stored to repository (DB) + val result1 = staticDeltaMetaRepository.find(defaultNs, deltaId1).futureValue + result1.id shouldBe deltaId1 + result1.status shouldBe StaticDeltaMeta.Status.Available + + val result2 = staticDeltaMetaRepository.find(defaultNs, deltaId2).futureValue + result2.id shouldBe deltaId2 + result2.status shouldBe StaticDeltaMeta.Status.Available + + // Mark deltas for deletion + deltas.markDeleted(defaultNs, deltaId1) + deltas.markDeleted(defaultNs, deltaId2) + + // Validate deltas are not available anymore + staticDeltaMetaRepository.find(defaultNs, deltaId1).failed.futureValue shouldBe Errors.StaticDeltaDoesNotExist + staticDeltaMetaRepository.find(defaultNs, deltaId2).failed.futureValue shouldBe Errors.StaticDeltaDoesNotExist + + // Validate deltas are marked for deletion + staticDeltaMetaRepository.findByStatus(StaticDeltaMeta.Status.Deleted).futureValue.length shouldBe 2 + + mockStorage.addPathToFailingPaths(deltaId1.asPrefixedPath) + + // Trigger actor action + subject ! Tick + val done = expectMsgType[Done] + done.count shouldBe 2 + done.failedCount shouldBe 1 + + // Validate failed delta was not removed from repository (DB) + val deleted = staticDeltaMetaRepository.findByStatus(StaticDeltaMeta.Status.Deleted).futureValue + deleted.length shouldBe 1 + deleted.head.id shouldBe deltaId1 + + staticDeltaMetaRepository.find(defaultNs, deltaId1).failed.futureValue shouldBe Errors.StaticDeltaDoesNotExist + staticDeltaMetaRepository.find(defaultNs, deltaId2).failed.futureValue shouldBe Errors.StaticDeltaDoesNotExist + + mockStorage.resetFailingPaths() + + subject ! Tick + val done2 = expectMsgType[Done] + done2.count shouldBe 1 + done2.failedCount shouldBe 0 + + staticDeltaMetaRepository.findByStatus(StaticDeltaMeta.Status.Deleted).futureValue.length shouldBe 0 + staticDeltaMetaRepository.find(defaultNs, deltaId1).failed.futureValue shouldBe Errors.StaticDeltaDoesNotExist + } +} diff --git a/src/test/scala/com/advancedtelematic/treehub/daemon/MockBlobStore.scala b/src/test/scala/com/advancedtelematic/treehub/daemon/MockBlobStore.scala new file mode 100644 index 0000000..04cec13 --- /dev/null +++ b/src/test/scala/com/advancedtelematic/treehub/daemon/MockBlobStore.scala @@ -0,0 +1,36 @@ +package com.advancedtelematic.treehub.daemon + +import akka.stream.scaladsl.Source +import akka.testkit.TestException +import akka.util.ByteString +import com.advancedtelematic.libats.data.DataType +import com.advancedtelematic.treehub.object_store.BlobStore + +import java.nio.file.Path +import scala.concurrent.{ExecutionContext, Future} + +class MockBlobStore(implicit ec: ExecutionContext) extends BlobStore { + private var failingPaths = Seq[Path]() + def addPathToFailingPaths(path: Path): Unit = { + failingPaths = failingPaths :+ path + } + + def resetFailingPaths(): Unit = { + failingPaths = Seq() + } + + // BlobStore trait overrides + override def deleteObject(ns: DataType.Namespace, path: Path) = Future { throw TestException("failed") } + override def deleteObjects(ns: DataType.Namespace, pathPrefix: Path) = Future { + if (failingPaths.contains(pathPrefix)) + throw TestException("failed") + else + akka.Done + } + override def storeStream(namespace: DataType.Namespace, path: Path, size: Long, blob: Source[ByteString, _]) = Future { 123L } + override val supportsOutOfBandStorage = false + override def storeOutOfBand(namespace: DataType.Namespace, path: Path) = Future { throw TestException("failed") } + override def buildResponse(namespace: DataType.Namespace, path: Path) = Future { throw TestException("failed") } + override def readFull(namespace: DataType.Namespace, path: Path) = Future { throw TestException("failed") } + override def exists(namespace: DataType.Namespace, path: Path) = Future { throw TestException("failed") } +} \ No newline at end of file diff --git a/src/test/scala/com/advancedtelematic/treehub/http/DeltaResourceSpec.scala b/src/test/scala/com/advancedtelematic/treehub/http/DeltaResourceSpec.scala index f4fdbb2..f9edad7 100644 --- a/src/test/scala/com/advancedtelematic/treehub/http/DeltaResourceSpec.scala +++ b/src/test/scala/com/advancedtelematic/treehub/http/DeltaResourceSpec.scala @@ -247,6 +247,32 @@ class DeltaResourceSpec extends TreeHubSpec with ResourceSpec with ObjectReposit } } + test("DELETE on deltas makes the static delta unavailable") { + implicit val superBlockHash = RefType.applyRef[SuperBlockHash](DigestCalculator.digest()(new Random().nextString(10))).toOption.get + + val commit1 = RefType.applyRef[Commit]("1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef").toOption.get + val commit2 = RefType.applyRef[Commit]("234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef1").toOption.get + val deltaId1 = (commit1, commit2).toDeltaId + + val blob1 = "superblock data 1".getBytes() + + Post(apiUri(s"deltas/${deltaId1.asPrefixedPath}/superblock"), blob1).withSuperblockHash() ~> routes ~> check { + status shouldBe StatusCodes.OK + } + + Get(apiUri(s"deltas/${deltaId1.asPrefixedPath}/superblock")) ~> routes ~> check { + status shouldBe StatusCodes.OK + } + + Delete(apiUri(s"deltas/${deltaId1.asPrefixedPath}")) ~> routes ~> check { + status shouldBe StatusCodes.Accepted + } + + Get(apiUri(s"deltas/${deltaId1.asPrefixedPath}/superblock")) ~> routes ~> check { + status shouldBe StatusCodes.NotFound + } + } + test("publishes usage to bus") { val deltaId = "6qdddbmoaLtYLmZg2Q8OZm7syoxvAOFs0fAXbavojKY-k_F8QpdRP9KlPD6wiS2HNF7WuL1sgfu1tLaJXV6GjIU".refineTry[ValidDeltaId].get val blob = "some other data".getBytes diff --git a/src/test/scala/com/advancedtelematic/treehub/http/TreehubRoutesSpec.scala b/src/test/scala/com/advancedtelematic/treehub/http/TreehubRoutesSpec.scala index 86f1155..8ba6403 100644 --- a/src/test/scala/com/advancedtelematic/treehub/http/TreehubRoutesSpec.scala +++ b/src/test/scala/com/advancedtelematic/treehub/http/TreehubRoutesSpec.scala @@ -22,6 +22,9 @@ class TreehubRoutesSpec extends TreeHubSpec with ResourceSpec { override def deleteObject(ns: DataType.Namespace, path: Path): Future[Done] = FastFuture.failed(new RuntimeException("[test] delete failed")) + override def deleteObjects(ns: DataType.Namespace, pathPrefix: Path): Future[Done] = + FastFuture.failed(new RuntimeException("[test] delete failed")) + override def storeStream(namespace: DataType.Namespace, path: Path, size: Long, blob: Source[ByteString, _]): Future[Long] = ??? diff --git a/src/test/scala/com/advancedtelematic/treehub/object_store/S3BlobStoreIntegrationSpec.scala b/src/test/scala/com/advancedtelematic/treehub/object_store/S3BlobStoreIntegrationSpec.scala index 814bff7..f65e8ca 100644 --- a/src/test/scala/com/advancedtelematic/treehub/object_store/S3BlobStoreIntegrationSpec.scala +++ b/src/test/scala/com/advancedtelematic/treehub/object_store/S3BlobStoreIntegrationSpec.scala @@ -1,5 +1,6 @@ package com.advancedtelematic.treehub.object_store +import akka.Done import akka.actor.ActorSystem import akka.http.scaladsl.model.StatusCodes import akka.stream.scaladsl.Source @@ -97,4 +98,27 @@ class S3BlobStoreIntegrationSpec extends TreeHubSpec { response.status shouldBe StatusCodes.OK response.entity.dataBytes.runFold(ByteString.empty)(_ ++ _).futureValue.utf8String shouldBe "this is byte. Call me. maybe." } -} + + test("correctly deletes objects from blob storage") {} + val basePath = "some_base_path" + val pathsWithBase = Seq(s"$basePath/superblock", s"$basePath/0", s"$basePath/1", s"$basePath/2/123", s"$basePath/random") + val otherPaths = Seq("some_other_path/superblock", "some_path/1") + + (pathsWithBase ++ otherPaths).foreach { pathString => + val blob = ByteString(s"this is byte for path $pathString") + val source = Source.single(blob) + val path = Paths.get(pathString) + + s3BlobStore.storeStream(ns, path, blob.size, source).futureValue shouldBe blob.size + s3BlobStore.exists(ns, path).futureValue shouldBe true + } + + s3BlobStore.deleteObjects(ns, Paths.get(basePath)).futureValue shouldBe Done + + pathsWithBase.foreach { pathString => + s3BlobStore.exists(ns, Paths.get(pathString)).futureValue shouldBe false + } + otherPaths.foreach { pathString => + s3BlobStore.exists(ns, Paths.get(pathString)).futureValue shouldBe true + } + }