Skip to content

Commit

Permalink
Merge 5d1d2a2 into 70e5af3
Browse files Browse the repository at this point in the history
  • Loading branch information
chuwy committed Jun 25, 2020
2 parents 70e5af3 + 5d1d2a2 commit b751fc8
Show file tree
Hide file tree
Showing 123 changed files with 2,316 additions and 2,415 deletions.
6 changes: 4 additions & 2 deletions .scalafmt.conf
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
version = "2.0.0-RC6"
version = "2.6.1"
style = default
align = none
align.preset = none
align.openParenCallSite = true
align.arrowEnumeratorGenerator = true
maxColumn = 140
docstrings = JavaDoc
optIn.breakChainOnFirstMethodDot = true
Expand Down
1 change: 0 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ lazy val common = project
Dependencies.Libraries.jaywayJsonpath,
Dependencies.Libraries.iabClient,
Dependencies.Libraries.yauaa,
Dependencies.Libraries.rhino,
Dependencies.Libraries.guava,
Dependencies.Libraries.circeOptics,
Dependencies.Libraries.circeJackson,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,11 +71,10 @@ object Enrich {
registryJson <- parseEnrichmentRegistry(config.enrichments, client)
confs <- EnrichmentRegistry.parse(registryJson, client, false).leftMap(_.toString).toEither
labels <- config.labels.map(parseLabels).getOrElse(Right(Map.empty[String, String]))
_ <- if (emitPii(confs) && config.pii.isEmpty) {
"A pii topic needs to be used in order to use the pii enrichment".asLeft
} else {
().asRight
}
_ <- if (emitPii(confs) && config.pii.isEmpty)
"A pii topic needs to be used in order to use the pii enrichment".asLeft
else
().asRight
} yield ParsedEnrichConfig(
config.raw,
config.enriched,
Expand All @@ -99,9 +98,8 @@ object Enrich {
}

def run(sc: ScioContext, config: ParsedEnrichConfig): Unit = {
if (config.labels.nonEmpty) {
if (config.labels.nonEmpty)
sc.optionsAs[DataflowPipelineOptions].setLabels(config.labels.asJava)
}

val cachedFiles: DistCache[List[Either[String, String]]] =
buildDistCache(sc, config.enrichmentConfs)
Expand Down Expand Up @@ -254,9 +252,8 @@ object Enrich {
.withName("split-oversized-pii")
.partition(_._2 >= MaxRecordSize)
Some((tooBigPiis, properlySizedPiis))
} else {
} else
None
}

/**
* Enrich a collector payload into a list of [[EnrichedEvent]].
Expand Down Expand Up @@ -326,13 +323,12 @@ object Enrich {
* @return Right if it exists, left otherwise
*/
private def checkTopicExists(sc: ScioContext, topicName: String): Either[String, Unit] =
if (sc.isTest) {
if (sc.isTest)
().asRight
} else {
else
PubSubAdmin.topic(sc.options.as(classOf[PubsubOptions]), topicName) match {
case scala.util.Success(_) => ().asRight
case scala.util.Failure(e) =>
s"Output topic $topicName couldn't be retrieved: ${e.getMessage}".asLeft
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,13 +60,13 @@ object config {
for {
_ <- if (args.optional("help").isDefined) helpString(configurations).asLeft else "".asRight
l <- configurations
.collect {
case RequiredConfiguration(key, _) =>
args.optional(key).toValidNel(s"Missing `$key` argument")
}
.sequence[ValidatedNelS, String]
.leftMap(_.toList.mkString("\n"))
.toEither
.collect {
case RequiredConfiguration(key, _) =>
args.optional(key).toValidNel(s"Missing `$key` argument")
}
.sequence[ValidatedNelS, String]
.leftMap(_.toList.mkString("\n"))
.toEither
List(jobName, raw, enriched, bad, resolver) = l
} yield EnrichConfig(
jobName,
Expand Down Expand Up @@ -160,16 +160,16 @@ object config {
fileContents <- readEnrichmentFiles(enrichmentsPath)
jsons <- fileContents.map(JsonUtils.extractJson(_)).sequence[EitherS, Json]
schemaKey = SchemaKey(
"com.snowplowanalytics.snowplow",
"enrichments",
"jsonschema",
SchemaVer.Full(1, 0, 0)
)
"com.snowplowanalytics.snowplow",
"enrichments",
"jsonschema",
SchemaVer.Full(1, 0, 0)
)
enrichmentsJson = SelfDescribingData[Json](schemaKey, Json.fromValues(jsons)).asJson
_ <- EnrichmentRegistry
.parse(enrichmentsJson, client, false)
.leftMap(_.toList.mkString("\n"))
.toEither
.parse(enrichmentsJson, client, false)
.leftMap(_.toList.mkString("\n"))
.toEither
} yield enrichmentsJson

/** Reads all the enrichment files contained in a directory at the specified path. */
Expand All @@ -178,11 +178,11 @@ object config {
.map { p =>
for {
files <- Option(new File(p).listFiles)
.toRight(s"Enrichment directory `$p` does not exist")
.toRight(s"Enrichment directory `$p` does not exist")
read = files
.filter(_.getName.endsWith(".json"))
.map(Source.fromFile(_).mkString)
.toList
.filter(_.getName.endsWith(".json"))
.map(Source.fromFile(_).mkString)
.toList
} yield read
}
.getOrElse(Nil.asRight)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,13 @@ object singleton {
* @param resolverJson JSON representing the Resolver
*/
def get(resolverJson: Json): Client[Id, Json] = {
if (instance == null) {
if (instance == null)
synchronized {
if (instance == null) {
if (instance == null)
instance = Client
.parseDefault[Id](resolverJson)
.valueOr(e => throw new RuntimeException(e.toString))
}
}
}
instance
}
}
Expand All @@ -55,16 +53,14 @@ object singleton {
* @param enrichmentConfs list of enabled enrichment configuration
*/
def get(enrichmentConfs: List[EnrichmentConf]): EnrichmentRegistry[Id] = {
if (instance == null) {
if (instance == null)
synchronized {
if (instance == null) {
if (instance == null)
instance = EnrichmentRegistry
.build[Id](enrichmentConfs)
.value
.valueOr(e => throw new RuntimeException(e.toString))
}
}
}
instance
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,15 +131,15 @@ object utils {
): BadRow = {
val originalBadRow = badRow.compact
val size = getSize(originalBadRow)
if (size > maxSizeBytes) {
if (size > maxSizeBytes)
BadRow
.SizeViolation(
processor,
Failure
.SizeViolation(Instant.now(), maxSizeBytes, size, "bad row exceeded the maximum size"),
Payload.RawPayload(originalBadRow.take(maxSizeBytes / ReductionFactor))
)
} else badRow
else badRow
}

/** The size of a string in bytes */
Expand All @@ -161,12 +161,12 @@ object utils {
*/
def createSymLink(file: File, symLink: String): Either[String, Path] = {
val symLinkPath = Paths.get(symLink)
if (!Files.exists(symLinkPath)) {
if (!Files.exists(symLinkPath))
Try(Files.createSymbolicLink(symLinkPath, file.toPath)) match {
case scala.util.Success(p) => Right(p)
case scala.util.Failure(t) => Left(s"Symlink can't be created: ${t.getMessage}")
}
} else Left(s"A file at path $symLinkPath already exists")
else Left(s"A file at path $symLinkPath already exists")
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ class UtilsSpec extends AnyFreeSpec with Matchers {
e.pii = "pii"
e
}
tabSeparatedEnrichedEvent(event) should not include ("pii")
tabSeparatedEnrichedEvent(event) should not include "pii"
}
}
"make a getPii function available" - {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ object CallrailAdapter extends Adapter {
"empty querystring"
)
Monad[F].pure(failure.invalidNel)
} else {
} else
Monad[F].pure(
NonEmptyList
.of(
Expand All @@ -104,6 +104,5 @@ object CallrailAdapter extends Adapter {
)
.valid
)
}
}
}

0 comments on commit b751fc8

Please sign in to comment.