Scala Hadoop Shred: update tests to expect bad row JSONs with timestamps and processing messages (closes #1953)
fblundun committed Aug 26, 2015
1 parent 576d9f5 commit 72fdb7c
Showing 6 changed files with 32 additions and 18 deletions.
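For reference, each bad row now renders its errors as processing-message strings of the form "error: <message>\n level: \"error\"\n" (rather than {"level":...,"message":...} objects) and carries a timestamp. A rough sketch of the assumed shape follows, in the style of the test fixtures below; the failure_tstamp field name and the example values are illustrative guesses, since the diff only shows that the tests strip the timestamp before comparing.

// Sketch of the assumed new bad row layout ("failure_tstamp" is an illustrative field name only)
val exampleBadRow: String =
  """|{"line":"<original enriched event line>",
     |"errors":["error: Field [event_id]: [not-a-uuid] is not a valid UUID\n level: \"error\"\n"],
     |"failure_tstamp":"2015-08-26T00:00:00.000Z"}""".stripMargin.replaceAll("[\n\r]", "")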
@@ -29,6 +29,11 @@ import scala.collection.mutable.ListBuffer
import scalaz._
import Scalaz._

// json4s
import org.json4s._
import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods._

// Scalding
import com.twitter.scalding._

@@ -174,4 +179,16 @@ object JobSpecHelpers {
Sinks(output, badRows, exceptions)
}

/**
* Removes the timestamp from a bad row JSON so that what remains is deterministic
*
* @param badRow The bad row JSON string, including its timestamp
* @return The bad row JSON without the timestamp
*/
def removeTstamp(badRow: String): String = {
val badRowJson = parse(badRow)
val badRowWithoutTimestamp = ("line", (badRowJson \ "line")) ~ ("errors", (badRowJson \ "errors"))
compact(badRowWithoutTimestamp)
}

}
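A minimal sketch of this helper in use, under the same assumption as above that the timestamp sits in its own top-level field which the helper simply drops (only "line" and "errors" are kept):

// Hypothetical round trip: parsing and re-serialising keeps "line" and "errors" but drops the
// timestamp (the "failure_tstamp" field name here is assumed, not confirmed by this diff)
val withTstamp =
  """{"line":"raw line","errors":["error: some message\n level: \"error\"\n"],"failure_tstamp":"2015-08-26T12:34:56.789Z"}"""
val deterministic = JobSpecHelpers.removeTstamp(withTstamp)
// deterministic == """{"line":"raw line","errors":["error: some message\n level: \"error\"\n"]}"""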
@@ -40,10 +40,8 @@ object InvalidEnrichedEventsSpec {
)

val expected = (line: String) =>
"""|{"line":"%s","errors":[
|{"level":"error","message":"Field [event_id]: [not-a-uuid] is not a valid UUID"},
|{"level":"error","message":"Field [collector_tstamp]: [29th May 2013 18:04:12] is not in the expected Redshift/Postgres timestamp format"}
|]}""".stripMargin.format(line.replaceAll("\"", "\\\\\"")).replaceAll("[\n\r]","").replaceAll("[\t]","\\\\t")
"""{"line":"%s","errors":["error: Field [event_id]: [not-a-uuid] is not a valid UUID\n level: \"error\"\n","error: Field [collector_tstamp]: [29th May 2013 18:04:12] is not in the expected Redshift/Postgres timestamp format\n level: \"error\"\n"]}"""
.format(line.replaceAll("\"", "\\\\\"")).replaceAll("[\n\r]","").replaceAll("[\t]","\\\\t")
}

/**
@@ -72,7 +70,7 @@ class InvalidEnrichedEventsSpec extends Specification {
sink[String](Tsv("badFolder")){ json =>
"write a bad row JSON with input line and error message for each input line" in {
for (i <- json.indices) {
json(i) must_== InvalidEnrichedEventsSpec.expected(InvalidEnrichedEventsSpec.lines(i)._2)
removeTstamp(json(i)) must_== InvalidEnrichedEventsSpec.expected(InvalidEnrichedEventsSpec.lines(i)._2)
}
}
}.
@@ -40,13 +40,9 @@ object InvalidJsonsSpec {
)

val expected = (line: String) =>
s"""|{
|"line":"${line}",
|"errors":[
|{"level":"error","message":"Field [ue_properties]: invalid JSON [|%|] with parsing error: Unexpected character ('|' (code 124)): expected a valid value (number, String, array, object, 'true', 'false' or 'null') at [Source: java.io.StringReader@xxxxxx; line: 1, column: 2]"},
|{"level":"error","message":"Field [context]: invalid JSON [&&&] with parsing error: Unexpected character ('&' (code 38)): expected a valid value (number, String, array, object, 'true', 'false' or 'null') at [Source: java.io.StringReader@xxxxxx; line: 1, column: 2]"}
|]
|}""".stripMargin.replaceAll("[\n\r]","").replaceAll("[\t]","\\\\t")
"""{"line":"%s","errors":["error: Field [ue_properties]: invalid JSON [|%%|] with parsing error: Unexpected character ('|' (code 124)): expected a valid value (number, String, array, object, 'true', 'false' or 'null') at [Source: java.io.StringReader@xxxxxx; line: 1, column: 2]\n level: \"error\"\n","error: Field [context]: invalid JSON [&&&] with parsing error: Unexpected character ('&' (code 38)): expected a valid value (number, String, array, object, 'true', 'false' or 'null') at [Source: java.io.StringReader@xxxxxx; line: 1, column: 2]\n level: \"error\"\n"]}"""
.format(line)
.replaceAll("[\t]","\\\\t")
}

/**
@@ -75,7 +71,7 @@ class InvalidJsonsSpec extends Specification {
sink[String](Tsv("badFolder")){ json =>
"write a bad row JSON with input line and error message for each bad JSON" in {
for (i <- json.indices) {
json(i) must_== InvalidJsonsSpec.expected(InvalidJsonsSpec.lines(i)._2)
removeTstamp(json(i)) must_== InvalidJsonsSpec.expected(InvalidJsonsSpec.lines(i)._2)
}
}
}.
@@ -39,7 +39,9 @@ object MissingJsonSchemaSpec {
"""snowplowweb web 2014-06-01 14:04:11.639 2014-05-29 18:04:12.000 2014-05-29 18:04:11.639 page_view a4583919-4df8-496a-917b-d40fa1c8ca7f 836413 clojure js-2.0.0-M2 clj-0.6.0-tom-0.0.4 hadoop-0.5.0-common-0.4.0 216.207.42.134 3499345421 3b1d1a375044eede 3 2bad2a4e-aae4-4bea-8acd-399e7fe0366a US CA South San Francisco 37.654694 -122.4077 http://snowplowanalytics.com/blog/2013/02/08/writing-hive-udfs-and-serdes/ Writing Hive UDFs - a tutorial http snowplowanalytics.com 80 /blog/2013/02/08/writing-hive-udfs-and-serdes/ {"schema":"iglu:com.snowplowanalytics.snowplow/contexts/jsonschema/1-0-0","data":[{"schema":"iglu:com.snowplowanalytics.website/fake_context/jsonschema/1-0-0","data":{"author":"Alex Dean","topics":["hive","udf","serde","java","hadoop"],"subCategory":"inside the plow","category":"blog","whenPublished":"2013-02-08"}}]} Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_3) AppleWebKit/537.75.14 (KHTML, like Gecko) Version/7.0.3 Safari/537.75.14 Safari Safari Browser WEBKIT en-us 0 0 0 0 0 0 0 0 0 1 24 1440 1845 Mac OS Mac OS Apple Inc. America/Los_Angeles Computer 0 1440 900 UTF-8 1440 6015"""
)

val expected = (line: String) => """{"line":"%s","errors":[{"level":"error","message":"Could not find schema with key iglu:com.snowplowanalytics.website/fake_context/jsonschema/1-0-0 in any repository, tried:","repositories":["Iglu Client Embedded [embedded]","Iglu Central [HTTP]"]}]}""".format(line.replaceAll("\"", "\\\\\"")).replaceAll("[\t]","\\\\t")
val expected = (line: String) => """{"line":"%s","errors":["error: Could not find schema with key iglu:com.snowplowanalytics.website/fake_context/jsonschema/1-0-0 in any repository, tried:\n level: \"error\"\n repositories: [\"Iglu Client Embedded [embedded]\",\"Iglu Central [HTTP]\"]\n"]}"""
.format(line.replaceAll("\"", "\\\\\""))
.replaceAll("[\t]", "\\\\t")
}

/**
@@ -68,7 +70,7 @@ class MissingJsonSchemaSpec extends Specification {
sink[String](Tsv("badFolder")){ json =>
"write a bad row JSON with input line and error message for each missing schema" in {
for (i <- json.indices) {
json(i) must_== MissingJsonSchemaSpec.expected(MissingJsonSchemaSpec.lines(i)._2)
removeTstamp(json(i)) must_== MissingJsonSchemaSpec.expected(MissingJsonSchemaSpec.lines(i)._2)
}
}
}.
@@ -41,7 +41,8 @@ object NotEnrichedEventsSpec {
"2012-05-21 07:14:47 FRA2 3343 83.4.209.35 GET d3t05xllj8hhgj.cloudfront.net"
)

val expected = (line: String) => s"""{"line":"${line}","errors":[{"level":"error","message":"Line does not match Snowplow enriched event (expected 108+ fields; found 1)"}]}"""
val expected = (line: String) => """{"line":"%s","errors":["error: Line does not match Snowplow enriched event (expected 108+ fields; found 1)\n level: \"error\"\n"]}"""
.format(line)
}

/**
@@ -70,7 +71,7 @@ class NotEnrichedEventsSpec extends Specification {
sink[String](Tsv("badFolder")){ json =>
"write a bad row JSON with input line and error message for each input line" in {
for (i <- json.indices) {
json(i) must_== NotEnrichedEventsSpec.expected(NotEnrichedEventsSpec.lines(i)._2)
removeTstamp(json(i)) must_== NotEnrichedEventsSpec.expected(NotEnrichedEventsSpec.lines(i)._2)
}
}
}.
@@ -71,7 +71,7 @@ class SchemaValidationFailed1Spec extends Specification {
sink[String](Tsv("badFolder")){ json =>
"write a bad row JSON with input line and error message for each input line" in {
for (i <- json.indices) {
json(i) must_== SchemaValidationFailed1Spec.expected(SchemaValidationFailed1Spec.lines(i)._2)
removeTstamp(json(i)) must_== SchemaValidationFailed1Spec.expected(SchemaValidationFailed1Spec.lines(i)._2)
}
}
}.
