Skip to content

Commit

Permalink
Merge 8bd5863 into 91a6738
Browse files Browse the repository at this point in the history
  • Loading branch information
istreeter committed Aug 24, 2020
2 parents 91a6738 + 8bd5863 commit e7b4bf9
Show file tree
Hide file tree
Showing 34 changed files with 935 additions and 801 deletions.
17 changes: 11 additions & 6 deletions build.sbt
Expand Up @@ -20,18 +20,14 @@ lazy val common = project
.settings(name := "snowplow-postgres")
.enablePlugins(BuildInfoPlugin)
.settings(BuildSettings.projectSettings)
.settings(BuildSettings.buildInfoSettings)
.settings(BuildSettings.scoverageSettings)
.settings(BuildSettings.addExampleConfToTestCp)
.settings(BuildSettings.mavenSettings)
.settings(
resolvers += Dependencies.SnowplowBintray,
libraryDependencies ++= Seq(
Dependencies.logger,
Dependencies.postgres,
Dependencies.commons,
Dependencies.catsEffect,
Dependencies.decline,
Dependencies.circe,
Dependencies.circeGeneric,
Dependencies.circeExtras,
Expand All @@ -41,8 +37,6 @@ lazy val common = project
Dependencies.doobiePg,
Dependencies.doobiePgCirce,
Dependencies.doobieHikari,
Dependencies.fs2Aws,
Dependencies.fs2PubSub,
Dependencies.analyticsSdk,
Dependencies.badRows,
Dependencies.schemaDdl,
Expand All @@ -57,6 +51,17 @@ lazy val loader = project
.settings(name := "snowplow-postgres-loader")
.settings(BuildSettings.projectSettings)
.settings(BuildSettings.dockerSettings)
.settings(BuildSettings.buildInfoSettings)
.settings(BuildSettings.addExampleConfToTestCp)
.settings(
libraryDependencies ++= Seq(
Dependencies.commons,
Dependencies.fs2Aws,
Dependencies.fs2PubSub,
Dependencies.decline,
Dependencies.specs2
)
)
.dependsOn(common)
.enablePlugins(JavaAppPackaging, DockerPlugin, BuildInfoPlugin)

Expand Down
Expand Up @@ -15,7 +15,7 @@ package com.snowplowanalytics.snowplow.postgres.api
import cats.data.EitherT
import cats.implicits._

import cats.effect.{Bracket, Sync, Clock}
import cats.effect.{Bracket, Clock, Sync}

import doobie.implicits._
import doobie.util.log.LogHandler
Expand All @@ -27,14 +27,14 @@ import com.snowplowanalytics.iglu.client.Resolver

import com.snowplowanalytics.iglu.schemaddl.migrations.SchemaList

import com.snowplowanalytics.snowplow.postgres.shredding.{Entity, schema}
import com.snowplowanalytics.snowplow.postgres.shredding.{Entity, Shredded, schema}
import com.snowplowanalytics.snowplow.postgres.storage.ddl
import com.snowplowanalytics.snowplow.postgres.streaming.sink

trait DB[F[_]] {
def insert(event: List[Entity]): F[Unit]
def alter(schemaKey: SchemaKey): F[Unit]
def create(schemaKey: SchemaKey): F[Unit]
def create(schemaKey: SchemaKey, includeMeta: Boolean): F[Unit]

def getSchemaList(schemaKey: SchemaKey): F[SchemaList]
}
Expand All @@ -43,37 +43,39 @@ object DB {

def apply[F[_]](implicit ev: DB[F]): DB[F] = ev

def process[F[_]](event: List[Entity], state: State[F])
(implicit D: DB[F], B: Bracket[F, Throwable]): F[Unit] = {
val insert = D.insert(event)
def process[F[_]](shredded: Shredded, state: State[F])(implicit D: DB[F], B: Bracket[F, Throwable]): F[Unit] = {
val (includeMeta, entities) = shredded match {
case Shredded.ShreddedSnowplow(atomic, entities) => (true, atomic :: entities)
case Shredded.ShreddedSelfDescribing(entity) => (false, List(entity))
}
val insert = D.insert(entities)

// Mutate table and Loader's mutable variable. Only for locked state!
def mutate(missing: Set[SchemaKey], outdated: Set[SchemaKey]): F[Unit] =
for {
_ <- missing.toList.traverse(D.create) // Create missing tables if any
_ <- outdated.toList.traverse(D.alter) // Updated outdated tables if any
_ <- missing.toList.traverse(key => D.create(key, includeMeta)) // Create missing tables if any
_ <- outdated.toList.traverse(D.alter) // Updated outdated tables if any
_ <- (missing ++ outdated).toList.traverse_ { entity =>
for { // Update state with new schemas
for { // Update state with new schemas
list <- D.getSchemaList(entity)
_ <- state.put(list)
_ <- state.put(list)
} yield ()
}
} yield ()

state.checkAndRun(_.checkEvent(event), insert, mutate)
state.checkAndRun(_.checkEvent(entities), insert, mutate)
}



sealed trait StateCheck {
def missing: Set[SchemaKey]
def outdated: Set[SchemaKey]

final def add(entity: SchemaKey, state: TableState): StateCheck = state match {
case TableState.Match => this
case TableState.Missing => StateCheck.Block(missing + entity, outdated)
case TableState.Outdated => StateCheck.Block(missing, outdated + entity)
}
final def add(entity: SchemaKey, state: TableState): StateCheck =
state match {
case TableState.Match => this
case TableState.Missing => StateCheck.Block(missing + entity, outdated)
case TableState.Outdated => StateCheck.Block(missing, outdated + entity)
}
}

object StateCheck {
Expand All @@ -84,33 +86,30 @@ object DB {
}
}

def interpreter[F[_]: Sync: Clock](resolver: Resolver[F],
xa: Transactor[F],
logger: LogHandler,
schemaName: String,
meta: Boolean): DB[F] = new DB[F] {
def insert(event: List[Entity]): F[Unit] =
event.traverse_(sink.insertStatement(logger, schemaName, _)).transact(xa)

def alter(schemaKey: SchemaKey): F[Unit] = {
val result = ddl.alterTable[F](resolver, logger, schemaName, schemaKey)
rethrow(result.semiflatMap(_.transact(xa)))
}
def interpreter[F[_]: Sync: Clock](resolver: Resolver[F], xa: Transactor[F], logger: LogHandler, schemaName: String): DB[F] =
new DB[F] {
def insert(event: List[Entity]): F[Unit] =
event.traverse_(sink.insertStatement(logger, schemaName, _)).transact(xa)

def create(schemaKey: SchemaKey): F[Unit] = {
val result = ddl.createTable[F](resolver, logger, schemaName, schemaKey, meta)
rethrow(result.semiflatMap(_.transact(xa)))
}
def alter(schemaKey: SchemaKey): F[Unit] = {
val result = ddl.alterTable[F](resolver, logger, schemaName, schemaKey)
rethrow(result.semiflatMap(_.transact(xa)))
}

def getSchemaList(schemaKey: SchemaKey): F[SchemaList] = {
val result = schema.getSchemaList[F](resolver)(schemaKey.vendor, schemaKey.name, schemaKey.version.model)
rethrow(result)
}
def create(schemaKey: SchemaKey, includeMeta: Boolean): F[Unit] = {
val result = ddl.createTable[F](resolver, logger, schemaName, schemaKey, includeMeta)
rethrow(result.semiflatMap(_.transact(xa)))
}

private def rethrow[A, E](f: EitherT[F, E, A]): F[A] =
f.value.flatMap {
case Right(result) => Sync[F].pure(result)
case Left(error) => Sync[F].raiseError(new RuntimeException(error.toString))
def getSchemaList(schemaKey: SchemaKey): F[SchemaList] = {
val result = schema.getSchemaList[F](resolver)(schemaKey.vendor, schemaKey.name, schemaKey.version.model)
rethrow(result)
}
}

private def rethrow[A, E](f: EitherT[F, E, A]): F[A] =
f.value.flatMap {
case Right(result) => Sync[F].pure(result)
case Left(error) => Sync[F].raiseError(new RuntimeException(error.toString))
}
}
}
Expand Up @@ -16,7 +16,7 @@ import cats.data.EitherT
import cats.implicits._

import cats.effect.concurrent.Ref
import cats.effect.{Sync, Clock}
import cats.effect.{Clock, Sync}

import com.snowplowanalytics.iglu.core.SchemaKey

Expand All @@ -30,40 +30,42 @@ import com.snowplowanalytics.snowplow.postgres.loader._
import com.snowplowanalytics.snowplow.postgres.shredding

/**
* State of the DB schema, where every `ModelGroup` (read "table")
* is associated with list of schemas. Each of these schemas is reflected
* in the structure of the table. If `SchemaKey` matches the `ModelGroup`,
* but is not associated with it - the table is outdated. After the table has been
* migrated to reflect the newest schema - the state needs to be updated up to
* that schema
*/
* State of the DB schema, where every `ModelGroup` (read "table")
* is associated with list of schemas. Each of these schemas is reflected
* in the structure of the table. If `SchemaKey` matches the `ModelGroup`,
* but is not associated with it - the table is outdated. After the table has been
* migrated to reflect the newest schema - the state needs to be updated up to
* that schema
*/
case class SchemaState(tables: Map[ModelGroup, SchemaList]) {

/**
* Check if `SchemaKey` is known to the state
* @param entity `SchemaKey` taken from table comment
* @return one of three possible table states
*/
* Check if `SchemaKey` is known to the state
* @param entity `SchemaKey` taken from table comment
* @return one of three possible table states
*/
private[postgres] def check(entity: SchemaKey): TableState = {
val Atomic = shredding.transform.Atomic
val group = (entity.vendor, entity.name, entity.version.model)

group match {
case (Atomic.vendor, Atomic.name, Atomic.version.model) =>
TableState.Match
case _ => tables.get(group) match {
case Some(SchemaList.Full(schemas)) =>
if (schemas.toList.map(_.self.schemaKey).contains(entity)) TableState.Match else TableState.Outdated
case Some(SchemaList.Single(schema)) =>
if (schema.self.schemaKey === entity) TableState.Match else TableState.Outdated
case None =>
TableState.Missing
}
case _ =>
tables.get(group) match {
case Some(SchemaList.Full(schemas)) =>
if (schemas.toList.map(_.self.schemaKey).contains(entity)) TableState.Match else TableState.Outdated
case Some(SchemaList.Single(schema)) =>
if (schema.self.schemaKey === entity) TableState.Match else TableState.Outdated
case None =>
TableState.Missing
}
}
}

/** Check if any entities from an event are missing in current state */
def checkEvent(entities: List[shredding.Entity]): DB.StateCheck =
entities.foldLeft(DB.StateCheck.Ok: DB.StateCheck) { (acc, key) => acc.add(key.origin, check(key.origin)) }
entities.foldLeft(DB.StateCheck.Ok: DB.StateCheck)((acc, key) => acc.add(key.origin, check(key.origin)))

/** Add a whole `SchemaList` to the state (replace if it exists) */
def put(list: SchemaList): SchemaState = {
Expand All @@ -74,30 +76,32 @@ case class SchemaState(tables: Map[ModelGroup, SchemaList]) {
}

object SchemaState {

/**
* Initialize internal mutable state by traversing all table comments to get their latest version
* For every schema URI, the whole list will be fetched to keep ordering consistent
* All newer versions (present on registry, but not reflected on table) will be dropped
*
* Initialize internal mutable state by traversing all table comments to get their latest version
* For every schema URI, the whole list will be fetched to keep ordering consistent
* All newer versions (present on registry, but not reflected on table) will be dropped
*
* @param resolver Iglu Resolver attached to Iglu Server
* @return a list of potential schema issues (not fatal errors, to be logged) and
* an actual mutable reference with the state
*/
* @return a list of potential schema issues (not fatal errors, to be logged) and
* an actual mutable reference with the state
*/
def init[F[_]: Sync: Clock](keys: List[SchemaKey], resolver: Resolver[F]) = {
val initial = SchemaState(Map.empty)
val availableSchemas = keys.traverse { key =>
EitherT(resolver.listSchemas(key.vendor, key.name, key.version.model))
.leftMap { resolutionError => LoaderIgluError.IgluError(key, resolutionError) }
.flatMap { schemaKeyList => SchemaList.fromSchemaList(schemaKeyList, shredding.schema.fetch(resolver)) }
.map { list => list.until(key) match {
case Some(updatedList) => updatedList
case None => throw new IllegalStateException(s"SchemaList $list doesn't match vendor of ${key.toSchemaUri}")
} }
.leftMap(resolutionError => LoaderIgluError.IgluError(key, resolutionError))
.flatMap(schemaKeyList => SchemaList.fromSchemaList(schemaKeyList, shredding.schema.fetch(resolver)))
.map { list =>
list.until(key) match {
case Some(updatedList) => updatedList
case None => throw new IllegalStateException(s"SchemaList $list doesn't match vendor of ${key.toSchemaUri}")
}
}
}

availableSchemas
.map { list => list.foldLeft(initial) { (acc, cur) => acc.put(cur) } }
.flatMap { state => EitherT.liftF[F, LoaderIgluError, Ref[F, SchemaState]](Ref.of[F, SchemaState](state)) }
availableSchemas.map(list => list.foldLeft(initial)((acc, cur) => acc.put(cur))).flatMap { state =>
EitherT.liftF[F, LoaderIgluError, Ref[F, SchemaState]](Ref.of[F, SchemaState](state))
}
}
}

Expand Up @@ -16,7 +16,7 @@ import cats.Monad
import cats.data.EitherT
import cats.implicits._

import cats.effect.concurrent.{Ref, MVar}
import cats.effect.concurrent.{MVar, Ref}
import cats.effect.{Bracket, Clock, Concurrent}
import cats.effect.implicits._

Expand All @@ -30,31 +30,31 @@ import com.snowplowanalytics.snowplow.badrows.FailureDetails.LoaderIgluError
import com.snowplowanalytics.snowplow.postgres.api.DB.StateCheck

/**
* Mutable variable, protected by a lock.
* [[checkAndRun]] is the only function that should be able to mutate this structure
*/
* Mutable variable, protected by a lock.
* [[checkAndRun]] is the only function that should be able to mutate this structure
*/
final class State[F[_]](lock: MVar[F, Unit], state: Ref[F, SchemaState]) {

/**
* Primary state-handling and the only state-mutation function.
*
* Primary state-handling and the only state-mutation function.
*
* Most of the time `stateCheck` returns `StateCheck.Ok`, meaning that data can be
* inserted without state or DB schema mutation and lock is not acquired, while
* `action` gets executed.
*
* inserted without state or DB schema mutation and lock is not acquired, while
* `action` gets executed.
*
* If new schemas are coming through and state and DB schema have to be changed
* it acquires a lock, preventing other threads from mutating data first, then checks
* if the state is still outdated (in case another thread acquired the lock first) and
* performs `mutate` and `action`, releasing the lock afterwards
* If another thread already updated the state it just performs `action`
*
* it acquires a lock, preventing other threads from mutating data first, then checks
* if the state is still outdated (in case another thread acquired the lock first) and
* performs `mutate` and `action`, releasing the lock afterwards
* If another thread already updated the state it just performs `action`
*
* @param stateCheck check if lock has to be acquired
* @param action primary IO - DB insert statement
* @param mutate IO that mutates the internal state and DB schema
*/
def checkAndRun(stateCheck: SchemaState => StateCheck,
action: F[Unit],
mutate: (Set[SchemaKey], Set[SchemaKey]) => F[Unit])
(implicit F: Bracket[F, Throwable]): F[Unit] = {
* @param action primary IO - DB insert statement
* @param mutate IO that mutates the internal state and DB schema
*/
def checkAndRun(stateCheck: SchemaState => StateCheck, action: F[Unit], mutate: (Set[SchemaKey], Set[SchemaKey]) => F[Unit])(implicit
F: Bracket[F, Throwable]
): F[Unit] = {
// Just insert OR mutate and insert
def check(update: (Set[SchemaKey], Set[SchemaKey]) => F[Unit]) =
state.get.map(stateCheck).flatMap {
Expand All @@ -64,7 +64,7 @@ final class State[F[_]](lock: MVar[F, Unit], state: Ref[F, SchemaState]) {
update(missingTables, outdatedTables)
}

check { (_, _) => withLock(check(mutate)) } *> action
check((_, _) => withLock(check(mutate))) *> action
}

/** Update [[SchemaState]] with new `SchemaList` */
Expand All @@ -83,4 +83,3 @@ object State {
state <- SchemaState.init[F](keys, resolver)
} yield new State[F](lock, state)
}

Expand Up @@ -12,7 +12,7 @@
*/
package com.snowplowanalytics.snowplow.postgres.api

private[postgres] sealed trait TableState extends Product with Serializable
sealed private[postgres] trait TableState extends Product with Serializable
private[postgres] object TableState {
case object Match extends TableState
case object Outdated extends TableState
Expand Down

0 comments on commit e7b4bf9

Please sign in to comment.