Beam Enrich: support enrichments relying on local files (closes #3736)
BenFradet committed Jul 9, 2018
1 parent 9193ce6 commit 76c0e51
Showing 5 changed files with 155 additions and 4 deletions.
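
In short, the change teaches Beam Enrich to distribute the files that some enrichments depend on (for example the MaxMind database behind ip_lookups) to every worker via Scio's DistCache, and to symlink each downloaded file to the name the enrichment registry expects. As background, the following is a minimal sketch of the DistCache mechanism the commit builds on; the bucket, file and output paths are illustrative only, and the shutdown call may differ between Scio versions.

  import java.io.File
  import com.spotify.scio._
  import com.spotify.scio.values.DistCache

  object DistCacheSketch {
    def main(cmdlineArgs: Array[String]): Unit = {
      val (sc, _) = ContextAndArgs(cmdlineArgs)

      // The URI is downloaded once per worker; the init function runs lazily on
      // first access and its result is memoized for the lifetime of the worker.
      val geoDb: DistCache[String] =
        sc.distCache("gs://example-bucket/GeoLite2-City.mmdb") { file => file.getAbsolutePath }

      sc.parallelize(Seq("1.2.3.4", "5.6.7.8"))
        .map(ip => s"$ip resolved against ${geoDb()}") // geoDb() yields a worker-local path
        .saveAsTextFile("gs://example-bucket/output")

      sc.close()
    }
  }

The new cachedFiles value in Enrich.run follows this pattern, except that the URIs come from the enrichment configuration and the init function creates symlinks (see the Enrich.scala hunks below).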
15 changes: 15 additions & 0 deletions 3-enrich/beam-enrich/build.sbt
@@ -16,6 +16,21 @@ lazy val compilerOptions = Seq(
"-Xfuture"
)

lazy val resolutionRepos = Seq(
// For Snowplow
"Snowplow Analytics Maven releases repo" at "http://maven.snplow.com/releases/",
// For ua-parser
"user-agent-parser repo" at "https://clojars.org/repo/"
)

// we fork a JVM per test in order to not reuse enrichment registries
import Tests._
{
def oneJVMPerTest(tests: Seq[TestDefinition]) =
tests.map(t => new Group(t.name, Seq(t), SubProcess(ForkOptions()))).toSeq
testGrouping in Test := oneJVMPerTest((definedTests in Test).value)
}

lazy val commonSettings = Defaults.coreDefaultSettings ++ Seq(
organization := "com.snowplowanalytics",
version := "0.1.0-SNAPSHOT",
@@ -17,11 +17,17 @@ package snowplow
package enrich
package beam

import java.io.File
import java.net.URI
import java.nio.charset.StandardCharsets.UTF_8
import java.nio.file.{Files, Path, Paths}

import scala.util.Try

import com.spotify.scio._
-import com.spotify.scio.values.SCollection
+import com.spotify.scio.values.{DistCache, SCollection}
import org.apache.commons.codec.binary.Base64
import org.json4s.{JObject, JValue}
import org.joda.time.DateTime
import org.slf4j.LoggerFactory
import scalaz._
@@ -65,7 +71,7 @@ object Enrich {

parsedConfig match {
case Failure(e) =>
System.err.println(s"An error occured: $e")
System.err.println(s"An error occured parsing the configuration: $e")
System.exit(1)
case Success(config) =>
run(sc, config)
@@ -74,9 +80,18 @@
}

def run(sc: ScioContext, config: ParsedEnrichConfig): Unit = {
// Path is not serializable
val cachedFiles: DistCache[List[Either[String, String]]] = {
val filesToCache = getFilesToCache(config.resolver, config.enrichmentRegistry)
sc.distCache(filesToCache.map(_._1.toString)) { files =>
createSymLinks(files.toList.zip(filesToCache.map(_._2))).map(_.map(_.toString))
}
}

val input: SCollection[Array[Byte]] = sc.pubsubTopic(config.input).withName("input")
val enriched: SCollection[Validation[BadRow, EnrichedEvent]] = input
.map { rawEvent =>
cachedFiles()
implicit val resolver = ResolverSingleton.get(config.resolver)
enrich(rawEvent, EnrichmentRegistrySingleton.get(config.enrichmentRegistry))
}
@@ -114,4 +129,30 @@ object Enrich {
BadRow(line, errors).failure
}
}

def createSymLinks(filesToCache: List[(File, String)]): List[Either[String, Path]] = filesToCache
.map { case (file, symLink) =>
val link = createSymLink(file, symLink)
link match {
case Right(p) => logger.info(s"File $file cached at $p")
case Left(e) => logger.warn(s"File $file could not be cached: $e")
}
link
}

def createSymLink(file: File, symLink: String): Either[String, Path] = {
val symLinkPath = Paths.get(symLink)
if (Files.notExists(symLinkPath)) {
Try(Files.createSymbolicLink(symLinkPath, file.toPath)) match {
case scala.util.Success(p) => Right(p)
case scala.util.Failure(t) => Left(t.getMessage)
}
} else Left(s"Symlink $symLinkPath already exists")
}

def getFilesToCache(resolverJson: JValue, registryJson: JObject): List[(URI, String)] = {
implicit val resolver = ResolverSingleton.get(resolverJson)
val registry = EnrichmentRegistrySingleton.get(registryJson)
registry.getIpLookupsEnrichment.map(_.dbsToCache).getOrElse(Nil)
}
}
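
Taken together, the helpers above work as follows: getFilesToCache turns the enrichment configuration into (remote URI, local file name) pairs, DistCache downloads each URI onto the worker, and createSymLinks exposes each download under its configured name so the enrichment can open it. A small illustration of that flow, with an assumed temp path and REPL-style layout (e.g. an sbt console for beam-enrich); it is a sketch, not part of the commit itself:

  import java.io.File
  import java.net.URI
  import java.nio.file.Path
  import com.snowplowanalytics.snowplow.enrich.beam.Enrich

  // Roughly what getFilesToCache yields for the ip_lookups configuration added in
  // this commit; "./ip_geo" is the local name the IP lookups enrichment uses, as
  // exercised by the new EnrichWithLocalFileSpec below.
  val toCache: List[(URI, String)] = List(
    new URI("http://snowplow-hosted-assets.s3.amazonaws.com/third-party/maxmind/GeoLite2-City.mmdb") -> "./ip_geo"
  )
  // Suppose DistCache materialised the download at this (hypothetical) worker-local path...
  val downloaded = new File("/tmp/dist-cache/GeoLite2-City.mmdb")
  // ...createSymLinks then links it to the name the enrichment expects:
  val linked: List[Either[String, Path]] =
    Enrich.createSymLinks(List(downloaded -> toCache.head._2))
  // => List(Right(./ip_geo)) when the symlink could be created, Left(reason) otherwise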
@@ -0,0 +1,14 @@
{
"schema": "iglu:com.snowplowanalytics.snowplow/ip_lookups/jsonschema/2-0-0",
"data": {
"name": "ip_lookups",
"vendor": "com.snowplowanalytics.snowplow",
"enabled": true,
"parameters": {
"geo": {
"database": "GeoLite2-City.mmdb",
"uri": "http://snowplow-hosted-assets.s3.amazonaws.com/third-party/maxmind"
}
}
}
}
@@ -14,7 +14,7 @@
*/
package com.snowplowanalytics.snowplow.enrich.beam

-import java.nio.file.Paths
+import java.nio.file.{Path, Paths}

import com.spotify.scio.testing._
import org.apache.commons.codec.binary.Base64
@@ -33,7 +33,7 @@ class EnrichSpec extends PipelineSpec {
"ssc-0.1.0-stdout",
s"beam-enrich-${generated.BuildInfo.version}",
"alex 123",
"10.0.2.x",
"10.0.2.2",
"1804954790",
"3c1757544e39bca4",
"26",
@@ -85,6 +85,7 @@ class EnrichSpec extends PipelineSpec {
.args("--input=in", "--output=out", "--bad=bad",
"--resolver=" + Paths.get(getClass.getResource("/iglu_resolver.json").toURI()))
.input(PubsubIO("in"), raw.map(Base64.decodeBase64))
.distCache(DistCacheIO(""), List.empty[Either[String, Path]])
.output(PubsubIO[String]("out"))(_ should satisfy { c: Iterable[String] =>
c.size == 1 && expected.forall(c.head.contains)
})
@@ -0,0 +1,80 @@
/*
* Copyright (c) 2012-2018 Snowplow Analytics Ltd. All rights reserved.
*
* This program is licensed to you under the Apache License Version 2.0,
* and you may not use this file except in compliance with the Apache License Version 2.0.
* You may obtain a copy of the Apache License Version 2.0 at
* http://www.apache.org/licenses/LICENSE-2.0.
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the Apache License Version 2.0 is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the Apache License Version 2.0 for the specific language governing permissions and
* limitations there under.
*/
package com.snowplowanalytics.snowplow.enrich.beam

import java.io.File
import java.net.URL
import java.nio.file.Paths

import scala.sys.process._

import com.spotify.scio.testing._
import org.apache.commons.codec.binary.Base64

class EnrichWithLocalFileSpec extends PipelineSpec {

val raw = Seq("CwBkAAAADTM3LjIyOC4yMjUuMzIKAMgAAAFjiJGp1QsA0gAAAAVVVEYtOAsA3AAAABJzc2MtMC4xMy4wLXN0ZG91dCQLASwAAAALY3VybC83LjUwLjMLAUAAAAAjL2NvbS5zbm93cGxvd2FuYWx5dGljcy5zbm93cGxvdy90cDILAVQAAAFpeyJzY2hlbWEiOiJpZ2x1OmNvbS5zbm93cGxvd2FuYWx5dGljcy5zbm93cGxvdy9wYXlsb2FkX2RhdGEvanNvbnNjaGVtYS8xLTAtNCIsImRhdGEiOlt7InR2IjoidHJhY2tlcl92ZXJzaW9uIiwiZSI6InVlIiwicCI6IndlYiIsInVlX3ByIjoie1wic2NoZW1hXCI6XCJpZ2x1OmNvbS5zbm93cGxvd2FuYWx5dGljcy5zbm93cGxvdy91bnN0cnVjdF9ldmVudC9qc29uc2NoZW1hLzEtMC0wXCIsXCJkYXRhXCI6e1wic2NoZW1hXCI6XCJpZ2x1OmNvbS5zbm93cGxvd2FuYWx5dGljcy5zbm93cGxvdy9zY3JlZW5fdmlldy9qc29uc2NoZW1hLzEtMC0wXCIsXCJkYXRhXCI6e1wibmFtZVwiOlwiaGVsbG8gZnJvbSBTbm93cGxvd1wifX19In1dfQ8BXgsAAAAFAAAAO0hvc3Q6IGVjMi0zNC0yNDUtMzItNDcuZXUtd2VzdC0xLmNvbXB1dGUuYW1hem9uYXdzLmNvbToxMjM0AAAAF1VzZXItQWdlbnQ6IGN1cmwvNy41MC4zAAAAC0FjY2VwdDogKi8qAAAAG1RpbWVvdXQtQWNjZXNzOiA8ZnVuY3Rpb24xPgAAABBhcHBsaWNhdGlvbi9qc29uCwFoAAAAEGFwcGxpY2F0aW9uL2pzb24LAZAAAAAwZWMyLTM0LTI0NS0zMi00Ny5ldS13ZXN0LTEuY29tcHV0ZS5hbWF6b25hd3MuY29tCwGaAAAAJDEwZDk2YmM3LWU0MDAtNGIyOS04YTQxLTY5MTFhZDAwZWU5OAt6aQAAAEFpZ2x1OmNvbS5zbm93cGxvd2FuYWx5dGljcy5zbm93cGxvdy9Db2xsZWN0b3JQYXlsb2FkL3RocmlmdC8xLTAtMAA=")
val expected = List(
"web",
"2018-05-22 15:57:17.653",
"unstruct",
"tracker_version",
"ssc-0.13.0-stdout$",
s"beam-enrich-${generated.BuildInfo.version}",
"37.228.225.32",
"10d96bc7-e400-4b29-8a41-6911ad00ee98",
"IE",
"L",
"Dublin",
"D02",
"53.3331",
"-6.2489",
"Leinster",
"""{"schema":"iglu:com.snowplowanalytics.snowplow/unstruct_event/jsonschema/1-0-0","data":{"schema":"iglu:com.snowplowanalytics.snowplow/screen_view/jsonschema/1-0-0","data":{"name":"hello from Snowplow"}}}""",
"curl/7.50.3",
"com.snowplowanalytics.snowplow",
"screen_view",
"jsonschema",
"1-0-0"
)

"Enrich" should "enrich a unstruct event with geo ip information" in {
downloadLocalEnrichmentFile(
"http://snowplow-hosted-assets.s3.amazonaws.com/third-party/maxmind/GeoLite2-City.mmdb",
"./ip_geo"
)

JobTest[Enrich.type]
.args("--input=in", "--output=out", "--bad=bad",
"--resolver=" + Paths.get(getClass.getResource("/iglu_resolver.json").toURI()),
"--enrichments=" + Paths.get(getClass.getResource("/enrichments").toURI()))
.input(PubsubIO("in"), raw.map(Base64.decodeBase64))
.distCache(DistCacheIO("http://snowplow-hosted-assets.s3.amazonaws.com/third-party/maxmind/GeoLite2-City.mmdb"),
List(Right("./ip_geo")))
.output(PubsubIO[String]("out"))(_ should satisfy { c: Iterable[String] =>
c.size == 1 && expected.forall(c.head.contains)
})
.output(PubsubIO[String]("bad"))(_ should beEmpty)
.run()

deleteLocalFile("./ip_geo")
}

private def downloadLocalEnrichmentFile(remoteLocation: String, localLocation: String): Unit =
new URL(remoteLocation).#>(new File(localLocation)).!!

private def deleteLocalFile(location: String): Unit = new File(location).delete

}
