/
JdbcInvokerComponent.scala
291 lines (231 loc) · 12.9 KB
/
JdbcInvokerComponent.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
package scala.slick.driver
import java.sql.{Statement, PreparedStatement}
import scala.slick.SlickException
import scala.slick.ast.{Insert, CompiledStatement, ResultSetMapping, Node}
import scala.slick.lifted.{ShapeLevel, Query, Shape, ShapedValue}
import scala.slick.jdbc._
import scala.slick.util.SQLBuilder
import scala.slick.profile.BasicInvokerComponent
trait JdbcInvokerComponent extends BasicInvokerComponent{ driver: JdbcDriver =>
/** The insert invoker type used by this component: a [[CountingInsertInvoker]]. */
type InsertInvoker[T] = CountingInsertInvoker[T]
/** Creates the default insert invoker for `tree` by delegating to
  * `createCountingInsertInvoker`. */
def createInsertInvoker[U](tree: Node) = createCountingInsertInvoker(tree)
// Create the different invokers -- these methods should be overridden by drivers as needed
/** Creates an insert invoker for `tree` that returns affected row counts. */
def createCountingInsertInvoker[U](tree: Node) = new CountingInsertInvoker[U](tree)
/** Creates an insert invoker for `tree` that returns the generated keys described by `keys`. */
def createKeysInsertInvoker[U, RU](tree: Node, keys: Node) = new KeysInsertInvoker[U, RU](tree, keys)
/** Creates an insert invoker for `tree` that combines each inserted value with
  * its generated keys (described by `keys`) through the function `tr`. */
def createMappedKeysInsertInvoker[U, RU, R](tree: Node, keys: Node, tr: (U, RU) => R) = new MappedKeysInsertInvoker[U, RU, R](tree, keys, tr)
/** Creates a query invoker for `tree` whose parameter type is `Unit`. */
def createUnitQueryInvoker[R](tree: Node) = new UnitQueryInvoker[R](tree)
/** Creates an invoker for running the UPDATE statement compiled in `tree`,
  * with `param` passed to the statement's parameter setter. */
def createUpdateInvoker[T](tree: Node, param: Any) = new UpdateInvoker[T](tree, param)
/** Creates an invoker for running the DELETE statement compiled in `tree`,
  * with `param` passed to the statement's parameter setter. */
def createDeleteInvoker(tree: Node, param: Any) = new DeleteInvoker(tree, param)
/** Creates a parameterized query invoker for `tree`. */
def createQueryInvoker[P,R](tree: Node): QueryInvoker[P,R] = new QueryInvoker[P, R](tree)
/** Creates an invoker that executes the create and drop statements of `ddl`. */
def createDDLInvoker(ddl: SchemaDescription) = new DDLInvoker(ddl)
// Parameters for invokers -- can be overridden by drivers as needed
/** Used by every [[QueryInvoker]] as its `mutateConcurrency` setting. */
val invokerMutateConcurrency: ResultSetConcurrency = ResultSetConcurrency.Updatable
/** Used by every [[QueryInvoker]] as its `mutateType` setting. */
val invokerMutateType: ResultSetType = ResultSetType.Auto
/** Used by every [[QueryInvoker]] as its `previousAfterDelete` setting. */
val invokerPreviousAfterDelete = false
/** An invoker for a compiled query that takes a parameter of type `P` and
  * yields rows of type `R`.
  *
  * `tree` has to be a `ResultSetMapping` wrapping a `CompiledStatement` (whose
  * payload is an `SQLBuilder.Result`) and a `CompiledMapping`; the pattern
  * below is evaluated at construction time and fails with a `MatchError` for
  * any other shape. */
class QueryInvoker[P, R](protected val tree: Node) extends MutatingStatementInvoker[P, R] {
  // Mutation settings come from the enclosing driver component.
  override protected val mutateConcurrency = invokerMutateConcurrency
  override protected val mutateType = invokerMutateType
  override protected val previousAfterDelete = invokerPreviousAfterDelete

  // Pull the generated SQL (`sres`) and the row converter out of the compiled tree.
  protected[this] val ResultSetMapping(_,
    CompiledStatement(_, sres: SQLBuilder.Result, _),
    CompiledMapping(converter, _)) = tree

  /** The SQL text of the compiled statement. */
  protected def getStatement = sres.sql

  /** Applies the statement's parameter setter to `st`, starting from a fresh
    * `PositionedParameters`. */
  protected def setParam(param: P, st: PreparedStatement): Unit = {
    val positioned = new PositionedParameters(st)
    sres.setter(positioned, param)
  }

  /** Reads the current row of `pr` through the converter and casts it to `R`. */
  protected def extractValue(pr: PositionedResult): R = {
    val row = converter.read(pr)
    row.asInstanceOf[R]
  }

  /** Writes `value` into the current row of `pr` through the converter. */
  protected def updateRowValues(pr: PositionedResult, value: R) =
    converter.update(value, pr)

  /** Returns this invoker itself. */
  def invoker: this.type = this
}
/** Invoker for executing queries.
  *
  * This is a [[QueryInvoker]] with the parameter type fixed to `Unit`, mixed
  * with `UnitInvokerMixin` and `MutatingUnitInvoker`. */
class UnitQueryInvoker[R](tree: Node) extends QueryInvoker[Unit, R](tree)
with UnitInvokerMixin[R] with MutatingUnitInvoker[R] {
// The invoker serves as its own delegate.
override protected val delegate = this
}
/** Executes the create and drop statements of a `DDL` object. Each of the two
  * operations runs all of its statements inside one `withTransaction` block. */
class DDLInvoker(ddl: DDL) extends super.DDLInvoker {
  /** Executes every statement of `ddl.createStatements`, in order. */
  def create(implicit session: Backend#Session): Unit = session.withTransaction {
    ddl.createStatements.foreach { sql =>
      session.withPreparedStatement(sql)(st => st.execute())
    }
  }

  /** Executes every statement of `ddl.dropStatements`, in order. */
  def drop(implicit session: Backend#Session): Unit = session.withTransaction {
    ddl.dropStatements.foreach { sql =>
      session.withPreparedStatement(sql)(st => st.execute())
    }
  }
}
/** Pseudo-invoker that runs the DELETE statement compiled in `tree`.
  *
  * `tree` has to be a `ResultSetMapping` over a `CompiledStatement` carrying an
  * `SQLBuilder.Result`; any other shape fails with a `MatchError` at
  * construction time. `param` is handed to the statement's parameter setter on
  * every execution. */
class DeleteInvoker(protected val tree: Node, param: Any) {
  protected[this] val ResultSetMapping(_, CompiledStatement(_, sres: SQLBuilder.Result, _), _) = tree

  /** The SQL text of the DELETE statement. */
  def deleteStatement = sres.sql

  /** Prepares the statement, binds the parameters and executes it.
    * @return the update count reported by `executeUpdate` */
  def delete(implicit session: Backend#Session): Int =
    session.withPreparedStatement(deleteStatement) { st =>
      val positioned = new PositionedParameters(st)
      sres.setter(positioned, param)
      st.executeUpdate()
    }

  /** Returns this invoker itself. */
  def deleteInvoker: this.type = this
}
/** Pseudo-invoker for running INSERT calls.
  *
  * `tree` has to be a `ResultSetMapping` over an `Insert` node and a
  * `CompiledMapping`; any other shape fails with a `MatchError` at
  * construction time. Subclasses decide what an insert returns by
  * implementing `retOne`, `retMany` and `retManyBatch`. */
abstract class BaseInsertInvoker[U](tree: Node) extends InsertInvokerDef[U] {
  protected[this] val ResultSetMapping(_, insertNode: Insert, CompiledMapping(converter, _)) = tree
  protected[this] lazy val builder = createInsertBuilder(insertNode)

  /** Builds the result of a single-row insert from the executed statement,
    * the inserted value and the update count. */
  protected def retOne(st: Statement, value: U, updateCount: Int): SingleInsertResult
  /** Builds the result of a multi-row insert that was executed row by row. */
  protected def retMany(values: Seq[U], individual: Seq[SingleInsertResult]): MultiInsertResult
  /** Builds the result of a multi-row insert that was executed as a JDBC batch. */
  protected def retManyBatch(st: Statement, values: Seq[U], updateCounts: Array[Int]): MultiInsertResult

  protected lazy val insertResult = builder.buildInsert(forced = false)
  protected lazy val insertForcedResult = builder.buildInsert(forced = true)

  /** SQL of the regular insert statement (built with `forced = false`). */
  lazy val insertStatement = insertResult.sql
  /** SQL of the forced insert statement (built with `forced = true`). */
  lazy val forceInsertStatement = insertForcedResult.sql

  /** SQL of the statement that inserts the result of `query`. */
  def insertStatementFor[TT](query: Query[TT, U]): String = builder.buildInsert(query).sql
  /** SQL of the statement that inserts the expression `c`, wrapped in a query. */
  def insertStatementFor[TT](c: TT)(implicit shape: Shape[_ <: ShapeLevel.Flat, TT, U, _]): String = insertStatementFor(Query(c)(shape))

  /** Whether multi-row inserts should go through JDBC batch updates; by
    * default this follows the session's `supportsBatchUpdates` capability. */
  def useBatchUpdates(implicit session: Backend#Session) = session.capabilities.supportsBatchUpdates

  /** Runs `f` on a prepared statement for `sql`. Subclasses may override this
    * to prepare the statement differently. */
  protected def prepared[T](sql: String)(f: PreparedStatement => T)(implicit session: Backend#Session) =
    session.withPreparedStatement(sql)(f)

  /** Insert a single row, skipping AutoInc columns. */
  final def insert(value: U)(implicit session: Backend#Session): SingleInsertResult = internalInsert(false, value)

  /** Insert a single row, including AutoInc columns. This is not supported
    * by all database engines (see
    * [[scala.slick.driver.JdbcProfile.capabilities.forceInsert]]). */
  final def forceInsert(value: U)(implicit session: Backend#Session): SingleInsertResult = internalInsert(true, value)

  /** Executes the regular or the forced insert statement for one value and
    * converts the outcome with `retOne`. */
  protected def internalInsert(forced: Boolean, value: U)(implicit session: Backend#Session): SingleInsertResult =
    prepared(if(forced) forceInsertStatement else insertStatement) { st =>
      st.clearParameters()
      converter.set(value, new PositionedParameters(st), forced)
      val count = st.executeUpdate()
      retOne(st, value, count)
    }

  /** Insert multiple rows, skipping AutoInc columns.
    * Uses JDBC's batch update feature if supported by the JDBC driver.
    * Returns Some(rowsAffected), or None if the database returned no row
    * count for some part of the batch. If any part of the batch fails, an
    * exception is thrown. */
  final def insertAll(values: U*)(implicit session: Backend#Session): MultiInsertResult = internalInsertAll(false, values: _*)

  /** Insert multiple rows, including AutoInc columns.
    * This is not supported by all database engines (see
    * [[scala.slick.driver.JdbcProfile.capabilities.forceInsert]]).
    * Uses JDBC's batch update feature if supported by the JDBC driver.
    * Returns Some(rowsAffected), or None if the database returned no row
    * count for some part of the batch. If any part of the batch fails, an
    * exception is thrown. */
  final def forceInsertAll(values: U*)(implicit session: Backend#Session): MultiInsertResult = internalInsertAll(true, values: _*)

  /** Inserts all `values` inside one transaction, either row by row or as a
    * single JDBC batch, honouring `forced` on both paths. */
  protected def internalInsertAll(forced: Boolean, values: U*)(implicit session: Backend#Session): MultiInsertResult = session.withTransaction {
    // The length is only inspected for IndexedSeqs, where it is cheap to compute.
    if(!useBatchUpdates || (values.isInstanceOf[IndexedSeq[_]] && values.length < 2)) {
      // Go through internalInsert (not insert) so that a forced insert stays
      // forced when it is executed row by row.
      retMany(values, values.map(v => internalInsert(forced, v)))
    } else {
      prepared(if(forced) forceInsertStatement else insertStatement) { st =>
        st.clearParameters()
        for(value <- values) {
          converter.set(value, new PositionedParameters(st), forced)
          st.addBatch()
        }
        val counts = st.executeBatch()
        retManyBatch(st, values, counts)
      }
    }
  }

  /** Alias for `insert`. */
  def += (value: U)(implicit session: Backend#Session): SingleInsertResult = insert(value)
  /** Alias for `insertAll`, taking the values as an `Iterable`. */
  def ++= (values: Iterable[U])(implicit session: Backend#Session): MultiInsertResult = insertAll(values.toSeq: _*)
}
/** An InsertInvoker that can also insert from another query. */
trait FullInsertInvoker[U] { this: BaseInsertInvoker[U] =>
  /** The result type of an insert whose rows come from a query. */
  type QueryInsertResult

  /** Builds the result of a query-based insert from the executed statement
    * and its update count. */
  protected def retQuery(st: Statement, updateCount: Int): QueryInsertResult

  /** Inserts the expression `c` by wrapping it in a query and delegating to
    * `insert`. */
  def insertExpr[TT](c: TT)(implicit shape: Shape[_ <: ShapeLevel.Flat, TT, U, _], session: Backend#Session): QueryInsertResult =
    insert(Query(c)(shape))(session)

  /** Inserts the rows produced by `query` and converts the outcome with
    * `retQuery`. */
  def insert[TT](query: Query[TT, U])(implicit session: Backend#Session): QueryInsertResult = {
    // Build the statement once and take both the SQL text and the parameter
    // setter from the same result, instead of building it a second time just
    // to obtain the SQL.
    val sbr = builder.buildInsert(query)
    prepared(sbr.sql) { st =>
      st.clearParameters()
      sbr.setter(new PositionedParameters(st), null)
      val count = st.executeUpdate()
      retQuery(st, count)
    }
  }
}
/** Pseudo-invoker for INSERT calls whose results are affected row counts:
  * an `Int` for single-row and query-based inserts, an `Option[Int]` for
  * multi-row inserts. */
class CountingInsertInvoker[U](tree: Node) extends BaseInsertInvoker[U](tree) with FullInsertInvoker[U] {
  type SingleInsertResult = Int
  type MultiInsertResult = Option[Int]
  type QueryInsertResult = Int

  /** A single insert reports the update count unchanged. */
  protected def retOne(st: Statement, value: U, updateCount: Int) = updateCount

  /** Row-by-row inserts report the sum of the individual counts. */
  protected def retMany(values: Seq[U], individual: Seq[SingleInsertResult]) = Some(individual.sum)

  /** Folds the per-row results of a JDBC batch into a total.
    *
    * Throws a `SlickException` naming the (1-based) first row flagged
    * `EXECUTE_FAILED`. Otherwise returns `None` if any row reported
    * `SUCCESS_NO_INFO`, or `Some` of the summed counts. */
  protected def retManyBatch(st: Statement, values: Seq[U], updateCounts: Array[Int]) = {
    val failedAt = updateCounts.indexOf(Statement.EXECUTE_FAILED)
    if(failedAt >= 0)
      throw new SlickException("Failed to insert row #" + (failedAt+1))
    if(updateCounts.contains(Statement.SUCCESS_NO_INFO)) None
    else Some(updateCounts.sum)
  }

  /** A query-based insert reports the update count unchanged. */
  protected def retQuery(st: Statement, updateCount: Int) = updateCount

  /** Switches to an invoker that returns the generated keys selected by
    * `value` instead of row counts. */
  def returning[RT, RU](value: Query[RT, RU]) =
    createKeysInsertInvoker[U, RU](tree, value.toNode)
}
/** Shared functionality of KeysInsertInvoker and MappedKeysInsertInvoker:
  * statements are prepared with the key columns to return, and generated
  * keys are read back through a converter built for those columns. */
abstract class AbstractKeysInsertInvoker[U, RU](tree: Node, keys: Node)
  extends BaseInsertInvoker[U](tree) {

  /** Wraps the generated keys of `st` in an invoker that reads each row with
    * `keyConverter` and casts it to `RU`. */
  protected def buildKeysResult(st: Statement): UnitInvoker[RU] =
    ResultSetInvoker[RU](_ => st.getGeneratedKeys()) { pr =>
      val generated = keyConverter.read(pr)
      generated.asInstanceOf[RU]
    }

  // Returning keys from batch inserts is generally not supported
  override def useBatchUpdates(implicit session: Backend#Session) = false

  // The columns to return and the matching converter, derived from `keys`
  // and the table of the regular insert statement.
  protected lazy val (keyColumns, keyConverter) =
    builder.buildReturnColumns(keys, insertResult.table)

  /** Prepares `sql` as an insert statement that returns `keyColumns`. */
  override protected def prepared[T](sql: String)(f: PreparedStatement => T)(implicit session: Backend#Session) =
    session.withPreparedInsertStatement(sql, keyColumns.toArray)(f)
}
/** Pseudo-invoker for INSERT calls whose results are the generated keys:
  * one `RU` per single insert, a `Seq[RU]` for multi-row and query-based
  * inserts. */
class KeysInsertInvoker[U, RU](tree: Node, keys: Node)
  extends AbstractKeysInsertInvoker[U, RU](tree, keys) with FullInsertInvoker[U] {
  type SingleInsertResult = RU
  type MultiInsertResult = Seq[RU]
  type QueryInsertResult = MultiInsertResult

  // Collects all generated keys of `st` into a Vector. The keys invoker is
  // run with a null session, exactly as in the single-row case.
  private[this] def collectKeys(st: Statement) = {
    implicit val session: Backend#Session = null
    buildKeysResult(st).buildColl[Vector]
  }

  /** A single insert yields the first generated key. */
  protected def retOne(st: Statement, value: U, updateCount: Int) =
    buildKeysResult(st).first()(null)

  /** Row-by-row inserts yield the individual keys as they are. */
  protected def retMany(values: Seq[U], individual: Seq[SingleInsertResult]) = individual

  /** A batch insert yields all generated keys of the statement. */
  protected def retManyBatch(st: Statement, values: Seq[U], updateCounts: Array[Int]) =
    collectKeys(st)

  /** A query-based insert yields all generated keys of the statement. */
  protected def retQuery(st: Statement, updateCount: Int) =
    collectKeys(st)

  /**
    * Specifies a mapping from inserted values and generated keys to a desired value.
    * @param f Function that maps inserted values and generated keys to a desired value.
    * @tparam R target type of the mapping
    */
  def into[R](f: (U, RU) => R) = createMappedKeysInsertInvoker[U, RU, R](tree, keys, f)
}
/** Pseudo-invoker for INSERT calls whose results are produced by applying
  * `tr` to each inserted value and its generated key. */
class MappedKeysInsertInvoker[U, RU, R](tree: Node, keys: Node, tr: (U, RU) => R)
  extends AbstractKeysInsertInvoker[U, RU](tree, keys) {
  type SingleInsertResult = R
  type MultiInsertResult = Seq[R]

  /** A single insert yields `tr` applied to the value and the first generated key. */
  protected def retOne(st: Statement, value: U, updateCount: Int) = {
    val generatedKey = buildKeysResult(st).first()(null)
    tr(value, generatedKey)
  }

  /** Row-by-row inserts yield the individual mapped results as they are. */
  protected def retMany(values: Seq[U], individual: Seq[SingleInsertResult]) = individual

  /** A batch insert pairs the values with the generated keys positionally
    * and maps each pair through `tr`. */
  protected def retManyBatch(st: Statement, values: Seq[U], updateCounts: Array[Int]) = {
    implicit val session: Backend#Session = null
    val generatedKeys = buildKeysResult(st).buildColl[Vector]
    (values, generatedKeys).zipped.map(tr)
  }
}
/** Pseudo-invoker that runs the UPDATE statement compiled in `tree`.
  *
  * `tree` has to be a `ResultSetMapping` over a `CompiledStatement` (carrying
  * an `SQLBuilder.Result`) and a `CompiledMapping`; any other shape fails
  * with a `MatchError` at construction time. `param` is handed to the
  * statement's parameter setter on every execution. */
class UpdateInvoker[T](protected val tree: Node, param: Any) {
  protected[this] val ResultSetMapping(_,
    CompiledStatement(_, sres: SQLBuilder.Result, _),
    CompiledMapping(converter, _)) = tree

  /** The SQL text of the UPDATE statement. */
  def updateStatement = getStatement

  protected def getStatement = sres.sql

  /** Prepares the statement, binds `value` and then the statement's own
    * parameters, and executes it.
    * @return the update count reported by `executeUpdate` */
  def update(value: T)(implicit session: Backend#Session): Int =
    session.withPreparedStatement(updateStatement) { st =>
      st.clearParameters()
      // One shared cursor: the new column values are bound first, the
      // statement's own parameters follow at the next positions.
      val positioned = new PositionedParameters(st)
      converter.set(value, positioned, true)
      sres.setter(positioned, param)
      st.executeUpdate()
    }

  /** Returns this invoker itself. */
  def updateInvoker: this.type = this
}
}