Skip to content

Commit

Permalink
Merge branch 'OTA-2116' into 'master'
Browse files Browse the repository at this point in the history
OTA-2116: Delete an existing static delta, with a background process removing the S3 objects marked for deletion

Closes OTA-2116

See merge request torizon-platform/treehub!117
  • Loading branch information
Raigi Gläser committed Feb 15, 2024
2 parents 409ea58 + e7c7374 commit 2231292
Show file tree
Hide file tree
Showing 15 changed files with 513 additions and 3 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
-- Redefine the `status` column of `static_deltas` as a NOT NULL enum that accepts
-- the values 'uploading', 'available' and 'deleted'.
ALTER TABLE `static_deltas`
CHANGE `status` `status`
ENUM('uploading', 'available', 'deleted') NOT NULL
;
1 change: 1 addition & 0 deletions src/main/scala/com/advancedtelematic/data/DataType.scala
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ object DataType {

// Possible values of a static delta's status.
// NOTE(review): presumably these mirror the values of the `status` database column — confirm against the schema.
case object Uploading extends Status
case object Available extends Status
// Status of a delta that has been marked as deleted.
case object Deleted extends Status
}
}

Expand Down
4 changes: 3 additions & 1 deletion src/main/scala/com/advancedtelematic/treehub/Boot.scala
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import com.advancedtelematic.libats.slick.db.{BootMigrations, CheckMigrations, D
import com.advancedtelematic.libats.slick.monitoring.DatabaseMetrics
import com.advancedtelematic.metrics.prometheus.PrometheusMetricsSupport
import com.advancedtelematic.metrics.{AkkaHttpConnectionMetrics, AkkaHttpRequestMetrics, MetricsSupport}
import com.advancedtelematic.treehub.daemon.StaleObjectArchiveActor
import com.advancedtelematic.treehub.daemon.{DeletedDeltaCleanupActor, StaleObjectArchiveActor}
import com.advancedtelematic.treehub.delta_store.StaticDeltas
import com.advancedtelematic.treehub.http.TreeHubRoutes
import com.advancedtelematic.treehub.object_store.{LocalFsBlobStore, ObjectStore, S3BlobStore}
Expand Down Expand Up @@ -84,6 +84,8 @@ class TreehubBoot(override val globalConfig: Config,
system.actorOf(StaleObjectArchiveActor.withBackOff(objectStorage, staleObjectExpireAfter, autoStart = true), "stale-objects-archiver")
}

system.actorOf(DeletedDeltaCleanupActor.withBackOff(deltaBlobStorage, autoStart = true), "deleted-deltas-cleanup")

val routes: Route =
(versionHeaders(version) & requestMetrics(metricRegistry) & logResponseMetrics(projectName)) {
prometheusMetricsRoutes ~
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package com.advancedtelematic.treehub.daemon

import akka.actor.{Actor, ActorLogging, Props}
import akka.actor.Status.Failure
import akka.pattern.{BackoffOpts, BackoffSupervisor}
import com.advancedtelematic.data.DataType.StaticDeltaMeta
import com.advancedtelematic.treehub.daemon.DeletedDeltaCleanupActor.{Done, Tick, defaultTickInterval}
import com.advancedtelematic.treehub.db.StaticDeltaMetaRepositorySupport
import com.advancedtelematic.treehub.object_store.BlobStore
import slick.jdbc.MySQLProfile.api.*

import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration.{DurationInt, FiniteDuration}

/** Messages and `Props` factories for the [[DeletedDeltaCleanupActor]]. */
object DeletedDeltaCleanupActor {
  /** Asks the actor to run one cleanup pass. */
  case object Tick

  /**
   * Result of one cleanup pass.
   *
   * @param count       number of deltas that were picked up for cleanup
   * @param failedCount how many of those could not be cleaned up
   */
  case class Done(count: Long, failedCount: Long)

  // Pause between two cleanup passes unless the caller supplies another interval.
  private val defaultTickInterval = 10.minutes

  /** `Props` for a bare cleanup actor, without any supervision wrapper. */
  def props(storage: BlobStore, tickInterval: FiniteDuration = defaultTickInterval, autoStart: Boolean = false)(implicit db: Database, ec: ExecutionContext) =
    Props(new DeletedDeltaCleanupActor(storage, tickInterval, autoStart))

  /**
   * `Props` for a backoff supervisor that runs the cleanup actor as its child
   * (named "deleted-deltas-worker") and restarts it on failure, waiting between
   * 10 minutes and 1 hour with a random factor of 0.25.
   */
  def withBackOff(blobStore: BlobStore, tickInterval: FiniteDuration = defaultTickInterval, autoStart: Boolean = false)(implicit db: Database, ec: ExecutionContext) = {
    val workerProps = props(blobStore, tickInterval, autoStart)
    val backoffOptions = BackoffOpts.onFailure(workerProps, "deleted-deltas-worker", 10.minutes, 1.hour, 0.25)

    BackoffSupervisor.props(backoffOptions)
  }
}

/**
 * Background worker that physically removes static deltas whose metadata status is `Deleted`.
 *
 * On every `Tick` it loads all deltas with status `Deleted`, deletes their objects from `storage`
 * and then removes the metadata row. When a `Done` message comes back, the next `Tick` is
 * scheduled after `tickInterval`.
 *
 * @param storage      blob store holding the delta objects
 * @param tickInterval delay between the end of one pass and the start of the next
 * @param autoStart    when true, the first `Tick` is sent to the actor itself from `preStart`
 */
class DeletedDeltaCleanupActor(storage: BlobStore, tickInterval: FiniteDuration = defaultTickInterval, autoStart: Boolean = false)(implicit val db: Database, ec: ExecutionContext) extends Actor
  with StaticDeltaMetaRepositorySupport
  with ActorLogging {

  import akka.pattern.pipe

  import scala.async.Async.*

  override def preStart(): Unit = {
    if (autoStart)
      self ! Tick
  }

  /**
   * Removes the stored objects of `delta` and, only if that succeeded, its metadata row.
   *
   * @return a future of `true` on success and `false` on any failure; the failure is logged and
   *         never propagated, so one bad delta does not abort the whole batch
   */
  def cleanupDelta(delta: StaticDeltaMeta): Future[Boolean] =
    // Start from an already completed future so that an exception thrown *synchronously* by
    // `deleteObjects` (before it returns a Future) is captured by `recover` as well, instead of
    // escaping and failing the whole batch.
    Future.unit
      .flatMap { _ => storage.deleteObjects(delta.namespace, delta.id.asPrefixedPath) }
      .flatMap { _ =>
        staticDeltaMetaRepository.delete(delta.namespace, delta.id)
          .map { _ => true }
      }
      .recover { case ex =>
        log.error(ex, s"Failed to delete delta ${delta.id}")
        false
      }

  /**
   * Runs one cleanup pass over every delta with status `Deleted`.
   *
   * @return `Done` carrying the number of deltas found and the number that failed to be cleaned up;
   *         the future fails only if the initial lookup fails
   */
  def processAll: Future[Done] = async {
    val deltas = await { staticDeltaMetaRepository.findByStatus(StaticDeltaMeta.Status.Deleted) }

    val failedCount = await { Future.sequence(deltas.map(cleanupDelta)) }.count(_ == false)

    Done(deltas.length, failedCount)
  }

  override def receive: Receive = {
    case Done(count, failedCount) =>
      if (count > 0)
        log.info(s"Batch done, processed ${count - failedCount} objects successfully - with $failedCount failures")
      else
        log.debug("Tick, scheduling next execution")

      context.system.scheduler.scheduleOnce(tickInterval, self, Tick)

    case Failure(ex) =>
      // A failed pass is escalated by throwing, which hands the decision to this actor's supervisor.
      throw ex

    case Tick =>
      // The result (or a Status.Failure) is delivered to whoever sent the Tick.
      processAll.pipeTo(sender())
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,18 @@ protected class StaticDeltaMetaRepository()(implicit db: Database, ec: Execution
staticDeltas
.filter(_.namespace === ns)
.filter(_.id === deltaId)
.filter(_.status.inSet(Seq(StaticDeltaMeta.Status.Uploading, StaticDeltaMeta.Status.Available))) // filter out if deleted?
.result
.headOption
.failIfNone(Errors.StaticDeltaDoesNotExist)
}

/** Loads every static delta row whose status equals `status`; no namespace filter is applied. */
def findByStatus(status: StaticDeltaMeta.Status): Future[Seq[StaticDeltaMeta]] = {
  val withStatus = staticDeltas.filter(row => row.status === status)

  db.run(withStatus.result)
}

def findAll(ns: Namespace, status: StaticDeltaMeta.Status, offset: Long, limit: Long): Future[PaginationResult[StaticDelta]] = db.run {
staticDeltas
.filter(_.namespace === ns)
Expand Down Expand Up @@ -85,6 +92,14 @@ protected class StaticDeltaMetaRepository()(implicit db: Database, ec: Execution
.map(_ => ())
}

/** Deletes the row(s) matching namespace `ns` and delta `id`; the affected-row count is discarded. */
def delete(ns: Namespace, id: DeltaId): Future[Unit] = {
  val target = staticDeltas
    .filter(row => row.namespace === ns)
    .filter(row => row.id === id)

  db.run(target.delete.map(_ => ()))
}

def persistIfValid(ns: Namespace, id: DeltaId, to: Commit, from: Commit, superblockHash: SuperBlockHash): Future[StaticDeltaMeta] = {
val io = staticDeltas
.filter(_.namespace === ns)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,15 @@ class StaticDeltas(storage: BlobStore)(implicit val db: Database, ec: ExecutionC
}
}

/**
 * Sets the status of the delta `deltaId` in namespace `ns` to `Deleted`.
 *
 * A failure of the status update is logged and then propagated through the returned future, so
 * callers do not report success for a delta that was never marked.
 */
def markDeleted(ns: Namespace, deltaId: DeltaId): Future[Unit] =
  staticDeltaMetaRepository
    .setStatus(ns, deltaId, StaticDeltaMeta.Status.Deleted)
    .map(_ => ())
    .andThen {
      // andThen only observes the outcome: the error is logged here and still reaches the caller.
      case scala.util.Failure(err) =>
        log.error(s"could not delete object $ns/$deltaId", err)
    }

def getAll(ns: Namespace, offset: Option[Long] = None, limit: Option[Long] = None): Future[PaginationResult[StaticDelta]] =
staticDeltaMetaRepository.findAll(ns, StaticDeltaMeta.Status.Available, offset.orDefaultOffset, limit.orDefaultLimit)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,11 @@ class DeltaResource(namespace: Directive1[Namespace],
complete(f)
}
} ~
(delete & path(PrefixedDeltaIdPath)) { id =>
val f = staticDeltas.markDeleted(ns, id)
complete(f.map(_ => StatusCodes.Accepted))
}
~
(pathEnd & parameters(Symbol("from").as[Commit], Symbol("to").as[Commit])) { (from, to) =>
val deltaId = (from, to).toDeltaId
val uri = Uri(s"/deltas/${deltaId.asPrefixedPath}")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ object BlobStore {
trait BlobStore {
def deleteObject(ns: Namespace, path: Path): Future[Done]

def deleteObjects(ns: Namespace, pathPrefix: Path): Future[Done]

def storeStream(namespace: Namespace, path: Path, size: Long, blob: Source[ByteString, _]): Future[Long]

val supportsOutOfBandStorage: Boolean
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@ import com.advancedtelematic.libats.data.DataType.Namespace
import com.advancedtelematic.treehub.http.Errors
import org.slf4j.LoggerFactory

import java.nio.file.{Files, Path}
import java.io.IOException
import java.nio.file.attribute.BasicFileAttributes
import java.nio.file.{FileVisitResult, Files, Path, SimpleFileVisitor}
import scala.async.Async.{async, await}
import scala.concurrent.{ExecutionContext, Future}
import scala.util.Try
Expand Down Expand Up @@ -85,10 +87,40 @@ class LocalFsBlobStore(root: Path)(implicit ec: ExecutionContext, mat: Materiali

/**
 * Deletes the single file stored for `path` in namespace `ns`.
 *
 * The deletion runs on the calling thread; any non-fatal error raised by `Files.delete`
 * is returned as a failed future rather than thrown at the caller.
 */
override def deleteObject(ns: Namespace, path: Path): Future[Done] = Future.fromTry(Try {
  val p = objectPath(ns, path)

  Files.delete(p)
  Done
})

/**
 * Recursively deletes the directory tree stored under `pathPrefix` in namespace `ns`.
 *
 * The work runs on the calling thread; any non-fatal error is returned as a failed future rather
 * than thrown at the caller. A path that does not exist counts as already deleted, so retrying
 * after an earlier, partially completed cleanup succeeds.
 */
override def deleteObjects(ns: Namespace, pathPrefix: Path): Future[Done] = Future.fromTry(Try {
  val p = objectPath(ns, pathPrefix)

  log.info(s">>>> DELETE objects in path recursively: $p")

  // Nothing to walk when the tree is already gone; walkFileTree would throw NoSuchFileException.
  if (Files.exists(p))
    Files.walkFileTree(
      p,
      new SimpleFileVisitor[Path] {
        override def visitFile(
          file: Path,
          attrs: BasicFileAttributes
        ): FileVisitResult = {
          Files.delete(file)
          FileVisitResult.CONTINUE
        }

        override def postVisitDirectory(
          dir: Path,
          exc: IOException
        ): FileVisitResult = {
          // A non-null `exc` means iterating the directory ended prematurely; rethrow it, as the
          // default SimpleFileVisitor implementation does, instead of silently dropping it.
          if (exc != null)
            throw exc

          Files.delete(dir)
          FileVisitResult.CONTINUE
        }
      }
    )

  Done
})

private def ensureDirExists(path: Path) =
Try(Files.createDirectories(path.getParent)).failed.foreach { ex =>
log.warn(s"Could not create directories: $path", ex)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package com.advancedtelematic.treehub.object_store
import akka.Done
import akka.http.scaladsl.model.headers.Location
import akka.http.scaladsl.model.{HttpResponse, StatusCodes, Uri}
import akka.http.scaladsl.util.FastFuture
import akka.stream.Materializer
import akka.stream.scaladsl.{Source, StreamConverters}
import akka.util.ByteString
Expand All @@ -14,6 +15,7 @@ import com.amazonaws.auth.{AWSCredentials, AWSCredentialsProvider}
import com.amazonaws.client.builder.AwsClientBuilder
import com.amazonaws.regions.Regions
import com.amazonaws.services.s3.model.*
import com.amazonaws.services.s3.model.DeleteObjectsRequest.KeyVersion
import com.amazonaws.services.s3.{AmazonS3, AmazonS3ClientBuilder}
import org.slf4j.LoggerFactory

Expand Down Expand Up @@ -134,6 +136,39 @@ class S3BlobStore(s3Credentials: S3Credentials, s3client: AmazonS3, allowRedirec
Done
}
}

import scala.jdk.CollectionConverters._
/**
 * Deletes every object in the bucket whose key starts with the key derived from `ns` and
 * `pathPrefix`.
 *
 * Objects are listed in pages of at most 100 keys and each page is removed with one multi-object
 * delete request. The listing is re-issued from the start after each page, because the keys
 * returned previously have been deleted by then. Both S3 calls are wrapped in `blocking`.
 */
override def deleteObjects(ns: Namespace, pathPrefix: Path): Future[Done] = {
  val keyPrefix = objectFilename(ns, pathPrefix)

  def loop(): Future[Done] = {
    val listRequest = new ListObjectsRequest()
      .withBucketName(bucketId)
      .withPrefix(keyPrefix)
      .withMaxKeys(100)

    val resultF = Future {
      blocking(s3client.listObjects(listRequest))
    }

    resultF.flatMap { result =>
      val keys = result.getObjectSummaries.asScala.map(_.getKey).toList

      if (keys.isEmpty)
        // Nothing (left) under the prefix. A multi-object delete request without keys is rejected
        // by S3, so finish here instead of sending one.
        FastFuture.successful(Done)
      else
        Future {
          blocking {
            val deleteRequest = new DeleteObjectsRequest(bucketId)
            deleteRequest.setKeys(keys.map(new KeyVersion(_)).asJava)

            s3client.deleteObjects(deleteRequest)
          }
        }.flatMap { _ =>
          if (result.isTruncated)
            loop()
          else
            FastFuture.successful(Done)
        }
    }
  }

  loop()
}
}

object S3Client {
Expand Down
Loading

0 comments on commit 2231292

Please sign in to comment.