Skip to content

Commit

Permalink
Stream Enrich: apply automated code formatting (closes #3651)
Browse files Browse the repository at this point in the history
  • Loading branch information
knservis committed Mar 20, 2018
1 parent 41e866e commit 739a6d6
Show file tree
Hide file tree
Showing 24 changed files with 442 additions and 353 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ object EnrichApp {

// Matches a local-file URI of the form "file:<path>"; the single capture group is the path.
val FilepathRegex = "^file:(.+)$".r
// Matches "dynamodb:<region>/<table>/<key>"; the three capture groups are the slash-free
// region, table and key segments.
val DynamoDBRegex = "^dynamodb:([^/]*)/([^/]*)/([^/]*)$".r
// Human-readable description of the two accepted URI forms; interpolated into the option
// help text and the validation/error messages.
// NOTE(review): the line appears twice because this view interleaves the removed and added
// lines of the formatting commit; the string content is the same in both copies.
val regexMsg = "'file:[filename]' or 'dynamodb:[region/table/key]'"
val regexMsg = "'file:[filename]' or 'dynamodb:[region/table/key]'"

case class FileConfig(
config: File = new File("."),
Expand All @@ -79,45 +79,58 @@ object EnrichApp {
head(generated.Settings.name, generated.Settings.version)
help("help")
version("version")
opt[File]("config").required().valueName("<filename>")
opt[File]("config")
.required()
.valueName("<filename>")
.action((f: File, c: FileConfig) => c.copy(config = f))
.validate(f =>
if (f.exists) success
else failure(s"Configuration file $f does not exist")
)
opt[String]("resolver").required().valueName("<resolver uri>")
else failure(s"Configuration file $f does not exist"))
opt[String]("resolver")
.required()
.valueName("<resolver uri>")
.text(s"Iglu resolver file, $regexMsg")
.action((r: String, c: FileConfig) => c.copy(resolver = r))
.validate(_ match {
case FilepathRegex(_) => success
case DynamoDBRegex(_, _, _) => success
case _ => failure(s"Resolver doesn't match accepted uris: $regexMsg")
})
opt[String]("enrichments").optional().valueName("<enrichment directory uri>")
opt[String]("enrichments")
.optional()
.valueName("<enrichment directory uri>")
.text(s"Directory of enrichment configuration JSONs, $regexMsg")
.action((e: String, c: FileConfig) => c.copy(enrichmentsDir = Some(e)))
.validate(_ match {
case FilepathRegex(_) | DynamoDBRegex(_, _, _) => success
case _ => failure(s"Enrichments directory doesn't match accepted uris: $regexMsg")
case _ => failure(s"Enrichments directory doesn't match accepted uris: $regexMsg")
})
opt[Unit]("force-ip-lookups-download")
.text("Invalidate the cached IP lookup files and download them anew")
.action((_, c) => c.copy(forceDownload = true))
}

// Implicit hint, available for any T, that pairs a CamelCase field mapping with CamelCase.
// NOTE(review): presumably a pureconfig ProductHint consumed by loadConfigOrThrow so that
// camelCase config keys map onto camelCase case-class fields — confirm against the imports.
implicit def hint[T] = ProductHint[T](ConfigFieldMapping(CamelCase, CamelCase))
// Builds the application configuration from the command line, as an Either:
//   - Left(message) when argument parsing yields nothing, when reading/resolving the config
//     file throws, when the file has no top-level "enrich" path, or when loading the "enrich"
//     section as an EnrichConfig throws;
//   - Right((enrichConfig, resolver, enrichmentsDir, forceDownload)) otherwise.
// NOTE(review): utils.fold presumably dispatches a Try to the failure/success function and
// utils.filterOrElse presumably turns a Right failing the predicate into Left(message) —
// confirm in utils.
// NOTE(review): the statement appears twice (one-line-chained form first, reformatted form
// second) because this view interleaves the removed and added lines of the commit; both
// copies share the final closing brace.
val conf: Either[String, (EnrichConfig, String, Option[String], Boolean)] = utils.filterOrElse(
parser.parse(args, FileConfig())
.toRight("Error while parsing command-line arguments")
.right.flatMap { fc =>
utils.fold(Try(ConfigFactory.parseFile(fc.config).resolve()))(
t => Left(t.getMessage), c => Right((c, fc.resolver, fc.enrichmentsDir, fc.forceDownload)))
}
)(t => t._1.hasPath("enrich"), "No top-level \"enrich\" could be found in the configuration")
.flatMap { case (config, resolver, enrichments, forceDownload) =>
utils.fold(Try(loadConfigOrThrow[EnrichConfig](config.getConfig("enrich"))))(
t => Left(t.getMessage), Right(_))
.right.map(ec => (ec, resolver, enrichments, forceDownload))
val conf: Either[String, (EnrichConfig, String, Option[String], Boolean)] = utils
.filterOrElse(
parser
.parse(args, FileConfig())
.toRight("Error while parsing command-line arguments")
.right
.flatMap { fc =>
utils.fold(Try(ConfigFactory.parseFile(fc.config).resolve()))(
t => Left(t.getMessage),
c => Right((c, fc.resolver, fc.enrichmentsDir, fc.forceDownload)))
}
)(t => t._1.hasPath("enrich"), "No top-level \"enrich\" could be found in the configuration")
.flatMap {
case (config, resolver, enrichments, forceDownload) =>
utils
.fold(Try(loadConfigOrThrow[EnrichConfig](config.getConfig("enrich"))))(
t => Left(t.getMessage),
Right(_))
.right
.map(ec => (ec, resolver, enrichments, forceDownload))
}

conf match {
Expand All @@ -140,17 +153,17 @@ object EnrichApp {
val provider = ec.aws.provider
// Builds the implicit Iglu Resolver in three chained steps: obtain the resolver text via
// extractResolver, turn it into JSON via JsonUtils.extractJson, then parse that JSON with
// Resolver.parse (its error side is stringified). If any step fails, the failure value is
// rethrown as a RuntimeException; otherwise the parsed resolver is returned unchanged.
// Note that the generator named `resolver` shadows the outer `resolver` argument inside the
// for-comprehension.
// NOTE(review): the `json` and `resolver` generators appear twice because this view
// interleaves the removed and added lines of the formatting commit.
implicit val igluResolver: Resolver = (for {
parsedResolver <- extractResolver(provider, resolver)
json <- JsonUtils.extractJson("", parsedResolver)
resolver <- Resolver.parse(json).leftMap(_.toString)
json <- JsonUtils.extractJson("", parsedResolver)
resolver <- Resolver.parse(json).leftMap(_.toString)
} yield resolver) fold (
e => throw new RuntimeException(e),
s => s
)

val registry: EnrichmentRegistry = (for {
enrichmentConfig <- extractEnrichmentConfig(provider, enrichmentsDir)
registryConfig <- JsonUtils.extractJson("", enrichmentConfig)
reg <- EnrichmentRegistry.parse(fromJsonNode(registryConfig), false).leftMap(_.toString)
registryConfig <- JsonUtils.extractJson("", enrichmentConfig)
reg <- EnrichmentRegistry.parse(fromJsonNode(registryConfig), false).leftMap(_.toString)
} yield reg) fold (
e => throw new RuntimeException(e),
s => s
Expand All @@ -167,14 +180,15 @@ object EnrichApp {
): Unit = {

val source = ec.sourceType match {
case KafkaSource => new KafkaSource(ec, igluResolver, registry, tracker)
case KafkaSource => new KafkaSource(ec, igluResolver, registry, tracker)
case KinesisSource => new KinesisSource(ec, igluResolver, registry, tracker)
case StdinSource => new StdinSource(ec, igluResolver, registry, tracker)
case NsqSource => new NsqSource(ec, igluResolver, registry, tracker)
case StdinSource => new StdinSource(ec, igluResolver, registry, tracker)
case NsqSource => new NsqSource(ec, igluResolver, registry, tracker)
}
tracker.foreach(SnowplowTracking.initializeSnowplowTracking)
source.run
}

/**
* Download the IP lookup files locally.
* @param registry Enrichment registry
Expand All @@ -189,24 +203,26 @@ object EnrichApp {
): ValidationNel[String, List[Int]] =
registry.getIpLookupsEnrichment
.map(_.dbsToCache)
.toList.flatten
.map { case (uri, path) =>
(new java.net.URI(uri.toString.replaceAll("(?<!(http:|https:|s3:))//", "/")),
new File(path))
.toList
.flatten
.map {
case (uri, path) =>
(
new java.net.URI(uri.toString.replaceAll("(?<!(http:|https:|s3:))//", "/")),
new File(path))
}
.filter { case (_, targetFile) => forceDownload || targetFile.length == 0L }
.map { case (cleanURI, targetFile) =>
val downloadResult = cleanURI.getScheme match {
case "http" | "https" => (cleanURI.toURL #> targetFile).!.success
case "s3" => downloadFromS3(provider, cleanURI, targetFile).success
case s => s"Scheme $s for file $cleanURI not supported".failure
}
downloadResult
.flatMap {
case i if i != 0 => s"Attempt to download $cleanURI to $targetFile failed".failure
case o => o.success
.map {
case (cleanURI, targetFile) =>
val downloadResult = cleanURI.getScheme match {
case "http" | "https" => (cleanURI.toURL #> targetFile).!.success
case "s3" => downloadFromS3(provider, cleanURI, targetFile).success
case s => s"Scheme $s for file $cleanURI not supported".failure
}
.toValidationNel
downloadResult.flatMap {
case i if i != 0 => s"Attempt to download $cleanURI to $targetFile failed".failure
case o => o.success
}.toValidationNel
}
.sequenceU

Expand All @@ -229,7 +245,8 @@ object EnrichApp {
}
case DynamoDBRegex(region, table, key) =>
lookupDynamoDBConfig(provider, region, table, key).success
case _ => s"Resolver argument [$resolverArgument] must begin with 'file:' or 'dynamodb:'".failure
case _ =>
s"Resolver argument [$resolverArgument] must begin with 'file:' or 'dynamodb:'".failure
}

/**
Expand All @@ -241,15 +258,18 @@ object EnrichApp {
* @param key The value of the primary key for the configuration
* @return The JSON stored in DynamoDB
*/
// NOTE(review): the signature and the `val item` line each appear twice because this view
// interleaves the removed and added lines of the formatting commit; the parameters and the
// expression are the same in both copies.
def lookupDynamoDBConfig(provider: AWSCredentialsProvider,
region: String, table: String, key: String): String = {
def lookupDynamoDBConfig(
provider: AWSCredentialsProvider,
region: String,
table: String,
key: String): String = {
// Client using the supplied credentials, pointed at the endpoint that getDynamodbEndpoint
// derives from `region`.
val dynamoDBClient = AmazonDynamoDBClientBuilder
.standard()
.withCredentials(provider)
.withEndpointConfiguration(new EndpointConfiguration(getDynamodbEndpoint(region), region))
.build()
val dynamoDB = new DynamoDB(dynamoDBClient)
// Look the item up by its "id" key and return the string stored under "json".
// NOTE(review): getItem presumably returns null when no item matches, in which case the
// getString call below would throw — confirm against the SDK and the callers' expectations.
val item = dynamoDB.getTable(table).getItem("id", key)
val item = dynamoDB.getTable(table).getItem("id", key)
item.getString("json")
}

Expand All @@ -263,20 +283,22 @@ object EnrichApp {
provider: AWSCredentialsProvider,
enrichmentArgument: Option[String]
): Validation[String, String] = {
val jsons: Validation[String, List[String]] = enrichmentArgument.map {
case FilepathRegex(dir) =>
new File(dir).listFiles
.filter(_.getName.endsWith(".json"))
.map(scala.io.Source.fromFile(_).mkString)
.toList
.success
case DynamoDBRegex(region, table, partialKey) =>
lookupDynamoDBEnrichments(provider, region, table, partialKey) match {
case Nil => s"No enrichments found with partial key $partialKey".failure
case js => js.success
}
case other => s"Enrichments argument [$other] must match $regexMsg".failure
}.getOrElse(Nil.success)
val jsons: Validation[String, List[String]] = enrichmentArgument
.map {
case FilepathRegex(dir) =>
new File(dir).listFiles
.filter(_.getName.endsWith(".json"))
.map(scala.io.Source.fromFile(_).mkString)
.toList
.success
case DynamoDBRegex(region, table, partialKey) =>
lookupDynamoDBEnrichments(provider, region, table, partialKey) match {
case Nil => s"No enrichments found with partial key $partialKey".failure
case js => js.success
}
case other => s"Enrichments argument [$other] must match $regexMsg".failure
}
.getOrElse(Nil.success)

jsons.map { js =>
val combinedJson = ("schema" -> "iglu:com.snowplowanalytics.snowplow/enrichments/jsonschema/1-0-0") ~
Expand All @@ -295,7 +317,9 @@ object EnrichApp {
*/
def lookupDynamoDBEnrichments(
provider: AWSCredentialsProvider,
region: String, table: String, partialKey: String
region: String,
table: String,
partialKey: String
): List[String] = {
val dynamoDBClient = AmazonDynamoDBClientBuilder
.standard()
Expand All @@ -306,22 +330,26 @@ object EnrichApp {
// Each scan can only return up to 1MB
// See http://techtraits.com/cloud/nosql/2012/06/27/Amazon-DynamoDB--Understanding-Query-and-Scan-operations/
// Pages through a scan of `table`: each call issues one scan starting after
// `lastEvaluatedKey`, appends the returned items to `sofar` (each item converted to an
// immutable Map of attribute name to the attribute's getS value), and recurses with the
// result's LastEvaluatedKey until that key is null, at which point the accumulated list is
// returned.
// NOTE(review): the first call passes a null start key, presumably meaning "scan from the
// beginning"; getS presumably yields null for non-string attributes — confirm against the SDK.
// NOTE(review): the signature, `combinedResults` and the `case null` line each appear twice
// because this view interleaves the removed and added lines of the formatting commit.
@tailrec
def partialScan(sofar: List[Map[String, String]] = Nil, lastEvaluatedKey: java.util.Map[String, AttributeValue] = null): List[Map[String, String]] = {
def partialScan(
sofar: List[Map[String, String]] = Nil,
lastEvaluatedKey: java.util.Map[String, AttributeValue] = null): List[Map[String, String]] = {
val scanRequest = new ScanRequest().withTableName(table)
scanRequest.setExclusiveStartKey(lastEvaluatedKey)
val lastResult = dynamoDBClient.scan(scanRequest)
val combinedResults = sofar ++ lastResult.getItems.asScala.map(_.asScala.toMap.mapValues(_.getS))
val combinedResults = sofar ++ lastResult.getItems.asScala
.map(_.asScala.toMap.mapValues(_.getS))
lastResult.getLastEvaluatedKey match {
// No further pages: everything accumulated so far is the full result.
case null => combinedResults
case null => combinedResults
// More pages remain: continue scanning from the returned key.
case startKey => partialScan(combinedResults, startKey)
}
}
val allItems = partialScan(Nil)
allItems filter { item => item.get("id") match {
allItems filter { item =>
item.get("id") match {
case Some(value) if value.startsWith(partialKey) => true
case _ => false
case _ => false
}
} flatMap(_.get("json"))
} flatMap (_.get("json"))
}

/**
Expand All @@ -339,7 +367,7 @@ object EnrichApp {
val bucket = uri.getHost
val key = uri.getPath match { // Need to remove leading '/'
case s if s.charAt(0) == '/' => s.substring(1)
case s => s
case s => s
}

try {
Expand All @@ -354,7 +382,7 @@ object EnrichApp {

/**
 * Builds the HTTPS DynamoDB endpoint URL for a region.
 * "cn-north-1" maps to the amazonaws.com.cn domain; every other region maps to
 * amazonaws.com.
 * @param region The region name to embed in the endpoint host
 * @return The endpoint URL as a string
 */
// NOTE(review): only "cn-north-1" is special-cased; other China regions (e.g.
// cn-northwest-1) presumably also need the .com.cn suffix — confirm against the AWS
// endpoint reference.
// NOTE(review): each case appears twice because this view interleaves the removed and added
// lines of the formatting commit; the second pair only differs in spacing around `@`.
private def getDynamodbEndpoint(region: String): String =
region match {
case cn@"cn-north-1" => s"https://dynamodb.$cn.amazonaws.com.cn"
case _ => s"https://dynamodb.$region.amazonaws.com"
case cn @ "cn-north-1" => s"https://dynamodb.$cn.amazonaws.com.cn"
case _ => s"https://dynamodb.$region.amazonaws.com"
}
}
Loading

0 comments on commit 739a6d6

Please sign in to comment.