Skip to content

Commit

Permalink
Scala Common Enrich: apply automated code formatting (closes #3532)
Browse files Browse the repository at this point in the history
  • Loading branch information
knservis committed Dec 20, 2017
1 parent de3a14a commit 765c263
Show file tree
Hide file tree
Showing 125 changed files with 4,151 additions and 3,407 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@ import Scalaz._
* Note that the SnowPlow ETL does **not**
* use exceptions for control flow - it uses
* Scalaz Validation and ValidationNel objects.
*
*
* However two types of exception we do support
* are:
*
*
* 1. FatalEtlException - should always cause
* the ETL to die
* 2. UnexpectedEtlException - ETL may die or
Expand All @@ -42,7 +42,7 @@ sealed class EtlException(msg: String) extends RuntimeException(msg)
* for the exceptions below.
*/
trait EtlExceptionConstructors[E <: EtlException] {

// Structured type lets us pass in
// a factory to construct our E
self: {
Expand All @@ -61,8 +61,8 @@ trait EtlExceptionConstructors[E <: EtlException] {
* error messages
* @return a new EtlException of
* type E
*/
def apply(errs: NonEmptyList[String]): E =
*/
def apply(errs: NonEmptyList[String]): E =
apply(errs.list)

/**
Expand All @@ -76,8 +76,8 @@ trait EtlExceptionConstructors[E <: EtlException] {
* error messages
* @return a new EtlException of
* type E
*/
def apply(errs: List[String]): E =
*/
def apply(errs: List[String]): E =
fac(formatErrors(errs))

/**
Expand Down Expand Up @@ -114,10 +114,10 @@ object FatalEtlException extends EtlExceptionConstructors[FatalEtlException] {
// TODO: delete when Cascading FailureTrap supports exclusions
object FatalEtlError {

def apply(errs: NonEmptyList[String]): FatalEtlError =
def apply(errs: NonEmptyList[String]): FatalEtlError =
apply(errs.list)

def apply(errs: List[String]): FatalEtlError =
def apply(errs: List[String]): FatalEtlError =
FatalEtlError(formatErrors(errs))

private def formatErrors(errs: List[String]): String =
Expand Down Expand Up @@ -177,4 +177,4 @@ case class FatalEtlError(msg: String) extends Error(msg)
* Unexpected Error? flag passed in
* to the ETL.
*/
case class UnexpectedEtlException(msg: String) extends EtlException(msg)
case class UnexpectedEtlException(msg: String) extends EtlException(msg)
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,7 @@ package enrich
package common

// Java
import java.io.{
PrintWriter,
StringWriter
}
import java.io.{PrintWriter, StringWriter}

// Joda
import org.joda.time.DateTime
Expand All @@ -35,14 +32,8 @@ import scalaz._
import Scalaz._

// This project
import adapters.{
RawEvent,
AdapterRegistry
}
import enrichments.{
EnrichmentRegistry,
EnrichmentManager
}
import adapters.{AdapterRegistry, RawEvent}
import enrichments.{EnrichmentManager, EnrichmentRegistry}
import outputs.EnrichedEvent

/**
Expand Down Expand Up @@ -72,7 +63,8 @@ object EtlPipeline {
* flatMap, will include any validation errors
* contained within the ValidatedMaybeCanonicalInput
*/
def processEvents(registry: EnrichmentRegistry, etlVersion: String, etlTstamp: DateTime, input: ValidatedMaybeCollectorPayload)(implicit resolver: Resolver): List[ValidatedEnrichedEvent] = {
def processEvents(registry: EnrichmentRegistry, etlVersion: String, etlTstamp: DateTime, input: ValidatedMaybeCollectorPayload)(
implicit resolver: Resolver): List[ValidatedEnrichedEvent] = {

def flattenToList[A](v: Validated[Option[Validated[NonEmptyList[Validated[A]]]]]): List[Validated[A]] = v match {
case Success(Some(Success(nel))) => nel.toList
Expand All @@ -84,15 +76,18 @@ object EtlPipeline {
try {
val e: Validated[Option[Validated[NonEmptyList[ValidatedEnrichedEvent]]]] =
for {
maybePayload <- input
} yield for {
payload <- maybePayload
} yield for {
events <- AdapterRegistry.toRawEvents(payload)
} yield for {
event <- events
enriched = EnrichmentManager.enrichEvent(registry, etlVersion, etlTstamp, event)
} yield enriched
maybePayload <- input
} yield
for {
payload <- maybePayload
} yield
for {
events <- AdapterRegistry.toRawEvents(payload)
} yield
for {
event <- events
enriched = EnrichmentManager.enrichEvent(registry, etlVersion, etlTstamp, event)
} yield enriched

flattenToList[EnrichedEvent](e)
} catch {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
*/
package com.snowplowanalytics
package com.snowplowanalytics
package snowplow
package enrich
package common
Expand All @@ -25,23 +25,19 @@ import Scalaz._

// This project
import loaders.CollectorPayload
import registry.snowplow.{Tp1Adapter => SpTp1Adapter}
import registry.snowplow.{Tp2Adapter => SpTp2Adapter}
import registry.snowplow.{Tp1Adapter => SpTp1Adapter}
import registry.snowplow.{Tp2Adapter => SpTp2Adapter}
import registry.snowplow.{RedirectAdapter => SpRedirectAdapter}
import registry.{
CallrailAdapter,
CloudfrontAccessLogAdapter,
IgluAdapter,
CallrailAdapter,
MailchimpAdapter,
MailgunAdapter,
MandrillAdapter,
OlarkAdapter,
PagerdutyAdapter,
PingdomAdapter,
UrbanAirshipAdapter,
SendgridAdapter,
StatusGatorAdapter,
UnbounceAdapter
UrbanAirshipAdapter
}

/**
Expand All @@ -56,16 +52,12 @@ object AdapterRegistry {
val Iglu = "com.snowplowanalytics.iglu"
val Callrail = "com.callrail"
val Mailchimp = "com.mailchimp"
val Mailgun = "com.mailgun"
val Mandrill = "com.mandrill"
val Olark = "com.olark"
val Pagerduty = "com.pagerduty"
val Pingdom = "com.pingdom"
val Cloudfront = "com.amazon.aws.cloudfront"
val UrbanAirship = "com.urbanairship.connect"
val Sendgrid = "com.sendgrid"
val StatusGator = "com.statusgator"
val Unbounce = "com.unbounce"
}

/**
Expand All @@ -81,24 +73,22 @@ object AdapterRegistry {
* NEL of RawEvents on Success,
* or a NEL of Strings on Failure
*/
def toRawEvents(payload: CollectorPayload)(implicit resolver: Resolver): ValidatedRawEvents = (payload.api.vendor, payload.api.version) match {
case (Vendor.Snowplow, "tp1") => SpTp1Adapter.toRawEvents(payload)
case (Vendor.Snowplow, "tp2") => SpTp2Adapter.toRawEvents(payload)
case (Vendor.Redirect, "tp2") => SpRedirectAdapter.toRawEvents(payload)
case (Vendor.Iglu, "v1") => IgluAdapter.toRawEvents(payload)
case (Vendor.Callrail, "v1") => CallrailAdapter.toRawEvents(payload)
case (Vendor.Mailchimp, "v1") => MailchimpAdapter.toRawEvents(payload)
case (Vendor.Olark, "v1") => OlarkAdapter.toRawEvents(payload)
case (Vendor.Mailgun, "v1") => MailgunAdapter.toRawEvents(payload)
case (Vendor.Mandrill, "v1") => MandrillAdapter.toRawEvents(payload)
case (Vendor.Pagerduty, "v1") => PagerdutyAdapter.toRawEvents(payload)
case (Vendor.Pingdom, "v1") => PingdomAdapter.toRawEvents(payload)
case (Vendor.Cloudfront, "wd_access_log") => CloudfrontAccessLogAdapter.WebDistribution.toRawEvents(payload)
case (Vendor.UrbanAirship, "v1") => UrbanAirshipAdapter.toRawEvents(payload)
case (Vendor.Sendgrid, "v3") => SendgridAdapter.toRawEvents(payload)
case (Vendor.StatusGator, "v1") => StatusGatorAdapter.toRawEvents(payload)
case (Vendor.Unbounce, "v1") => UnbounceAdapter.toRawEvents(payload)
case _ => s"Payload with vendor ${payload.api.vendor} and version ${payload.api.version} not supported by this version of Scala Common Enrich".failNel
}
def toRawEvents(payload: CollectorPayload)(implicit resolver: Resolver): ValidatedRawEvents =
(payload.api.vendor, payload.api.version) match {
case (Vendor.Snowplow, "tp1") => SpTp1Adapter.toRawEvents(payload)
case (Vendor.Snowplow, "tp2") => SpTp2Adapter.toRawEvents(payload)
case (Vendor.Redirect, "tp2") => SpRedirectAdapter.toRawEvents(payload)
case (Vendor.Iglu, "v1") => IgluAdapter.toRawEvents(payload)
case (Vendor.Callrail, "v1") => CallrailAdapter.toRawEvents(payload)
case (Vendor.Mailchimp, "v1") => MailchimpAdapter.toRawEvents(payload)
case (Vendor.Mandrill, "v1") => MandrillAdapter.toRawEvents(payload)
case (Vendor.Pagerduty, "v1") => PagerdutyAdapter.toRawEvents(payload)
case (Vendor.Pingdom, "v1") => PingdomAdapter.toRawEvents(payload)
case (Vendor.Cloudfront, "wd_access_log") => CloudfrontAccessLogAdapter.WebDistribution.toRawEvents(payload)
case (Vendor.UrbanAirship, "v1") => UrbanAirshipAdapter.toRawEvents(payload)
case (Vendor.Sendgrid, "v3") => SendgridAdapter.toRawEvents(payload)
case _ =>
s"Payload with vendor ${payload.api.vendor} and version ${payload.api.version} not supported by this version of Scala Common Enrich".failNel
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,7 @@ package adapters
import org.joda.time.DateTime

// This project
import loaders.{
CollectorSource,
CollectorContext,
CollectorApi
}
import loaders.{CollectorApi, CollectorContext, CollectorSource}

/**
* The canonical input format for the ETL
Expand All @@ -32,9 +28,9 @@ import loaders.{
* stage of the Enrichment.
*/
final case class RawEvent(
api: CollectorApi,
parameters: RawEventParameters,
api: CollectorApi,
parameters: RawEventParameters,
contentType: Option[String], // Not yet used but should be logged
source: CollectorSource,
context: CollectorContext
)
source: CollectorSource,
context: CollectorContext
)

0 comments on commit 765c263

Please sign in to comment.