[to squash] get RDB shredder discovery to work
benjben committed Jan 11, 2021
1 parent 6e08e57 commit 3fccbeb
Showing 4 changed files with 54 additions and 33 deletions.
@@ -12,12 +12,12 @@
*/
package com.snowplowanalytics.snowplow.rdbloader.common

import java.net.URI

import com.amazonaws.services.s3.model.{GetObjectMetadataRequest, ListObjectsV2Request, ListObjectsV2Result, S3ObjectSummary}
import com.amazonaws.services.s3.model.{ListObjectsV2Request, ListObjectsV2Result, S3ObjectSummary}
import com.amazonaws.services.s3.AmazonS3
import scala.collection.JavaConverters._

import com.snowplowanalytics.snowplow.rdbloader.common.S3.Folder

object Cloud {
def list(client: AmazonS3, str: S3.Folder): List[S3ObjectSummary] = {
val (bucket, prefix) = S3.splitS3Path(str)
@@ -40,8 +40,8 @@ object Cloud {
keyUnfold(client.listObjectsV2(req)).filterNot(_.getSize == 0).toList
}

def listDirs(client: AmazonS3, str: URI): List[S3.Folder] = {
val (bucket, prefix) = S3.splitS3Path(S3.Folder.coerce(str.toString))
def listDirs(client: AmazonS3, f: Folder): List[S3.Folder] = {
val (bucket, prefix) = S3.splitS3Path(f.fixPrefix)

val req = new ListObjectsV2Request()
.withBucketName(bucket)
@@ -63,8 +63,6 @@

def keyExists(client: AmazonS3, key: S3.Key): Boolean = {
val (bucket, s3Key) = S3.splitS3Key(key)
val request = new GetObjectMetadataRequest(bucket, s3Key)
val _ = client.getObjectMetadata(request)
true
client.doesObjectExist(bucket, s3Key)
}
}
@@ -42,6 +42,9 @@ object S3 {

def bucketName: String =
f.split("://").last.split("/").head

def fixPrefix: Folder =
Folder.coerce(S3.fixPrefix(f))
}


@@ -55,7 +58,7 @@

/** Turn proper `s3://bucket/path/` string into `Folder` */
def coerce(s: String): Folder =
apply(appendTrailingSlash(fixPrefix(s)).asInstanceOf[Folder])
apply(appendTrailingSlash(s).asInstanceOf[Folder])

def append(s3Bucket: Folder, s: String): Folder = {
val normalized = if (s.endsWith("/")) s else s + "/"
@@ -163,6 +166,7 @@ object S3 {
case _ => throw new IllegalArgumentException(s"Invalid S3 key [$key] was passed") // Impossible
}

/** Used only to list S3 directories, not to read and write data. */
private def fixPrefix(s: String): String =
if (s.startsWith("s3n")) "s3" + s.stripPrefix("s3n")
else if (s.startsWith("s3a")) "s3" + s.stripPrefix("s3a")
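Note on the S3 change above: the s3n/s3a scheme rewrite moves out of Folder.coerce and behind the new fixPrefix method, which only Cloud.listDirs applies. A minimal standalone sketch of the rewrite (not part of the commit; it assumes the else branch hidden by the collapsed diff returns the input unchanged):

  // Sketch of S3.fixPrefix: normalise s3n:// and s3a:// folders to s3:// before listing
  def fixPrefix(s: String): String =
    if (s.startsWith("s3n")) "s3" + s.stripPrefix("s3n")
    else if (s.startsWith("s3a")) "s3" + s.stripPrefix("s3a")
    else s                                          // assumption: other schemes pass through unchanged

  fixPrefix("s3a://my-bucket/enriched/archive/")    // "s3://my-bucket/enriched/archive/"
  fixPrefix("s3://my-bucket/enriched/archive/")     // unchanged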
@@ -12,8 +12,6 @@
*/
package com.snowplowanalytics.snowplow.shredder

import java.net.URI

import cats.syntax.either._

import com.amazonaws.{AmazonClientException, AmazonWebServiceRequest, ClientConfiguration}
@@ -26,6 +24,7 @@ import com.snowplowanalytics.iglu.core.circe.implicits._

import com.snowplowanalytics.snowplow.rdbloader.generated.ProjectMetadata
import com.snowplowanalytics.snowplow.rdbloader.common.{S3, Cloud, LoaderMessage, Semver}
import com.snowplowanalytics.snowplow.rdbloader.common.S3.Folder


object Discovery {
@@ -36,7 +35,7 @@ object Discovery {
final val RetryBaseDelay = 1000 // milliseconds
final val RetryMaxDelay = 20 * 1000 // milliseconds

/** Common retry policy for S3 and SQS */
/** Common retry policy for S3 and SQS (jitter) */
final val RetryPolicy =
new RetryPolicy(
(_: AmazonWebServiceRequest, _: AmazonClientException, retriesAttempted: Int) =>
@@ -54,14 +53,24 @@ object Discovery {
LoaderMessage.Processor(ProjectMetadata.shredderName, MessageProcessorVersion)


def getUnprocessed(region: String, enriched: URI, shredded: URI): Set[Either[S3.Folder, String]] = {
/** @return Tuple containing list of folders with incomplete shredding and list of unshredded folders */
def getState(region: String, enrichedFolder: Folder, shreddedFolder: Folder): (Set[S3.Folder], Set[S3.Folder]) = {
val client = createS3Client(region)
val enrichedDirs = Cloud.listDirs(client, enriched).toSet
val shreddedDirs = Cloud.listDirs(client, shredded).toSet
(enrichedDirs -- shreddedDirs).map { folder =>
val key = folder.withKey(FinalKeyName)
Either.cond(Cloud.keyExists(client, key), folder.folderName, folder)
val enrichedDirs = Cloud.listDirs(client, enrichedFolder).toSet
val shreddedDirs = Cloud.listDirs(client, shreddedFolder).toSet

val enrichedFolderNames = enrichedDirs.map(Folder.coerce).map(_.folderName)
val shreddedFolderNames = shreddedDirs.map(Folder.coerce).map(_.folderName)

val incomplete = enrichedFolderNames.intersect(shreddedFolderNames).collect {
case folder if !Cloud.keyExists(client, shreddedFolder.append(folder).withKey(FinalKeyName)) =>
enrichedFolder.append(folder)
}

val unshredded = enrichedFolderNames.diff(shreddedFolderNames)
.map(enrichedFolder.append)

(incomplete, unshredded)
}

def sendMessage(message: LoaderMessage.ShreddingComplete,
@@ -77,7 +86,12 @@ object Discovery {
.withMessageGroupId("shredding")

val (bucket, key) = S3.splitS3Key(message.base.withKey(FinalKeyName))
s3Client.putObject(bucket, key, message.selfDescribingData.asString)
Either.catchNonFatal(s3Client.putObject(bucket, key, message.selfDescribingData.asString)) match {
case Left(e) =>
throw new RuntimeException(s"RDB Shredder could not write ${message.base.withKey(FinalKeyName)}", e)
case _ =>
()
}

Either.catchNonFatal(sqsClient.sendMessage(sqsMessage)) match {
case Left(e) =>
@@ -95,7 +109,7 @@ object Discovery {
.withClientConfiguration(new ClientConfiguration().withRetryPolicy(RetryPolicy))
.build()

/** Create SQS client with built-in retry mechanism (jitter) */
/** Create S3 client with built-in retry mechanism (jitter) */
def createS3Client(region: String): AmazonS3 =
AmazonS3ClientBuilder
.standard()
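To make the new discovery logic concrete, here is a worked sketch of the split that getState performs, reproducing only the pure set logic from the diff above with hypothetical run folders (no S3 calls):

  // Hypothetical folder names, for illustration only
  val enrichedRuns = Set("run=A/", "run=B/", "run=C/")   // folders under the enriched archive
  val shreddedRuns = Set("run=A/", "run=B/")             // folders under the shredded archive
  val hasMarker    = Set("run=A/")                       // shredded folders containing the FinalKeyName key

  val incomplete = enrichedRuns.intersect(shreddedRuns).filterNot(hasMarker)  // Set("run=B/"): shredded output but no completion marker
  val unshredded = enrichedRuns.diff(shreddedRuns)                            // Set("run=C/"): not yet shredded

ShredJob (next file) then logs the incomplete folders to stderr and runs shredding on the unshredded ones.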
@@ -30,6 +30,7 @@ import com.snowplowanalytics.snowplow.eventsmanifest.EventsManifestConfig

import com.snowplowanalytics.snowplow.rdbloader.common._
import com.snowplowanalytics.snowplow.rdbloader.common.LoaderMessage._
import com.snowplowanalytics.snowplow.rdbloader.common.S3.Folder

import com.snowplowanalytics.snowplow.shredder.Discovery.MessageProcessor
import com.snowplowanalytics.snowplow.shredder.transformation.{FinalRow, EventUtils}
@@ -60,16 +61,15 @@ class ShredJob(@transient val spark: SparkSession, shredConfig: CliConfig) exten
* - writing out JSON contexts as well as properly-formed and malformed events
*/
def run(shredder: Config.Shredder,
folderName: String,
folderToShred: Folder,
atomicLengths: Map[String, Int],
eventsManifest: Option[EventsManifestConfig]): LoaderMessage.ShreddingComplete = {
val jobStarted: Instant = Instant.now()
val inputFolder: S3.Folder = S3.Folder.coerce(shredder.input.toString).append(folderName)
val outFolder: S3.Folder = S3.Folder.coerce(shredder.output.toString).append(folderName)
val badFolder: S3.Folder = S3.Folder.coerce(shredder.outputBad.toString).append(folderName)
val outFolder: S3.Folder = S3.Folder.coerce(shredder.output.toString).append(folderToShred.folderName)
val badFolder: S3.Folder = S3.Folder.coerce(shredder.outputBad.toString).append(folderToShred.folderName)

// Enriched TSV lines along with their shredded components
val common = sc.textFile(inputFolder)
val common = sc.textFile(folderToShred)
.map(line => EventUtils.loadAndShred(IgluSingleton.get(shredConfig.igluConfig), line))
.setName("common")
.cache()
@@ -163,23 +163,28 @@ object ShredJob {
def run(spark: SparkSession, cli: CliConfig): Unit = {
val atomicLengths = EventUtils.getAtomicLengths(cli.igluConfig).fold(err => throw err, identity)

val inputFolders = Discovery
.getUnprocessed(cli.config.region, cli.config.shredder.input, cli.config.shredder.output)
.collect { case Right(folder) => folder }
.toList
val enrichedFolder = Folder.coerce(cli.config.shredder.input.toString)
val shreddedFolder = Folder.coerce(cli.config.shredder.output.toString)

val (incomplete, unshredded) = Discovery
.getState(cli.config.region, enrichedFolder, shreddedFolder)

val eventsManifest: Option[EventsManifestConfig] = cli.duplicateStorageConfig.map { json =>
val config = EventsManifestConfig
.parseJson[Id](singleton.IgluSingleton.get(cli.igluConfig), json)
.valueOr(err => throw new IllegalArgumentException(err))
val _ = singleton.DuplicateStorageSingleton.get(Some(config)) // Just to check it can be initialized
val _ = singleton.DuplicateStorageSingleton.get(Some(config)) // Just to check it can be initialized
config
}

inputFolders.foreach { folderName =>
System.out.println(s"RDB Shredder: processing $folderName")
incomplete.toList.foreach { folder =>
System.err.println(s"$folder was not successfully shredded")
}

unshredded.toList.foreach { folder =>
System.out.println(s"RDB Shredder: processing $folder")
val job = new ShredJob(spark, cli)
val completed = job.run(cli.config.shredder, folderName, atomicLengths, eventsManifest)
val completed = job.run(cli.config.shredder, folder, atomicLengths, eventsManifest)
Discovery.sendMessage(completed,cli.config.region, cli.config.messageQueue)
}
}