Skip to content

Commit

Permalink
Scala Hadoop Shred: extracted JSONs from new derived_contexts field (c…
Browse files Browse the repository at this point in the history
…loses #786)
  • Loading branch information
fblundun committed Feb 5, 2015
1 parent 1d88808 commit 1c0c8ce
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ object EnrichedEventLoader {
val eventId = 6
val contexts = 52
val unstructEvent = 58
val derivedContexts = 119
}

/**
Expand All @@ -76,6 +77,7 @@ object EnrichedEventLoader {
val event = new EnrichedEvent().tap { e =>
e.contexts = fields(FieldIndexes.contexts)
e.unstruct_event = fields(FieldIndexes.unstructEvent)
e.derived_contexts = fields(FieldIndexes.derivedContexts)
}

// Get and validate the event ID
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,20 +89,25 @@ object Shredder {
val partialHierarchy = makePartialHierarchy(
event.event_id, event.collector_tstamp)

// Get our unstructured event and List of contexts, both Option-boxed
// Get our unstructured event and Lists of contexts and derived_contexts, all Option-boxed
val ue = for {
v <- extractAndValidateJson("ue_properties", UePropertiesSchema, Option(event.unstruct_event))
} yield for {
j <- v
l = List(j)
} yield l

val c = for {
v <- extractAndValidateJson("context", ContextsSchema, Option(event.contexts))
} yield for {
j <- v
l = j.iterator.toList
} yield l
def extractContexts(json: String, field: String): Option[ValidatedNel[List[JsonNode]]] = {
for {
v <- extractAndValidateJson(field, ContextsSchema, Option(json))
} yield for {
j <- v
l = j.iterator.toList
} yield l
}

val c = extractContexts(event.contexts, "context")
val dc = extractContexts(event.derived_contexts, "derived_contexts")

def flatten(o: Option[ValidatedNel[JsonNodes]]): ValidatedNel[JsonNodes] = o match {
case Some(vjl) => vjl
Expand All @@ -111,7 +116,7 @@ object Shredder {

// Let's harmonize our Option[JsonNode] and Option[List[JsonNode]]
// into a List[JsonNode], collecting Failures too
val all = (flatten(ue) |@| flatten(c)) { _ ++ _ }
val all = (flatten(ue) |@| flatten(c) |@| flatten(dc)) { _ ++ _ ++ _ }

// Let's validate the instances against their schemas, and
// then attach metadata to the nodes
Expand Down

0 comments on commit 1c0c8ce

Please sign in to comment.