From 9021c4fa2c5d0903bf1420daeb88e2b98dcd2711 Mon Sep 17 00:00:00 2001 From: Ian Streeter Date: Mon, 10 Aug 2020 14:14:40 +0100 Subject: [PATCH 1/7] Configure JDBC connection with sslmode (closes #3) --- .../snowplow/postgres/config/LoaderConfig.scala | 6 +++--- .../com/snowplowanalytics/snowplow/postgres/Database.scala | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/modules/common/src/main/scala/com/snowplowanalytics/snowplow/postgres/config/LoaderConfig.scala b/modules/common/src/main/scala/com/snowplowanalytics/snowplow/postgres/config/LoaderConfig.scala index 1505d51..e3a06d4 100644 --- a/modules/common/src/main/scala/com/snowplowanalytics/snowplow/postgres/config/LoaderConfig.scala +++ b/modules/common/src/main/scala/com/snowplowanalytics/snowplow/postgres/config/LoaderConfig.scala @@ -41,7 +41,7 @@ case class LoaderConfig(name: String, schema: String, purpose: Purpose) { def getJdbc: JdbcUri = - JdbcUri(host, port, database) + JdbcUri(host, port, database, sslMode.toLowerCase().replace('_', '-')) } object LoaderConfig { @@ -120,9 +120,9 @@ object LoaderConfig { deriveConfiguredDecoder[Source] } - case class JdbcUri(host: String, port: Int, database: String) { + case class JdbcUri(host: String, port: Int, database: String, sslMode: String) { override def toString = - s"jdbc:postgresql://$host:$port/$database" + s"jdbc:postgresql://$host:$port/$database?sslmode=$sslMode" } implicit def ioCirceConfigDecoder: Decoder[LoaderConfig] = diff --git a/modules/common/src/test/scala/com/snowplowanalytics/snowplow/postgres/Database.scala b/modules/common/src/test/scala/com/snowplowanalytics/snowplow/postgres/Database.scala index ce2720c..974d7f1 100644 --- a/modules/common/src/test/scala/com/snowplowanalytics/snowplow/postgres/Database.scala +++ b/modules/common/src/test/scala/com/snowplowanalytics/snowplow/postgres/Database.scala @@ -47,7 +47,7 @@ object Database { val logger: LogHandler = LogHandler.nop implicit val CS: ContextShift[IO] = IO.contextShift(concurrent.ExecutionContext.global) - val jdbcUri = JdbcUri("localhost", 5432, "snowplow") + val jdbcUri = JdbcUri("localhost", 5432, "snowplow", "allow") val registry = Http(Config("localhost registry", 1, Nil), HttpConnection(URI.create("http://localhost:8080/api/"), None)) val igluClient = Client[IO, Json](Resolver(List(Registry.IgluCentral, registry), None), CirceValidator) val xa: Transactor[IO] = resources.getTransactorDefault[IO](jdbcUri, "postgres", "mysecretpassword") From 24864d652b188f1190fefcb8c77673de5ceb8c6b Mon Sep 17 00:00:00 2001 From: Ian Streeter Date: Mon, 10 Aug 2020 15:33:06 +0100 Subject: [PATCH 2/7] Cross compile for scala 2.12 and 2.13 --- .../snowplow/postgres/streaming/source.scala | 3 +-- project/BuildSettings.scala | 6 +++++- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/modules/common/src/main/scala/com/snowplowanalytics/snowplow/postgres/streaming/source.scala b/modules/common/src/main/scala/com/snowplowanalytics/snowplow/postgres/streaming/source.scala index 220fb19..84421d2 100644 --- a/modules/common/src/main/scala/com/snowplowanalytics/snowplow/postgres/streaming/source.scala +++ b/modules/common/src/main/scala/com/snowplowanalytics/snowplow/postgres/streaming/source.scala @@ -140,8 +140,7 @@ object source { } def pubsubErrorHandler[F[_]: Sync](message: PubsubMessage, error: Throwable, ack: F[Unit], nack: F[Unit]): F[Unit] = { - val _ = error - val _ = nack + val _ = (error, nack) Sync[F].delay(println(s"Couldn't handle ${message.getData.toStringUtf8}")) *> ack } diff --git 
a/project/BuildSettings.scala b/project/BuildSettings.scala index b2c8a46..b522a85 100644 --- a/project/BuildSettings.scala +++ b/project/BuildSettings.scala @@ -28,10 +28,14 @@ import com.typesafe.sbt.packager.docker.DockerPlugin.autoImport._ import scoverage.ScoverageKeys._ object BuildSettings { + val scala212 = "2.12.11" + val scala213 = "2.13.3" + lazy val projectSettings = Seq( organization := "com.snowplowanalytics", version := "0.1.0-rc3", - scalaVersion := "2.13.3", + scalaVersion := scala213, + crossScalaVersions := Seq(scala212, scala213), description := "Loading Snowplow enriched data into PostgreSQL in real-time", licenses += ("Apache-2.0", url("http://www.apache.org/licenses/LICENSE-2.0.html")), parallelExecution in Test := false From f913cedfa4b1ebc201ba0b94a8117e08a1c4a2e0 Mon Sep 17 00:00:00 2001 From: Ian Streeter Date: Tue, 11 Aug 2020 10:24:26 +0100 Subject: [PATCH 3/7] Split sink- and source-specific code into different modules --- build.sbt | 17 ++- .../postgres/config/Base64Encoded.scala | 45 -------- .../postgres/config/LoaderConfig.scala | 74 +----------- .../snowplow/postgres/resources.scala | 2 +- .../postgres/shredding/transform.scala | 11 +- .../snowplow/postgres/streaming/data.scala | 46 ++++++++ .../snowplow/postgres/streaming/sink.scala | 16 +-- .../postgres/streaming/sinkspec.scala | 13 ++- .../snowplow/postgres/config/AppConfig.scala | 109 ++++++++++++++++++ .../snowplow/postgres/config/Cli.scala | 6 +- .../snowplow/postgres/loader/Main.scala | 21 ++-- .../snowplow/postgres/streaming/source.scala | 33 +----- .../config/CliSpec.scala | 5 +- project/BuildSettings.scala | 2 +- 14 files changed, 218 insertions(+), 182 deletions(-) delete mode 100644 modules/common/src/main/scala/com/snowplowanalytics/snowplow/postgres/config/Base64Encoded.scala create mode 100644 modules/common/src/main/scala/com/snowplowanalytics/snowplow/postgres/streaming/data.scala create mode 100644 modules/loader/src/main/scala/com/snowplowanalytics/snowplow/postgres/config/AppConfig.scala rename modules/{common => loader}/src/main/scala/com/snowplowanalytics/snowplow/postgres/config/Cli.scala (94%) rename modules/{common => loader}/src/main/scala/com/snowplowanalytics/snowplow/postgres/streaming/source.scala (82%) rename modules/{common/src/test/scala/com/snowplowanalytics/snowplow/postgres => loader/src/test/scala/com.snowplowanalytics.snowplow.postgres}/config/CliSpec.scala (93%) diff --git a/build.sbt b/build.sbt index 4da4082..0eb5dd3 100644 --- a/build.sbt +++ b/build.sbt @@ -20,18 +20,14 @@ lazy val common = project .settings(name := "snowplow-postgres") .enablePlugins(BuildInfoPlugin) .settings(BuildSettings.projectSettings) - .settings(BuildSettings.buildInfoSettings) .settings(BuildSettings.scoverageSettings) - .settings(BuildSettings.addExampleConfToTestCp) .settings(BuildSettings.mavenSettings) .settings( resolvers += Dependencies.SnowplowBintray, libraryDependencies ++= Seq( Dependencies.logger, Dependencies.postgres, - Dependencies.commons, Dependencies.catsEffect, - Dependencies.decline, Dependencies.circe, Dependencies.circeGeneric, Dependencies.circeExtras, @@ -41,8 +37,6 @@ lazy val common = project Dependencies.doobiePg, Dependencies.doobiePgCirce, Dependencies.doobieHikari, - Dependencies.fs2Aws, - Dependencies.fs2PubSub, Dependencies.analyticsSdk, Dependencies.badRows, Dependencies.schemaDdl, @@ -57,6 +51,17 @@ lazy val loader = project .settings(name := "snowplow-postgres-loader") .settings(BuildSettings.projectSettings) .settings(BuildSettings.dockerSettings) + 
.settings(BuildSettings.buildInfoSettings) + .settings(BuildSettings.addExampleConfToTestCp) + .settings( + libraryDependencies ++= Seq( + Dependencies.commons, + Dependencies.fs2Aws, + Dependencies.fs2PubSub, + Dependencies.decline, + Dependencies.specs2 + ) + ) .dependsOn(common) .enablePlugins(JavaAppPackaging, DockerPlugin, BuildInfoPlugin) diff --git a/modules/common/src/main/scala/com/snowplowanalytics/snowplow/postgres/config/Base64Encoded.scala b/modules/common/src/main/scala/com/snowplowanalytics/snowplow/postgres/config/Base64Encoded.scala deleted file mode 100644 index c56eec6..0000000 --- a/modules/common/src/main/scala/com/snowplowanalytics/snowplow/postgres/config/Base64Encoded.scala +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Copyright (c) 2020 Snowplow Analytics Ltd. All rights reserved. - * - * This program is licensed to you under the Apache License Version 2.0, - * and you may not use this file except in compliance with the Apache License Version 2.0. - * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0. - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the Apache License Version 2.0 is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under. - */ -package com.snowplowanalytics.snowplow.postgres.config - -import java.util.Base64 - -import cats.syntax.either._ -import cats.data.ValidatedNel - -import io.circe.Json -import io.circe.parser.{ parse => jsonParse } - -import com.monovore.decline.Argument - -/** Base64-encoded JSON */ -case class Base64Encoded(json: Json) extends AnyVal - -object Base64Encoded { - def parse(string: String): Either[String, Base64Encoded] = - Either - .catchOnly[IllegalArgumentException](Base64.getDecoder.decode(string)) - .map(bytes => new String(bytes)) - .flatMap(str => jsonParse(str)) - .leftMap(e => s"Cannot parse ${string} as Base64-encoded JSON: ${e.getMessage}") - .map(json => Base64Encoded(json)) - - implicit def base64EncodedDeclineArg: Argument[Base64Encoded] = - new Argument[Base64Encoded] { - def read(string: String): ValidatedNel[String, Base64Encoded] = - Base64Encoded.parse(string).toValidatedNel - - def defaultMetavar: String = "base64" - } -} - diff --git a/modules/common/src/main/scala/com/snowplowanalytics/snowplow/postgres/config/LoaderConfig.scala b/modules/common/src/main/scala/com/snowplowanalytics/snowplow/postgres/config/LoaderConfig.scala index e3a06d4..0925d84 100644 --- a/modules/common/src/main/scala/com/snowplowanalytics/snowplow/postgres/config/LoaderConfig.scala +++ b/modules/common/src/main/scala/com/snowplowanalytics/snowplow/postgres/config/LoaderConfig.scala @@ -12,27 +12,13 @@ */ package com.snowplowanalytics.snowplow.postgres.config -import java.util.{UUID, Date} -import java.time.Instant - -import scala.jdk.CollectionConverters._ - import cats.syntax.either._ import io.circe.Decoder +import LoaderConfig.{JdbcUri, Purpose} import io.circe.generic.semiauto.deriveDecoder -import io.circe.generic.extras.Configuration -import io.circe.generic.extras.semiauto.deriveConfiguredDecoder - -import LoaderConfig.{JdbcUri, Source, Purpose} -import software.amazon.awssdk.regions.Region -import software.amazon.kinesis.common.InitialPositionInStream - -case class LoaderConfig(name: String, - id: UUID, - source: Source, - host: String, +case class 
LoaderConfig(host: String, port: Int, database: String, username: String, @@ -64,62 +50,6 @@ object LoaderConfig { } } - implicit val awsRegionDecoder: Decoder[Region] = - Decoder.decodeString.emap { s => - val allRegions = Region.regions().asScala.toSet.map((r: Region) => r.id()) - if (allRegions.contains(s)) Region.of(s).asRight - else s"Region $s is unknown, choose from [${allRegions.mkString(", ")}]".asLeft - } - - sealed trait InitPosition { - /** Turn it into fs2-aws-compatible structure */ - def unwrap: Either[InitialPositionInStream, Date] = this match { - case InitPosition.Latest => InitialPositionInStream.LATEST.asLeft - case InitPosition.TrimHorizon => InitialPositionInStream.TRIM_HORIZON.asLeft - case InitPosition.AtTimestamp(date) => Date.from(date).asRight - } - } - object InitPosition { - case object Latest extends InitPosition - case object TrimHorizon extends InitPosition - case class AtTimestamp(timestamp: Instant) extends InitPosition - - implicit val ioCirceInitPositionDecoder: Decoder[InitPosition] = - Decoder.decodeJson.emap { json => - json.asString match { - case Some("TRIM_HORIZON") => TrimHorizon.asRight - case Some("LATEST") => Latest.asRight - case Some(other) => - s"Initial position $other is unknown. Choose from LATEST and TRIM_HORIZEON. AT_TIMESTAMP must provide the timestamp".asLeft - case None => - val result = for { - root <- json.asObject.map(_.toMap) - atTimestamp <- root.get("AT_TIMESTAMP") - atTimestampObj <- atTimestamp.asObject.map(_.toMap) - timestampStr <- atTimestampObj.get("timestamp") - timestamp <- timestampStr.as[Instant].toOption - } yield AtTimestamp(timestamp) - result match { - case Some(atTimestamp) => atTimestamp.asRight - case None => "Initial position can be either LATEST or TRIM_HORIZON string or AT_TIMESTAMP object (e.g. 
2020-06-03T00:00:00Z)".asLeft - } - } - } - } - - sealed trait Source extends Product with Serializable - object Source { - - case class Kinesis(appName: String, streamName: String, region: Region, initialPosition: InitPosition) extends Source - case class PubSub(projectId: String, subscriptionId: String) extends Source - - implicit val config: Configuration = - Configuration.default.withSnakeCaseConstructorNames - - implicit def ioCirceConfigSourceDecoder: Decoder[Source] = - deriveConfiguredDecoder[Source] - } - case class JdbcUri(host: String, port: Int, database: String, sslMode: String) { override def toString = s"jdbc:postgresql://$host:$port/$database?sslmode=$sslMode" diff --git a/modules/common/src/main/scala/com/snowplowanalytics/snowplow/postgres/resources.scala b/modules/common/src/main/scala/com/snowplowanalytics/snowplow/postgres/resources.scala index 9c78fe0..daa3cd1 100644 --- a/modules/common/src/main/scala/com/snowplowanalytics/snowplow/postgres/resources.scala +++ b/modules/common/src/main/scala/com/snowplowanalytics/snowplow/postgres/resources.scala @@ -31,7 +31,7 @@ import com.snowplowanalytics.iglu.client.Client import com.snowplowanalytics.snowplow.postgres.api.State import com.snowplowanalytics.snowplow.postgres.config.LoaderConfig import com.snowplowanalytics.snowplow.postgres.config.LoaderConfig.JdbcUri -import com.snowplowanalytics.snowplow.postgres.streaming.source.BadData +import com.snowplowanalytics.snowplow.postgres.streaming.data.BadData object resources { diff --git a/modules/common/src/main/scala/com/snowplowanalytics/snowplow/postgres/shredding/transform.scala b/modules/common/src/main/scala/com/snowplowanalytics/snowplow/postgres/shredding/transform.scala index ea4948c..dc6767e 100644 --- a/modules/common/src/main/scala/com/snowplowanalytics/snowplow/postgres/shredding/transform.scala +++ b/modules/common/src/main/scala/com/snowplowanalytics/snowplow/postgres/shredding/transform.scala @@ -34,15 +34,14 @@ import com.snowplowanalytics.iglu.schemaddl.jsonschema.{Pointer, Schema} import com.snowplowanalytics.iglu.schemaddl.migrations.FlatSchema import com.snowplowanalytics.snowplow.analytics.scalasdk.Event -import com.snowplowanalytics.snowplow.badrows.{FailureDetails, BadRow, Failure, Payload} +import com.snowplowanalytics.snowplow.badrows.{FailureDetails, BadRow, Failure, Payload, Processor} import Entity.Column -import com.snowplowanalytics.snowplow.postgres.config.Cli object transform { val Atomic = SchemaKey("com.snowplowanalytics.snowplow", "atomic", "jsonschema", SchemaVer.Full(1,0,0)) /** Transform the whole `Event` (canonical and JSONs) into list of independent entities ready to be inserted */ - def shredEvent[F[_]: Sync: Clock](client: Client[F, Json], event: Event): EitherT[F, BadRow, List[Entity]] = { + def shredEvent[F[_]: Sync: Clock](client: Client[F, Json], processor: Processor, event: Event): EitherT[F, BadRow, List[Entity]] = { val entities = event.contexts.data ++ event.derived_contexts.data ++ event.unstruct_event.data.toList val wholeEvent = entities .parTraverse(shredJson(client)) @@ -52,7 +51,7 @@ object transform { (shreddedEntities, atomic) => atomic :: shreddedEntities.map(addMetadata(event.event_id, event.collector_tstamp)) } } - EitherT(wholeEvent).leftMap[BadRow](buildBadRow(event)) + EitherT(wholeEvent).leftMap[BadRow](buildBadRow(processor, event)) } def addMetadata(eventId: UUID, tstamp: Instant)(entity: Entity): Entity = { @@ -282,7 +281,7 @@ object transform { (columnName, dataType, value) } - private def buildBadRow(event: 
Event)(errors: NonEmptyList[FailureDetails.LoaderIgluError]) = - BadRow.LoaderIgluError(Cli.processor, Failure.LoaderIgluErrors(errors), Payload.LoaderPayload(event)) + private def buildBadRow(processor: Processor, event: Event)(errors: NonEmptyList[FailureDetails.LoaderIgluError]) = + BadRow.LoaderIgluError(processor, Failure.LoaderIgluErrors(errors), Payload.LoaderPayload(event)) } diff --git a/modules/common/src/main/scala/com/snowplowanalytics/snowplow/postgres/streaming/data.scala b/modules/common/src/main/scala/com/snowplowanalytics/snowplow/postgres/streaming/data.scala new file mode 100644 index 0000000..42ccbe5 --- /dev/null +++ b/modules/common/src/main/scala/com/snowplowanalytics/snowplow/postgres/streaming/data.scala @@ -0,0 +1,46 @@ +/* + * Copyright (c) 2020 Snowplow Analytics Ltd. All rights reserved. + * + * This program is licensed to you under the Apache License Version 2.0, + * and you may not use this file except in compliance with the Apache License Version 2.0. + * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the Apache License Version 2.0 is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under. + */ +package com.snowplowanalytics.snowplow.postgres.streaming + +import com.snowplowanalytics.iglu.core.SelfDescribingData + +import io.circe.Json + +import com.snowplowanalytics.snowplow.analytics.scalasdk.Event +import com.snowplowanalytics.snowplow.badrows.BadRow + +object data { + + /** Kind of data flowing through the Loader */ + sealed trait Data extends Product with Serializable { + def snowplow: Boolean = this match { + case _: Data.Snowplow => true + case _: Data.SelfDescribing => false + } + } + + object Data { + case class Snowplow(data: Event) extends Data + case class SelfDescribing(data: SelfDescribingData[Json]) extends Data + } + + /** Data that for some reason cannot be inserted into DB */ + sealed trait BadData extends Throwable with Product with Serializable + object BadData { + /** Typical Snowplow bad row (Loader Iglu Error etc) */ + case class BadEnriched(data: BadRow) extends BadData + /** Non-enriched error */ + case class BadJson(payload: String, error: String) extends BadData + } +} + diff --git a/modules/common/src/main/scala/com/snowplowanalytics/snowplow/postgres/streaming/sink.scala b/modules/common/src/main/scala/com/snowplowanalytics/snowplow/postgres/streaming/sink.scala index 0038a4b..eee18a7 100644 --- a/modules/common/src/main/scala/com/snowplowanalytics/snowplow/postgres/streaming/sink.scala +++ b/modules/common/src/main/scala/com/snowplowanalytics/snowplow/postgres/streaming/sink.scala @@ -28,11 +28,10 @@ import com.snowplowanalytics.iglu.core.circe.implicits._ import com.snowplowanalytics.iglu.client.Client -import com.snowplowanalytics.snowplow.badrows.{BadRow, Payload} +import com.snowplowanalytics.snowplow.badrows.{BadRow, Payload, Processor} import com.snowplowanalytics.snowplow.postgres.api.{State, DB} -import com.snowplowanalytics.snowplow.postgres.config.Cli.processor import com.snowplowanalytics.snowplow.postgres.shredding.{Entity, transform} -import com.snowplowanalytics.snowplow.postgres.streaming.source.{Data, BadData} +import com.snowplowanalytics.snowplow.postgres.streaming.data.{Data, BadData} object 
sink { @@ -46,11 +45,13 @@ object sink { * @param state mutable Loader state * @param badQueue queue where all unsucessful actions can unload its results * @param client Iglu Client + * @param processor The actor processing these events */ def goodSink[F[_]: Concurrent: Clock: DB](state: State[F], badQueue: Queue[F, BadData], - client: Client[F, Json]): Pipe[F, Data, Unit] = - _.parEvalMapUnordered(32)(sinkPayload(state, badQueue, client)) + client: Client[F, Json], + processor: Processor): Pipe[F, Data, Unit] = + _.parEvalMapUnordered(32)(sinkPayload(state, badQueue, client, processor)) /** Sink bad data coming directly into the `Pipe` and data coming from `badQueue` */ def badSink[F[_]: Concurrent](badQueue: Queue[F, BadData]): Pipe[F, BadData, Unit] = @@ -62,12 +63,13 @@ object sink { /** Implementation for [[goodSink]] */ def sinkPayload[F[_]: Sync: Clock: DB](state: State[F], badQueue: Queue[F, BadData], - client: Client[F, Json])(payload: Data): F[Unit] = { + client: Client[F, Json], + processor: Processor)(payload: Data): F[Unit] = { val result = for { entities <- payload match { case Data.Snowplow(event) => transform - .shredEvent[F](client, event) + .shredEvent[F](client, processor, event) .leftMap(bad => BadData.BadEnriched(bad)) case Data.SelfDescribing(json) => transform diff --git a/modules/common/src/test/scala/com/snowplowanalytics/snowplow/postgres/streaming/sinkspec.scala b/modules/common/src/test/scala/com/snowplowanalytics/snowplow/postgres/streaming/sinkspec.scala index 9d1ac93..6d22c5e 100644 --- a/modules/common/src/test/scala/com/snowplowanalytics/snowplow/postgres/streaming/sinkspec.scala +++ b/modules/common/src/test/scala/com/snowplowanalytics/snowplow/postgres/streaming/sinkspec.scala @@ -27,13 +27,18 @@ import com.snowplowanalytics.iglu.core.circe.implicits._ import com.snowplowanalytics.snowplow.analytics.scalasdk.Event +import com.snowplowanalytics.snowplow.badrows.Processor import com.snowplowanalytics.snowplow.postgres.Database import com.snowplowanalytics.snowplow.postgres.api.{State, DB} -import com.snowplowanalytics.snowplow.postgres.streaming.source.{Data, BadData} +import com.snowplowanalytics.snowplow.postgres.streaming.data.{Data, BadData} + class sinkspec extends Database { import Database._ + val processor = Processor("pgloader", "test") + + "goodSink" should { "sink a single good event" >> { val line = "snowplowweb\tweb\t2018-12-18 15:07:17.970\t2016-03-29 07:28:18.611\t2016-03-29 07:28:18.634\tpage_view\t11cdec7b-4cbd-4aa4-a4c9-3874ab9663d4\t\tsnplow6\tjs-2.6.0\tssc-0.6.0-kinesis\tspark-1.16.0-common-0.35.0\t34df2c48bc170c87befb441732a94196\t372d1f2983860eefd262b58e6592dfbc\t80546dc70f4a91f1283c4b6247e31bcf\t26e6412a2421eb923d9d40258ca9ca69\t1\t3a12e8b8e3e91a4d092b833d583c7e30\tDK\t82\tOdder\t8300\t42.0001\t42.003\tCentral Jutland\tTDC Danmark\tTDC Danmark\t\t\thttp://snowplowanalytics.com/documentation/recipes/catalog-analytics/market-basket-analysis-identifying-products-that-sell-well-together.html\tMarket basket analysis - identifying products and content that go well together – 
Snowplow\thttp://snowplowanalytics.com/analytics/catalog-analytics/market-basket-analysis-identifying-products-that-sell-well-together.html\thttp\tsnowplowanalytics.com\t80\t/documentation/recipes/catalog-analytics/market-basket-analysis-identifying-products-that-sell-well-together.html\t\t\thttp\tsnowplowanalytics.com\t80\t/analytics/catalog-analytics/market-basket-analysis-identifying-products-that-sell-well-together.html\t\t\tinternal\t\t\t\t\t\t\t\t{\"schema\":\"iglu:com.snowplowanalytics.snowplow/contexts/jsonschema/1-0-0\",\"data\":[{\"schema\":\"iglu:com.snowplowanalytics.snowplow/web_page/jsonschema/1-0-0\",\"data\":{\"id\":\"05862d26-0dde-4d7a-a494-fc9aae283d23\"}},{\"schema\":\"iglu:org.schema/WebPage/jsonschema/1-0-0\",\"data\":{\"genre\":\"documentation\",\"inLanguage\":\"en-US\"}},{\"schema\":\"iglu:org.w3/PerformanceTiming/jsonschema/1-0-0\",\"data\":{\"navigationStart\":1459236496534,\"unloadEventStart\":1459236496838,\"unloadEventEnd\":1459236496838,\"redirectStart\":0,\"redirectEnd\":0,\"fetchStart\":1459236496534,\"domainLookupStart\":1459236496534,\"domainLookupEnd\":1459236496534,\"connectStart\":1459236496534,\"connectEnd\":1459236496534,\"secureConnectionStart\":0,\"requestStart\":1459236496580,\"responseStart\":1459236496834,\"responseEnd\":1459236496844,\"domLoading\":1459236496853,\"domInteractive\":1459236497780,\"domContentLoadedEventStart\":1459236497780,\"domContentLoadedEventEnd\":1459236498038,\"domComplete\":0,\"loadEventStart\":0,\"loadEventEnd\":0,\"chromeFirstPaint\":1459236498203}}]}\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\tMozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/49.0.2623.87 Safari/537.36\tChrome 49\tChrome\t49.0.2623.87\tBrowser\tWEBKIT\ten-US\t1\t1\t0\t0\t0\t0\t0\t0\t0\t1\t24\t1920\t1075\tWindows 7\tWindows\tMicrosoft Corporation\tEurope/Berlin\tComputer\t0\t1920\t1200\tUTF-8\t1903\t11214\t\t\t\t\t\t\t\tEurope/Copenhagen\t\t\t\t2016-03-29 07:28:18.636\t\t\t{\"schema\":\"iglu:com.snowplowanalytics.snowplow/contexts/jsonschema/1-0-1\",\"data\":[{\"schema\":\"iglu:com.snowplowanalytics.snowplow/ua_parser_context/jsonschema/1-0-0\",\"data\":{\"useragentFamily\":\"Chrome\",\"useragentMajor\":\"49\",\"useragentMinor\":\"0\",\"useragentPatch\":\"2623\",\"useragentVersion\":\"Chrome 49.0.2623\",\"osFamily\":\"Windows\",\"osMajor\":\"7\",\"osMinor\":null,\"osPatch\":null,\"osPatchMinor\":null,\"osVersion\":\"Windows 7\",\"deviceFamily\":\"Other\"}}]}\t88c23330-ac4d-4c82-8a18-aa83c1e0c163\t2016-03-29 07:28:18.609\tcom.snowplowanalytics.snowplow\tpage_view\tjsonschema\t1-0-0\tcab5ba164038f31d8e10befc4eb199df\t" @@ -45,7 +50,7 @@ class sinkspec extends Database { val action = for { state <- State.init[IO](List(), igluClient.resolver) queue <- Queue.bounded[IO, BadData](1).action - _ <- stream.through(sink.goodSink(state, queue, igluClient)).compile.drain.action + _ <- stream.through(sink.goodSink(state, queue, igluClient, processor)).compile.drain.action eventIds <- query.action uaParserCtxs <- count("com_snowplowanalytics_snowplow_ua_parser_context_1").action } yield (eventIds, uaParserCtxs) @@ -68,7 +73,7 @@ class sinkspec extends Database { val action = for { state <- State.init[IO](List(), igluClient.resolver) queue <- Queue.bounded[IO, BadData](1).action - _ <- stream.through(sink.goodSink(state, queue, igluClient)).compile.drain.action + _ <- stream.through(sink.goodSink(state, queue, igluClient, processor)).compile.drain.action eventIds <- query.action rows <- count("com_getvero_bounced_1").action } yield 
(eventIds, rows) @@ -106,7 +111,7 @@ class sinkspec extends Database { val action = for { state <- State.init[IO](List(), igluClient.resolver) queue <- Queue.bounded[IO, BadData](1).action - _ <- stream.through(sink.goodSink(state, queue, igluClient)).compile.drain.action + _ <- stream.through(sink.goodSink(state, queue, igluClient, processor)).compile.drain.action rows <- count("me_chuwy_pg_test_1").action table <- describeTable("me_chuwy_pg_test_1").action } yield (rows, table) diff --git a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/postgres/config/AppConfig.scala b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/postgres/config/AppConfig.scala new file mode 100644 index 0000000..ce62fd1 --- /dev/null +++ b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/postgres/config/AppConfig.scala @@ -0,0 +1,109 @@ +/* + * Copyright (c) 2020 Snowplow Analytics Ltd. All rights reserved. + * + * This program is licensed to you under the Apache License Version 2.0, + * and you may not use this file except in compliance with the Apache License Version 2.0. + * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the Apache License Version 2.0 is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under. + */ +package com.snowplowanalytics.snowplow.postgres.config + +import java.util.{UUID, Date} +import java.time.Instant + +import scala.jdk.CollectionConverters._ + +import cats.syntax.either._ + +import io.circe.Decoder +import io.circe.generic.semiauto.deriveDecoder +import io.circe.generic.extras.Configuration +import io.circe.generic.extras.semiauto.deriveConfiguredDecoder + +import AppConfig.Source +import LoaderConfig.Purpose + +import software.amazon.awssdk.regions.Region +import software.amazon.kinesis.common.InitialPositionInStream + +case class AppConfig(name: String, + id: UUID, + source: Source, + host: String, + port: Int, + database: String, + username: String, + password: String, // TODO: can be EC2 store + sslMode: String, + schema: String, + purpose: Purpose) { + def getLoaderConfig: LoaderConfig = + LoaderConfig(host, port, database, username, password, sslMode, schema, purpose) +} + +object AppConfig { + + implicit val awsRegionDecoder: Decoder[Region] = + Decoder.decodeString.emap { s => + val allRegions = Region.regions().asScala.toSet.map((r: Region) => r.id()) + if (allRegions.contains(s)) Region.of(s).asRight + else s"Region $s is unknown, choose from [${allRegions.mkString(", ")}]".asLeft + } + + sealed trait InitPosition { + /** Turn it into fs2-aws-compatible structure */ + def unwrap: Either[InitialPositionInStream, Date] = this match { + case InitPosition.Latest => InitialPositionInStream.LATEST.asLeft + case InitPosition.TrimHorizon => InitialPositionInStream.TRIM_HORIZON.asLeft + case InitPosition.AtTimestamp(date) => Date.from(date).asRight + } + } + object InitPosition { + case object Latest extends InitPosition + case object TrimHorizon extends InitPosition + case class AtTimestamp(timestamp: Instant) extends InitPosition + + implicit val ioCirceInitPositionDecoder: Decoder[InitPosition] = + Decoder.decodeJson.emap { json => + json.asString match { + case Some("TRIM_HORIZON") => TrimHorizon.asRight + case 
Some("LATEST") => Latest.asRight + case Some(other) => + s"Initial position $other is unknown. Choose from LATEST and TRIM_HORIZON. AT_TIMESTAMP must provide the timestamp".asLeft + case None => + val result = for { + root <- json.asObject.map(_.toMap) + atTimestamp <- root.get("AT_TIMESTAMP") + atTimestampObj <- atTimestamp.asObject.map(_.toMap) + timestampStr <- atTimestampObj.get("timestamp") + timestamp <- timestampStr.as[Instant].toOption + } yield AtTimestamp(timestamp) + result match { + case Some(atTimestamp) => atTimestamp.asRight + case None => "Initial position can be either LATEST or TRIM_HORIZON string or AT_TIMESTAMP object (e.g. 2020-06-03T00:00:00Z)".asLeft + } + } + } + + sealed trait Source extends Product with Serializable + object Source { + + case class Kinesis(appName: String, streamName: String, region: Region, initialPosition: InitPosition) extends Source + case class PubSub(projectId: String, subscriptionId: String) extends Source + + implicit val config: Configuration = + Configuration.default.withSnakeCaseConstructorNames + + implicit def ioCirceConfigSourceDecoder: Decoder[Source] = + deriveConfiguredDecoder[Source] + } + + implicit def ioCirceConfigDecoder: Decoder[AppConfig] = + deriveDecoder[AppConfig] + +} diff --git a/modules/common/src/main/scala/com/snowplowanalytics/snowplow/postgres/config/Cli.scala b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/postgres/config/Cli.scala similarity index 94% rename from modules/common/src/main/scala/com/snowplowanalytics/snowplow/postgres/config/Cli.scala rename to modules/loader/src/main/scala/com/snowplowanalytics/snowplow/postgres/config/Cli.scala index 1e8e6f7..50f0b8a 100644 --- a/modules/common/src/main/scala/com/snowplowanalytics/snowplow/postgres/config/Cli.scala +++ b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/postgres/config/Cli.scala @@ -35,7 +35,7 @@ import com.monovore.decline._ import com.snowplowanalytics.snowplow.postgres.generated.BuildInfo -case class Cli[F[_]](postgres: LoaderConfig, iglu: Client[F, Json], debug: Boolean) +case class Cli[F[_]](config: AppConfig, iglu: Client[F, Json], debug: Boolean) object Cli { @@ -55,8 +55,8 @@ object Cli { configJson <- PathOrJson.load(rawConfig.config) configData <- SelfDescribingData.parse(configJson).leftMap(e => s"Configuration JSON is not self-describing, ${e.message(configJson.noSpaces)}").toEitherT[F] _ <- igluClient.check(configData).leftMap(e => s"Iglu validation failed with following error\n: ${e.asJson.spaces2}") - pgConfig <- configData.data.as[LoaderConfig].toEitherT[F].leftMap(e => s"Error while decoding configuration JSON, ${e.show}") + appConfig <- configData.data.as[AppConfig].toEitherT[F].leftMap(e => s"Error while decoding configuration JSON, ${e.show}") } yield Cli(appConfig, igluClient, rawConfig.debug) } /** Config files for Loader can be passed either as FS path diff --git a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/postgres/loader/Main.scala b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/postgres/loader/Main.scala index f075a69..94609e0 100644 --- a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/postgres/loader/Main.scala +++ b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/postgres/loader/Main.scala @@ -16,30 +16,35 @@ import cats.effect.{ExitCode, IO, IOApp} import doobie.util.log.LogHandler +import com.snowplowanalytics.snowplow.badrows.Processor import 
com.snowplowanalytics.snowplow.postgres.api.DB import com.snowplowanalytics.snowplow.postgres.config.Cli import com.snowplowanalytics.snowplow.postgres.config.LoaderConfig.Purpose +import com.snowplowanalytics.snowplow.postgres.generated.BuildInfo import com.snowplowanalytics.snowplow.postgres.resources import com.snowplowanalytics.snowplow.postgres.storage.utils import com.snowplowanalytics.snowplow.postgres.streaming.{sink, source} object Main extends IOApp { + + val processor = Processor(BuildInfo.name, BuildInfo.version) + def run(args: List[String]): IO[ExitCode] = Cli.parse[IO](args).value.flatMap { - case Right(Cli(postgres, iglu, debug)) => + case Right(Cli(appConfig, iglu, debug)) => val logger = if (debug) LogHandler.jdkLogHandler else LogHandler.nop - resources.initialize[IO](postgres, logger, iglu).use { + resources.initialize[IO](appConfig.getLoaderConfig, logger, iglu).use { case (blocker, xa, state, badQueue) => - source.getSource[IO](blocker, postgres.purpose, postgres.source) match { + source.getSource[IO](blocker, appConfig.purpose, appConfig.source) match { case Right(dataStream) => - val meta = postgres.purpose.snowplow - implicit val db: DB[IO] = DB.interpreter[IO](iglu.resolver, xa, logger, postgres.schema, meta) + val meta = appConfig.purpose.snowplow + implicit val db: DB[IO] = DB.interpreter[IO](iglu.resolver, xa, logger, appConfig.schema, meta) for { - _ <- postgres.purpose match { - case Purpose.Enriched => utils.prepare[IO](postgres.schema, xa, logger) + _ <- appConfig.purpose match { + case Purpose.Enriched => utils.prepare[IO](appConfig.schema, xa, logger) case Purpose.SelfDescribing => IO.unit } - goodSink = sink.goodSink[IO](state, badQueue, iglu) + goodSink = sink.goodSink[IO](state, badQueue, iglu, processor) badSink = sink.badSink[IO](badQueue) s = dataStream.observeEither(badSink, goodSink) diff --git a/modules/common/src/main/scala/com/snowplowanalytics/snowplow/postgres/streaming/source.scala b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/postgres/streaming/source.scala similarity index 82% rename from modules/common/src/main/scala/com/snowplowanalytics/snowplow/postgres/streaming/source.scala rename to modules/loader/src/main/scala/com/snowplowanalytics/snowplow/postgres/streaming/source.scala index 84421d2..a8fd2e9 100644 --- a/modules/common/src/main/scala/com/snowplowanalytics/snowplow/postgres/streaming/source.scala +++ b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/postgres/streaming/source.scala @@ -32,8 +32,9 @@ import com.snowplowanalytics.iglu.core.circe.implicits._ import com.snowplowanalytics.snowplow.analytics.scalasdk.Event import com.snowplowanalytics.snowplow.analytics.scalasdk.ParsingError.NotTSV import com.snowplowanalytics.snowplow.badrows.{BadRow, Payload} -import com.snowplowanalytics.snowplow.postgres.config.{LoaderConfig, Cli} +import com.snowplowanalytics.snowplow.postgres.config.{AppConfig, Cli} import com.snowplowanalytics.snowplow.postgres.config.LoaderConfig.Purpose +import com.snowplowanalytics.snowplow.postgres.streaming.data.{BadData, Data} import com.google.pubsub.v1.PubsubMessage import com.permutive.pubsub.consumer.Model.{Subscription, ProjectId} @@ -50,17 +51,17 @@ object source { * @return either error or stream of parsed payloads */ def getSource[F[_]: ConcurrentEffect: ContextShift](blocker: Blocker, - purpose: LoaderConfig.Purpose, - config: LoaderConfig.Source) = + purpose: Purpose, + config: AppConfig.Source) = config match { - case LoaderConfig.Source.Kinesis(appName, streamName, 
region, position) => + case AppConfig.Source.Kinesis(appName, streamName, region, position) => KinesisConsumerSettings.apply(streamName, appName, region, initialPositionInStream = position.unwrap) match { case Right(settings) => readFromKinesisStream[F](settings).evalMap(record => record.checkpoint.as(parseRecord(purpose, record))).asRight case Left(error) => error.asLeft } - case LoaderConfig.Source.PubSub(projectId, subscriptionId) => + case AppConfig.Source.PubSub(projectId, subscriptionId) => implicit val decoder: MessageDecoder[Either[BadData, Data]] = pubsubDataDecoder(purpose) val project = ProjectId(projectId) val subscription = Subscription(subscriptionId) @@ -109,28 +110,6 @@ object source { .flatMap(json => SelfDescribingData.parse[Json](json).leftMap(_.message(json.noSpaces))) .leftMap(error => BadData.BadJson(s, error)) - /** Kind of data flowing through the Loader */ - sealed trait Data extends Product with Serializable { - def snowplow: Boolean = this match { - case _: Data.Snowplow => true - case _: Data.SelfDescribing => false - } - } - - object Data { - case class Snowplow(data: Event) extends Data - case class SelfDescribing(data: SelfDescribingData[Json]) extends Data - } - - /** Data that for some reasons cannot be inserted into DB */ - sealed trait BadData extends Throwable with Product with Serializable - object BadData { - /** Typical Snowplow bad row (Loader Iglu Error etc) */ - case class BadEnriched(data: BadRow) extends BadData - /** Non-enriched error */ - case class BadJson(payload: String, error: String) extends BadData - } - def pubsubDataDecoder(purpose: Purpose): MessageDecoder[Either[BadData, Data]] = purpose match { case Purpose.Enriched => diff --git a/modules/common/src/test/scala/com/snowplowanalytics/snowplow/postgres/config/CliSpec.scala b/modules/loader/src/test/scala/com.snowplowanalytics.snowplow.postgres/config/CliSpec.scala similarity index 93% rename from modules/common/src/test/scala/com/snowplowanalytics/snowplow/postgres/config/CliSpec.scala rename to modules/loader/src/test/scala/com.snowplowanalytics.snowplow.postgres/config/CliSpec.scala index b20cdd6..439b532 100644 --- a/modules/common/src/test/scala/com/snowplowanalytics/snowplow/postgres/config/CliSpec.scala +++ b/modules/loader/src/test/scala/com.snowplowanalytics.snowplow.postgres/config/CliSpec.scala @@ -17,7 +17,8 @@ import java.util.UUID import cats.effect.{IO, Clock} -import com.snowplowanalytics.snowplow.postgres.config.LoaderConfig.{Source, InitPosition, Purpose} +import com.snowplowanalytics.snowplow.postgres.config.LoaderConfig.Purpose +import com.snowplowanalytics.snowplow.postgres.config.AppConfig.{Source, InitPosition} import org.specs2.mutable.Specification import software.amazon.awssdk.regions.Region @@ -31,7 +32,7 @@ class CliSpec extends Specification { val resolver = Paths.get(getClass.getResource("/resolver.json").toURI) val argv = List("--config", config.toString, "--resolver", resolver.toString) - val expected = LoaderConfig( + val expected = AppConfig( "Acme Ltd. 
Snowplow Postgres", UUID.fromString("5c5e4353-4eeb-43da-98f8-2de6dc7fa947"), Source.Kinesis("acme-postgres-loader", "enriched-events", Region.EU_CENTRAL_1, InitPosition.TrimHorizon), diff --git a/project/BuildSettings.scala b/project/BuildSettings.scala index b522a85..cd0ea85 100644 --- a/project/BuildSettings.scala +++ b/project/BuildSettings.scala @@ -33,7 +33,7 @@ object BuildSettings { lazy val projectSettings = Seq( organization := "com.snowplowanalytics", - version := "0.1.0-rc3", + version := "0.1.0-rc4", scalaVersion := scala213, crossScalaVersions := Seq(scala212, scala213), description := "Loading Snowplow enriched data into PostgreSQL in real-time", From 5313d95f3242a7e38ff81a7ff1f6b9e9edd9bc7d Mon Sep 17 00:00:00 2001 From: Ian Streeter Date: Tue, 11 Aug 2020 16:57:37 +0100 Subject: [PATCH 4/7] Replaces the queue with pure pipes --- .../snowplow/postgres/resources.scala | 6 +-- .../snowplow/postgres/streaming/sink.scala | 43 ++++++++----------- .../postgres/streaming/sinkspec.scala | 12 ++---- .../snowplow/postgres/loader/Main.scala | 8 ++-- 4 files changed, 27 insertions(+), 42 deletions(-) diff --git a/modules/common/src/main/scala/com/snowplowanalytics/snowplow/postgres/resources.scala b/modules/common/src/main/scala/com/snowplowanalytics/snowplow/postgres/resources.scala index daa3cd1..9a831c9 100644 --- a/modules/common/src/main/scala/com/snowplowanalytics/snowplow/postgres/resources.scala +++ b/modules/common/src/main/scala/com/snowplowanalytics/snowplow/postgres/resources.scala @@ -22,8 +22,6 @@ import doobie.util.ExecutionContexts import doobie.util.log.LogHandler import doobie.util.transactor.Transactor -import fs2.concurrent.Queue - import io.circe.Json import com.snowplowanalytics.iglu.client.Client @@ -31,7 +29,6 @@ import com.snowplowanalytics.iglu.client.Client import com.snowplowanalytics.snowplow.postgres.api.State import com.snowplowanalytics.snowplow.postgres.config.LoaderConfig import com.snowplowanalytics.snowplow.postgres.config.LoaderConfig.JdbcUri -import com.snowplowanalytics.snowplow.postgres.streaming.data.BadData object resources { @@ -41,7 +38,6 @@ object resources { iglu: Client[F, Json]) = for { blocker <- Blocker[F] - badQueue <- Resource.liftF(Queue.bounded[F, BadData](128)) xa <- resources.getTransactor[F](postgres.getJdbc, postgres.username, postgres.password, blocker) keysF = for { ci <- storage.query.getComments(postgres.schema, logger).transact(xa).map(_.separate) @@ -57,7 +53,7 @@ object resources { Sync[F].pure(state) } state <- Resource.liftF(initState) - } yield (blocker, xa, state, badQueue) + } yield (blocker, xa, state) /** Get a HikariCP transactor */ def getTransactor[F[_]: Async: ContextShift](jdbcUri: JdbcUri, user: String, password: String, be: Blocker): Resource[F, HikariTransactor[F]] = diff --git a/modules/common/src/main/scala/com/snowplowanalytics/snowplow/postgres/streaming/sink.scala b/modules/common/src/main/scala/com/snowplowanalytics/snowplow/postgres/streaming/sink.scala index eee18a7..4c6f0cd 100644 --- a/modules/common/src/main/scala/com/snowplowanalytics/snowplow/postgres/streaming/sink.scala +++ b/modules/common/src/main/scala/com/snowplowanalytics/snowplow/postgres/streaming/sink.scala @@ -12,12 +12,12 @@ */ package com.snowplowanalytics.snowplow.postgres.streaming +import cats.data.EitherT import cats.implicits._ import cats.effect.{Sync, Clock, Concurrent} import fs2.Pipe -import fs2.concurrent.Queue import doobie._ import doobie.implicits._ @@ -40,31 +40,31 @@ object sink { /** * Sink good events into Postgres. 
During sinking, payloads go through all transformation steps * and checking the state of the DB itself. - * Events that could not be transformed (due Iglu errors or DB unavailability) are sent back - * to `badQueue` + * Events that could not be transformed (due Iglu errors or DB unavailability) are emitted from + * the pipe * @param state mutable Loader state - * @param badQueue queue where all unsucessful actions can unload its results * @param client Iglu Client * @param processor The actor processing these events */ def goodSink[F[_]: Concurrent: Clock: DB](state: State[F], - badQueue: Queue[F, BadData], client: Client[F, Json], - processor: Processor): Pipe[F, Data, Unit] = - _.parEvalMapUnordered(32)(sinkPayload(state, badQueue, client, processor)) - - /** Sink bad data coming directly into the `Pipe` and data coming from `badQueue` */ - def badSink[F[_]: Concurrent](badQueue: Queue[F, BadData]): Pipe[F, BadData, Unit] = - _.merge(badQueue.dequeue).evalMap { + processor: Processor): Pipe[F, Data, BadData] = + _.parEvalMapUnordered(32)(sinkPayload(state, client, processor)) + .collect { + case Left(badData) => badData + } + + /** Sink bad data coming directly into the `Pipe` */ + def badSink[F[_]: Concurrent]: Pipe[F, BadData, Unit] = + _.evalMap { case BadData.BadEnriched(row) => Sync[F].delay(println(row.compact)) case BadData.BadJson(payload, error) => Sync[F].delay(println(s"Cannot parse $payload. $error")) } /** Implementation for [[goodSink]] */ def sinkPayload[F[_]: Sync: Clock: DB](state: State[F], - badQueue: Queue[F, BadData], client: Client[F, Json], - processor: Processor)(payload: Data): F[Unit] = { + processor: Processor)(payload: Data): F[Either[BadData, Unit]] = { val result = for { entities <- payload match { case Data.Snowplow(event) => @@ -77,25 +77,18 @@ object sink { .map(entity => List(entity)) .leftMap(errors => BadData.BadJson(json.normalize.noSpaces, errors.toString)) } - insert = DB.process(entities, state).attempt.flatMap { - case Right(_) => Sync[F].unit - case Left(error) => payload match { + insert <- EitherT(DB.process(entities, state).attempt).leftMap { + case error => payload match { case Data.Snowplow(event) => val badRow = BadRow.LoaderRuntimeError(processor, error.getMessage, Payload.LoaderPayload(event)) - val pgBadRow = BadData.BadEnriched(badRow) - badQueue.enqueue1(pgBadRow) + BadData.BadEnriched(badRow) case Data.SelfDescribing(json) => - val pgBadRow = BadData.BadJson(json.normalize.noSpaces, s"Cannot insert: ${error.getMessage}") - badQueue.enqueue1(pgBadRow) - + BadData.BadJson(json.normalize.noSpaces, s"Cannot insert: ${error.getMessage}") } } } yield insert - result.value.flatMap { - case Right(action) => action - case Left(error) => badQueue.enqueue1(error) - } + result.value } /** diff --git a/modules/common/src/test/scala/com/snowplowanalytics/snowplow/postgres/streaming/sinkspec.scala b/modules/common/src/test/scala/com/snowplowanalytics/snowplow/postgres/streaming/sinkspec.scala index 6d22c5e..ef6f70b 100644 --- a/modules/common/src/test/scala/com/snowplowanalytics/snowplow/postgres/streaming/sinkspec.scala +++ b/modules/common/src/test/scala/com/snowplowanalytics/snowplow/postgres/streaming/sinkspec.scala @@ -17,7 +17,6 @@ import java.util.UUID import cats.effect.IO import fs2.Stream -import fs2.concurrent.Queue import io.circe.Json import io.circe.literal._ @@ -30,7 +29,7 @@ import com.snowplowanalytics.snowplow.analytics.scalasdk.Event import com.snowplowanalytics.snowplow.badrows.Processor import 
com.snowplowanalytics.snowplow.postgres.Database import com.snowplowanalytics.snowplow.postgres.api.{State, DB} -import com.snowplowanalytics.snowplow.postgres.streaming.data.{Data, BadData} +import com.snowplowanalytics.snowplow.postgres.streaming.data.Data class sinkspec extends Database { @@ -49,8 +48,7 @@ class sinkspec extends Database { val action = for { state <- State.init[IO](List(), igluClient.resolver) - queue <- Queue.bounded[IO, BadData](1).action - _ <- stream.through(sink.goodSink(state, queue, igluClient, processor)).compile.drain.action + _ <- stream.through(sink.goodSink(state, igluClient, processor)).compile.drain.action eventIds <- query.action uaParserCtxs <- count("com_snowplowanalytics_snowplow_ua_parser_context_1").action } yield (eventIds, uaParserCtxs) @@ -72,8 +70,7 @@ class sinkspec extends Database { val action = for { state <- State.init[IO](List(), igluClient.resolver) - queue <- Queue.bounded[IO, BadData](1).action - _ <- stream.through(sink.goodSink(state, queue, igluClient, processor)).compile.drain.action + _ <- stream.through(sink.goodSink(state, igluClient, processor)).compile.drain.action eventIds <- query.action rows <- count("com_getvero_bounced_1").action } yield (eventIds, rows) @@ -110,8 +107,7 @@ class sinkspec extends Database { val action = for { state <- State.init[IO](List(), igluClient.resolver) - queue <- Queue.bounded[IO, BadData](1).action - _ <- stream.through(sink.goodSink(state, queue, igluClient, processor)).compile.drain.action + _ <- stream.through(sink.goodSink(state, igluClient, processor)).compile.drain.action rows <- count("me_chuwy_pg_test_1").action table <- describeTable("me_chuwy_pg_test_1").action } yield (rows, table) diff --git a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/postgres/loader/Main.scala b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/postgres/loader/Main.scala index 94609e0..578cf1d 100644 --- a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/postgres/loader/Main.scala +++ b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/postgres/loader/Main.scala @@ -34,7 +34,7 @@ object Main extends IOApp { case Right(Cli(appConfig, iglu, debug)) => val logger = if (debug) LogHandler.jdkLogHandler else LogHandler.nop resources.initialize[IO](appConfig.getLoaderConfig, logger, iglu).use { - case (blocker, xa, state, badQueue) => + case (blocker, xa, state) => source.getSource[IO](blocker, appConfig.purpose, appConfig.source) match { case Right(dataStream) => val meta = appConfig.purpose.snowplow @@ -44,9 +44,9 @@ object Main extends IOApp { case Purpose.Enriched => utils.prepare[IO](appConfig.schema, xa, logger) case Purpose.SelfDescribing => IO.unit } - goodSink = sink.goodSink[IO](state, badQueue, iglu, processor) - badSink = sink.badSink[IO](badQueue) - s = dataStream.observeEither(badSink, goodSink) + goodSink = sink.goodSink[IO](state, iglu, processor) + badSink = sink.badSink[IO] + s = dataStream.observeEither(badSink, goodSink.andThen(_.through(badSink))) _ <- s.compile.drain } yield ExitCode.Success From 1a0650629974841fbbeff5d1ac660bc36a4a8c85 Mon Sep 17 00:00:00 2001 From: Ian Streeter Date: Wed, 12 Aug 2020 08:28:53 +0100 Subject: [PATCH 5/7] Adjust separation of DB and non-DB config classes --- .../{LoaderConfig.scala => DBConfig.scala} | 33 +++---------------- .../snowplow/postgres/resources.scala | 6 ++-- .../snowplow/postgres/Database.scala | 2 +- .../snowplow/postgres/config/Cli.scala | 4 +-- .../{AppConfig.scala => LoaderConfig.scala} | 33 
++++++++++++++----- .../snowplow/postgres/loader/Main.scala | 14 ++++---- .../snowplow/postgres/streaming/source.scala | 8 ++--- .../config/CliSpec.scala | 5 ++- 8 files changed, 48 insertions(+), 57 deletions(-) rename modules/common/src/main/scala/com/snowplowanalytics/snowplow/postgres/config/{LoaderConfig.scala => DBConfig.scala} (56%) rename modules/loader/src/main/scala/com/snowplowanalytics/snowplow/postgres/config/{AppConfig.scala => LoaderConfig.scala} (81%) diff --git a/modules/common/src/main/scala/com/snowplowanalytics/snowplow/postgres/config/LoaderConfig.scala b/modules/common/src/main/scala/com/snowplowanalytics/snowplow/postgres/config/DBConfig.scala similarity index 56% rename from modules/common/src/main/scala/com/snowplowanalytics/snowplow/postgres/config/LoaderConfig.scala rename to modules/common/src/main/scala/com/snowplowanalytics/snowplow/postgres/config/DBConfig.scala index 0925d84..9a7769e 100644 --- a/modules/common/src/main/scala/com/snowplowanalytics/snowplow/postgres/config/LoaderConfig.scala +++ b/modules/common/src/main/scala/com/snowplowanalytics/snowplow/postgres/config/DBConfig.scala @@ -12,50 +12,25 @@ */ package com.snowplowanalytics.snowplow.postgres.config -import cats.syntax.either._ +import DBConfig.JdbcUri -import io.circe.Decoder -import LoaderConfig.{JdbcUri, Purpose} -import io.circe.generic.semiauto.deriveDecoder - -case class LoaderConfig(host: String, +case class DBConfig(host: String, port: Int, database: String, username: String, password: String, // TODO: can be EC2 store sslMode: String, - schema: String, - purpose: Purpose) { + schema: String) { def getJdbc: JdbcUri = JdbcUri(host, port, database, sslMode.toLowerCase().replace('_', '-')) } -object LoaderConfig { +object DBConfig { - sealed trait Purpose extends Product with Serializable { - def snowplow: Boolean = this match { - case Purpose.Enriched => true - case Purpose.SelfDescribing => false - } - } - object Purpose { - case object Enriched extends Purpose - case object SelfDescribing extends Purpose - - implicit def ioCirceConfigPurposeDecoder: Decoder[Purpose] = - Decoder.decodeString.emap { - case "ENRICHED_EVENTS" => Enriched.asRight - case "JSON" => SelfDescribing.asRight - case other => s"$other is not supported purpose, choose from ENRICHED_EVENTS and JSON".asLeft - } - } case class JdbcUri(host: String, port: Int, database: String, sslMode: String) { override def toString = s"jdbc:postgresql://$host:$port/$database?sslmode=$sslMode" } - implicit def ioCirceConfigDecoder: Decoder[LoaderConfig] = - deriveDecoder[LoaderConfig] - } diff --git a/modules/common/src/main/scala/com/snowplowanalytics/snowplow/postgres/resources.scala b/modules/common/src/main/scala/com/snowplowanalytics/snowplow/postgres/resources.scala index 9a831c9..87bc01a 100644 --- a/modules/common/src/main/scala/com/snowplowanalytics/snowplow/postgres/resources.scala +++ b/modules/common/src/main/scala/com/snowplowanalytics/snowplow/postgres/resources.scala @@ -27,13 +27,13 @@ import io.circe.Json import com.snowplowanalytics.iglu.client.Client import com.snowplowanalytics.snowplow.postgres.api.State -import com.snowplowanalytics.snowplow.postgres.config.LoaderConfig -import com.snowplowanalytics.snowplow.postgres.config.LoaderConfig.JdbcUri +import com.snowplowanalytics.snowplow.postgres.config.DBConfig +import com.snowplowanalytics.snowplow.postgres.config.DBConfig.JdbcUri object resources { /** Initialise Blocking Thread Pool, Connection Pool, DB state and bad queue resources */ - def initialize[F[_]: 
Concurrent: Clock: ContextShift](postgres: LoaderConfig,
+  def initialize[F[_]: Concurrent: Clock: ContextShift](postgres: DBConfig,
                                                          logger: LogHandler,
                                                          iglu: Client[F, Json]) =
     for {
diff --git a/modules/common/src/test/scala/com/snowplowanalytics/snowplow/postgres/Database.scala b/modules/common/src/test/scala/com/snowplowanalytics/snowplow/postgres/Database.scala
index 974d7f1..7be64a6 100644
--- a/modules/common/src/test/scala/com/snowplowanalytics/snowplow/postgres/Database.scala
+++ b/modules/common/src/test/scala/com/snowplowanalytics/snowplow/postgres/Database.scala
@@ -23,7 +23,7 @@ import com.snowplowanalytics.iglu.client.resolver.registries.Registry.{HttpConne
 import com.snowplowanalytics.iglu.client.validator.CirceValidator
 import com.snowplowanalytics.snowplow.badrows.FailureDetails
 
-import com.snowplowanalytics.snowplow.postgres.config.LoaderConfig.JdbcUri
+import com.snowplowanalytics.snowplow.postgres.config.DBConfig.JdbcUri
 
 trait Database extends Specification with BeforeAfterEach {
   import Database._
diff --git a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/postgres/config/Cli.scala b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/postgres/config/Cli.scala
index 50f0b8a..0528434 100644
--- a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/postgres/config/Cli.scala
+++ b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/postgres/config/Cli.scala
@@ -35,7 +35,7 @@ import com.monovore.decline._
 
 import com.snowplowanalytics.snowplow.postgres.generated.BuildInfo
 
-case class Cli[F[_]](config: AppConfig, iglu: Client[F, Json], debug: Boolean)
+case class Cli[F[_]](config: LoaderConfig, iglu: Client[F, Json], debug: Boolean)
 
 object Cli {
 
@@ -55,7 +55,7 @@ object Cli {
       configJson <- PathOrJson.load(rawConfig.config)
       configData <- SelfDescribingData.parse(configJson).leftMap(e => s"Configuration JSON is not self-describing, ${e.message(configJson.noSpaces)}").toEitherT[F]
       _ <- igluClient.check(configData).leftMap(e => s"Iglu validation failed with following error\n: ${e.asJson.spaces2}")
-      appConfig <- configData.data.as[AppConfig].toEitherT[F].leftMap(e => s"Error while decoding configuration JSON, ${e.show}")
+      appConfig <- configData.data.as[LoaderConfig].toEitherT[F].leftMap(e => s"Error while decoding configuration JSON, ${e.show}")
     } yield Cli(appConfig, igluClient, rawConfig.debug)
   }
diff --git a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/postgres/config/AppConfig.scala b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/postgres/config/LoaderConfig.scala
similarity index 81%
rename from modules/loader/src/main/scala/com/snowplowanalytics/snowplow/postgres/config/AppConfig.scala
rename to modules/loader/src/main/scala/com/snowplowanalytics/snowplow/postgres/config/LoaderConfig.scala
index ce62fd1..4397817 100644
--- a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/postgres/config/AppConfig.scala
+++ b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/postgres/config/LoaderConfig.scala
@@ -24,13 +24,12 @@ import io.circe.generic.semiauto.deriveDecoder
 import io.circe.generic.extras.Configuration
 import io.circe.generic.extras.semiauto.deriveConfiguredDecoder
 
-import AppConfig.Source
-import LoaderConfig.Purpose
+import LoaderConfig.{Purpose, Source}
 
 import software.amazon.awssdk.regions.Region
 import software.amazon.kinesis.common.InitialPositionInStream
 
-case class AppConfig(name: String,
+case class LoaderConfig(name: String,
                      id: UUID,
                      source: Source,
                      host: String,
@@ -41,11 +40,11 @@ case class AppConfig(name: String,
                      sslMode: String,
                      schema: String,
                      purpose: Purpose) {
-  def getLoaderConfig: LoaderConfig =
-    LoaderConfig(host, port, database, username, password, sslMode, schema, purpose)
+  def getDBConfig: DBConfig =
+    DBConfig(host, port, database, username, password, sslMode, schema)
 }
 
-object AppConfig {
+object LoaderConfig {
 
   implicit val awsRegionDecoder: Decoder[Region] =
     Decoder.decodeString.emap { s =>
@@ -90,6 +89,24 @@ object AppConfig {
     }
   }
 
+  sealed trait Purpose extends Product with Serializable {
+    def snowplow: Boolean = this match {
+      case Purpose.Enriched => true
+      case Purpose.SelfDescribing => false
+    }
+  }
+  object Purpose {
+    case object Enriched extends Purpose
+    case object SelfDescribing extends Purpose
+
+    implicit def ioCirceConfigPurposeDecoder: Decoder[Purpose] =
+      Decoder.decodeString.emap {
+        case "ENRICHED_EVENTS" => Enriched.asRight
+        case "JSON" => SelfDescribing.asRight
+        case other => s"$other is not supported purpose, choose from ENRICHED_EVENTS and JSON".asLeft
+      }
+  }
+
   sealed trait Source extends Product with Serializable
 
   object Source {
@@ -103,7 +120,7 @@ object AppConfig {
       deriveConfiguredDecoder[Source]
   }
 
-  implicit def ioCirceConfigDecoder: Decoder[AppConfig] =
-    deriveDecoder[AppConfig]
+  implicit def ioCirceConfigDecoder: Decoder[LoaderConfig] =
+    deriveDecoder[LoaderConfig]
 
 }
diff --git a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/postgres/loader/Main.scala b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/postgres/loader/Main.scala
index 578cf1d..75da1e3 100644
--- a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/postgres/loader/Main.scala
+++ b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/postgres/loader/Main.scala
@@ -31,17 +31,17 @@ object Main extends IOApp {
 
   def run(args: List[String]): IO[ExitCode] =
     Cli.parse[IO](args).value.flatMap {
-      case Right(Cli(appConfig, iglu, debug)) =>
+      case Right(Cli(loaderConfig, iglu, debug)) =>
         val logger = if (debug) LogHandler.jdkLogHandler else LogHandler.nop
-        resources.initialize[IO](appConfig.getLoaderConfig, logger, iglu).use {
+        resources.initialize[IO](loaderConfig.getDBConfig, logger, iglu).use {
           case (blocker, xa, state) =>
-            source.getSource[IO](blocker, appConfig.purpose, appConfig.source) match {
+            source.getSource[IO](blocker, loaderConfig.purpose, loaderConfig.source) match {
               case Right(dataStream) =>
-                val meta = appConfig.purpose.snowplow
-                implicit val db: DB[IO] = DB.interpreter[IO](iglu.resolver, xa, logger, appConfig.schema, meta)
+                val meta = loaderConfig.purpose.snowplow
+                implicit val db: DB[IO] = DB.interpreter[IO](iglu.resolver, xa, logger, loaderConfig.schema, meta)
                 for {
-                  _ <- appConfig.purpose match {
-                    case Purpose.Enriched => utils.prepare[IO](appConfig.schema, xa, logger)
+                  _ <- loaderConfig.purpose match {
+                    case Purpose.Enriched => utils.prepare[IO](loaderConfig.schema, xa, logger)
                     case Purpose.SelfDescribing => IO.unit
                   }
                   goodSink = sink.goodSink[IO](state, iglu, processor)
diff --git a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/postgres/streaming/source.scala b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/postgres/streaming/source.scala
index a8fd2e9..28e8e4b 100644
--- a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/postgres/streaming/source.scala
+++ b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/postgres/streaming/source.scala
@@ -32,7 +32,7 @@ import com.snowplowanalytics.iglu.core.circe.implicits._
 import com.snowplowanalytics.snowplow.analytics.scalasdk.Event
 import com.snowplowanalytics.snowplow.analytics.scalasdk.ParsingError.NotTSV
 import com.snowplowanalytics.snowplow.badrows.{BadRow, Payload}
-import com.snowplowanalytics.snowplow.postgres.config.{AppConfig, Cli}
+import com.snowplowanalytics.snowplow.postgres.config.{LoaderConfig, Cli}
 import com.snowplowanalytics.snowplow.postgres.config.LoaderConfig.Purpose
 import com.snowplowanalytics.snowplow.postgres.streaming.data.{BadData, Data}
 
@@ -52,16 +52,16 @@ object source {
    */
   def getSource[F[_]: ConcurrentEffect: ContextShift](blocker: Blocker,
                                                       purpose: Purpose,
-                                                      config: AppConfig.Source) =
+                                                      config: LoaderConfig.Source) =
     config match {
-      case AppConfig.Source.Kinesis(appName, streamName, region, position) =>
+      case LoaderConfig.Source.Kinesis(appName, streamName, region, position) =>
         KinesisConsumerSettings.apply(streamName, appName, region, initialPositionInStream = position.unwrap) match {
           case Right(settings) =>
             readFromKinesisStream[F](settings).evalMap(record => record.checkpoint.as(parseRecord(purpose, record))).asRight
           case Left(error) =>
             error.asLeft
         }
-      case AppConfig.Source.PubSub(projectId, subscriptionId) =>
+      case LoaderConfig.Source.PubSub(projectId, subscriptionId) =>
         implicit val decoder: MessageDecoder[Either[BadData, Data]] = pubsubDataDecoder(purpose)
         val project = ProjectId(projectId)
         val subscription = Subscription(subscriptionId)
diff --git a/modules/loader/src/test/scala/com.snowplowanalytics.snowplow.postgres/config/CliSpec.scala b/modules/loader/src/test/scala/com.snowplowanalytics.snowplow.postgres/config/CliSpec.scala
index 439b532..1aefd45 100644
--- a/modules/loader/src/test/scala/com.snowplowanalytics.snowplow.postgres/config/CliSpec.scala
+++ b/modules/loader/src/test/scala/com.snowplowanalytics.snowplow.postgres/config/CliSpec.scala
@@ -17,8 +17,7 @@ import java.util.UUID
 
 import cats.effect.{IO, Clock}
 
-import com.snowplowanalytics.snowplow.postgres.config.LoaderConfig.Purpose
-import com.snowplowanalytics.snowplow.postgres.config.AppConfig.{Source, InitPosition}
+import com.snowplowanalytics.snowplow.postgres.config.LoaderConfig.{Purpose, Source, InitPosition}
 
 import org.specs2.mutable.Specification
 import software.amazon.awssdk.regions.Region
@@ -32,7 +31,7 @@ class CliSpec extends Specification {
     val resolver = Paths.get(getClass.getResource("/resolver.json").toURI)
     val argv = List("--config", config.toString, "--resolver", resolver.toString)
 
-    val expected = AppConfig(
+    val expected = LoaderConfig(
      "Acme Ltd. Snowplow Postgres",
      UUID.fromString("5c5e4353-4eeb-43da-98f8-2de6dc7fa947"),
      Source.Kinesis("acme-postgres-loader", "enriched-events", Region.EU_CENTRAL_1, InitPosition.TrimHorizon),

From 00c1e1251e14227b57cd0b4b1982a3d1cd87bb5e Mon Sep 17 00:00:00 2001
From: Ian Streeter
Date: Thu, 13 Aug 2020 15:53:26 +0100
Subject: [PATCH 6/7] Shredded events carry whether they include meta columns

---
 .../snowplow/postgres/api/DB.scala          | 23 +++++++++++--------
 .../postgres/shredding/Shredded.scala       | 20 ++++++++++++++++
 .../postgres/shredding/transform.scala      |  9 ++++----
 .../snowplow/postgres/streaming/sink.scala  |  1 -
 .../postgres/streaming/sinkspec.scala       |  6 ++---
 .../postgres/config/LoaderConfig.scala      |  7 +-----
 .../snowplow/postgres/loader/Main.scala     |  3 +--
 7 files changed, 43 insertions(+), 26 deletions(-)
 create mode 100644 modules/common/src/main/scala/com/snowplowanalytics/snowplow/postgres/shredding/Shredded.scala

diff --git a/modules/common/src/main/scala/com/snowplowanalytics/snowplow/postgres/api/DB.scala b/modules/common/src/main/scala/com/snowplowanalytics/snowplow/postgres/api/DB.scala
index 69a2ff2..9969070 100644
--- a/modules/common/src/main/scala/com/snowplowanalytics/snowplow/postgres/api/DB.scala
+++ b/modules/common/src/main/scala/com/snowplowanalytics/snowplow/postgres/api/DB.scala
@@ -27,14 +27,14 @@ import com.snowplowanalytics.iglu.client.Resolver
 
 import com.snowplowanalytics.iglu.schemaddl.migrations.SchemaList
 
-import com.snowplowanalytics.snowplow.postgres.shredding.{Entity, schema}
+import com.snowplowanalytics.snowplow.postgres.shredding.{Entity, Shredded, schema}
 import com.snowplowanalytics.snowplow.postgres.storage.ddl
 import com.snowplowanalytics.snowplow.postgres.streaming.sink
 
 trait DB[F[_]] {
   def insert(event: List[Entity]): F[Unit]
   def alter(schemaKey: SchemaKey): F[Unit]
-  def create(schemaKey: SchemaKey): F[Unit]
+  def create(schemaKey: SchemaKey, includeMeta: Boolean): F[Unit]
 
   def getSchemaList(schemaKey: SchemaKey): F[SchemaList]
 }
@@ -43,14 +43,18 @@ object DB {
 
   def apply[F[_]](implicit ev: DB[F]): DB[F] = ev
 
-  def process[F[_]](event: List[Entity], state: State[F])
+  def process[F[_]](shredded: Shredded, state: State[F])
                    (implicit D: DB[F], B: Bracket[F, Throwable]): F[Unit] = {
-    val insert = D.insert(event)
+    val (includeMeta, entities) = shredded match {
+      case Shredded.ShreddedSnowplow(atomic, entities) => (true, atomic :: entities)
+      case Shredded.ShreddedSelfDescribing(entity) => (false, List(entity))
+    }
+    val insert = D.insert(entities)
 
     // Mutate table and Loader's mutable variable. Only for locked state!
     def mutate(missing: Set[SchemaKey], outdated: Set[SchemaKey]): F[Unit] =
       for {
-        _ <- missing.toList.traverse(D.create)    // Create missing tables if any
+        _ <- missing.toList.traverse(key => D.create(key, includeMeta))    // Create missing tables if any
         _ <- outdated.toList.traverse(D.alter)    // Updated outdated tables if any
         _ <- (missing ++ outdated).toList.traverse_ { entity =>
           for { // Update state with new schemas
@@ -60,7 +64,7 @@ object DB {
         }
       } yield ()
 
-    state.checkAndRun(_.checkEvent(event), insert, mutate)
+    state.checkAndRun(_.checkEvent(entities), insert, mutate)
   }
 
@@ -87,8 +91,7 @@ object DB {
   def interpreter[F[_]: Sync: Clock](resolver: Resolver[F],
                                      xa: Transactor[F],
                                      logger: LogHandler,
-                                     schemaName: String,
-                                     meta: Boolean): DB[F] = new DB[F] {
+                                     schemaName: String): DB[F] = new DB[F] {
     def insert(event: List[Entity]): F[Unit] =
       event.traverse_(sink.insertStatement(logger, schemaName, _)).transact(xa)
 
@@ -97,8 +100,8 @@ object DB {
       rethrow(result.semiflatMap(_.transact(xa)))
     }
 
-    def create(schemaKey: SchemaKey): F[Unit] = {
-      val result = ddl.createTable[F](resolver, logger, schemaName, schemaKey, meta)
+    def create(schemaKey: SchemaKey, includeMeta: Boolean): F[Unit] = {
+      val result = ddl.createTable[F](resolver, logger, schemaName, schemaKey, includeMeta)
       rethrow(result.semiflatMap(_.transact(xa)))
     }
 
diff --git a/modules/common/src/main/scala/com/snowplowanalytics/snowplow/postgres/shredding/Shredded.scala b/modules/common/src/main/scala/com/snowplowanalytics/snowplow/postgres/shredding/Shredded.scala
new file mode 100644
index 0000000..3042bd9
--- /dev/null
+++ b/modules/common/src/main/scala/com/snowplowanalytics/snowplow/postgres/shredding/Shredded.scala
@@ -0,0 +1,20 @@
+/*
+ * Copyright (c) 2020 Snowplow Analytics Ltd. All rights reserved.
+ *
+ * This program is licensed to you under the Apache License Version 2.0,
+ * and you may not use this file except in compliance with the Apache License Version 2.0.
+ * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0.
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the Apache License Version 2.0 is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
+ */
+package com.snowplowanalytics.snowplow.postgres.shredding
+
+sealed trait Shredded
+
+object Shredded {
+  case class ShreddedSnowplow(event: Entity, entities: List[Entity]) extends Shredded
+  case class ShreddedSelfDescribing(entity: Entity) extends Shredded
+}
diff --git a/modules/common/src/main/scala/com/snowplowanalytics/snowplow/postgres/shredding/transform.scala b/modules/common/src/main/scala/com/snowplowanalytics/snowplow/postgres/shredding/transform.scala
index dc6767e..a7f1062 100644
--- a/modules/common/src/main/scala/com/snowplowanalytics/snowplow/postgres/shredding/transform.scala
+++ b/modules/common/src/main/scala/com/snowplowanalytics/snowplow/postgres/shredding/transform.scala
@@ -36,19 +36,20 @@ import com.snowplowanalytics.iglu.schemaddl.migrations.FlatSchema
 import com.snowplowanalytics.snowplow.analytics.scalasdk.Event
 import com.snowplowanalytics.snowplow.badrows.{FailureDetails, BadRow, Failure, Payload, Processor}
 import Entity.Column
+import Shredded.{ShreddedSelfDescribing, ShreddedSnowplow}
 
 object transform {
   val Atomic = SchemaKey("com.snowplowanalytics.snowplow", "atomic", "jsonschema", SchemaVer.Full(1,0,0))
 
   /** Transform the whole `Event` (canonical and JSONs) into list of independent entities ready to be inserted */
-  def shredEvent[F[_]: Sync: Clock](client: Client[F, Json], processor: Processor, event: Event): EitherT[F, BadRow, List[Entity]] = {
+  def shredEvent[F[_]: Sync: Clock](client: Client[F, Json], processor: Processor, event: Event): EitherT[F, BadRow, ShreddedSnowplow] = {
     val entities = event.contexts.data ++ event.derived_contexts.data ++ event.unstruct_event.data.toList
     val wholeEvent = entities
       .parTraverse(shredJson(client))
       .value
       .map { shreddedOrError =>
         (shreddedOrError, shredAtomic(Map())(event)).mapN {
-          (shreddedEntities, atomic) => atomic :: shreddedEntities.map(addMetadata(event.event_id, event.collector_tstamp))
+          (shreddedEntities, atomic) => ShreddedSnowplow(atomic, shreddedEntities.map(_.entity).map(addMetadata(event.event_id, event.collector_tstamp)))
         }
       }
     EitherT(wholeEvent).leftMap[BadRow](buildBadRow(processor, event))
@@ -83,7 +84,7 @@ object transform {
 
   /** Transform JSON into [[Entity]] */
   def shredJson[F[_]: Sync: Clock](client: Client[F, Json])
-                                  (data: SelfDescribingData[Json]): EitherT[F, NonEmptyList[FailureDetails.LoaderIgluError], Entity] = {
+                                  (data: SelfDescribingData[Json]): EitherT[F, NonEmptyList[FailureDetails.LoaderIgluError], ShreddedSelfDescribing] = {
     val key = data.schema
     schema.getOrdered(client.resolver)(key.vendor, key.name, key.version.model)
       .leftMap { error => NonEmptyList.of(error) }
@@ -105,7 +106,7 @@ object transform {
           case Atomic => "events"
           case other => StringUtils.getTableName(SchemaMap(other))
         }
-        Entity(tableName, data.schema, columns)
+        ShreddedSelfDescribing(Entity(tableName, data.schema, columns))
       }
     }
   }
diff --git a/modules/common/src/main/scala/com/snowplowanalytics/snowplow/postgres/streaming/sink.scala b/modules/common/src/main/scala/com/snowplowanalytics/snowplow/postgres/streaming/sink.scala
index 4c6f0cd..ae504bb 100644
--- a/modules/common/src/main/scala/com/snowplowanalytics/snowplow/postgres/streaming/sink.scala
+++ b/modules/common/src/main/scala/com/snowplowanalytics/snowplow/postgres/streaming/sink.scala
@@ -74,7 +74,6 @@ object sink {
         case Data.SelfDescribing(json) =>
           transform
             .shredJson(client)(json)
-            .map(entity => List(entity))
             .leftMap(errors => BadData.BadJson(json.normalize.noSpaces, errors.toString))
       }
       insert <- EitherT(DB.process(entities, state).attempt).leftMap {
diff --git a/modules/common/src/test/scala/com/snowplowanalytics/snowplow/postgres/streaming/sinkspec.scala b/modules/common/src/test/scala/com/snowplowanalytics/snowplow/postgres/streaming/sinkspec.scala
index ef6f70b..d8c56f9 100644
--- a/modules/common/src/test/scala/com/snowplowanalytics/snowplow/postgres/streaming/sinkspec.scala
+++ b/modules/common/src/test/scala/com/snowplowanalytics/snowplow/postgres/streaming/sinkspec.scala
@@ -44,7 +44,7 @@ class sinkspec extends Database {
       val event = Event.parse(line).getOrElse(throw new RuntimeException("Event is invalid"))
       val stream = Stream.emit[IO, Data](Data.Snowplow(event))
 
-      implicit val D = DB.interpreter[IO](igluClient.resolver, xa, logger, Schema, true)
+      implicit val D = DB.interpreter[IO](igluClient.resolver, xa, logger, Schema)
 
       val action = for {
         state <- State.init[IO](List(), igluClient.resolver)
@@ -66,7 +66,7 @@ class sinkspec extends Database {
       val json = SelfDescribingData.parse(row).getOrElse(throw new RuntimeException("Invalid SelfDescribingData"))
       val stream = Stream.emit[IO, Data](Data.SelfDescribing(json))
 
-      implicit val D = DB.interpreter[IO](igluClient.resolver, xa, logger, Schema, false)
+      implicit val D = DB.interpreter[IO](igluClient.resolver, xa, logger, Schema)
 
       val action = for {
         state <- State.init[IO](List(), igluClient.resolver)
@@ -103,7 +103,7 @@ class sinkspec extends Database {
         ColumnInfo("big_int", None, true, "bigint", None),
       )
 
-      implicit val D = DB.interpreter[IO](igluClient.resolver, xa, logger, Schema, false)
+      implicit val D = DB.interpreter[IO](igluClient.resolver, xa, logger, Schema)
 
       val action = for {
         state <- State.init[IO](List(), igluClient.resolver)
diff --git a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/postgres/config/LoaderConfig.scala b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/postgres/config/LoaderConfig.scala
index 4397817..3b4d290 100644
--- a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/postgres/config/LoaderConfig.scala
+++ b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/postgres/config/LoaderConfig.scala
@@ -89,12 +89,7 @@ object LoaderConfig {
     }
   }
 
-  sealed trait Purpose extends Product with Serializable {
-    def snowplow: Boolean = this match {
-      case Purpose.Enriched => true
-      case Purpose.SelfDescribing => false
-    }
-  }
+  sealed trait Purpose extends Product with Serializable
   object Purpose {
     case object Enriched extends Purpose
     case object SelfDescribing extends Purpose
diff --git a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/postgres/loader/Main.scala b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/postgres/loader/Main.scala
index 75da1e3..bc5ad34 100644
--- a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/postgres/loader/Main.scala
+++ b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/postgres/loader/Main.scala
@@ -37,8 +37,7 @@ object Main extends IOApp {
           case (blocker, xa, state) =>
             source.getSource[IO](blocker, loaderConfig.purpose, loaderConfig.source) match {
               case Right(dataStream) =>
-                val meta = loaderConfig.purpose.snowplow
-                implicit val db: DB[IO] = DB.interpreter[IO](iglu.resolver, xa, logger, loaderConfig.schema, meta)
+                implicit val db: DB[IO] = DB.interpreter[IO](iglu.resolver, xa, logger, loaderConfig.schema)
                 for {
                   _ <- loaderConfig.purpose match {
                     case Purpose.Enriched => utils.prepare[IO](loaderConfig.schema, xa, logger)

From d6f595e0b924005c01bd1eabb2279a360852ef41 Mon Sep 17 00:00:00 2001
From: Ian Streeter
Date: Thu, 13 Aug 2020 17:36:52 +0100
Subject: [PATCH 7/7] Helper function for initializing db state

---
 .../snowplow/postgres/resources.scala       | 28 ++++++++++++-------
 .../snowplow/postgres/streaming/sink.scala  |  3 +-
 2 files changed, 20 insertions(+), 11 deletions(-)

diff --git a/modules/common/src/main/scala/com/snowplowanalytics/snowplow/postgres/resources.scala b/modules/common/src/main/scala/com/snowplowanalytics/snowplow/postgres/resources.scala
index 87bc01a..c90759f 100644
--- a/modules/common/src/main/scala/com/snowplowanalytics/snowplow/postgres/resources.scala
+++ b/modules/common/src/main/scala/com/snowplowanalytics/snowplow/postgres/resources.scala
@@ -32,6 +32,8 @@ import com.snowplowanalytics.snowplow.postgres.config.DBConfig.JdbcUri
 
 object resources {
 
+  val FixedThreadPoolSize: Int = 32
+
   /** Initialise Blocking Thread Pool, Connection Pool, DB state and bad queue resources */
   def initialize[F[_]: Concurrent: Clock: ContextShift](postgres: DBConfig,
                                                         logger: LogHandler,
@@ -39,26 +41,32 @@ object resources {
     for {
       blocker <- Blocker[F]
       xa <- resources.getTransactor[F](postgres.getJdbc, postgres.username, postgres.password, blocker)
-      keysF = for {
-        ci <- storage.query.getComments(postgres.schema, logger).transact(xa).map(_.separate)
-        (issues, comments) = ci
-        _ <- issues.traverse_(issue => Sync[F].delay(println(issue)))
-      } yield comments
-      keys <- Resource.liftF(keysF)
-      initState = State.init[F](keys, iglu.resolver).value.flatMap {
+      state <- Resource.liftF(initializeState(postgres, logger, iglu, xa))
+    } yield (blocker, xa, state)
+
+  def initializeState[F[_]: Concurrent: Clock](postgres: DBConfig,
+                                               logger: LogHandler,
+                                               iglu: Client[F, Json],
+                                               xa: Transactor[F]): F[State[F]] = {
+    for {
+      ci <- storage.query.getComments(postgres.schema, logger).transact(xa).map(_.separate)
+      (issues, comments) = ci
+      _ <- issues.traverse_(issue => Sync[F].delay(println(issue)))
+      initState = State.init[F](comments, iglu.resolver).value.flatMap {
         case Left(error) =>
          val exception = new RuntimeException(s"Couldn't initalise the state $error")
          Sync[F].raiseError[State[F]](exception)
        case Right(state) => Sync[F].pure(state)
      }
-      state <- Resource.liftF(initState)
-    } yield (blocker, xa, state)
+      state <- initState
+    } yield state
+  }
 
   /** Get a HikariCP transactor */
   def getTransactor[F[_]: Async: ContextShift](jdbcUri: JdbcUri, user: String, password: String, be: Blocker): Resource[F, HikariTransactor[F]] =
     for {
-      ce <- ExecutionContexts.fixedThreadPool[F](32)
+      ce <- ExecutionContexts.fixedThreadPool[F](FixedThreadPoolSize)
       xa <- HikariTransactor.newHikariTransactor[F]("org.postgresql.Driver", jdbcUri.toString, user, password, ce, be)
     } yield xa
 
diff --git a/modules/common/src/main/scala/com/snowplowanalytics/snowplow/postgres/streaming/sink.scala b/modules/common/src/main/scala/com/snowplowanalytics/snowplow/postgres/streaming/sink.scala
index ae504bb..5195ace 100644
--- a/modules/common/src/main/scala/com/snowplowanalytics/snowplow/postgres/streaming/sink.scala
+++ b/modules/common/src/main/scala/com/snowplowanalytics/snowplow/postgres/streaming/sink.scala
@@ -30,6 +30,7 @@ import com.snowplowanalytics.iglu.client.Client
 import com.snowplowanalytics.snowplow.badrows.{BadRow, Payload, Processor}
 
 import com.snowplowanalytics.snowplow.postgres.api.{State, DB}
+import com.snowplowanalytics.snowplow.postgres.resources.FixedThreadPoolSize
 import com.snowplowanalytics.snowplow.postgres.shredding.{Entity, transform}
 import com.snowplowanalytics.snowplow.postgres.streaming.data.{Data, BadData}
 
@@ -49,7 +50,7 @@ object sink {
   def goodSink[F[_]: Concurrent: Clock: DB](state: State[F],
                                             client: Client[F, Json],
                                             processor: Processor): Pipe[F, Data, BadData] =
-    _.parEvalMapUnordered(32)(sinkPayload(state, client, processor))
+    _.parEvalMapUnordered(FixedThreadPoolSize)(sinkPayload(state, client, processor))
       .collect {
         case Left(badData) => badData
       }
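
For reference, a minimal sketch (not part of the patch series) of how the Shredded ADT introduced in PATCH 6/7 is meant to be consumed: transform.shredEvent now yields Shredded.ShreddedSnowplow and transform.shredJson yields Shredded.ShreddedSelfDescribing, and DB.process derives the includeMeta flag from the variant rather than from a constructor-level meta parameter. The helper name sinkBoth and its parameters below are illustrative assumptions; only the imported types and DB.process itself come from the diffs above.

    // Illustrative sketch only: both Shredded variants flowing through DB.process,
    // which now decides per call whether meta columns are created (ShreddedSnowplow)
    // or omitted (ShreddedSelfDescribing).
    import cats.effect.IO
    import com.snowplowanalytics.snowplow.postgres.api.{DB, State}
    import com.snowplowanalytics.snowplow.postgres.shredding.{Entity, Shredded}

    def sinkBoth(atomic: Entity, contexts: List[Entity], standalone: Entity, state: State[IO])
                (implicit db: DB[IO]): IO[Unit] =
      for {
        // Enriched event: atomic row plus its contexts, meta columns included
        _ <- DB.process[IO](Shredded.ShreddedSnowplow(atomic, contexts), state)
        // Plain self-describing JSON: a single entity, no meta columns
        _ <- DB.process[IO](Shredded.ShreddedSelfDescribing(standalone), state)
      } yield ()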