Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions modules/core/shared/src/main/scala/data/Completion.scala
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,31 @@ object Completion {
case object Analyze extends Completion
// more ...

// weird Redshift variations

/**
* This is a variation on the protocol included to better support Redshift. Redshift
* doesn't return a count for a <code>SELECT</code> statement. The
* [[https://www.postgresql.org/docs/current/protocol-message-formats.html#PROTOCOL-MESSAGE-FORMATS-COMMANDCOMPLETE protocol documentation]]
* appears to say it should:
*
* <blockquote>For a <code>SELECT</code> or <code>CREATE TABLE AS</code> command, the tag is <code>SELECT rows</code> where <code>rows</code> is the number of rows retrieved.</blockquote>
*
* but comments in the [[https://github.com/pgjdbc/pgjdbc/blob/5d1f2e8cd399cedfdee86728f9044c0e32d74129/pgjdbc/src/main/java/org/postgresql/core/CommandCompleteParser.java#L37-L44 JDBC driver]]
* state
*
* <blockquote>
* Parses <code>CommandComplete (B)</code> message.
* Status is in the format of <code>COMMAND OID ROWS</code> where both <code>OID</code> and <code>ROWS</code> are optional
* and <code>COMMAND</code> can have spaces within it, like <code>CREATE TABLE</code>.
* </blockquote>
*
* so clients should probably be prepared to handle this.
*
* Practically speaking, true Postgres servers do return row counts, but Redshift does not.
*/
case object SelectWithoutCount extends Completion

/**
* Instead of crashing (which breaks the protocol and hangs everything) let's allow for unknown
* completion messages and print out a stacktrace on construction.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,9 @@ object CommandComplete {
case "ANALYZE" => apply(Completion.Analyze)
// more .. fill in as we hit them

// weird Redshift variations
case "SELECT" => apply(Completion.SelectWithoutCount)

case s => apply(Completion.Unknown(s))
}

Expand Down
123 changes: 63 additions & 60 deletions modules/tests/shared/src/test/scala/CommandTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,13 @@
package tests

import cats.effect.IO
import skunk._
import skunk.codec.all._
import skunk.*
import skunk.codec.all.*
import skunk.data.Completion
import skunk.implicits._
import skunk.implicits.*
import cats.Contravariant
import fs2._
import cats.kernel.Eq
import fs2.*
import skunk.exception.UnexpectedRowsException

class CommandTest extends SkunkTest {
Expand Down Expand Up @@ -368,189 +369,191 @@ class CommandTest extends SkunkTest {
val analyzeVerbose: Command[Void] =
sql"ANALYZE VERBOSE city".command

private implicit val eq: Eq[Completion] = Eq.fromUniversalEquals

sessionTest("create table, create index, alter table, alter index, drop index and drop table") { s =>
for {
c <- s.execute(createTable)
_ <- assert("completion", c == Completion.CreateTable)
_ <- assertEqual("completion", c, Completion.CreateTable)
c <- s.execute(createIndex)
_ <- assert("completion", c == Completion.CreateIndex)
_ <- assertEqual("completion", c, Completion.CreateIndex)
c <- s.execute(alterTable)
_ <- assert("completion", c == Completion.AlterTable)
_ <- assertEqual("completion", c, Completion.AlterTable)
c <- s.execute(alterIndex)
_ <- assert("completion", c == Completion.AlterIndex)
_ <- assertEqual("completion", c, Completion.AlterIndex)
c <- s.execute(dropIndex)
_ <- assert("completion", c == Completion.DropIndex)
_ <- assertEqual("completion", c, Completion.DropIndex)
c <- s.execute(dropTable)
_ <- assert("completion", c == Completion.DropTable)
_ <- assertEqual("completion", c, Completion.DropTable)
_ <- s.assertHealthy
} yield "ok"
}

sessionTest("create and drop trigger") { s =>
for {
c <- s.execute(createFunction)
_ <- assert("completion", c == Completion.CreateFunction)
_ <- assertEqual("completion", c, Completion.CreateFunction)
c <- s.execute(createTrigger)
_ <- assert("completion", c == Completion.CreateTrigger)
_ <- assertEqual("completion", c, Completion.CreateTrigger)
c <- s.execute(alterTrigger)
_ <- assert("completion", c == Completion.AlterTrigger)
_ <- assertEqual("completion", c, Completion.AlterTrigger)
c <- s.execute(dropTrigger)
_ <- assert("completion", c == Completion.DropTrigger)
_ <- assertEqual("completion", c, Completion.DropTrigger)
c <- s.execute(alterFunction)
_ <- assert("completion", c == Completion.AlterFunction)
_ <- assertEqual("completion", c, Completion.AlterFunction)
c <- s.execute(dropFunction)
_ <- assert("completion", c == Completion.DropFunction)
_ <- assertEqual("completion", c, Completion.DropFunction)
_ <- s.assertHealthy
} yield "ok"
}

sessionTest("create and drop schema") { s =>
for {
c <- s.execute(createSchema)
_ <- assert("completion", c == Completion.CreateSchema)
_ <- assertEqual("completion", c, Completion.CreateSchema)
c <- s.execute(dropSchema)
_ <- assert("completion", c == Completion.DropSchema)
_ <- assertEqual("completion", c, Completion.DropSchema)
_ <- s.assertHealthy
} yield "ok"
}

sessionTest("create, alter and drop type") { s =>
for {
c <- s.execute(createType)
_ <- assert("completion", c == Completion.CreateType)
_ <- assertEqual("completion", c, Completion.CreateType)
c <- s.execute(alterType)
_ <- assert("completion", c == Completion.AlterType)
_ <- assertEqual("completion", c, Completion.AlterType)
c <- s.execute(dropType)
_ <- assert("completion", c == Completion.DropType)
_ <- assertEqual("completion", c, Completion.DropType)
_ <- s.assertHealthy
} yield "ok"
}

sessionTest("create, alter and drop policy") { s =>
for {
c <- s.execute(createPolicy)
_ <- assert("completion", c == Completion.CreatePolicy)
_ <- assertEqual("completion", c, Completion.CreatePolicy)
c <- s.execute(alterPolicy)
_ <- assert("completion", c == Completion.AlterPolicy)
_ <- assertEqual("completion", c, Completion.AlterPolicy)
c <- s.execute(dropPolicy)
_ <- assert("completion", c == Completion.DropPolicy)
_ <- assertEqual("completion", c, Completion.DropPolicy)
_ <- s.assertHealthy
} yield "ok"
}

sessionTest("create view, drop view"){ s=>
for{
c <- s.execute(createView)
_ <- assert("completion", c == Completion.CreateView)
_ <- assertEqual("completion", c, Completion.CreateView)
c <- s.execute(dropView)
_ <- assert("completion", c == Completion.DropView)
_ <- assertEqual("completion", c, Completion.DropView)
_ <- s.assertHealthy
} yield "ok"
}

sessionTest("refresh materialized view, refresh materialized view concurrently") { s =>
for {
c <- s.execute(createMaterializedView)
_ <- assert("completion", c == Completion.Select(1))
_ <- assertEqual("completion", c, Completion.Select(1))
c <- s.execute(createMaterializedView)
_ <- assert("completion", c == Completion.CreateMaterializedView)
_ <- assertEqual("completion", c, Completion.CreateMaterializedView)
c <- s.execute(refreshMaterializedView)
_ <- assert("completion", c == Completion.RefreshMaterializedView)
_ <- assertEqual("completion", c, Completion.RefreshMaterializedView)
c <- s.execute(createUniqueIndexForMaterializedView)
_ <- assert("completion", c == Completion.CreateIndex)
_ <- assertEqual("completion", c, Completion.CreateIndex)
c <- s.execute(refreshMaterializedViewConcurrently)
_ <- assert("completion", c == Completion.RefreshMaterializedView)
_ <- assertEqual("completion", c, Completion.RefreshMaterializedView)
c <- s.execute(dropMaterializedView)
_ <- assert("completion", c == Completion.DropMaterializedView)
_ <- assertEqual("completion", c, Completion.DropMaterializedView)
} yield "ok"
}

sessionTest("create, call and drop procedure"){ s=>
for{
c <- s.execute(createProcedure)
_ <- assert("completion", c == Completion.CreateProcedure)
_ <- assertEqual("completion", c, Completion.CreateProcedure)
c <- s.execute(callProcedure)
_ <- assert("completion", c == Completion.Call)
_ <- assertEqual("completion", c, Completion.Call)
c <- s.execute(dropProcedure)
_ <- assert("completion", c == Completion.DropProcedure)
_ <- assertEqual("completion", c, Completion.DropProcedure)
_ <- s.assertHealthy
} yield "ok"
}

sessionTest("create domain, drop domain"){ s=>
for{
c <- s.execute(dropDomain)
_ <- assert("completion", c == Completion.DropDomain)
_ <- assertEqual("completion", c, Completion.DropDomain)
c <- s.execute(createDomain)
_ <- assert("completion", c == Completion.CreateDomain)
_ <- assertEqual("completion", c, Completion.CreateDomain)
c <- s.execute(dropDomain)
_ <- assert("completion", c == Completion.DropDomain)
_ <- assertEqual("completion", c, Completion.DropDomain)
_ <- s.assertHealthy
} yield "ok"
}

sessionTest("create sequence, alter sequence, drop sequence"){ s=>
for{
c <- s.execute(createSequence)
_ <- assert("completion", c == Completion.CreateSequence)
_ <- assertEqual("completion", c, Completion.CreateSequence)
c <- s.execute(alterSequence)
_ <- assert("completion", c == Completion.AlterSequence)
_ <- assertEqual("completion", c, Completion.AlterSequence)
c <- s.execute(dropSequence)
_ <- assert("completion", c == Completion.DropSequence)
_ <- assertEqual("completion", c, Completion.DropSequence)
} yield "ok"
}

sessionTest("create database, drop database"){ s=>
for{
c <- s.execute(createDatabase)
_ <- assert("completion", c == Completion.CreateDatabase)
_ <- assertEqual("completion", c, Completion.CreateDatabase)
c <- s.execute(dropDatabase)
_ <- assert("completion", c == Completion.DropDatabase)
_ <- assertEqual("completion", c, Completion.DropDatabase)
} yield "ok"
}

sessionTest("create role, alter role, drop role") { s =>
for {
c <- s.execute(createRole)
_ <- assert("completion", c == Completion.CreateRole)
_ <- assertEqual("completion", c, Completion.CreateRole)
c <- s.execute(alterRole)
_ <- assert("completion", c == Completion.AlterRole)
_ <- assertEqual("completion", c, Completion.AlterRole)
c <- s.execute(dropRole)
_ <- assert("completion", c == Completion.DropRole)
_ <- assertEqual("completion", c, Completion.DropRole)
} yield "ok"
}

sessionTest("create extension, drop extension") { s =>
for {
c <- s.execute(createExtension)
_ <- assert("completion", c == Completion.CreateExtension)
_ <- assertEqual("completion", c, Completion.CreateExtension)
c <- s.execute(dropExtension)
_ <- assert("completion", c == Completion.DropExtension)
_ <- assertEqual("completion", c, Completion.DropExtension)
} yield "ok"
}

sessionTest("do command") { s =>
for{
c <- s.execute(doCommand)
_ <- assert("completion", c == Completion.Do)
_ <- assertEqual("completion", c, Completion.Do)
_ <- s.assertHealthy
} yield "ok"
}

sessionTest("add comment, remove comment") { s =>
for{
c <- s.execute(addComment)
_ <- assert("completion", c == Completion.Comment)
_ <- assertEqual("completion", c, Completion.Comment)
c <- s.execute(removeComment)
_ <- assert("completion", c == Completion.Comment)
_ <- assertEqual("completion", c, Completion.Comment)
_ <- s.assertHealthy
} yield "ok"
}

sessionTest("analyze") { s =>
for{
c <- s.execute(analyze)
_ <- assert("completion", c == Completion.Analyze)
_ <- assertEqual("completion", c, Completion.Analyze)
v <- s.execute(analyzeVerbose)
_ <- assert("completion", v == Completion.Analyze)
_ <- s.assertHealthy
Expand All @@ -561,20 +564,20 @@ class CommandTest extends SkunkTest {
s.transaction.use { _ =>
for {
c <- s.execute(sql"set constraints all deferred".command)
_ <- assert("completion", c == Completion.SetConstraints)
_ <- assertEqual("completion", c, Completion.SetConstraints)
} yield "ok"
} >> s.assertHealthy
}

sessionTest("insert, update and delete record") { s =>
for {
c <- s.execute(insertCity)(Garin)
_ <- assert("completion", c == Completion.Insert(1))
_ <- assertEqual("completion", c, Completion.Insert(1))
c <- s.unique(selectCity)(Garin.id)
_ <- assert("read", c == Garin)
p <- IO(Garin.pop + 1000)
c <- s.execute(updateCityPopulation)((p, Garin.id))
_ <- assert("completion", c == Completion.Update(1))
_ <- assertEqual("completion", c, Completion.Update(1))
c <- s.unique(selectCity)(Garin.id)
_ <- assert("read", c == Garin.copy(pop = p))
_ <- s.execute(deleteCity)(Garin.id)
Expand All @@ -585,7 +588,7 @@ class CommandTest extends SkunkTest {
sessionTest("insert and delete record with contramapped command") { s =>
for {
c <- s.prepare(insertCity2).flatMap(_.execute(Garin2))
_ <- assert("completion", c == Completion.Insert(1))
_ <- assertEqual("completion", c, Completion.Insert(1))
c <- s.prepare(selectCity).flatMap(_.unique(Garin2.id))
_ <- assert("read", c == Garin2)
_ <- s.prepare(deleteCity).flatMap(_.execute(Garin2.id))
Expand All @@ -596,7 +599,7 @@ class CommandTest extends SkunkTest {
sessionTest("insert and delete record with contramapped command (via Contravariant instance") { s =>
for {
c <- s.prepare(insertCity2a).flatMap(_.execute(Garin3))
_ <- assert("completion", c == Completion.Insert(1))
_ <- assertEqual("completion", c, Completion.Insert(1))
c <- s.prepare(selectCity).flatMap(_.unique(Garin3.id))
_ <- assert("read", c == Garin3)
_ <- s.prepare(deleteCity).flatMap(_.execute(Garin3.id))
Expand All @@ -611,7 +614,7 @@ class CommandTest extends SkunkTest {
if (majorVersion >= 15) {
for {
c <- s.prepare(insertCity).flatMap(_.execute(Garin))
_ <- assert("completion", c == Completion.Insert(1))
_ <- assertEqual("completion", c, Completion.Insert(1))
c <- s.prepare(mergeCity).flatMap(_.execute(Garin.id))
_ <- assert("merge", c == Completion.Merge(1))
c <- s.prepare(selectCity).flatMap(_.option(Garin.id))
Expand Down Expand Up @@ -653,9 +656,9 @@ class CommandTest extends SkunkTest {
for{
_ <- s.execute(createRole)
c <- s.execute(grant)
_ <- assert("completion", c == Completion.Grant)
_ <- assertEqual("completion", c, Completion.Grant)
c <- s.execute(revoke)
_ <- assert("completion", c == Completion.Revoke)
_ <- assertEqual("completion", c, Completion.Revoke)
_ <- s.execute(dropRole)
_ <- s.assertHealthy
} yield "ok"
Expand Down