Skip to content

Commit

Permalink
Ignore missing comment on atomic events table (close #36)
Browse files Browse the repository at this point in the history
  • Loading branch information
istreeter committed Jul 7, 2021
1 parent f49941a commit 49c2953
Show file tree
Hide file tree
Showing 6 changed files with 19 additions and 16 deletions.
Expand Up @@ -35,6 +35,7 @@ import com.snowplowanalytics.iglu.schemaddl.migrations.FlatSchema

import com.snowplowanalytics.snowplow.analytics.scalasdk.Event
import com.snowplowanalytics.snowplow.badrows.{BadRow, Failure, FailureDetails, Payload, Processor}
import com.snowplowanalytics.snowplow.postgres.storage.definitions.EventsTableName
import Entity.Column
import Shredded.{ShreddedSelfDescribing, ShreddedSnowplow}

Expand Down Expand Up @@ -103,7 +104,7 @@ object transform {
.map { cols =>
val columns = cols.collect { case Some(c) => c }
val tableName = data.schema match {
case Atomic => "events"
case Atomic => EventsTableName
case other => StringUtils.getTableName(SchemaMap(other))
}
ShreddedSelfDescribing(Entity(tableName, data.schema, columns))
Expand Down
Expand Up @@ -30,7 +30,6 @@ import com.snowplowanalytics.iglu.schemaddl.migrations.{SchemaList => DdlSchemaL

import com.snowplowanalytics.snowplow.badrows.FailureDetails
import com.snowplowanalytics.snowplow.postgres.shredding.schema.fetch
import com.snowplowanalytics.snowplow.postgres.shredding.transform.Atomic
import com.snowplowanalytics.snowplow.postgres.streaming.IgluErrors
import com.snowplowanalytics.snowplow.postgres.streaming.sink.Insert

Expand Down Expand Up @@ -90,16 +89,10 @@ object ddl {

DdlSchemaList.fromSchemaList(list, fetch[F](resolver)).leftMap(IgluErrors.of).map { list =>
val statement = generator(list)
val tableName = getTableName(origin)
val tableName = StringUtils.getTableName(SchemaMap(origin))
statement.update().run.void *>
sql.commentTable(schema, tableName, list.latest)
}
}
}

/** Resolve the Postgres table name for a schema key.
  *
  * The atomic event schema maps to the fixed "events" table; every other
  * self-describing schema derives its table name from its SchemaMap via
  * the schema-ddl StringUtils helper.
  *
  * @param schemaKey key of the schema whose table is being targeted
  * @return the table name to use in generated DDL
  */
def getTableName(schemaKey: SchemaKey): String =
  schemaKey match {
    // Atomic is a stable identifier; matched by equality against the key
    case Atomic => "events"
    case other => StringUtils.getTableName(SchemaMap(other))
  }
}
Expand Up @@ -203,12 +203,15 @@ object definitions {
}

val schemaFr = Fragment.const0(schema)
val tableFr = Fragment.const0(EventsTableName)

fr"""CREATE TABLE $schemaFr.events ($columns) WITH (OIDS=FALSE)"""
fr"""CREATE TABLE $schemaFr.$tableFr ($columns) WITH (OIDS=FALSE)"""
}

/** Render one column definition as a DDL fragment, e.g. `"col_name" VARCHAR NOT NULL`.
  *
  * @param columnName name of the column (emitted double-quoted)
  * @param dataType   column type; its `ddl` rendering is embedded verbatim
  * @param nullable   true emits NULL, false emits NOT NULL
  */
def columnToString(columnName: String, dataType: Type, nullable: Boolean) =
  s""""$columnName" ${dataType.ddl} ${if (nullable) "NULL" else "NOT NULL"}"""

val EventsTableName = "events"
}
Expand Up @@ -14,6 +14,7 @@ package com.snowplowanalytics.snowplow.postgres.storage

import cats.syntax.traverse._
import cats.syntax.either._
import cats.syntax.eq._
import cats.instances.list._

import doobie.ConnectionIO
Expand All @@ -22,6 +23,7 @@ import org.log4s.getLogger

import com.snowplowanalytics.iglu.core.SchemaKey
import com.snowplowanalytics.snowplow.postgres.logging.Slf4jLogHandler
import definitions.EventsTableName

/** Functions to query the storage for state and metadata */
object query {
Expand Down Expand Up @@ -55,5 +57,5 @@ object query {
}

/** Fetch the schema-key comment for every table in the schema.
  *
  * The atomic events table is excluded: it never carries a schema-key comment,
  * so including it would always surface a spurious CommentIssue (see issue #36).
  *
  * @param schema Postgres schema (namespace) to inspect
  * @return per-table results, Left for missing/invalid comments, Right for parsed keys
  */
def getComments(schema: String): ConnectionIO[List[Either[CommentIssue, SchemaKey]]] =
  listTables(schema).flatMap(_.filterNot(_ === EventsTableName).traverse(getComment(schema)))
}
Expand Up @@ -23,6 +23,7 @@ import doobie.implicits._
import doobie.util.transactor.Transactor

import query.tableExists
import definitions.EventsTableName

object utils {

Expand All @@ -31,12 +32,12 @@ object utils {
/** Ensure the atomic events table exists, creating it when absent.
  *
  * Uses the shared EventsTableName constant rather than a bare "events" literal
  * so the existence check and the DDL stay in sync.
  *
  * @param schema Postgres schema (namespace) the table lives in
  * @return true if the table already existed, false if it was just created
  */
def prepareEventsTable(schema: String): ConnectionIO[Boolean] = {
  val create = ddl.createEventsTable(schema).as(false)
  val exists = Monad[ConnectionIO].pure(true)
  Monad[ConnectionIO].ifM(tableExists(schema, EventsTableName))(exists, create)
}

/** Run events-table preparation against the database and log the outcome.
  *
  * @param schema Postgres schema (namespace) the table lives in
  * @param xa     transactor used to execute the ConnectionIO program
  */
def prepare[F[_]: Sync](schema: String, xa: Transactor[F]): F[Unit] =
  prepareEventsTable(schema).transact(xa).flatMap {
    // Log messages interpolate EventsTableName so they track the real table name
    case true => Sync[F].delay(logger.info(s"$schema.$EventsTableName table already exists"))
    case false => Sync[F].delay(logger.info(s"$schema.$EventsTableName table created"))
  }
}
Expand Up @@ -36,6 +36,7 @@ import com.snowplowanalytics.iglu.client.validator.CirceValidator

import com.snowplowanalytics.snowplow.badrows.FailureDetails
import com.snowplowanalytics.snowplow.postgres.config.DBConfig.JdbcUri
import com.snowplowanalytics.snowplow.postgres.storage.definitions.EventsTableName

trait Database extends Specification with BeforeAfterEach {
import Database._
Expand Down Expand Up @@ -70,8 +71,10 @@ object Database {
characterMaximumLength: Option[Int]
)

/** Read all event_id values from the atomic events table.
  *
  * The table name is spliced via Fragment.const0 (not interpolated as a bind
  * parameter) because identifiers cannot be parameterized in SQL; the value is
  * the trusted EventsTableName constant, so this is not an injection risk.
  */
def query: IO[List[UUID]] = {
  val tablefr = Fragment.const0(EventsTableName)
  fr"SELECT event_id FROM $tablefr".query[UUID].to[List].transact(xa)
}

def count(table: String): IO[Int] =
(fr"SELECT count(*) FROM " ++ Fragment.const(table)).query[Int].unique.transact(xa)
Expand Down

0 comments on commit 49c2953

Please sign in to comment.