-
Notifications
You must be signed in to change notification settings - Fork 392
Expand file tree
/
Copy pathReprsOf.scala
More file actions
410 lines (334 loc) · 16.6 KB
/
ReprsOf.scala
File metadata and controls
410 lines (334 loc) · 16.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
package polynote.runtime
import java.io.DataOutput
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets

import polynote.runtime

import scala.collection.{GenSeq, mutable}
import scala.concurrent.Future
import scala.util.Success
import scala.util.control.NonFatal
/** Type class producing the client-facing representations ([[ValueRepr]]s) of a value of type `T`.
  * Extends Serializable — presumably so instances can cross process boundaries; confirm against
  * how instances are shipped elsewhere in the project.
  */
trait ReprsOf[T] extends Serializable {
  def apply(value: T): Array[ValueRepr]
}
/** Companion providing the built-in [[ReprsOf]] instances. Mixing in [[ExpandedScopeReprs]]
  * means the inherited implicits (collections, macro fallback) have lower implicit priority
  * than the members defined directly on this object.
  */
object ReprsOf extends ExpandedScopeReprs {

  // If a data value is larger than 1 MiB, we'll make it lazy so it doesn't get spammed to the client
  // In the future we could make this configurable by the client.
  private val EagerSizeThreshold = 1024 * 1024

  /** Builds a [[ReprsOf]] instance from a plain function. */
  def instance[T](reprs: T => Array[ValueRepr]): ReprsOf[T] = new ReprsOf[T] {
    def apply(value: T): Array[ValueRepr] = reprs(value)
  }

  /** A [[ReprsOf]] that additionally knows how to encode `T` as binary data described by `dataType`. */
  abstract class DataReprsOf[T](val dataType: DataType) extends ReprsOf[T] {
    val encode: T => ByteBuffer
  }

  /** Eagerly encodes the value into a single [[DataRepr]]. Encoding is best-effort: a value
    * that fails to encode yields no representations rather than failing the caller.
    */
  class StrictDataReprsOf[T](dataType: DataType, val encode: T => ByteBuffer) extends DataReprsOf[T](dataType) {
    def apply(t: T): Array[ValueRepr] = try {
      Array(DataRepr(dataType, encode(t)))
    } catch {
      // FIX: was `case err: Throwable`, which silently swallowed fatal VM errors
      // (OutOfMemoryError, InterruptedException, ...). Catch only NonFatal so those
      // still propagate; ordinary encoding failures remain best-effort.
      case NonFatal(_) => Array()
    }
  }

  object DataReprsOf {

    def apply[T](dataType: DataType)(encode: T => ByteBuffer): DataReprsOf[T] = new StrictDataReprsOf(dataType, encode)

    // Primitive encodings: all multi-byte values are packed big-endian (see intToBuf/longToBuf).
    implicit val byte: DataReprsOf[Byte] = DataReprsOf(ByteType)(byte => ByteBuffer.wrap(Array(byte)))
    implicit val boolean: DataReprsOf[Boolean] = DataReprsOf(BoolType)(bool => ByteBuffer.wrap(Array(if (bool) 1.toByte else 0.toByte)))
    implicit val short: DataReprsOf[Short] = DataReprsOf(ShortType)(short => ByteBuffer.wrap(Array((0xFF & (short >> 8)).toByte, (0xFF & short).toByte)))
    implicit val int: DataReprsOf[Int] = DataReprsOf(IntType)(intToBuf)
    implicit val long: DataReprsOf[Long] = DataReprsOf(LongType)(longToBuf)
    implicit val float: DataReprsOf[Float] = DataReprsOf(FloatType)(f => intToBuf(java.lang.Float.floatToIntBits(f)))
    implicit val double: DataReprsOf[Double] = DataReprsOf(DoubleType)(d => longToBuf(java.lang.Double.doubleToLongBits(d)))

    // Strings are encoded as a 4-byte length prefix followed by UTF-8 bytes.
    implicit val string: DataReprsOf[String] = DataReprsOf(StringType) {
      str =>
        val bytes = str.getBytes(StandardCharsets.UTF_8)
        val buf = ByteBuffer.allocate(bytes.length + 4)
        buf.putInt(bytes.length)
        buf.put(bytes)
        buf.rewind()
        buf
    }

    implicit val byteArray: DataReprsOf[Array[Byte]] = DataReprsOf(BinaryType)(ByteBuffer.wrap)

    // Any T with a DataEncoder gets a display-string repr plus a data repr. Data whose size
    // exceeds EagerSizeThreshold — or whose size can't be computed — is sent lazily so large
    // values aren't pushed to the client unprompted.
    implicit def fromDataEncoder[T](implicit dataEncoder: DataEncoder[T]): DataReprsOf[T] = new DataReprsOf[T](dataEncoder.dataType) {
      val encode: T => ByteBuffer = t => DataEncoder.writeSized(t)
      override def apply(value: T): Array[ValueRepr] = {
        val stringRepr = StringRepr(dataEncoder.encodeDisplayString(value))
        dataEncoder.sizeOf(value) match {
          case s if s >= 0 && s <= EagerSizeThreshold => Array(stringRepr, DataRepr(dataType, DataEncoder.writeSized(value, s)))
          case s if s >= 0 => Array(stringRepr, LazyDataRepr(dataType, DataEncoder.writeSized(value, s), Some(s))) // writeSized is suspended byname
          case _ => Array(stringRepr, LazyDataRepr(dataType, DataEncoder.writeSized(value), None)) // writeSized is suspended byname
        }
      }
    }
  }

  // Macro fallback that resolves a DataReprsOf[T] from the expanded implicit scope.
  // NOTE(review): self-typed to DataReprsOf.type, but in this source the DataReprsOf object
  // above is not declared to extend this trait — confirm against the full file.
  private[runtime] trait ExpandedScopeDataReprs { self: DataReprsOf.type =>
    implicit def expanded[T]: DataReprsOf[T] = macro macros.ExpandedScopeMacros.resolveFromScope
  }

  // Manual big-endian byte packing for Int and Long.
  @inline private def intToBuf(int: Int) =
    ByteBuffer.wrap(Array((0xFF & (int >> 24)).toByte, (0xFF & (int >> 16)).toByte, (0xFF & (int >> 8)).toByte, (0xFF & int).toByte))

  @inline private def longToBuf(long: Long) =
    ByteBuffer.wrap(
      Array(
        (0xFF & (long >> 56)).toByte, (0xFF & (long >> 48)).toByte, (0xFF & (long >> 40)).toByte, (0xFF & (long >> 32)).toByte,
        (0xFF & (long >> 24)).toByte, (0xFF & (long >> 16)).toByte, (0xFF & (long >> 8)).toByte, (0xFF & long).toByte))

  /** A [[ReprsOf]] that yields no representations at all. */
  val empty: ReprsOf[Any] = instance(_ => Array.empty)

  // HTML summary of the polynote server's version info.
  // NOTE(review): `</br>` is not valid HTML (`<br/>` intended?); left byte-identical since
  // browsers tolerate it and the client may match on the exact markup.
  implicit val polynoteRuntime: ReprsOf[Runtime.type] = {
    instance {
      r =>
        val html =
          s"""<div class="object-display server-info">
             | <span class="field-name">Server Version</span><span class="string">${r.version}</span></br>
             | <span class="field-name">Server Commit</span><span class="string">${r.commit}</span></br>
             |</div>
             |""".stripMargin
        Array(MIMERepr("text/html", html))
    }
  }
}
// Macro fallback that searches the expanded implicit scope for a ReprsOf[T]. Mixed into the
// ReprsOf companion, so (per Scala implicit resolution) it has lower priority than implicits
// defined directly on the object; extends CollectionReprs so the collection instances also
// participate in resolution.
private[runtime] trait ExpandedScopeReprs extends CollectionReprs { self: ReprsOf.type =>
  implicit def expanded[T]: ReprsOf[T] = macro macros.ExpandedScopeMacros.resolveFromScope
}
/** Derives [[ReprsOf]] instances for collection-like types (Seq, Array, Future) by streaming
  * their contents to the client, and contains the machinery for server-side table operations
  * (group/aggregate, select, sample, histogram) over streamed struct data.
  */
private[runtime] trait CollectionReprs extends FromDataReprs { self: ReprsOf.type =>

  // A Seq of structs streams each element through its StructDataEncoder.
  implicit def structSeq[F[X] <: Seq[X], A](implicit structEncoder: DataEncoder.StructDataEncoder[A]): ReprsOf[F[A]] =
    instance(seq => Array(StreamingDataRepr.fromHandle(new StructSeqStreamHandle[A, A](_, seq, identity, structEncoder))))

  // A Seq of numeric scalars streams as (index, value) pairs (zipWithIndex swapped).
  implicit def numericSeq[F[X] <: Seq[X], A : Numeric](implicit dataEncoder: DataEncoder[A]): ReprsOf[F[A]] = instance {
    seq => Array(StreamingDataRepr.fromHandle(new StructSeqStreamHandle[A, (Int, A)](_, seq, _.zipWithIndex.map(_.swap), DataEncoder.StructDataEncoder.forScalar(dataEncoder))))
  }

  // Fallback for any GenSeq whose element type has a DataReprsOf: stream the raw encoded elements.
  implicit def seq[F[X] <: GenSeq[X], A](implicit dataReprsOfA: DataReprsOf[A]): ReprsOf[F[A]] = instance {
    seq => Array(StreamingDataRepr(dataReprsOfA.dataType, seq.size, seq.iterator.map(dataReprsOfA.encode)))
  }

  // Same (index, value) treatment for arrays of numeric scalars.
  implicit def numericArray[A : Numeric](implicit dataEncoder: DataEncoder[A]): ReprsOf[Array[A]] = instance {
    seq => Array(StreamingDataRepr.fromHandle(new StructSeqStreamHandle[A, (Int, A)](_, seq, _.zipWithIndex.map(_.swap), DataEncoder.StructDataEncoder.forScalar(dataEncoder))))
  }

  // Fallback for arrays of any element type with a DataReprsOf.
  implicit def array[A](implicit dataReprsOfA: DataReprsOf[A]): ReprsOf[Array[A]] = instance {
    arr => Array(StreamingDataRepr(dataReprsOfA.dataType, arr.length, arr.iterator.map(dataReprsOfA.encode)))
  }

  // A Future becomes an UpdatingDataRepr that is updated once (and only if) the future succeeds.
  // NOTE(review): runs on the global ExecutionContext, and a failed Future leaves the repr
  // permanently un-updated — failures are silently dropped.
  implicit def future[A](implicit dataReprsOfA: DataReprsOf[A]): ReprsOf[Future[A]] = instance {
    fut =>
      val repr = UpdatingDataRepr(dataReprsOfA.dataType)
      fut.onComplete {
        case Success(a) => repr.tryUpdate(dataReprsOfA.encode(a))
        case _ =>
      }(scala.concurrent.ExecutionContext.global)
      Array(repr)
  }

  /** An un-allocated stream handle: source data, a pending transformation, and the encoder for
    * the transformed rows. Applying to a handle id materializes a [[StructSeqStreamHandle]];
    * table ops are folded into `transform`/`enc` lazily via `applyOp`, so nothing is computed
    * until the resulting handle is iterated.
    */
  private case class PendingHandle[A, B](data: Seq[A], transform: Seq[A] => Seq[B], enc: DataEncoder.StructDataEncoder[B])
    extends (Int => StructSeqStreamHandle[A, B]) {

    def apply(handle: Int): StructSeqStreamHandle[A, B] = StructSeqStreamHandle(handle, data, transform, enc)

    // One streaming aggregation over rows of type B producing a summary of type T.
    private trait Aggregator[T] {
      def accumulate(value: B): Unit // fold one row into the running state
      def summarize(): T             // finish and return the aggregate
      def encoder: DataEncoder[T]    // how to encode the aggregate for the client
      def resultName: String         // column name of the aggregate in the result struct
    }

    // Buffers all values, sorts the occupied prefix, and reads quartiles off by rank; the mean
    // is maintained incrementally while accumulating. The backing array is sized by the whole
    // source (`data.size`), not the group, so it is never overrun (though it may over-allocate).
    // NOTE(review): summarize() assumes at least one value was accumulated — values(0) and
    // values(index - 1) would throw for an empty group. groupBy yields non-empty groups, so
    // this appears safe in the GroupAgg path; confirm no other caller.
    private class QuartileAggregator(name: String, getter: B => Double) extends Aggregator[Quartiles] {
      private val values = new Array[Double](data.size)
      private var index = 0
      private var mean = 0.0
      override def accumulate(value: B): Unit = {
        val x = getter(value)
        values(index) = x
        index += 1
        // incremental mean update: mean_n = mean_{n-1} + (x - mean_{n-1}) / n
        val delta = x - mean
        mean += delta / index
      }
      override def summarize(): Quartiles = {
        // Sort only the filled prefix [0, index).
        java.util.Arrays.sort(values, 0, index)
        val quarter = index >> 2
        Quartiles(
          values(0),
          values(quarter),
          values(index >> 1),
          mean,
          values(index - quarter),
          values(index - 1)
        )
      }
      val encoder: DataEncoder[Quartiles] = Quartiles.dataEncoder
      val resultName: String = s"quartiles($name)"
    }

    // Plain running sum of a numeric column.
    private class SumAggregator(name: String, getter: B => Double) extends Aggregator[Double] {
      private var sum = 0.0
      override def accumulate(value: B): Unit = sum += getter(value)
      override def summarize(): Double = sum
      val encoder: DataEncoder[Double] = DataEncoder.double
      val resultName: String = s"count($name)"
    }

    // Row count (the column argument only contributes to the result name).
    private class CountAggregator(name: String) extends Aggregator[Long] {
      private var count = 0L
      override def accumulate(value: B): Unit = count += 1
      override def summarize(): Long = count
      val encoder: DataEncoder[Long] = DataEncoder.long
      val resultName: String = s"count($name)"
    }

    // Distinct count of whole rows via a HashSet.
    // NOTE(review): the `approx` flag changes only the result name — the implementation is
    // always exact (no sketch/HLL), so approx_count_distinct costs the same as count_distinct.
    // Also note distinctness is over the full row B, not just the named column.
    private class CountDistinctAggregator(name: String, approx: Boolean) extends Aggregator[Long] {
      private val seenValues = new mutable.HashSet[B]()
      override def accumulate(value: B): Unit = seenValues += value
      override def summarize(): Long = seenValues.size
      val encoder: DataEncoder[Long] = DataEncoder.long
      val resultName: String = if (approx) s"approx_count_distinct($name)" else s"count_distinct($name)"
    }

    // Incremental mean (same update rule as QuartileAggregator).
    // NOTE(review): `sumSquaredDiffs` is declared but never updated or read — dead state,
    // presumably a leftover from an intended variance computation.
    private class MeanAggregator(name: String, getter: B => Double) extends Aggregator[Double] {
      private var count = 0
      private var mean = 0.0
      private var sumSquaredDiffs = 0.0
      override def accumulate(value: B): Unit = {
        val x = getter(value)
        count += 1
        val delta = x - mean
        mean += delta / count
      }
      override def summarize(): Double = mean
      val encoder: DataEncoder[Double] = DataEncoder.double
      val resultName: String = s"mean($name)"
    }

    // Resolves (column, aggregation-name) into a fresh Aggregator. Throws
    // IllegalArgumentException for a missing column, a non-numeric column used with a numeric
    // aggregation, or an unknown aggregation name.
    private def aggregate(col: String, aggName: String): Aggregator[_] = {
      // Getter for `col` as Double via the column's Numeric. Declared as a `def` so it is only
      // evaluated (and can only fail) for aggregations that actually need a numeric input —
      // count/count_distinct never touch it.
      def numericEncoder = enc.field(col) match {
        case Some((getter, colEnc)) if colEnc.numeric.nonEmpty =>
          val numeric = (colEnc.numeric.get.toDouble(_)).asInstanceOf[Any => Double]
          getter andThen numeric
        case Some(_) => throw new IllegalArgumentException(s"Field $col is not numeric; cannot compute $aggName")
        case None => throw new IllegalArgumentException(s"No field $col in struct")
      }
      aggName match {
        case "quartiles" => new QuartileAggregator(col, numericEncoder)
        case "sum" => new SumAggregator(col, numericEncoder)
        case "count" => new CountAggregator(col)
        case "count_distinct" => new CountDistinctAggregator(col, false)
        case "approx_count_distinct" => new CountDistinctAggregator(col, true)
        case "mean" => new MeanAggregator(col, numericEncoder)
        case _ => throw new IllegalArgumentException(s"No aggregation $aggName available")
      }
    }

    // TODO: this needs a refactor
    // Folds one table op into this handle by composing onto `transform` and/or replacing `enc`.
    // NOTE(review): this match is not exhaustive — e.g. GroupAgg with empty `cols` falls through
    // to a MatchError at runtime. Confirm whether that input is excluded upstream.
    private def applyOp(op: TableOp): PendingHandle[A, _] = op match {
      case GroupAgg(cols, aggs) if cols.nonEmpty =>
        // (getter, encoder) for each grouping column; missing columns fail fast.
        val groupingFields = cols.map {
          col => enc.field(col).getOrElse(throw new IllegalArgumentException(s"No field $col in struct"))
        }
        val getters = groupingFields.map(_._1)
        // Probe each aggregation once up-front to learn its result name/type and encoder
        // (fresh aggregators are created per group below).
        val (aggregateResultTypes, aggregateEncoders) = aggs.map {
          case (col, aggName) =>
            val a = aggregate(col, aggName)
            (a.resultName -> a.encoder.dataType, a.encoder.asInstanceOf[DataEncoder[Any]])
        }.unzip
        // Group rows by the tuple of grouping-column values, run every aggregator over each
        // group, and emit one row per group: grouping values followed by aggregate values.
        val groupTransform = (bs: Seq[B]) => bs.groupBy(b => getters.map(_.apply(b))).toSeq.map {
          case (groupCols, group) =>
            val aggregators = aggs.map {
              case (col, aggName) => aggregate(col, aggName)
            }
            group.foreach {
              b => aggregators.foreach {
                agg => agg.accumulate(b)
              }
            }
            val aggregates = aggregators.map(_.summarize())
            (groupCols ::: aggregates).toArray
        }
        // Result schema: grouping columns (original types) then aggregate result columns.
        val groupedType = StructType(
          (cols.zip(groupingFields.map(_._2.dataType)) ++ aggregateResultTypes)
            .map((StructField.apply _).tupled))
        val groupedEncoders = groupingFields.map(_._2.asInstanceOf[DataEncoder[Any]]) ++ aggregateEncoders
        // Encoder for the Array[Any] result rows: position i is encoded with groupedEncoders(i).
        val groupedEncoder = new runtime.DataEncoder.StructDataEncoder[Array[Any]](groupedType) {
          def field(name: String): Option[(Array[Any] => Any, DataEncoder[_])] = {
            groupedType.fields.indexWhere(_.name == name) match {
              case -1 => None
              case index => Some((arr => arr(index), groupedEncoders(index)))
            }
          }
          def encode(dataOutput: DataOutput, value: Array[Any]): Unit = {
            val encs = groupedEncoders.iterator
            var i = 0
            while (i < value.length) {
              encs.next().encode(dataOutput, value(i))
              i += 1
            }
          }
          def sizeOf(t: Array[Any]): Int = {
            // combineSize propagates unknown sizes; seeded with the first column's size.
            val encs = groupedEncoders.iterator
            var size = encs.next().sizeOf(t(0))
            var i = 1
            while (i < t.length) {
              size = DataEncoder.combineSize(size, encs.next().sizeOf(t(i)))
              i += 1
            }
            size
          }
        }
        copy[A, Array[Any]](transform = transform andThen groupTransform, enc = groupedEncoder)
      case QuantileBin(col, binCount, err) =>
        ??? // TODO
      case Select(cols) =>
        val prevFields = enc.dataType.fields.map(field => field.name -> field).toMap
        // Per selected column: an encode function and a size function, bound to that column's
        // extractor and encoder.
        val (fieldEncoders, fieldSizes) = cols
          .map(col => enc.field(col).getOrElse(throw new UnsupportedOperationException(s"$col cannot be selected (its parent encoder cannot encode it separately)")))
          .map {
            case (extractor, encoder) =>
              val castEncoder = encoder.asInstanceOf[DataEncoder[Any]]
              val encodeFn = (output: DataOutput, value: B) => castEncoder.encode(output, extractor(value))
              val sizeFn = (value: B) => castEncoder.sizeOf(extractor(value))
              (encodeFn, sizeFn)
          }.unzip
        val dataType = StructType(cols.map(prevFields))
        val encoder = new DataEncoder.StructDataEncoder[B](
          dataType
        ) {
          // NOTE(review): delegates to the pre-selection encoder, so `field` can still resolve
          // columns that were not selected — confirm whether that is intended.
          override def field(name: String): Option[(B => Any, DataEncoder[_])] = enc.field(name)
          override def encode(dataOutput: DataOutput, value: B): Unit =
            fieldEncoders.foreach(_.apply(dataOutput, value))
          override def sizeOf(t: B): Int =
            fieldSizes.map(_.apply(t)).sum
        }
        copy(enc = encoder)
      case Sample(sampleRate) =>
        // Bernoulli sampling: each row kept independently with probability sampleRate.
        val sampled = transform andThen {
          seq => seq.filter(_ => scala.util.Random.nextDouble() <= sampleRate)
        }
        copy(transform = sampled)
      case SampleN(n) =>
        // NOTE(review): probabilistic — yields *approximately* n rows in expectation (rate is
        // computed against the untransformed source size), not exactly n.
        val sampleRate = n.toDouble / data.size
        val sampled = transform andThen {
          seq => seq.filter(_ => scala.util.Random.nextDouble() <= sampleRate)
        }
        copy(transform = sampled)
      case Histogram(field, binCount) =>
        val (getField, fieldEncoder) = enc.field(field).getOrElse(throw new IllegalArgumentException(s"Field $field does not exist in the schema"))
        val fieldNumeric = fieldEncoder.numeric
          .getOrElse(throw new IllegalArgumentException(s"Field $field is not numeric"))
          .asInstanceOf[Numeric[Any]]
        // TODO: this is a pretty inefficient implementation
        // NOTE(review): a value equal to `max` maps to bin index binCount, but only indices
        // 0 until binCount are emitted below — the maximum value's count appears to be dropped.
        // Also min/max on an empty input would throw. Confirm and clamp/guard if so.
        val mkHistogram: Seq[B] => Seq[HistogramBin] = {
          bs =>
            val values = bs.map(b => fieldNumeric.toDouble(getField(b)))
            val min = values.min
            val max = values.max
            val binWidth = (max - min) / binCount;
            val boundaries = (0 until binCount).map(_ * binWidth + min) :+ max
            val binned = values.groupBy {
              value => math.floor((value - min) / binWidth).toInt // TODO: this isn't very accurate. Better to search boundaries instead?
            }.mapValues(_.size)
            boundaries.sliding(2, 1).zipWithIndex.toSeq.map {
              case (Seq(start, end), index) =>
                val count = binned.getOrElse(index, 0)
                HistogramBin(start, end, count)
            }
        }
        copy(transform = transform andThen mkHistogram, enc = HistogramBin.encoder)
    }

    /** Folds a list of table ops into this pending handle, returning a factory for the
      * resulting stream handle, or the first error raised while applying an op as Left.
      */
    def modify(ops: List[TableOp]): Either[Throwable, Int => StreamingDataRepr.Handle] = {
      try {
        Right(ops.foldLeft(this.asInstanceOf[PendingHandle[A, Any]]) {
          (accum, op) => accum.applyOp(op).asInstanceOf[PendingHandle[A, Any]]
        })
      } catch {
        case err: Throwable => Left(err)
      }
    }
  }

  /** Concrete stream handle over an in-memory Seq: rows are transformed at iteration time and
    * each one is encoded with a size prefix via DataEncoder.writeSized.
    */
  private[runtime] case class StructSeqStreamHandle[A, B](handle: Int, data: Seq[A], transform: Seq[A] => Seq[B], enc: DataEncoder.StructDataEncoder[B]) extends StreamingDataRepr.Handle {
    def dataType: DataType = enc.dataType
    // Known only when the underlying collection has a definite size; computed once (lazy val).
    lazy val knownSize: Option[Int] = if (data.hasDefiniteSize) Some(data.size) else None
    def iterator: Iterator[ByteBuffer] = transform(data).iterator.map(b => DataEncoder.writeSized[B](b)(enc))
    // Table ops are applied by wrapping this handle's state in a PendingHandle.
    def modify(ops: List[TableOp]): Either[Throwable, Int => StreamingDataRepr.Handle] =
      PendingHandle(data, transform, enc).modify(ops)
  }
}
/** Lowest-priority implicit tier: any `T` with a [[DataReprsOf]] instance is already a
  * [[ReprsOf]] (DataReprsOf extends ReprsOf), so the instance is returned unchanged.
  */
private[runtime] trait FromDataReprs { self: ReprsOf.type =>
  implicit def fromDataReprs[T](implicit dataReprsOfT: DataReprsOf[T]): ReprsOf[T] = dataReprsOfT
}