From 49c2953bc28f48fd412ae9d12acb1a4bbd7ef00f Mon Sep 17 00:00:00 2001 From: Ian Streeter Date: Wed, 7 Jul 2021 11:00:00 +0100 Subject: [PATCH] Ignore missing comment on atomic events table (close #36) --- .../snowplow/postgres/shredding/transform.scala | 3 ++- .../snowplow/postgres/storage/ddl.scala | 9 +-------- .../snowplow/postgres/storage/definitions.scala | 5 ++++- .../snowplow/postgres/storage/query.scala | 4 +++- .../snowplow/postgres/storage/utils.scala | 7 ++++--- .../snowplowanalytics/snowplow/postgres/Database.scala | 7 +++++-- 6 files changed, 19 insertions(+), 16 deletions(-) diff --git a/modules/common/src/main/scala/com/snowplowanalytics/snowplow/postgres/shredding/transform.scala b/modules/common/src/main/scala/com/snowplowanalytics/snowplow/postgres/shredding/transform.scala index b2c9366..3c457ab 100644 --- a/modules/common/src/main/scala/com/snowplowanalytics/snowplow/postgres/shredding/transform.scala +++ b/modules/common/src/main/scala/com/snowplowanalytics/snowplow/postgres/shredding/transform.scala @@ -35,6 +35,7 @@ import com.snowplowanalytics.iglu.schemaddl.migrations.FlatSchema import com.snowplowanalytics.snowplow.analytics.scalasdk.Event import com.snowplowanalytics.snowplow.badrows.{BadRow, Failure, FailureDetails, Payload, Processor} +import com.snowplowanalytics.snowplow.postgres.storage.definitions.EventsTableName import Entity.Column import Shredded.{ShreddedSelfDescribing, ShreddedSnowplow} @@ -103,7 +104,7 @@ object transform { .map { cols => val columns = cols.collect { case Some(c) => c } val tableName = data.schema match { - case Atomic => "events" + case Atomic => EventsTableName case other => StringUtils.getTableName(SchemaMap(other)) } ShreddedSelfDescribing(Entity(tableName, data.schema, columns)) diff --git a/modules/common/src/main/scala/com/snowplowanalytics/snowplow/postgres/storage/ddl.scala b/modules/common/src/main/scala/com/snowplowanalytics/snowplow/postgres/storage/ddl.scala index 3b695aa..158ab82 100644 --- a/modules/common/src/main/scala/com/snowplowanalytics/snowplow/postgres/storage/ddl.scala +++ b/modules/common/src/main/scala/com/snowplowanalytics/snowplow/postgres/storage/ddl.scala @@ -30,7 +30,6 @@ import com.snowplowanalytics.iglu.schemaddl.migrations.{SchemaList => DdlSchemaL import com.snowplowanalytics.snowplow.badrows.FailureDetails import com.snowplowanalytics.snowplow.postgres.shredding.schema.fetch -import com.snowplowanalytics.snowplow.postgres.shredding.transform.Atomic import com.snowplowanalytics.snowplow.postgres.streaming.IgluErrors import com.snowplowanalytics.snowplow.postgres.streaming.sink.Insert @@ -90,16 +89,10 @@ object ddl { DdlSchemaList.fromSchemaList(list, fetch[F](resolver)).leftMap(IgluErrors.of).map { list => val statement = generator(list) - val tableName = getTableName(origin) + val tableName = StringUtils.getTableName(SchemaMap(origin)) statement.update().run.void *> sql.commentTable(schema, tableName, list.latest) } } } - - def getTableName(schemaKey: SchemaKey): String = - schemaKey match { - case Atomic => "events" - case other => StringUtils.getTableName(SchemaMap(other)) - } } diff --git a/modules/common/src/main/scala/com/snowplowanalytics/snowplow/postgres/storage/definitions.scala b/modules/common/src/main/scala/com/snowplowanalytics/snowplow/postgres/storage/definitions.scala index 12dcefb..f9d900a 100644 --- a/modules/common/src/main/scala/com/snowplowanalytics/snowplow/postgres/storage/definitions.scala +++ b/modules/common/src/main/scala/com/snowplowanalytics/snowplow/postgres/storage/definitions.scala @@ -203,12 +203,15 @@ object definitions { } val schemaFr = Fragment.const0(schema) + val tableFr = Fragment.const0(EventsTableName) - fr"""CREATE TABLE $schemaFr.events ($columns) WITH (OIDS=FALSE)""" + fr"""CREATE TABLE $schemaFr.$tableFr ($columns) WITH (OIDS=FALSE)""" } def columnToString(columnName: String, dataType: Type, nullable: Boolean) = { val notNull = if (nullable) "NULL" else "NOT NULL" s""""$columnName" ${dataType.ddl} $notNull""" } + + val EventsTableName = "events" } diff --git a/modules/common/src/main/scala/com/snowplowanalytics/snowplow/postgres/storage/query.scala b/modules/common/src/main/scala/com/snowplowanalytics/snowplow/postgres/storage/query.scala index 53dcf3d..5ecc3f0 100644 --- a/modules/common/src/main/scala/com/snowplowanalytics/snowplow/postgres/storage/query.scala +++ b/modules/common/src/main/scala/com/snowplowanalytics/snowplow/postgres/storage/query.scala @@ -14,6 +14,7 @@ package com.snowplowanalytics.snowplow.postgres.storage import cats.syntax.traverse._ import cats.syntax.either._ +import cats.syntax.eq._ import cats.instances.list._ import doobie.ConnectionIO @@ -22,6 +23,7 @@ import org.log4s.getLogger import com.snowplowanalytics.iglu.core.SchemaKey import com.snowplowanalytics.snowplow.postgres.logging.Slf4jLogHandler +import definitions.EventsTableName /** Functions to query the storage for state and metadata */ object query { @@ -55,5 +57,5 @@ object query { } def getComments(schema: String): ConnectionIO[List[Either[CommentIssue, SchemaKey]]] = - listTables(schema).flatMap(_.traverse(getComment(schema))) + listTables(schema).flatMap(_.filterNot(_ === EventsTableName).traverse(getComment(schema))) } diff --git a/modules/common/src/main/scala/com/snowplowanalytics/snowplow/postgres/storage/utils.scala b/modules/common/src/main/scala/com/snowplowanalytics/snowplow/postgres/storage/utils.scala index 70826c0..46dd5ce 100644 --- a/modules/common/src/main/scala/com/snowplowanalytics/snowplow/postgres/storage/utils.scala +++ b/modules/common/src/main/scala/com/snowplowanalytics/snowplow/postgres/storage/utils.scala @@ -23,6 +23,7 @@ import doobie.implicits._ import doobie.util.transactor.Transactor import query.tableExists +import definitions.EventsTableName object utils { @@ -31,12 +32,12 @@ object utils { def prepareEventsTable(schema: String): ConnectionIO[Boolean] = { val create = ddl.createEventsTable(schema).as(false) val exists = Monad[ConnectionIO].pure(true) - Monad[ConnectionIO].ifM(tableExists(schema, "events"))(exists, create) + Monad[ConnectionIO].ifM(tableExists(schema, EventsTableName))(exists, create) } def prepare[F[_]: Sync](schema: String, xa: Transactor[F]): F[Unit] = prepareEventsTable(schema).transact(xa).flatMap { - case true => Sync[F].delay(logger.info(s"$schema.events table already exists")) - case false => Sync[F].delay(logger.info(s"$schema.events table created")) + case true => Sync[F].delay(logger.info(s"$schema.$EventsTableName table already exists")) + case false => Sync[F].delay(logger.info(s"$schema.$EventsTableName table created")) } } diff --git a/modules/common/src/test/scala/com/snowplowanalytics/snowplow/postgres/Database.scala b/modules/common/src/test/scala/com/snowplowanalytics/snowplow/postgres/Database.scala index aae93cc..50a4b52 100644 --- a/modules/common/src/test/scala/com/snowplowanalytics/snowplow/postgres/Database.scala +++ b/modules/common/src/test/scala/com/snowplowanalytics/snowplow/postgres/Database.scala @@ -36,6 +36,7 @@ import com.snowplowanalytics.iglu.client.validator.CirceValidator import com.snowplowanalytics.snowplow.badrows.FailureDetails import com.snowplowanalytics.snowplow.postgres.config.DBConfig.JdbcUri +import com.snowplowanalytics.snowplow.postgres.storage.definitions.EventsTableName trait Database extends Specification with BeforeAfterEach { import Database._ @@ -70,8 +71,10 @@ object Database { characterMaximumLength: Option[Int] ) - def query: IO[List[UUID]] = - fr"SELECT event_id FROM events".query[UUID].to[List].transact(xa) + def query: IO[List[UUID]] = { + val tablefr = Fragment.const0(EventsTableName) + fr"SELECT event_id FROM $tablefr".query[UUID].to[List].transact(xa) + } def count(table: String): IO[Int] = (fr"SELECT count(*) FROM " ++ Fragment.const(table)).query[Int].unique.transact(xa)