Skip to content

Commit

Permalink
Scala Common Enrich: apply automated code formatting (closes #3532)
Browse files Browse the repository at this point in the history
  • Loading branch information
knservis committed Feb 14, 2018
1 parent b2ba704 commit b720409
Show file tree
Hide file tree
Showing 134 changed files with 5,044 additions and 3,657 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@ import Scalaz._
* Note that the SnowPlow ETL does **not**
* use exceptions for control flow - it uses
* Scalaz Validation and ValidationNel objects.
*
*
* However two types of exception we do support
* are:
*
*
* 1. FatalEtlException - should always cause
* the ETL to die
* 2. UnexpectedEtlException - ETL may die or
Expand All @@ -42,7 +42,7 @@ sealed class EtlException(msg: String) extends RuntimeException(msg)
* for the exceptions below.
*/
trait EtlExceptionConstructors[E <: EtlException] {

// Structured type lets us pass in
// a factory to construct our E
self: {
Expand All @@ -61,8 +61,8 @@ trait EtlExceptionConstructors[E <: EtlException] {
* error messages
* @return a new EtlException of
* type E
*/
def apply(errs: NonEmptyList[String]): E =
*/
def apply(errs: NonEmptyList[String]): E =
apply(errs.list)

/**
Expand All @@ -76,8 +76,8 @@ trait EtlExceptionConstructors[E <: EtlException] {
* error messages
* @return a new EtlException of
* type E
*/
def apply(errs: List[String]): E =
*/
def apply(errs: List[String]): E =
fac(formatErrors(errs))

/**
Expand Down Expand Up @@ -114,10 +114,10 @@ object FatalEtlException extends EtlExceptionConstructors[FatalEtlException] {
// TODO: delete when Cascading FailureTrap supports exclusions
object FatalEtlError {

def apply(errs: NonEmptyList[String]): FatalEtlError =
def apply(errs: NonEmptyList[String]): FatalEtlError =
apply(errs.list)

def apply(errs: List[String]): FatalEtlError =
def apply(errs: List[String]): FatalEtlError =
FatalEtlError(formatErrors(errs))

private def formatErrors(errs: List[String]): String =
Expand Down Expand Up @@ -177,4 +177,4 @@ case class FatalEtlError(msg: String) extends Error(msg)
* Unexpected Error? flag passed in
* to the ETL.
*/
case class UnexpectedEtlException(msg: String) extends EtlException(msg)
case class UnexpectedEtlException(msg: String) extends EtlException(msg)
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,7 @@ package enrich
package common

// Java
import java.io.{
PrintWriter,
StringWriter
}
import java.io.{PrintWriter, StringWriter}

// Joda
import org.joda.time.DateTime
Expand All @@ -35,11 +32,8 @@ import scalaz._
import Scalaz._

// This project
import adapters.AdapterRegistry
import enrichments.{
EnrichmentRegistry,
EnrichmentManager
}
import adapters.{AdapterRegistry, RawEvent}
import enrichments.{EnrichmentManager, EnrichmentRegistry}
import outputs.EnrichedEvent

/**
Expand Down Expand Up @@ -69,7 +63,11 @@ object EtlPipeline {
* flatMap, will include any validation errors
* contained within the ValidatedMaybeCanonicalInput
*/
def processEvents(registry: EnrichmentRegistry, etlVersion: String, etlTstamp: DateTime, input: ValidatedMaybeCollectorPayload)(implicit resolver: Resolver): List[ValidatedEnrichedEvent] = {
def processEvents(
registry: EnrichmentRegistry,
etlVersion: String,
etlTstamp: DateTime,
input: ValidatedMaybeCollectorPayload)(implicit resolver: Resolver): List[ValidatedEnrichedEvent] = {

def flattenToList[A](v: Validated[Option[Validated[NonEmptyList[Validated[A]]]]]): List[Validated[A]] = v match {
case Success(Some(Success(nel))) => nel.toList
Expand All @@ -81,15 +79,18 @@ object EtlPipeline {
try {
val e: Validated[Option[Validated[NonEmptyList[ValidatedEnrichedEvent]]]] =
for {
maybePayload <- input
} yield for {
payload <- maybePayload
} yield for {
events <- AdapterRegistry.toRawEvents(payload)
} yield for {
event <- events
enriched = EnrichmentManager.enrichEvent(registry, etlVersion, etlTstamp, event)
} yield enriched
maybePayload <- input
} yield
for {
payload <- maybePayload
} yield
for {
events <- AdapterRegistry.toRawEvents(payload)
} yield
for {
event <- events
enriched = EnrichmentManager.enrichEvent(registry, etlVersion, etlTstamp, event)
} yield enriched

flattenToList[EnrichedEvent](e)
} catch {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ import Scalaz._

// This project
import loaders.CollectorPayload
import registry.snowplow.{Tp1Adapter => SpTp1Adapter}
import registry.snowplow.{Tp2Adapter => SpTp2Adapter}
import registry.snowplow.{Tp1Adapter => SpTp1Adapter}
import registry.snowplow.{Tp2Adapter => SpTp2Adapter}
import registry.snowplow.{RedirectAdapter => SpRedirectAdapter}
import registry._

Expand Down Expand Up @@ -68,25 +68,27 @@ object AdapterRegistry {
* NEL of RawEvents on Success,
* or a NEL of Strings on Failure
*/
def toRawEvents(payload: CollectorPayload)(implicit resolver: Resolver): ValidatedRawEvents = (payload.api.vendor, payload.api.version) match {
case (Vendor.Snowplow, "tp1") => SpTp1Adapter.toRawEvents(payload)
case (Vendor.Snowplow, "tp2") => SpTp2Adapter.toRawEvents(payload)
case (Vendor.Redirect, "tp2") => SpRedirectAdapter.toRawEvents(payload)
case (Vendor.Iglu, "v1") => IgluAdapter.toRawEvents(payload)
case (Vendor.Callrail, "v1") => CallrailAdapter.toRawEvents(payload)
case (Vendor.Cloudfront, "wd_access_log") => CloudfrontAccessLogAdapter.WebDistribution.toRawEvents(payload)
case (Vendor.Mailchimp, "v1") => MailchimpAdapter.toRawEvents(payload)
case (Vendor.Mailgun, "v1") => MailgunAdapter.toRawEvents(payload)
case (Vendor.GoogleAnalytics, "v1") => GoogleAnalyticsAdapter.toRawEvents(payload)
case (Vendor.Mandrill, "v1") => MandrillAdapter.toRawEvents(payload)
case (Vendor.Olark, "v1") => OlarkAdapter.toRawEvents(payload)
case (Vendor.Pagerduty, "v1") => PagerdutyAdapter.toRawEvents(payload)
case (Vendor.Pingdom, "v1") => PingdomAdapter.toRawEvents(payload)
case (Vendor.Sendgrid, "v3") => SendgridAdapter.toRawEvents(payload)
case (Vendor.StatusGator, "v1") => StatusGatorAdapter.toRawEvents(payload)
case (Vendor.Unbounce, "v1") => UnbounceAdapter.toRawEvents(payload)
case (Vendor.UrbanAirship, "v1") => UrbanAirshipAdapter.toRawEvents(payload)
case _ => s"Payload with vendor ${payload.api.vendor} and version ${payload.api.version} not supported by this version of Scala Common Enrich".failNel
}
def toRawEvents(payload: CollectorPayload)(implicit resolver: Resolver): ValidatedRawEvents =
(payload.api.vendor, payload.api.version) match {
case (Vendor.Snowplow, "tp1") => SpTp1Adapter.toRawEvents(payload)
case (Vendor.Snowplow, "tp2") => SpTp2Adapter.toRawEvents(payload)
case (Vendor.Redirect, "tp2") => SpRedirectAdapter.toRawEvents(payload)
case (Vendor.Iglu, "v1") => IgluAdapter.toRawEvents(payload)
case (Vendor.Callrail, "v1") => CallrailAdapter.toRawEvents(payload)
case (Vendor.Cloudfront, "wd_access_log") => CloudfrontAccessLogAdapter.WebDistribution.toRawEvents(payload)
case (Vendor.Mailchimp, "v1") => MailchimpAdapter.toRawEvents(payload)
case (Vendor.Mailgun, "v1") => MailgunAdapter.toRawEvents(payload)
case (Vendor.GoogleAnalytics, "v1") => GoogleAnalyticsAdapter.toRawEvents(payload)
case (Vendor.Mandrill, "v1") => MandrillAdapter.toRawEvents(payload)
case (Vendor.Olark, "v1") => OlarkAdapter.toRawEvents(payload)
case (Vendor.Pagerduty, "v1") => PagerdutyAdapter.toRawEvents(payload)
case (Vendor.Pingdom, "v1") => PingdomAdapter.toRawEvents(payload)
case (Vendor.Sendgrid, "v3") => SendgridAdapter.toRawEvents(payload)
case (Vendor.StatusGator, "v1") => StatusGatorAdapter.toRawEvents(payload)
case (Vendor.Unbounce, "v1") => UnbounceAdapter.toRawEvents(payload)
case (Vendor.UrbanAirship, "v1") => UrbanAirshipAdapter.toRawEvents(payload)
case _ =>
s"Payload with vendor ${payload.api.vendor} and version ${payload.api.version} not supported by this version of Scala Common Enrich".failNel
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,7 @@ package com.snowplowanalytics.snowplow.enrich.common
package adapters

// This project
import loaders.{
CollectorSource,
CollectorContext,
CollectorApi
}
import loaders.{CollectorApi, CollectorContext, CollectorSource}

/**
* The canonical input format for the ETL
Expand All @@ -29,9 +25,9 @@ import loaders.{
* stage of the Enrichment.
*/
final case class RawEvent(
api: CollectorApi,
parameters: RawEventParameters,
api: CollectorApi,
parameters: RawEventParameters,
contentType: Option[String], // Not yet used but should be logged
source: CollectorSource,
context: CollectorContext
source: CollectorSource,
context: CollectorContext
)

0 comments on commit b720409

Please sign in to comment.