Merge d6f595e into 91a6738

istreeter committed Aug 14, 2020
2 parents 91a6738 + d6f595e commit f0a3b9b
Showing 17 changed files with 249 additions and 208 deletions.
17 changes: 11 additions & 6 deletions build.sbt
@@ -20,18 +20,14 @@ lazy val common = project
.settings(name := "snowplow-postgres")
.enablePlugins(BuildInfoPlugin)
.settings(BuildSettings.projectSettings)
.settings(BuildSettings.buildInfoSettings)
.settings(BuildSettings.scoverageSettings)
.settings(BuildSettings.addExampleConfToTestCp)
.settings(BuildSettings.mavenSettings)
.settings(
resolvers += Dependencies.SnowplowBintray,
libraryDependencies ++= Seq(
Dependencies.logger,
Dependencies.postgres,
Dependencies.commons,
Dependencies.catsEffect,
Dependencies.decline,
Dependencies.circe,
Dependencies.circeGeneric,
Dependencies.circeExtras,
@@ -41,8 +37,6 @@ lazy val common = project
Dependencies.doobiePg,
Dependencies.doobiePgCirce,
Dependencies.doobieHikari,
Dependencies.fs2Aws,
Dependencies.fs2PubSub,
Dependencies.analyticsSdk,
Dependencies.badRows,
Dependencies.schemaDdl,
@@ -57,6 +51,17 @@ lazy val loader = project
.settings(name := "snowplow-postgres-loader")
.settings(BuildSettings.projectSettings)
.settings(BuildSettings.dockerSettings)
.settings(BuildSettings.buildInfoSettings)
.settings(BuildSettings.addExampleConfToTestCp)
.settings(
libraryDependencies ++= Seq(
Dependencies.commons,
Dependencies.fs2Aws,
Dependencies.fs2PubSub,
Dependencies.decline,
Dependencies.specs2
)
)
.dependsOn(common)
.enablePlugins(JavaAppPackaging, DockerPlugin, BuildInfoPlugin)

@@ -27,14 +27,14 @@ import com.snowplowanalytics.iglu.client.Resolver

import com.snowplowanalytics.iglu.schemaddl.migrations.SchemaList

import com.snowplowanalytics.snowplow.postgres.shredding.{Entity, schema}
import com.snowplowanalytics.snowplow.postgres.shredding.{Entity, Shredded, schema}
import com.snowplowanalytics.snowplow.postgres.storage.ddl
import com.snowplowanalytics.snowplow.postgres.streaming.sink

trait DB[F[_]] {
def insert(event: List[Entity]): F[Unit]
def alter(schemaKey: SchemaKey): F[Unit]
def create(schemaKey: SchemaKey): F[Unit]
def create(schemaKey: SchemaKey, includeMeta: Boolean): F[Unit]

def getSchemaList(schemaKey: SchemaKey): F[SchemaList]
}
@@ -43,14 +43,18 @@ object DB {

def apply[F[_]](implicit ev: DB[F]): DB[F] = ev

def process[F[_]](event: List[Entity], state: State[F])
def process[F[_]](shredded: Shredded, state: State[F])
(implicit D: DB[F], B: Bracket[F, Throwable]): F[Unit] = {
val insert = D.insert(event)
val (includeMeta, entities) = shredded match {
case Shredded.ShreddedSnowplow(atomic, entities) => (true, atomic :: entities)
case Shredded.ShreddedSelfDescribing(entity) => (false, List(entity))
}
val insert = D.insert(entities)

// Mutate table and Loader's mutable variable. Only for locked state!
def mutate(missing: Set[SchemaKey], outdated: Set[SchemaKey]): F[Unit] =
for {
_ <- missing.toList.traverse(D.create) // Create missing tables if any
_ <- missing.toList.traverse(key => D.create(key, includeMeta)) // Create missing tables if any
_ <- outdated.toList.traverse(D.alter) // Update outdated tables if any
_ <- (missing ++ outdated).toList.traverse_ { entity =>
for { // Update state with new schemas
@@ -60,7 +64,7 @@ object DB {
}
} yield ()

state.checkAndRun(_.checkEvent(event), insert, mutate)
state.checkAndRun(_.checkEvent(entities), insert, mutate)
}


@@ -87,8 +91,7 @@ object DB {
def interpreter[F[_]: Sync: Clock](resolver: Resolver[F],
xa: Transactor[F],
logger: LogHandler,
schemaName: String,
meta: Boolean): DB[F] = new DB[F] {
schemaName: String): DB[F] = new DB[F] {
def insert(event: List[Entity]): F[Unit] =
event.traverse_(sink.insertStatement(logger, schemaName, _)).transact(xa)

Expand All @@ -97,8 +100,8 @@ object DB {
rethrow(result.semiflatMap(_.transact(xa)))
}

def create(schemaKey: SchemaKey): F[Unit] = {
val result = ddl.createTable[F](resolver, logger, schemaName, schemaKey, meta)
def create(schemaKey: SchemaKey, includeMeta: Boolean): F[Unit] = {
val result = ddl.createTable[F](resolver, logger, schemaName, schemaKey, includeMeta)
rethrow(result.semiflatMap(_.transact(xa)))
}


This file was deleted.

@@ -0,0 +1,36 @@
/*
* Copyright (c) 2020 Snowplow Analytics Ltd. All rights reserved.
*
* This program is licensed to you under the Apache License Version 2.0,
* and you may not use this file except in compliance with the Apache License Version 2.0.
* You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0.
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the Apache License Version 2.0 is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
*/
package com.snowplowanalytics.snowplow.postgres.config

import DBConfig.JdbcUri

case class DBConfig(host: String,
port: Int,
database: String,
username: String,
password: String, // TODO: can be EC2 store
sslMode: String,
schema: String) {
def getJdbc: JdbcUri =
JdbcUri(host, port, database, sslMode.toLowerCase().replace('_', '-'))
}

object DBConfig {


case class JdbcUri(host: String, port: Int, database: String, sslMode: String) {
override def toString =
s"jdbc:postgresql://$host:$port/$database?sslmode=$sslMode"
}

}
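The new getJdbc helper normalises the configured SSL mode (lower-cased, underscores replaced with hyphens) before assembling the connection string. A minimal sketch, using made-up connection values:

// Hypothetical values, purely to illustrate getJdbc's sslMode normalisation
val cfg = DBConfig("localhost", 5432, "snowplow", "loader", "secret", "VERIFY_FULL", "atomic")
cfg.getJdbc.toString
// jdbc:postgresql://localhost:5432/snowplow?sslmode=verify-full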
@@ -22,47 +22,51 @@ import doobie.util.ExecutionContexts
import doobie.util.log.LogHandler
import doobie.util.transactor.Transactor

import fs2.concurrent.Queue

import io.circe.Json

import com.snowplowanalytics.iglu.client.Client

import com.snowplowanalytics.snowplow.postgres.api.State
import com.snowplowanalytics.snowplow.postgres.config.LoaderConfig
import com.snowplowanalytics.snowplow.postgres.config.LoaderConfig.JdbcUri
import com.snowplowanalytics.snowplow.postgres.streaming.source.BadData
import com.snowplowanalytics.snowplow.postgres.config.DBConfig
import com.snowplowanalytics.snowplow.postgres.config.DBConfig.JdbcUri

object resources {

val FixedThreadPoolSize: Int = 32

/** Initialise Blocking Thread Pool, Connection Pool, DB state and bad queue resources */
def initialize[F[_]: Concurrent: Clock: ContextShift](postgres: LoaderConfig,
def initialize[F[_]: Concurrent: Clock: ContextShift](postgres: DBConfig,
logger: LogHandler,
iglu: Client[F, Json]) =
for {
blocker <- Blocker[F]
badQueue <- Resource.liftF(Queue.bounded[F, BadData](128))
xa <- resources.getTransactor[F](postgres.getJdbc, postgres.username, postgres.password, blocker)
keysF = for {
ci <- storage.query.getComments(postgres.schema, logger).transact(xa).map(_.separate)
(issues, comments) = ci
_ <- issues.traverse_(issue => Sync[F].delay(println(issue)))
} yield comments
keys <- Resource.liftF(keysF)
initState = State.init[F](keys, iglu.resolver).value.flatMap {
state <- Resource.liftF(initializeState(postgres, logger, iglu, xa))
} yield (blocker, xa, state)

def initializeState[F[_]: Concurrent: Clock](postgres: DBConfig,
logger: LogHandler,
iglu: Client[F, Json],
xa: Transactor[F]): F[State[F]] = {
for {
ci <- storage.query.getComments(postgres.schema, logger).transact(xa).map(_.separate)
(issues, comments) = ci
_ <- issues.traverse_(issue => Sync[F].delay(println(issue)))
initState = State.init[F](comments, iglu.resolver).value.flatMap {
case Left(error) =>
val exception = new RuntimeException(s"Couldn't initialise the state $error")
Sync[F].raiseError[State[F]](exception)
case Right(state) =>
Sync[F].pure(state)
}
state <- Resource.liftF(initState)
} yield (blocker, xa, state, badQueue)
state <- initState
} yield state
}

/** Get a HikariCP transactor */
def getTransactor[F[_]: Async: ContextShift](jdbcUri: JdbcUri, user: String, password: String, be: Blocker): Resource[F, HikariTransactor[F]] =
for {
ce <- ExecutionContexts.fixedThreadPool[F](32)
ce <- ExecutionContexts.fixedThreadPool[F](FixedThreadPoolSize)
xa <- HikariTransactor.newHikariTransactor[F]("org.postgresql.Driver", jdbcUri.toString, user, password, ce, be)
} yield xa
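
A hedged sketch of acquiring the transactor as a Resource; the ContextShift, credentials and connection details below are placeholders rather than values from this repository, and the resources object defined above is assumed to be in scope:

import scala.concurrent.ExecutionContext
import cats.effect.{Blocker, ContextShift, IO}
import com.snowplowanalytics.snowplow.postgres.config.DBConfig.JdbcUri

implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)

// Placeholder connection details
val jdbcUri = JdbcUri("localhost", 5432, "snowplow", "require")

val transactorResource =
  for {
    blocker <- Blocker[IO] // blocking pool for JDBC calls
    xa      <- resources.getTransactor[IO](jdbcUri, "loader", "secret", blocker)
  } yield xa

// transactorResource.use(xa => ...) runs doobie programs against the pool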

@@ -0,0 +1,20 @@
/*
* Copyright (c) 2020 Snowplow Analytics Ltd. All rights reserved.
*
* This program is licensed to you under the Apache License Version 2.0,
* and you may not use this file except in compliance with the Apache License Version 2.0.
* You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0.
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the Apache License Version 2.0 is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
*/
package com.snowplowanalytics.snowplow.postgres.shredding

sealed trait Shredded

object Shredded {
case class ShreddedSnowplow(event: Entity, entities: List[Entity]) extends Shredded
case class ShreddedSelfDescribing(entity: Entity) extends Shredded
}
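Shredded distinguishes the two load paths: a full Snowplow event keeps its atomic row alongside its shredded entities, while a standalone self-describing JSON yields a single entity. A minimal sketch of flattening it for insertion, mirroring the pattern match in DB.process above:

// Only the Snowplow case contributes an atomic row (and, upstream, metadata columns)
def entitiesOf(shredded: Shredded): List[Entity] =
  shredded match {
    case Shredded.ShreddedSnowplow(atomic, entities) => atomic :: entities
    case Shredded.ShreddedSelfDescribing(entity)     => List(entity)
  }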
@@ -34,25 +34,25 @@ import com.snowplowanalytics.iglu.schemaddl.jsonschema.{Pointer, Schema}
import com.snowplowanalytics.iglu.schemaddl.migrations.FlatSchema

import com.snowplowanalytics.snowplow.analytics.scalasdk.Event
import com.snowplowanalytics.snowplow.badrows.{FailureDetails, BadRow, Failure, Payload}
import com.snowplowanalytics.snowplow.badrows.{FailureDetails, BadRow, Failure, Payload, Processor}
import Entity.Column
import com.snowplowanalytics.snowplow.postgres.config.Cli
import Shredded.{ShreddedSelfDescribing, ShreddedSnowplow}

object transform {
val Atomic = SchemaKey("com.snowplowanalytics.snowplow", "atomic", "jsonschema", SchemaVer.Full(1,0,0))

/** Transform the whole `Event` (canonical and JSONs) into list of independent entities ready to be inserted */
def shredEvent[F[_]: Sync: Clock](client: Client[F, Json], event: Event): EitherT[F, BadRow, List[Entity]] = {
def shredEvent[F[_]: Sync: Clock](client: Client[F, Json], processor: Processor, event: Event): EitherT[F, BadRow, ShreddedSnowplow] = {
val entities = event.contexts.data ++ event.derived_contexts.data ++ event.unstruct_event.data.toList
val wholeEvent = entities
.parTraverse(shredJson(client))
.value
.map { shreddedOrError =>
(shreddedOrError, shredAtomic(Map())(event)).mapN {
(shreddedEntities, atomic) => atomic :: shreddedEntities.map(addMetadata(event.event_id, event.collector_tstamp))
(shreddedEntities, atomic) => ShreddedSnowplow(atomic, shreddedEntities.map(_.entity).map(addMetadata(event.event_id, event.collector_tstamp)))
}
}
EitherT(wholeEvent).leftMap[BadRow](buildBadRow(event))
EitherT(wholeEvent).leftMap[BadRow](buildBadRow(processor, event))
}

def addMetadata(eventId: UUID, tstamp: Instant)(entity: Entity): Entity = {
@@ -84,7 +84,7 @@ object transform {

/** Transform JSON into [[Entity]] */
def shredJson[F[_]: Sync: Clock](client: Client[F, Json])
(data: SelfDescribingData[Json]): EitherT[F, NonEmptyList[FailureDetails.LoaderIgluError], Entity] = {
(data: SelfDescribingData[Json]): EitherT[F, NonEmptyList[FailureDetails.LoaderIgluError], ShreddedSelfDescribing] = {
val key = data.schema
schema.getOrdered(client.resolver)(key.vendor, key.name, key.version.model)
.leftMap { error => NonEmptyList.of(error) }
@@ -106,7 +106,7 @@ object transform {
case Atomic => "events"
case other => StringUtils.getTableName(SchemaMap(other))
}
Entity(tableName, data.schema, columns)
ShreddedSelfDescribing(Entity(tableName, data.schema, columns))
}
}
}
@@ -282,7 +282,7 @@ object transform {
(columnName, dataType, value)
}

private def buildBadRow(event: Event)(errors: NonEmptyList[FailureDetails.LoaderIgluError]) =
BadRow.LoaderIgluError(Cli.processor, Failure.LoaderIgluErrors(errors), Payload.LoaderPayload(event))
private def buildBadRow(processor: Processor, event: Event)(errors: NonEmptyList[FailureDetails.LoaderIgluError]) =
BadRow.LoaderIgluError(processor, Failure.LoaderIgluErrors(errors), Payload.LoaderPayload(event))

}
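With the Processor now passed in explicitly, shredEvent no longer depends on the loader's CLI metadata and can be reused from other modules. A hedged usage sketch; igluClient and event are assumed to be in scope, and the processor name and version are placeholders:

import cats.data.EitherT
import cats.effect.IO
import com.snowplowanalytics.snowplow.badrows.{BadRow, Processor}

// Placeholder identity; a real caller passes its own artifact name and version
val processor = Processor("snowplow-postgres-loader", "x.y.z")

// Requires Sync[IO] and Clock[IO] in implicit scope (e.g. inside an IOApp);
// igluClient is an initialised Client[IO, Json], event a parsed enriched Event
val shredded: EitherT[IO, BadRow, Shredded.ShreddedSnowplow] =
  transform.shredEvent[IO](igluClient, processor, event)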
@@ -0,0 +1,46 @@
/*
* Copyright (c) 2020 Snowplow Analytics Ltd. All rights reserved.
*
* This program is licensed to you under the Apache License Version 2.0,
* and you may not use this file except in compliance with the Apache License Version 2.0.
* You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0.
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the Apache License Version 2.0 is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
*/
package com.snowplowanalytics.snowplow.postgres.streaming

import com.snowplowanalytics.iglu.core.SelfDescribingData

import io.circe.Json

import com.snowplowanalytics.snowplow.analytics.scalasdk.Event
import com.snowplowanalytics.snowplow.badrows.BadRow

object data {

/** Kind of data flowing through the Loader */
sealed trait Data extends Product with Serializable {
def snowplow: Boolean = this match {
case _: Data.Snowplow => true
case _: Data.SelfDescribing => false
}
}

object Data {
case class Snowplow(data: Event) extends Data
case class SelfDescribing(data: SelfDescribingData[Json]) extends Data
}

/** Data that for some reason cannot be inserted into the DB */
sealed trait BadData extends Throwable with Product with Serializable
object BadData {
/** Typical Snowplow bad row (Loader Iglu Error etc) */
case class BadEnriched(data: BadRow) extends BadData
/** Non-enriched error */
case class BadJson(payload: String, error: String) extends BadData
}
}
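A small sketch of branching on these ADTs downstream; the describe helpers are hypothetical and exist only to show the exhaustive matches:

import com.snowplowanalytics.snowplow.postgres.streaming.data.{BadData, Data}

def describe(d: Data): String = d match {
  case Data.Snowplow(event)      => s"enriched event ${event.event_id}"
  case Data.SelfDescribing(json) => s"self-describing payload of ${json.schema.toSchemaUri}"
}

def describeBad(b: BadData): String = b match {
  case BadData.BadEnriched(_)          => "enriched event that produced a bad row"
  case BadData.BadJson(payload, error) => s"unparseable payload ($error): $payload"
}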
