From 37911e03d866ada280ba689d28b94a23c0d41d40 Mon Sep 17 00:00:00 2001 From: Konstantinos Servis Date: Wed, 17 Jan 2018 14:42:54 +0200 Subject: [PATCH] Scala Common Enrich: extend PII Enrichment to include identification events in EnrichedEvent (closes #3580) --- 3-enrich/scala-common-enrich/.scalafmt.conf | 2 +- .../enrichments/EnrichmentRegistry.scala | 2 +- .../registry/PiiPseudonymizerEnrichment.scala | 384 ------------------ .../enrichments/registry/pii/Mutators.scala | 176 ++++++++ .../pii/PiiPseudonymizerEnrichment.scala | 289 +++++++++++++ .../registry/pii/Serializers.scala | 74 ++++ .../enrichments/registry/pii/package.scala | 114 ++++++ .../common/outputs/EnrichedEvent.scala | 3 + .../registry/EnrichmentConfigsSpec.scala | 29 +- .../PiiPseudonymizerEnrichmentSpec.scala | 200 ++++++--- 10 files changed, 801 insertions(+), 472 deletions(-) delete mode 100644 3-enrich/scala-common-enrich/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/registry/PiiPseudonymizerEnrichment.scala create mode 100644 3-enrich/scala-common-enrich/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/registry/pii/Mutators.scala create mode 100644 3-enrich/scala-common-enrich/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/registry/pii/PiiPseudonymizerEnrichment.scala create mode 100644 3-enrich/scala-common-enrich/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/registry/pii/Serializers.scala create mode 100644 3-enrich/scala-common-enrich/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/registry/pii/package.scala rename 3-enrich/scala-common-enrich/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/enrichments/registry/{ => pii}/PiiPseudonymizerEnrichmentSpec.scala (69%) diff --git a/3-enrich/scala-common-enrich/.scalafmt.conf b/3-enrich/scala-common-enrich/.scalafmt.conf index f3d64fc27a..5ade6cc4c5 100644 --- a/3-enrich/scala-common-enrich/.scalafmt.conf +++ b/3-enrich/scala-common-enrich/.scalafmt.conf @@ -12,4 +12,4 @@ rewrite.rules = [ RedundantParens, PreferCurlyFors ] -align.tokens = ["|", "!", "!!", "||", "=>", "=", "->", "<-", "|@|", "//", "~", "/", "+"] +align.tokens = ["|", "!", "!!", "||", "=>", "=", "->", "<-", "|@|", "//", "/", "+"] diff --git a/3-enrich/scala-common-enrich/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/EnrichmentRegistry.scala b/3-enrich/scala-common-enrich/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/EnrichmentRegistry.scala index 8ffbf4af74..9b7d392aff 100644 --- a/3-enrich/scala-common-enrich/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/EnrichmentRegistry.scala +++ b/3-enrich/scala-common-enrich/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/EnrichmentRegistry.scala @@ -47,7 +47,6 @@ import registry.{ IpLookupsEnrichment, JavascriptScriptEnrichment, JavascriptScriptEnrichmentConfig, - PiiPseudonymizerEnrichment, RefererParserEnrichment, UaParserEnrichment, UaParserEnrichmentConfig, @@ -57,6 +56,7 @@ import registry.{ WeatherEnrichmentConfig } import registry.apirequest.{ApiRequestEnrichment, ApiRequestEnrichmentConfig} +import registry.pii.PiiPseudonymizerEnrichment import registry.sqlquery.{SqlQueryEnrichment, SqlQueryEnrichmentConfig} import utils.ScalazJson4sUtils diff --git a/3-enrich/scala-common-enrich/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/registry/PiiPseudonymizerEnrichment.scala b/3-enrich/scala-common-enrich/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/registry/PiiPseudonymizerEnrichment.scala deleted file mode 100644 index 556afa5dde..0000000000 --- a/3-enrich/scala-common-enrich/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/registry/PiiPseudonymizerEnrichment.scala +++ /dev/null @@ -1,384 +0,0 @@ -/* - * Copyright (c) 2012-2018 Snowplow Analytics Ltd. All rights reserved. - * - * This program is licensed to you under the Apache License Version 2.0, - * and you may not use this file except in compliance with the Apache License Version 2.0. - * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0. - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the Apache License Version 2.0 is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under. - */ - -package com.snowplowanalytics -package snowplow.enrich -package common.enrichments.registry - -// Scala -import scala.collection.JavaConverters._ - -// Scala libraries -import org.json4s -import org.json4s.JValue -import org.json4s.JsonAST._ -import org.json4s.jackson.JsonMethods -import org.json4s.jackson.JsonMethods.{compact, parse, render} -import org.json4s.DefaultFormats - -// Java -import org.apache.commons.codec.digest.DigestUtils - -// Java libraries -import com.fasterxml.jackson.databind.{JsonNode, ObjectMapper, SerializationFeature} -import com.fasterxml.jackson.databind.node.{ArrayNode, ObjectNode, TextNode} -import com.jayway.jsonpath.spi.json.JacksonJsonNodeJsonProvider -import com.jayway.jsonpath.{Configuration, JsonPath => JJsonPath, Option => JOption} -import com.jayway.jsonpath.MapFunction - -// Scalaz -import scalaz._ -import Scalaz._ - -// Iglu -import iglu.client.validation.ProcessingMessageMethods._ -import iglu.client.{SchemaCriterion, SchemaKey} - -// This project -import common.ValidatedNelMessage -import common.utils.ScalazJson4sUtils -import common.outputs.EnrichedEvent - -object PiiConstants { - type DigestFunction = Function1[Array[Byte], String] - type Mutator = (EnrichedEvent, String => String) => Unit - - /** - * This and the next constant maps from a config field name to an EnrichedEvent mutator. The structure is such so that - * it preserves type safety, and it can be easily replaced in the future by generated code that will use the config as - * input. - */ - val ScalarMutators: Map[String, Mutator] = Map( - "user_id" -> { (event: EnrichedEvent, fn: String => String) => - event.user_id = fn(event.user_id) - }, - "user_ipaddress" -> { (event: EnrichedEvent, fn: String => String) => - event.user_ipaddress = fn(event.user_ipaddress) - }, - "user_fingerprint" -> { (event: EnrichedEvent, fn: String => String) => - event.user_fingerprint = fn(event.user_fingerprint) - }, - "domain_userid" -> { (event: EnrichedEvent, fn: String => String) => - event.domain_userid = fn(event.domain_userid) - }, - "network_userid" -> { (event: EnrichedEvent, fn: String => String) => - event.network_userid = fn(event.network_userid) - }, - "ip_organization" -> { (event: EnrichedEvent, fn: String => String) => - event.ip_organization = fn(event.ip_organization) - }, - "ip_domain" -> { (event: EnrichedEvent, fn: String => String) => - event.ip_domain = fn(event.ip_domain) - }, - "tr_orderid" -> { (event: EnrichedEvent, fn: String => String) => - event.tr_orderid = fn(event.tr_orderid) - }, - "ti_orderid" -> { (event: EnrichedEvent, fn: String => String) => - event.ti_orderid = fn(event.ti_orderid) - }, - "mkt_term" -> { (event: EnrichedEvent, fn: String => String) => - event.mkt_term = fn(event.mkt_term) - }, - "mkt_content" -> { (event: EnrichedEvent, fn: String => String) => - event.mkt_content = fn(event.mkt_content) - }, - "se_category" -> { (event: EnrichedEvent, fn: String => String) => - event.se_category = fn(event.se_category) - }, - "se_action" -> { (event: EnrichedEvent, fn: String => String) => - event.se_action = fn(event.se_action) - }, - "se_label" -> { (event: EnrichedEvent, fn: String => String) => - event.se_label = fn(event.se_label) - }, - "se_property" -> { (event: EnrichedEvent, fn: String => String) => - event.se_property = fn(event.se_property) - }, - "mkt_clickid" -> { (event: EnrichedEvent, fn: String => String) => - event.mkt_clickid = fn(event.mkt_clickid) - }, - "refr_domain_userid" -> { (event: EnrichedEvent, fn: String => String) => - event.refr_domain_userid = fn(event.refr_domain_userid) - }, - "domain_sessionid" -> { (event: EnrichedEvent, fn: String => String) => - event.domain_sessionid = fn(event.domain_sessionid) - } - ) - - val JsonMutators: Map[String, Mutator] = Map( - "contexts" -> { (event: EnrichedEvent, fn: String => String) => - event.contexts = fn(event.contexts) - }, - "derived_contexts" -> { (event: EnrichedEvent, fn: String => String) => - event.derived_contexts = fn(event.derived_contexts) - }, - "unstruct_event" -> { (event: EnrichedEvent, fn: String => String) => - event.unstruct_event = fn(event.unstruct_event) - } - ) -} - -/** - * PiiField trait. This corresponds to a configuration top-level field (i.e. either a scalar or a JSON field) along with - * a function to apply that strategy to the EnrichedEvent POJO (A scalar field is represented in config py "pojo") - */ -sealed trait PiiField { - import PiiConstants.Mutator - - /** - * Strategy for this field - * - * @return PiiStrategy a strategy to be applied to this field - */ - def strategy: PiiStrategy - - /** - * The POJO mutator for this field - * - * @return fieldMutator - */ - def fieldMutator: Mutator - - /** - * Gets an enriched event from the enrichment manager and modifies it according to the specified strategy. - * - * @param event The enriched event - */ - def transform(event: EnrichedEvent): Unit = fieldMutator(event, applyStrategy) - - protected def applyStrategy(fieldValue: String): String -} - -/** - * PiiStrategy trait. This corresponds to a strategy to apply to a single field. Currently only String input is - * supported. - */ -sealed trait PiiStrategy { - def scramble(clearText: String): String -} - -/** - * Companion object. Lets us create a PiiPseudonymizerEnrichment - * from a JValue. - */ -object PiiPseudonymizerEnrichment extends ParseableEnrichment { - import PiiConstants._ - - implicit val json4sFormats = DefaultFormats - - override val supportedSchema = - SchemaCriterion("com.snowplowanalytics.snowplow.enrichments", "pii_enrichment_config", "jsonschema", 1, 0, 0) - - def parse(config: JValue, schemaKey: SchemaKey): ValidatedNelMessage[PiiPseudonymizerEnrichment] = { - for { - conf <- matchesSchema(config, schemaKey) - enabled = ScalazJson4sUtils.extract[Boolean](conf, "enabled").toOption.getOrElse(false) - piiFields <- ScalazJson4sUtils.extract[List[JObject]](conf, "parameters", "pii").leftMap(_.getMessage) - strategyFunction <- extractStrategyFunction(config) - hashFunction <- getHashFunction(strategyFunction) - piiFieldList <- extractFields(piiFields, PiiStrategyPseudonymize(hashFunction)) - } yield if (enabled) PiiPseudonymizerEnrichment(piiFieldList) else PiiPseudonymizerEnrichment(List()) - }.leftMap(_.toProcessingMessageNel) - - private def getHashFunction(strategyFunction: String): Validation[String, DigestFunction] = - strategyFunction match { - case "MD2" => { (b: Array[Byte]) => - DigestUtils.md2Hex(b) - }.success - case "MD5" => { (b: Array[Byte]) => - DigestUtils.md5Hex(b) - }.success - case "SHA-1" => { (b: Array[Byte]) => - DigestUtils.sha1Hex(b) - }.success - case "SHA-256" => { (b: Array[Byte]) => - DigestUtils.sha256Hex(b) - }.success - case "SHA-384" => { (b: Array[Byte]) => - DigestUtils.sha384Hex(b) - }.success - case "SHA-512" => { (b: Array[Byte]) => - DigestUtils.sha512Hex(b) - }.success - case fName => s"Unknown function $fName".failure - } - - private def extractFields(piiFields: List[JObject], strategy: PiiStrategy): Validation[String, List[PiiField]] = - piiFields.map { - case field: JObject => - if (ScalazJson4sUtils.fieldExists(field, "pojo")) - extractString(field, "pojo", "field").flatMap(extractPiiScalarField(strategy, _)) - else if (ScalazJson4sUtils.fieldExists(field, "json")) extractPiiJsonField(strategy, field \ "json") - else s"PII Configuration: pii field does not include 'pojo' nor 'json' fields. Got: [${compact(field)}]".failure - case json => s"PII Configuration: pii field does not contain an object. Got: [${compact(json)}]".failure - }.sequenceU - - private def extractPiiScalarField(strategy: PiiStrategy, fieldName: String): Validation[String, PiiScalar] = - ScalarMutators - .get(fieldName) - .map(PiiScalar(strategy, _).success) - .getOrElse(s"The specified pojo field ${fieldName} is not supported".failure) - - private def extractPiiJsonField(strategy: PiiStrategy, jsonField: JValue): Validation[String, PiiJson] = - (extractString(jsonField, "field") - .flatMap( - fieldName => - JsonMutators - .get(fieldName) - .map(_.success) - .getOrElse(s"The specified json field ${compact(jsonField)} is not supported".failure)) |@| - extractString(jsonField, "schemaCriterion").flatMap(sc => SchemaCriterion.parse(sc).leftMap(_.getMessage)) |@| - extractString(jsonField, "jsonPath")) { (fieldMutator: Mutator, sc: SchemaCriterion, jsonPath: String) => - PiiJson(strategy, fieldMutator, sc, jsonPath) - } - - private def extractString(jValue: JValue, field: String, tail: String*): Validation[String, String] = - ScalazJson4sUtils.extract[String](jValue, field, tail: _*).leftMap(_.getMessage) - - private def extractStrategyFunction(config: JValue): Validation[String, String] = - ScalazJson4sUtils - .extract[String](config, "parameters", "strategy", "pseudonymize", "hashFunction") - .leftMap(_.getMessage) - - private def matchesSchema(config: JValue, schemaKey: SchemaKey): Validation[String, JValue] = - if (supportedSchema.matches(schemaKey)) { - config.success - } else { - "Schema key %s is not supported. A '%s' enrichment must have schema '%s'." - .format(schemaKey, supportedSchema.name, supportedSchema) - .failure - } -} - -/** - * The PiiPseudonymizerEnrichment runs after all other enrichments to find fields that are configured as PII (personally - * identifiable information) and apply some anonymization (currently only pseudonymization) on them. Currently a single - * strategy for all the fields is supported due to the config format, and there is only one implemented strategy, - * however the enrichment supports a strategy per field. - * - * The user may specify two types of fields POJO or JSON. A POJO field is effectively a scalar field in the - * EnrichedEvent, whereas a JSON is a "context" formatted field and it can be wither a scalar in the case of - * unstruct_event or an array in the case of derived_events and contexts - * - * @param fieldList a list of configured PiiFields - */ -case class PiiPseudonymizerEnrichment(fieldList: List[PiiField]) extends Enrichment { - def transformer(event: EnrichedEvent): Unit = fieldList.foreach(_.transform(event)) -} - -/** - * Specifies a scalar field in POJO and the strategy that should be applied to it. - * @param strategy the strategy that should be applied - * @param fieldMutator the field mutator where the strategy will be applied - */ -final case class PiiScalar(strategy: PiiStrategy, fieldMutator: PiiConstants.Mutator) extends PiiField { - override def applyStrategy(fieldValue: String): String = - if (fieldValue != null) strategy.scramble(fieldValue) else null -} - -/** - * Specifies a strategy to use, a field mutator where the JSON can be found in the EnrichedEvent POJO, a schema criterion to - * discriminate which contexts to apply this strategy to, and a json path within the contexts where this strategy will - * be applied (the path may correspond to multiple fields). - * - * @param strategy the strategy that should be applied - * @param fieldMutator the field mutator for the json field - * @param schemaCriterion the schema for which the strategy will be applied - * @param jsonPath the path where the strategy will be applied - */ -final case class PiiJson(strategy: PiiStrategy, - fieldMutator: PiiConstants.Mutator, - schemaCriterion: SchemaCriterion, - jsonPath: String) - extends PiiField { - implicit val json4sFormats = DefaultFormats - - override def applyStrategy(fieldValue: String): String = - if (fieldValue != null) { - compact(render(parse(fieldValue) match { - case JObject(jObject) => { - val jObjectMap = jObject.toMap - val updated = jObjectMap.filterKeys(_ == "data").mapValues { - case JArray(contexts) => - JArray(contexts.map { - case JObject(context) => modifyObjectIfSchemaMatches(context) - case x => x - }) - case JObject(unstructEvent) => modifyObjectIfSchemaMatches(unstructEvent) - case x => x - } - JObject((jObjectMap ++ updated).toList) - } - case x => x - })) - } else null - - private def modifyObjectIfSchemaMatches(context: List[(String, json4s.JValue)]): JObject = { - val fieldsObj = context.toMap - (for { - schema <- fieldsObj.get("schema") - parsedSchemaMatches <- SchemaKey.parse(schema.extract[String]).map(schemaCriterion.matches).toOption - data <- fieldsObj.get("data") - if parsedSchemaMatches - } yield JObject(fieldsObj.updated("schema", schema).updated("data", jsonPathReplace(data)).toList)) - .getOrElse(JObject(context)) - } - - // Configuration for JsonPath - private val JacksonNodeJsonObjectMapper = { - val objectMapper = new ObjectMapper() - objectMapper.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false) - objectMapper - } - private val JsonPathConf = - Configuration - .builder() - .options(JOption.SUPPRESS_EXCEPTIONS) - .jsonProvider(new JacksonJsonNodeJsonProvider(JacksonNodeJsonObjectMapper)) - .build() - - /** - * Replaces a value in the given context data with the result of applying the strategy that value. - */ - private def jsonPathReplace(jValue: JValue): JValue = { - val objectNode = JsonMethods.mapper.valueToTree[ObjectNode](jValue) - val documentContext = JJsonPath.using(JsonPathConf).parse(objectNode) - documentContext.map( - jsonPath, - ScrambleMapFunction(strategy) - ) - JsonMethods.fromJsonNode(documentContext.json[JsonNode]()) - } -} - -case class ScrambleMapFunction(val strategy: PiiStrategy) extends MapFunction { - override def map(currentValue: AnyRef, configuration: Configuration): AnyRef = currentValue match { - case s: String => strategy.scramble(s) - case a: ArrayNode => - a.elements.asScala.map { - case t: TextNode => strategy.scramble(t.asText()) - case default: AnyRef => default - } - case default: AnyRef => default - } -} - -/** - * Implements a pseudonymization strategy using any algorithm known to DigestFunction - * @param hashFunction the DigestFunction to apply - */ -case class PiiStrategyPseudonymize(hashFunction: PiiConstants.DigestFunction) extends PiiStrategy { - val TextEncoding = "UTF-8" - override def scramble(clearText: String): String = hash(clearText) - def hash(text: String): String = hashFunction(text.getBytes(TextEncoding)) -} diff --git a/3-enrich/scala-common-enrich/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/registry/pii/Mutators.scala b/3-enrich/scala-common-enrich/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/registry/pii/Mutators.scala new file mode 100644 index 0000000000..beabe7a644 --- /dev/null +++ b/3-enrich/scala-common-enrich/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/registry/pii/Mutators.scala @@ -0,0 +1,176 @@ +/* + * Copyright (c) 2017-2018 Snowplow Analytics Ltd. All rights reserved. + * + * This program is licensed to you under the Apache License Version 2.0, + * and you may not use this file except in compliance with the Apache License Version 2.0. + * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the Apache License Version 2.0 is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under. + */ +package com.snowplowanalytics.snowplow.enrich.common.enrichments.registry.pii + +import com.snowplowanalytics.snowplow.enrich.common.outputs.EnrichedEvent + +object Mutators { + + /** + * This and the next constant maps from a configuration field name to an EnrichedEvent mutator. The structure is such so that + * it preserves type safety, and it can be easily replaced in the future by generated code that will use the configuration as + * input. + */ + val ScalarMutators: Map[String, Mutator] = Map( + "user_id" -> Mutator( + "user_id", { (event: EnrichedEvent, strategy: PiiStrategy, fn: ApplyStrategyFn) => + val (newValue, modifiedFields) = fn(event.user_id, strategy) + event.user_id = newValue + modifiedFields + } + ), + "user_ipaddress" -> Mutator( + "user_ipaddress", { (event: EnrichedEvent, strategy: PiiStrategy, fn: ApplyStrategyFn) => + val (newValue, modifiedFields) = fn(event.user_ipaddress, strategy) + event.user_ipaddress = newValue + modifiedFields + } + ), + "user_fingerprint" -> Mutator( + "user_fingerprint", { (event: EnrichedEvent, strategy: PiiStrategy, fn: ApplyStrategyFn) => + val (newValue, modifiedFields) = fn(event.user_fingerprint, strategy) + event.user_fingerprint = newValue + modifiedFields + } + ), + "domain_userid" -> Mutator( + "domain_userid", { (event: EnrichedEvent, strategy: PiiStrategy, fn: ApplyStrategyFn) => + val (newValue, modifiedFields) = fn(event.domain_userid, strategy) + event.domain_userid = newValue + modifiedFields + } + ), + "network_userid" -> Mutator( + "network_userid", { (event: EnrichedEvent, strategy: PiiStrategy, fn: ApplyStrategyFn) => + val (newValue, modifiedFields) = fn(event.network_userid, strategy) + event.network_userid = newValue + modifiedFields + } + ), + "ip_organization" -> Mutator( + "ip_organization", { (event: EnrichedEvent, strategy: PiiStrategy, fn: ApplyStrategyFn) => + val (newValue, modifiedFields) = fn(event.ip_organization, strategy) + event.ip_organization = newValue + modifiedFields + } + ), + "ip_domain" -> Mutator( + "ip_domain", { (event: EnrichedEvent, strategy: PiiStrategy, fn: ApplyStrategyFn) => + val (newValue, modifiedFields) = fn(event.ip_domain, strategy) + event.ip_domain = newValue + modifiedFields + } + ), + "tr_orderid" -> Mutator( + "tr_orderid", { (event: EnrichedEvent, strategy: PiiStrategy, fn: ApplyStrategyFn) => + val (newValue, modifiedFields) = fn(event.tr_orderid, strategy) + event.tr_orderid = newValue + modifiedFields + } + ), + "ti_orderid" -> Mutator( + "ti_orderid", { (event: EnrichedEvent, strategy: PiiStrategy, fn: ApplyStrategyFn) => + val (newValue, modifiedFields) = fn(event.ti_orderid, strategy) + event.ti_orderid = newValue + modifiedFields + } + ), + "mkt_term" -> Mutator( + "mkt_term", { (event: EnrichedEvent, strategy: PiiStrategy, fn: ApplyStrategyFn) => + val (newValue, modifiedFields) = fn(event.mkt_term, strategy) + event.mkt_term = newValue + modifiedFields + } + ), + "mkt_content" -> Mutator( + "mkt_content", { (event: EnrichedEvent, strategy: PiiStrategy, fn: ApplyStrategyFn) => + val (newValue, modifiedFields) = fn(event.mkt_content, strategy) + event.mkt_content = newValue + modifiedFields + } + ), + "se_category" -> Mutator( + "se_category", { (event: EnrichedEvent, strategy: PiiStrategy, fn: ApplyStrategyFn) => + val (newValue, modifiedFields) = fn(event.se_category, strategy) + event.se_category = newValue + modifiedFields + } + ), + "se_action" -> Mutator( + "se_action", { (event: EnrichedEvent, strategy: PiiStrategy, fn: ApplyStrategyFn) => + val (newValue, modifiedFields) = fn(event.se_action, strategy) + event.se_action = newValue + modifiedFields + } + ), + "se_label" -> Mutator( + "se_label", { (event: EnrichedEvent, strategy: PiiStrategy, fn: ApplyStrategyFn) => + val (newValue, modifiedFields) = fn(event.se_label, strategy) + event.se_label = newValue + modifiedFields + } + ), + "se_property" -> Mutator( + "se_property", { (event: EnrichedEvent, strategy: PiiStrategy, fn: ApplyStrategyFn) => + val (newValue, modifiedFields) = fn(event.se_property, strategy) + event.se_property = newValue + modifiedFields + } + ), + "mkt_clickid" -> Mutator( + "mkt_clickid", { (event: EnrichedEvent, strategy: PiiStrategy, fn: ApplyStrategyFn) => + val (newValue, modifiedFields) = fn(event.mkt_clickid, strategy) + event.mkt_clickid = newValue + modifiedFields + } + ), + "refr_domain_userid" -> Mutator( + "refr_domain_userid", { (event: EnrichedEvent, strategy: PiiStrategy, fn: ApplyStrategyFn) => + val (newValue, modifiedFields) = fn(event.refr_domain_userid, strategy) + event.refr_domain_userid = newValue + modifiedFields + } + ), + "domain_sessionid" -> Mutator( + "domain_sessionid", { (event: EnrichedEvent, strategy: PiiStrategy, fn: ApplyStrategyFn) => + val (newValue, modifiedFields) = fn(event.domain_sessionid, strategy) + event.domain_sessionid = newValue + modifiedFields + } + ) + ) + + val JsonMutators: Map[String, Mutator] = Map( + "contexts" -> Mutator( + "contexts", { (event: EnrichedEvent, strategy: PiiStrategy, fn: ApplyStrategyFn) => + val (newValue, modifiedFields) = fn(event.contexts, strategy) + event.contexts = newValue + modifiedFields + } + ), + "derived_contexts" -> Mutator( + "derived_contexts", { (event: EnrichedEvent, strategy: PiiStrategy, fn: ApplyStrategyFn) => + val (newValue, modifiedFields) = fn(event.derived_contexts, strategy) + event.derived_contexts = newValue + modifiedFields + } + ), + "unstruct_event" -> Mutator( + "unstruct_event", { (event: EnrichedEvent, strategy: PiiStrategy, fn: ApplyStrategyFn) => + val (newValue, modifiedFields) = fn(event.unstruct_event, strategy) + event.unstruct_event = newValue + modifiedFields + } + ) + ) +} diff --git a/3-enrich/scala-common-enrich/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/registry/pii/PiiPseudonymizerEnrichment.scala b/3-enrich/scala-common-enrich/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/registry/pii/PiiPseudonymizerEnrichment.scala new file mode 100644 index 0000000000..4a9582aa87 --- /dev/null +++ b/3-enrich/scala-common-enrich/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/registry/pii/PiiPseudonymizerEnrichment.scala @@ -0,0 +1,289 @@ +/* + * Copyright (c) 2017-2018 Snowplow Analytics Ltd. All rights reserved. + * + * This program is licensed to you under the Apache License Version 2.0, + * and you may not use this file except in compliance with the Apache License Version 2.0. + * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the Apache License Version 2.0 is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under. + */ +package com.snowplowanalytics +package snowplow.enrich +package common.enrichments.registry +package pii + +// Scala +import scala.collection.JavaConverters._ +import scala.collection.mutable.MutableList + +// Scala libraries +import org.json4s +import org.json4s.{DefaultFormats, JValue} +import org.json4s.JsonAST._ +import org.json4s.jackson.JsonMethods +import org.json4s.jackson.JsonMethods.{compact, parse, render} +import org.json4s.jackson.Serialization.write + +// Java +import org.apache.commons.codec.digest.DigestUtils + +// Java libraries +import com.fasterxml.jackson.databind.JsonNode +import com.fasterxml.jackson.databind.node.{ArrayNode, ObjectNode, TextNode} +import com.jayway.jsonpath.{Configuration, JsonPath => JJsonPath} +import com.jayway.jsonpath.MapFunction + +// Scalaz +import scalaz._ +import Scalaz._ + +// Iglu +import iglu.client.validation.ProcessingMessageMethods._ +import iglu.client.{SchemaCriterion, SchemaKey} + +// This project +import common.ValidatedNelMessage +import common.utils.ScalazJson4sUtils +import common.outputs.EnrichedEvent + +/** + * Companion object. Lets us create a PiiPseudonymizerEnrichment + * from a JValue. + */ +object PiiPseudonymizerEnrichment extends ParseableEnrichment { + + implicit val json4sFormats = DefaultFormats + + override val supportedSchema = + SchemaCriterion("com.snowplowanalytics.snowplow.enrichments", "pii_enrichment_config", "jsonschema", 2, 0, 0) + + def parse(config: JValue, schemaKey: SchemaKey): ValidatedNelMessage[PiiPseudonymizerEnrichment] = { + for { + conf <- matchesSchema(config, schemaKey) + + emitIdentificationEvent = ScalazJson4sUtils + .extract[Boolean](conf, "emitEvent") + .toOption + .getOrElse(false) + piiFields <- ScalazJson4sUtils.extract[List[JObject]](conf, "parameters", "pii").leftMap(_.getMessage) + hashFunctionName <- extractStrategyFunction(config) + hashFunction <- getHashFunction(hashFunctionName) + piiFieldList <- extractFields(piiFields) + } yield + PiiPseudonymizerEnrichment(piiFieldList, + emitIdentificationEvent, + PiiStrategyPseudonymize(hashFunctionName, hashFunction)) + }.leftMap(_.toProcessingMessageNel) + + private[pii] def getHashFunction(strategyFunction: String): Validation[String, DigestFunction] = + strategyFunction match { + case "MD2" => { DigestUtils.md2Hex(_: Array[Byte]) }.success + case "MD5" => { DigestUtils.md5Hex(_: Array[Byte]) }.success + case "SHA-1" => { DigestUtils.sha1Hex(_: Array[Byte]) }.success + case "SHA-256" => { DigestUtils.sha256Hex(_: Array[Byte]) }.success + case "SHA-384" => { DigestUtils.sha384Hex(_: Array[Byte]) }.success + case "SHA-512" => { DigestUtils.sha512Hex(_: Array[Byte]) }.success + case fName => s"Unknown function $fName".failure + } + + private def extractFields(piiFields: List[JObject]): Validation[String, List[PiiField]] = + piiFields.map { + case field: JObject => + if (ScalazJson4sUtils.fieldExists(field, "pojo")) + extractString(field, "pojo", "field").flatMap(extractPiiScalarField) + else if (ScalazJson4sUtils.fieldExists(field, "json")) extractPiiJsonField(field \ "json") + else s"PII Configuration: pii field does not include 'pojo' nor 'json' fields. Got: [${compact(field)}]".failure + case json => s"PII Configuration: pii field does not contain an object. Got: [${compact(json)}]".failure + }.sequenceU + + private def extractPiiScalarField(fieldName: String): Validation[String, PiiScalar] = + ScalarMutators + .get(fieldName) + .map(PiiScalar(_).success) + .getOrElse(s"The specified pojo field $fieldName is not supported".failure) + + private def extractPiiJsonField(jsonField: JValue): Validation[String, PiiJson] = { + val schemaCriterion = extractString(jsonField, "schemaCriterion") + .flatMap(sc => SchemaCriterion.parse(sc).leftMap(_.getMessage)) + .toValidationNel + val jsonPath = extractString(jsonField, "jsonPath").toValidationNel + val mutator = extractString(jsonField, "field") + .flatMap(getJsonMutator) + .toValidationNel + val validatedNel = (mutator |@| schemaCriterion |@| jsonPath)(PiiJson.apply) + validatedNel.leftMap(x => s"Unable to extract PII JSON: ${x.list.mkString(",")}") + } + + private def getJsonMutator(fieldName: String): Validation[String, Mutator] = + JsonMutators + .get(fieldName) + .map(_.success) + .getOrElse(s"The specified json field $fieldName is not supported".failure) + + private def extractString(jValue: JValue, field: String, tail: String*): Validation[String, String] = + ScalazJson4sUtils.extract[String](jValue, field, tail: _*).leftMap(_.getMessage) + + private def extractStrategyFunction(config: JValue): Validation[String, String] = + ScalazJson4sUtils + .extract[String](config, "parameters", "strategy", "pseudonymize", "hashFunction") + .leftMap(_.getMessage) + + private def matchesSchema(config: JValue, schemaKey: SchemaKey): Validation[String, JValue] = + if (supportedSchema.matches(schemaKey)) + config.success + else + s"Schema key $schemaKey is not supported. A '${supportedSchema.name}' enrichment must have schema '$supportedSchema'.".failure +} + +/** + * Implements a pseudonymization strategy using any algorithm known to DigestFunction + * @param hashFunction the DigestFunction to apply + */ +final case class PiiStrategyPseudonymize(functionName: String, hashFunction: DigestFunction) extends PiiStrategy { + val TextEncoding = "UTF-8" + override def scramble(clearText: String): String = hash(clearText) + def hash(text: String): String = hashFunction(text.getBytes(TextEncoding)) +} + +/** + * The PiiPseudonymizerEnrichment runs after all other enrichments to find fields that are configured as PII (personally + * identifiable information) and apply some anonymization (currently only pseudonymization) on them. Currently a single + * strategy for all the fields is supported due to the configuration format, and there is only one implemented strategy, + * however the enrichment supports a strategy per field. + * + * The user may specify two types of fields in the config `pojo` or `json`. A `pojo` field is effectively a scalar field in the + * EnrichedEvent, whereas a `json` is a "context" formatted field and it can either contain a single value in the case of + * unstruct_event, or an array in the case of derived_events and contexts. + * + * @param fieldList a list of configured PiiFields + * @param emitIdentificationEvent whether to emit an identification event + * @param strategy the pseudonymization strategy to use + */ +case class PiiPseudonymizerEnrichment(fieldList: List[PiiField], + emitIdentificationEvent: Boolean, + strategy: PiiStrategy) + extends Enrichment { + implicit val json4sFormats = DefaultFormats + new PiiModifiedFieldsSerializer + new PiiStrategySerializer + def transformer(event: EnrichedEvent): Unit = { + val modifiedFields: ModifiedFields = fieldList.flatMap(_.transform(event, strategy)) + event.pii = if (modifiedFields.nonEmpty) write(PiiModifiedFields(modifiedFields, strategy)) else null + } +} + +/** + * Specifies a scalar field in POJO and the strategy that should be applied to it. + * @param fieldMutator the field mutator where the strategy will be applied + */ +final case class PiiScalar(fieldMutator: Mutator) extends PiiField { + override def applyStrategy(fieldValue: String, strategy: PiiStrategy): (String, ModifiedFields) = + if (fieldValue != null) { + val modifiedValue = strategy.scramble(fieldValue) + (modifiedValue, List(ScalarModifiedField(fieldMutator.fieldName, fieldValue, modifiedValue))) + } else (null, List()) +} + +/** + * Specifies a strategy to use, a field mutator where the JSON can be found in the EnrichedEvent POJO, a schema criterion to + * discriminate which contexts to apply this strategy to, and a JSON path within the contexts where this strategy will + * be applied (the path may correspond to multiple fields). + * + * @param fieldMutator the field mutator for the JSON field + * @param schemaCriterion the schema for which the strategy will be applied + * @param jsonPath the path where the strategy will be applied + */ +final case class PiiJson(fieldMutator: Mutator, schemaCriterion: SchemaCriterion, jsonPath: String) extends PiiField { + implicit val json4sFormats = DefaultFormats + + override def applyStrategy(fieldValue: String, strategy: PiiStrategy): (String, ModifiedFields) = { + val modifiedFields = MutableList[JsonModifiedField]() + if (fieldValue != null) { + val parsedAndSubistuted = parse(fieldValue) match { + case JObject(jObject) => + val jObjectMap = jObject.toMap + val updated = jObjectMap.filterKeys(_ == "data").mapValues { + case JArray(contexts) => + JArray(contexts.map { + case JObject(context) => modifyAndGetValues(context, strategy, modifiedFields) + case x => x + }) + case JObject(unstructEvent) => modifyAndGetValues(unstructEvent, strategy, modifiedFields) + case x => x + } + JObject((jObjectMap ++ updated).toList) + case x => x + } + val rendered = render(parsedAndSubistuted) + val compacted = compact(rendered) + (compacted, modifiedFields.toList) + } else (null, modifiedFields.toList) + } + + /** + * Modifies the object if applicable and adds to the list of modified values, if applicable. + */ + private def modifyAndGetValues(context: List[(String, json4s.JValue)], + strategy: PiiStrategy, + modifiedFields: MutableList[JsonModifiedField]): JObject = { + val (values, listOfModifiedValues) = modifyObjectIfSchemaMatches(context, strategy) + modifiedFields ++= listOfModifiedValues + values + } + + /** + * Tests whether the schema for this event matches the schema criterion and if it does modifies it. + */ + private def modifyObjectIfSchemaMatches(context: List[(String, json4s.JValue)], + strategy: PiiStrategy): (JObject, List[JsonModifiedField]) = { + val fieldsObj = context.toMap + (for { + schema <- fieldsObj.get("schema") + schemaStr = schema.extract[String] + parsedSchemaMatches <- SchemaKey.parse(schemaStr).map(schemaCriterion.matches).toOption + data <- fieldsObj.get("data") + if parsedSchemaMatches + updated = jsonPathReplace(data, strategy, schemaStr) + } yield (JObject(fieldsObj.updated("schema", schema).updated("data", updated._1).toList), updated._2)) + .getOrElse((JObject(context), List())) + } + + /** + * Replaces a value in the given context data with the result of applying the strategy that value. + */ + private def jsonPathReplace(jValue: JValue, + strategy: PiiStrategy, + schema: String): (JValue, List[JsonModifiedField]) = { + val objectNode = JsonMethods.mapper.valueToTree[ObjectNode](jValue) + val documentContext = JJsonPath.using(JsonPathConf).parse(objectNode) + val modifiedFields = MutableList[JsonModifiedField]() + documentContext.map(jsonPath, + new ScrambleMapFunction(strategy, modifiedFields, fieldMutator.fieldName, jsonPath, schema)) + (JsonMethods.fromJsonNode(documentContext.json[JsonNode]), modifiedFields.toList) + } +} + +private final class ScrambleMapFunction(strategy: PiiStrategy, + modifiedFields: MutableList[JsonModifiedField], + fieldName: String, + jsonPath: String, + schema: String) + extends MapFunction { + override def map(currentValue: AnyRef, configuration: Configuration): AnyRef = currentValue match { + case s: String => + val newValue = strategy.scramble(s) + modifiedFields += JsonModifiedField(fieldName, s, newValue, jsonPath, schema) + newValue + case a: ArrayNode => + a.elements.asScala.map { + case t: TextNode => + val originalValue = t.asText() + val newValue = strategy.scramble(originalValue) + modifiedFields += JsonModifiedField(fieldName, originalValue, newValue, jsonPath, schema) + newValue + case default: AnyRef => default + } + case default: AnyRef => default + } +} diff --git a/3-enrich/scala-common-enrich/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/registry/pii/Serializers.scala b/3-enrich/scala-common-enrich/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/registry/pii/Serializers.scala new file mode 100644 index 0000000000..a7e702f112 --- /dev/null +++ b/3-enrich/scala-common-enrich/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/registry/pii/Serializers.scala @@ -0,0 +1,74 @@ +/* + * Copyright (c) 2017-2018 Snowplow Analytics Ltd. All rights reserved. + * + * This program is licensed to you under the Apache License Version 2.0, + * and you may not use this file except in compliance with the Apache License Version 2.0. + * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the Apache License Version 2.0 is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under. + */ +package com.snowplowanalytics +package snowplow.enrich +package common.enrichments.registry +package pii + +// Json4s +import org.json4s.JsonDSL._ +import org.json4s.Extraction.decompose +import org.json4s.{CustomSerializer, JObject} + +// Scalaz +import scalaz.{Failure, Success} + +/** + * Custom serializer for PiiStrategy class + */ +private[pii] final class PiiStrategySerializer + extends CustomSerializer[PiiStrategy](formats => + ({ + case jo: JObject => + implicit val json4sFormats = formats + val function = (jo \ "pseudonymize" \ "hashFunction").extract[String] + PiiPseudonymizerEnrichment.getHashFunction(function) match { + case Success(hf) => PiiStrategyPseudonymize(function, hf) + case Failure(msg) => + println(msg); PiiStrategyPseudonymize("IDENTITY", (b: Array[Byte]) => b.mkString) // FIXME: What to do here? + } + }, { + case psp: PiiStrategyPseudonymize => + "pseudonymize" -> ("hashFunction" -> psp.functionName) + })) + +/** + * Custom serializer for PiiModifiedFields class + */ +private[pii] final class PiiModifiedFieldsSerializer + extends CustomSerializer[PiiModifiedFields](formats => { + val PiiTransformationSchema = "iglu:com.snowplowanalytics.snowplow/pii_transformation/jsonschema/1-0-0" + ({ + case jo: JObject => + implicit val json4sFormats = formats + val fields = (jo \ "data" \ "pii").extract[List[ModifiedField]] + val strategy = (jo \ "data" \ "strategy").extract[PiiStrategy] + PiiModifiedFields(fields, strategy) + }, { + case pmf: PiiModifiedFields => + implicit val json4sFormats = formats + ("schema" -> PiiTransformationSchema) ~ + ("data" -> + ("pii" -> decompose( + pmf.modifiedFields.foldLeft(Map.empty[String, List[ModifiedField]]) { + case (m, mf) => + mf match { + case s: ScalarModifiedField => + m + ("pojo" -> (s :: m.getOrElse("pojo", List.empty[ModifiedField]))) + case j: JsonModifiedField => m + ("json" -> (j :: m.getOrElse("json", List.empty[ModifiedField]))) + } + } + )) ~ + ("strategy" -> decompose(pmf.strategy))) + }) + }) diff --git a/3-enrich/scala-common-enrich/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/registry/pii/package.scala b/3-enrich/scala-common-enrich/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/registry/pii/package.scala new file mode 100644 index 0000000000..ea46363c42 --- /dev/null +++ b/3-enrich/scala-common-enrich/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/registry/pii/package.scala @@ -0,0 +1,114 @@ +/* + * Copyright (c) 2017-2018 Snowplow Analytics Ltd. All rights reserved. + * + * This program is licensed to you under the Apache License Version 2.0, + * and you may not use this file except in compliance with the Apache License Version 2.0. + * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the Apache License Version 2.0 is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under. + */ +package com.snowplowanalytics.snowplow.enrich.common +package enrichments.registry + +// Scala libraries +import com.fasterxml.jackson.databind.{ObjectMapper, SerializationFeature} +import com.jayway.jsonpath.spi.json.JacksonJsonNodeJsonProvider +import com.jayway.jsonpath.{Configuration, Option => JOption} + +// Snowplow +import outputs.EnrichedEvent + +package object pii { + type DigestFunction = Function1[Array[Byte], String] + type ModifiedFields = List[ModifiedField] + type ApplyStrategyFn = (String, PiiStrategy) => (String, ModifiedFields) + type MutatorFn = (EnrichedEvent, PiiStrategy, ApplyStrategyFn) => ModifiedFields + + val JsonMutators = Mutators.JsonMutators + val ScalarMutators = Mutators.ScalarMutators + + // Configuration for JsonPath + // SerializationFeature.FAIL_ON_EMPTY_BEANS is required otherwise an invalid path causes an exception + private[pii] val JacksonNodeJsonObjectMapper = { + val objectMapper = new ObjectMapper() + objectMapper.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false) + objectMapper + } + // SUPPRESS_EXCEPTIONS is useful here as we prefer an empty list to an exception when a path is not found. + private[pii] val JsonPathConf = + Configuration + .builder() + .options(JOption.SUPPRESS_EXCEPTIONS) + .jsonProvider(new JacksonJsonNodeJsonProvider(JacksonNodeJsonObjectMapper)) + .build() +} + +package pii { + + /** + * PiiStrategy trait. This corresponds to a strategy to apply to a single field. Currently only String input is + * supported. + */ + trait PiiStrategy { + def scramble(clearText: String): String + } + + /** + * The mutator class encapsulates the mutator function and the field name where the mutator will be applied. + */ + private[pii] final case class Mutator(fieldName: String, muatatorFn: MutatorFn) + + /** + * Parent class for classes that serialize the values that were modified during the PII enrichment. + */ + private[pii] final case class PiiModifiedFields(modifiedFields: ModifiedFields, strategy: PiiStrategy) + + /** + * Case class for capturing scalar field modifications. + */ + private[pii] final case class ScalarModifiedField(fieldName: String, originalValue: String, modifiedValue: String) + extends ModifiedField + + /** + * Case class for capturing JSON field modifications. + */ + private[pii] final case class JsonModifiedField(fieldName: String, + originalValue: String, + modifiedValue: String, + jsonPath: String, + schema: String) + extends ModifiedField + + /** + * PiiField trait. This corresponds to a configuration top-level field (i.e. either a scalar or a JSON field) along with + * a function to apply that strategy to the EnrichedEvent POJO (A scalar field is represented in config py "pojo") + */ + private[pii] trait PiiField { + + /** + * The POJO mutator for this field + * + * @return fieldMutator + */ + def fieldMutator: Mutator + + /** + * Gets an enriched event from the enrichment manager and modifies it according to the specified strategy. + * + * @param event The enriched event + */ + def transform(event: EnrichedEvent, strategy: PiiStrategy): ModifiedFields = + fieldMutator.muatatorFn(event, strategy, applyStrategy) + + protected def applyStrategy(fieldValue: String, strategy: PiiStrategy): (String, ModifiedFields) + } + + /** + * The modified field trait represents an item that is transformed in either the JSON or a scalar mutators. + */ + sealed trait ModifiedField + +} diff --git a/3-enrich/scala-common-enrich/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/outputs/EnrichedEvent.scala b/3-enrich/scala-common-enrich/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/outputs/EnrichedEvent.scala index 3631f293c1..df324fda92 100644 --- a/3-enrich/scala-common-enrich/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/outputs/EnrichedEvent.scala +++ b/3-enrich/scala-common-enrich/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/outputs/EnrichedEvent.scala @@ -244,4 +244,7 @@ class EnrichedEvent extends Serializable { // True timestamp @BeanProperty var true_tstamp: String = _ + + // Fields modified form PII enrichemnt (JSON String) + @BeanProperty var pii: String = _ } diff --git a/3-enrich/scala-common-enrich/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/enrichments/registry/EnrichmentConfigsSpec.scala b/3-enrich/scala-common-enrich/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/enrichments/registry/EnrichmentConfigsSpec.scala index 5ce3929145..56cc43997c 100644 --- a/3-enrich/scala-common-enrich/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/enrichments/registry/EnrichmentConfigsSpec.scala +++ b/3-enrich/scala-common-enrich/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/enrichments/registry/EnrichmentConfigsSpec.scala @@ -297,10 +297,11 @@ class EnrichmentConfigsSpec extends Specification with ValidationMatchers { "Parsing a valid pii_enrichment_config enrichment JSON" should { "successfully construct a PiiPsedonymizerEnrichment case object" in { - import PiiConstants._ + import pii._ val piiPseudonymizerEnrichmentJson = parse("""{ | "enabled": true, + | "emitEvent": true, | "parameters": { | "pii": [ | { @@ -325,38 +326,24 @@ class EnrichmentConfigsSpec extends Specification with ValidationMatchers { |}""".stripMargin) val schemaKey = - SchemaKey("com.snowplowanalytics.snowplow.enrichments", "pii_enrichment_config", "jsonschema", "1-0-0") + SchemaKey("com.snowplowanalytics.snowplow.enrichments", "pii_enrichment_config", "jsonschema", "2-0-0") val result = PiiPseudonymizerEnrichment.parse(piiPseudonymizerEnrichmentJson, schemaKey) - result must beSuccessful.like { case piiRes: PiiPseudonymizerEnrichment => { - (piiRes.fieldList.size must_== 2) and - (piiRes.fieldList(0) must haveClass[PiiScalar]) and - (piiRes.fieldList(0).asInstanceOf[PiiScalar].strategy must haveClass[PiiStrategyPseudonymize]) and - (piiRes - .fieldList(0) - .asInstanceOf[PiiScalar] - .strategy - .asInstanceOf[PiiStrategyPseudonymize] - .hashFunction("1234".getBytes("UTF-8")) + (piiRes.strategy must haveClass[PiiStrategyPseudonymize]) and + (piiRes.strategy.asInstanceOf[PiiStrategyPseudonymize].hashFunction("1234".getBytes("UTF-8")) must_== "03ac674216f3e15c761ee1a5e255f067953623c8b388b4459e13f978d7c846f4") and + (piiRes.fieldList.size must_== 2) and + (piiRes.fieldList(0) must haveClass[PiiScalar]) and (piiRes.fieldList(0).asInstanceOf[PiiScalar].fieldMutator must_== ScalarMutators.get("user_id").get) and - (piiRes.fieldList(1).asInstanceOf[PiiJson].strategy must haveClass[PiiStrategyPseudonymize]) and (piiRes.fieldList(1).asInstanceOf[PiiJson].fieldMutator must_== JsonMutators.get("contexts").get) and (piiRes .fieldList(1) .asInstanceOf[PiiJson] .schemaCriterion .toString must_== "iglu:com.acme/email_sent/jsonschema/1-*-*") and - (piiRes.fieldList(1).asInstanceOf[PiiJson].jsonPath must_== "$.emailAddress") and - (piiRes - .fieldList(1) - .asInstanceOf[PiiJson] - .strategy - .asInstanceOf[PiiStrategyPseudonymize] - .hashFunction("12345".getBytes("UTF-8")) - must_== "5994471abb01112afcc18159f6cc74b4f511b99806da59b3caf5a9c173cacfc5") + (piiRes.fieldList(1).asInstanceOf[PiiJson].jsonPath must_== "$.emailAddress") } } } diff --git a/3-enrich/scala-common-enrich/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/enrichments/registry/PiiPseudonymizerEnrichmentSpec.scala b/3-enrich/scala-common-enrich/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/enrichments/registry/pii/PiiPseudonymizerEnrichmentSpec.scala similarity index 69% rename from 3-enrich/scala-common-enrich/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/enrichments/registry/PiiPseudonymizerEnrichmentSpec.scala rename to 3-enrich/scala-common-enrich/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/enrichments/registry/pii/PiiPseudonymizerEnrichmentSpec.scala index 199544798b..a350898ada 100644 --- a/3-enrich/scala-common-enrich/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/enrichments/registry/PiiPseudonymizerEnrichmentSpec.scala +++ b/3-enrich/scala-common-enrich/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/enrichments/registry/pii/PiiPseudonymizerEnrichmentSpec.scala @@ -10,11 +10,12 @@ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under. */ - -package com.snowplowanalytics.snowplow.enrich +package com.snowplowanalytics +package snowplow.enrich package common package enrichments package registry +package pii // Specs2 & Scalaz-Specs2 import org.specs2.Specification @@ -31,19 +32,18 @@ import org.apache.commons.codec.digest.DigestUtils // Snowplow import common.loaders.{CollectorApi, CollectorContext, CollectorPayload, CollectorSource} import common.outputs.EnrichedEvent +import utils.TestResourcesRepositoryRef import common.SpecHelpers.toNameValuePairs -import common.utils.TestResourcesRepositoryRef // Iglu -import com.snowplowanalytics.iglu.client.SchemaCriterion -import com.snowplowanalytics.iglu.client.Resolver -import com.snowplowanalytics.iglu.client.repositories.{EmbeddedRepositoryRef, RepositoryRefConfig} +import iglu.client.SchemaCriterion +import iglu.client.Resolver +import iglu.client.repositories.RepositoryRefConfig +import iglu.client.validation.ValidatableJValue._ // Scalaz import scalaz.Scalaz._ -import PiiConstants._ - class PiiPseudonymizerEnrichmentSpec extends Specification with ValidationMatchers { def is = s2""" This is a specification to test PII @@ -53,6 +53,7 @@ class PiiPseudonymizerEnrichmentSpec extends Specification with ValidationMatche Hashing configured JSON fields in POJO should work when multiple fields are matched through jsonpath $e4 Hashing configured JSON fields in POJO should work when multiple fields are matched through schemacriterion $e5 Hashing configured JSON fields in POJO should silently ignore unsupported types $e6 + Hashing configured JSON and scalar fields in POJO emits a correct pii_transformation event $e7 """ def commonSetup(enrichmentMap: EnrichmentMap): List[ValidatedEnrichedEvent] = { @@ -135,27 +136,32 @@ class PiiPseudonymizerEnrichmentSpec extends Specification with ValidationMatche EtlPipeline.processEvents(registry, s"spark-0.0.0", new DateTime(1500000000L), input) } - private val ipEnrichment = IpLookupsEnrichment(Some(("geo", new URI("/ignored-in-local-mode/"), "GeoIPCity.dat")), - Some(("isp", new URI("/ignored-in-local-mode/"), "GeoIPISP.dat")), - None, - None, - None, - true) + private val ipEnrichment = IpLookupsEnrichment( + Some(("geo", new URI("/ignored-in-local-mode/"), "GeoIPCity.dat")), + Some(("isp", new URI("/ignored-in-local-mode/"), "GeoIPISP.dat")), + None, + None, + None, + localMode = true + ) def e1 = { val enrichmentMap = Map( - ("ip_lookups" -> ipEnrichment), - ("pii_enrichment_config" -> PiiPseudonymizerEnrichment( + "ip_lookups" -> ipEnrichment, + "pii_enrichment_config" -> PiiPseudonymizerEnrichment( List( - PiiScalar(strategy = PiiStrategyPseudonymize(hashFunction = (b: Array[Byte]) => DigestUtils.sha256Hex(b)), - fieldMutator = ScalarMutators.get("user_id").get), - PiiScalar(strategy = PiiStrategyPseudonymize(hashFunction = (b: Array[Byte]) => DigestUtils.sha256Hex(b)), - fieldMutator = ScalarMutators.get("user_ipaddress").get), - PiiScalar(strategy = PiiStrategyPseudonymize(hashFunction = (b: Array[Byte]) => DigestUtils.sha256Hex(b)), - fieldMutator = ScalarMutators.get("ip_domain").get), - PiiScalar(strategy = PiiStrategyPseudonymize(hashFunction = (b: Array[Byte]) => DigestUtils.sha256Hex(b)), - fieldMutator = ScalarMutators.get("user_fingerprint").get) - ))) + PiiScalar(fieldMutator = ScalarMutators.get("user_id").get), + PiiScalar( + fieldMutator = ScalarMutators.get("user_ipaddress").get + ), + PiiScalar(fieldMutator = ScalarMutators.get("ip_domain").get), + PiiScalar( + fieldMutator = ScalarMutators.get("user_fingerprint").get + ) + ), + false, + PiiStrategyPseudonymize("SHA-256", hashFunction = (b: Array[Byte]) => DigestUtils.sha256Hex(b)) + ) ) val output = commonSetup(enrichmentMap = enrichmentMap) val expected = new EnrichedEvent() @@ -168,9 +174,9 @@ class PiiPseudonymizerEnrichmentSpec extends Specification with ValidationMatche expected.etl_tstamp = "1970-01-18 08:40:00.000" expected.collector_tstamp = "2017-07-14 03:39:39.000" output.size must_== 1 - val out = output(0) + val out = output.head out must beSuccessful.like { - case enrichedEvent => { + case enrichedEvent => (enrichedEvent.app_id must_== expected.app_id) and (enrichedEvent.user_id must_== expected.user_id) and (enrichedEvent.user_ipaddress must_== expected.user_ipaddress) and @@ -179,34 +185,33 @@ class PiiPseudonymizerEnrichmentSpec extends Specification with ValidationMatche (enrichedEvent.geo_city must_== expected.geo_city) and (enrichedEvent.etl_tstamp must_== expected.etl_tstamp) and (enrichedEvent.collector_tstamp must_== expected.collector_tstamp) - } } } def e2 = { val enrichmentMap = Map( - ("ip_lookups" -> ipEnrichment), - ("pii_enrichment_config" -> PiiPseudonymizerEnrichment( + "ip_lookups" -> ipEnrichment, + "pii_enrichment_config" -> PiiPseudonymizerEnrichment( List( PiiJson( - strategy = PiiStrategyPseudonymize(hashFunction = (b: Array[Byte]) => DigestUtils.sha256Hex(b)), fieldMutator = JsonMutators.get("contexts").get, schemaCriterion = SchemaCriterion.parse("iglu:com.acme/email_sent/jsonschema/1-0-*").toOption.get, jsonPath = "$.emailAddress" ), PiiJson( - strategy = PiiStrategyPseudonymize(hashFunction = (b: Array[Byte]) => DigestUtils.sha256Hex(b)), fieldMutator = JsonMutators.get("contexts").get, schemaCriterion = SchemaCriterion.parse("iglu:com.acme/email_sent/jsonschema/1-1-0").toOption.get, jsonPath = "$.data.emailAddress2" ), PiiJson( - strategy = PiiStrategyPseudonymize(hashFunction = (b: Array[Byte]) => DigestUtils.sha256Hex(b)), fieldMutator = JsonMutators.get("unstruct_event").get, schemaCriterion = SchemaCriterion.parse("iglu:com.mailgun/message_clicked/jsonschema/1-0-0").toOption.get, jsonPath = "$.ip" ) - ))) + ), + false, + PiiStrategyPseudonymize("SHA-256", hashFunction = (b: Array[Byte]) => DigestUtils.sha256Hex(b)) + ) ) val output = commonSetup(enrichmentMap = enrichmentMap) @@ -220,9 +225,9 @@ class PiiPseudonymizerEnrichmentSpec extends Specification with ValidationMatche expected.etl_tstamp = "1970-01-18 08:40:00.000" expected.collector_tstamp = "2017-07-14 03:39:39.000" output.size must_== 1 - val out = output(0) + val out = output.head out must beSuccessful.like { - case enrichedEvent => { + case enrichedEvent => implicit val formats = org.json4s.DefaultFormats val contextJ = parse(enrichedEvent.contexts) val unstructEventJ = parse(enrichedEvent.unstruct_event) @@ -242,22 +247,23 @@ class PiiPseudonymizerEnrichmentSpec extends Specification with ValidationMatche (((unstructEventJ \ "data") \ "data" \ "ip") .extract[String] must_== "b5814ada7bb3abb2ed7f8713433a60ed3b3780f7d98a95c936cc62abb16f316f") and (((unstructEventJ \ "data") \ "data" \ "myVar2").extract[String] must_== "awesome") - } } } def e3 = { val enrichmentMap = Map( - ("ip_lookups" -> ipEnrichment), - ("pii_enrichment_config" -> PiiPseudonymizerEnrichment( + "ip_lookups" -> ipEnrichment, + "pii_enrichment_config" -> PiiPseudonymizerEnrichment( List( PiiJson( - strategy = PiiStrategyPseudonymize(hashFunction = (b: Array[Byte]) => DigestUtils.sha256Hex(b)), fieldMutator = JsonMutators.get("contexts").get, schemaCriterion = SchemaCriterion.parse("iglu:com.acme/email_sent/jsonschema/1-*-*").toOption.get, jsonPath = "$.field.that.does.not.exist.in.this.instance" ) - ))) + ), + false, + PiiStrategyPseudonymize("SHA-256", hashFunction = (b: Array[Byte]) => DigestUtils.sha256Hex(b)) + ) ) val output = commonSetup(enrichmentMap = enrichmentMap) @@ -271,31 +277,32 @@ class PiiPseudonymizerEnrichmentSpec extends Specification with ValidationMatche expected.etl_tstamp = "1970-01-18 08:40:00.000" expected.collector_tstamp = "2017-07-14 03:39:39.000" output.size must_== 1 - val out = output(0) + val out = output.head out must beSuccessful.like { - case enrichedEvent => { + case enrichedEvent => implicit val formats = org.json4s.DefaultFormats val contextJ = parse(enrichedEvent.contexts) (((contextJ \ "data")(0) \ "data" \ "emailAddress").extract[String] must_== "jim@acme.com") and (((contextJ \ "data")(0) \ "data" \ "emailAddress2").extract[String] must_== "bob@acme.com") and (((contextJ \ "data")(1) \ "data" \ "emailAddress").extract[String] must_== "tim@acme.com") and (((contextJ \ "data")(1) \ "data" \ "emailAddress2").extract[String] must_== "tom@acme.com") - } } } def e4 = { val enrichmentMap = Map( - ("ip_lookups" -> ipEnrichment), - ("pii_enrichment_config" -> PiiPseudonymizerEnrichment( + "ip_lookups" -> ipEnrichment, + "pii_enrichment_config" -> PiiPseudonymizerEnrichment( List( PiiJson( - strategy = PiiStrategyPseudonymize(hashFunction = (b: Array[Byte]) => DigestUtils.sha256Hex(b)), fieldMutator = JsonMutators.get("contexts").get, schemaCriterion = SchemaCriterion.parse("iglu:com.acme/email_sent/jsonschema/1-0-*").toOption.get, jsonPath = "$.['emailAddress', 'emailAddress2', 'emailAddressNonExistent']" // Last case throws an exeption if misconfigured ) - ))) + ), + false, + PiiStrategyPseudonymize("SHA-256", hashFunction = (b: Array[Byte]) => DigestUtils.sha256Hex(b)) + ) ) val output = commonSetup(enrichmentMap = enrichmentMap) @@ -309,9 +316,9 @@ class PiiPseudonymizerEnrichmentSpec extends Specification with ValidationMatche expected.etl_tstamp = "1970-01-18 08:40:00.000" expected.collector_tstamp = "2017-07-14 03:39:39.000" output.size must_== 1 - val out = output(0) + val out = output.head out must beSuccessful.like { - case enrichedEvent => { + case enrichedEvent => implicit val formats = org.json4s.DefaultFormats val contextJ = parse(enrichedEvent.contexts) (((contextJ \ "data")(0) \ "data" \ "emailAddress") @@ -320,22 +327,23 @@ class PiiPseudonymizerEnrichmentSpec extends Specification with ValidationMatche .extract[String] must_== "405ac8384fa984f787f9486daf34d84d98f20c4d6a12e2cc4ed89be3bcb06ad6") and (((contextJ \ "data")(1) \ "data" \ "emailAddress").extract[String] must_== "tim@acme.com") and (((contextJ \ "data")(1) \ "data" \ "emailAddress2").extract[String] must_== "tom@acme.com") - } } } def e5 = { val enrichmentMap = Map( - ("ip_lookups" -> ipEnrichment), - ("pii_enrichment_config" -> PiiPseudonymizerEnrichment( + "ip_lookups" -> ipEnrichment, + "pii_enrichment_config" -> PiiPseudonymizerEnrichment( List( PiiJson( - strategy = PiiStrategyPseudonymize(hashFunction = (b: Array[Byte]) => DigestUtils.sha256Hex(b)), fieldMutator = JsonMutators.get("contexts").get, schemaCriterion = SchemaCriterion.parse("iglu:com.acme/email_sent/jsonschema/1-*-0").toOption.get, jsonPath = "$.emailAddress" ) - ))) + ), + false, + PiiStrategyPseudonymize("SHA-256", hashFunction = (b: Array[Byte]) => DigestUtils.sha256Hex(b)) + ) ) val output = commonSetup(enrichmentMap = enrichmentMap) val expected = new EnrichedEvent() @@ -348,9 +356,9 @@ class PiiPseudonymizerEnrichmentSpec extends Specification with ValidationMatche expected.etl_tstamp = "1970-01-18 08:40:00.000" expected.collector_tstamp = "2017-07-14 03:39:39.000" output.size must_== 1 - val out = output(0) + val out = output.head out must beSuccessful.like { - case enrichedEvent => { + case enrichedEvent => implicit val formats = org.json4s.DefaultFormats val contextJ = parse(enrichedEvent.contexts) (((contextJ \ "data")(0) \ "data" \ "emailAddress") @@ -360,22 +368,23 @@ class PiiPseudonymizerEnrichmentSpec extends Specification with ValidationMatche (((contextJ \ "data")(1) \ "data" \ "emailAddress") .extract[String] must_== "663ea32adb6f26f7a025e3b6c850294d0d7755c3010c5e7a5fd690cfa5d2938f") and (((contextJ \ "data")(1) \ "data" \ "emailAddress2").extract[String] must_== "tom@acme.com") - } } } def e6 = { val enrichmentMap = Map( - ("ip_lookups" -> ipEnrichment), - ("pii_enrichment_config" -> PiiPseudonymizerEnrichment( + "ip_lookups" -> ipEnrichment, + "pii_enrichment_config" -> PiiPseudonymizerEnrichment( List( PiiJson( - strategy = PiiStrategyPseudonymize(hashFunction = (b: Array[Byte]) => DigestUtils.sha256Hex(b)), fieldMutator = JsonMutators.get("contexts").get, schemaCriterion = SchemaCriterion.parse("iglu:com.acme/email_sent/jsonschema/1-*-*").toOption.get, jsonPath = "$.someInt" ) - ))) + ), + false, + PiiStrategyPseudonymize("SHA-256", hashFunction = (b: Array[Byte]) => DigestUtils.sha256Hex(b)) + ) ) val output = commonSetup(enrichmentMap = enrichmentMap) val expected = new EnrichedEvent() @@ -388,9 +397,9 @@ class PiiPseudonymizerEnrichmentSpec extends Specification with ValidationMatche expected.etl_tstamp = "1970-01-18 08:40:00.000" expected.collector_tstamp = "2017-07-14 03:39:39.000" output.size must_== 1 - val out = output(0) + val out = output.head out must beSuccessful.like { - case enrichedEvent => { + case enrichedEvent => implicit val formats = org.json4s.DefaultFormats val contextJ = parse(enrichedEvent.contexts) (((contextJ \ "data")(0) \ "data" \ "emailAddress") @@ -401,7 +410,68 @@ class PiiPseudonymizerEnrichmentSpec extends Specification with ValidationMatche .extract[String] must_== "tim@acme.com") and (((contextJ \ "data")(1) \ "data" \ "emailAddress2").extract[String] must_== "tom@acme.com") (((contextJ \ "data")(1) \ "data" \ "someInt").extract[Int] must_== 1) - } + } + } + + def e7 = { + val enrichmentMap = Map( + "ip_lookups" -> ipEnrichment, + "pii_enrichment_config" -> PiiPseudonymizerEnrichment( + List( + PiiScalar(fieldMutator = ScalarMutators.get("user_id").get), + PiiScalar(fieldMutator = ScalarMutators.get("user_ipaddress").get), + PiiScalar(fieldMutator = ScalarMutators.get("ip_domain").get), + PiiScalar(fieldMutator = ScalarMutators.get("user_fingerprint").get), + PiiJson( + fieldMutator = JsonMutators.get("contexts").get, + schemaCriterion = SchemaCriterion.parse("iglu:com.acme/email_sent/jsonschema/1-0-*").toOption.get, + jsonPath = "$.emailAddress" + ), + PiiJson( + fieldMutator = JsonMutators.get("contexts").get, + schemaCriterion = SchemaCriterion.parse("iglu:com.acme/email_sent/jsonschema/1-1-0").toOption.get, + jsonPath = "$.data.emailAddress2" + ), + PiiJson( + fieldMutator = JsonMutators.get("unstruct_event").get, + schemaCriterion = SchemaCriterion.parse("iglu:com.mailgun/message_clicked/jsonschema/1-0-0").toOption.get, + jsonPath = "$.ip" + ) + ), + false, + PiiStrategyPseudonymize("SHA-256", hashFunction = (b: Array[Byte]) => DigestUtils.sha256Hex(b)) + ) + ) + val output = commonSetup(enrichmentMap = enrichmentMap) + val expected = new EnrichedEvent() + expected.app_id = "ads" + expected.ip_domain = null + expected.geo_city = "Delray Beach" + expected.etl_tstamp = "1970-01-18 08:40:00.000" + expected.collector_tstamp = "2017-07-14 03:39:39.000" + expected.pii = + """{"schema":"iglu:com.snowplowanalytics.snowplow/pii_transformation/jsonschema/1-0-0","data":{"pii":{"pojo":[{"fieldName":"user_fingerprint","originalValue":"its_you_again!","modifiedValue":"9f9fc89b7a5428f2646347974404650fc8776f791afc2200efc8a82aa754e7e6"},{"fieldName":"user_ipaddress","originalValue":"70.46.123.145","modifiedValue":"36595ea260a82b7e2d7cf44121892bf31031a9c27077d8c802454464178456c2"},{"fieldName":"user_id","originalValue":"john@acme.com","modifiedValue":"4b2d8785b49bad23638b17d8db76857a79bf79441241a78a97d88cc64bbf766e"}],"json":[{"fieldName":"unstruct_event","originalValue":"50.56.129.169","modifiedValue":"b5814ada7bb3abb2ed7f8713433a60ed3b3780f7d98a95c936cc62abb16f316f","jsonPath":"$.ip","schema":"iglu:com.mailgun/message_clicked/jsonschema/1-0-0"},{"fieldName":"contexts","originalValue":"bob@acme.com","modifiedValue":"405ac8384fa984f787f9486daf34d84d98f20c4d6a12e2cc4ed89be3bcb06ad6","jsonPath":"$.data.emailAddress2","schema":"iglu:com.acme/email_sent/jsonschema/1-1-0"},{"fieldName":"contexts","originalValue":"jim@acme.com","modifiedValue":"3571b422ecb9ac85cb654b2fce521ae351d4695b0fb788aac75caf724e7881f0","jsonPath":"$.emailAddress","schema":"iglu:com.acme/email_sent/jsonschema/1-0-0"}]},"strategy":{"pseudonymize":{"hashFunction":"SHA-256"}}}}""" + + output.size must_== 1 + val out = output.head + out must beSuccessful.like { + case enrichedEvent => + implicit val formats = org.json4s.DefaultFormats + val contextJ = parse(enrichedEvent.contexts) + val unstructEventJ = parse(enrichedEvent.unstruct_event) + (enrichedEvent.pii must_== expected.pii) and // This is the important test, the rest just verify that nothing has changed. + (enrichedEvent.app_id must_== expected.app_id) and + (enrichedEvent.ip_domain must_== expected.ip_domain) and + (enrichedEvent.geo_city must_== expected.geo_city) and + (enrichedEvent.etl_tstamp must_== expected.etl_tstamp) and + (enrichedEvent.collector_tstamp must_== expected.collector_tstamp) and + (((contextJ \ "data")(0) \ "data" \ "emailAddress2").extract[String] must_== "bob@acme.com") and + (((contextJ \ "data")(1) \ "data" \ "emailAddress").extract[String] must_== "tim@acme.com") and + (((contextJ \ "data")(1) \ "data" \ "emailAddress2").extract[String] must_== "tom@acme.com") and + (((contextJ \ "data")(1) \ "data" \ "data" \ "emailAddress").extract[String] must_== "jim@acme.com") and + (((contextJ \ "data")(1) \ "data" \ "schema") + .extract[String] must_== "iglu:com.acme/email_sent/jsonschema/1-0-0") and + (((unstructEventJ \ "data") \ "data" \ "myVar2").extract[String] must_== "awesome") } } }