diff --git a/build.sbt b/build.sbt
index 4da4082..0eb5dd3 100644
--- a/build.sbt
+++ b/build.sbt
@@ -20,18 +20,14 @@ lazy val common = project
   .settings(name := "snowplow-postgres")
   .enablePlugins(BuildInfoPlugin)
   .settings(BuildSettings.projectSettings)
-  .settings(BuildSettings.buildInfoSettings)
   .settings(BuildSettings.scoverageSettings)
-  .settings(BuildSettings.addExampleConfToTestCp)
   .settings(BuildSettings.mavenSettings)
   .settings(
     resolvers += Dependencies.SnowplowBintray,
     libraryDependencies ++= Seq(
       Dependencies.logger,
       Dependencies.postgres,
-      Dependencies.commons,
       Dependencies.catsEffect,
-      Dependencies.decline,
       Dependencies.circe,
       Dependencies.circeGeneric,
       Dependencies.circeExtras,
@@ -41,8 +37,6 @@ lazy val common = project
       Dependencies.doobiePg,
       Dependencies.doobiePgCirce,
       Dependencies.doobieHikari,
-      Dependencies.fs2Aws,
-      Dependencies.fs2PubSub,
       Dependencies.analyticsSdk,
       Dependencies.badRows,
       Dependencies.schemaDdl,
@@ -57,6 +51,17 @@ lazy val loader = project
   .settings(name := "snowplow-postgres-loader")
   .settings(BuildSettings.projectSettings)
   .settings(BuildSettings.dockerSettings)
+  .settings(BuildSettings.buildInfoSettings)
+  .settings(BuildSettings.addExampleConfToTestCp)
+  .settings(
+    libraryDependencies ++= Seq(
+      Dependencies.commons,
+      Dependencies.fs2Aws,
+      Dependencies.fs2PubSub,
+      Dependencies.decline,
+      Dependencies.specs2
+    )
+  )
   .dependsOn(common)
   .enablePlugins(JavaAppPackaging, DockerPlugin, BuildInfoPlugin)
diff --git a/modules/common/src/main/scala/com/snowplowanalytics/snowplow/postgres/api/DB.scala b/modules/common/src/main/scala/com/snowplowanalytics/snowplow/postgres/api/DB.scala
index 69a2ff2..9969070 100644
--- a/modules/common/src/main/scala/com/snowplowanalytics/snowplow/postgres/api/DB.scala
+++ b/modules/common/src/main/scala/com/snowplowanalytics/snowplow/postgres/api/DB.scala
@@ -27,14 +27,14 @@ import com.snowplowanalytics.iglu.client.Resolver
 
 import com.snowplowanalytics.iglu.schemaddl.migrations.SchemaList
 
-import com.snowplowanalytics.snowplow.postgres.shredding.{Entity, schema}
+import com.snowplowanalytics.snowplow.postgres.shredding.{Entity, Shredded, schema}
 import com.snowplowanalytics.snowplow.postgres.storage.ddl
 import com.snowplowanalytics.snowplow.postgres.streaming.sink
 
 trait DB[F[_]] {
   def insert(event: List[Entity]): F[Unit]
   def alter(schemaKey: SchemaKey): F[Unit]
-  def create(schemaKey: SchemaKey): F[Unit]
+  def create(schemaKey: SchemaKey, includeMeta: Boolean): F[Unit]
   def getSchemaList(schemaKey: SchemaKey): F[SchemaList]
 }
 
@@ -43,14 +43,18 @@ object DB {
 
   def apply[F[_]](implicit ev: DB[F]): DB[F] = ev
 
-  def process[F[_]](event: List[Entity], state: State[F])
+  def process[F[_]](shredded: Shredded, state: State[F])
                    (implicit D: DB[F], B: Bracket[F, Throwable]): F[Unit] = {
-    val insert = D.insert(event)
+    val (includeMeta, entities) = shredded match {
+      case Shredded.ShreddedSnowplow(atomic, entities) => (true, atomic :: entities)
+      case Shredded.ShreddedSelfDescribing(entity) => (false, List(entity))
+    }
+    val insert = D.insert(entities)
 
     // Mutate table and Loader's mutable variable. Only for locked state!
     def mutate(missing: Set[SchemaKey], outdated: Set[SchemaKey]): F[Unit] = for {
-      _ <- missing.toList.traverse(D.create) // Create missing tables if any
+      _ <- missing.toList.traverse(key => D.create(key, includeMeta)) // Create missing tables if any
       _ <- outdated.toList.traverse(D.alter) // Updated outdated tables if any
       _ <- (missing ++ outdated).toList.traverse_ { entity =>
         for { // Update state with new schemas
@@ -60,7 +64,7 @@ object DB {
         }
       } yield ()
 
-    state.checkAndRun(_.checkEvent(event), insert, mutate)
+    state.checkAndRun(_.checkEvent(entities), insert, mutate)
   }
 
@@ -87,8 +91,7 @@ object DB {
   def interpreter[F[_]: Sync: Clock](resolver: Resolver[F],
                                      xa: Transactor[F],
                                      logger: LogHandler,
-                                     schemaName: String,
-                                     meta: Boolean): DB[F] = new DB[F] {
+                                     schemaName: String): DB[F] = new DB[F] {
     def insert(event: List[Entity]): F[Unit] =
       event.traverse_(sink.insertStatement(logger, schemaName, _)).transact(xa)
 
@@ -97,8 +100,8 @@
       rethrow(result.semiflatMap(_.transact(xa)))
     }
 
-    def create(schemaKey: SchemaKey): F[Unit] = {
-      val result = ddl.createTable[F](resolver, logger, schemaName, schemaKey, meta)
+    def create(schemaKey: SchemaKey, includeMeta: Boolean): F[Unit] = {
+      val result = ddl.createTable[F](resolver, logger, schemaName, schemaKey, includeMeta)
       rethrow(result.semiflatMap(_.transact(xa)))
     }
diff --git a/modules/common/src/main/scala/com/snowplowanalytics/snowplow/postgres/config/Base64Encoded.scala b/modules/common/src/main/scala/com/snowplowanalytics/snowplow/postgres/config/Base64Encoded.scala
deleted file mode 100644
index c56eec6..0000000
--- a/modules/common/src/main/scala/com/snowplowanalytics/snowplow/postgres/config/Base64Encoded.scala
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Copyright (c) 2020 Snowplow Analytics Ltd. All rights reserved.
- *
- * This program is licensed to you under the Apache License Version 2.0,
- * and you may not use this file except in compliance with the Apache License Version 2.0.
- * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0.
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the Apache License Version 2.0 is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
- */
-package com.snowplowanalytics.snowplow.postgres.config
-
-import java.util.Base64
-
-import cats.syntax.either._
-import cats.data.ValidatedNel
-
-import io.circe.Json
-import io.circe.parser.{ parse => jsonParse }
-
-import com.monovore.decline.Argument
-
-/** Base64-encoded JSON */
-case class Base64Encoded(json: Json) extends AnyVal
-
-object Base64Encoded {
-  def parse(string: String): Either[String, Base64Encoded] =
-    Either
-      .catchOnly[IllegalArgumentException](Base64.getDecoder.decode(string))
-      .map(bytes => new String(bytes))
-      .flatMap(str => jsonParse(str))
-      .leftMap(e => s"Cannot parse ${string} as Base64-encoded JSON: ${e.getMessage}")
-      .map(json => Base64Encoded(json))
-
-  implicit def base64EncodedDeclineArg: Argument[Base64Encoded] =
-    new Argument[Base64Encoded] {
-      def read(string: String): ValidatedNel[String, Base64Encoded] =
-        Base64Encoded.parse(string).toValidatedNel
-
-      def defaultMetavar: String = "base64"
-    }
-}
-
diff --git a/modules/common/src/main/scala/com/snowplowanalytics/snowplow/postgres/config/DBConfig.scala b/modules/common/src/main/scala/com/snowplowanalytics/snowplow/postgres/config/DBConfig.scala
new file mode 100644
index 0000000..9a7769e
--- /dev/null
+++ b/modules/common/src/main/scala/com/snowplowanalytics/snowplow/postgres/config/DBConfig.scala
@@ -0,0 +1,36 @@
+/*
+ * Copyright (c) 2020 Snowplow Analytics Ltd. All rights reserved.
+ *
+ * This program is licensed to you under the Apache License Version 2.0,
+ * and you may not use this file except in compliance with the Apache License Version 2.0.
+ * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0.
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the Apache License Version 2.0 is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
+ */
+package com.snowplowanalytics.snowplow.postgres.config
+
+import DBConfig.JdbcUri
+
+case class DBConfig(host: String,
+                    port: Int,
+                    database: String,
+                    username: String,
+                    password: String, // TODO: can be EC2 store
+                    sslMode: String,
+                    schema: String) {
+  def getJdbc: JdbcUri =
+    JdbcUri(host, port, database, sslMode.toLowerCase().replace('_', '-'))
+}
+
+object DBConfig {
+
+
+  case class JdbcUri(host: String, port: Int, database: String, sslMode: String) {
+    override def toString =
+      s"jdbc:postgresql://$host:$port/$database?sslmode=$sslMode"
+  }
+
+}
diff --git a/modules/common/src/main/scala/com/snowplowanalytics/snowplow/postgres/resources.scala b/modules/common/src/main/scala/com/snowplowanalytics/snowplow/postgres/resources.scala
index 9c78fe0..c90759f 100644
--- a/modules/common/src/main/scala/com/snowplowanalytics/snowplow/postgres/resources.scala
+++ b/modules/common/src/main/scala/com/snowplowanalytics/snowplow/postgres/resources.scala
@@ -22,47 +22,51 @@ import doobie.util.ExecutionContexts
 import doobie.util.log.LogHandler
 import doobie.util.transactor.Transactor
 
-import fs2.concurrent.Queue
-
 import io.circe.Json
 
 import com.snowplowanalytics.iglu.client.Client
 
 import com.snowplowanalytics.snowplow.postgres.api.State
-import com.snowplowanalytics.snowplow.postgres.config.LoaderConfig
-import com.snowplowanalytics.snowplow.postgres.config.LoaderConfig.JdbcUri
-import com.snowplowanalytics.snowplow.postgres.streaming.source.BadData
+import com.snowplowanalytics.snowplow.postgres.config.DBConfig
+import com.snowplowanalytics.snowplow.postgres.config.DBConfig.JdbcUri
 
 object resources {
 
+  val FixedThreadPoolSize: Int = 32
+
   /** Initialise Blocking Thread Pool, Connection Pool, DB state and bad queue resources */
-  def initialize[F[_]: Concurrent: Clock: ContextShift](postgres: LoaderConfig,
+  def initialize[F[_]: Concurrent: Clock: ContextShift](postgres: DBConfig,
                                                         logger: LogHandler,
                                                         iglu: Client[F, Json]) =
     for {
       blocker <- Blocker[F]
-      badQueue <- Resource.liftF(Queue.bounded[F, BadData](128))
       xa <- resources.getTransactor[F](postgres.getJdbc, postgres.username, postgres.password, blocker)
-      keysF = for {
-        ci <- storage.query.getComments(postgres.schema, logger).transact(xa).map(_.separate)
-        (issues, comments) = ci
-        _ <- issues.traverse_(issue => Sync[F].delay(println(issue)))
-      } yield comments
-      keys <- Resource.liftF(keysF)
-      initState = State.init[F](keys, iglu.resolver).value.flatMap {
+      state <- Resource.liftF(initializeState(postgres, logger, iglu, xa))
+    } yield (blocker, xa, state)
+
+  def initializeState[F[_]: Concurrent: Clock](postgres: DBConfig,
+                                               logger: LogHandler,
+                                               iglu: Client[F, Json],
+                                               xa: Transactor[F]): F[State[F]] = {
+    for {
+      ci <- storage.query.getComments(postgres.schema, logger).transact(xa).map(_.separate)
+      (issues, comments) = ci
+      _ <- issues.traverse_(issue => Sync[F].delay(println(issue)))
+      initState = State.init[F](comments, iglu.resolver).value.flatMap {
        case Left(error) =>
          val exception = new RuntimeException(s"Couldn't initalise the state $error")
          Sync[F].raiseError[State[F]](exception)
        case Right(state) =>
          Sync[F].pure(state)
      }
-      state <- Resource.liftF(initState)
-    } yield (blocker, xa, state, badQueue)
+      state <- initState
+    } yield state
+  }
 
   /** Get a HikariCP transactor */
   def getTransactor[F[_]: Async: ContextShift](jdbcUri: JdbcUri, user: String, password: String, be: Blocker): Resource[F, HikariTransactor[F]] =
     for {
-      ce <- ExecutionContexts.fixedThreadPool[F](32)
+      ce <- ExecutionContexts.fixedThreadPool[F](FixedThreadPoolSize)
       xa <- HikariTransactor.newHikariTransactor[F]("org.postgresql.Driver", jdbcUri.toString, user, password, ce, be)
     } yield xa
diff --git a/modules/common/src/main/scala/com/snowplowanalytics/snowplow/postgres/shredding/Shredded.scala b/modules/common/src/main/scala/com/snowplowanalytics/snowplow/postgres/shredding/Shredded.scala
new file mode 100644
index 0000000..3042bd9
--- /dev/null
+++ b/modules/common/src/main/scala/com/snowplowanalytics/snowplow/postgres/shredding/Shredded.scala
@@ -0,0 +1,20 @@
+/*
+ * Copyright (c) 2020 Snowplow Analytics Ltd. All rights reserved.
+ *
+ * This program is licensed to you under the Apache License Version 2.0,
+ * and you may not use this file except in compliance with the Apache License Version 2.0.
+ * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0.
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the Apache License Version 2.0 is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
+ */
+package com.snowplowanalytics.snowplow.postgres.shredding
+
+sealed trait Shredded
+
+object Shredded {
+  case class ShreddedSnowplow(event: Entity, entities: List[Entity]) extends Shredded
+  case class ShreddedSelfDescribing(entity: Entity) extends Shredded
+}
diff --git a/modules/common/src/main/scala/com/snowplowanalytics/snowplow/postgres/shredding/transform.scala b/modules/common/src/main/scala/com/snowplowanalytics/snowplow/postgres/shredding/transform.scala
index ea4948c..a7f1062 100644
--- a/modules/common/src/main/scala/com/snowplowanalytics/snowplow/postgres/shredding/transform.scala
+++ b/modules/common/src/main/scala/com/snowplowanalytics/snowplow/postgres/shredding/transform.scala
@@ -34,25 +34,25 @@ import com.snowplowanalytics.iglu.schemaddl.jsonschema.{Pointer, Schema}
 import com.snowplowanalytics.iglu.schemaddl.migrations.FlatSchema
 
 import com.snowplowanalytics.snowplow.analytics.scalasdk.Event
-import com.snowplowanalytics.snowplow.badrows.{FailureDetails, BadRow, Failure, Payload}
+import com.snowplowanalytics.snowplow.badrows.{FailureDetails, BadRow, Failure, Payload, Processor}
 
 import Entity.Column
-import com.snowplowanalytics.snowplow.postgres.config.Cli
+import Shredded.{ShreddedSelfDescribing, ShreddedSnowplow}
 
 object transform {
   val Atomic = SchemaKey("com.snowplowanalytics.snowplow", "atomic", "jsonschema", SchemaVer.Full(1,0,0))
 
   /** Transform the whole `Event` (canonical and JSONs) into list of independent entities ready to be inserted */
-  def shredEvent[F[_]: Sync: Clock](client: Client[F, Json], event: Event): EitherT[F, BadRow, List[Entity]] = {
+  def shredEvent[F[_]: Sync: Clock](client: Client[F, Json], processor: Processor, event: Event): EitherT[F, BadRow, ShreddedSnowplow] = {
     val entities = event.contexts.data ++ event.derived_contexts.data ++ event.unstruct_event.data.toList
     val wholeEvent = entities
       .parTraverse(shredJson(client))
       .value
       .map { shreddedOrError =>
         (shreddedOrError, shredAtomic(Map())(event)).mapN {
-          (shreddedEntities, atomic) => atomic :: shreddedEntities.map(addMetadata(event.event_id, event.collector_tstamp))
+          (shreddedEntities, atomic) => ShreddedSnowplow(atomic, shreddedEntities.map(_.entity).map(addMetadata(event.event_id, event.collector_tstamp)))
         }
       }
-    EitherT(wholeEvent).leftMap[BadRow](buildBadRow(event))
+    EitherT(wholeEvent).leftMap[BadRow](buildBadRow(processor, event))
   }
 
   def addMetadata(eventId: UUID, tstamp: Instant)(entity: Entity): Entity = {
@@ -84,7 +84,7 @@ object transform {
   /** Transform JSON into [[Entity]] */
   def shredJson[F[_]: Sync: Clock](client: Client[F, Json])
-               (data: SelfDescribingData[Json]): EitherT[F, NonEmptyList[FailureDetails.LoaderIgluError], Entity] = {
+               (data: SelfDescribingData[Json]): EitherT[F, NonEmptyList[FailureDetails.LoaderIgluError], ShreddedSelfDescribing] = {
     val key = data.schema
     schema.getOrdered(client.resolver)(key.vendor, key.name, key.version.model)
       .leftMap { error => NonEmptyList.of(error) }
@@ -106,7 +106,7 @@ object transform {
             case Atomic => "events"
             case other => StringUtils.getTableName(SchemaMap(other))
           }
-          Entity(tableName, data.schema, columns)
+          ShreddedSelfDescribing(Entity(tableName, data.schema, columns))
         }
       }
   }
@@ -282,7 +282,7 @@ object transform {
     (columnName, dataType, value)
   }
 
-  private def buildBadRow(event: Event)(errors: NonEmptyList[FailureDetails.LoaderIgluError]) =
-    BadRow.LoaderIgluError(Cli.processor, Failure.LoaderIgluErrors(errors), Payload.LoaderPayload(event))
+  private def buildBadRow(processor: Processor, event: Event)(errors: NonEmptyList[FailureDetails.LoaderIgluError]) =
+    BadRow.LoaderIgluError(processor, Failure.LoaderIgluErrors(errors), Payload.LoaderPayload(event))
 }
diff --git a/modules/common/src/main/scala/com/snowplowanalytics/snowplow/postgres/streaming/data.scala b/modules/common/src/main/scala/com/snowplowanalytics/snowplow/postgres/streaming/data.scala
new file mode 100644
index 0000000..42ccbe5
--- /dev/null
+++ b/modules/common/src/main/scala/com/snowplowanalytics/snowplow/postgres/streaming/data.scala
@@ -0,0 +1,46 @@
+/*
+ * Copyright (c) 2020 Snowplow Analytics Ltd. All rights reserved.
+ *
+ * This program is licensed to you under the Apache License Version 2.0,
+ * and you may not use this file except in compliance with the Apache License Version 2.0.
+ * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0.
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the Apache License Version 2.0 is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
+ */
+package com.snowplowanalytics.snowplow.postgres.streaming
+
+import com.snowplowanalytics.iglu.core.SelfDescribingData
+
+import io.circe.Json
+
+import com.snowplowanalytics.snowplow.analytics.scalasdk.Event
+import com.snowplowanalytics.snowplow.badrows.BadRow
+
+object data {
+
+  /** Kind of data flowing through the Loader */
+  sealed trait Data extends Product with Serializable {
+    def snowplow: Boolean = this match {
+      case _: Data.Snowplow => true
+      case _: Data.SelfDescribing => false
+    }
+  }
+
+  object Data {
+    case class Snowplow(data: Event) extends Data
+    case class SelfDescribing(data: SelfDescribingData[Json]) extends Data
+  }
+
+  /** Data that for some reasons cannot be inserted into DB */
+  sealed trait BadData extends Throwable with Product with Serializable
+  object BadData {
+    /** Typical Snowplow bad row (Loader Iglu Error etc) */
+    case class BadEnriched(data: BadRow) extends BadData
+    /** Non-enriched error */
+    case class BadJson(payload: String, error: String) extends BadData
+  }
+}
+
diff --git a/modules/common/src/main/scala/com/snowplowanalytics/snowplow/postgres/streaming/sink.scala b/modules/common/src/main/scala/com/snowplowanalytics/snowplow/postgres/streaming/sink.scala
index 0038a4b..5195ace 100644
--- a/modules/common/src/main/scala/com/snowplowanalytics/snowplow/postgres/streaming/sink.scala
+++ b/modules/common/src/main/scala/com/snowplowanalytics/snowplow/postgres/streaming/sink.scala
@@ -12,12 +12,12 @@
  */
 package com.snowplowanalytics.snowplow.postgres.streaming
 
+import cats.data.EitherT
 import cats.implicits._
 
 import cats.effect.{Sync, Clock, Concurrent}
 
 import fs2.Pipe
-import fs2.concurrent.Queue
 
 import doobie._
 import doobie.implicits._
@@ -28,11 +28,11 @@ import com.snowplowanalytics.iglu.core.circe.implicits._
 
 import com.snowplowanalytics.iglu.client.Client
 
-import com.snowplowanalytics.snowplow.badrows.{BadRow, Payload}
+import com.snowplowanalytics.snowplow.badrows.{BadRow, Payload, Processor}
 import com.snowplowanalytics.snowplow.postgres.api.{State, DB}
-import com.snowplowanalytics.snowplow.postgres.config.Cli.processor
+import com.snowplowanalytics.snowplow.postgres.resources.FixedThreadPoolSize
 import com.snowplowanalytics.snowplow.postgres.shredding.{Entity, transform}
-import com.snowplowanalytics.snowplow.postgres.streaming.source.{Data, BadData}
+import com.snowplowanalytics.snowplow.postgres.streaming.data.{Data, BadData}
 
 object sink {
 
@@ -41,59 +41,54 @@ object sink {
 
   /**
    * Sink good events into Postgres. During sinking, payloads go through all transformation steps
    * and checking the state of the DB itself.
-   * Events that could not be transformed (due Iglu errors or DB unavailability) are sent back
-   * to `badQueue`
+   * Events that could not be transformed (due Iglu errors or DB unavailability) are emitted from
+   * the pipe
    * @param state mutable Loader state
-   * @param badQueue queue where all unsucessful actions can unload its results
    * @param client Iglu Client
+   * @param processor The actor processing these events
    */
   def goodSink[F[_]: Concurrent: Clock: DB](state: State[F],
-                                            badQueue: Queue[F, BadData],
-                                            client: Client[F, Json]): Pipe[F, Data, Unit] =
-    _.parEvalMapUnordered(32)(sinkPayload(state, badQueue, client))
-
-  /** Sink bad data coming directly into the `Pipe` and data coming from `badQueue` */
-  def badSink[F[_]: Concurrent](badQueue: Queue[F, BadData]): Pipe[F, BadData, Unit] =
-    _.merge(badQueue.dequeue).evalMap {
+                                            client: Client[F, Json],
+                                            processor: Processor): Pipe[F, Data, BadData] =
+    _.parEvalMapUnordered(FixedThreadPoolSize)(sinkPayload(state, client, processor))
+      .collect {
+        case Left(badData) => badData
+      }
+
+  /** Sink bad data coming directly into the `Pipe` */
+  def badSink[F[_]: Concurrent]: Pipe[F, BadData, Unit] =
+    _.evalMap {
       case BadData.BadEnriched(row) => Sync[F].delay(println(row.compact))
       case BadData.BadJson(payload, error) => Sync[F].delay(println(s"Cannot parse $payload. $error"))
     }
 
   /** Implementation for [[goodSink]] */
   def sinkPayload[F[_]: Sync: Clock: DB](state: State[F],
-                                         badQueue: Queue[F, BadData],
-                                         client: Client[F, Json])(payload: Data): F[Unit] = {
+                                         client: Client[F, Json],
+                                         processor: Processor)(payload: Data): F[Either[BadData, Unit]] = {
     val result = for {
       entities <- payload match {
         case Data.Snowplow(event) =>
           transform
-            .shredEvent[F](client, event)
+            .shredEvent[F](client, processor, event)
             .leftMap(bad => BadData.BadEnriched(bad))
         case Data.SelfDescribing(json) =>
           transform
            .shredJson(client)(json)
-            .map(entity => List(entity))
            .leftMap(errors => BadData.BadJson(json.normalize.noSpaces, errors.toString))
       }
-      insert = DB.process(entities, state).attempt.flatMap {
-        case Right(_) => Sync[F].unit
-        case Left(error) => payload match {
+      insert <- EitherT(DB.process(entities, state).attempt).leftMap {
+        case error => payload match {
          case Data.Snowplow(event) =>
            val badRow = BadRow.LoaderRuntimeError(processor, error.getMessage, Payload.LoaderPayload(event))
-            val pgBadRow = BadData.BadEnriched(badRow)
-            badQueue.enqueue1(pgBadRow)
+            BadData.BadEnriched(badRow)
          case Data.SelfDescribing(json) =>
-            val pgBadRow = BadData.BadJson(json.normalize.noSpaces, s"Cannot insert: ${error.getMessage}")
-            badQueue.enqueue1(pgBadRow)
-
+            BadData.BadJson(json.normalize.noSpaces, s"Cannot insert: ${error.getMessage}")
        }
      }
    } yield insert
 
-    result.value.flatMap {
-      case Right(action) => action
-      case Left(error) => badQueue.enqueue1(error)
-    }
+    result.value
   }
 
   /**
diff --git a/modules/common/src/test/scala/com/snowplowanalytics/snowplow/postgres/Database.scala b/modules/common/src/test/scala/com/snowplowanalytics/snowplow/postgres/Database.scala
index ce2720c..7be64a6 100644
--- a/modules/common/src/test/scala/com/snowplowanalytics/snowplow/postgres/Database.scala
+++ b/modules/common/src/test/scala/com/snowplowanalytics/snowplow/postgres/Database.scala
@@ -23,7 +23,7 @@ import com.snowplowanalytics.iglu.client.resolver.registries.Registry.{HttpConne
 import com.snowplowanalytics.iglu.client.validator.CirceValidator
 import com.snowplowanalytics.snowplow.badrows.FailureDetails
-import com.snowplowanalytics.snowplow.postgres.config.LoaderConfig.JdbcUri
+import com.snowplowanalytics.snowplow.postgres.config.DBConfig.JdbcUri
 
 trait Database extends Specification with BeforeAfterEach {
   import Database._
@@ -47,7 +47,7 @@ object Database {
   val logger: LogHandler = LogHandler.nop
   implicit val CS: ContextShift[IO] = IO.contextShift(concurrent.ExecutionContext.global)
-  val jdbcUri = JdbcUri("localhost", 5432, "snowplow")
+  val jdbcUri = JdbcUri("localhost", 5432, "snowplow", "allow")
   val registry = Http(Config("localhost registry", 1, Nil), HttpConnection(URI.create("http://localhost:8080/api/"), None))
   val igluClient = Client[IO, Json](Resolver(List(Registry.IgluCentral, registry), None), CirceValidator)
   val xa: Transactor[IO] = resources.getTransactorDefault[IO](jdbcUri, "postgres", "mysecretpassword")
diff --git a/modules/common/src/test/scala/com/snowplowanalytics/snowplow/postgres/streaming/sinkspec.scala b/modules/common/src/test/scala/com/snowplowanalytics/snowplow/postgres/streaming/sinkspec.scala
index 9d1ac93..d8c56f9 100644
--- a/modules/common/src/test/scala/com/snowplowanalytics/snowplow/postgres/streaming/sinkspec.scala
+++ b/modules/common/src/test/scala/com/snowplowanalytics/snowplow/postgres/streaming/sinkspec.scala
@@ -17,7 +17,6 @@ import java.util.UUID
 import cats.effect.IO
 
 import fs2.Stream
-import fs2.concurrent.Queue
 
 import io.circe.Json
 import io.circe.literal._
@@ -27,25 +26,29 @@ import com.snowplowanalytics.iglu.core.circe.implicits._
 
 import com.snowplowanalytics.snowplow.analytics.scalasdk.Event
+import com.snowplowanalytics.snowplow.badrows.Processor
 import com.snowplowanalytics.snowplow.postgres.Database
 import com.snowplowanalytics.snowplow.postgres.api.{State, DB}
-import com.snowplowanalytics.snowplow.postgres.streaming.source.{Data, BadData}
+import com.snowplowanalytics.snowplow.postgres.streaming.data.Data
+
 
 class sinkspec extends Database {
   import Database._
 
+  val processor = Processor("pgloader", "test")
+
+
   "goodSink" should {
     "sink a single good event" >> {
       val line = "snowplowweb\tweb\t2018-12-18 15:07:17.970\t2016-03-29 07:28:18.611\t2016-03-29 07:28:18.634\tpage_view\t11cdec7b-4cbd-4aa4-a4c9-3874ab9663d4\t\tsnplow6\tjs-2.6.0\tssc-0.6.0-kinesis\tspark-1.16.0-common-0.35.0\t34df2c48bc170c87befb441732a94196\t372d1f2983860eefd262b58e6592dfbc\t80546dc70f4a91f1283c4b6247e31bcf\t26e6412a2421eb923d9d40258ca9ca69\t1\t3a12e8b8e3e91a4d092b833d583c7e30\tDK\t82\tOdder\t8300\t42.0001\t42.003\tCentral Jutland\tTDC Danmark\tTDC Danmark\t\t\thttp://snowplowanalytics.com/documentation/recipes/catalog-analytics/market-basket-analysis-identifying-products-that-sell-well-together.html\tMarket basket analysis - identifying products and content that go well together – Snowplow\thttp://snowplowanalytics.com/analytics/catalog-analytics/market-basket-analysis-identifying-products-that-sell-well-together.html\thttp\tsnowplowanalytics.com\t80\t/documentation/recipes/catalog-analytics/market-basket-analysis-identifying-products-that-sell-well-together.html\t\t\thttp\tsnowplowanalytics.com\t80\t/analytics/catalog-analytics/market-basket-analysis-identifying-products-that-sell-well-together.html\t\t\tinternal\t\t\t\t\t\t\t\t{\"schema\":\"iglu:com.snowplowanalytics.snowplow/contexts/jsonschema/1-0-0\",\"data\":[{\"schema\":\"iglu:com.snowplowanalytics.snowplow/web_page/jsonschema/1-0-0\",\"data\":{\"id\":\"05862d26-0dde-4d7a-a494-fc9aae283d23\"}},{\"schema\":\"iglu:org.schema/WebPage/jsonschema/1-0-0\",\"data\":{\"genre\":\"documentation\",\"inLanguage\":\"en-US\"}},{\"schema\":\"iglu:org.w3/PerformanceTiming/jsonschema/1-0-0\",\"data\":{\"navigationStart\":1459236496534,\"unloadEventStart\":1459236496838,\"unloadEventEnd\":1459236496838,\"redirectStart\":0,\"redirectEnd\":0,\"fetchStart\":1459236496534,\"domainLookupStart\":1459236496534,\"domainLookupEnd\":1459236496534,\"connectStart\":1459236496534,\"connectEnd\":1459236496534,\"secureConnectionStart\":0,\"requestStart\":1459236496580,\"responseStart\":1459236496834,\"responseEnd\":1459236496844,\"domLoading\":1459236496853,\"domInteractive\":1459236497780,\"domContentLoadedEventStart\":1459236497780,\"domContentLoadedEventEnd\":1459236498038,\"domComplete\":0,\"loadEventStart\":0,\"loadEventEnd\":0,\"chromeFirstPaint\":1459236498203}}]}\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\tMozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/49.0.2623.87 Safari/537.36\tChrome 49\tChrome\t49.0.2623.87\tBrowser\tWEBKIT\ten-US\t1\t1\t0\t0\t0\t0\t0\t0\t0\t1\t24\t1920\t1075\tWindows 7\tWindows\tMicrosoft Corporation\tEurope/Berlin\tComputer\t0\t1920\t1200\tUTF-8\t1903\t11214\t\t\t\t\t\t\t\tEurope/Copenhagen\t\t\t\t2016-03-29 07:28:18.636\t\t\t{\"schema\":\"iglu:com.snowplowanalytics.snowplow/contexts/jsonschema/1-0-1\",\"data\":[{\"schema\":\"iglu:com.snowplowanalytics.snowplow/ua_parser_context/jsonschema/1-0-0\",\"data\":{\"useragentFamily\":\"Chrome\",\"useragentMajor\":\"49\",\"useragentMinor\":\"0\",\"useragentPatch\":\"2623\",\"useragentVersion\":\"Chrome 49.0.2623\",\"osFamily\":\"Windows\",\"osMajor\":\"7\",\"osMinor\":null,\"osPatch\":null,\"osPatchMinor\":null,\"osVersion\":\"Windows 7\",\"deviceFamily\":\"Other\"}}]}\t88c23330-ac4d-4c82-8a18-aa83c1e0c163\t2016-03-29 07:28:18.609\tcom.snowplowanalytics.snowplow\tpage_view\tjsonschema\t1-0-0\tcab5ba164038f31d8e10befc4eb199df\t"
       val event = Event.parse(line).getOrElse(throw new RuntimeException("Event is invalid"))
       val stream = Stream.emit[IO, Data](Data.Snowplow(event))
 
-      implicit val D = DB.interpreter[IO](igluClient.resolver, xa, logger, Schema, true)
+      implicit val D = DB.interpreter[IO](igluClient.resolver, xa, logger, Schema)
 
       val action = for {
         state <- State.init[IO](List(), igluClient.resolver)
-        queue <- Queue.bounded[IO, BadData](1).action
-        _ <- stream.through(sink.goodSink(state, queue, igluClient)).compile.drain.action
+        _ <- stream.through(sink.goodSink(state, igluClient, processor)).compile.drain.action
         eventIds <- query.action
         uaParserCtxs <- count("com_snowplowanalytics_snowplow_ua_parser_context_1").action
       } yield (eventIds, uaParserCtxs)
@@ -63,12 +66,11 @@ class sinkspec extends Database {
       val json = SelfDescribingData.parse(row).getOrElse(throw new RuntimeException("Invalid SelfDescribingData"))
       val stream = Stream.emit[IO, Data](Data.SelfDescribing(json))
 
-      implicit val D = DB.interpreter[IO](igluClient.resolver, xa, logger, Schema, false)
+      implicit val D = DB.interpreter[IO](igluClient.resolver, xa, logger, Schema)
 
       val action = for {
         state <- State.init[IO](List(), igluClient.resolver)
-        queue <- Queue.bounded[IO, BadData](1).action
-        _ <- stream.through(sink.goodSink(state, queue, igluClient)).compile.drain.action
+        _ <- stream.through(sink.goodSink(state, igluClient, processor)).compile.drain.action
         eventIds <- query.action
         rows <- count("com_getvero_bounced_1").action
       } yield (eventIds, rows)
@@ -101,12 +103,11 @@ class sinkspec extends Database {
         ColumnInfo("big_int", None, true, "bigint", None),
       )
 
-      implicit val D = DB.interpreter[IO](igluClient.resolver, xa, logger, Schema, false)
+      implicit val D = DB.interpreter[IO](igluClient.resolver, xa, logger, Schema)
 
       val action = for {
         state <- State.init[IO](List(), igluClient.resolver)
-        queue <- Queue.bounded[IO, BadData](1).action
-        _ <- stream.through(sink.goodSink(state, queue, igluClient)).compile.drain.action
+        _ <- stream.through(sink.goodSink(state, igluClient, processor)).compile.drain.action
         rows <- count("me_chuwy_pg_test_1").action
         table <- describeTable("me_chuwy_pg_test_1").action
       } yield (rows, table)
diff --git a/modules/common/src/main/scala/com/snowplowanalytics/snowplow/postgres/config/Cli.scala b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/postgres/config/Cli.scala
similarity index 94%
rename from modules/common/src/main/scala/com/snowplowanalytics/snowplow/postgres/config/Cli.scala
rename to modules/loader/src/main/scala/com/snowplowanalytics/snowplow/postgres/config/Cli.scala
index 1e8e6f7..0528434 100644
--- a/modules/common/src/main/scala/com/snowplowanalytics/snowplow/postgres/config/Cli.scala
+++ b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/postgres/config/Cli.scala
@@ -35,7 +35,7 @@ import com.monovore.decline._
 
 import com.snowplowanalytics.snowplow.postgres.generated.BuildInfo
 
-case class Cli[F[_]](postgres: LoaderConfig, iglu: Client[F, Json], debug: Boolean)
+case class Cli[F[_]](config: LoaderConfig, iglu: Client[F, Json], debug: Boolean)
 
 object Cli {
 
@@ -55,8 +55,8 @@ object Cli {
       configJson <- PathOrJson.load(rawConfig.config)
       configData <- SelfDescribingData.parse(configJson).leftMap(e => s"Configuration JSON is not self-describing, ${e.message(configJson.noSpaces)}").toEitherT[F]
       _ <- igluClient.check(configData).leftMap(e => s"Iglu validation failed with following error\n: ${e.asJson.spaces2}")
-      pgConfig <- configData.data.as[LoaderConfig].toEitherT[F].leftMap(e => s"Error while decoding configuration JSON, ${e.show}")
-    } yield Cli(pgConfig, igluClient, rawConfig.debug)
+      appConfig <- configData.data.as[LoaderConfig].toEitherT[F].leftMap(e => s"Error while decoding configuration JSON, ${e.show}")
+    } yield Cli(appConfig, igluClient, rawConfig.debug)
   }
 
   /** Config files for Loader can be passed either as FS path
diff --git a/modules/common/src/main/scala/com/snowplowanalytics/snowplow/postgres/config/LoaderConfig.scala b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/postgres/config/LoaderConfig.scala
similarity index 82%
rename from modules/common/src/main/scala/com/snowplowanalytics/snowplow/postgres/config/LoaderConfig.scala
rename to modules/loader/src/main/scala/com/snowplowanalytics/snowplow/postgres/config/LoaderConfig.scala
index 1505d51..3b4d290 100644
--- a/modules/common/src/main/scala/com/snowplowanalytics/snowplow/postgres/config/LoaderConfig.scala
+++ b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/postgres/config/LoaderConfig.scala
@@ -24,46 +24,28 @@ import io.circe.generic.semiauto.deriveDecoder
 import io.circe.generic.extras.Configuration
 import io.circe.generic.extras.semiauto.deriveConfiguredDecoder
 
-import LoaderConfig.{JdbcUri, Source, Purpose}
+import LoaderConfig.{Purpose, Source}
 
 import software.amazon.awssdk.regions.Region
 import software.amazon.kinesis.common.InitialPositionInStream
 
 case class LoaderConfig(name: String,
-                        id: UUID,
-                        source: Source,
-                        host: String,
-                        port: Int,
-                        database: String,
-                        username: String,
-                        password: String, // TODO: can be EC2 store
-                        sslMode: String,
-                        schema: String,
-                        purpose: Purpose) {
-  def getJdbc: JdbcUri =
-    JdbcUri(host, port, database)
+                        id: UUID,
+                        source: Source,
+                        host: String,
+                        port: Int,
+                        database: String,
+                        username: String,
+                        password: String, // TODO: can be EC2 store
+                        sslMode: String,
+                        schema: String,
+                        purpose: Purpose) {
+  def getDBConfig: DBConfig =
+    DBConfig(host, port, database, username, password, sslMode, schema)
 }
 
 object LoaderConfig {
 
-  sealed trait Purpose extends Product with Serializable {
-    def snowplow: Boolean = this match {
-      case Purpose.Enriched => true
-      case Purpose.SelfDescribing => false
-    }
-  }
-  object Purpose {
-    case object Enriched extends Purpose
-    case object SelfDescribing extends Purpose
-
-    implicit def ioCirceConfigPurposeDecoder: Decoder[Purpose] =
-      Decoder.decodeString.emap {
-        case "ENRICHED_EVENTS" => Enriched.asRight
-        case "JSON" => SelfDescribing.asRight
-        case other => s"$other is not supported purpose, choose from ENRICHED_EVENTS and JSON".asLeft
-      }
-  }
-
   implicit val awsRegionDecoder: Decoder[Region] =
     Decoder.decodeString.emap { s =>
       val allRegions = Region.regions().asScala.toSet.map((r: Region) => r.id())
@@ -107,6 +89,19 @@ object LoaderConfig {
     }
   }
 
+  sealed trait Purpose extends Product with Serializable
+  object Purpose {
+    case object Enriched extends Purpose
+    case object SelfDescribing extends Purpose
+
+    implicit def ioCirceConfigPurposeDecoder: Decoder[Purpose] =
+      Decoder.decodeString.emap {
+        case "ENRICHED_EVENTS" => Enriched.asRight
+        case "JSON" => SelfDescribing.asRight
+        case other => s"$other is not supported purpose, choose from ENRICHED_EVENTS and JSON".asLeft
+      }
+  }
+
   sealed trait Source extends Product with Serializable
 
   object Source {
@@ -120,11 +115,6 @@ object LoaderConfig {
       deriveConfiguredDecoder[Source]
   }
 
-  case class JdbcUri(host: String, port: Int, database: String) {
-    override def toString =
-      s"jdbc:postgresql://$host:$port/$database"
-  }
-
   implicit def ioCirceConfigDecoder: Decoder[LoaderConfig] =
     deriveDecoder[LoaderConfig]
 
diff --git a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/postgres/loader/Main.scala b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/postgres/loader/Main.scala
index f075a69..bc5ad34 100644
--- a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/postgres/loader/Main.scala
+++ b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/postgres/loader/Main.scala
@@ -16,32 +16,36 @@ import cats.effect.{ExitCode, IO, IOApp}
 
 import doobie.util.log.LogHandler
 
+import com.snowplowanalytics.snowplow.badrows.Processor
 import com.snowplowanalytics.snowplow.postgres.api.DB
 import com.snowplowanalytics.snowplow.postgres.config.Cli
 import com.snowplowanalytics.snowplow.postgres.config.LoaderConfig.Purpose
+import com.snowplowanalytics.snowplow.postgres.generated.BuildInfo
 import com.snowplowanalytics.snowplow.postgres.resources
 import com.snowplowanalytics.snowplow.postgres.storage.utils
 import com.snowplowanalytics.snowplow.postgres.streaming.{sink, source}
 
 object Main extends IOApp {
+
+  val processor = Processor(BuildInfo.name, BuildInfo.version)
+
   def run(args: List[String]): IO[ExitCode] =
     Cli.parse[IO](args).value.flatMap {
-      case Right(Cli(postgres, iglu, debug)) =>
+      case Right(Cli(loaderConfig, iglu, debug)) =>
         val logger = if (debug) LogHandler.jdkLogHandler else LogHandler.nop
-        resources.initialize[IO](postgres, logger, iglu).use {
-          case (blocker, xa, state, badQueue) =>
-            source.getSource[IO](blocker, postgres.purpose, postgres.source) match {
+        resources.initialize[IO](loaderConfig.getDBConfig, logger, iglu).use {
+          case (blocker, xa, state) =>
+            source.getSource[IO](blocker, loaderConfig.purpose, loaderConfig.source) match {
              case Right(dataStream) =>
-                val meta = postgres.purpose.snowplow
-                implicit val db: DB[IO] = DB.interpreter[IO](iglu.resolver, xa, logger, postgres.schema, meta)
+                implicit val db: DB[IO] = DB.interpreter[IO](iglu.resolver, xa, logger, loaderConfig.schema)
                for {
-                  _ <- postgres.purpose match {
-                    case Purpose.Enriched => utils.prepare[IO](postgres.schema, xa, logger)
+                  _ <- loaderConfig.purpose match {
+                    case Purpose.Enriched => utils.prepare[IO](loaderConfig.schema, xa, logger)
                    case Purpose.SelfDescribing => IO.unit
                  }
-                  goodSink = sink.goodSink[IO](state, badQueue, iglu)
-                  badSink = sink.badSink[IO](badQueue)
-                  s = dataStream.observeEither(badSink, goodSink)
+                  goodSink = sink.goodSink[IO](state, iglu, processor)
+                  badSink = sink.badSink[IO]
+                  s = dataStream.observeEither(badSink, goodSink.andThen(_.through(badSink)))
                  _ <- s.compile.drain
                } yield ExitCode.Success
diff --git a/modules/common/src/main/scala/com/snowplowanalytics/snowplow/postgres/streaming/source.scala b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/postgres/streaming/source.scala
similarity index 86%
rename from modules/common/src/main/scala/com/snowplowanalytics/snowplow/postgres/streaming/source.scala
rename to modules/loader/src/main/scala/com/snowplowanalytics/snowplow/postgres/streaming/source.scala
index 220fb19..28e8e4b 100644
--- a/modules/common/src/main/scala/com/snowplowanalytics/snowplow/postgres/streaming/source.scala
+++ b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/postgres/streaming/source.scala
@@ -34,6 +34,7 @@ import com.snowplowanalytics.snowplow.analytics.scalasdk.ParsingError.NotTSV
 import com.snowplowanalytics.snowplow.badrows.{BadRow, Payload}
 import com.snowplowanalytics.snowplow.postgres.config.{LoaderConfig, Cli}
 import com.snowplowanalytics.snowplow.postgres.config.LoaderConfig.Purpose
+import com.snowplowanalytics.snowplow.postgres.streaming.data.{BadData, Data}
 
 import com.google.pubsub.v1.PubsubMessage
 import com.permutive.pubsub.consumer.Model.{Subscription, ProjectId}
@@ -50,7 +51,7 @@ object source {
    * @return either error or stream of parsed payloads
    */
   def getSource[F[_]: ConcurrentEffect: ContextShift](blocker: Blocker,
-                                                      purpose: LoaderConfig.Purpose,
+                                                      purpose: Purpose,
                                                       config: LoaderConfig.Source) =
     config match {
       case LoaderConfig.Source.Kinesis(appName, streamName, region, position) =>
@@ -109,28 +110,6 @@ object source {
      .flatMap(json => SelfDescribingData.parse[Json](json).leftMap(_.message(json.noSpaces)))
      .leftMap(error => BadData.BadJson(s, error))
 
-  /** Kind of data flowing through the Loader */
-  sealed trait Data extends Product with Serializable {
-    def snowplow: Boolean = this match {
-      case _: Data.Snowplow => true
-      case _: Data.SelfDescribing => false
-    }
-  }
-
-  object Data {
-    case class Snowplow(data: Event) extends Data
-    case class SelfDescribing(data: SelfDescribingData[Json]) extends Data
-  }
-
-  /** Data that for some reasons cannot be inserted into DB */
-  sealed trait BadData extends Throwable with Product with Serializable
-  object BadData {
-    /** Typical Snowplow bad row (Loader Iglu Error etc) */
-    case class BadEnriched(data: BadRow) extends BadData
-    /** Non-enriched error */
-    case class BadJson(payload: String, error: String) extends BadData
-  }
-
   def pubsubDataDecoder(purpose: Purpose): MessageDecoder[Either[BadData, Data]] =
     purpose match {
       case Purpose.Enriched =>
@@ -140,8 +119,7 @@ object source {
     }
 
   def pubsubErrorHandler[F[_]: Sync](message: PubsubMessage, error: Throwable, ack: F[Unit], nack: F[Unit]): F[Unit] = {
-    val _ = error
-    val _ = nack
+    val _ = (error, nack)
     Sync[F].delay(println(s"Couldn't handle ${message.getData.toStringUtf8}")) *> ack
   }
 
diff --git a/modules/common/src/test/scala/com/snowplowanalytics/snowplow/postgres/config/CliSpec.scala b/modules/loader/src/test/scala/com.snowplowanalytics.snowplow.postgres/config/CliSpec.scala
similarity index 98%
rename from modules/common/src/test/scala/com/snowplowanalytics/snowplow/postgres/config/CliSpec.scala
rename to modules/loader/src/test/scala/com.snowplowanalytics.snowplow.postgres/config/CliSpec.scala
index b20cdd6..1aefd45 100644
--- a/modules/common/src/test/scala/com/snowplowanalytics/snowplow/postgres/config/CliSpec.scala
+++ b/modules/loader/src/test/scala/com.snowplowanalytics.snowplow.postgres/config/CliSpec.scala
@@ -17,7 +17,7 @@ import java.util.UUID
 
 import cats.effect.{IO, Clock}
 
-import com.snowplowanalytics.snowplow.postgres.config.LoaderConfig.{Source, InitPosition, Purpose}
+import com.snowplowanalytics.snowplow.postgres.config.LoaderConfig.{Purpose, Source, InitPosition}
 
 import org.specs2.mutable.Specification
 import software.amazon.awssdk.regions.Region
diff --git a/project/BuildSettings.scala b/project/BuildSettings.scala
index b2c8a46..cd0ea85 100644
--- a/project/BuildSettings.scala
+++ b/project/BuildSettings.scala
@@ -28,10 +28,14 @@ import com.typesafe.sbt.packager.docker.DockerPlugin.autoImport._
 import scoverage.ScoverageKeys._
 
 object BuildSettings {
+  val scala212 = "2.12.11"
+  val scala213 = "2.13.3"
+
   lazy val projectSettings = Seq(
     organization := "com.snowplowanalytics",
-    version := "0.1.0-rc3",
-    scalaVersion := "2.13.3",
+    version := "0.1.0-rc4",
+    scalaVersion := scala213,
+    crossScalaVersions := Seq(scala212, scala213),
     description := "Loading Snowplow enriched data into PostgreSQL in real-time",
     licenses += ("Apache-2.0", url("http://www.apache.org/licenses/LICENSE-2.0.html")),
     parallelExecution in Test := false
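
A note on the central change above: `DB.process` now receives the new `Shredded` ADT instead of a bare `List[Entity]`, so whether the target tables need the meta columns that `addMetadata` populates (event id and collector timestamp) travels with the data, rather than being fixed by the removed `meta: Boolean` argument of `DB.interpreter`. The following minimal, self-contained Scala sketch shows only that dispatch in isolation; the simplified `Entity`, the `plan` helper and the table names are illustrative assumptions, not part of the project's API beyond `Shredded` and its constructors.

object ShreddedDispatchSketch extends App {
  // Simplified stand-in for shredding.Entity (the real one carries the schema key and columns).
  final case class Entity(tableName: String)

  sealed trait Shredded
  object Shredded {
    final case class ShreddedSnowplow(atomic: Entity, entities: List[Entity]) extends Shredded
    final case class ShreddedSelfDescribing(entity: Entity) extends Shredded
  }

  // Mirrors the pattern match added to DB.process: a shredded Snowplow event contributes
  // the atomic "events" row plus its entities and needs the meta columns, while a
  // standalone self-describing JSON is a single entity created without them.
  def plan(shredded: Shredded): (Boolean, List[Entity]) =
    shredded match {
      case Shredded.ShreddedSnowplow(atomic, entities) => (true, atomic :: entities)
      case Shredded.ShreddedSelfDescribing(entity)     => (false, List(entity))
    }

  println(plan(Shredded.ShreddedSnowplow(Entity("events"), List(Entity("com_acme_context_1")))))
  // (true,List(Entity(events), Entity(com_acme_context_1)))
  println(plan(Shredded.ShreddedSelfDescribing(Entity("com_acme_payload_1"))))
  // (false,List(Entity(com_acme_payload_1)))
}

This is also why `DB.create` gained the `includeMeta` parameter: table creation is decided per batch from the shredded payload itself instead of per interpreter instance.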