Skip to content

Commit

Permalink
Scala Kinesis Enrich: etl_tstamp should be Redshift Formatted not raw (
Browse files Browse the repository at this point in the history
…closes #1842)
  • Loading branch information
jbeemster committed Jul 6, 2015
1 parent 0e20942 commit 48d6378
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 2 deletions.
Expand Up @@ -44,6 +44,9 @@ import org.json4s.{ThreadLocal => _, _}
import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods._

// Joda-Time
import org.joda.time.DateTime

// Iglu
import iglu.client.Resolver
import iglu.client.validation.ProcessingMessageMethods._
Expand All @@ -57,6 +60,7 @@ import common.outputs.{
import common.loaders.ThriftLoader
import common.enrichments.EnrichmentRegistry
import common.enrichments.EnrichmentManager
import common.enrichments.EventEnrichments
import common.adapters.AdapterRegistry

import common.ValidatedMaybeCollectorPayload
Expand Down Expand Up @@ -179,7 +183,8 @@ abstract class AbstractSource(config: KinesisEnrichConfig, igluResolver: Resolve
def enrichEvents(binaryData: Array[Byte]): List[Validation[(String, String), (String, String)]] = {
val canonicalInput: ValidatedMaybeCollectorPayload = ThriftLoader.toCollectorPayload(binaryData)
val processedEvents: List[ValidationNel[String, EnrichedEvent]] = EtlPipeline.processEvents(
enrichmentRegistry, s"kinesis-${generated.Settings.version}", System.currentTimeMillis.toString, canonicalInput)
enrichmentRegistry, s"kinesis-${generated.Settings.version}",
EventEnrichments.toTimestamp(new DateTime(System.currentTimeMillis)), canonicalInput)
processedEvents.map(validatedMaybeEvent => {
validatedMaybeEvent match {
case Success(co) => (tabSeparateEnrichedEvent(co) -> co.user_ipaddress).success
Expand Down
Expand Up @@ -52,7 +52,7 @@ object SpecHelpers {
*/
val EnrichVersion = "kinesis-0.6.0-common-0.15.0"

val TimestampRegex = "[0-9]+"
val TimestampRegex = "[0-9\\s-:.]+"

/**
* The regexp pattern for a Type 4 UUID.
Expand Down

0 comments on commit 48d6378

Please sign in to comment.