Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 34 additions & 12 deletions modules/core/shared/src/main/scala/Session.scala
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,21 @@ trait Session[F[_]] {
*/
def option[A, B](query: Query[A, B], args: A): F[Option[B]]

/**
* Returns a stream that prepare if needed, then execute a parameterized query
*
* @param chunkSize how many rows must be fetched by page
* @group Commands
*/
def stream[A, B](command: Query[A, B], args: A, chunkSize: Int): Stream[F, B]

/**
* Prepare if needed, then execute a parameterized query and returns a resource wrapping a cursor in the result set.
*
* @group Queries
*/
def cursor[A, B](query: Query[A, B], args: A): Resource[F, Cursor[F, B]]

/**
* Execute a non-parameterized command and yield a `Completion`. If you have parameters use
* `prepare` instead.
Expand Down Expand Up @@ -179,7 +194,7 @@ trait Session[F[_]] {
*
* Note: this method only exists to ease migration from Skunk 0.3 and prior. Use the
* non-resource variant instead.
*
*
* @group Queries
*/
def prepareR[A, B](query: Query[A, B]): Resource[F, PreparedQuery[F, A, B]] =
Expand All @@ -191,7 +206,7 @@ trait Session[F[_]] {
*
* Note: this method only exists to ease migration from Skunk 0.3 and prior. Use the
* non-resource variant instead.
*
*
* @group Commands
*/
def prepareR[A](command: Command[A]): Resource[F, PreparedCommand[F, A]] =
Expand All @@ -201,17 +216,15 @@ trait Session[F[_]] {
* Transform a `Command` into a `Pipe` from inputs to `Completion`s.
* @group Commands
*/
def pipe[A](command: Command[A]): Pipe[F, A, Completion] = fa =>
Stream.eval(prepare(command)).flatMap(pc => fa.evalMap(pc.execute)).scope
def pipe[A](command: Command[A]): Pipe[F, A, Completion]

/**
* Transform a `Query` into a `Pipe` from inputs to outputs.
*
* @param chunkSize how many rows must be fetched by page
* @group Commands
*/
def pipe[A, B](query: Query[A, B], chunkSize: Int): Pipe[F, A, B] = fa =>
Stream.eval(prepare(query)).flatMap(pq => fa.flatMap(a => pq.stream(a, chunkSize))).scope
def pipe[A, B](query: Query[A, B], chunkSize: Int): Pipe[F, A, B]

/**
* A named asynchronous channel that can be used for inter-process communication.
Expand Down Expand Up @@ -252,7 +265,7 @@ trait Session[F[_]] {
* the cache through this accessor.
*/
def describeCache: Describe.Cache[F]

/**
* Each session has access to a cache of all statements that have been parsed by the
* `Parse` protocol, which allows us to skip a network round-trip. Users can inspect and clear
Expand Down Expand Up @@ -283,9 +296,21 @@ object Session {
override def option[A, B](query: Query[A, B], args: A): F[Option[B]] =
Monad[F].flatMap(prepare(query))(_.option(args))

override def stream[A, B](command: Query[A, B], args: A, chunkSize: Int): Stream[F, B] =
Stream.eval(prepare(command)).flatMap(_.stream(args, chunkSize)).scope

override def cursor[A, B](query: Query[A, B], args: A): Resource[F, Cursor[F, B]] =
Resource.eval(prepare(query)).flatMap(_.cursor(args))

override def pipe[A, B](query: Query[A, B], chunkSize: Int): Pipe[F, A, B] = fa =>
Stream.eval(prepare(query)).flatMap(pq => fa.flatMap(a => pq.stream(a, chunkSize))).scope

override def execute[A](command: Command[A], args: A): F[Completion] =
Monad[F].flatMap(prepare(command))(_.execute(args))

override def pipe[A](command: Command[A]): Pipe[F, A, Completion] = fa =>
Stream.eval(prepare(command)).flatMap(pc => fa.evalMap(pc.execute)).scope

}

val DefaultConnectionParameters: Map[String, String] =
Expand Down Expand Up @@ -556,9 +581,6 @@ object Session {

override def execute[A](query: Query[Void,A]): G[List[A]] = fk(outer.execute(query))

override def pipe[A](command: Command[A]): Pipe[G, A, Completion] = fa =>
Stream.eval(prepare(command)).flatMap(pc => fa.evalMap(pc.execute)).scope

override def option[A](query: Query[Void,A]): G[Option[A]] = fk(outer.option(query))

override def parameter(key: String): Stream[G,String] = outer.parameter(key).translate(fk)
Expand All @@ -569,10 +591,10 @@ object Session {

override def prepare[A](command: Command[A]): G[PreparedCommand[G,A]] = fk(outer.prepare(command)).map(_.mapK(fk))

override def transaction[A]: Resource[G,Transaction[G]] = outer.transaction[A].mapK(fk).map(_.mapK(fk))
override def transaction[A]: Resource[G,Transaction[G]] = outer.transaction.mapK(fk).map(_.mapK(fk))

override def transaction[A](isolationLevel: TransactionIsolationLevel, accessMode: TransactionAccessMode): Resource[G,Transaction[G]] =
outer.transaction[A](isolationLevel, accessMode).mapK(fk).map(_.mapK(fk))
outer.transaction(isolationLevel, accessMode).mapK(fk).map(_.mapK(fk))

override def transactionStatus: Signal[G,TransactionStatus] = outer.transactionStatus.mapK(fk)

Expand Down