Permalink
Browse files

Default to no string structure in SSTs when compressing.

  • Loading branch information...
wemrysi committed Sep 11, 2018
1 parent ebee52c commit a971bbcef9277ab573c37e97ed2e99f42b6cd57d
@@ -16,6 +16,7 @@
package quasar.impl.schema
import slamdata.Predef.Boolean
import quasar.api.SchemaConfig
import quasar.fp.numeric.{Natural, Positive}
@@ -24,35 +25,39 @@ import eu.timepit.refined.auto._
/** Configuration for SST-based schema, allowing for control over various aspects
* of compression.
*
* @param arrayMaxLength arrays larger than this will be compressed
* @param mapMaxSize maps larger than this will be compressed
* @param retainKeysSize the number of map keys to retain, per type, during compression
* @param stringMaxLength all strings longer than this are compressed to char[]
* @param unionMaxSize unions larger than this will be compressed
* @param arrayMaxLength arrays larger than this will be compressed
* @param mapMaxSize maps larger than this will be compressed
* @param retainKeysSize the number of map keys to retain, per type, during compression
* @param stringMaxLength all strings longer than this are compressed
* @param stringPreserveStructure whether to preserve structure when compressing strings
* @param unionMaxSize unions larger than this will be compressed
*/
final case class SstConfig[J, A](
arrayMaxLength: Natural,
mapMaxSize: Natural,
retainKeysSize: Natural,
arrayMaxLength: Natural,
mapMaxSize: Natural,
retainKeysSize: Natural,
stringMaxLength: Natural,
unionMaxSize: Positive)
stringPreserveStructure: Boolean,
unionMaxSize: Positive)
extends SchemaConfig {
type Schema = SstSchema[J, A]
}
object SstConfig {
val DefaultArrayMaxLength: Natural = 10L
val DefaultMapMaxSize: Natural = 32L
val DefaultRetainKeysSize: Natural = 0L
val DefaultStringMaxLength: Natural = 0L
val DefaultUnionMaxSize: Positive = 1L
val DefaultArrayMaxLength: Natural = 10L
val DefaultMapMaxSize: Natural = 32L
val DefaultRetainKeysSize: Natural = 0L
val DefaultStringMaxLength: Natural = 0L
val DefaultStringPreserveStructure: Boolean = false
val DefaultUnionMaxSize: Positive = 1L
def Default[J, A]: SstConfig[J, A] =
SstConfig[J, A](
arrayMaxLength = DefaultArrayMaxLength,
mapMaxSize = DefaultMapMaxSize,
retainKeysSize = DefaultRetainKeysSize,
arrayMaxLength = DefaultArrayMaxLength,
mapMaxSize = DefaultMapMaxSize,
retainKeysSize = DefaultRetainKeysSize,
stringMaxLength = DefaultStringMaxLength,
unionMaxSize = DefaultUnionMaxSize)
stringPreserveStructure = DefaultStringPreserveStructure,
unionMaxSize = DefaultUnionMaxSize)
}
@@ -68,7 +68,7 @@ package object schema {
val thresholding: ElgotCoalgebra[SST[J, A] \/ ?, SSTF[J, A, ?], SST[J, A]] = {
val independent =
orOriginal(applyTransforms(
compression.limitStrings[J, A](config.stringMaxLength)))
compression.limitStrings[J, A](config.stringMaxLength, config.stringPreserveStructure)))
compression.limitArrays[J, A](config.arrayMaxLength)
.andThen(_.bimap(_.transAna[SST[J, A]](independent), independent))
@@ -78,8 +78,8 @@ package object schema {
applyTransforms(
compression.coalesceWithUnknown[J, A](config.retainKeysSize),
compression.coalesceKeys[J, A](config.mapMaxSize, config.retainKeysSize),
compression.coalescePrimary[J, A],
compression.narrowUnion[J, A](config.unionMaxSize))
compression.coalescePrimary[J, A](config.stringPreserveStructure),
compression.narrowUnion[J, A](config.unionMaxSize, config.stringPreserveStructure))
@SuppressWarnings(Array("org.wartremover.warts.Var"))
def iterate(sst: SST[J, A]): Option[SST[J, A]] = {
@@ -45,7 +45,7 @@ final class ExtractSstSpec extends quasar.Qspec {
Show.showFromToString
val J = Fixed[J]
val config = SstConfig[J, Real](1000L, 1000L, 1000L, 1000L, 1000L)
val config = SstConfig[J, Real](1000L, 1000L, 1000L, 1000L, true, 1000L)
def verify(cfg: SstConfig[J, Real], input: List[Data], expected: S) =
Stream.emits(input)
@@ -115,6 +115,27 @@ final class ExtractSstSpec extends quasar.Qspec {
verify(config.copy(stringMaxLength = 5L), input, expected)
}
"compress long strings without structure" >> {
val input = List(
_obj(ListMap("foo" -> _str("abcdef"))),
_obj(ListMap("bar" -> _str("abcde")))
)
val expected = envT(
TypeStat.coll(Real(2), Real(1).some, Real(1).some),
TypeST(TypeF.map[J, S](IMap(
J.str("foo") -> envT(
TypeStat.coll(Real(1), Real(6).some, Real(6).some),
TypeST(TypeF.simple[J, S](SimpleType.Str))).embed,
J.str("bar") -> envT(
TypeStat.coll(Real(1), Real(5).some, Real(5).some),
TypeST(TypeF.const[J, S](J.str("abcde")))).embed
), None))).embed
verify(config.copy(stringMaxLength = 5L, stringPreserveStructure = false), input, expected)
}
"coalesce map keys until <= max size" >> {
val input = List(
_obj(ListMap("foo" -> _int(1))),
@@ -70,6 +70,7 @@ object compression {
* also appears in the union.
*/
def coalescePrimary[J: Order, A: Order: Field: ConvertableTo](
stringPreserveStructure: Boolean)(
implicit
JC: Corecursive.Aux[J, EJson],
JR: Recursive.Aux[J, EJson])
@@ -82,7 +83,7 @@ object compression {
grouped.minView flatMap { case (nonPrimary, m0) =>
val coalesced = nonPrimary.foldLeft(m0 map (_.suml1)) { (m, sst) =>
sst.project.primaryTag flatMap { pt =>
m.member(some(pt)) option m.adjust(some(pt), _ |+| widenConst(sst))
m.member(some(pt)) option m.adjust(some(pt), _ |+| widenConst(stringPreserveStructure)(sst))
} getOrElse m.updateAppend(none, sst)
}
coalesced.suml1Opt map (csst => envT(ts, csst.project.lower))
@@ -151,22 +152,29 @@ object compression {
other.right
}
/** Replace literal string types longer than the given limit with `char[]`. */
def limitStrings[J, A: ConvertableTo: Field: Order](maxLength: Natural)(
/** Replace literal string types longer than the given limit with `char[]`
* when `preserveStructure` is `true`, or with `SimpleType.Str` otherwise. */
def limitStrings[J, A: ConvertableTo: Field: Order](
maxLength: Natural,
preserveStructure: Boolean)(
implicit
JC: Corecursive.Aux[J, EJson],
JR: Recursive.Aux[J, EJson])
: SSTF[J, A, SST[J, A]] => Option[SSTF[J, A, SST[J, A]]] =
_.some collect {
case EnvT((ts, TypeST(TypeF.Const(Embed(C(Str(s))))))) if s.length > maxLength.value =>
strings.compress[SST[J, A], J, A](ts, s)
if (preserveStructure)
strings.compress[SST[J, A], J, A](ts, s)
else
strings.simple[SST[J, A], J, A](ts)
}
/** Compress a union larger than `maxSize` by reducing the largest group of
* values sharing a primary type to their shared type.
*/
def narrowUnion[J: Order, A: Order: Field: ConvertableTo](
maxSize: Positive)(
maxSize: Positive,
stringPreserveStructure: Boolean)(
implicit
JC: Corecursive.Aux[J, EJson],
JR: Recursive.Aux[J, EJson])
@@ -175,7 +183,9 @@ object compression {
val grouped = xs.list groupBy (_.project.primaryTag)
val compressed = (grouped - none).toList.maximumBy(_._2.length) map {
case (pt, ssts) => grouped.insert(pt, ssts.foldMap1(widenConst[J, A]).wrapNel)
case (pt, ssts) =>
val compress1 = ssts.foldMap1(widenConst[J, A](stringPreserveStructure)).wrapNel
grouped.insert(pt, compress1)
}
compressed flatMap (_.foldMap(_.list).toNel) collect {
@@ -190,14 +200,19 @@ object compression {
}
/** Returns the SST of the primary tag of the given EJson value. */
def primarySst[J: Order, A: ConvertableTo: Field: Order](cnt: A, j: J)(
def primarySst[J: Order, A: ConvertableTo: Field: Order](
stringPreserveStructure: Boolean)(
cnt: A, j: J)(
implicit
JC: Corecursive.Aux[J, EJson],
JR: Recursive.Aux[J, EJson])
: SST[J, A] = j match {
case Embed(C(Str(s))) =>
case Embed(C(Str(s))) if stringPreserveStructure =>
strings.widen[J, A](cnt, s).embed
case Embed(C(Str(_))) =>
strings.simple[SST[J, A], J, A](TypeStat.fromEJson(cnt, j)).embed
case SimpleEJson(s) =>
envT(TypeStat.fromEJson(cnt, j), TypeST(TypeF.simple[J, SST[J, A]](s))).embed
@@ -208,13 +223,15 @@ object compression {
* the argument itself.
*/
def widenConst[J: Order, A: ConvertableTo: Field: Order](
stringPreserveStructure: Boolean)(
sst: SST[J, A])(
implicit
JC: Corecursive.Aux[J, EJson],
JR: Recursive.Aux[J, EJson])
: SST[J, A] = {
def psst(ts: TypeStat[A], j: J): SST[J, A] =
StructuralType.measure[J, TypeStat[A]].set(ts)(primarySst(ts.size, j))
StructuralType.measure[J, TypeStat[A]]
.set(ts)(primarySst(stringPreserveStructure)(ts.size, j))
sst.project match {
case ConstST(None, ts, j) =>
@@ -37,12 +37,12 @@ object strings {
/** Compresses a string into a generic char[]. */
def compress[T, J, A: ConvertableTo: Order](strStat: TypeStat[A], s: String)(
implicit
A: Field[A],
C: Corecursive.Aux[T, SSTF[J, A, ?]],
JC: Corecursive.Aux[J, EJson],
JR: Recursive.Aux[J, EJson]
): SSTF[J, A, T] = {
implicit
A: Field[A],
C: Corecursive.Aux[T, SSTF[J, A, ?]],
JC: Corecursive.Aux[J, EJson],
JR: Recursive.Aux[J, EJson])
: SSTF[J, A, T] = {
// NB: Imported here so as not to pollute outer scope given Iterable's
// pervasiveness.
import scalaz.std.iterable._
@@ -58,15 +58,19 @@ object strings {
stringTagged(strStat, C.embed(envT(strStat, TypeST(TypeF.arr(charArr)))))
}
/** Represents a string as the simple `Str` type, annotated with `strStat`,
* retaining no information about its characters.
*/
def simple[T, J, A](strStat: TypeStat[A]): SSTF[J, A, T] =
envT(strStat, TypeST[J, T](TypeF.Simple(SimpleType.Str)))
/** Widens a string into an array of its characters.
*
* FIXME: Overly specific, define in terms of [Co]Recursive.
*/
def widen[J: Order, A: ConvertableTo: Field: Order](count: A, s: String)(
implicit
JC: Corecursive.Aux[J, EJson],
JR: Recursive.Aux[J, EJson]
): SSTF[J, A, SST[J, A]] = {
implicit
JC: Corecursive.Aux[J, EJson],
JR: Recursive.Aux[J, EJson])
: SSTF[J, A, SST[J, A]] = {
val charArr =
SST.fromEJson(count, EJson.arr(s.map(EJson.char[J](_)) : _*))
@@ -153,21 +153,21 @@ final class CompressionSpec extends quasar.Qspec
"coalescePrimary" >> {
"combines consts with their primary SST in unions" >> prop { (sj: LeafEjs, sjs: ISet[LeafEjs]) =>
val pt = primaryTagOf(sj.ejs)
val primarySst = compression.primarySst(Real(1), sj.ejs)
val primarySst = compression.primarySst(true)(Real(1), sj.ejs)
val leafs = sjs.insert(sj).toIList
val ssts = leafs.map(_.toSst)
val (matching, nonmatching) =
leafs.partition(l => primaryTagOf(l.ejs) ≟ pt)
val simplified = matching.map(l => compression.primarySst(Real(1), l.ejs))
val simplified = matching.map(l => compression.primarySst(true)(Real(1), l.ejs))
val coalesced = NonEmptyList.nel(primarySst, simplified).suml1
val compressed =
attemptCompression(
NonEmptyList.nel(primarySst, ssts).suml1,
compression.coalescePrimary)
compression.coalescePrimary(true))
compressed must_= NonEmptyList.nel(coalesced, nonmatching map (_.toSst)).suml1
}
@@ -182,12 +182,12 @@ final class CompressionSpec extends quasar.Qspec
val union = envT(cnt, TypeST(TypeF.union[J, S](y, ys.head, ys.tail))).embed
val sum = (y <:: ys).suml1
attemptCompression(union, compression.coalescePrimary) must_= sum
attemptCompression(union, compression.coalescePrimary(true)) must_= sum
}
"no effect when a const's primary tag not in the union" >> prop { ljs: NonEmptyList[LeafEjs] =>
val sum = ljs.foldMap1(_.toSst)
attemptCompression(sum, compression.coalescePrimary) must_= sum
attemptCompression(sum, compression.coalescePrimary(true)) must_= sum
}
}
@@ -352,11 +352,22 @@ final class CompressionSpec extends quasar.Qspec
val str = SST.fromEJson(Real(1), J.str(s))
val arr = strings.compress[S, J, Real](str.copoint, s).embed
val req = attemptCompression(str, compression.limitStrings(plen))
val rlt = attemptCompression(str, compression.limitStrings(lt))
val req = attemptCompression(str, compression.limitStrings(plen, true))
val rlt = attemptCompression(str, compression.limitStrings(lt, true))
(req must_= str) and (rlt must_= arr)
}}
"compresses to simple type when preserve structure is 'false'" >> prop { s: String => (s.length > 1) ==> {
val lt: Natural = Natural((s.length - 1).toLong) getOrElse 0L
val str = SST.fromEJson(Real(1), J.str(s))
val smp = strings.simple[S, J, Real](str.copoint).embed
val rlt = attemptCompression(str, compression.limitStrings(lt, false))
rlt must_= smp
}}
}
"narrowUnion" >> {
@@ -376,7 +387,7 @@ final class CompressionSpec extends quasar.Qspec
val union0 = NonEmptyList.nel(dec, strs ::: chars).suml1
val union1 = envT(union0.copoint, TypeST(TypeF.union[J, S](compChar, dec, strs))).embed
attemptCompression(union0, compression.narrowUnion(3L)) must_= union1
attemptCompression(union0, compression.narrowUnion(3L, true)) must_= union1
}}
"no effect on unions smaller or equal to maxSize" >> prop {
@@ -385,7 +396,7 @@ final class CompressionSpec extends quasar.Qspec
val union = envT(cnt1, TypeST(TypeF.union[J, S](x.toSst, y.toSst, xs map (_.toSst)))).embed
Positive((xs.length + 2).toLong).cata(
l => attemptCompression(union, compression.narrowUnion(l)),
l => attemptCompression(union, compression.narrowUnion(l, true)),
union
) must_= union
}

0 comments on commit a971bbc

Please sign in to comment.