-
-
Notifications
You must be signed in to change notification settings - Fork 609
/
MemoryProfile.scala
264 lines (222 loc) · 12.9 KB
/
MemoryProfile.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
package slick.memory
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import scala.language.existentials
import scala.reflect.ClassTag
import slick.ast.*
import slick.ast.TypeUtil.*
import slick.basic.{FixedBasicAction, FixedBasicStreamingAction}
import slick.compiler.*
import slick.dbio.*
import slick.relational.{CompiledMapping, RelationalProfile, ResultConverter, ResultConverterCompiler}
import slick.compat.collection.*
import slick.util.{??, DumpInfo}
/** A profile for interpreted queries on top of the in-memory database. */
trait MemoryProfile extends RelationalProfile with MemoryQueryingProfile { self: MemoryProfile =>
type SchemaDescription = SchemaDescriptionDef
type InsertInvoker[T] = InsertInvokerDef[T]
type Backend = HeapBackend
val backend: Backend = HeapBackend
val api: MemoryAPI = new MemoryAPI {}
lazy val queryCompiler = compiler + new MemoryCodeGen
lazy val updateCompiler = compiler
lazy val deleteCompiler = compiler
lazy val insertCompiler =
QueryCompiler(
Phase.assignUniqueSymbols,
Phase.inferTypes,
new InsertCompiler(InsertCompiler.NonAutoInc),
new MemoryInsertCodeGen
)
override protected def computeCapabilities = super.computeCapabilities ++ MemoryCapabilities.all
def createInsertInvoker[T](tree: Node): InsertInvoker[T] = new InsertInvokerDef[T](tree)
def buildSequenceSchemaDescription(seq: Sequence[?]): SchemaDescription = ??
def buildTableSchemaDescription(table: Table[?]): SchemaDescription = new DDL(Vector(table))
type QueryActionExtensionMethods[R, S <: NoStream] = MemoryQueryActionExtensionMethodsImpl[R, S]
type StreamingQueryActionExtensionMethods[R, T] = MemoryStreamingQueryActionExtensionMethodsImpl[R, T]
type SchemaActionExtensionMethods = MemorySchemaActionExtensionMethodsImpl
type InsertActionExtensionMethods[T] = MemoryInsertActionExtensionMethodsImpl[T]
def createQueryActionExtensionMethods[R, S <: NoStream](tree: Node, param: Any): QueryActionExtensionMethods[R, S] =
new QueryActionExtensionMethods[R, S](tree, param)
def createStreamingQueryActionExtensionMethods[R, T](tree: Node,
param: Any): StreamingQueryActionExtensionMethods[R, T] =
new StreamingQueryActionExtensionMethods[R, T](tree, param)
def createSchemaActionExtensionMethods(schema: SchemaDescription): SchemaActionExtensionMethods =
new MemorySchemaActionExtensionMethodsImpl(schema)
def createInsertActionExtensionMethods[T](compiled: CompiledInsert): InsertActionExtensionMethods[T] =
new MemoryInsertActionExtensionMethodsImpl[T](compiled)
override lazy val MappedColumnType: MemoryMappedColumnTypeFactory = new MemoryMappedColumnTypeFactory
class MemoryMappedColumnTypeFactory extends MappedColumnTypeFactory {
def base[T : ClassTag, U : BaseColumnType](tmap: T => U, tcomap: U => T): BaseColumnType[T] = {
assertNonNullType(implicitly[BaseColumnType[U]])
new MappedColumnType(implicitly[BaseColumnType[U]], tmap, tcomap)
}
}
class MappedColumnType[T, U](val baseType: ColumnType[U], toBase: T => U, toMapped: U => T)
(implicit val classTag: ClassTag[T]) extends ScalaType[T] with BaseTypedType[T] {
def nullable: Boolean = baseType.nullable
def ordered: Boolean = baseType.ordered
def scalaOrderingFor(ord: Ordering): scala.math.Ordering[T] = new scala.math.Ordering[T] {
val uOrdering = baseType.scalaOrderingFor(ord)
def compare(x: T, y: T): Int = uOrdering.compare(toBase(x), toBase(y))
}
}
trait MemoryAPI extends RelationalAPI with MemoryQueryingAPI {
type SimpleDBIO[+R] = SimpleMemoryAction[R]
val SimpleDBIO = SimpleMemoryAction
}
protected def createInterpreter(db: Backend#Database, param: Any): QueryInterpreter =
new QueryInterpreter(db, param) {
override def run(n: Node) = n match {
case ResultSetMapping(_, from, CompiledMapping(converter, _)) :@ CollectionType(cons, el) =>
val fromV = run(from).asInstanceOf[IterableOnce[Any]]
val b = cons.createBuilder(el.classTag).asInstanceOf[mutable.Builder[Any, Any]]
b ++= fromV.iterator.map { v =>
converter.asInstanceOf[ResultConverter[QueryInterpreter.ProductValue, ArrayBuffer[Any], Nothing, ?]]
.read(v.asInstanceOf[QueryInterpreter.ProductValue])
}
b.result()
case n => super.run(n)
}
}
def runSynchronousQuery[R](tree: Node, param: Any)(implicit session: backend.Session): R =
createInterpreter(session.database, param).run(tree).asInstanceOf[R]
class InsertInvokerDef[T](tree: Node) {
protected[this] val ResultSetMapping(_, Insert(_, table: TableNode, _, _), CompiledMapping(converter, _)) =
tree: @unchecked
type SingleInsertResult = Unit
type MultiInsertResult = Unit
def += (value: T)(implicit session: Backend#Session): Unit = {
val heapTable = session.database.getTable(table.tableName)
val buf = heapTable.createInsertRow
converter.asInstanceOf[ResultConverter[QueryInterpreter.ProductValue, ArrayBuffer[Any], Nothing, Any]].set(value, buf, 0)
heapTable.append(buf.toIndexedSeq)
}
def ++= (values: Iterable[T])(implicit session: Backend#Session): Unit =
values.foreach(this += _)
}
class DDL(val tables: Vector[Table[?]]) extends SchemaDescriptionDef {
def ++(other: SchemaDescription): SchemaDescription =
new DDL(tables ++ other.asInstanceOf[DDL].tables)
}
type ProfileAction[+R, +S <: NoStream, -E <: Effect] = FixedBasicAction[R, S, E]
type StreamingProfileAction[+R, +T, -E <: Effect] = FixedBasicStreamingAction[R, T, E]
protected[this] def dbAction[R, S <: NoStream, E <: Effect](f: Backend#Session => R): ProfileAction[R, S, E] =
new ProfileAction[R, S, E] with SynchronousDatabaseAction[R, S, HeapBackend#BasicActionContext, HeapBackend#BasicStreamingActionContext, E] {
def run(ctx: HeapBackend#BasicActionContext): R = f(ctx.session)
def getDumpInfo = DumpInfo("MemoryProfile.ProfileAction")
}
class StreamingQueryAction[R, T](tree: Node, param: Any)
extends StreamingProfileAction[R, T, Effect.Read]
with SynchronousDatabaseAction[R, Streaming[T], HeapBackend#BasicActionContext, HeapBackend#BasicStreamingActionContext, Effect.Read] {
type StreamState = Iterator[T]
protected[this] def getIterator(ctx: HeapBackend#BasicActionContext): Iterator[T] = {
val inter = createInterpreter(ctx.session.database, param)
val ResultSetMapping(_, from, CompiledMapping(converter, _)) = tree: @unchecked
val productValueIterator = inter.run(from).asInstanceOf[IterableOnce[QueryInterpreter.ProductValue]].iterator
productValueIterator.map(converter.asInstanceOf[ResultConverter[QueryInterpreter.ProductValue, ArrayBuffer[Any], Nothing, T]].read)
}
def run(ctx: HeapBackend#BasicActionContext): R =
createInterpreter(ctx.session.database, param).run(tree).asInstanceOf[R]
override def emitStream(ctx: HeapBackend#BasicStreamingActionContext, limit: Long, state: StreamState): StreamState = {
val it = if(state ne null) state else getIterator(ctx.asInstanceOf[Backend#Context]) //TODO why does Dotty need this cast?
var count = 0L
while(count < limit && it.hasNext) {
count += 1
ctx.emit(it.next())
}
if(it.hasNext) it else null
}
def head: ProfileAction[T, NoStream, Effect.Read] =
new ProfileAction[T, NoStream, Effect.Read]
with SynchronousDatabaseAction[T, NoStream, HeapBackend#BasicActionContext, HeapBackend#BasicStreamingActionContext, Effect.Read] {
def run(ctx: HeapBackend#BasicActionContext): T = getIterator(ctx).next()
def getDumpInfo = DumpInfo("MemoryProfile.StreamingQueryAction.first")
}
def headOption: ProfileAction[Option[T], NoStream, Effect.Read] =
new ProfileAction[Option[T], NoStream, Effect.Read]
with SynchronousDatabaseAction[Option[T], NoStream, HeapBackend#BasicActionContext, HeapBackend#BasicStreamingActionContext, Effect.Read] {
def run(ctx: HeapBackend#BasicActionContext): Option[T] = {
val it = getIterator(ctx)
if (it.hasNext) Some(it.next()) else None
}
def getDumpInfo = DumpInfo("MemoryProfile.StreamingQueryAction.firstOption")
}
def getDumpInfo = DumpInfo("MemoryProfile.StreamingQueryAction")
}
class MemoryQueryActionExtensionMethodsImpl[R, S <: NoStream](tree: Node, param: Any)
extends BasicQueryActionExtensionMethodsImpl[R, S] {
def result: ProfileAction[R, S, Effect.Read] =
new StreamingQueryAction[R, Nothing](tree, param).asInstanceOf[ProfileAction[R, S, Effect.Read]]
}
class MemoryStreamingQueryActionExtensionMethodsImpl[R, T](tree: Node, param: Any)
extends MemoryQueryActionExtensionMethodsImpl[R, Streaming[T]](tree, param)
with BasicStreamingQueryActionExtensionMethodsImpl[R, T] {
override def result: StreamingProfileAction[R, T, Effect.Read] =
super.result.asInstanceOf[StreamingProfileAction[R, T, Effect.Read]]
}
class MemorySchemaActionExtensionMethodsImpl(schema: SchemaDescription)
extends RelationalSchemaActionExtensionMethodsImpl {
protected[this] val tables = schema.asInstanceOf[DDL].tables
override def create: FixedBasicAction[Unit, Nothing, Effect.Schema] = dbAction { session =>
tables.foreach(t =>
session.database.createTable(t.tableName,
t.create_*.map { fs => new HeapBackend.Column(fs, typeInfoFor(fs.tpe)) }.toIndexedSeq,
t.indexes.toIndexedSeq, t.tableConstraints.toIndexedSeq)
)
}
override def createIfNotExists: FixedBasicAction[Unit, Nothing, Effect.Schema] = dbAction { session =>
tables.foreach(t =>
session.database.createTableIfNotExists(t.tableName,
t.create_*.map { fs => new HeapBackend.Column(fs, typeInfoFor(fs.tpe)) }.toIndexedSeq,
t.indexes.toIndexedSeq, t.tableConstraints.toIndexedSeq)
)
}
override def drop: FixedBasicAction[Unit, Nothing, Effect.Schema] = dbAction { session =>
tables.foreach(t => session.database.dropTable(t.tableName))
}
override def dropIfExists: FixedBasicAction[Unit, Nothing, Effect.Schema] = dbAction { session =>
tables.foreach(t => session.database.dropTableIfExists(t.tableName))
}
override def truncate: FixedBasicAction[Unit, Nothing, Effect.Schema] = dbAction{ session =>
tables.foreach(t => session.database.truncateTable(t.tableName) )
}
}
class MemoryInsertActionExtensionMethodsImpl[T](compiled: CompiledInsert)
extends InsertActionExtensionMethodsImpl[T] {
protected[this] val inv = createInsertInvoker[T](compiled)
type SingleInsertResult = Unit
type MultiInsertResult = Unit
override def +=(value: T): FixedBasicAction[Unit, Nothing, Effect.Write] = dbAction(inv.+=(value)(_))
override def ++=(values: Iterable[T]): FixedBasicAction[Unit, Nothing, Effect.Write] = dbAction(inv.++=(values)(_))
}
override def computeQueryCompiler = super.computeQueryCompiler ++ QueryCompiler.interpreterPhases
class InsertMappingCompiler(insert: Insert) extends ResultConverterCompiler[QueryInterpreter.ProductValue, ArrayBuffer[Any], Nothing] {
val Insert(_, table: TableNode, ProductNode(cols), _) = insert: @unchecked
val tableColumnIndexes = table.profileTable.asInstanceOf[Table[?]].create_*.zipWithIndex.toMap
def createColumnConverter(n: Node,
idx: Int,
column: Option[FieldSymbol]): ResultConverter[QueryInterpreter.ProductValue, ArrayBuffer[Any], Nothing, ?] =
new InsertResultConverter(tableColumnIndexes(column.get))
class InsertResultConverter(tableIndex: Int) extends ResultConverter[QueryInterpreter.ProductValue, ArrayBuffer[Any], Nothing, Any] {
override def read(pr: QueryInterpreter.ProductValue): Nothing = ??
override def update(value: Any, pr: Nothing): Nothing = ??
override def set(value: Any, pp: ArrayBuffer[Any], offset: Int) = pp(tableIndex) = value
override def getDumpInfo = super.getDumpInfo.copy(mainInfo = s"tableIndex=$tableIndex")
def width = 1
}
}
class MemoryInsertCodeGen extends CodeGen {
override def compileServerSideAndMapping(serverSide: Node,
mapping: Option[Node],
state: CompilerState): (Node, Option[CompiledMapping]) =
(serverSide, mapping.map(new InsertMappingCompiler(serverSide.asInstanceOf[Insert]).compileMapping))
}
}
object MemoryProfile extends MemoryProfile
/** A non-streaming Action that wraps a synchronous MemoryProfile API call. */
case class SimpleMemoryAction[+R](f: HeapBackend#BasicActionContext => R)
extends SynchronousDatabaseAction[R, NoStream, HeapBackend#BasicActionContext, HeapBackend#BasicStreamingActionContext, Effect.All] {
def run(context: HeapBackend#BasicActionContext): R = f(context)
def getDumpInfo = DumpInfo(name = "SimpleMemoryAction")
}