Beam Enrich: support enrichments relying on local files (closes #3736)
BenFradet committed Aug 29, 2018
1 parent 5409104 commit b318030
Showing 9 changed files with 243 additions and 85 deletions.
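
For orientation: enrichments such as ip_lookups need database files on the local filesystem, which Dataflow workers do not have by default. The commit ships those files to the workers through Scio's DistCache and symlinks them to the paths the enrichments expect. The snippet below is a condensed, illustrative sketch of that pattern, not code from the commit; the URI and local name are placeholders, and the real pipeline does the linking via the createSymLinks helper added to Enrich.scala further down.

import java.net.URI
import com.spotify.scio.ScioContext

val sc = ScioContext()

// (remote file to download, local name the enrichment will read) -- placeholder values
val filesToCache: List[(URI, String)] = List(
  (new URI("http://snowplow-hosted-assets.s3.amazonaws.com/third-party/maxmind/GeoLite2-City.mmdb"),
   "./GeoLite2-City.mmdb")
)

// Each worker downloads the URIs once; the init function then links the downloaded
// files to the names the enrichments expect.
val cachedFiles = sc.distCache(filesToCache.map(_._1.toString)) { files =>
  files.toList.zip(filesToCache.map(_._2)).map { case (f, link) => s"$f -> $link" }
}

// Calling cachedFiles() inside a transform forces the download and linking on the
// worker before any event is enriched.
sc.parallelize(List("raw event")).map { e => cachedFiles(); e }
sc.close()
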
21 changes: 18 additions & 3 deletions 3-enrich/beam-enrich/build.sbt
@@ -16,6 +16,21 @@ lazy val compilerOptions = Seq(
"-Xfuture"
)

lazy val resolutionRepos = Seq(
// For Snowplow
"Snowplow Analytics Maven releases repo" at "http://maven.snplow.com/releases/",
// For ua-parser
"user-agent-parser repo" at "https://clojars.org/repo/"
)

// we fork a JVM per test in order to not reuse enrichment registries
import Tests._
{
def oneJVMPerTest(tests: Seq[TestDefinition]) =
tests.map(t => new Group(t.name, Seq(t), SubProcess(ForkOptions()))).toSeq
testGrouping in Test := oneJVMPerTest((definedTests in Test).value)
}

lazy val commonSettings = Defaults.coreDefaultSettings ++ Seq(
organization := "com.snowplowanalytics",
version := "0.1.0-SNAPSHOT",
@@ -43,9 +58,9 @@ lazy val noPublishSettings = Seq(
publishArtifact := false
)

lazy val scioVersion = "0.5.2"
lazy val beamVersion = "2.4.0"
lazy val sceVersion = "0.32.0"
lazy val scioVersion = "0.5.7"
lazy val beamVersion = "2.5.0"
lazy val sceVersion = "0.35.0"
lazy val scalaMacrosVersion = "2.1.0"
lazy val slf4jVersion = "1.7.25"
lazy val scalatestVersion = "3.0.5"
Enrich.scala
@@ -17,11 +17,19 @@ package snowplow
package enrich
package beam

import java.io.File
import java.net.URI
import java.nio.charset.StandardCharsets.UTF_8
import java.nio.file.{Files, Path, Paths}

import scala.util.Try

import com.spotify.scio._
import com.spotify.scio.values.SCollection
import com.spotify.scio.pubsub.PubSubAdmin
import com.spotify.scio.values.{DistCache, SCollection}
import org.apache.beam.sdk.io.gcp.pubsub.PubsubOptions
import org.apache.commons.codec.binary.Base64
import org.json4s.{JObject, JValue}
import org.joda.time.DateTime
import org.slf4j.LoggerFactory
import scalaz._
@@ -39,7 +47,8 @@ import utils._
/*
sbt "runMain com.snowplowanalytics.snowplow.enrich.beam.Enrich
--project=[PROJECT] --runner=DataflowRunner --zone=[ZONE] --streaming=true
--input=[INPUT TOPIC]
--job-name=[JOB NAME]
--input=[INPUT SUBSCRIPTION]
--output=[OUTPUT TOPIC]
--bad=[BAD TOPIC]
--resolver=[RESOLVER FILE PATH]
@@ -51,12 +60,13 @@ object Enrich {
// the maximum record size in Google PubSub is 10Mb
private val MaxRecordSize = 10000000

implicit def enrichSCollection[T](collection: SCollection[T]) = new RichSCollection[T](collection)

def main(cmdlineArgs: Array[String]): Unit = {
val (sc, args) = ContextAndArgs(cmdlineArgs)
val parsedConfig = for {
config <- EnrichConfig(args)
_ = sc.setJobName(config.jobName)
_ <- checkTopicExists(sc, config.output)
_ <- checkTopicExists(sc, config.bad)
resolverJson <- parseResolver(config.resolver)
resolver <- Resolver.parse(resolverJson).leftMap(_.toList.mkString("\n"))
enrichmentRegistryJson <- parseEnrichmentRegistry(config.enrichments)(resolver)
@@ -65,7 +75,7 @@

parsedConfig match {
case Failure(e) =>
System.err.println(s"An error occured: $e")
System.err.println(e)
System.exit(1)
case Success(config) =>
run(sc, config)
@@ -74,22 +84,31 @@
}

def run(sc: ScioContext, config: ParsedEnrichConfig): Unit = {
val input: SCollection[Array[Byte]] = sc.pubsubTopic(config.input).withName("input")
// Path is not serializable
val cachedFiles: DistCache[List[Either[String, String]]] = {
val filesToCache = getFilesToCache(config.resolver, config.enrichmentRegistry)
sc.distCache(filesToCache.map(_._1.toString)) { files =>
createSymLinks(files.toList.zip(filesToCache.map(_._2))).map(_.map(_.toString))
}
}

val input: SCollection[Array[Byte]] = sc.pubsubSubscription(config.input).withName("input")
val enriched: SCollection[Validation[BadRow, EnrichedEvent]] = input
.map { rawEvent =>
cachedFiles()
implicit val resolver = ResolverSingleton.get(config.resolver)
enrich(rawEvent, EnrichmentRegistrySingleton.get(config.enrichmentRegistry))
}
.flatten
.withName("enriched")

val (successes, failures) = enriched.partition2(_.isSuccess)
val (successes, failures) = enriched.partition(_.isSuccess)
val (tooBigSuccesses, properlySizedsuccesses) = successes
.collect { case Success(enrichedEvent) =>
val formattedEnrichedEvent = tabSeparatedEnrichedEvent(enrichedEvent)
(formattedEnrichedEvent, getStringSize(formattedEnrichedEvent))
}
.partition2(_._2 >= MaxRecordSize)
.partition(_._2 >= MaxRecordSize)
properlySizedsuccesses.map(_._1).withName("enriched-good").saveAsPubsub(config.output)

val failureCollection: SCollection[BadRow] =
@@ -114,4 +133,41 @@
BadRow(line, errors).failure
}
}

def createSymLinks(filesToCache: List[(File, String)]): List[Either[String, Path]] = filesToCache
.map { case (file, symLink) =>
val link = createSymLink(file, symLink)
link match {
case Right(p) => logger.info(s"File $file cached at $p")
case Left(e) => logger.warn(s"File $file could not be cached: $e")
}
link
}

def createSymLink(file: File, symLink: String): Either[String, Path] = {
val symLinkPath = Paths.get(symLink)
if (Files.notExists(symLinkPath)) {
Try(Files.createSymbolicLink(symLinkPath, file.toPath)) match {
case scala.util.Success(p) => Right(p)
case scala.util.Failure(t) => Left(t.getMessage)
}
} else Left(s"Symlink $symLinkPath already exists")
}

def getFilesToCache(resolverJson: JValue, registryJson: JObject): List[(URI, String)] = {
implicit val resolver = ResolverSingleton.get(resolverJson)
val registry = EnrichmentRegistrySingleton.get(registryJson)
registry.getIpLookupsEnrichment.map(_.dbsToCache).getOrElse(Nil)
}

private def checkTopicExists(sc: ScioContext, topicName: String): Validation[String, Unit] =
if (sc.isTest) {
().success
} else {
PubSubAdmin.topic(sc.options.as(classOf[PubsubOptions]), topicName) match {
case scala.util.Success(_) => ().success
case scala.util.Failure(e) =>
s"Output topic $topicName couldn't be retrieved: ${e.getMessage}".failure
}
}
}
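
A quick illustration of the createSymLink helper above (not part of the commit; the downloaded-file path is a placeholder, on Dataflow it comes from Scio's DistCache):

import java.io.File
import com.snowplowanalytics.snowplow.enrich.beam.Enrich

Enrich.createSymLink(new File("/tmp/dist-cache/GeoLite2-City.mmdb"), "./GeoLite2-City.mmdb")
// => Right(./GeoLite2-City.mmdb) on a fresh worker
// => Left("Symlink ./GeoLite2-City.mmdb already exists") if the link is already present,
//    which createSymLinks only logs as a warning instead of failing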

This file was deleted.

config.scala
@@ -35,6 +35,7 @@ import iglu.client.Resolver

object config {
final case class EnrichConfig(
jobName: String,
input: String,
output: String,
bad: String,
@@ -43,12 +44,38 @@
)
object EnrichConfig {
def apply(args: Args): Validation[String, EnrichConfig] = for {
input <- args.optional("input").toSuccess("Missing `input` argument")
output <- args.optional("output").toSuccess("Missing `output` argument")
bad <- args.optional("bad").toSuccess("Missing `bad` argument")
resolver <- args.optional("resolver").toSuccess("Missing `resolver` argument")
} yield EnrichConfig(input, output, bad, resolver, args.optional("enrichments"))
_ <- if (args.optional("help").isDefined) helpString(configurations).failure else "".success
l <- configurations.collect {
case RequiredConfiguration(key, _) => args.optional(key).toSuccess(s"Missing `$key` argument").toValidationNel
}.sequenceU.leftMap(_.toList.mkString("\n"))
List(jobName, input, output, bad, resolver) = l
} yield EnrichConfig(jobName, input, output, bad, resolver, args.optional("enrichments"))

private val configurations = List(
RequiredConfiguration("job-name", "Name of the Dataflow job that will be launched"),
RequiredConfiguration("input", "Name of the subscription to the input topic projects/{project}/subscriptions/{subscription}"),
RequiredConfiguration("output", "Name of the output topic projects/{project}/topics/{topic}"),
RequiredConfiguration("bad", "Name of the bad topic projects/{project}/topics/{topic}"),
RequiredConfiguration("resolver", "Path to the resolver file"),
OptionalConfiguration("enrichments", "Path to the directory containing the enrichment files")
)

private def helpString(configs: List[Configuration]): String =
"Possible configuration are:\n" +
configs.map {
case OptionalConfiguration(key, desc) => s"--$key=VALUE, optional, $desc"
case RequiredConfiguration(key, desc) => s"--$key=VALUE, required, $desc"
}.mkString("\n") +
"\n--help, Display this message" +
"\nA full list of all the Beam CLI options can be found at: https://cloud.google.com/dataflow/pipelines/specifying-exec-params#setting-other-cloud-pipeline-options"
}

sealed trait Configuration {
def key: String
def desc: String
}
final case class OptionalConfiguration(key: String, desc: String) extends Configuration
final case class RequiredConfiguration(key: String, desc: String) extends Configuration

final case class ParsedEnrichConfig(
input: String,
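
With the smart constructor above, all five required flags must be present and the missing ones are now reported together. A small sketch of both outcomes, with hypothetical values (compare the updated ConfigSpec below):

import com.spotify.scio.Args
import com.snowplowanalytics.snowplow.enrich.beam.config.EnrichConfig

// Every required flag supplied: parsing succeeds.
val ok = EnrichConfig(Args(Array(
  "--job-name=snowplow-enrich",
  "--input=projects/my-project/subscriptions/raw",
  "--output=projects/my-project/topics/enriched",
  "--bad=projects/my-project/topics/bad",
  "--resolver=/tmp/iglu_resolver.json")))
// => Success(EnrichConfig("snowplow-enrich", "projects/my-project/subscriptions/raw", ...))

// Only --job-name supplied: every other missing flag is listed in a single Failure.
val missing = EnrichConfig(Args(Array("--job-name=snowplow-enrich")))
// => Failure("Missing `input` argument\nMissing `output` argument\n" +
//            "Missing `bad` argument\nMissing `resolver` argument")
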
ip_lookups enrichment configuration (new file)
@@ -0,0 +1,14 @@
{
"schema": "iglu:com.snowplowanalytics.snowplow/ip_lookups/jsonschema/2-0-0",
"data": {
"name": "ip_lookups",
"vendor": "com.snowplowanalytics.snowplow",
"enabled": true,
"parameters": {
"geo": {
"database": "GeoLite2-City.mmdb",
"uri": "http://snowplow-hosted-assets.s3.amazonaws.com/third-party/maxmind"
}
}
}
}
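
An ip_lookups configuration like this one is what gives getFilesToCache something to return: the enrichment's dbsToCache pairs a remote URI with a local name for each database. Assuming the usual convention of joining uri and database (an assumption, as is the local name), the values above resolve roughly to:

// Illustrative only -- how the fields above presumably combine into the remote file
// handed to sc.distCache and the local name handed to createSymLinks.
val uri      = "http://snowplow-hosted-assets.s3.amazonaws.com/third-party/maxmind"
val database = "GeoLite2-City.mmdb"
val remote   = new java.net.URI(s"$uri/$database") // downloaded once per worker
val local    = s"./$database"                      // local symlink name -- an assumption
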
ConfigSpec.scala
@@ -29,28 +29,49 @@ import SpecHelpers._
class ConfigSpec extends FreeSpec {
"the config object should" - {
"make an EnrichConfig smart ctor available" - {
"which fails if --job-name is not present" in {
EnrichConfig(Args(Array.empty)) shouldEqual Failure(
"Missing `job-name` argument\n" +
"Missing `input` argument\n" +
"Missing `output` argument\n" +
"Missing `bad` argument\n" +
"Missing `resolver` argument"
)
}
"which fails if --input is not present" in {
EnrichConfig(Args(Array.empty)) shouldEqual Failure("Missing `input` argument")
EnrichConfig(Args(Array("--job-name=j"))) shouldEqual Failure(
"Missing `input` argument\n" +
"Missing `output` argument\n" +
"Missing `bad` argument\n" +
"Missing `resolver` argument"
)
}
"which fails if --output is not present" in {
EnrichConfig(Args(Array("--input=i"))) shouldEqual Failure("Missing `output` argument")
EnrichConfig(Args(Array("--job-name=j", "--input=i"))) shouldEqual Failure(
"Missing `output` argument\n" +
"Missing `bad` argument\n" +
"Missing `resolver` argument"
)
}
"which fails if --bad is not present" in {
EnrichConfig(Args(Array("--input=i", "--output=o"))) shouldEqual
Failure("Missing `bad` argument")
EnrichConfig(Args(Array("--job-name=j", "--input=i", "--output=o"))) shouldEqual Failure(
"Missing `bad` argument\n" +
"Missing `resolver` argument"
)
}
"which fails if --resolver is not present" in {
EnrichConfig(Args(Array("--input=i", "--output=o", "--bad=b"))) shouldEqual
EnrichConfig(Args(Array("--job-name=j", "--input=i", "--output=o", "--bad=b"))) shouldEqual
Failure("Missing `resolver` argument")
}
"which succeeds otherwise" in {
EnrichConfig(Args(Array("--input=i", "--output=o", "--bad=b", "--resolver=r"))) shouldEqual
Success(EnrichConfig("i", "o", "b", "r", None))
EnrichConfig(Args(
Array("--job-name=j", "--input=i", "--output=o", "--bad=b", "--resolver=r"))) shouldEqual
Success(EnrichConfig("j", "i", "o", "b", "r", None))
}
"which succeeds if --enrichments is present" in {
val args = Args(
Array("--input=i", "--output=o", "--bad=b", "--resolver=r", "--enrichments=e"))
EnrichConfig(args) shouldEqual Success(EnrichConfig("i", "o", "b", "r", Some("e")))
val args = Args(Array(
"--job-name=j", "--input=i", "--output=o", "--bad=b", "--resolver=r", "--enrichments=e"))
EnrichConfig(args) shouldEqual Success(EnrichConfig("j", "i", "o", "b", "r", Some("e")))
}
}

EnrichSpec.scala
@@ -14,7 +14,7 @@
*/
package com.snowplowanalytics.snowplow.enrich.beam

import java.nio.file.Paths
import java.nio.file.{Path, Paths}

import com.spotify.scio.testing._
import org.apache.commons.codec.binary.Base64
@@ -33,7 +33,7 @@ class EnrichSpec extends PipelineSpec {
"ssc-0.1.0-stdout",
s"beam-enrich-${generated.BuildInfo.version}",
"alex 123",
"10.0.2.x",
"10.0.2.2",
"1804954790",
"3c1757544e39bca4",
"26",
@@ -82,9 +82,10 @@

"Enrich" should "enrich a struct event" in {
JobTest[Enrich.type]
.args("--input=in", "--output=out", "--bad=bad",
.args("--job-name=j", "--input=in", "--output=out", "--bad=bad",
"--resolver=" + Paths.get(getClass.getResource("/iglu_resolver.json").toURI()))
.input(PubsubIO("in"), raw.map(Base64.decodeBase64))
.distCache(DistCacheIO(""), List.empty[Either[String, Path]])
.output(PubsubIO[String]("out"))(_ should satisfy { c: Iterable[String] =>
c.size == 1 && expected.forall(c.head.contains)
})
