Replaces the queue with pure pipes
Ian Streeter committed Aug 11, 2020
1 parent f913ced commit 5313d95
Showing 4 changed files with 27 additions and 42 deletions.
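The gist of the change: instead of enqueueing bad rows onto a shared fs2 Queue as a side effect, the good sink now emits them as ordinary stream output, and the bad sink consumes that output directly. A minimal sketch of the pattern, with made-up process/goodSink/badSink names standing in for the loader's real ones:

    import cats.effect.IO
    import fs2.{Pipe, Stream}

    // A processing step that surfaces failures as values rather than
    // pushing them onto a shared queue.
    def process(n: Int): IO[Either[String, Unit]] =
      IO.pure(if (n % 2 == 0) Right(()) else Left(s"bad: $n"))

    // The "good" pipe emits only the failures it produced...
    val goodSink: Pipe[IO, Int, String] =
      _.evalMap(process).collect { case Left(err) => err }

    // ...and the "bad" pipe consumes them downstream.
    val badSink: Pipe[IO, String, Unit] =
      _.evalMap(err => IO(println(err)))

    val program: Stream[IO, Unit] =
      Stream(1, 2, 3).through(goodSink).through(badSink)

This also removes the queue's fixed bound (previously 128) and the coordination it implied.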
@@ -22,16 +22,13 @@ import doobie.util.ExecutionContexts
import doobie.util.log.LogHandler
import doobie.util.transactor.Transactor

-import fs2.concurrent.Queue
-
import io.circe.Json

import com.snowplowanalytics.iglu.client.Client

import com.snowplowanalytics.snowplow.postgres.api.State
import com.snowplowanalytics.snowplow.postgres.config.LoaderConfig
import com.snowplowanalytics.snowplow.postgres.config.LoaderConfig.JdbcUri
-import com.snowplowanalytics.snowplow.postgres.streaming.data.BadData

object resources {

@@ -41,7 +38,6 @@ object resources {
iglu: Client[F, Json]) =
for {
blocker <- Blocker[F]
-badQueue <- Resource.liftF(Queue.bounded[F, BadData](128))
xa <- resources.getTransactor[F](postgres.getJdbc, postgres.username, postgres.password, blocker)
keysF = for {
ci <- storage.query.getComments(postgres.schema, logger).transact(xa).map(_.separate)
@@ -57,7 +53,7 @@
Sync[F].pure(state)
}
state <- Resource.liftF(initState)
-} yield (blocker, xa, state, badQueue)
+} yield (blocker, xa, state)

/** Get a HikariCP transactor */
def getTransactor[F[_]: Async: ContextShift](jdbcUri: JdbcUri, user: String, password: String, be: Blocker): Resource[F, HikariTransactor[F]] =
@@ -12,12 +12,12 @@
*/
package com.snowplowanalytics.snowplow.postgres.streaming

+import cats.data.EitherT
import cats.implicits._

import cats.effect.{Sync, Clock, Concurrent}

import fs2.Pipe
-import fs2.concurrent.Queue

import doobie._
import doobie.implicits._
@@ -40,31 +40,31 @@ object sink {
/**
* Sink good events into Postgres. During sinking, payloads go through all transformation steps
* and checking the state of the DB itself.
-* Events that could not be transformed (due Iglu errors or DB unavailability) are sent back
-* to `badQueue`
+* Events that could not be transformed (due to Iglu errors or DB unavailability) are emitted
+* from the pipe
* @param state mutable Loader state
-* @param badQueue queue where all unsucessful actions can unload its results
* @param client Iglu Client
* @param processor The actor processing these events
*/
def goodSink[F[_]: Concurrent: Clock: DB](state: State[F],
-badQueue: Queue[F, BadData],
client: Client[F, Json],
-processor: Processor): Pipe[F, Data, Unit] =
-_.parEvalMapUnordered(32)(sinkPayload(state, badQueue, client, processor))
-
-/** Sink bad data coming directly into the `Pipe` and data coming from `badQueue` */
-def badSink[F[_]: Concurrent](badQueue: Queue[F, BadData]): Pipe[F, BadData, Unit] =
-_.merge(badQueue.dequeue).evalMap {
+processor: Processor): Pipe[F, Data, BadData] =
+_.parEvalMapUnordered(32)(sinkPayload(state, client, processor))
+.collect {
+case Left(badData) => badData
+}
+
+/** Sink bad data coming directly into the `Pipe` */
+def badSink[F[_]: Concurrent]: Pipe[F, BadData, Unit] =
+_.evalMap {
case BadData.BadEnriched(row) => Sync[F].delay(println(row.compact))
case BadData.BadJson(payload, error) => Sync[F].delay(println(s"Cannot parse $payload. $error"))
}

/** Implementation for [[goodSink]] */
def sinkPayload[F[_]: Sync: Clock: DB](state: State[F],
-badQueue: Queue[F, BadData],
client: Client[F, Json],
-processor: Processor)(payload: Data): F[Unit] = {
+processor: Processor)(payload: Data): F[Either[BadData, Unit]] = {
val result = for {
entities <- payload match {
case Data.Snowplow(event) =>
@@ -77,25 +77,18 @@
.map(entity => List(entity))
.leftMap(errors => BadData.BadJson(json.normalize.noSpaces, errors.toString))
}
-insert = DB.process(entities, state).attempt.flatMap {
-case Right(_) => Sync[F].unit
-case Left(error) => payload match {
+insert <- EitherT(DB.process(entities, state).attempt).leftMap {
+case error => payload match {
case Data.Snowplow(event) =>
val badRow = BadRow.LoaderRuntimeError(processor, error.getMessage, Payload.LoaderPayload(event))
-val pgBadRow = BadData.BadEnriched(badRow)
-badQueue.enqueue1(pgBadRow)
+BadData.BadEnriched(badRow)
case Data.SelfDescribing(json) =>
-val pgBadRow = BadData.BadJson(json.normalize.noSpaces, s"Cannot insert: ${error.getMessage}")
-badQueue.enqueue1(pgBadRow)
-
+BadData.BadJson(json.normalize.noSpaces, s"Cannot insert: ${error.getMessage}")
}
}
} yield insert

-result.value.flatMap {
-case Right(action) => action
-case Left(error) => badQueue.enqueue1(error)
-}
+result.value
}

/**
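For reference, the attempt-and-EitherT shape that sinkPayload now uses, shown in isolation (the insert and sinkOne names here are hypothetical stand-ins):

    import cats.data.EitherT
    import cats.effect.IO

    // Run an effect, capture any Throwable as a Left, and turn it into a
    // domain error value instead of enqueueing it somewhere.
    def insert(row: String): IO[Unit] =
      IO.raiseError(new RuntimeException("connection refused"))

    def sinkOne(row: String): IO[Either[String, Unit]] =
      EitherT(insert(row).attempt)
        .leftMap(e => s"Cannot insert '$row': ${e.getMessage}")
        .value

The caller then decides what a Left means; here goodSink collects the Lefts and forwards them to badSink.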
@@ -17,7 +17,6 @@ import java.util.UUID
import cats.effect.IO

import fs2.Stream
-import fs2.concurrent.Queue

import io.circe.Json
import io.circe.literal._
@@ -30,7 +29,7 @@ import com.snowplowanalytics.snowplow.analytics.scalasdk.Event
import com.snowplowanalytics.snowplow.badrows.Processor
import com.snowplowanalytics.snowplow.postgres.Database
import com.snowplowanalytics.snowplow.postgres.api.{State, DB}
-import com.snowplowanalytics.snowplow.postgres.streaming.data.{Data, BadData}
+import com.snowplowanalytics.snowplow.postgres.streaming.data.Data


class sinkspec extends Database {
@@ -49,8 +48,7 @@ class sinkspec extends Database {

val action = for {
state <- State.init[IO](List(), igluClient.resolver)
-queue <- Queue.bounded[IO, BadData](1).action
-_ <- stream.through(sink.goodSink(state, queue, igluClient, processor)).compile.drain.action
+_ <- stream.through(sink.goodSink(state, igluClient, processor)).compile.drain.action
eventIds <- query.action
uaParserCtxs <- count("com_snowplowanalytics_snowplow_ua_parser_context_1").action
} yield (eventIds, uaParserCtxs)
@@ -72,8 +70,7 @@

val action = for {
state <- State.init[IO](List(), igluClient.resolver)
-queue <- Queue.bounded[IO, BadData](1).action
-_ <- stream.through(sink.goodSink(state, queue, igluClient, processor)).compile.drain.action
+_ <- stream.through(sink.goodSink(state, igluClient, processor)).compile.drain.action
eventIds <- query.action
rows <- count("com_getvero_bounced_1").action
} yield (eventIds, rows)
@@ -110,8 +107,7 @@

val action = for {
state <- State.init[IO](List(), igluClient.resolver)
-queue <- Queue.bounded[IO, BadData](1).action
-_ <- stream.through(sink.goodSink(state, queue, igluClient, processor)).compile.drain.action
+_ <- stream.through(sink.goodSink(state, igluClient, processor)).compile.drain.action
rows <- count("me_chuwy_pg_test_1").action
table <- describeTable("me_chuwy_pg_test_1").action
} yield (rows, table)
@@ -34,7 +34,7 @@ object Main extends IOApp {
case Right(Cli(appConfig, iglu, debug)) =>
val logger = if (debug) LogHandler.jdkLogHandler else LogHandler.nop
resources.initialize[IO](appConfig.getLoaderConfig, logger, iglu).use {
-case (blocker, xa, state, badQueue) =>
+case (blocker, xa, state) =>
source.getSource[IO](blocker, appConfig.purpose, appConfig.source) match {
case Right(dataStream) =>
val meta = appConfig.purpose.snowplow
@@ -44,9 +44,9 @@
case Purpose.Enriched => utils.prepare[IO](appConfig.schema, xa, logger)
case Purpose.SelfDescribing => IO.unit
}
-goodSink = sink.goodSink[IO](state, badQueue, iglu, processor)
-badSink = sink.badSink[IO](badQueue)
-s = dataStream.observeEither(badSink, goodSink)
+goodSink = sink.goodSink[IO](state, iglu, processor)
+badSink = sink.badSink[IO]
+s = dataStream.observeEither(badSink, goodSink.andThen(_.through(badSink)))

_ <- s.compile.drain
} yield ExitCode.Success
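The observeEither wiring above sends the source's Lefts to badSink and its Rights through goodSink; goodSink's own BadData output is then drained through badSink as well. The andThen composition works because an fs2 Pipe[F, A, B] is just an alias for Stream[F, A] => Stream[F, B]. A tiny sketch with hypothetical pipes:

    import cats.effect.IO
    import fs2.Pipe

    // Pipes are plain functions, so Function1.andThen composes them.
    val lengths: Pipe[IO, String, Int] = _.map(_.length)
    val printed: Pipe[IO, Int, Unit]   = _.evalMap(n => IO(println(n)))

    val combined: Pipe[IO, String, Unit] = lengths.andThen(printed)

goodSink.andThen(_.through(badSink)) is the same composition, written to reuse badSink as the final stage.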
