Merge d8e9314 into bed2c1a
spenes committed Aug 11, 2021
2 parents bed2c1a + d8e9314 commit 49b6242
Showing 20 changed files with 723 additions and 314 deletions.
1 change: 1 addition & 0 deletions build.sbt
@@ -63,6 +63,7 @@ lazy val loader = project
Dependencies.commons,
Dependencies.fs2Aws,
Dependencies.fs2PubSub,
Dependencies.fs2BlobCore,
Dependencies.decline,
Dependencies.config,
Dependencies.specs2
12 changes: 11 additions & 1 deletion config/config.kinesis.reference.hocon
@@ -11,12 +11,22 @@
# Either TRIM_HORIZON or LATEST
"initialPosition": "TRIM_HORIZON"

# Optional, set the polling mode for retrieving records. Default is FanOut
# "retrievalMode": "FanOut"
# "retrievalMode": {
# "type": "Polling"
# "maxRecords": 1000
# }

# Optional, configure the checkpointer.
"checkpointSettings": {
# The max number of records to aggregate before checkpointing the records.
# Default is 1000.
"maxBatchSize": 1000
# The max amount of time to wait before checkpointing the records, in milliseconds.
# Default is 10000 milliseconds.
"maxBatchWait": 10000
}
}

"output" : {
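The new `checkpointSettings` block above bounds how far the loader can fall behind before it records progress on the Kinesis stream: a checkpoint is taken after at most `maxBatchSize` records or after `maxBatchWait` milliseconds, whichever comes first. As a rough illustration only (the `checkpointer` helper and `checkpoint` callback below are hypothetical names, not the loader's actual implementation), this kind of size-or-time batching is what fs2's `groupWithin` expresses:

```scala
import scala.concurrent.duration._
import cats.effect.Temporal
import fs2.Pipe

// Hypothetical sketch (fs2 3.x APIs assumed): checkpoint after at most
// `maxBatchSize` records, or after `maxBatchWait`, whichever happens first.
def checkpointer[F[_]: Temporal, A](
    maxBatchSize: Int,
    maxBatchWait: FiniteDuration
)(checkpoint: A => F[Unit]): Pipe[F, A, Unit] =
  _.groupWithin(maxBatchSize, maxBatchWait)        // emits non-empty chunks
    .evalMap(chunk => checkpoint(chunk.last.get))  // checkpointing the last record covers the whole batch
```

With the defaults above that would be `checkpointer(1000, 10000.millis)(...)`. Checkpointing a record is only safe once everything before it has been written, which is why the pipe changes later in this commit preserve input order.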
16 changes: 16 additions & 0 deletions config/config.local.minimal.hocon
@@ -0,0 +1,16 @@
# The minimum required config options for loading from a local source
{
"input": {
"type": "LocalFS"
"path": "/tmp/example"
}

"output" : {
"type": "Postgres"
"host": "localhost"
"database": "snowplow"
"username": "postgres"
"password": ${POSTGRES_PASSWORD}
"schema": "atomic"
}
}
32 changes: 32 additions & 0 deletions config/config.local.reference.hocon
@@ -0,0 +1,32 @@
{
"input": {
# Enable the local event source
"type": "LocalFS"
# Path for the event source. It can be a directory or a file.
# If it is a directory, all files under it will be read recursively.
# The path can be either absolute or relative to the executable.
"path": "./tmp/example"
}

"output" : {
"type": "Postgres"
# PostgreSQL host ('localhost' if an SSH tunnel is enabled)
"host": "localhost"
# PostgreSQL database port
"port": 5432
# PostgreSQL database name
"database": "snowplow"
# PostgreSQL user to load data
"username": "postgres"
# PostgreSQL password, either plain text or from an environment variable
"password": "mysecretpassword"
"password": ${?POSTGRES_PASSWORD}
# PostgreSQL database schema
"schema": "atomic"
# JDBC ssl mode
"sslMode": "REQUIRE"
}

# Kind of data stored in this instance. Either ENRICHED_EVENTS or JSON
"purpose": "ENRICHED_EVENTS"
}
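The `path` comment in this file says a `LocalFS` source can point at either a single file or a directory that is read recursively, with relative paths resolved against the executable's working directory. A minimal sketch of that resolution logic, using a hypothetical helper name rather than the loader's actual code:

```scala
import java.nio.file.{Files, Path, Paths}
import scala.jdk.CollectionConverters._

// Hypothetical sketch of how a LocalFS `path` could be expanded into input files:
// relative paths resolve against the working directory, directories are walked
// recursively, and a plain file is used as-is.
def resolveInputFiles(raw: String): List[Path] = {
  val path = Paths.get(raw).toAbsolutePath.normalize()
  if (Files.isDirectory(path))
    Files.walk(path).iterator().asScala.filter(p => Files.isRegularFile(p)).toList
  else if (Files.isRegularFile(path))
    List(path)
  else
    Nil // non-existent path; the CLI pre-check added later in this commit rejects this case early
}
```

For example, `resolveInputFiles("./tmp/example")` would pick up every regular file under that directory.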
15 changes: 15 additions & 0 deletions config/config.local.relativetest.hocon
@@ -0,0 +1,15 @@
{
"input": {
"type": "LocalFS"
"path": "tmp/example"
}

"output" : {
"type": "Postgres"
"host": "localhost"
"database": "snowplow"
"username": "postgres"
"password": ${POSTGRES_PASSWORD}
"schema": "atomic"
}
}
@@ -18,35 +18,35 @@ import doobie.hikari.HikariTransactor

/** Evaluates effects, possibly concurrently, and emits the results downstream in any order
*/
-trait UnorderedPipe[F[_]] {
+trait OrderedPipe[F[_]] {
def apply[A, B](f: A => F[B]): Pipe[F, A, B]
}

-object UnorderedPipe {
+object OrderedPipe {

-/** An UnorderedPipe in which results are emitted in the same order as the inputs
+/** An OrderedPipe in which results are emitted in the same order as the inputs
*
-* Use this UnorderedPipe when a `Concurrent[F]` is not available
+* Use this OrderedPipe when a `Concurrent[F]` is not available
*/
-def sequential[F[_]]: UnorderedPipe[F] =
-new UnorderedPipe[F] {
+def sequential[F[_]]: OrderedPipe[F] =
+new OrderedPipe[F] {
override def apply[A, B](f: A => F[B]): Pipe[F, A, B] =
_.evalMap(f)
}

-/** An UnorderedPipe that evaluates effects in parallel.
+/** An OrderedPipe that evaluates effects in parallel.
*/
-def concurrent[F[_]: Concurrent](maxConcurrent: Int): UnorderedPipe[F] =
-new UnorderedPipe[F] {
+def concurrent[F[_]: Concurrent](maxConcurrent: Int): OrderedPipe[F] =
+new OrderedPipe[F] {
override def apply[A, B](f: A => F[B]): Pipe[F, A, B] =
-_.parEvalMapUnordered(maxConcurrent)(f)
+_.parEvalMap(maxConcurrent)(f)
}

-/** A concurrent UnorderedPipe whose parallelism matches the size of the transactor's underlying connection pool.
+/** A concurrent OrderedPipe whose parallelism matches the size of the transactor's underlying connection pool.
*
-* Use this UnorderedPipe whenever the effect requires a database connection
+* Use this OrderedPipe whenever the effect requires a database connection
*/
-def forTransactor[F[_]: Concurrent](xa: HikariTransactor[F]): UnorderedPipe[F] =
+def forTransactor[F[_]: Concurrent](xa: HikariTransactor[F]): OrderedPipe[F] =
concurrent(xa.kernel.getMaximumPoolSize)

}
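The `UnorderedPipe` → `OrderedPipe` change above is more than a rename: the concurrent variant now uses `parEvalMap` instead of `parEvalMapUnordered`, so effects still run in parallel but results are emitted in input order, which is what per-record checkpointing depends on. A standalone illustration of the difference (assuming cats-effect 3 / fs2 3; the loader itself is built against cats-effect 2, but the semantics are the same):

```scala
import scala.concurrent.duration._
import cats.effect.{IO, IOApp}
import fs2.Stream

// Standalone demo: parEvalMap preserves input order, parEvalMapUnordered
// emits results in completion order.
object OrderingDemo extends IOApp.Simple {
  val inputs = Stream.emits(List(3, 1, 2)).covary[IO]
  val slowDouble: Int => IO[Int] = n => IO.sleep((n * 100).millis).as(n * 2)

  val run =
    for {
      ordered   <- inputs.parEvalMap(3)(slowDouble).compile.toList          // List(6, 2, 4)
      unordered <- inputs.parEvalMapUnordered(3)(slowDouble).compile.toList // completion order, e.g. List(2, 4, 6)
      _         <- IO.println(s"ordered=$ordered, unordered=$unordered")
    } yield ()
}
```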
@@ -17,8 +17,6 @@ import cats.implicits._

import cats.effect.{Clock, ContextShift, Sync}

-import fs2.Pipe

import doobie._
import doobie.implicits._

@@ -42,38 +40,25 @@ object sink

type Insert = ConnectionIO[Unit]

+def sinkResult[F[_]: Sync: ContextShift: Clock: DB](
+state: State[F],
+client: Client[F, Json],
+processor: Processor
+)(result: Either[BadData, Data]): F[Unit] =
+result.fold(sinkBad[F], sinkGood[F](state, client, processor)(_).flatMap {
+case Left(badData) => sinkBad[F](badData)
+case Right(_) => Sync[F].unit
+})

/**
* Sink good events into Postgres. During sinking, payloads go through all transformation steps
* and checking the state of the DB itself.
-* Events that could not be transformed (due Iglu errors or DB unavailability) are emitted from
-* the pipe
-* @param unorderdPipe pipe which might optimise by processing events concurrently
+* Events that could not be transformed (due Iglu errors or DB unavailability) are emitted as bad data
* @param state mutable Loader state
* @param client Iglu Client
* @param processor The actor processing these events
*/
-def goodSink[F[_]: Sync: Clock: DB](unorderedPipe: UnorderedPipe[F],
-state: State[F],
-client: Client[F, Json],
-processor: Processor
-): Pipe[F, Data, BadData] =
-unorderedPipe(sinkPayload(state, client, processor)).andThen {
-_.collect {
-case Left(badData) => badData
-}
-}

-/** Sink bad data coming directly into the `Pipe` */
-def badSink[F[_]: Sync: ContextShift]: Pipe[F, BadData, Unit] =
-_.evalMap {
-case BadData.BadEnriched(row) => Sync[F].delay(logger.warn(row.compact))
-case BadData.BadJson(payload, error) => Sync[F].delay(logger.warn(s"Cannot parse $payload. $error"))
-}

-/** Implementation for [[goodSink]] */
-def sinkPayload[F[_]: Sync: Clock: DB](state: State[F], client: Client[F, Json], processor: Processor)(
-payload: Data
-): F[Either[BadData, Unit]] = {
+def sinkGood[F[_]: Sync: Clock: DB](state: State[F], client: Client[F, Json], processor: Processor)(payload: Data): F[Either[BadData, Unit]] = {
val result = for {
entities <- payload match {
case Data.Snowplow(event) =>
@@ -92,10 +77,16 @@
}
}
} yield insert

result.value
}

+/** Implementation for [[badSink]] */
+def sinkBad[F[_]: Sync: ContextShift: Clock: DB](badData: BadData): F[Unit] =
+badData match {
+case BadData.BadEnriched(row) => Sync[F].delay(logger.warn(row.compact))
+case BadData.BadJson(payload, error) => Sync[F].delay(logger.warn(s"Cannot parse $payload. $error"))
+}

/**
* Build an `INSERT` action for a single entity
* Multiple inserts later can be combined into a transaction
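With `goodSink`/`badSink` gone, the new `sinkResult` folds each `Either[BadData, Data]` into a single effect, so one ordered pipe can drive both the insert path and the bad-row path for a mixed stream. A simplified, self-contained analogue of that pattern (toy types and cats-effect 3 assumed; the real `sinkGood` additionally returns an `Either` that is flattened back into `sinkBad`):

```scala
import cats.effect.{IO, IOApp}
import fs2.{Pipe, Stream}

// Toy analogue of the sinkResult pattern: fold each Either into one effect
// and run the whole mixed stream through a single order-preserving pipe.
object SinkResultDemo extends IOApp.Simple {
  type BadRow = String
  type Event  = Int

  def sinkGood(e: Event): IO[Unit] = IO.println(s"inserted event $e")
  def sinkBad(b: BadRow): IO[Unit] = IO.println(s"bad row: $b")

  def sinkResult(result: Either[BadRow, Event]): IO[Unit] =
    result.fold(sinkBad, sinkGood)

  def orderedPipe[A, B](maxConcurrent: Int)(f: A => IO[B]): Pipe[IO, A, B] =
    _.parEvalMap(maxConcurrent)(f)

  val run =
    Stream
      .emits(List[Either[BadRow, Event]](Right(1), Left("not an event"), Right(2)))
      .covary[IO]
      .through(orderedPipe(4)(sinkResult))
      .compile
      .drain
}
```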
@@ -35,7 +35,7 @@ class sinkspec extends Database {
import Database._

val processor = Processor("pgloader", "test")
-val unorderedPipe = UnorderedPipe.concurrent[IO](5)
+val orderedPipe = OrderedPipe.concurrent[IO](5)

"goodSink" should {
"sink a single good event" >> {
@@ -48,7 +48,7 @@

val action = for {
state <- State.init[IO](List(), igluClient.resolver)
-_ <- stream.through(sink.goodSink(unorderedPipe, state, igluClient, processor)).compile.drain.action
+_ <- stream.through(orderedPipe(sink.sinkGood(state, igluClient, processor))).compile.drain.action
eventIds <- query.action
uaParserCtxs <- count("com_snowplowanalytics_snowplow_ua_parser_context_1").action
} yield (eventIds, uaParserCtxs)
@@ -70,7 +70,7 @@

val action = for {
state <- State.init[IO](List(), igluClient.resolver)
-_ <- stream.through(sink.goodSink(unorderedPipe, state, igluClient, processor)).compile.drain.action
+_ <- stream.through(orderedPipe(sink.sinkGood(state, igluClient, processor))).compile.drain.action
eventIds <- query.action
rows <- count("com_getvero_bounced_1").action
} yield (eventIds, rows)
@@ -109,7 +109,7 @@

val action = for {
state <- State.init[IO](List(), igluClient.resolver)
-_ <- stream.through(sink.goodSink(unorderedPipe, state, igluClient, processor)).compile.drain.action
+_ <- stream.through(orderedPipe(sink.sinkGood(state, igluClient, processor))).compile.drain.action
rows <- count("me_chuwy_pg_test_1").action
table <- describeTable("me_chuwy_pg_test_1").action
} yield (rows, table)
4 changes: 4 additions & 0 deletions modules/loader/src/main/resources/application.conf
@@ -7,6 +7,10 @@
"maxRecords": 10000 # Only used if `type` is changed to Polling
}
"initialPosition": "TRIM_HORIZON"
"checkpointSettings": {
"maxBatchSize": 1000
"maxBatchWait": 10000
}
}

"output": {
@@ -14,6 +14,7 @@ package com.snowplowanalytics.snowplow.postgres.config

import java.nio.file.{InvalidPathException, Paths}
import java.util.Base64
import java.nio.file.Files

import cats.data.{EitherT, ValidatedNel}
import cats.implicits._
@@ -35,6 +36,8 @@ import com.snowplowanalytics.snowplow.badrows.Processor
import com.monovore.decline._

import com.snowplowanalytics.snowplow.postgres.generated.BuildInfo
import com.snowplowanalytics.snowplow.postgres.config.LoaderConfig.Source.LocalFS
import com.snowplowanalytics.snowplow.postgres.config.LoaderConfig.Source.LocalFS.PathInfo

case class Cli[F[_]](config: LoaderConfig, iglu: Client[F, Json])

@@ -49,6 +52,20 @@ object Cli {
case Right(rawConfig) => fromRawConfig(rawConfig)
}

def configPreCheck[F[_]: Sync](cli: Cli[F]): EitherT[F, String, Cli[F]] =
cli.config.input match {
case LocalFS(pathInfo) =>
fileExists(pathInfo)
.attemptT
.leftMap(_.toString)
.ensure(s"Local source path [${pathInfo.allPath}] does not exist")(identity)
.as(cli)
case _ => EitherT.pure(cli)
}

private def fileExists[F[_]: Sync](pathInfo: PathInfo): F[Boolean] =
Sync[F].delay(Files.exists(pathInfo.allPath))

private def fromRawConfig[F[_]: Sync: Clock](rawConfig: RawConfig): EitherT[F, String, Cli[F]] =
for {
resolverJson <- loadJson(rawConfig.resolver).toEitherT[F]
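`configPreCheck` above fails fast when a `LocalFS` input path does not exist, wrapping the effectful `Files.exists` call in `EitherT` so it composes with the rest of the CLI parsing. A condensed, self-contained version of the same pattern (a plain path string stands in for the loader's `PathInfo`/`Cli` types):

```scala
import java.nio.file.{Files, Paths}
import cats.data.EitherT
import cats.effect.Sync
import cats.syntax.all._

// Condensed version of the pre-check above, with the loader-specific types
// replaced by a plain path string.
def pathPreCheck[F[_]: Sync](path: String): EitherT[F, String, String] =
  Sync[F]
    .delay(Files.exists(Paths.get(path)))                          // effectful filesystem check
    .attemptT                                                      // lift F[Boolean] into EitherT, capturing exceptions
    .leftMap(_.toString)
    .ensure(s"Local source path [$path] does not exist")(identity)
    .map(_ => path)                                                // keep the validated path on success
```

Presumably the check is chained after argument parsing, e.g. `parse(args).flatMap(configPreCheck[F])`, though that wiring is outside the hunks shown in this commit.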
