Skip to content

Commit

Permalink
Common: apply automated code formatting for all projects (close #263)
Browse files Browse the repository at this point in the history
  • Loading branch information
chuwy committed Jun 25, 2020
1 parent a9452f1 commit 70e5af3
Show file tree
Hide file tree
Showing 21 changed files with 43 additions and 143 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -215,9 +215,7 @@ object Enrich {
* @param enriched collection of events that went through the enrichment phase
* @return a collection of properly-sized enriched events and another of oversized ones
*/
private def formatEnrichedEvents(
enriched: SCollection[EnrichedEvent]
): (SCollection[(String, Int)], SCollection[(String, Int)]) =
private def formatEnrichedEvents(enriched: SCollection[EnrichedEvent]): (SCollection[(String, Int)], SCollection[(String, Int)]) =
enriched
.withName("format-enriched")
.map { enrichedEvent =>
Expand Down Expand Up @@ -304,10 +302,7 @@ object Enrich {
* @param enrichmentConfs list of enrichment configurations
* @return a properly build [[DistCache]]
*/
private def buildDistCache(
sc: ScioContext,
enrichmentConfs: List[EnrichmentConf]
): DistCache[List[Either[String, String]]] = {
private def buildDistCache(sc: ScioContext, enrichmentConfs: List[EnrichmentConf]): DistCache[List[Either[String, String]]] = {
val filesToCache: List[(String, String)] = enrichmentConfs
.map(_.filesToCache)
.flatten
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,10 +155,7 @@ object config {
* @param enrichmentsPath path where the enrichment directory is located
* @return the enrichment registry built from the enrichments found
*/
def parseEnrichmentRegistry(
enrichmentsPath: Option[String],
client: Client[Id, Json]
): Either[String, Json] =
def parseEnrichmentRegistry(enrichmentsPath: Option[String], client: Client[Id, Json]): Either[String, Json] =
for {
fileContents <- readEnrichmentFiles(enrichmentsPath)
jsons <- fileContents.map(JsonUtils.extractJson(_)).sequence[EitherS, Json]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,7 @@ object utils {
ee.event_format = "jsonschema"
ee.event_name = "pii_transformation"
ee.event_version = "1-0-0"
ee.v_etl =
s"beam-enrich-${generated.BuildInfo.version}-common-${generated.BuildInfo.sceVersion}"
ee.v_etl = s"beam-enrich-${generated.BuildInfo.version}-common-${generated.BuildInfo.sceVersion}"
ee.contexts = getContextParentEvent(ee.event_id).noSpaces
ee
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,7 @@ object ApiRequestEnrichmentSpec {
SpecHelpers.buildCollectorPayload(
querystring = s"e=ue&cx=$contexts&ue_pr=$unstructEvent".some,
path = "/i",
userAgent =
"Mozilla/5.0%20(Windows%20NT%206.1;%20WOW64;%20rv:12.0)%20Gecko/20100101%20Firefox/12.0".some,
userAgent = "Mozilla/5.0%20(Windows%20NT%206.1;%20WOW64;%20rv:12.0)%20Gecko/20100101%20Firefox/12.0".some,
refererUri = "http://www.pb.com/oracles/119.html?view=print#detail".some
)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,7 @@ import com.spotify.scio.testing._
object CampaignAttributionEnrichmentSpec {
val raw = Seq(
SpecHelpers.buildCollectorPayload(
refererUri =
"http://pb.com/?utm_source=GoogleSearch&utm_medium=cpc&utm_term=pb&utm_content=39&cid=tna&gclid=CI6".some,
refererUri = "http://pb.com/?utm_source=GoogleSearch&utm_medium=cpc&utm_term=pb&utm_content=39&cid=tna&gclid=CI6".some,
path = "/i",
querystring = "e=pp".some
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,7 @@ object IabEnrichmentSpec {
SpecHelpers.buildCollectorPayload(
path = "/i",
querystring = "e=pp".some,
userAgent =
"Mozilla/5.0%20(Windows%20NT%206.1;%20WOW64;%20rv:12.0)%20Gecko/20100101%20Firefox/12.0".some,
userAgent = "Mozilla/5.0%20(Windows%20NT%206.1;%20WOW64;%20rv:12.0)%20Gecko/20100101%20Firefox/12.0".some,
ipAddress = "216.160.83.56",
networkUserId = "11111111-1111-1111-1111-111111111111"
),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,7 @@ import pureconfig.generic.{FieldCoproductHint, ProductHint}
import good._
import model.{StreamsConfig, TargetPlatformConfig}

class PiiEmitSpec(implicit ee: ExecutionEnv)
extends Specification
with FutureMatchers
with KafkaIntegrationSpec
with BeforeAfterAll {
class PiiEmitSpec(implicit ee: ExecutionEnv) extends Specification with FutureMatchers with KafkaIntegrationSpec with BeforeAfterAll {

var ktu: KafkaTestUtils = _
override def beforeAll(): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,17 +55,9 @@ object KafkaEnrich extends Enrich {

override val parser: scopt.OptionParser[FileConfig] = localParser

override def extractResolver(
resolverArgument: String
)(
implicit creds: Credentials
): Either[String, String] =
override def extractResolver(resolverArgument: String)(implicit creds: Credentials): Either[String, String] =
localResolverExtractor(resolverArgument)

override def extractEnrichmentConfigs(
enrichmentArg: Option[String]
)(
implicit creds: Credentials
): Either[String, Json] =
override def extractEnrichmentConfigs(enrichmentArg: Option[String])(implicit creds: Credentials): Either[String, Json] =
localEnrichmentConfigsExtractor(enrichmentArg)
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,21 +31,15 @@ import model.{BufferConfig, Kafka}

/** KafkaSink companion object with factory method */
object KafkaSink {
def validateAndCreateProducer(
kafkaConfig: Kafka,
bufferConfig: BufferConfig
): Either[String, KafkaProducer[String, String]] =
def validateAndCreateProducer(kafkaConfig: Kafka, bufferConfig: BufferConfig): Either[String, KafkaProducer[String, String]] =
createProducer(kafkaConfig, bufferConfig).asRight

/**
* Instantiates a producer on an existing topic with the given configuration options.
* This can fail if the producer can't be created.
* @return a Kafka producer
*/
private def createProducer(
kafkaConfig: Kafka,
bufferConfig: BufferConfig
): KafkaProducer[String, String] = {
private def createProducer(kafkaConfig: Kafka, bufferConfig: BufferConfig): KafkaProducer[String, String] = {
val properties = createProperties(kafkaConfig, bufferConfig)
properties.putAll(kafkaConfig.producerConf.getOrElse(Map()).asJava)
new KafkaProducer[String, String](properties)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,14 +105,13 @@ class KafkaSource private (
new KafkaSink(goodProducer, config.out.enriched)
}

override val threadLocalPiiSink: Option[ThreadLocal[Sink]] = piiProducer.flatMap {
somePiiProducer =>
config.out.pii.map { piiTopicName =>
new ThreadLocal[Sink] {
override def initialValue: Sink =
new KafkaSink(somePiiProducer, piiTopicName)
}
override val threadLocalPiiSink: Option[ThreadLocal[Sink]] = piiProducer.flatMap { somePiiProducer =>
config.out.pii.map { piiTopicName =>
new ThreadLocal[Sink] {
override def initialValue: Sink =
new KafkaSink(somePiiProducer, piiTopicName)
}
}
}

override val threadLocalBadSink: ThreadLocal[Sink] = new ThreadLocal[Sink] {
Expand All @@ -139,10 +138,7 @@ class KafkaSource private (
}
}

private def createConsumer(
brokers: String,
groupId: String
): KafkaConsumer[String, Array[Byte]] = {
private def createConsumer(brokers: String, groupId: String): KafkaConsumer[String, Array[Byte]] = {
val properties = createProperties(brokers, groupId)
properties.putAll(kafkaConfig.consumerConf.getOrElse(Map()).asJava)
new KafkaConsumer[String, Array[Byte]](properties)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,7 @@ import com.snowplowanalytics.snowplow.scalatracker.Tracker
import io.circe.Json
import io.circe.syntax._
import config._
import model.{
Credentials,
DualCloudCredentialsPair,
Kinesis,
NoCredentials,
SentryConfig,
StreamsConfig
}
import model.{Credentials, DualCloudCredentialsPair, Kinesis, NoCredentials, SentryConfig, StreamsConfig}
import sources.KinesisSource
import utils.getAWSCredentialsProvider

Expand Down Expand Up @@ -151,11 +144,7 @@ object KinesisEnrich extends Enrich {
forceCachedFilesDownloadOption()
}

override def extractResolver(
resolverArgument: String
)(
implicit creds: Credentials
): Either[String, String] =
override def extractResolver(resolverArgument: String)(implicit creds: Credentials): Either[String, String] =
resolverArgument match {
case FilepathRegex(filepath) =>
val file = new File(filepath)
Expand Down Expand Up @@ -201,11 +190,7 @@ object KinesisEnrich extends Enrich {
} yield json
}

override def extractEnrichmentConfigs(
enrichmentArg: Option[String]
)(
implicit creds: Credentials
): Either[String, Json] = {
override def extractEnrichmentConfigs(enrichmentArg: Option[String])(implicit creds: Credentials): Either[String, Json] = {
val jsons: Either[String, List[String]] = enrichmentArg
.map {
case FilepathRegex(dir) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -294,9 +294,7 @@ class KinesisSink(
client.putRecords(putRecordsRequest)
}

private[sinks] def getErrorsSummary(
badResponses: List[PutRecordsResultEntry]
): Map[String, (Long, String)] =
private[sinks] def getErrorsSummary(badResponses: List[PutRecordsResultEntry]): Map[String, (Long, String)] =
badResponses.foldLeft(Map[String, (Long, String)]())(
(counts, r) =>
if (counts.contains(r.getErrorCode)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,10 +214,7 @@ class KinesisSource private (
this.kinesisShardId = shardId
}

override def processRecords(
records: List[Record],
checkpointer: IRecordProcessorCheckpointer
) = {
override def processRecords(records: List[Record], checkpointer: IRecordProcessorCheckpointer) = {

if (!records.isEmpty) {
log.info(s"Processing ${records.size} records from $kinesisShardId")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,17 +51,9 @@ object NsqEnrich extends Enrich {

override val parser: scopt.OptionParser[FileConfig] = localParser

override def extractResolver(
resolverArgument: String
)(
implicit creds: Credentials
): Either[String, String] =
override def extractResolver(resolverArgument: String)(implicit creds: Credentials): Either[String, String] =
localResolverExtractor(resolverArgument)

override def extractEnrichmentConfigs(
enrichmentArg: Option[String]
)(
implicit creds: Credentials
): Either[String, Json] =
override def extractEnrichmentConfigs(enrichmentArg: Option[String])(implicit creds: Credentials): Either[String, Json] =
localEnrichmentConfigsExtractor(enrichmentArg)
}
Original file line number Diff line number Diff line change
Expand Up @@ -101,13 +101,12 @@ class NsqSource private (
override def initialValue: Sink = new NsqSink(goodProducer, config.out.enriched)
}

override val threadLocalPiiSink: Option[ThreadLocal[Sink]] = piiProducer.flatMap {
somePiiProducer =>
config.out.pii.map { piiTopicName =>
new ThreadLocal[Sink] {
override def initialValue: Sink = new NsqSink(somePiiProducer, piiTopicName)
}
override val threadLocalPiiSink: Option[ThreadLocal[Sink]] = piiProducer.flatMap { somePiiProducer =>
config.out.pii.map { piiTopicName =>
new ThreadLocal[Sink] {
override def initialValue: Sink = new NsqSink(somePiiProducer, piiTopicName)
}
}
}

override val threadLocalBadSink: ThreadLocal[Sink] = new ThreadLocal[Sink] {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,17 +55,9 @@ object StdinEnrich extends Enrich {

override val parser: scopt.OptionParser[FileConfig] = localParser

override def extractResolver(
resolverArgument: String
)(
implicit creds: Credentials
): Either[String, String] =
override def extractResolver(resolverArgument: String)(implicit creds: Credentials): Either[String, String] =
localResolverExtractor(resolverArgument)

override def extractEnrichmentConfigs(
enrichmentArg: Option[String]
)(
implicit creds: Credentials
): Either[String, Json] =
override def extractEnrichmentConfigs(enrichmentArg: Option[String])(implicit creds: Credentials): Either[String, Json] =
localEnrichmentConfigsExtractor(enrichmentArg)
}
Original file line number Diff line number Diff line change
Expand Up @@ -129,9 +129,7 @@ trait Enrich {
* @return a validated tuple containing the parsed enrich configuration, the resolver argument,
* the optional enrichments argument and the force download flag
*/
def parseConfig(
args: Array[String]
): Either[String, (EnrichConfig, String, Option[String], Boolean)] =
def parseConfig(args: Array[String]): Either[String, (EnrichConfig, String, Option[String], Boolean)] =
for {
parsedCliArgs <- parser
.parse(args, FileConfig())
Expand All @@ -140,8 +138,7 @@ trait Enrich {
.catchNonFatal(ConfigFactory.parseFile(parsedCliArgs.config).resolve())
.fold(
t => t.getMessage.asLeft,
c =>
(c, parsedCliArgs.resolver, parsedCliArgs.enrichmentsDir, parsedCliArgs.forceDownload).asRight
c => (c, parsedCliArgs.resolver, parsedCliArgs.enrichmentsDir, parsedCliArgs.forceDownload).asRight
)
validatedConfig <- unparsedConfig.filterOrElse(
t => t._1.hasPath("enrich"),
Expand Down Expand Up @@ -173,11 +170,7 @@ trait Enrich {
a * @param creds optionally necessary credentials to download the resolver
* @return a validated iglu resolver
*/
def parseClient(
resolverArg: String
)(
implicit creds: Credentials
): Either[String, Client[Id, Json]] =
def parseClient(resolverArg: String)(implicit creds: Credentials): Either[String, Client[Id, Json]] =
for {
parsedResolver <- extractResolver(resolverArg)
json <- JsonUtils.extractJson(parsedResolver)
Expand Down Expand Up @@ -224,11 +217,7 @@ a * @param creds optionally necessary credentials to download the resolver
* @param creds optionally necessary credentials to download the enrichments
* @return JSON containing configuration for all enrichments
*/
def extractEnrichmentConfigs(
enrichmentArgument: Option[String]
)(
implicit creds: Credentials
): Either[String, Json]
def extractEnrichmentConfigs(enrichmentArgument: Option[String])(implicit creds: Credentials): Either[String, Json]
val localEnrichmentConfigsExtractor = (enrichmentArgument: Option[String]) => {
val jsons: Either[String, List[String]] = enrichmentArgument
.map {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,8 +152,7 @@ abstract class Source(
ee.event_name = Source.PiiEventName
ee.event_version = Source.PiiEventVersion
ee.contexts = getContextParentEvent(event.event_id).noSpaces
ee.v_etl =
s"stream-enrich-${generated.BuildInfo.version}-common-${generated.BuildInfo.commonEnrichVersion}"
ee.v_etl = s"stream-enrich-${generated.BuildInfo.version}-common-${generated.BuildInfo.commonEnrichVersion}"
ee
}

Expand Down Expand Up @@ -201,9 +200,7 @@ abstract class Source(
* @return List containing failed, successful and, if present, pii events. Successful and failed, each specify a
* partition key.
*/
def enrichEvents(
binaryData: Array[Byte]
): List[Validated[(BadRow, String), (String, String, Option[String])]] = {
def enrichEvents(binaryData: Array[Byte]): List[Validated[(BadRow, String), (String, String, Option[String])]] = {
val canonicalInput: ValidatedNel[BadRow, Option[CollectorPayload]] =
ThriftLoader.toCollectorPayload(binaryData, processor)
Either.catchNonFatal(
Expand Down Expand Up @@ -297,6 +294,5 @@ abstract class Source(
}
}

private val isTooLarge: String => Boolean = evt =>
MaxRecordSize.map(Source.getSize(evt) >= _).getOrElse(false)
private val isTooLarge: String => Boolean = evt => MaxRecordSize.map(Source.getSize(evt) >= _).getOrElse(false)
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,7 @@ import com.snowplowanalytics.snowplow.enrich.common.adapters.registry.RemoteAdap
import com.snowplowanalytics.snowplow.enrich.common.enrichments.EnrichmentRegistry
import com.snowplowanalytics.snowplow.enrich.common.outputs.EnrichedEvent
import com.snowplowanalytics.snowplow.enrich.common.utils.JsonUtils
import com.snowplowanalytics.snowplow.enrich.stream.model.{
AWSCredentials,
CloudAgnosticPlatformConfig,
GCPCredentials,
Kafka,
Nsq,
Stdin
}
import com.snowplowanalytics.snowplow.enrich.stream.model.{AWSCredentials, CloudAgnosticPlatformConfig, GCPCredentials, Kafka, Nsq, Stdin}
import org.specs2.matcher.{Expectable, Matcher}
import sources.TestSource
import utils._
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,7 @@
package com.snowplowanalytics.snowplow.enrich.stream
package sources

import com.snowplowanalytics.snowplow.enrich.stream.model.{
AWSCredentials,
DualCloudCredentialsPair,
GCPCredentials,
NoCredentials
}
import com.snowplowanalytics.snowplow.enrich.stream.model.{AWSCredentials, DualCloudCredentialsPair, GCPCredentials, NoCredentials}
import org.specs2.mutable.Specification

class UtilsSpec extends Specification {
Expand Down

0 comments on commit 70e5af3

Please sign in to comment.