-
Notifications
You must be signed in to change notification settings - Fork 348
/
connection.scala
83 lines (64 loc) · 3.15 KB
/
connection.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
// Copyright (c) 2013-2017 Rob Norris
// This software is licensed under the MIT License (MIT).
// For more information see LICENSE or https://opensource.org/licenses/MIT
package doobie.postgres.hi
import cats.~>
import cats.data.Kleisli
import org.postgresql.{ PGConnection, PGNotification }
import doobie._, doobie.implicits._
import doobie.postgres.free.KleisliInterpreter
/** Module of safe `PGConnectionIO` operations lifted into `ConnectionIO`. */
object connection {
// An intepreter for lifting PGConnectionIO into ConnectionIO
val defaultInterpreter: PFPC.PGConnectionOp ~> Kleisli[ConnectionIO, PGConnection, ?] =
KleisliInterpreter[ConnectionIO].PGConnectionInterpreter
val pgGetBackendPID: ConnectionIO[Int] =
pgGetConnection(PFPC.getBackendPID)
def pgGetConnection[A](k: PGConnectionIO[A]): ConnectionIO[A] =
FC.unwrap(classOf[PGConnection]).flatMap(k.foldMap(defaultInterpreter).run)
def pgGetCopyAPI[A](k: CopyManagerIO[A]): ConnectionIO[A] =
pgGetConnection(PHPC.getCopyAPI(k))
def pgGetFastpathAPI[A](k: FastpathIO[A]): ConnectionIO[A] =
pgGetConnection(PHPC.getFastpathAPI(k))
def pgGetLargeObjectAPI[A](k: LargeObjectManagerIO[A]): ConnectionIO[A] =
pgGetConnection(PHPC.getLargeObjectAPI(k))
val pgGetNotifications: ConnectionIO[List[PGNotification]] =
pgGetConnection(PHPC.getNotifications)
val pgGetPrepareThreshold: ConnectionIO[Int] =
pgGetConnection(PHPC.getPrepareThreshold)
def pgSetPrepareThreshold(threshold: Int): ConnectionIO[Unit] =
pgGetConnection(PHPC.setPrepareThreshold(threshold))
/**
* Construct a program that notifies on the given channel. Note that the channel is NOT sanitized;
* it cannot be passed as a parameter and is simply interpolated into the statement. DO NOT pass
* user input here.
*/
@SuppressWarnings(Array("org.wartremover.warts.Overloading"))
def pgNotify(channel: String): ConnectionIO[Unit] =
execVoid("NOTIFY " + channel)
/**
* Construct a program that notifies on the given channel, with a payload. Note that neither the
* channel nor the payload are sanitized; neither can be passed as parameters and are simply
* interpolated into the statement. DO NOT pass user input here.
*/
@SuppressWarnings(Array("org.wartremover.warts.Overloading"))
def pgNotify(channel: String, payload: String): ConnectionIO[Unit] =
execVoid(s"NOTIFY $channel, '$payload'")
/**
* Construct a program that starts listening on the given channel. Note that the channel is NOT
* sanitized; it cannot be passed as a parameter and is simply interpolated into the statement.
* DO NOT pass user input here.
*/
def pgListen(channel: String): ConnectionIO[Unit] =
execVoid("LISTEN " + channel)
/**
* Construct a program that stops listening on the given channel. Note that the channel is NOT
* sanitized; it cannot be passed as a parameter and is simply interpolated into the statement.
* DO NOT pass user input here.
*/
def pgUnlisten(channel: String): ConnectionIO[Unit] =
execVoid("UNLISTEN " + channel)
// a helper
private def execVoid(sql: String): ConnectionIO[Unit] =
HC.prepareStatement(sql)(HPS.executeUpdate).map(_ => ())
}