-
Notifications
You must be signed in to change notification settings - Fork 348
/
blob.scala
187 lines (167 loc) · 8.75 KB
/
blob.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
// Copyright (c) 2013-2018 Rob Norris and Contributors
// This software is licensed under the MIT License (MIT).
// For more information see LICENSE or https://opensource.org/licenses/MIT
package doobie.free
import cats.~>
import cats.effect.{ Async, ContextShift, ExitCase }
import cats.free.{ Free => FF } // alias because some algebras have an op called Free
import scala.concurrent.ExecutionContext
import java.io.InputStream
import java.io.OutputStream
import java.sql.Blob
@SuppressWarnings(Array("org.wartremover.warts.Overloading"))
object blob { module =>
// Algebra of operations for Blob. Each accepts a visitor as an alternative to pattern-matching.
sealed trait BlobOp[A] {
def visit[F[_]](v: BlobOp.Visitor[F]): F[A]
}
// Free monad over BlobOp.
type BlobIO[A] = FF[BlobOp, A]
// Module of instances and constructors of BlobOp.
object BlobOp {
// Given a Blob we can embed a BlobIO program in any algebra that understands embedding.
implicit val BlobOpEmbeddable: Embeddable[BlobOp, Blob] =
new Embeddable[BlobOp, Blob] {
def embed[A](j: Blob, fa: FF[BlobOp, A]) = Embedded.Blob(j, fa)
}
// Interface for a natural transformation BlobOp ~> F encoded via the visitor pattern.
// This approach is much more efficient than pattern-matching for large algebras.
trait Visitor[F[_]] extends (BlobOp ~> F) {
final def apply[A](fa: BlobOp[A]): F[A] = fa.visit(this)
// Common
def raw[A](f: Blob => A): F[A]
def embed[A](e: Embedded[A]): F[A]
def delay[A](a: () => A): F[A]
def handleErrorWith[A](fa: BlobIO[A], f: Throwable => BlobIO[A]): F[A]
def raiseError[A](e: Throwable): F[A]
def async[A](k: (Either[Throwable, A] => Unit) => Unit): F[A]
def asyncF[A](k: (Either[Throwable, A] => Unit) => BlobIO[Unit]): F[A]
def bracketCase[A, B](acquire: BlobIO[A])(use: A => BlobIO[B])(release: (A, ExitCase[Throwable]) => BlobIO[Unit]): F[B]
def shift: F[Unit]
def evalOn[A](ec: ExecutionContext)(fa: BlobIO[A]): F[A]
// Blob
def free: F[Unit]
def getBinaryStream: F[InputStream]
def getBinaryStream(a: Long, b: Long): F[InputStream]
def getBytes(a: Long, b: Int): F[Array[Byte]]
def length: F[Long]
def position(a: Array[Byte], b: Long): F[Long]
def position(a: Blob, b: Long): F[Long]
def setBinaryStream(a: Long): F[OutputStream]
def setBytes(a: Long, b: Array[Byte]): F[Int]
def setBytes(a: Long, b: Array[Byte], c: Int, d: Int): F[Int]
def truncate(a: Long): F[Unit]
}
// Common operations for all algebras.
final case class Raw[A](f: Blob => A) extends BlobOp[A] {
def visit[F[_]](v: Visitor[F]) = v.raw(f)
}
final case class Embed[A](e: Embedded[A]) extends BlobOp[A] {
def visit[F[_]](v: Visitor[F]) = v.embed(e)
}
final case class Delay[A](a: () => A) extends BlobOp[A] {
def visit[F[_]](v: Visitor[F]) = v.delay(a)
}
final case class HandleErrorWith[A](fa: BlobIO[A], f: Throwable => BlobIO[A]) extends BlobOp[A] {
def visit[F[_]](v: Visitor[F]) = v.handleErrorWith(fa, f)
}
final case class RaiseError[A](e: Throwable) extends BlobOp[A] {
def visit[F[_]](v: Visitor[F]) = v.raiseError(e)
}
final case class Async1[A](k: (Either[Throwable, A] => Unit) => Unit) extends BlobOp[A] {
def visit[F[_]](v: Visitor[F]) = v.async(k)
}
final case class AsyncF[A](k: (Either[Throwable, A] => Unit) => BlobIO[Unit]) extends BlobOp[A] {
def visit[F[_]](v: Visitor[F]) = v.asyncF(k)
}
final case class BracketCase[A, B](acquire: BlobIO[A], use: A => BlobIO[B], release: (A, ExitCase[Throwable]) => BlobIO[Unit]) extends BlobOp[B] {
def visit[F[_]](v: Visitor[F]) = v.bracketCase(acquire)(use)(release)
}
final case object Shift extends BlobOp[Unit] {
def visit[F[_]](v: Visitor[F]) = v.shift
}
final case class EvalOn[A](ec: ExecutionContext, fa: BlobIO[A]) extends BlobOp[A] {
def visit[F[_]](v: Visitor[F]) = v.evalOn(ec)(fa)
}
// Blob-specific operations.
final case object Free extends BlobOp[Unit] {
def visit[F[_]](v: Visitor[F]) = v.free
}
final case object GetBinaryStream extends BlobOp[InputStream] {
def visit[F[_]](v: Visitor[F]) = v.getBinaryStream
}
final case class GetBinaryStream1(a: Long, b: Long) extends BlobOp[InputStream] {
def visit[F[_]](v: Visitor[F]) = v.getBinaryStream(a, b)
}
final case class GetBytes(a: Long, b: Int) extends BlobOp[Array[Byte]] {
def visit[F[_]](v: Visitor[F]) = v.getBytes(a, b)
}
final case object Length extends BlobOp[Long] {
def visit[F[_]](v: Visitor[F]) = v.length
}
final case class Position(a: Array[Byte], b: Long) extends BlobOp[Long] {
def visit[F[_]](v: Visitor[F]) = v.position(a, b)
}
final case class Position1(a: Blob, b: Long) extends BlobOp[Long] {
def visit[F[_]](v: Visitor[F]) = v.position(a, b)
}
final case class SetBinaryStream(a: Long) extends BlobOp[OutputStream] {
def visit[F[_]](v: Visitor[F]) = v.setBinaryStream(a)
}
final case class SetBytes(a: Long, b: Array[Byte]) extends BlobOp[Int] {
def visit[F[_]](v: Visitor[F]) = v.setBytes(a, b)
}
final case class SetBytes1(a: Long, b: Array[Byte], c: Int, d: Int) extends BlobOp[Int] {
def visit[F[_]](v: Visitor[F]) = v.setBytes(a, b, c, d)
}
final case class Truncate(a: Long) extends BlobOp[Unit] {
def visit[F[_]](v: Visitor[F]) = v.truncate(a)
}
}
import BlobOp._
// Smart constructors for operations common to all algebras.
val unit: BlobIO[Unit] = FF.pure[BlobOp, Unit](())
def pure[A](a: A): BlobIO[A] = FF.pure[BlobOp, A](a)
def raw[A](f: Blob => A): BlobIO[A] = FF.liftF(Raw(f))
def embed[F[_], J, A](j: J, fa: FF[F, A])(implicit ev: Embeddable[F, J]): FF[BlobOp, A] = FF.liftF(Embed(ev.embed(j, fa)))
def delay[A](a: => A): BlobIO[A] = FF.liftF(Delay(() => a))
def handleErrorWith[A](fa: BlobIO[A], f: Throwable => BlobIO[A]): BlobIO[A] = FF.liftF[BlobOp, A](HandleErrorWith(fa, f))
def raiseError[A](err: Throwable): BlobIO[A] = FF.liftF[BlobOp, A](RaiseError(err))
def async[A](k: (Either[Throwable, A] => Unit) => Unit): BlobIO[A] = FF.liftF[BlobOp, A](Async1(k))
def asyncF[A](k: (Either[Throwable, A] => Unit) => BlobIO[Unit]): BlobIO[A] = FF.liftF[BlobOp, A](AsyncF(k))
def bracketCase[A, B](acquire: BlobIO[A])(use: A => BlobIO[B])(release: (A, ExitCase[Throwable]) => BlobIO[Unit]): BlobIO[B] = FF.liftF[BlobOp, B](BracketCase(acquire, use, release))
val shift: BlobIO[Unit] = FF.liftF[BlobOp, Unit](Shift)
def evalOn[A](ec: ExecutionContext)(fa: BlobIO[A]) = FF.liftF[BlobOp, A](EvalOn(ec, fa))
// Smart constructors for Blob-specific operations.
val free: BlobIO[Unit] = FF.liftF(Free)
val getBinaryStream: BlobIO[InputStream] = FF.liftF(GetBinaryStream)
def getBinaryStream(a: Long, b: Long): BlobIO[InputStream] = FF.liftF(GetBinaryStream1(a, b))
def getBytes(a: Long, b: Int): BlobIO[Array[Byte]] = FF.liftF(GetBytes(a, b))
val length: BlobIO[Long] = FF.liftF(Length)
def position(a: Array[Byte], b: Long): BlobIO[Long] = FF.liftF(Position(a, b))
def position(a: Blob, b: Long): BlobIO[Long] = FF.liftF(Position1(a, b))
def setBinaryStream(a: Long): BlobIO[OutputStream] = FF.liftF(SetBinaryStream(a))
def setBytes(a: Long, b: Array[Byte]): BlobIO[Int] = FF.liftF(SetBytes(a, b))
def setBytes(a: Long, b: Array[Byte], c: Int, d: Int): BlobIO[Int] = FF.liftF(SetBytes1(a, b, c, d))
def truncate(a: Long): BlobIO[Unit] = FF.liftF(Truncate(a))
// BlobIO is an Async
implicit val AsyncBlobIO: Async[BlobIO] =
new Async[BlobIO] {
val asyncM = FF.catsFreeMonadForFree[BlobOp]
def bracketCase[A, B](acquire: BlobIO[A])(use: A => BlobIO[B])(release: (A, ExitCase[Throwable]) => BlobIO[Unit]): BlobIO[B] = module.bracketCase(acquire)(use)(release)
def pure[A](x: A): BlobIO[A] = asyncM.pure(x)
def handleErrorWith[A](fa: BlobIO[A])(f: Throwable => BlobIO[A]): BlobIO[A] = module.handleErrorWith(fa, f)
def raiseError[A](e: Throwable): BlobIO[A] = module.raiseError(e)
def async[A](k: (Either[Throwable,A] => Unit) => Unit): BlobIO[A] = module.async(k)
def asyncF[A](k: (Either[Throwable,A] => Unit) => BlobIO[Unit]): BlobIO[A] = module.asyncF(k)
def flatMap[A, B](fa: BlobIO[A])(f: A => BlobIO[B]): BlobIO[B] = asyncM.flatMap(fa)(f)
def tailRecM[A, B](a: A)(f: A => BlobIO[Either[A, B]]): BlobIO[B] = asyncM.tailRecM(a)(f)
def suspend[A](thunk: => BlobIO[A]): BlobIO[A] = asyncM.flatten(module.delay(thunk))
}
// BlobIO is a ContextShift
implicit val ContextShiftBlobIO: ContextShift[BlobIO] =
new ContextShift[BlobIO] {
def shift: BlobIO[Unit] = module.shift
def evalOn[A](ec: ExecutionContext)(fa: BlobIO[A]) = module.evalOn(ec)(fa)
}
}