Skip to content

Commit

Permalink
Port JSON Schema validation to Common Enrich from Shred (closes #1637)
Browse files Browse the repository at this point in the history
  • Loading branch information
Dani Solà authored and fblundun committed Aug 26, 2015
1 parent 1f76b48 commit 124a00e
Show file tree
Hide file tree
Showing 9 changed files with 47 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@ package common
package enrichments

// Joda

import com.snowplowanalytics.iglu.client.Resolver
import org.joda.time.DateTime
import org.json4s.JObject

// Scalaz
import scalaz._
Expand All @@ -30,8 +33,7 @@ import util.Tap._
import adapters.RawEvent
import outputs.EnrichedEvent

import utils.{ConversionUtils => CU}
import utils.{JsonUtils => JU}
import com.snowplowanalytics.snowplow.enrich.common.utils.{ConversionUtils => CU, JsonUtils => JU, Shredder}
import utils.MapTransformer._

import enrichments.{EventEnrichments => EE}
Expand Down Expand Up @@ -62,7 +64,7 @@ object EnrichmentManager {
* NonHiveOutput.
*/
// TODO: etlTstamp shouldn't be stringly typed really
def enrichEvent(registry: EnrichmentRegistry, hostEtlVersion: String, etlTstamp: DateTime, raw: RawEvent): ValidatedEnrichedEvent = {
def enrichEvent(registry: EnrichmentRegistry, hostEtlVersion: String, etlTstamp: DateTime, raw: RawEvent)(implicit resolver: Resolver): ValidatedEnrichedEvent = {

// Placeholders for where the Success value doesn't matter.
// Useful when you're updating large (>22 field) POSOs.
Expand Down Expand Up @@ -409,6 +411,12 @@ object EnrichmentManager {
case None => Nil.success
}

// Validate contexts and unstructured events
val shred = Shredder.shred(event) match {
case Failure(msgs) => msgs.map(_.toString).fail
case Success(_) => unitSuccess.toValidationNel
}

// Assemble array of derived contexts
val derived_contexts = List(uaParser).collect {
case Success(Some(context)) => context
Expand Down Expand Up @@ -451,8 +459,9 @@ object EnrichmentManager {
pageQsMap.toValidationNel |@|
crossDomain.toValidationNel |@|
jsScript.toValidationNel |@|
campaign) {
(_,_,_,_,_,_,_) => ()
campaign |@|
shred) {
(_,_,_,_,_,_,_,_) => ()
}
(first |@| second) {
(_,_) => event
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ object JsonUtils {
* Converts a Joda DateTime into
* a JSON Schema-compatible date-time string.
*
* @param datetime The Joda DateTime
* @param dateTime The Joda DateTime
* to convert to a timestamp String
* @return the timestamp String
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@
package com.snowplowanalytics
package snowplow
package enrich
package hadoop
package shredder
package common
package utils

// Jackson
import com.fasterxml.jackson.databind.JsonNode
Expand All @@ -41,9 +41,6 @@ import iglu.client.{
import iglu.client.validation.ProcessingMessageMethods._
import iglu.client.validation.ValidatableJsonMethods._

// This project
import hadoop.utils.JsonUtils

/**
* The shredder takes the two fields containing JSONs
* (contexts and unstructured event properties) and
Expand All @@ -53,6 +50,11 @@ import hadoop.utils.JsonUtils
*/
object Shredder {

/**
* A (possibly empty) list of JsonNodes
*/
type JsonNodes = List[JsonNode]

// All shredded JSONs have the events type (aka table) as their ultimate parent
private val TypeHierarchyRoot = "events"

Expand Down Expand Up @@ -83,7 +85,7 @@ object Shredder {
* and on Failure a NonEmptyList of
* JsonNodes containing error messages
*/
def shred(event: EnrichedEvent)(implicit resolver: Resolver): ValidatedNel[JsonSchemaPairs] = {
def shred(event: EnrichedEvent)(implicit resolver: Resolver): ValidatedNelMessage[JsonSchemaPairs] = {

// Define what we know so far of the type hierarchy.
val partialHierarchy = makePartialHierarchy(
Expand All @@ -97,7 +99,7 @@ object Shredder {
l = List(j)
} yield l

def extractContexts(json: String, field: String): Option[ValidatedNel[List[JsonNode]]] = {
def extractContexts(json: String, field: String): Option[ValidatedNelMessage[List[JsonNode]]] = {
for {
v <- extractAndValidateJson(field, ContextsSchema, Option(json))
} yield for {
Expand All @@ -109,7 +111,7 @@ object Shredder {
val c = extractContexts(event.contexts, "context")
val dc = extractContexts(event.derived_contexts, "derived_contexts")

def flatten(o: Option[ValidatedNel[JsonNodes]]): ValidatedNel[JsonNodes] = o match {
def flatten(o: Option[ValidatedNelMessage[JsonNodes]]): ValidatedNelMessage[JsonNodes] = o match {
case Some(vjl) => vjl
case None => List[JsonNode]().success
}
Expand Down Expand Up @@ -140,7 +142,7 @@ object Shredder {
* element
* @return the partially complete TypeHierarchy
*/
private[shredder] def makePartialHierarchy(rootId: String, rootTstamp: String): TypeHierarchy =
private[utils] def makePartialHierarchy(rootId: String, rootTstamp: String): TypeHierarchy =
TypeHierarchy(
rootId = rootId,
rootTstamp = rootTstamp,
Expand Down Expand Up @@ -172,7 +174,7 @@ object Shredder {
* contain the full schema key, plus the
* now-finalized hierarchy
*/
private[shredder] def attachMetadata(
private def attachMetadata(
instanceSchemaPair: JsonSchemaPair,
partialHierarchy: TypeHierarchy): JsonSchemaPair = {

Expand Down Expand Up @@ -215,8 +217,8 @@ object Shredder {
* Failure, or a singular
* JsonNode on success
*/
private[shredder] def extractAndValidateJson(field: String, schemaCriterion: SchemaCriterion, instance: Option[String])(implicit resolver: Resolver):
Option[ValidatedNel[JsonNode]] =
private def extractAndValidateJson(field: String, schemaCriterion: SchemaCriterion, instance: Option[String])(implicit resolver: Resolver):
Option[ValidatedNelMessage[JsonNode]] =
for {
i <- instance
} yield for {
Expand All @@ -235,6 +237,6 @@ object Shredder {
* @param instance The JSON instance itself
* @return the pimped ScalazArgs
*/
private[shredder] def extractJson(field: String, instance: String): ValidatedNel[JsonNode] =
private def extractJson(field: String, instance: String): ValidatedNelMessage[JsonNode] =
JsonUtils.extractJson(field, instance).toProcessingMessageNel
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@
* See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
*/
package com.snowplowanalytics.snowplow.enrich
package hadoop
package shredder
package common
package utils

// Jackson
import com.github.fge.jackson.JacksonUtils
Expand All @@ -30,12 +30,6 @@ import org.json4s._
import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods._

// Snowplow Common Enrich
import common._

// This project
import hadoop.utils.JsonUtils

/**
* Companion object contains helpers.
*/
Expand Down Expand Up @@ -112,7 +106,7 @@ case class TypeHierarchy(
* element from
* @return the last-but-one element from this list
*/
private[shredder] def secondTail[A](ls: List[A]): A = ls match {
private def secondTail[A](ls: List[A]): A = ls match {
case h :: _ :: Nil => h
case _ :: tail => secondTail(tail)
case _ => throw new NoSuchElementException
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ import com.snowplowanalytics.maxmind.iplookups.IpLocation
// JSON Schema
import com.github.fge.jsonschema.core.report.ProcessingMessage

// Iglu
import com.snowplowanalytics.iglu.client.JsonSchemaPair

// This project
import common.loaders.CollectorPayload
import common.adapters.RawEvent
Expand Down Expand Up @@ -150,4 +153,9 @@ package object common {
* Parameters inside of a raw event
*/
type RawEventParameters = Map[String, String]

/**
* A (possibly empty) list of JsonSchemaPairs
*/
type JsonSchemaPairs = List[JsonSchemaPair]
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,14 @@
package com.snowplowanalytics
package snowplow
package enrich
package hadoop
package shredder
package common
package utils

// Snowplow Utils
import util.Tap._

// Snowplow Common Enrich
import common.outputs.EnrichedEvent
import outputs.EnrichedEvent

// Specs2
import org.specs2.Specification
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@
* See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
*/
package com.snowplowanalytics.snowplow.enrich
package hadoop
package shredder
package common
package utils

// Specs2
import org.specs2.Specification
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import com.twitter.scalding._
import common._
import common.FatalEtlError
import common.outputs.BadRow
import common.utils.Shredder

// Iglu Scala Client
import iglu.client.{
Expand All @@ -44,7 +45,6 @@ import iglu.client.validation.ProcessingMessageMethods._

// This project
import inputs.EnrichedEventLoader
import shredder.Shredder
import outputs.{
ShreddedPartition => UrShreddedPartition
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,4 @@ package object hadoop {
type Validated[A] = Validation[ProcessingMessage, A]
type ValidatedNel[A] = ValidationNel[ProcessingMessage, A]

/**
* A (possibly empty) list of JsonNodes
*/
type JsonNodes = List[JsonNode]

/**
* A (possibly empty) list of JsonSchemaPairs
*/
type JsonSchemaPairs = List[JsonSchemaPair]

}

0 comments on commit 124a00e

Please sign in to comment.