Skip to content
This repository has been archived by the owner on Jun 4, 2021. It is now read-only.

Commit

Permalink
Common Enrich: write out event vendor/name/format/version (closes sno…
Browse files Browse the repository at this point in the history
  • Loading branch information
Dani Solà authored and fblundun committed Sep 25, 2015
1 parent df354b5 commit 8c3c089
Show file tree
Hide file tree
Showing 5 changed files with 183 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -405,18 +405,27 @@ object EnrichmentManager {
case _ => unitSuccess
}

// Execute the JavaScript scripting enrichment
val jsScript = registry.getJavascriptScriptEnrichment match {
case Some(jse) => jse.process(event)
case None => Nil.success
}

// Validate contexts and unstructured events
val shred = Shredder.shred(event) match {
case Failure(msgs) => msgs.map(_.toString).fail
case Success(_) => unitSuccess.toValidationNel
}

// Extract the event vendor/name/format/version
val extractSchema = SchemaEnrichment.extractSchema(event).map(schemaKey => {
event.event_vendor = schemaKey.vendor
event.event_name = schemaKey.name
event.event_format = schemaKey.format
event.event_version = schemaKey.version
unitSuccess
})

// Execute the JavaScript scripting enrichment
val jsScript = registry.getJavascriptScriptEnrichment match {
case Some(jse) => jse.process(event)
case None => Nil.success
}

// Assemble array of derived contexts
val derived_contexts = List(uaParser).collect {
case Success(Some(context)) => context
Expand Down Expand Up @@ -460,8 +469,9 @@ object EnrichmentManager {
crossDomain.toValidationNel |@|
jsScript.toValidationNel |@|
campaign |@|
shred) {
(_,_,_,_,_,_,_,_) => ()
shred |@|
extractSchema.toValidationNel) {
(_,_,_,_,_,_,_,_,_) => ()
}
(first |@| second) {
(_,_) => event
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Copyright (c) 2012-2014 Snowplow Analytics Ltd. All rights reserved.
*
* This program is licensed to you under the Apache License Version 2.0,
* and you may not use this file except in compliance with the Apache License Version 2.0.
* You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0.
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the Apache License Version 2.0 is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
*/
package com.snowplowanalytics
package snowplow
package enrich
package common
package enrichments

// Iglu
import iglu.client.SchemaKey
import iglu.client.Resolver

// Jackson
import com.fasterxml.jackson.databind.JsonNode
import com.fasterxml.jackson.databind.node.TextNode

// Common
import outputs.EnrichedEvent
import utils.shredder.Shredder

// Scalaz
import scalaz._
import Scalaz._

object SchemaEnrichment {

val pageViewSchema = SchemaKey("com.snowplowanalytics.snowplow", "page_view", "jsonschema", "1-0-0").success
val transactionSchema = SchemaKey("com.snowplowanalytics.snowplow", "transaction", "jsonschema", "1-0-0").success
val transactionItemSchema = SchemaKey("com.snowplowanalytics.snowplow", "transaction_item", "jsonschema", "1-0-0").success
val structSchema = SchemaKey("com.google.analytics", "event", "jsonschema", "1-0-0").success

def extractSchema(event: EnrichedEvent)(implicit resolver: Resolver): Validation[String, SchemaKey] = event.event match {
case "page_view" => pageViewSchema
case "struct" => structSchema
case "transaction" => transactionSchema
case "transaction_item" => transactionItemSchema
case "unstruct" => extractUnstructSchema(event)
case eventType => "Unrecognized event [%s]".format(eventType).fail
}

private def extractUnstructSchema(event: EnrichedEvent)(implicit resolver: Resolver): Validation[String, SchemaKey] = {
Shredder.extractUnstructEvent(event) match {
case Some(Success(List(json))) =>
parseSchemaKey(Option(json.get("schema")))
case _ =>
"Unstructured event couldn't be extracted".fail
}
}

private def parseSchemaKey(node: Option[JsonNode]): Validation[String, SchemaKey] = node match {
case Some(textNode: TextNode) =>
SchemaKey.parse(textNode.textValue()).<-:(_.toString)
case _ =>
"Unrecognized unstructured event structure".fail // It's validated by the Shredder, so it should never happen
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@ class EnrichedEvent {

// Transaction (i.e. this logging event)
@BeanProperty var event: String = _
@BeanProperty var event_vendor: String = _
@BeanProperty var event_name: String = _
@BeanProperty var event_format: String = _
@BeanProperty var event_version: String = _
@BeanProperty var event_id: String = _
@BeanProperty var txn_id: String = _

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,12 +93,7 @@ object Shredder {
event.event_id, event.collector_tstamp)

// Get our unstructured event and Lists of contexts and derived_contexts, all Option-boxed
val ue = for {
v <- extractAndValidateJson("ue_properties", UePropertiesSchema, Option(event.unstruct_event))
} yield for {
j <- v
l = List(j)
} yield l
val ue = extractUnstructEvent(event)

def extractContexts(json: String, field: String): Option[ValidatedNelMessage[List[JsonNode]]] = {
for {
Expand Down Expand Up @@ -133,6 +128,15 @@ object Shredder {
} yield mj).flatMap(_.sequenceU) // Swap nested List[scalaz.Validation[...]
}

def extractUnstructEvent(event: EnrichedEvent)(implicit resolver: Resolver): Option[ValidatedNelMessage[JsonNodes]] = {
for {
v <- extractAndValidateJson("ue_properties", UePropertiesSchema, Option(event.unstruct_event))
} yield for {
j <- v
l = List(j)
} yield l
}

/**
* Convenience to make a partial TypeHierarchy.
* Partial because we don't have the complete
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Copyright (c) 2012-2014 Snowplow Analytics Ltd. All rights reserved.
*
* This program is licensed to you under the Apache License Version 2.0,
* and you may not use this file except in compliance with the Apache License Version 2.0.
* You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0.
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the Apache License Version 2.0 is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
*/
package com.snowplowanalytics.snowplow.enrich
package common
package enrichments

// Iglu
import com.snowplowanalytics.iglu.client.SchemaKey

// Common
import outputs.EnrichedEvent
import enrichments.SchemaEnrichment._

// Specs2
import org.specs2.Specification
import org.specs2.matcher.DataTables
import org.specs2.scalaz.ValidationMatchers

// Scalaz
import scalaz._
import Scalaz._

class SchemaEnrichmentTest extends Specification with DataTables with ValidationMatchers {

implicit val resolver = SpecHelpers.IgluResolver
val signupFormSubmitted = """{"schema":"iglu:com.snowplowanalytics.snowplow/unstruct_event/jsonschema/1-0-0","data":{"schema":"iglu:com.snowplowanalytics.snowplow-website/signup_form_submitted/jsonschema/1-0-0","data":{"name":"Χαριτίνη NEW Unicode test","email":"alex+test@snowplowanalytics.com","company":"SP","eventsPerMonth":"< 1 million","serviceType":"unsure"}}}"""
val signupFormSubmittedSchema = SchemaKey("com.snowplowanalytics.snowplow-website", "signup_form_submitted", "jsonschema", "1-0-0").success
val invalidPayload = """{"schema":"iglu:com.snowplowanalytics.snowplow/unstruct_event/jsonschema/1-0-0","data":{"schema":"iglu:com.snowplowanalytics.snowplow-website/signup_form_submitted/jsonschema/1-0-0","data":{"serviceType":"unsure"}}}"""

def is =
"Extracting SchemaKeys from valid events should work" ! e1^
"Invalid events should fail when extracting SchemaKeys" ! e2

def e1 =
"SPEC NAME" || "EVENT" | "EXPECTED SCHEMA" |
"page view" !! event("page_view") ! pageViewSchema |
"transaction" !! event("transaction") ! transactionSchema |
"transaction item" !! event("transaction_item") ! transactionItemSchema |
"struct event" !! event("struct") ! structSchema |
"invalid unstruct event" !! unstructEvent(invalidPayload) ! signupFormSubmittedSchema |
"unstruct event" !! unstructEvent(signupFormSubmitted) ! signupFormSubmittedSchema |> {
(_, event, expected) => {
val schema = SchemaEnrichment.extractSchema(event)
schema must_== expected
}
}

val nonSchemedPayload = """{"name":"Χαριτίνη NEW Unicode test","email":"alex+test@snowplowanalytics.com","company":"SP","eventsPerMonth":"< 1 million","serviceType":"unsure"}"""
val invalidKeyPayload = """{"schema":"iglu:com.snowplowanalytics.snowplow/unstruct_event/jsonschema/1-0-0","data":{"schema":"iglu:com.snowplowanalytics.snowplow-website/signup_form_submitted/jsonschema","data":{"name":"Χαριτίνη NEW Unicode test","email":"alex+test@snowplowanalytics.com","company":"SP","eventsPerMonth":"< 1 million","serviceType":"unsure"}}}"""

def e2 =
"SPEC NAME" || "EVENT" |
"unknown event" !! event("unknown") |
"missing event" !! event(null) |
"not schemed" !! unstructEvent(nonSchemedPayload) |
"invalid key" !! unstructEvent(invalidKeyPayload) |> {
(_, event) => {
val schema = SchemaEnrichment.extractSchema(event)
schema must beFailing
}
}

def event(eventType: String) = {
val event: EnrichedEvent = new EnrichedEvent()
event.setEvent(eventType)
event
}

def unstructEvent(unstruct: String) = {
val event: EnrichedEvent = new EnrichedEvent()
event.setEvent("unstruct")
event.setUnstruct_event(unstruct)
event
}
}

0 comments on commit 8c3c089

Please sign in to comment.