Skip to content

Commit

Permalink
Stream Enrich: replace sbt-scalafmt-coursier with sbt-scalafmt (closes
Browse files Browse the repository at this point in the history
  • Loading branch information
BenFradet authored and chuwy committed Nov 5, 2019
1 parent 5febe13 commit cb19ecc
Show file tree
Hide file tree
Showing 27 changed files with 444 additions and 289 deletions.
39 changes: 15 additions & 24 deletions 3-enrich/stream-enrich/.scalafmt.conf
Original file line number Diff line number Diff line change
@@ -1,30 +1,21 @@
style = defaultWithAlign
version = "2.0.0-RC6"
style = default
align = none
maxColumn = 100

docstrings = JavaDoc
optIn.breakChainOnFirstMethodDot = true
spaces.afterKeywordBeforeParen = true
continuationIndent.defnSite = 2
continuationIndent.callSite = 2
continuationIndent.defnSite = 2
verticalMultiline.atDefnSite = true
verticalMultiline.arityThreshold = 3
verticalMultiline.newlineAfterOpenParen = true
verticalMultiline.newlineBeforeImplicitKW = true
verticalMultiline.excludeDanglingParens = []
importSelectors = noBinPack

newlines {
sometimesBeforeColonInMethodReturnType = false
}

align {
arrowEnumeratorGenerator = false
ifWhileOpenParen = false
openParenCallSite = false
openParenDefnSite = false
}

rewrite {
rules = [
AsciiSortImports,
RedundantBraces,
RedundantParens,
PreferCurlyFors
]
redundantBraces.maxLines = 1
}
rewrite.rules = [
AsciiSortImports,
RedundantBraces,
RedundantParens,
PreferCurlyFors
]
6 changes: 6 additions & 0 deletions 3-enrich/stream-enrich/build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ lazy val core = project
.settings(moduleName := "snowplow-stream-enrich")
.settings(buildSettings)
.settings(libraryDependencies ++= commonDependencies)
.settings(BuildSettings.formatting)
.enablePlugins(BuildInfoPlugin)
.settings(
buildInfoKeys := Seq[BuildInfoKey](organization, name, version,
Expand All @@ -68,6 +69,7 @@ lazy val core = project
lazy val kinesis = project
.settings(moduleName := "snowplow-stream-enrich-kinesis")
.settings(allSettings)
.settings(BuildSettings.formatting)
.settings(libraryDependencies ++= Seq(
Dependencies.Libraries.kinesisClient,
Dependencies.Libraries.kinesisSdk,
Expand All @@ -80,6 +82,7 @@ lazy val kinesis = project
lazy val kafka = project
.settings(moduleName := "snowplow-stream-enrich-kafka")
.settings(allSettings)
.settings(BuildSettings.formatting)
.settings(libraryDependencies ++= Seq(
Dependencies.Libraries.kafkaClients
))
Expand All @@ -88,17 +91,20 @@ lazy val kafka = project
lazy val nsq = project
.settings(moduleName := "snowplow-stream-enrich-nsq")
.settings(allSettings)
.settings(BuildSettings.formatting)
.settings(libraryDependencies ++= Seq(Dependencies.Libraries.nsqClient))
.dependsOn(core)

lazy val stdin = project
.settings(moduleName := "snowplow-stream-enrich-stdin")
.settings(allSettings)
.settings(BuildSettings.formatting)
.dependsOn(core)

lazy val integrationTests = project.in(file("./integration-tests"))
.settings(moduleName := "integration-tests")
.settings(allSettings)
.settings(BuildSettings.formatting)
.settings(BuildSettings.addExampleConfToTestCp)
.settings(libraryDependencies ++= Seq(
// Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import scala.util.Try
object model {

sealed trait Credentials
case object NoCredentials extends Credentials
case object NoCredentials extends Credentials
final case class AWSCredentials(accessKey: String, secretKey: String) extends Credentials

// Case classes necessary to the decoding of the configuration
Expand All @@ -39,7 +39,12 @@ object model {
appName: String
)
final case class InConfig(raw: String)
final case class OutConfig(enriched: String, pii: Option[String], bad: String, partitionKey: String)
final case class OutConfig(
enriched: String,
pii: Option[String],
bad: String,
partitionKey: String
)
final case class KinesisBackoffPolicyConfig(minBackoff: Long, maxBackoff: Long)
sealed trait SourceSinkConfig
final case class Kinesis(
Expand All @@ -62,14 +67,14 @@ object model {

val streamEndpoint = customEndpoint.getOrElse(region match {
case cn @ "cn-north-1" => s"https://kinesis.$cn.amazonaws.com.cn"
case _ => s"https://kinesis.$region.amazonaws.com"
case _ => s"https://kinesis.$region.amazonaws.com"
})
}
final case class Kafka(
brokers: String,
retries: Int,
consumerConf: Option[Map[String,String]],
producerConf: Option[Map[String,String]]
consumerConf: Option[Map[String, String]],
producerConf: Option[Map[String, String]]
) extends SourceSinkConfig
final case class Nsq(
rawChannel: String,
Expand All @@ -79,7 +84,11 @@ object model {
lookupPort: Int
) extends SourceSinkConfig
case object Stdin extends SourceSinkConfig
final case class BufferConfig(byteLimit: Long, recordLimit: Long, timeLimit: Long)
final case class BufferConfig(
byteLimit: Long,
recordLimit: Long,
timeLimit: Long
)
final case class MonitoringConfig(snowplow: SnowplowMonitoringConfig)
final case class SnowplowMonitoringConfig(
collectorUri: String,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ trait Enrich {

lazy val log = LoggerFactory.getLogger(getClass())

val FilepathRegex = "^file:(.+)$".r
val FilepathRegex = "^file:(.+)$".r
private val regexMsg = "'file:[filename]'"

implicit val creds: Credentials = NoCredentials
Expand All @@ -58,10 +58,10 @@ trait Enrich {
val trackerSource = for {
config <- parseConfig(args).validation
(enrichConfig, resolverArg, enrichmentsArg, forceDownload) = config
resolver <- parseResolver(resolverArg)
resolver <- parseResolver(resolverArg)
enrichmentRegistry <- parseEnrichmentRegistry(enrichmentsArg)(resolver, implicitly)
_ <- cacheFiles(enrichmentRegistry, forceDownload)
adapterRegistry = new AdapterRegistry(prepareRemoteAdapters(enrichConfig.remoteAdapters))
adapterRegistry = new AdapterRegistry(prepareRemoteAdapters(enrichConfig.remoteAdapters))
_ <- cacheFiles(enrichmentRegistry, forceDownload)
tracker = enrichConfig.monitoring.map(c => SnowplowTracking.initializeTracker(c.snowplow))
source <- getSource(enrichConfig.streams, resolver, adapterRegistry, enrichmentRegistry, tracker)
} yield (tracker, source)
Expand Down Expand Up @@ -99,20 +99,23 @@ trait Enrich {
* the optional enrichments argument and the force download flag
*/
def parseConfig(
args: Array[String]): \/[String, (EnrichConfig, String, Option[String], Boolean)] = {
implicit def hint[T] = ProductHint[T](ConfigFieldMapping(CamelCase, CamelCase))
args: Array[String]
): \/[String, (EnrichConfig, String, Option[String], Boolean)] = {
implicit def hint[T] = ProductHint[T](ConfigFieldMapping(CamelCase, CamelCase))
implicit val sourceSinkConfigHint = new FieldCoproductHint[SourceSinkConfig]("enabled")
for {
parsedCliArgs <- \/.fromEither(
parser.parse(args, FileConfig()).toRight("Error while parsing command line arguments"))
parser.parse(args, FileConfig()).toRight("Error while parsing command line arguments")
)
unparsedConfig = utils.fold(Try(ConfigFactory.parseFile(parsedCliArgs.config).resolve()))(
t => t.getMessage.left,
c =>
(c, parsedCliArgs.resolver, parsedCliArgs.enrichmentsDir, parsedCliArgs.forceDownload).right
)
validatedConfig <- utils.filterOrElse(unparsedConfig)(
t => t._1.hasPath("enrich"),
"No top-level \"enrich\" could be found in the configuration")
"No top-level \"enrich\" could be found in the configuration"
)
(config, resolverArg, enrichmentsArg, forceDownload) = validatedConfig
parsedConfig <- utils
.toEither(Try(loadConfigOrThrow[EnrichConfig](config.getConfig("enrich"))))
Expand Down Expand Up @@ -140,12 +143,15 @@ trait Enrich {
a * @param creds optionally necessary credentials to download the resolver
* @return a validated iglu resolver
*/
def parseResolver(resolverArg: String)(
implicit creds: Credentials): Validation[String, Resolver] =
def parseResolver(
resolverArg: String
)(
implicit creds: Credentials
): Validation[String, Resolver] =
for {
parsedResolver <- extractResolver(resolverArg)
json <- JsonUtils.extractJson("", parsedResolver)
resolver <- Resolver.parse(json).leftMap(_.toString)
json <- JsonUtils.extractJson("", parsedResolver)
resolver <- Resolver.parse(json).leftMap(_.toString)
} yield resolver

/**
Expand All @@ -162,7 +168,7 @@ a * @param creds optionally necessary credentials to download the resolver
if (file.exists) Source.fromFile(file).mkString.success
else "Iglu resolver configuration file \"%s\" does not exist".format(filepath).failure
case _ => s"Resolver argument [$resolverArgument] must match $regexMsg".failure
}
}

/**
* Retrieve and parse an enrichment registry from the corresponding cli argument value
Expand All @@ -171,13 +177,16 @@ a * @param creds optionally necessary credentials to download the resolver
* @param creds optionally necessary credentials to download the enrichments
* @return a validated enrichment registry
*/
def parseEnrichmentRegistry(enrichmentsDirArg: Option[String])(
def parseEnrichmentRegistry(
enrichmentsDirArg: Option[String]
)(
implicit resolver: Resolver,
creds: Credentials): Validation[String, EnrichmentRegistry] =
creds: Credentials
): Validation[String, EnrichmentRegistry] =
for {
enrichmentConfig <- extractEnrichmentConfigs(enrichmentsDirArg)
registryConfig <- JsonUtils.extractJson("", enrichmentConfig)
reg <- EnrichmentRegistry.parse(fromJsonNode(registryConfig), false).leftMap(_.toString)
registryConfig <- JsonUtils.extractJson("", enrichmentConfig)
reg <- EnrichmentRegistry.parse(fromJsonNode(registryConfig), false).leftMap(_.toString)
} yield reg

/**
Expand All @@ -186,8 +195,11 @@ a * @param creds optionally necessary credentials to download the resolver
* @param creds optionally necessary credentials to download the enrichments
* @return JSON containing configuration for all enrichments
*/
def extractEnrichmentConfigs(enrichmentArgument: Option[String])(
implicit creds: Credentials): Validation[String, String]
def extractEnrichmentConfigs(
enrichmentArgument: Option[String]
)(
implicit creds: Credentials
): Validation[String, String]
val localEnrichmentConfigsExtractor = (enrichmentArgument: Option[String]) => {
val jsons: Validation[String, List[String]] = enrichmentArgument
.map {
Expand Down Expand Up @@ -220,8 +232,8 @@ a * @param creds optionally necessary credentials to download the resolver
val httpDownloader = (uri: URI, targetFile: File) =>
uri.getScheme match {
case "http" | "https" => (uri.toURL #> targetFile).!.success
case s => s"Scheme $s for file $uri not supported".failure
}
case s => s"Scheme $s for file $uri not supported".failure
}

/**
* Download the IP lookup files locally.
Expand All @@ -233,18 +245,23 @@ a * @param creds optionally necessary credentials to download the resolver
def cacheFiles(
registry: EnrichmentRegistry,
forceDownload: Boolean
)(implicit creds: Credentials): ValidationNel[String, List[Int]] =
)(
implicit creds: Credentials
): ValidationNel[String, List[Int]] =
registry.filesToCache
.map { case (uri, path) =>
(new java.net.URI(uri.toString.replaceAll("(?<!(http:|https:|s3:))//", "/")),
new File(path))
.map {
case (uri, path) =>
(
new java.net.URI(uri.toString.replaceAll("(?<!(http:|https:|s3:))//", "/")),
new File(path)
)
}
.filter { case (_, targetFile) => forceDownload || targetFile.length == 0L }
.map {
case (cleanURI, targetFile) =>
download(cleanURI, targetFile).flatMap {
case i if i != 0 => s"Attempt to download $cleanURI to $targetFile failed".failure
case o => o.success
case o => o.success
}.toValidationNel
}
.sequenceU
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,17 +66,19 @@ object SnowplowTracking {
streamName: String,
appName: String,
retryCount: Long,
putSize: Long): Unit =
putSize: Long
): Unit =
tracker.trackUnstructEvent(
SelfDescribingJson(
"iglu:com.snowplowanalytics.monitoring.kinesis/stream_write_failed/jsonschema/1-0-0",
("errorType" -> errorType) ~
("errorType" -> errorType) ~
("errorMessage" -> errorMessage) ~
("streamName" -> streamName) ~
("appName" -> appName) ~
("retryCount" -> retryCount) ~
("putSize" -> putSize)
))
("streamName" -> streamName) ~
("appName" -> appName) ~
("retryCount" -> retryCount) ~
("putSize" -> putSize)
)
)

/**
* Send an initialization event and schedule heartbeat and shutdown events
Expand Down Expand Up @@ -112,7 +114,8 @@ object SnowplowTracking {
SelfDescribingJson(
"iglu:com.snowplowanalytics.monitoring.kinesis/app_initialized/jsonschema/1-0-0",
JObject(Nil)
))
)
)

/**
* Send an application_shutdown unstructured event
Expand All @@ -124,7 +127,8 @@ object SnowplowTracking {
SelfDescribingJson(
"iglu:com.snowplowanalytics.monitoring.kinesis/app_shutdown/jsonschema/1-0-0",
JObject(Nil)
))
)
)

/**
* Send a warning unstructured event
Expand All @@ -137,7 +141,8 @@ object SnowplowTracking {
SelfDescribingJson(
"iglu:com.snowplowanalytics.monitoring.kinesis/app_warning/jsonschema/1-0-0",
("warning" -> message)
))
)
)

/**
* Send a heartbeat unstructured event
Expand All @@ -150,5 +155,6 @@ object SnowplowTracking {
SelfDescribingJson(
"iglu:com.snowplowanalytics.monitoring.kinesis/app_heartbeat/jsonschema/1-0-0",
"interval" -> heartbeatInterval
))
)
)
}
Loading

0 comments on commit cb19ecc

Please sign in to comment.