/
copyin.scala
154 lines (134 loc) · 7.16 KB
/
copyin.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
// Copyright (c) 2013-2018 Rob Norris and Contributors
// This software is licensed under the MIT License (MIT).
// For more information see LICENSE or https://opensource.org/licenses/MIT
package doobie.postgres.free
import cats.~>
import cats.effect.{ Async, ExitCase }
import cats.free.{ Free => FF } // alias because some algebras have an op called Free
import org.postgresql.copy.{ CopyIn => PGCopyIn }
@SuppressWarnings(Array("org.wartremover.warts.Overloading"))
object copyin { module =>
// Algebra of operations for PGCopyIn. Each accepts a visitor as an alternatie to pattern-matching.
sealed trait CopyInOp[A] {
def visit[F[_]](v: CopyInOp.Visitor[F]): F[A]
}
// Free monad over CopyInOp.
type CopyInIO[A] = FF[CopyInOp, A]
// Module of instances and constructors of CopyInOp.
object CopyInOp {
// Given a PGCopyIn we can embed a CopyInIO program in any algebra that understands embedding.
implicit val CopyInOpEmbeddable: Embeddable[CopyInOp, PGCopyIn] =
new Embeddable[CopyInOp, PGCopyIn] {
def embed[A](j: PGCopyIn, fa: FF[CopyInOp, A]) = Embedded.CopyIn(j, fa)
}
// Interface for a natural tansformation CopyInOp ~> F encoded via the visitor pattern.
// This approach is much more efficient than pattern-matching for large algebras.
trait Visitor[F[_]] extends (CopyInOp ~> F) {
final def apply[A](fa: CopyInOp[A]): F[A] = fa.visit(this)
// Common
def raw[A](f: PGCopyIn => A): F[A]
def embed[A](e: Embedded[A]): F[A]
def delay[A](a: () => A): F[A]
def handleErrorWith[A](fa: CopyInIO[A], f: Throwable => CopyInIO[A]): F[A]
def async[A](k: (Either[Throwable, A] => Unit) => Unit): F[A]
def asyncF[A](k: (Either[Throwable, A] => Unit) => CopyInIO[Unit]): F[A]
def bracketCase[A, B](acquire: CopyInIO[A])(use: A => CopyInIO[B])(release: (A, ExitCase[Throwable]) => CopyInIO[Unit]): F[B]
// PGCopyIn
def cancelCopy: F[Unit]
def endCopy: F[Long]
def flushCopy: F[Unit]
def getFieldCount: F[Int]
def getFieldFormat(a: Int): F[Int]
def getFormat: F[Int]
def getHandledRowCount: F[Long]
def isActive: F[Boolean]
def writeToCopy(a: Array[Byte], b: Int, c: Int): F[Unit]
}
// Common operations for all algebras.
final case class Raw[A](f: PGCopyIn => A) extends CopyInOp[A] {
def visit[F[_]](v: Visitor[F]) = v.raw(f)
}
final case class Embed[A](e: Embedded[A]) extends CopyInOp[A] {
def visit[F[_]](v: Visitor[F]) = v.embed(e)
}
final case class Delay[A](a: () => A) extends CopyInOp[A] {
def visit[F[_]](v: Visitor[F]) = v.delay(a)
}
final case class HandleErrorWith[A](fa: CopyInIO[A], f: Throwable => CopyInIO[A]) extends CopyInOp[A] {
def visit[F[_]](v: Visitor[F]) = v.handleErrorWith(fa, f)
}
final case class Async1[A](k: (Either[Throwable, A] => Unit) => Unit) extends CopyInOp[A] {
def visit[F[_]](v: Visitor[F]) = v.async(k)
}
final case class AsyncF[A](k: (Either[Throwable, A] => Unit) => CopyInIO[Unit]) extends CopyInOp[A] {
def visit[F[_]](v: Visitor[F]) = v.asyncF(k)
}
final case class BracketCase[A, B](acquire: CopyInIO[A], use: A => CopyInIO[B], release: (A, ExitCase[Throwable]) => CopyInIO[Unit]) extends CopyInOp[B] {
def visit[F[_]](v: Visitor[F]) = v.bracketCase(acquire)(use)(release)
}
// PGCopyIn-specific operations.
final case object CancelCopy extends CopyInOp[Unit] {
def visit[F[_]](v: Visitor[F]) = v.cancelCopy
}
final case object EndCopy extends CopyInOp[Long] {
def visit[F[_]](v: Visitor[F]) = v.endCopy
}
final case object FlushCopy extends CopyInOp[Unit] {
def visit[F[_]](v: Visitor[F]) = v.flushCopy
}
final case object GetFieldCount extends CopyInOp[Int] {
def visit[F[_]](v: Visitor[F]) = v.getFieldCount
}
final case class GetFieldFormat(a: Int) extends CopyInOp[Int] {
def visit[F[_]](v: Visitor[F]) = v.getFieldFormat(a)
}
final case object GetFormat extends CopyInOp[Int] {
def visit[F[_]](v: Visitor[F]) = v.getFormat
}
final case object GetHandledRowCount extends CopyInOp[Long] {
def visit[F[_]](v: Visitor[F]) = v.getHandledRowCount
}
final case object IsActive extends CopyInOp[Boolean] {
def visit[F[_]](v: Visitor[F]) = v.isActive
}
final case class WriteToCopy(a: Array[Byte], b: Int, c: Int) extends CopyInOp[Unit] {
def visit[F[_]](v: Visitor[F]) = v.writeToCopy(a, b, c)
}
}
import CopyInOp._
// Smart constructors for operations common to all algebras.
val unit: CopyInIO[Unit] = FF.pure[CopyInOp, Unit](())
def pure[A](a: A): CopyInIO[A] = FF.pure[CopyInOp, A](a)
def raw[A](f: PGCopyIn => A): CopyInIO[A] = FF.liftF(Raw(f))
def embed[F[_], J, A](j: J, fa: FF[F, A])(implicit ev: Embeddable[F, J]): FF[CopyInOp, A] = FF.liftF(Embed(ev.embed(j, fa)))
def delay[A](a: => A): CopyInIO[A] = FF.liftF(Delay(() => a))
def handleErrorWith[A](fa: CopyInIO[A], f: Throwable => CopyInIO[A]): CopyInIO[A] = FF.liftF[CopyInOp, A](HandleErrorWith(fa, f))
def raiseError[A](err: Throwable): CopyInIO[A] = delay(throw err)
def async[A](k: (Either[Throwable, A] => Unit) => Unit): CopyInIO[A] = FF.liftF[CopyInOp, A](Async1(k))
def asyncF[A](k: (Either[Throwable, A] => Unit) => CopyInIO[Unit]): CopyInIO[A] = FF.liftF[CopyInOp, A](AsyncF(k))
def bracketCase[A, B](acquire: CopyInIO[A])(use: A => CopyInIO[B])(release: (A, ExitCase[Throwable]) => CopyInIO[Unit]): CopyInIO[B] = FF.liftF[CopyInOp, B](BracketCase(acquire, use, release))
// Smart constructors for CopyIn-specific operations.
val cancelCopy: CopyInIO[Unit] = FF.liftF(CancelCopy)
val endCopy: CopyInIO[Long] = FF.liftF(EndCopy)
val flushCopy: CopyInIO[Unit] = FF.liftF(FlushCopy)
val getFieldCount: CopyInIO[Int] = FF.liftF(GetFieldCount)
def getFieldFormat(a: Int): CopyInIO[Int] = FF.liftF(GetFieldFormat(a))
val getFormat: CopyInIO[Int] = FF.liftF(GetFormat)
val getHandledRowCount: CopyInIO[Long] = FF.liftF(GetHandledRowCount)
val isActive: CopyInIO[Boolean] = FF.liftF(IsActive)
def writeToCopy(a: Array[Byte], b: Int, c: Int): CopyInIO[Unit] = FF.liftF(WriteToCopy(a, b, c))
// CopyInIO is an Async
implicit val AsyncCopyInIO: Async[CopyInIO] =
new Async[CopyInIO] {
val M = FF.catsFreeMonadForFree[CopyInOp]
def bracketCase[A, B](acquire: CopyInIO[A])(use: A => CopyInIO[B])(release: (A, ExitCase[Throwable]) => CopyInIO[Unit]): CopyInIO[B] = module.bracketCase(acquire)(use)(release)
def pure[A](x: A): CopyInIO[A] = M.pure(x)
def handleErrorWith[A](fa: CopyInIO[A])(f: Throwable => CopyInIO[A]): CopyInIO[A] = module.handleErrorWith(fa, f)
def raiseError[A](e: Throwable): CopyInIO[A] = module.raiseError(e)
def async[A](k: (Either[Throwable,A] => Unit) => Unit): CopyInIO[A] = module.async(k)
def asyncF[A](k: (Either[Throwable,A] => Unit) => CopyInIO[Unit]): CopyInIO[A] = module.asyncF(k)
def flatMap[A, B](fa: CopyInIO[A])(f: A => CopyInIO[B]): CopyInIO[B] = M.flatMap(fa)(f)
def tailRecM[A, B](a: A)(f: A => CopyInIO[Either[A, B]]): CopyInIO[B] = M.tailRecM(a)(f)
def suspend[A](thunk: => CopyInIO[A]): CopyInIO[A] = M.flatten(module.delay(thunk))
}
}