Skip to content

Commit

Permalink
Dist cache #3736
Browse files Browse the repository at this point in the history
  • Loading branch information
BenFradet committed May 28, 2018
1 parent 3551913 commit 71c961e
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,17 @@ package snowplow
package enrich
package beam

import java.io.File
import java.net.URI
import java.nio.charset.StandardCharsets.UTF_8
import java.nio.file.{Files, Path, Paths}

import scala.util.Try

import com.spotify.scio._
import com.spotify.scio.values.SCollection
import com.spotify.scio.values.{DistCache, SCollection}
import org.apache.commons.codec.binary.Base64
import org.json4s.{JObject, JValue}
import org.joda.time.DateTime
import org.slf4j.LoggerFactory
import scalaz._
Expand Down Expand Up @@ -65,7 +71,7 @@ object Enrich {

parsedConfig match {
case Failure(e) =>
System.err.println(s"An error occured: $e")
System.err.println(s"An error occured parsing the configuration: $e")
System.exit(1)
case Success(config) =>
run(sc, config)
Expand All @@ -74,6 +80,13 @@ object Enrich {
}

def run(sc: ScioContext, config: ParsedEnrichConfig): Unit = {
val cachedFiles: DistCache[List[Either[String, Path]]] = {
val filesToCache = getFilesToCache(config.resolver, config.enrichmentRegistry)
sc.distCache(filesToCache.map(_._1.toString)) { files =>
createSymLinks(files.toList.zip(filesToCache.map(_._2)))
}
}

val input: SCollection[Array[Byte]] = sc.pubsubTopic(config.input).withName("input")
val enriched: SCollection[Validation[BadRow, EnrichedEvent]] = input
.map { rawEvent =>
Expand Down Expand Up @@ -114,4 +127,30 @@ object Enrich {
BadRow(line, errors).failure
}
}

def createSymLinks(filesToCache: List[(File, String)]): List[Either[String, Path]] = filesToCache
.map { case (file, symLink) =>
val link = createSymLink(file, symLink)
link match {
case Right(p) => logger.info(s"File $file cached at $p")
case Left(e) => logger.warn(s"File $file could not be cached: $e")
}
link
}

def createSymLink(file: File, symLink: String): Either[String, Path] = {
val symLinkPath = Paths.get(symLink)
if (Files.notExists(symLinkPath)) {
Try(Files.createSymbolicLink(symLinkPath, file.toPath)) match {
case scala.util.Success(p) => Right(p)
case scala.util.Failure(t) => Left(t.getMessage)
}
} else Left(s"Symlink $symLinkPath already exists")
}

def getFilesToCache(resolverJson: JValue, registryJson: JObject): List[(URI, String)] = {
implicit val resolver = ResolverSingleton.get(resolverJson)
val registry = EnrichmentRegistrySingleton.get(registryJson)
registry.getIpLookupsEnrichment.map(_.dbsToCache).getOrElse(Nil)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
*/
package com.snowplowanalytics.snowplow.enrich.beam

import java.nio.file.Paths
import java.nio.file.{Path, Paths}

import com.spotify.scio.testing._
import org.apache.commons.codec.binary.Base64
Expand Down Expand Up @@ -85,6 +85,7 @@ class EnrichSpec extends PipelineSpec {
.args("--input=in", "--output=out", "--bad=bad",
"--resolver=" + Paths.get(getClass.getResource("/iglu_resolver.json").toURI()))
.input(PubsubIO("in"), raw.map(Base64.decodeBase64))
.distCache(DistCacheIO(""), List.empty[Either[String, Path]])
.output(PubsubIO[String]("out"))(_ should satisfy { c: Iterable[String] =>
c.size == 1 && expected.forall(c.head.contains)
})
Expand Down

0 comments on commit 71c961e

Please sign in to comment.