diff --git a/3-enrich/scala-common-enrich/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/registry/PiiPseudonymizerEnrichment.scala b/3-enrich/scala-common-enrich/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/registry/PiiPseudonymizerEnrichment.scala index 613b17f969..86726c33de 100644 --- a/3-enrich/scala-common-enrich/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/registry/PiiPseudonymizerEnrichment.scala +++ b/3-enrich/scala-common-enrich/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/registry/PiiPseudonymizerEnrichment.scala @@ -17,14 +17,17 @@ package common.enrichments.registry // Scala import scala.collection.JavaConverters._ +import scala.collection.mutable.MutableList // Scala libraries import org.json4s -import org.json4s.JValue +import org.json4s.{CustomSerializer, DefaultFormats, JValue} import org.json4s.JsonAST._ +import org.json4s.JsonDSL._ +import org.json4s.Extraction.decompose import org.json4s.jackson.JsonMethods import org.json4s.jackson.JsonMethods.{compact, parse, render} -import org.json4s.DefaultFormats +import org.json4s.jackson.Serialization.write // Java import java.security.{MessageDigest, NoSuchAlgorithmException} @@ -51,80 +54,166 @@ import common.utils.ScalazJson4sUtils import common.outputs.EnrichedEvent object PiiConstants { - type Mutator = (EnrichedEvent, String => String) => Unit + type ModifiedFields = List[ModifedField] + type ApplyStartegyFn = (String, PiiStrategy) => (String, ModifiedFields) + type MutatorFn = (EnrichedEvent, PiiStrategy, ApplyStartegyFn) => ModifiedFields /** - * This and the next constant maps from a config field name to an EnrichedEvent mutator. The structure is such so that - * it preserves type safety, and it can be easily replaced in the future by generated code that will use the config as + * This and the next constant maps from a configuration field name to an EnrichedEvent mutator. The structure is such so that + * it preserves type safety, and it can be easily replaced in the future by generated code that will use the configuration as * input. */ val ScalarMutators: Map[String, Mutator] = Map( - "user_id" -> { (event: EnrichedEvent, fn: String => String) => - event.user_id = fn(event.user_id) - }, - "user_ipaddress" -> { (event: EnrichedEvent, fn: String => String) => - event.user_ipaddress = fn(event.user_ipaddress) - }, - "user_fingerprint" -> { (event: EnrichedEvent, fn: String => String) => - event.user_fingerprint = fn(event.user_fingerprint) - }, - "domain_userid" -> { (event: EnrichedEvent, fn: String => String) => - event.domain_userid = fn(event.domain_userid) - }, - "network_userid" -> { (event: EnrichedEvent, fn: String => String) => - event.network_userid = fn(event.network_userid) - }, - "ip_organization" -> { (event: EnrichedEvent, fn: String => String) => - event.ip_organization = fn(event.ip_organization) - }, - "ip_domain" -> { (event: EnrichedEvent, fn: String => String) => - event.ip_domain = fn(event.ip_domain) - }, - "tr_orderid" -> { (event: EnrichedEvent, fn: String => String) => - event.tr_orderid = fn(event.tr_orderid) - }, - "ti_orderid" -> { (event: EnrichedEvent, fn: String => String) => - event.ti_orderid = fn(event.ti_orderid) - }, - "mkt_term" -> { (event: EnrichedEvent, fn: String => String) => - event.mkt_term = fn(event.mkt_term) - }, - "mkt_content" -> { (event: EnrichedEvent, fn: String => String) => - event.mkt_content = fn(event.mkt_content) - }, - "se_category" -> { (event: EnrichedEvent, fn: String => String) => - event.se_category = fn(event.se_category) - }, - "se_action" -> { (event: EnrichedEvent, fn: String => String) => - event.se_action = fn(event.se_action) - }, - "se_label" -> { (event: EnrichedEvent, fn: String => String) => - event.se_label = fn(event.se_label) - }, - "se_property" -> { (event: EnrichedEvent, fn: String => String) => - event.se_property = fn(event.se_property) - }, - "mkt_clickid" -> { (event: EnrichedEvent, fn: String => String) => - event.mkt_clickid = fn(event.mkt_clickid) - }, - "refr_domain_userid" -> { (event: EnrichedEvent, fn: String => String) => - event.refr_domain_userid = fn(event.refr_domain_userid) - }, - "domain_sessionid" -> { (event: EnrichedEvent, fn: String => String) => - event.domain_sessionid = fn(event.domain_sessionid) - } + "user_id" -> Mutator( + "user_id", { (event: EnrichedEvent, strategy: PiiStrategy, fn: ApplyStartegyFn) => + val (newValue, modifiedFields) = fn(event.user_id, strategy) + event.user_id = newValue + modifiedFields + } + ), + "user_ipaddress" -> Mutator( + "user_ipaddress", { (event: EnrichedEvent, strategy: PiiStrategy, fn: ApplyStartegyFn) => + val (newValue, modifiedFields) = fn(event.user_ipaddress, strategy) + event.user_ipaddress = newValue + modifiedFields + } + ), + "user_fingerprint" -> Mutator( + "user_fingerprint", { (event: EnrichedEvent, strategy: PiiStrategy, fn: ApplyStartegyFn) => + val (newValue, modifiedFields) = fn(event.user_fingerprint, strategy) + event.user_fingerprint = newValue + modifiedFields + } + ), + "domain_userid" -> Mutator( + "domain_userid", { (event: EnrichedEvent, strategy: PiiStrategy, fn: ApplyStartegyFn) => + val (newValue, modifiedFields) = fn(event.domain_userid, strategy) + event.domain_userid = newValue + modifiedFields + } + ), + "network_userid" -> Mutator( + "network_userid", { (event: EnrichedEvent, strategy: PiiStrategy, fn: ApplyStartegyFn) => + val (newValue, modifiedFields) = fn(event.network_userid, strategy) + event.network_userid = newValue + modifiedFields + } + ), + "ip_organization" -> Mutator( + "ip_organization", { (event: EnrichedEvent, strategy: PiiStrategy, fn: ApplyStartegyFn) => + val (newValue, modifiedFields) = fn(event.ip_organization, strategy) + event.ip_organization = newValue + modifiedFields + } + ), + "ip_domain" -> Mutator( + "ip_domain", { (event: EnrichedEvent, strategy: PiiStrategy, fn: ApplyStartegyFn) => + val (newValue, modifiedFields) = fn(event.ip_domain, strategy) + event.ip_domain = newValue + modifiedFields + } + ), + "tr_orderid" -> Mutator( + "tr_orderid", { (event: EnrichedEvent, strategy: PiiStrategy, fn: ApplyStartegyFn) => + val (newValue, modifiedFields) = fn(event.tr_orderid, strategy) + event.tr_orderid = newValue + modifiedFields + } + ), + "ti_orderid" -> Mutator( + "ti_orderid", { (event: EnrichedEvent, strategy: PiiStrategy, fn: ApplyStartegyFn) => + val (newValue, modifiedFields) = fn(event.ti_orderid, strategy) + event.ti_orderid = newValue + modifiedFields + } + ), + "mkt_term" -> Mutator( + "mkt_term", { (event: EnrichedEvent, strategy: PiiStrategy, fn: ApplyStartegyFn) => + val (newValue, modifiedFields) = fn(event.mkt_term, strategy) + event.mkt_term = newValue + modifiedFields + } + ), + "mkt_content" -> Mutator( + "mkt_content", { (event: EnrichedEvent, strategy: PiiStrategy, fn: ApplyStartegyFn) => + val (newValue, modifiedFields) = fn(event.mkt_content, strategy) + event.mkt_content = newValue + modifiedFields + } + ), + "se_category" -> Mutator( + "se_category", { (event: EnrichedEvent, strategy: PiiStrategy, fn: ApplyStartegyFn) => + val (newValue, modifiedFields) = fn(event.se_category, strategy) + event.se_category = newValue + modifiedFields + } + ), + "se_action" -> Mutator( + "se_action", { (event: EnrichedEvent, strategy: PiiStrategy, fn: ApplyStartegyFn) => + val (newValue, modifiedFields) = fn(event.se_action, strategy) + event.se_action = newValue + modifiedFields + } + ), + "se_label" -> Mutator( + "se_label", { (event: EnrichedEvent, strategy: PiiStrategy, fn: ApplyStartegyFn) => + val (newValue, modifiedFields) = fn(event.se_label, strategy) + event.se_label = newValue + modifiedFields + } + ), + "se_property" -> Mutator( + "se_property", { (event: EnrichedEvent, strategy: PiiStrategy, fn: ApplyStartegyFn) => + val (newValue, modifiedFields) = fn(event.se_property, strategy) + event.se_property = newValue + modifiedFields + } + ), + "mkt_clickid" -> Mutator( + "mkt_clickid", { (event: EnrichedEvent, strategy: PiiStrategy, fn: ApplyStartegyFn) => + val (newValue, modifiedFields) = fn(event.mkt_clickid, strategy) + event.mkt_clickid = newValue + modifiedFields + } + ), + "refr_domain_userid" -> Mutator( + "refr_domain_userid", { (event: EnrichedEvent, strategy: PiiStrategy, fn: ApplyStartegyFn) => + val (newValue, modifiedFields) = fn(event.refr_domain_userid, strategy) + event.refr_domain_userid = newValue + modifiedFields + } + ), + "domain_sessionid" -> Mutator( + "domain_sessionid", { (event: EnrichedEvent, strategy: PiiStrategy, fn: ApplyStartegyFn) => + val (newValue, modifiedFields) = fn(event.domain_sessionid, strategy) + event.domain_sessionid = newValue + modifiedFields + } + ) ) val JsonMutators: Map[String, Mutator] = Map( - "contexts" -> { (event: EnrichedEvent, fn: String => String) => - event.contexts = fn(event.contexts) - }, - "derived_contexts" -> { (event: EnrichedEvent, fn: String => String) => - event.derived_contexts = fn(event.derived_contexts) - }, - "unstruct_event" -> { (event: EnrichedEvent, fn: String => String) => - event.unstruct_event = fn(event.unstruct_event) - } + "contexts" -> Mutator( + "contexts", { (event: EnrichedEvent, strategy: PiiStrategy, fn: ApplyStartegyFn) => + val (newValue, modifiedFields) = fn(event.contexts, strategy) + event.contexts = newValue + modifiedFields + } + ), + "derived_contexts" -> Mutator( + "derived_contexts", { (event: EnrichedEvent, strategy: PiiStrategy, fn: ApplyStartegyFn) => + val (newValue, modifiedFields) = fn(event.derived_contexts, strategy) + event.derived_contexts = newValue + modifiedFields + } + ), + "unstruct_event" -> Mutator( + "unstruct_event", { (event: EnrichedEvent, strategy: PiiStrategy, fn: ApplyStartegyFn) => + val (newValue, modifiedFields) = fn(event.unstruct_event, strategy) + event.unstruct_event = newValue + modifiedFields + } + ) ) } @@ -133,14 +222,7 @@ object PiiConstants { * a function to apply that strategy to the EnrichedEvent POJO (A scalar field is represented in config py "pojo") */ sealed trait PiiField { - import PiiConstants.Mutator - - /** - * Strategy for this field - * - * @return PiiStrategy a strategy to be applied to this field - */ - def strategy: PiiStrategy + import PiiConstants.ModifiedFields /** * The POJO mutator for this field @@ -154,9 +236,9 @@ sealed trait PiiField { * * @param event The enriched event */ - def transform(event: EnrichedEvent): Unit = fieldMutator(event, applyStrategy) + def transform(event: EnrichedEvent, strategy: PiiStrategy): ModifiedFields = fieldMutator.muatatorFn(event, strategy, applyStrategy) - protected def applyStrategy(fieldValue: String): String + protected def applyStrategy(fieldValue: String, strategy: PiiStrategy): (String, ModifiedFields) } /** @@ -167,6 +249,11 @@ sealed trait PiiStrategy { def scramble(clearText: String): String } +/** + * The modified field trait represents an item that is transformed in either the JSON or a scalar mutators. + */ +sealed trait ModifedField + /** * Companion object. Lets us create a PiiPseudonymizerEnrichment * from a JValue. @@ -177,17 +264,21 @@ object PiiPseudonymizerEnrichment extends ParseableEnrichment { implicit val json4sFormats = DefaultFormats override val supportedSchema = - SchemaCriterion("com.snowplowanalytics.snowplow.enrichments", "pii_enrichment_config", "jsonschema", 1, 0, 0) + SchemaCriterion("com.snowplowanalytics.snowplow.enrichments", "pii_enrichment_config", "jsonschema", 2, 0, 0) def parse(config: JValue, schemaKey: SchemaKey): ValidatedNelMessage[PiiPseudonymizerEnrichment] = { for { conf <- matchesSchema(config, schemaKey) - enabled = ScalazJson4sUtils.extract[Boolean](conf, "enabled").toOption.getOrElse(false) + enabled = ScalazJson4sUtils.extract[Boolean](conf, "enabled").toOption.getOrElse(false) + emitIdentificationEvent = ScalazJson4sUtils.extract[Boolean](conf, "emitIdentificationEvent").toOption.getOrElse(false) piiFields <- ScalazJson4sUtils.extract[List[JObject]](conf, "parameters", "pii").leftMap(_.getMessage) - strategyFunction <- extractStrategyFunction(config) - hashFunction <- getHashFunction(strategyFunction) - piiFieldList <- extractFields(piiFields, PiiStrategyPseudonymize(hashFunction)) - } yield if (enabled) PiiPseudonymizerEnrichment(piiFieldList) else PiiPseudonymizerEnrichment(List()) + hashFunctionName <- extractStrategyFunction(config) + hashFunction <- getHashFunction(hashFunctionName) + piiFieldList <- extractFields(piiFields) + } yield + if (enabled) + PiiPseudonymizerEnrichment(piiFieldList, emitIdentificationEvent, PiiStrategyPseudonymize(hashFunction)) + else PiiPseudonymizerEnrichment(List(), emitIdentificationEvent = false, PiiStrategyPseudonymize(hashFunction)) }.leftMap(_.toProcessingMessageNel) private def getHashFunction(strategyFunction: String): Validation[String, MessageDigest] = @@ -198,23 +289,23 @@ object PiiPseudonymizerEnrichment extends ParseableEnrichment { s"Could not parse PII enrichment config: ${e.getMessage}".failure } - private def extractFields(piiFields: List[JObject], strategy: PiiStrategy): Validation[String, List[PiiField]] = + private def extractFields(piiFields: List[JObject]): Validation[String, List[PiiField]] = piiFields.map { case field: JObject => if (ScalazJson4sUtils.fieldExists(field, "pojo")) - extractString(field, "pojo", "field").flatMap(extractPiiScalarField(strategy, _)) - else if (ScalazJson4sUtils.fieldExists(field, "json")) extractPiiJsonField(strategy, field \ "json") + extractString(field, "pojo", "field").flatMap(extractPiiScalarField) + else if (ScalazJson4sUtils.fieldExists(field, "json")) extractPiiJsonField(field \ "json") else s"PII Configuration: pii field does not include 'pojo' nor 'json' fields. Got: [${compact(field)}]".failure case json => s"PII Configuration: pii field does not contain an object. Got: [${compact(json)}]".failure }.sequenceU - private def extractPiiScalarField(strategy: PiiStrategy, fieldName: String): Validation[String, PiiScalar] = + private def extractPiiScalarField(fieldName: String): Validation[String, PiiScalar] = ScalarMutators .get(fieldName) - .map(PiiScalar(strategy, _).success) - .getOrElse(s"The specified pojo field ${fieldName} is not supported".failure) + .map(PiiScalar(_).success) + .getOrElse(s"The specified pojo field $fieldName is not supported".failure) - private def extractPiiJsonField(strategy: PiiStrategy, jsonField: JValue): Validation[String, PiiJson] = + private def extractPiiJsonField(jsonField: JValue): Validation[String, PiiJson] = (extractString(jsonField, "field") .flatMap( fieldName => @@ -224,7 +315,7 @@ object PiiPseudonymizerEnrichment extends ParseableEnrichment { .getOrElse(s"The specified json field ${compact(jsonField)} is not supported".failure)) |@| extractString(jsonField, "schemaCriterion").flatMap(sc => SchemaCriterion.parse(sc).leftMap(_.getMessage)) |@| extractString(jsonField, "jsonPath")) { (fieldMutator: Mutator, sc: SchemaCriterion, jsonPath: String) => - PiiJson(strategy, fieldMutator, sc, jsonPath) + PiiJson(fieldMutator, sc, jsonPath) } private def extractString(jValue: JValue, field: String, tail: String*): Validation[String, String] = @@ -248,7 +339,7 @@ object PiiPseudonymizerEnrichment extends ParseableEnrichment { /** * The PiiPseudonymizerEnrichment runs after all other enrichments to find fields that are configured as PII (personally * identifiable information) and apply some anonymization (currently only pseudonymization) on them. Currently a single - * strategy for all the fields is supported due to the config format, and there is only one implemented strategy, + * strategy for all the fields is supported due to the configuration format, and there is only one implemented strategy, * however the enrichment supports a strategy per field. * * The user may specify two types of fields POJO or JSON. A POJO field is effectively a scalar field in the @@ -256,89 +347,126 @@ object PiiPseudonymizerEnrichment extends ParseableEnrichment { * unstruct_event or an array in the case of derived_events and contexts * * @param fieldList a list of configured PiiFields + * @param emitIdentificationEvent whether to emit an identification event + * @param strategy the pseudonymization strategy to use */ -case class PiiPseudonymizerEnrichment(fieldList: List[PiiField]) extends Enrichment { - def transformer(event: EnrichedEvent): Unit = fieldList.foreach(_.transform(event)) +case class PiiPseudonymizerEnrichment(fieldList: List[PiiField], emitIdentificationEvent: Boolean, strategy: PiiStrategy) + extends Enrichment { + import PiiConstants.ModifiedFields + implicit val json4sFormats = DefaultFormats + new PiiModifiedFieldsSerializer + def transformer(event: EnrichedEvent): Unit = { + val modifiedFields: ModifiedFields = fieldList.flatMap(_.transform(event, strategy)) + event.pii = write(PiiModifiedFields(modifiedFields, strategy)) + } } /** * Specifies a scalar field in POJO and the strategy that should be applied to it. - * @param strategy the strategy that should be applied * @param fieldMutator the field mutator where the strategy will be applied */ -final case class PiiScalar(strategy: PiiStrategy, fieldMutator: PiiConstants.Mutator) extends PiiField { - override def applyStrategy(fieldValue: String): String = if (fieldValue != null) strategy.scramble(fieldValue) else null +final case class PiiScalar(fieldMutator: Mutator) extends PiiField { + import PiiConstants.ModifiedFields + override def applyStrategy(fieldValue: String, strategy: PiiStrategy): (String, ModifiedFields) = + if (fieldValue != null) { + val modifiedValue = strategy.scramble(fieldValue) + (modifiedValue, List(ScalarModifiedField(fieldMutator.fieldName, fieldValue, modifiedValue))) + } else (null, List()) } /** * Specifies a strategy to use, a field mutator where the JSON can be found in the EnrichedEvent POJO, a schema criterion to - * discriminate which contexts to apply this strategy to, and a json path within the contexts where this strategy will + * discriminate which contexts to apply this strategy to, and a JSON path within the contexts where this strategy will * be applied (the path may correspond to multiple fields). * - * @param strategy the strategy that should be applied - * @param fieldMutator the field mutator for the json field + * @param fieldMutator the field mutator for the JSON field * @param schemaCriterion the schema for which the strategy will be applied * @param jsonPath the path where the strategy will be applied */ -final case class PiiJson(strategy: PiiStrategy, fieldMutator: PiiConstants.Mutator, schemaCriterion: SchemaCriterion, jsonPath: String) - extends PiiField { +final case class PiiJson(fieldMutator: Mutator, schemaCriterion: SchemaCriterion, jsonPath: String) extends PiiField { + import PiiConstants.ModifiedFields implicit val json4sFormats = DefaultFormats - override def applyStrategy(fieldValue: String): String = + override def applyStrategy(fieldValue: String, strategy: PiiStrategy): (String, ModifiedFields) = { + val modifiedFields = MutableList[JsonModifiedField]() if (fieldValue != null) { - compact(render(parse(fieldValue) match { - case JObject(jObject) => { + (compact(render(parse(fieldValue) match { + case JObject(jObject) => val jObjectMap = jObject.toMap val updated = jObjectMap.filterKeys(_ == "data").mapValues { case JArray(contexts) => JArray(contexts.map { - case JObject(context) => modifyObjectIfSchemaMatches(context) - case x => x + case JObject(context) => + val (values, listOfModifiedValues) = modifyObjectIfSchemaMatches(context, strategy) + modifiedFields ++= listOfModifiedValues + values + case x => x }) - case JObject(unstructEvent) => modifyObjectIfSchemaMatches(unstructEvent) - case x => x + case JObject(unstructEvent) => + val (values, listOfModifiedValues) = modifyObjectIfSchemaMatches(unstructEvent, strategy) + modifiedFields ++= listOfModifiedValues + values + case x => x } JObject((jObjectMap ++ updated).toList) - } case x => x - })) - } else null + })), modifiedFields.toList) + } else (null, modifiedFields.toList) + } - private def modifyObjectIfSchemaMatches(context: List[(String, json4s.JValue)]): JObject = { + /** + * Tests whether the schema for this event matches the schema criterion and if it does modifies it. + */ + private def modifyObjectIfSchemaMatches(context: List[(String, json4s.JValue)], + strategy: PiiStrategy): (JObject, List[JsonModifiedField]) = { val fieldsObj = context.toMap (for { - schema <- fieldsObj.get("schema") - parsedSchemaMatches <- SchemaKey.parse(schema.extract[String]).map(schemaCriterion.matches).toOption + schema <- fieldsObj.get("schema") + schemaStr = schema.extract[String] + parsedSchemaMatches <- SchemaKey.parse(schemaStr).map(schemaCriterion.matches).toOption data <- fieldsObj.get("data") if parsedSchemaMatches - } yield JObject(fieldsObj.updated("schema", schema).updated("data", jsonPathReplace(data)).toList)).getOrElse(JObject(context)) + updated = jsonPathReplace(data, strategy, schemaStr) + } yield + (JObject(fieldsObj.updated("schema", schema).updated("data", updated._1).toList), updated._2)).getOrElse((JObject(context), List())) } // Configuration for JsonPath private val JsonPathConf = - Configuration.builder().options(JOption.SUPPRESS_EXCEPTIONS).jsonProvider(new JacksonJsonNodeJsonProvider()).build() + Configuration + .builder() + .options(JOption.SUPPRESS_EXCEPTIONS, JOption.ALWAYS_RETURN_LIST) + .jsonProvider(new JacksonJsonNodeJsonProvider()) + .build() /** * Replaces a value in the given context data with the result of applying the strategy that value. */ - private def jsonPathReplace(jValue: JValue): JValue = { + private def jsonPathReplace(jValue: JValue, strategy: PiiStrategy, schema: String): (JValue, List[JsonModifiedField]) = { val objectNode = JsonMethods.mapper.valueToTree[ObjectNode](jValue) val documentContext = JJsonPath.using(JsonPathConf).parse(objectNode) + val modifiedFields = MutableList[JsonModifiedField]() documentContext.map( jsonPath, new MapFunction { override def map(currentValue: AnyRef, configuration: Configuration): AnyRef = currentValue match { - case s: String => strategy.scramble(s) + case s: String => + val newValue = strategy.scramble(s) + modifiedFields += JsonModifiedField(fieldMutator.fieldName, s, newValue, jsonPath, schema) + newValue case a: ArrayNode => a.elements.asScala.map { - case t: TextNode => strategy.scramble(t.asText()) + case t: TextNode => + val originalValue = t.asText() + val newValue = strategy.scramble(originalValue) + modifiedFields += JsonModifiedField(fieldMutator.fieldName, originalValue, newValue, jsonPath, schema) + newValue case default: AnyRef => default } case default: AnyRef => default } } ) - JsonMethods.fromJsonNode(documentContext.json[JsonNode]()) + (JsonMethods.fromJsonNode(documentContext.json[JsonNode]), modifiedFields.toList) } } @@ -351,3 +479,69 @@ case class PiiStrategyPseudonymize(hashFunction: MessageDigest) extends PiiStrat override def scramble(clearText: String): String = hash(clearText) def hash(text: String): String = String.format("%064x", new java.math.BigInteger(1, hashFunction.digest(text.getBytes(TextEncoding)))) } + +/** + * The mutator class encapsulates the mutator function and the field name where the mutator will be applied. + */ +case class Mutator(fieldName: String, muatatorFn: PiiConstants.MutatorFn) + +/** + * Case class for capturing scalar field modifications. + */ +case class ScalarModifiedField(fieldName: String, originalValue: String, modifiedValue: String) extends ModifedField + +/** + * Case class for capturing JSON field modifications. + */ +case class JsonModifiedField(field: String, originalValue: String, modifiedValue: String, jsonPath: String, schema: String) + extends ModifedField + +/** + * Parent class for classes that serialize the values that were modified during the PII enrichment. + */ +case class PiiModifiedFields(val modifiedFields: PiiConstants.ModifiedFields, val strategy: PiiStrategy) + +/** + * Custom serializer for PiiStrategy class + */ +class PiiStrategySerializer + extends CustomSerializer[PiiStrategy](format => + ({ + case jo: JObject => + implicit val json4sFormats = DefaultFormats + val function = (jo \ "pseudonymize" \ "hashFunction").extract[String] + PiiStrategyPseudonymize(MessageDigest.getInstance(function)) + }, { + case psp: PiiStrategyPseudonymize => + "pseudonymize" -> ("hashFunction" -> psp.hashFunction.getAlgorithm) + })) + +/** + * Custom serializer for PiiModifiedFields class + */ +class PiiModifiedFieldsSerializer + extends CustomSerializer[PiiModifiedFields](format => { + val PiiTransformationSchema = "iglu:com.snowplowanalytics.snowplow/pii_transformation/jsonschema/1-0-0" + ({ + case jo: JObject => + implicit val json4sFormats = DefaultFormats + new PiiStrategySerializer + val fields = (jo \ "data" \ "pii").extract[List[ModifedField]] + val strategy = (jo \ "data" \ "strategy").extract[PiiStrategy] + PiiModifiedFields(fields, strategy) + }, { + case pmf: PiiModifiedFields => + implicit val json4sFormats = DefaultFormats + new PiiStrategySerializer + ("schema" -> PiiTransformationSchema) ~ + ("data" -> + ("pii" -> decompose( + pmf.modifiedFields + .map { + case s: ScalarModifiedField => "pojo" -> s + case j: JsonModifiedField => "json" -> j + } + .groupBy(_._1) + .mapValues(_.map(_._2)) + )) ~ + ("strategy" -> decompose(pmf.strategy))) + }) + }) diff --git a/3-enrich/scala-common-enrich/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/outputs/EnrichedEvent.scala b/3-enrich/scala-common-enrich/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/outputs/EnrichedEvent.scala index 3631f293c1..df324fda92 100644 --- a/3-enrich/scala-common-enrich/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/outputs/EnrichedEvent.scala +++ b/3-enrich/scala-common-enrich/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/outputs/EnrichedEvent.scala @@ -244,4 +244,7 @@ class EnrichedEvent extends Serializable { // True timestamp @BeanProperty var true_tstamp: String = _ + + // Fields modified form PII enrichemnt (JSON String) + @BeanProperty var pii: String = _ } diff --git a/3-enrich/scala-common-enrich/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/enrichments/registry/EnrichmentConfigsSpec.scala b/3-enrich/scala-common-enrich/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/enrichments/registry/EnrichmentConfigsSpec.scala index 1df78779ff..7f40b44f8c 100644 --- a/3-enrich/scala-common-enrich/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/enrichments/registry/EnrichmentConfigsSpec.scala +++ b/3-enrich/scala-common-enrich/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/enrichments/registry/EnrichmentConfigsSpec.scala @@ -294,6 +294,7 @@ class EnrichmentConfigsSpec extends Specification with ValidationMatchers { import PiiConstants._ val piiPseudonymizerEnrichmentJson = parse("""{ | "enabled": true, + | "emitIdentificationEvent": true, | "parameters": { | "pii": [ | { @@ -317,35 +318,20 @@ class EnrichmentConfigsSpec extends Specification with ValidationMatchers { | } |}""".stripMargin) - val schemaKey = SchemaKey("com.snowplowanalytics.snowplow.enrichments", "pii_enrichment_config", "jsonschema", "1-0-0") + val schemaKey = SchemaKey("com.snowplowanalytics.snowplow.enrichments", "pii_enrichment_config", "jsonschema", "2-0-0") val result = PiiPseudonymizerEnrichment.parse(piiPseudonymizerEnrichmentJson, schemaKey) - val expected = PiiPseudonymizerEnrichment( - fieldList = List( - PiiScalar(strategy = PiiStrategyPseudonymize(hashFunction = java.security.MessageDigest.getInstance("SHA-256")), - fieldMutator = ScalarMutators.get("user_id").get), - PiiJson( - strategy = PiiStrategyPseudonymize(hashFunction = java.security.MessageDigest.getInstance("SHA-256")), - fieldMutator = JsonMutators.get("contexts").get, - schemaCriterion = SchemaCriterion.parse("iglu:com.acme/email_sent/jsonschema/1-*-*").toOption.get, - jsonPath = "$.emailAddress" - ) - ) - ) result must beSuccessful.like { case piiRes: PiiPseudonymizerEnrichment => { - (piiRes.fieldList.size must_== 2) and + ((piiRes.strategy must haveClass[PiiStrategyPseudonymize]) and + (piiRes.strategy.asInstanceOf[PiiStrategyPseudonymize].hashFunction.toString must contain("SHA-256")) and + (piiRes.fieldList.size must_== 2) and (piiRes.fieldList(0) must haveClass[PiiScalar]) and - (piiRes.fieldList(0).asInstanceOf[PiiScalar].strategy must haveClass[PiiStrategyPseudonymize]) and - (piiRes.fieldList(0).asInstanceOf[PiiScalar].strategy.asInstanceOf[PiiStrategyPseudonymize].hashFunction.toString must contain( - "SHA-256")) and (piiRes.fieldList(0).asInstanceOf[PiiScalar].fieldMutator must_== ScalarMutators.get("user_id").get) and - (piiRes.fieldList(1).asInstanceOf[PiiJson].strategy must haveClass[PiiStrategyPseudonymize]) and (piiRes.fieldList(1).asInstanceOf[PiiJson].fieldMutator must_== JsonMutators.get("contexts").get) and (piiRes.fieldList(1).asInstanceOf[PiiJson].schemaCriterion.toString must_== "iglu:com.acme/email_sent/jsonschema/1-*-*") and (piiRes.fieldList(1).asInstanceOf[PiiJson].jsonPath must_== "$.emailAddress") and - (piiRes.fieldList(1).asInstanceOf[PiiJson].strategy.asInstanceOf[PiiStrategyPseudonymize].hashFunction.toString must contain( - "SHA-256")) + (piiRes.emitIdentificationEvent mustEqual (true))) } } } diff --git a/3-enrich/scala-common-enrich/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/enrichments/registry/PiiPseudonymizerEnrichmentSpec.scala b/3-enrich/scala-common-enrich/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/enrichments/registry/PiiPseudonymizerEnrichmentSpec.scala index a1bf559870..4a2a89cb59 100644 --- a/3-enrich/scala-common-enrich/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/enrichments/registry/PiiPseudonymizerEnrichmentSpec.scala +++ b/3-enrich/scala-common-enrich/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/enrichments/registry/PiiPseudonymizerEnrichmentSpec.scala @@ -11,7 +11,8 @@ * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under. */ -package com.snowplowanalytics.snowplow.enrich +package com.snowplowanalytics +package snowplow.enrich package common package enrichments package registry @@ -31,13 +32,14 @@ import java.security.MessageDigest // Snowplow import common.loaders.{CollectorApi, CollectorContext, CollectorPayload, CollectorSource} import common.outputs.EnrichedEvent +import utils.TestResourcesRepositoryRef import common.SpecHelpers.toNameValuePairs -import common.utils.TestResourcesRepositoryRef // Iglu -import com.snowplowanalytics.iglu.client.SchemaCriterion -import com.snowplowanalytics.iglu.client.Resolver -import com.snowplowanalytics.iglu.client.repositories.{EmbeddedRepositoryRef, RepositoryRefConfig} +import iglu.client.SchemaCriterion +import iglu.client.Resolver +import iglu.client.repositories.RepositoryRefConfig +import iglu.client.validation.ValidatableJValue._ // Scalaz import scalaz.Scalaz._ @@ -53,6 +55,7 @@ class PiiPseudonymizerEnrichmentSpec extends Specification with ValidationMatche Hashing configured JSON fields in POJO should work when multiple fields are matched through jsonpath $e4 Hashing configured JSON fields in POJO should work when multiple fields are matched through schemacriterion $e5 Hashing configured JSON fields in POJO should silently ignore unsupported types $e6 + Hashing configured JSON and scalar fields in POJO emits a correct pii_transformation event $e7 """ def commonSetup(enrichmentMap: EnrichmentMap): List[ValidatedEnrichedEvent] = { @@ -134,27 +137,28 @@ class PiiPseudonymizerEnrichmentSpec extends Specification with ValidationMatche EtlPipeline.processEvents(registry, s"spark-0.0.0", new DateTime(1500000000L), input) } - private val ipEnrichment = IpLookupsEnrichment(Some(("geo", new URI("/ignored-in-local-mode/"), "GeoIPCity.dat")), - Some(("isp", new URI("/ignored-in-local-mode/"), "GeoIPISP.dat")), - None, - None, - None, - true) + private val ipEnrichment = IpLookupsEnrichment( + Some(("geo", new URI("/ignored-in-local-mode/"), "GeoIPCity.dat")), + Some(("isp", new URI("/ignored-in-local-mode/"), "GeoIPISP.dat")), + None, + None, + None, + localMode = true + ) def e1 = { val enrichmentMap = Map( - ("ip_lookups" -> ipEnrichment), - ("pii_enrichment_config" -> PiiPseudonymizerEnrichment( + "ip_lookups" -> ipEnrichment, + "pii_enrichment_config" -> PiiPseudonymizerEnrichment( List( - PiiScalar(strategy = PiiStrategyPseudonymize(hashFunction = MessageDigest.getInstance("SHA-256")), - fieldMutator = ScalarMutators.get("user_id").get), - PiiScalar(strategy = PiiStrategyPseudonymize(hashFunction = MessageDigest.getInstance("SHA-256")), - fieldMutator = ScalarMutators.get("user_ipaddress").get), - PiiScalar(strategy = PiiStrategyPseudonymize(hashFunction = MessageDigest.getInstance("SHA-256")), - fieldMutator = ScalarMutators.get("ip_domain").get), - PiiScalar(strategy = PiiStrategyPseudonymize(hashFunction = MessageDigest.getInstance("SHA-256")), - fieldMutator = ScalarMutators.get("user_fingerprint").get) - ))) + PiiScalar(fieldMutator = ScalarMutators.get("user_id").get), + PiiScalar(fieldMutator = ScalarMutators.get("user_ipaddress").get), + PiiScalar(fieldMutator = ScalarMutators.get("ip_domain").get), + PiiScalar(fieldMutator = ScalarMutators.get("user_fingerprint").get) + ), + false, + PiiStrategyPseudonymize(MessageDigest.getInstance("SHA-256")) + ) ) val output = commonSetup(enrichmentMap = enrichmentMap) val expected = new EnrichedEvent() @@ -167,9 +171,9 @@ class PiiPseudonymizerEnrichmentSpec extends Specification with ValidationMatche expected.etl_tstamp = "1970-01-18 08:40:00.000" expected.collector_tstamp = "2017-07-14 03:39:39.000" output.size must_== 1 - val out = output(0) + val out = output.head out must beSuccessful.like { - case enrichedEvent => { + case enrichedEvent => (enrichedEvent.app_id must_== expected.app_id) and (enrichedEvent.user_id must_== expected.user_id) and (enrichedEvent.user_ipaddress must_== expected.user_ipaddress) and @@ -178,34 +182,33 @@ class PiiPseudonymizerEnrichmentSpec extends Specification with ValidationMatche (enrichedEvent.geo_city must_== expected.geo_city) and (enrichedEvent.etl_tstamp must_== expected.etl_tstamp) and (enrichedEvent.collector_tstamp must_== expected.collector_tstamp) - } } } def e2 = { val enrichmentMap = Map( - ("ip_lookups" -> ipEnrichment), - ("pii_enrichment_config" -> PiiPseudonymizerEnrichment( + "ip_lookups" -> ipEnrichment, + "pii_enrichment_config" -> PiiPseudonymizerEnrichment( List( PiiJson( - strategy = PiiStrategyPseudonymize(hashFunction = MessageDigest.getInstance("SHA-256")), fieldMutator = JsonMutators.get("contexts").get, schemaCriterion = SchemaCriterion.parse("iglu:com.acme/email_sent/jsonschema/1-0-*").toOption.get, jsonPath = "$.emailAddress" ), PiiJson( - strategy = PiiStrategyPseudonymize(hashFunction = MessageDigest.getInstance("SHA-256")), fieldMutator = JsonMutators.get("contexts").get, schemaCriterion = SchemaCriterion.parse("iglu:com.acme/email_sent/jsonschema/1-1-0").toOption.get, jsonPath = "$.data.emailAddress2" ), PiiJson( - strategy = PiiStrategyPseudonymize(hashFunction = MessageDigest.getInstance("SHA-256")), fieldMutator = JsonMutators.get("unstruct_event").get, schemaCriterion = SchemaCriterion.parse("iglu:com.mailgun/message_clicked/jsonschema/1-0-0").toOption.get, jsonPath = "$.ip" ) - ))) + ), + false, + PiiStrategyPseudonymize(MessageDigest.getInstance("SHA-256")) + ) ) val output = commonSetup(enrichmentMap = enrichmentMap) @@ -219,9 +222,9 @@ class PiiPseudonymizerEnrichmentSpec extends Specification with ValidationMatche expected.etl_tstamp = "1970-01-18 08:40:00.000" expected.collector_tstamp = "2017-07-14 03:39:39.000" output.size must_== 1 - val out = output(0) + val out = output.head out must beSuccessful.like { - case enrichedEvent => { + case enrichedEvent => implicit val formats = org.json4s.DefaultFormats val contextJ = parse(enrichedEvent.contexts) val unstructEventJ = parse(enrichedEvent.unstruct_event) @@ -240,22 +243,23 @@ class PiiPseudonymizerEnrichmentSpec extends Specification with ValidationMatche (((unstructEventJ \ "data") \ "data" \ "ip") .extract[String] must_== "b5814ada7bb3abb2ed7f8713433a60ed3b3780f7d98a95c936cc62abb16f316f") and (((unstructEventJ \ "data") \ "data" \ "myVar2").extract[String] must_== "awesome") - } } } def e3 = { val enrichmentMap = Map( - ("ip_lookups" -> ipEnrichment), - ("pii_enrichment_config" -> PiiPseudonymizerEnrichment( + "ip_lookups" -> ipEnrichment, + "pii_enrichment_config" -> PiiPseudonymizerEnrichment( List( PiiJson( - strategy = PiiStrategyPseudonymize(hashFunction = MessageDigest.getInstance("SHA-256")), fieldMutator = JsonMutators.get("contexts").get, schemaCriterion = SchemaCriterion.parse("iglu:com.acme/email_sent/jsonschema/1-*-*").toOption.get, jsonPath = "$.field.that.does.not.exist.in.this.instance" ) - ))) + ), + false, + PiiStrategyPseudonymize(MessageDigest.getInstance("SHA-256")) + ) ) val output = commonSetup(enrichmentMap = enrichmentMap) @@ -269,31 +273,32 @@ class PiiPseudonymizerEnrichmentSpec extends Specification with ValidationMatche expected.etl_tstamp = "1970-01-18 08:40:00.000" expected.collector_tstamp = "2017-07-14 03:39:39.000" output.size must_== 1 - val out = output(0) + val out = output.head out must beSuccessful.like { - case enrichedEvent => { + case enrichedEvent => implicit val formats = org.json4s.DefaultFormats val contextJ = parse(enrichedEvent.contexts) (((contextJ \ "data")(0) \ "data" \ "emailAddress").extract[String] must_== "jim@acme.com") and (((contextJ \ "data")(0) \ "data" \ "emailAddress2").extract[String] must_== "bob@acme.com") and (((contextJ \ "data")(1) \ "data" \ "emailAddress").extract[String] must_== "tim@acme.com") and (((contextJ \ "data")(1) \ "data" \ "emailAddress2").extract[String] must_== "tom@acme.com") - } } } def e4 = { val enrichmentMap = Map( - ("ip_lookups" -> ipEnrichment), - ("pii_enrichment_config" -> PiiPseudonymizerEnrichment( + "ip_lookups" -> ipEnrichment, + "pii_enrichment_config" -> PiiPseudonymizerEnrichment( List( PiiJson( - strategy = PiiStrategyPseudonymize(hashFunction = MessageDigest.getInstance("SHA-256")), fieldMutator = JsonMutators.get("contexts").get, schemaCriterion = SchemaCriterion.parse("iglu:com.acme/email_sent/jsonschema/1-0-*").toOption.get, jsonPath = "$.['emailAddress', 'emailAddress2']" ) - ))) + ), + false, + PiiStrategyPseudonymize(MessageDigest.getInstance("SHA-256")) + ) ) val output = commonSetup(enrichmentMap = enrichmentMap) @@ -307,9 +312,9 @@ class PiiPseudonymizerEnrichmentSpec extends Specification with ValidationMatche expected.etl_tstamp = "1970-01-18 08:40:00.000" expected.collector_tstamp = "2017-07-14 03:39:39.000" output.size must_== 1 - val out = output(0) + val out = output.head out must beSuccessful.like { - case enrichedEvent => { + case enrichedEvent => implicit val formats = org.json4s.DefaultFormats val contextJ = parse(enrichedEvent.contexts) (((contextJ \ "data")(0) \ "data" \ "emailAddress") @@ -318,22 +323,23 @@ class PiiPseudonymizerEnrichmentSpec extends Specification with ValidationMatche .extract[String] must_== "405ac8384fa984f787f9486daf34d84d98f20c4d6a12e2cc4ed89be3bcb06ad6") and (((contextJ \ "data")(1) \ "data" \ "emailAddress").extract[String] must_== "tim@acme.com") and (((contextJ \ "data")(1) \ "data" \ "emailAddress2").extract[String] must_== "tom@acme.com") - } } } def e5 = { val enrichmentMap = Map( - ("ip_lookups" -> ipEnrichment), - ("pii_enrichment_config" -> PiiPseudonymizerEnrichment( + "ip_lookups" -> ipEnrichment, + "pii_enrichment_config" -> PiiPseudonymizerEnrichment( List( PiiJson( - strategy = PiiStrategyPseudonymize(hashFunction = MessageDigest.getInstance("SHA-256")), fieldMutator = JsonMutators.get("contexts").get, schemaCriterion = SchemaCriterion.parse("iglu:com.acme/email_sent/jsonschema/1-*-0").toOption.get, jsonPath = "$.emailAddress" ) - ))) + ), + false, + PiiStrategyPseudonymize(MessageDigest.getInstance("SHA-256")) + ) ) val output = commonSetup(enrichmentMap = enrichmentMap) val expected = new EnrichedEvent() @@ -346,9 +352,9 @@ class PiiPseudonymizerEnrichmentSpec extends Specification with ValidationMatche expected.etl_tstamp = "1970-01-18 08:40:00.000" expected.collector_tstamp = "2017-07-14 03:39:39.000" output.size must_== 1 - val out = output(0) + val out = output.head out must beSuccessful.like { - case enrichedEvent => { + case enrichedEvent => implicit val formats = org.json4s.DefaultFormats val contextJ = parse(enrichedEvent.contexts) (((contextJ \ "data")(0) \ "data" \ "emailAddress") @@ -358,22 +364,23 @@ class PiiPseudonymizerEnrichmentSpec extends Specification with ValidationMatche (((contextJ \ "data")(1) \ "data" \ "emailAddress") .extract[String] must_== "663ea32adb6f26f7a025e3b6c850294d0d7755c3010c5e7a5fd690cfa5d2938f") and (((contextJ \ "data")(1) \ "data" \ "emailAddress2").extract[String] must_== "tom@acme.com") - } } } def e6 = { val enrichmentMap = Map( - ("ip_lookups" -> ipEnrichment), - ("pii_enrichment_config" -> PiiPseudonymizerEnrichment( + "ip_lookups" -> ipEnrichment, + "pii_enrichment_config" -> PiiPseudonymizerEnrichment( List( PiiJson( - strategy = PiiStrategyPseudonymize(hashFunction = MessageDigest.getInstance("SHA-256")), fieldMutator = JsonMutators.get("contexts").get, schemaCriterion = SchemaCriterion.parse("iglu:com.acme/email_sent/jsonschema/1-*-*").toOption.get, jsonPath = "$.someInt" ) - ))) + ), + false, + PiiStrategyPseudonymize(MessageDigest.getInstance("SHA-256")) + ) ) val output = commonSetup(enrichmentMap = enrichmentMap) val expected = new EnrichedEvent() @@ -386,9 +393,9 @@ class PiiPseudonymizerEnrichmentSpec extends Specification with ValidationMatche expected.etl_tstamp = "1970-01-18 08:40:00.000" expected.collector_tstamp = "2017-07-14 03:39:39.000" output.size must_== 1 - val out = output(0) + val out = output.head out must beSuccessful.like { - case enrichedEvent => { + case enrichedEvent => implicit val formats = org.json4s.DefaultFormats val contextJ = parse(enrichedEvent.contexts) (((contextJ \ "data")(0) \ "data" \ "emailAddress") @@ -399,7 +406,67 @@ class PiiPseudonymizerEnrichmentSpec extends Specification with ValidationMatche .extract[String] must_== "tim@acme.com") and (((contextJ \ "data")(1) \ "data" \ "emailAddress2").extract[String] must_== "tom@acme.com") (((contextJ \ "data")(1) \ "data" \ "someInt").extract[Int] must_== 1) - } + } + } + + def e7 = { + val enrichmentMap = Map( + "ip_lookups" -> ipEnrichment, + "pii_enrichment_config" -> PiiPseudonymizerEnrichment( + List( + PiiScalar(fieldMutator = ScalarMutators.get("user_id").get), + PiiScalar(fieldMutator = ScalarMutators.get("user_ipaddress").get), + PiiScalar(fieldMutator = ScalarMutators.get("ip_domain").get), + PiiScalar(fieldMutator = ScalarMutators.get("user_fingerprint").get), + PiiJson( + fieldMutator = JsonMutators.get("contexts").get, + schemaCriterion = SchemaCriterion.parse("iglu:com.acme/email_sent/jsonschema/1-0-*").toOption.get, + jsonPath = "$.emailAddress" + ), + PiiJson( + fieldMutator = JsonMutators.get("contexts").get, + schemaCriterion = SchemaCriterion.parse("iglu:com.acme/email_sent/jsonschema/1-1-0").toOption.get, + jsonPath = "$.data.emailAddress2" + ), + PiiJson( + fieldMutator = JsonMutators.get("unstruct_event").get, + schemaCriterion = SchemaCriterion.parse("iglu:com.mailgun/message_clicked/jsonschema/1-0-0").toOption.get, + jsonPath = "$.ip" + ) + ), + false, + PiiStrategyPseudonymize(MessageDigest.getInstance("SHA-256")) + ) + ) + val output = commonSetup(enrichmentMap = enrichmentMap) + val expected = new EnrichedEvent() + expected.app_id = "ads" + expected.ip_domain = null + expected.geo_city = "Delray Beach" + expected.etl_tstamp = "1970-01-18 08:40:00.000" + expected.collector_tstamp = "2017-07-14 03:39:39.000" + expected.pii = + """{"schema":"iglu:com.snowplowanalytics.snowplow/pii_transformation/jsonschema/1-0-0","data":{"pii":{"pojo":[{"fieldName":"user_id","originalValue":"john@acme.com","modifiedValue":"4b2d8785b49bad23638b17d8db76857a79bf79441241a78a97d88cc64bbf766e"},{"fieldName":"user_ipaddress","originalValue":"70.46.123.145","modifiedValue":"36595ea260a82b7e2d7cf44121892bf31031a9c27077d8c802454464178456c2"},{"fieldName":"user_fingerprint","originalValue":"its_you_again!","modifiedValue":"9f9fc89b7a5428f2646347974404650fc8776f791afc2200efc8a82aa754e7e6"}],"json":[{"field":"contexts","originalValue":"jim@acme.com","modifiedValue":"3571b422ecb9ac85cb654b2fce521ae351d4695b0fb788aac75caf724e7881f0","jsonPath":"$.emailAddress","schema":"iglu:com.acme/email_sent/jsonschema/1-0-0"},{"field":"contexts","originalValue":"bob@acme.com","modifiedValue":"405ac8384fa984f787f9486daf34d84d98f20c4d6a12e2cc4ed89be3bcb06ad6","jsonPath":"$.data.emailAddress2","schema":"iglu:com.acme/email_sent/jsonschema/1-1-0"},{"field":"unstruct_event","originalValue":"50.56.129.169","modifiedValue":"b5814ada7bb3abb2ed7f8713433a60ed3b3780f7d98a95c936cc62abb16f316f","jsonPath":"$.ip","schema":"iglu:com.mailgun/message_clicked/jsonschema/1-0-0"}]},"strategy":{"pseudonymize":{"hashFunction":"SHA-256"}}}}""" + + output.size must_== 1 + val out = output.head + out must beSuccessful.like { + case enrichedEvent => + implicit val formats = org.json4s.DefaultFormats + val contextJ = parse(enrichedEvent.contexts) + val unstructEventJ = parse(enrichedEvent.unstruct_event) + (enrichedEvent.pii must_== expected.pii) and // This is the important test, the rest just verify that nothing has changed. + (enrichedEvent.app_id must_== expected.app_id) and + (enrichedEvent.ip_domain must_== expected.ip_domain) and + (enrichedEvent.geo_city must_== expected.geo_city) and + (enrichedEvent.etl_tstamp must_== expected.etl_tstamp) and + (enrichedEvent.collector_tstamp must_== expected.collector_tstamp) and + (((contextJ \ "data")(0) \ "data" \ "emailAddress2").extract[String] must_== "bob@acme.com") and + (((contextJ \ "data")(1) \ "data" \ "emailAddress").extract[String] must_== "tim@acme.com") and + (((contextJ \ "data")(1) \ "data" \ "emailAddress2").extract[String] must_== "tom@acme.com") and + (((contextJ \ "data")(1) \ "data" \ "data" \ "emailAddress").extract[String] must_== "jim@acme.com") and + (((contextJ \ "data")(1) \ "data" \ "schema").extract[String] must_== "iglu:com.acme/email_sent/jsonschema/1-0-0") and + (((unstructEventJ \ "data") \ "data" \ "myVar2").extract[String] must_== "awesome") } } }