Skip to content

Commit

Permalink
Scala Common Enrich: made etlTstamp parameter to processEvents a Joda…
Browse files Browse the repository at this point in the history
… DateTime (closes #1841)
  • Loading branch information
fblundun committed Jul 27, 2015
1 parent 1e75232 commit fa3df1e
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ package snowplow
package enrich
package common

// Joda
import org.joda.time.DateTime

// Iglu
import iglu.client.Resolver

Expand Down Expand Up @@ -60,7 +63,7 @@ object EtlPipeline {
* flatMap, will include any validation errors
* contained within the ValidatedMaybeCanonicalInput
*/
def processEvents(registry: EnrichmentRegistry, etlVersion: String, etlTstamp: String, input: ValidatedMaybeCollectorPayload)(implicit resolver: Resolver): List[ValidatedEnrichedEvent] = {
def processEvents(registry: EnrichmentRegistry, etlVersion: String, etlTstamp: DateTime, input: ValidatedMaybeCollectorPayload)(implicit resolver: Resolver): List[ValidatedEnrichedEvent] = {

def flattenToList[A](v: Validated[Option[Validated[NonEmptyList[Validated[A]]]]]): List[Validated[A]] = v match {
case Success(Some(Success(nel))) => nel.toList
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ package enrich
package common
package enrichments

// Joda
import org.joda.time.DateTime

// Scalaz
import scalaz._
import Scalaz._
Expand Down Expand Up @@ -59,7 +62,7 @@ object EnrichmentManager {
* NonHiveOutput.
*/
// TODO: etlTstamp shouldn't be stringly typed really
def enrichEvent(registry: EnrichmentRegistry, hostEtlVersion: String, etlTstamp: String, raw: RawEvent): ValidatedEnrichedEvent = {
def enrichEvent(registry: EnrichmentRegistry, hostEtlVersion: String, etlTstamp: DateTime, raw: RawEvent): ValidatedEnrichedEvent = {

// Placeholders for where the Success value doesn't matter.
// Useful when you're updating large (>22 field) POSOs.
Expand All @@ -74,7 +77,7 @@ object EnrichmentManager {
e.event_id = EE.generateEventId // May be updated later if we have an `eid` parameter
e.v_collector = raw.source.name // May be updated later if we have a `cv` parameter
e.v_etl = ME.etlVersion(hostEtlVersion)
e.etl_tstamp = etlTstamp
e.etl_tstamp = EE.toTimestamp(etlTstamp)
e.network_userid = raw.context.userId.orNull // May be updated later by 'nuid'
e.user_ipaddress = raw.context.ipAddress.orNull // May be updated later by 'ip'
}
Expand Down

0 comments on commit fa3df1e

Please sign in to comment.