Spark Enrich: apply automated code formatting (closes #3655)
knservis committed Mar 9, 2018
1 parent 68a1604 commit 0e5f769
Showing 51 changed files with 492 additions and 338 deletions.
11 changes: 7 additions & 4 deletions 3-enrich/spark-enrich/build.sbt
@@ -12,10 +12,11 @@
* See the Apache License Version 2.0 for the specific language governing permissions and
* limitations there under.
*/
lazy val root = project.in(file("."))
lazy val root = project
.in(file("."))
.settings(
name := "snowplow-spark-enrich",
version := "1.12.0",
name := "snowplow-spark-enrich",
version := "1.12.0",
description := "The Snowplow Spark Enrichment process"
)
.settings(BuildSettings.formatting)
@@ -41,4 +42,6 @@ lazy val root = project.in(file("."))
)
)

shellPrompt := { _ => "spark-enrich> " }
shellPrompt := { _ =>
"spark-enrich> "
}
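
The build above wires automated formatting in through the .settings(BuildSettings.formatting) line. The definition of BuildSettings.formatting is not part of this diff; purely as an illustrative sketch, assuming the formatter is scalafmt driven by an sbt plugin (the scalafmtOnCompile key below comes from that plugin family and is an assumption, not taken from this commit), such a settings block could look roughly like:

// Hypothetical sketch of a formatting settings block -- not taken from this commit.
// Assumes an sbt scalafmt plugin whose autoImport exposes scalafmtOnCompile in *.sbt files.
lazy val formatting = Seq(
  scalafmtOnCompile := true // reformat sources automatically on every compile
)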
@@ -58,8 +58,10 @@ object EnrichJob extends SparkJob {
classOf[Array[EnrichedEvent]],
classOf[com.snowplowanalytics.snowplow.CollectorPayload.thrift.model1.CollectorPayload],
classOf[Array[com.snowplowanalytics.snowplow.CollectorPayload.thrift.model1.CollectorPayload]],
Class.forName("com.snowplowanalytics.snowplow.CollectorPayload.thrift.model1.CollectorPayload$_Fields"),
Class.forName("[Lcom.snowplowanalytics.snowplow.CollectorPayload.thrift.model1.CollectorPayload$_Fields;"),
Class.forName(
"com.snowplowanalytics.snowplow.CollectorPayload.thrift.model1.CollectorPayload$_Fields"),
Class.forName(
"[Lcom.snowplowanalytics.snowplow.CollectorPayload.thrift.model1.CollectorPayload$_Fields;"),
classOf[com.snowplowanalytics.snowplow.collectors.thrift.SnowplowRawEvent],
classOf[Array[com.snowplowanalytics.snowplow.collectors.thrift.SnowplowRawEvent]],
Class.forName("com.snowplowanalytics.snowplow.collectors.thrift.SnowplowRawEvent$_Fields"),
@@ -74,11 +76,12 @@ object EnrichJob extends SparkJob {
classOf[org.apache.spark.internal.io.FileCommitProtocol$TaskCommitMessage],
classOf[org.apache.spark.sql.execution.datasources.FileFormatWriter$WriteTaskResult]
)
override def sparkConfig(): SparkConf = new SparkConf()
.setAppName(getClass().getSimpleName())
.setIfMissing("spark.master", "local[*]")
.set("spark.serializer", classOf[KryoSerializer].getName())
.registerKryoClasses(classesToRegister)
override def sparkConfig(): SparkConf =
new SparkConf()
.setAppName(getClass().getSimpleName())
.setIfMissing("spark.master", "local[*]")
.set("spark.serializer", classOf[KryoSerializer].getName())
.registerKryoClasses(classesToRegister)

override def run(spark: SparkSession, args: Array[String]): Unit = {
val job = EnrichJob(spark, args)
@@ -114,8 +117,11 @@ object EnrichJob extends SparkJob {
def enrich(line: Any, config: ParsedEnrichJobConfig): (Any, List[ValidatedEnrichedEvent]) = {
import singleton._
val registry = RegistrySingleton.get(config.igluConfig, config.enrichments, config.local)
val loader = LoaderSingleton.get(config.inFormat).asInstanceOf[Loader[Any]]
val event = EtlPipeline.processEvents(registry, etlVersion, config.etlTstamp,
val loader = LoaderSingleton.get(config.inFormat).asInstanceOf[Loader[Any]]
val event = EtlPipeline.processEvents(
registry,
etlVersion,
config.etlTstamp,
loader.toCollectorPayload(line))(ResolverSingleton.get(config.igluConfig))
(line, event)
}
@@ -156,10 +162,12 @@ class EnrichJob(@transient val spark: SparkSession, args: Array[String]) extends
hadoopConfig.set("io.compression.codec.lzo.class", classOf[LzoCodec].getName())

// Job configuration
private val enrichConfig = EnrichJobConfig.loadConfigFrom(args).fold(
e => throw FatalEtlError(e.map(_.toString)),
identity
)
private val enrichConfig = EnrichJobConfig
.loadConfigFrom(args)
.fold(
e => throw FatalEtlError(e.map(_.toString)),
identity
)

/**
* Run the enrich job by:
Expand Down Expand Up @@ -189,25 +197,32 @@ class EnrichJob(@transient val spark: SparkSession, args: Array[String]) extends
// Handling of malformed rows
val bad = common
.map { case (line, enriched) => (line, projectBads(enriched)) }
.flatMap { case (line, errors) =>
val originalLine = line match {
case bytes: Array[Byte] => new String(Base64.encodeBase64(bytes), "UTF-8")
case other => other.toString
}
errors.map(e => Row(BadRow(originalLine, e).toCompactJson))
.flatMap {
case (line, errors) =>
val originalLine = line match {
case bytes: Array[Byte] => new String(Base64.encodeBase64(bytes), "UTF-8")
case other => other.toString
}
errors.map(e => Row(BadRow(originalLine, e).toCompactJson))
}
spark.createDataFrame(bad, StructType(StructField("_", StringType, true) :: Nil))
spark
.createDataFrame(bad, StructType(StructField("_", StringType, true) :: Nil))
.write
.mode(SaveMode.Overwrite)
.text(enrichConfig.badFolder)

// Handling of properly-formed rows
val good = common
.flatMap { case (_, enriched) => projectGoods(enriched) }
spark.createDataset(good)(Encoders.bean(classOf[EnrichedEvent]))
spark
.createDataset(good)(Encoders.bean(classOf[EnrichedEvent]))
.toDF()
// hack to preserve the order of the fields in the csv, otherwise it's alphabetical
.select(classOf[EnrichedEvent].getDeclaredFields().filterNot(_.getName.equals("pii")).map(f => col(f.getName())): _*)
.select(
classOf[EnrichedEvent]
.getDeclaredFields()
.filterNot(_.getName.equals("pii"))
.map(f => col(f.getName())): _*)
.write
.option("sep", "\t")
.option("escape", "")
@@ -221,17 +236,16 @@ class EnrichJob(@transient val spark: SparkSession, args: Array[String]) extends
* @param inFormat Collector format in which the data is coming in
* @return A RDD containing strings or byte arrays
*/
private def getInputRDD(inFormat: String, path: String): RDD[_] = {
private def getInputRDD(inFormat: String, path: String): RDD[_] =
inFormat match {
case "thrift" =>
MultiInputFormat.setClassConf(classOf[Array[Byte]], hadoopConfig)
sc.newAPIHadoopFile[
LongWritable,
BinaryWritable[Array[Byte]],
MultiInputFormat[Array[Byte]]
](path)
LongWritable,
BinaryWritable[Array[Byte]],
MultiInputFormat[Array[Byte]]
](path)
.map(_._2.get())
case _ => sc.textFile(path)
}
}
}
@@ -82,28 +82,43 @@ case class ParsedEnrichJobConfig(
object EnrichJobConfig {
private val parser = new scopt.OptionParser[RawEnrichJobConfig]("EnrichJob") {
head("EnrichJob")
opt[String]("input-folder").required().valueName("<input folder>")
opt[String]("input-folder")
.required()
.valueName("<input folder>")
.action((f, c) => c.copy(inFolder = f))
.text("Folder where the input events are located")
opt[String]("input-format").required().valueName("<input format>")
opt[String]("input-format")
.required()
.valueName("<input format>")
.action((f, c) => c.copy(inFormat = f))
.text("The format in which the collector is saving data")
opt[String]("output-folder").required().valueName("<output folder>")
opt[String]("output-folder")
.required()
.valueName("<output folder>")
.action((f, c) => c.copy(outFolder = f))
.text("Output folder where the enriched events will be stored")
opt[String]("bad-folder").required().valueName("<bad folder>")
opt[String]("bad-folder")
.required()
.valueName("<bad folder>")
.action((f, c) => c.copy(badFolder = f))
.text("Output folder where the malformed events will be stored")
opt[String]("enrichments").required().valueName("<enrichments>")
opt[String]("enrichments")
.required()
.valueName("<enrichments>")
.action((e, c) => c.copy(enrichments = e))
.text("Directory where the JSONs describing the enrichments are stored")
opt[String]("iglu-config").required().valueName("<iglu config>")
opt[String]("iglu-config")
.required()
.valueName("<iglu config>")
.action((i, c) => c.copy(igluConfig = i))
.text("Iglu resolver configuration")
opt[Long]("etl-timestamp").required().valueName("<ETL timestamp>")
opt[Long]("etl-timestamp")
.required()
.valueName("<ETL timestamp>")
.action((t, c) => c.copy(etlTstamp = t))
.text("Timestamp at which the job was launched, in milliseconds")
opt[Unit]("local").hidden()
opt[Unit]("local")
.hidden()
.action((_, c) => c.copy(local = true))
.text("Whether to build a local enrichment registry")
help("help").text("Prints this usage text")
@@ -118,11 +133,20 @@ object EnrichJobConfig {
val resolver = ResolverSingleton.getIgluResolver(c.igluConfig)
val registry = resolver
.flatMap(RegistrySingleton.getEnrichmentRegistry(c.enrichments, c.local)(_))
val loader = Loader.getLoader(c.inFormat)
val loader = Loader
.getLoader(c.inFormat)
.fold(_.toProcessingMessage.failureNel, _.successNel)
(resolver |@| registry |@| loader) { (_, reg, _) =>
ParsedEnrichJobConfig(c.inFolder, c.inFormat, c.outFolder, c.badFolder,
c.enrichments, c.igluConfig, c.local, new DateTime(c.etlTstamp), filesToCache(reg))
ParsedEnrichJobConfig(
c.inFolder,
c.inFormat,
c.outFolder,
c.badFolder,
c.enrichments,
c.igluConfig,
c.local,
new DateTime(c.etlTstamp),
filesToCache(reg))
}
}

@@ -133,12 +157,11 @@ object EnrichJobConfig {
*/
def loadConfigFrom(
args: Array[String]
): ValidatedNelMessage[ParsedEnrichJobConfig] = {
): ValidatedNelMessage[ParsedEnrichJobConfig] =
parser.parse(args, RawEnrichJobConfig()).map(transform) match {
case Some(c) => c
case _ => "Parsing of the configuration failed".toProcessingMessage.failureNel
case _ => "Parsing of the configuration failed".toProcessingMessage.failureNel
}
}

/**
* Build the list of enrichment files to cache.
@@ -148,6 +171,6 @@ object EnrichJobConfig {
private def filesToCache(registry: EnrichmentRegistry): List[(URI, String)] =
registry.getIpLookupsEnrichment match {
case Some(ipLookups) => ipLookups.dbsToCache
case None => Nil
case None => Nil
}
}
@@ -26,7 +26,8 @@ trait SparkJob {

def main(args: Array[String]): Unit = {
val config = sparkConfig()
val spark = SparkSession.builder()
val spark = SparkSession
.builder()
.config(config)
.getOrCreate()
run(spark, args)
@@ -33,9 +33,11 @@ import iglu.client.validation.ProcessingMessageMethods._

/** Singletons needed for unserializable classes. */
object singleton {

/** Singleton for Iglu's Resolver to maintain one Resolver per node. */
object ResolverSingleton {
@volatile private var instance: Resolver = _

/**
* Retrieve or build an instance of Iglu's Resolver.
* @param igluConfig JSON representing the Iglu configuration
Expand Down Expand Up @@ -67,7 +69,8 @@ object singleton {
/** Singleton for EnrichmentRegistry. */
object RegistrySingleton {
@volatile private var instance: EnrichmentRegistry = _
@volatile private var enrichments: String = _
@volatile private var enrichments: String = _

/**
* Retrieve or build an instance of EnrichmentRegistry.
* @param igluConfig JSON representing the Iglu configuration
Expand Down Expand Up @@ -95,21 +98,21 @@ object singleton {
* @param resolver (implicit) The Iglu resolver used for schema lookup and validation
* @return An EnrichmentRegistry or one or more error messages boxed in a Scalaz ValidationNel
*/
private[spark] def getEnrichmentRegistry(enrichments: String, local: Boolean
)(implicit resolver: Resolver): ValidatedNelMessage[EnrichmentRegistry] = {
private[spark] def getEnrichmentRegistry(enrichments: String, local: Boolean)(
implicit resolver: Resolver): ValidatedNelMessage[EnrichmentRegistry] =
for {
node <- base64ToJsonNode(enrichments, "enrichments")
.toValidationNel: ValidatedNelMessage[JsonNode]
node <- base64ToJsonNode(enrichments, "enrichments").toValidationNel: ValidatedNelMessage[
JsonNode]
reg <- EnrichmentRegistry.parse(fromJsonNode(node), local)
} yield reg
}
}

/** Singleton for Loader. */
object LoaderSingleton {
import common.loaders.Loader
@volatile private var instance: Loader[_] = _
@volatile private var inFormat: String = _
@volatile private var inFormat: String = _

/**
* Retrieve or build an instance of EnrichmentRegistry.
* @param inFormat Collector format in which the data is coming in
@@ -118,7 +121,8 @@ object singleton {
if (instance == null || this.inFormat != inFormat) {
synchronized {
if (instance == null || this.inFormat != inFormat) {
instance = Loader.getLoader(inFormat)
instance = Loader
.getLoader(inFormat)
.valueOr(e => throw new FatalEtlError(e.toString))
this.inFormat = inFormat
}
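
ResolverSingleton, RegistrySingleton and LoaderSingleton in this file all follow the same double-checked-locking pattern, which is only partially visible in the hunks above. A minimal standalone sketch of that pattern, with simplified names and a placeholder resource rather than the project's actual code:

// Illustrative sketch of the double-checked-locking singleton pattern used above (not project code).
object ExampleSingleton {
  @volatile private var instance: String = _
  @volatile private var key: String = _

  // Build the instance once per key; rebuild only when the key changes.
  def get(newKey: String): String = {
    if (instance == null || key != newKey) {
      synchronized {
        if (instance == null || key != newKey) {
          instance = s"resource-for-$newKey" // stand-in for expensive construction
          key = newKey
        }
      }
    }
    instance
  }
}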
@@ -24,7 +24,7 @@ import org.specs2.specification.Step
* TODO: To remove once specs2 has been updated.
*/
trait BeforeAfterAll extends SpecificationLike {
override def map(fragments: =>Fragments) =
override def map(fragments: => Fragments) =
Step(beforeAll) ^ fragments ^ Step(afterAll)

def beforeAll(): Unit
