Skip to content

Commit

Permalink
Adjust separation of DB and non-DB config classes
Browse files Browse the repository at this point in the history
  • Loading branch information
Ian Streeter committed Aug 12, 2020
1 parent 5313d95 commit 1a06506
Show file tree
Hide file tree
Showing 8 changed files with 48 additions and 57 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,50 +12,25 @@
*/
package com.snowplowanalytics.snowplow.postgres.config

import cats.syntax.either._
import DBConfig.JdbcUri

import io.circe.Decoder
import LoaderConfig.{JdbcUri, Purpose}
import io.circe.generic.semiauto.deriveDecoder

case class LoaderConfig(host: String,
case class DBConfig(host: String,
port: Int,
database: String,
username: String,
password: String, // TODO: can be EC2 store
sslMode: String,
schema: String,
purpose: Purpose) {
schema: String) {
def getJdbc: JdbcUri =
JdbcUri(host, port, database, sslMode.toLowerCase().replace('_', '-'))
}

object LoaderConfig {
object DBConfig {

sealed trait Purpose extends Product with Serializable {
def snowplow: Boolean = this match {
case Purpose.Enriched => true
case Purpose.SelfDescribing => false
}
}
object Purpose {
case object Enriched extends Purpose
case object SelfDescribing extends Purpose

implicit def ioCirceConfigPurposeDecoder: Decoder[Purpose] =
Decoder.decodeString.emap {
case "ENRICHED_EVENTS" => Enriched.asRight
case "JSON" => SelfDescribing.asRight
case other => s"$other is not supported purpose, choose from ENRICHED_EVENTS and JSON".asLeft
}
}

case class JdbcUri(host: String, port: Int, database: String, sslMode: String) {
override def toString =
s"jdbc:postgresql://$host:$port/$database?sslmode=$sslMode"
}

implicit def ioCirceConfigDecoder: Decoder[LoaderConfig] =
deriveDecoder[LoaderConfig]

}
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,13 @@ import io.circe.Json
import com.snowplowanalytics.iglu.client.Client

import com.snowplowanalytics.snowplow.postgres.api.State
import com.snowplowanalytics.snowplow.postgres.config.LoaderConfig
import com.snowplowanalytics.snowplow.postgres.config.LoaderConfig.JdbcUri
import com.snowplowanalytics.snowplow.postgres.config.DBConfig
import com.snowplowanalytics.snowplow.postgres.config.DBConfig.JdbcUri

object resources {

/** Initialise Blocking Thread Pool, Connection Pool, DB state and bad queue resources */
def initialize[F[_]: Concurrent: Clock: ContextShift](postgres: LoaderConfig,
def initialize[F[_]: Concurrent: Clock: ContextShift](postgres: DBConfig,
logger: LogHandler,
iglu: Client[F, Json]) =
for {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import com.snowplowanalytics.iglu.client.resolver.registries.Registry.{HttpConne
import com.snowplowanalytics.iglu.client.validator.CirceValidator

import com.snowplowanalytics.snowplow.badrows.FailureDetails
import com.snowplowanalytics.snowplow.postgres.config.LoaderConfig.JdbcUri
import com.snowplowanalytics.snowplow.postgres.config.DBConfig.JdbcUri

trait Database extends Specification with BeforeAfterEach {
import Database._
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ import com.monovore.decline._

import com.snowplowanalytics.snowplow.postgres.generated.BuildInfo

case class Cli[F[_]](config: AppConfig, iglu: Client[F, Json], debug: Boolean)
case class Cli[F[_]](config: LoaderConfig, iglu: Client[F, Json], debug: Boolean)

object Cli {

Expand All @@ -55,7 +55,7 @@ object Cli {
configJson <- PathOrJson.load(rawConfig.config)
configData <- SelfDescribingData.parse(configJson).leftMap(e => s"Configuration JSON is not self-describing, ${e.message(configJson.noSpaces)}").toEitherT[F]
_ <- igluClient.check(configData).leftMap(e => s"Iglu validation failed with following error\n: ${e.asJson.spaces2}")
appConfig <- configData.data.as[AppConfig].toEitherT[F].leftMap(e => s"Error while decoding configuration JSON, ${e.show}")
appConfig <- configData.data.as[LoaderConfig].toEitherT[F].leftMap(e => s"Error while decoding configuration JSON, ${e.show}")
} yield Cli(appConfig, igluClient, rawConfig.debug)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,12 @@ import io.circe.generic.semiauto.deriveDecoder
import io.circe.generic.extras.Configuration
import io.circe.generic.extras.semiauto.deriveConfiguredDecoder

import AppConfig.Source
import LoaderConfig.Purpose
import LoaderConfig.{Purpose, Source}

import software.amazon.awssdk.regions.Region
import software.amazon.kinesis.common.InitialPositionInStream

case class AppConfig(name: String,
case class LoaderConfig(name: String,
id: UUID,
source: Source,
host: String,
Expand All @@ -41,11 +40,11 @@ case class AppConfig(name: String,
sslMode: String,
schema: String,
purpose: Purpose) {
def getLoaderConfig: LoaderConfig =
LoaderConfig(host, port, database, username, password, sslMode, schema, purpose)
def getDBConfig: DBConfig =
DBConfig(host, port, database, username, password, sslMode, schema)
}

object AppConfig {
object LoaderConfig {

implicit val awsRegionDecoder: Decoder[Region] =
Decoder.decodeString.emap { s =>
Expand Down Expand Up @@ -90,6 +89,24 @@ object AppConfig {
}
}

sealed trait Purpose extends Product with Serializable {
def snowplow: Boolean = this match {
case Purpose.Enriched => true
case Purpose.SelfDescribing => false
}
}
object Purpose {
case object Enriched extends Purpose
case object SelfDescribing extends Purpose

implicit def ioCirceConfigPurposeDecoder: Decoder[Purpose] =
Decoder.decodeString.emap {
case "ENRICHED_EVENTS" => Enriched.asRight
case "JSON" => SelfDescribing.asRight
case other => s"$other is not supported purpose, choose from ENRICHED_EVENTS and JSON".asLeft
}
}

sealed trait Source extends Product with Serializable
object Source {

Expand All @@ -103,7 +120,7 @@ object AppConfig {
deriveConfiguredDecoder[Source]
}

implicit def ioCirceConfigDecoder: Decoder[AppConfig] =
deriveDecoder[AppConfig]
implicit def ioCirceConfigDecoder: Decoder[LoaderConfig] =
deriveDecoder[LoaderConfig]

}
Original file line number Diff line number Diff line change
Expand Up @@ -31,17 +31,17 @@ object Main extends IOApp {

def run(args: List[String]): IO[ExitCode] =
Cli.parse[IO](args).value.flatMap {
case Right(Cli(appConfig, iglu, debug)) =>
case Right(Cli(loaderConfig, iglu, debug)) =>
val logger = if (debug) LogHandler.jdkLogHandler else LogHandler.nop
resources.initialize[IO](appConfig.getLoaderConfig, logger, iglu).use {
resources.initialize[IO](loaderConfig.getDBConfig, logger, iglu).use {
case (blocker, xa, state) =>
source.getSource[IO](blocker, appConfig.purpose, appConfig.source) match {
source.getSource[IO](blocker, loaderConfig.purpose, loaderConfig.source) match {
case Right(dataStream) =>
val meta = appConfig.purpose.snowplow
implicit val db: DB[IO] = DB.interpreter[IO](iglu.resolver, xa, logger, appConfig.schema, meta)
val meta = loaderConfig.purpose.snowplow
implicit val db: DB[IO] = DB.interpreter[IO](iglu.resolver, xa, logger, loaderConfig.schema, meta)
for {
_ <- appConfig.purpose match {
case Purpose.Enriched => utils.prepare[IO](appConfig.schema, xa, logger)
_ <- loaderConfig.purpose match {
case Purpose.Enriched => utils.prepare[IO](loaderConfig.schema, xa, logger)
case Purpose.SelfDescribing => IO.unit
}
goodSink = sink.goodSink[IO](state, iglu, processor)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import com.snowplowanalytics.iglu.core.circe.implicits._
import com.snowplowanalytics.snowplow.analytics.scalasdk.Event
import com.snowplowanalytics.snowplow.analytics.scalasdk.ParsingError.NotTSV
import com.snowplowanalytics.snowplow.badrows.{BadRow, Payload}
import com.snowplowanalytics.snowplow.postgres.config.{AppConfig, Cli}
import com.snowplowanalytics.snowplow.postgres.config.{LoaderConfig, Cli}
import com.snowplowanalytics.snowplow.postgres.config.LoaderConfig.Purpose
import com.snowplowanalytics.snowplow.postgres.streaming.data.{BadData, Data}

Expand All @@ -52,16 +52,16 @@ object source {
*/
def getSource[F[_]: ConcurrentEffect: ContextShift](blocker: Blocker,
purpose: Purpose,
config: AppConfig.Source) =
config: LoaderConfig.Source) =
config match {
case AppConfig.Source.Kinesis(appName, streamName, region, position) =>
case LoaderConfig.Source.Kinesis(appName, streamName, region, position) =>
KinesisConsumerSettings.apply(streamName, appName, region, initialPositionInStream = position.unwrap) match {
case Right(settings) =>
readFromKinesisStream[F](settings).evalMap(record => record.checkpoint.as(parseRecord(purpose, record))).asRight
case Left(error) =>
error.asLeft
}
case AppConfig.Source.PubSub(projectId, subscriptionId) =>
case LoaderConfig.Source.PubSub(projectId, subscriptionId) =>
implicit val decoder: MessageDecoder[Either[BadData, Data]] = pubsubDataDecoder(purpose)
val project = ProjectId(projectId)
val subscription = Subscription(subscriptionId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,7 @@ import java.util.UUID

import cats.effect.{IO, Clock}

import com.snowplowanalytics.snowplow.postgres.config.LoaderConfig.Purpose
import com.snowplowanalytics.snowplow.postgres.config.AppConfig.{Source, InitPosition}
import com.snowplowanalytics.snowplow.postgres.config.LoaderConfig.{Purpose, Source, InitPosition}

import org.specs2.mutable.Specification
import software.amazon.awssdk.regions.Region
Expand All @@ -32,7 +31,7 @@ class CliSpec extends Specification {
val resolver = Paths.get(getClass.getResource("/resolver.json").toURI)
val argv = List("--config", config.toString, "--resolver", resolver.toString)

val expected = AppConfig(
val expected = LoaderConfig(
"Acme Ltd. Snowplow Postgres",
UUID.fromString("5c5e4353-4eeb-43da-98f8-2de6dc7fa947"),
Source.Kinesis("acme-postgres-loader", "enriched-events", Region.EU_CENTRAL_1, InitPosition.TrimHorizon),
Expand Down

0 comments on commit 1a06506

Please sign in to comment.