diff --git a/3-enrich/spark-enrich/build.sbt b/3-enrich/spark-enrich/build.sbt
index 0b898da303..09ce30767f 100644
--- a/3-enrich/spark-enrich/build.sbt
+++ b/3-enrich/spark-enrich/build.sbt
@@ -12,10 +12,11 @@
* See the Apache License Version 2.0 for the specific language governing permissions and
* limitations there under.
*/
-lazy val root = project.in(file("."))
+lazy val root = project
+ .in(file("."))
.settings(
- name := "snowplow-spark-enrich",
- version := "1.13.0",
+ name := "snowplow-spark-enrich",
+ version := "1.13.0",
description := "The Snowplow Spark Enrichment process"
)
.settings(BuildSettings.formatting)
@@ -42,4 +43,6 @@ lazy val root = project.in(file("."))
)
)
-shellPrompt := { _ => "spark-enrich> " }
+shellPrompt := { _ =>
+ "spark-enrich> "
+}
diff --git a/3-enrich/spark-enrich/src/main/scala/com.snowplowanalytics.snowplow.enrich.spark/EnrichJob.scala b/3-enrich/spark-enrich/src/main/scala/com.snowplowanalytics.snowplow.enrich.spark/EnrichJob.scala
index a063ffb0c7..ecb7a9856a 100644
--- a/3-enrich/spark-enrich/src/main/scala/com.snowplowanalytics.snowplow.enrich.spark/EnrichJob.scala
+++ b/3-enrich/spark-enrich/src/main/scala/com.snowplowanalytics.snowplow.enrich.spark/EnrichJob.scala
@@ -58,8 +58,10 @@ object EnrichJob extends SparkJob {
classOf[Array[EnrichedEvent]],
classOf[com.snowplowanalytics.snowplow.CollectorPayload.thrift.model1.CollectorPayload],
classOf[Array[com.snowplowanalytics.snowplow.CollectorPayload.thrift.model1.CollectorPayload]],
- Class.forName("com.snowplowanalytics.snowplow.CollectorPayload.thrift.model1.CollectorPayload$_Fields"),
- Class.forName("[Lcom.snowplowanalytics.snowplow.CollectorPayload.thrift.model1.CollectorPayload$_Fields;"),
+ Class.forName(
+ "com.snowplowanalytics.snowplow.CollectorPayload.thrift.model1.CollectorPayload$_Fields"),
+ Class.forName(
+ "[Lcom.snowplowanalytics.snowplow.CollectorPayload.thrift.model1.CollectorPayload$_Fields;"),
classOf[com.snowplowanalytics.snowplow.collectors.thrift.SnowplowRawEvent],
classOf[Array[com.snowplowanalytics.snowplow.collectors.thrift.SnowplowRawEvent]],
Class.forName("com.snowplowanalytics.snowplow.collectors.thrift.SnowplowRawEvent$_Fields"),
@@ -74,11 +76,12 @@ object EnrichJob extends SparkJob {
classOf[org.apache.spark.internal.io.FileCommitProtocol$TaskCommitMessage],
classOf[org.apache.spark.sql.execution.datasources.FileFormatWriter$WriteTaskResult]
)
- override def sparkConfig(): SparkConf = new SparkConf()
- .setAppName(getClass().getSimpleName())
- .setIfMissing("spark.master", "local[*]")
- .set("spark.serializer", classOf[KryoSerializer].getName())
- .registerKryoClasses(classesToRegister)
+ override def sparkConfig(): SparkConf =
+ new SparkConf()
+ .setAppName(getClass().getSimpleName())
+ .setIfMissing("spark.master", "local[*]")
+ .set("spark.serializer", classOf[KryoSerializer].getName())
+ .registerKryoClasses(classesToRegister)
override def run(spark: SparkSession, args: Array[String]): Unit = {
val job = EnrichJob(spark, args)
@@ -114,8 +117,11 @@ object EnrichJob extends SparkJob {
def enrich(line: Any, config: ParsedEnrichJobConfig): (Any, List[ValidatedEnrichedEvent]) = {
import singleton._
val registry = RegistrySingleton.get(config.igluConfig, config.enrichments, config.local)
- val loader = LoaderSingleton.get(config.inFormat).asInstanceOf[Loader[Any]]
- val event = EtlPipeline.processEvents(registry, etlVersion, config.etlTstamp,
+ val loader = LoaderSingleton.get(config.inFormat).asInstanceOf[Loader[Any]]
+ val event = EtlPipeline.processEvents(
+ registry,
+ etlVersion,
+ config.etlTstamp,
loader.toCollectorPayload(line))(ResolverSingleton.get(config.igluConfig))
(line, event)
}
@@ -156,10 +162,12 @@ class EnrichJob(@transient val spark: SparkSession, args: Array[String]) extends
hadoopConfig.set("io.compression.codec.lzo.class", classOf[LzoCodec].getName())
// Job configuration
- private val enrichConfig = EnrichJobConfig.loadConfigFrom(args).fold(
- e => throw FatalEtlError(e.map(_.toString)),
- identity
- )
+ private val enrichConfig = EnrichJobConfig
+ .loadConfigFrom(args)
+ .fold(
+ e => throw FatalEtlError(e.map(_.toString)),
+ identity
+ )
/**
* Run the enrich job by:
@@ -189,14 +197,16 @@ class EnrichJob(@transient val spark: SparkSession, args: Array[String]) extends
// Handling of malformed rows
val bad = common
.map { case (line, enriched) => (line, projectBads(enriched)) }
- .flatMap { case (line, errors) =>
- val originalLine = line match {
- case bytes: Array[Byte] => new String(Base64.encodeBase64(bytes), "UTF-8")
- case other => other.toString
- }
- errors.map(e => Row(BadRow(originalLine, e).toCompactJson))
+ .flatMap {
+ case (line, errors) =>
+ val originalLine = line match {
+ case bytes: Array[Byte] => new String(Base64.encodeBase64(bytes), "UTF-8")
+ case other => other.toString
+ }
+ errors.map(e => Row(BadRow(originalLine, e).toCompactJson))
}
- spark.createDataFrame(bad, StructType(StructField("_", StringType, true) :: Nil))
+ spark
+ .createDataFrame(bad, StructType(StructField("_", StringType, true) :: Nil))
.write
.mode(SaveMode.Overwrite)
.text(enrichConfig.badFolder)
@@ -204,10 +214,15 @@ class EnrichJob(@transient val spark: SparkSession, args: Array[String]) extends
// Handling of properly-formed rows
val good = common
.flatMap { case (_, enriched) => projectGoods(enriched) }
- spark.createDataset(good)(Encoders.bean(classOf[EnrichedEvent]))
+ spark
+ .createDataset(good)(Encoders.bean(classOf[EnrichedEvent]))
.toDF()
// hack to preserve the order of the fields in the csv, otherwise it's alphabetical
- .select(classOf[EnrichedEvent].getDeclaredFields().filterNot(_.getName.equals("pii")).map(f => col(f.getName())): _*)
+ .select(
+ classOf[EnrichedEvent]
+ .getDeclaredFields()
+ .filterNot(_.getName.equals("pii"))
+ .map(f => col(f.getName())): _*)
.write
.option("sep", "\t")
.option("escape", "")
@@ -221,17 +236,16 @@ class EnrichJob(@transient val spark: SparkSession, args: Array[String]) extends
* @param inFormat Collector format in which the data is coming in
* @return A RDD containing strings or byte arrays
*/
- private def getInputRDD(inFormat: String, path: String): RDD[_] = {
+ private def getInputRDD(inFormat: String, path: String): RDD[_] =
inFormat match {
case "thrift" =>
MultiInputFormat.setClassConf(classOf[Array[Byte]], hadoopConfig)
sc.newAPIHadoopFile[
- LongWritable,
- BinaryWritable[Array[Byte]],
- MultiInputFormat[Array[Byte]]
- ](path)
+ LongWritable,
+ BinaryWritable[Array[Byte]],
+ MultiInputFormat[Array[Byte]]
+ ](path)
.map(_._2.get())
case _ => sc.textFile(path)
}
- }
}
diff --git a/3-enrich/spark-enrich/src/main/scala/com.snowplowanalytics.snowplow.enrich.spark/EnrichJobConfig.scala b/3-enrich/spark-enrich/src/main/scala/com.snowplowanalytics.snowplow.enrich.spark/EnrichJobConfig.scala
index c45f586d32..07222e20d0 100644
--- a/3-enrich/spark-enrich/src/main/scala/com.snowplowanalytics.snowplow.enrich.spark/EnrichJobConfig.scala
+++ b/3-enrich/spark-enrich/src/main/scala/com.snowplowanalytics.snowplow.enrich.spark/EnrichJobConfig.scala
@@ -82,28 +82,43 @@ case class ParsedEnrichJobConfig(
object EnrichJobConfig {
private val parser = new scopt.OptionParser[RawEnrichJobConfig]("EnrichJob") {
head("EnrichJob")
- opt[String]("input-folder").required().valueName("")
+ opt[String]("input-folder")
+ .required()
+ .valueName("")
.action((f, c) => c.copy(inFolder = f))
.text("Folder where the input events are located")
- opt[String]("input-format").required().valueName("")
+ opt[String]("input-format")
+ .required()
+ .valueName("")
.action((f, c) => c.copy(inFormat = f))
.text("The format in which the collector is saving data")
- opt[String]("output-folder").required().valueName("