Skip to content

Commit

Permalink
Scala Common Enrich: stored etl_tstamp in new field in CanonicalOutput (#818)
Browse files Browse the repository at this point in the history

Updated tests with etl_tstamp
  • Loading branch information
fblundun committed Jun 25, 2014
1 parent f0c6b50 commit 1d210e0
Show file tree
Hide file tree
Showing 20 changed files with 34 additions and 9 deletions.
2 changes: 1 addition & 1 deletion 3-enrich/scala-common-enrich/project/BuildSettings.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ object BuildSettings {
// Basic settings for our app
lazy val basicSettings = Seq[Setting[_]](
organization := "com.snowplowanalytics",
version := "0.4.0",
version := "0.5.0-SNAPSHOT",
description := "Common functionality for enriching raw Snowplow events",
scalaVersion := "2.10.1",
scalacOptions := Seq("-deprecation", "-encoding", "utf8",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ object EnrichmentManager {
* either failure Strings or a
* CanonicalOutput.
*/
def enrichEvent(geo: IpGeo, hostEtlVersion: String, anonOctets: AnonOctets, raw: CanonicalInput): ValidatedCanonicalOutput = {
def enrichEvent(geo: IpGeo, hostEtlVersion: String, anonOctets: AnonOctets, etlTstamp: String, raw: CanonicalInput): ValidatedCanonicalOutput = {

// Placeholders for where the Success value doesn't matter.
// Useful when you're updating large (>22 field) POSOs.
Expand Down Expand Up @@ -290,6 +290,7 @@ object EnrichmentManager {
event.refr_urlfragment = CU.truncate(event.refr_urlfragment, 255)
event.refr_term = CU.truncate(event.refr_term, 255)
event.se_label = CU.truncate(event.se_label, 255)
event.etl_tstamp = etlTstamp

// Collect our errors on Failure, or return our event on Success
(useragent.toValidationNel |@| client.toValidationNel |@| pageUri.toValidationNel |@| geoLocation.toValidationNel |@| refererUri.toValidationNel |@| transform |@| campaign) {
Expand Down
2 changes: 1 addition & 1 deletion 3-enrich/scala-hadoop-enrich/project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ object Dependencies {
val scalding = "0.8.11"
val scalaz7 = "7.0.0"
val snowplowRawEvent = "0.1.0"
val commonEnrich = "0.4.0"
val commonEnrich = "0.5.0-SNAPSHOT"
// Scala (test only)
val specs2 = "1.14"
val scalazSpecs2 = "0.1.2"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,9 @@ object EtlJob {
* flatMap, will include any validation errors
* contained within the ValidatedMaybeCanonicalInput
*/
def toCanonicalOutput(geo: IpGeo, anonOctets: AnonOctets, input: ValidatedMaybeCanonicalInput): ValidatedMaybeCanonicalOutput = {
def toCanonicalOutput(geo: IpGeo, anonOctets: AnonOctets, input: ValidatedMaybeCanonicalInput, etlTstamp: String): ValidatedMaybeCanonicalOutput = {
input.flatMap {
_.cata(EnrichmentManager.enrichEvent(geo, etlVersion, anonOctets, _).map(_.some),
_.cata(EnrichmentManager.enrichEvent(geo, etlVersion, anonOctets, etlTstamp, _).map(_.some),
none.success)
}
}
Expand Down Expand Up @@ -140,6 +140,8 @@ class EtlJob(args: Args) extends Job(args) {
val ipGeoFile = EtlJob.installIpGeoFile(etlConfig.maxmindFile)
lazy val ipGeo = EtlJob.createIpGeo(ipGeoFile)

val etlTstamp = etlConfig.etl_tstamp

// Aliases for our job
val input = MultipleTextLineFiles(etlConfig.inFolder).read
val goodOutput = Tsv(etlConfig.outFolder)
Expand All @@ -155,7 +157,7 @@ class EtlJob(args: Args) extends Job(args) {
// TODO: let's fix this Any typing
val common = trappableInput
.map('line -> 'output) { l: Any =>
EtlJob.toCanonicalOutput(ipGeo, etlConfig.anonOctets, loader.toCanonicalInput(l))
EtlJob.toCanonicalOutput(ipGeo, etlConfig.anonOctets, loader.toCanonicalInput(l), etlTstamp)
}

// Handle bad rows
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import com.twitter.scalding.Args
import common.utils.ConversionUtils
import common.enrichments.PrivacyEnrichments.AnonOctets
import AnonOctets._
import com.snowplowanalytics.snowplow.enrich.common.enrichments.EventEnrichments

// This project
import utils.ScalazArgs
Expand Down Expand Up @@ -111,7 +112,7 @@ object EtlJobConfig {
val outFolder = args.requiredz("output_folder")
val badFolder = args.requiredz("bad_rows_folder")
val anonOctets = args.requiredz("anon_ip_octets").flatMap(q => getAnonOctets(q))
val etlTstamp = args.requiredz("etl_tstamp").flatMap(t => extractTimestamp("etl_tstamp", t))
val etlTstamp = args.requiredz("etl_tstamp").flatMap(t => EventEnrichments.extractTimestamp("etl_tstamp", t))
val exceptionsFolder = args.optionalz("exceptions_folder")

(inFolder.toValidationNel |@| inFormat.toValidationNel |@| maxmindFile.toValidationNel |@| outFolder.toValidationNel |@| badFolder.toValidationNel |@| anonOctets.toValidationNel |@| etlTstamp.toValidationNel |@| exceptionsFolder.toValidationNel) { EtlJobConfig(_,_,_,_,_,_,_,_) }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ import org.specs2.matcher.Matchers._
// Scalding
import com.twitter.scalding._

// Joda-Time
import org.joda.time.{DateTime, DateTimeZone}
import org.joda.time.format.DateTimeFormat

// Snowplow Common Enrich
import common.outputs.CanonicalOutput

Expand All @@ -35,7 +39,9 @@ object JobSpecHelpers {
/**
* The current version of our Hadoop ETL
*/
val EtlVersion = "hadoop-0.5.0-common-0.4.0"
val EtlVersion = "hadoop-0.5.0-common-0.5.0-SNAPSHOT"

val EtlTimestamp = "2001-09-09 01:46:40.000"

/**
* Fields in our CanonicalOutput which are unmatchable
Expand Down Expand Up @@ -146,5 +152,6 @@ object JobSpecHelpers {
arg("output_folder", "outputFolder").
arg("bad_rows_folder", "badFolder").
arg("anon_ip_octets", anonOctets).
arg("exceptions_folder", "exceptionsFolder")
arg("exceptions_folder", "exceptionsFolder").
arg("etl_tstamp", "1000000000000")
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ object Apr2014CfLineSpec {
val expected = List(
"snowplowweb",
"web",
EtlTimestamp,
"2014-04-29 09:00:54.000",
"2014-04-29 09:00:54.889",
"page_ping",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ object Aug2013CfLineSpec {
val expected = List(
"snowplowweb",
"web",
EtlTimestamp,
"2013-08-29 00:18:48.000",
"2013-08-29 00:19:17.970",
"page_view",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ object CljTomcatLineSpec {
val expected = List(
"snowplowweb",
"web",
EtlTimestamp,
"2013-10-07 19:47:54.000",
"2013-10-07 19:47:54.123",
"page_view",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ object FutureCfLineSpec {
val expected = List(
"snowplowweb",
"web",
EtlTimestamp,
"2015-04-29 09:00:54.000",
"2015-04-29 08:00:54.889",
"page_ping",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ object LateOct2013CfLineSpec {
val expected = List(
"pbzsite",
"web",
EtlTimestamp,
"2013-10-22 00:41:30.000",
"2013-10-22 00:41:53.725",
"page_view",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ object Oct2013CfLineSpec {
val expected = List(
"pbzsite",
"web",
EtlTimestamp,
"2013-10-07 23:35:30.000",
"2013-10-07 23:35:27.571",
"page_ping",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ object PagePingCfLineSpec {
val expected = List(
"pbzsite",
"web",
EtlTimestamp,
"2013-03-25 02:04:00.000",
"2013-03-25 02:03:37.342",
"page_ping",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ object PageViewCfLineSpec {
val expected = List(
"pbzsite",
"web",
EtlTimestamp,
"2012-05-24 00:06:42.000",
"2013-03-25 13:52:09.188",
"page_view",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ object Sep2013CfLineSpec {
val expected = List(
"pbzsite",
"web",
EtlTimestamp,
"2013-10-07 21:32:22.000",
"2013-10-07 21:30:37.923",
"page_view",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ object StructEventCfLineSpec {
val expected = List(
"pbzsite",
"web",
EtlTimestamp,
"2012-05-27 11:35:53.000",
"2013-03-25 17:02:49.450",
"struct",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ object TransactionCfLineSpec {
val expected = List(
"CFe23a",
null, // Not set (legacy input line)
EtlTimestamp,
"2012-05-27 11:35:53.000",
"2013-03-25 02:03:37.342",
"transaction",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ object TransactionItemCfLineSpec {
val expected = List(
"CFe23a",
null, // Not set (legacy input line)
EtlTimestamp,
"2012-05-27 11:35:53.000",
"2013-03-25 02:03:37.342",
"transaction_item",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ object UnstructEventCfLineSpec {
val expected = List(
"pbzsite",
"web",
EtlTimestamp,
"2012-05-27 11:35:53.000",
"2013-03-25 17:02:49.450",
"unstruct",
Expand Down
1 change: 1 addition & 0 deletions CHANGELOG
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
Version 0.9.X (2014-XX-XX)
--------------------------
Scala Common Enrich: stored etl_tstamp in new field in CanonicalOutput (#818)
Redshift: bumped table-def to 0.4.0
Redshift: added etl_tstamp to atomic.events (#819)
Postgres: bumped table-def to 0.3.0
Expand Down

0 comments on commit 1d210e0

Please sign in to comment.