Skip to content

Commit

Permalink
Drop FastEval (#2622)
Browse files Browse the repository at this point in the history
  • Loading branch information
regadas committed Jan 23, 2020
1 parent 189a339 commit df4a0bf
Show file tree
Hide file tree
Showing 14 changed files with 41 additions and 54 deletions.
42 changes: 1 addition & 41 deletions scio-core/src/main/scala/com/spotify/scio/schemas/To.scala
Expand Up @@ -23,7 +23,6 @@ import org.apache.beam.sdk.schemas.{Schema => BSchema, SchemaCoder}
import scala.collection.JavaConverters._
import scala.language.experimental.macros
import scala.annotation.tailrec
import scala.tools.reflect.ToolBox

sealed trait To[I, O] extends (SCollection[I] => SCollection[O]) with Serializable {
def coder: Coder[O]
Expand Down Expand Up @@ -221,45 +220,6 @@ object To {
macro ToMacro.safeImpl[I, O]
}

import scala.reflect.macros._
import reflect.runtime.{universe => u}

final private[scio] class FastEval(evalToolBox: ToolBox[u.type]) {
def eval[T](ctx: blackbox.Context)(expr: ctx.Expr[T]): T = {
import ctx.universe._

val evalImporter =
u.internal
.createImporter(ctx.universe)
.asInstanceOf[u.Importer { val from: ctx.universe.type }]

expr.tree match {
case Literal(Constant(value)) =>
ctx.eval[T](expr)
case _ =>
val imported = evalImporter.importTree(expr.tree)
evalToolBox.eval(imported).asInstanceOf[T]
}
}
}

/**
* Provide faster evaluation of tree by reusing the same toolbox
* This is faster bc. resusing the same toolbox also meas reusing the same classloader,
* which saves disks IOs since we don't load the same classes from disk multiple times
*/
final private[scio] object FastEval {
import scala.tools.reflect._

private lazy val tl: ToolBox[u.type] = {
val evalMirror = scala.reflect.runtime.currentMirror
evalMirror.mkToolBox()
}

def apply[T](ctx: blackbox.Context)(expr: ctx.Expr[T]): T =
new FastEval(tl).eval(ctx)(expr)
}

object ToMacro {
import scala.reflect.macros._
def safeImpl[I: c.WeakTypeTag, O: c.WeakTypeTag](
Expand All @@ -273,7 +233,7 @@ object ToMacro {
val tpeO = weakTypeOf[O]

val expr = c.Expr[(Schema[I], Schema[O])](q"(${untyped(iSchema)}, ${untyped(oSchema)})")
val (sIn, sOut) = FastEval(c)(expr)
val (sIn, sOut) = c.eval(expr)

val schemaOut: BSchema = SchemaMaterializer.fieldType(sOut).getRowSchema()
val schemaIn: BSchema = SchemaMaterializer.fieldType(sIn).getRowSchema()
Expand Down
2 changes: 1 addition & 1 deletion scio-sql/src/main/scala/com/spotify/scio/sql/Query1.scala
Expand Up @@ -80,7 +80,7 @@ object Query1 {
assertConcrete[B](c)

val (sIn, sOut) =
FastEval(c)(c.Expr[(Schema[A], Schema[B])](q"(${untyped(iSchema)}, ${untyped(oSchema)})"))
c.eval(c.Expr[(Schema[A], Schema[B])](q"(${untyped(iSchema)}, ${untyped(oSchema)})"))

val sq = Query1[A, B](cons(c)(query), tupleTag(c)(aTag))
typecheck(sq)(sIn, sOut)
Expand Down
5 changes: 4 additions & 1 deletion scio-sql/src/main/scala/com/spotify/scio/sql/Query10.scala
Expand Up @@ -178,7 +178,7 @@ object Query10 {
schemas10,
schemas11
) =
FastEval(c)(
c.eval(
c.Expr[
(
Schema[A],
Expand Down Expand Up @@ -258,6 +258,7 @@ final class SqlSCollection10[
i: SCollection[I],
j: SCollection[J]
) {

def query(
q: String,
aTag: TupleTag[A],
Expand Down Expand Up @@ -303,6 +304,7 @@ final class SqlSCollection10[
s"${collA.tfName} join ${collB.tfName} join ${collC.tfName} join ${collD.tfName} join ${collE.tfName} join ${collF.tfName} join ${collG.tfName} join ${collH.tfName} join ${collI.tfName} join ${collJ.tfName}",
sqlTransform
)

}

def queryAs[R: Schema](
Expand Down Expand Up @@ -341,4 +343,5 @@ final class SqlSCollection10[
case e: ParseException =>
Query10.typecheck(q).fold(err => throw new RuntimeException(err, e), _ => throw e)
}

}
5 changes: 4 additions & 1 deletion scio-sql/src/main/scala/com/spotify/scio/sql/Query2.scala
Expand Up @@ -80,7 +80,7 @@ object Query2 {
assertConcrete[R](c)

val (schemas1, schemas2, schemas3) =
FastEval(c)(
c.eval(
c.Expr[(Schema[A], Schema[B], Schema[R])](
q"(${untyped(aSchema)}, ${untyped(bSchema)}, ${untyped(rSchema)})"
)
Expand All @@ -96,6 +96,7 @@ object Query2 {
}

final class SqlSCollection2[A: Schema, B: Schema](a: SCollection[A], b: SCollection[B]) {

def query(q: String, aTag: TupleTag[A], bTag: TupleTag[B], udfs: Udf*): SCollection[Row] =
query(Query2(q, aTag, bTag, udfs.toList))

Expand All @@ -109,6 +110,7 @@ final class SqlSCollection2[A: Schema, B: Schema](a: SCollection[A], b: SCollect
.of(q.aTag, collA.internal)
.and(q.bTag, collB.internal)
.apply(s"${collA.tfName} join ${collB.tfName}", sqlTransform)

}

def queryAs[R: Schema](
Expand All @@ -126,4 +128,5 @@ final class SqlSCollection2[A: Schema, B: Schema](a: SCollection[A], b: SCollect
case e: ParseException =>
Query2.typecheck(q).fold(err => throw new RuntimeException(err, e), _ => throw e)
}

}
5 changes: 4 additions & 1 deletion scio-sql/src/main/scala/com/spotify/scio/sql/Query3.scala
Expand Up @@ -90,7 +90,7 @@ object Query3 {
assertConcrete[R](c)

val (schemas1, schemas2, schemas3, schemas4) =
FastEval(c)(
c.eval(
c.Expr[(Schema[A], Schema[B], Schema[C], Schema[R])](
q"(${untyped(aSchema)}, ${untyped(bSchema)}, ${untyped(cSchema)}, ${untyped(rSchema)})"
)
Expand All @@ -114,6 +114,7 @@ final class SqlSCollection3[A: Schema, B: Schema, C: Schema](
b: SCollection[B],
c: SCollection[C]
) {

def query(
q: String,
aTag: TupleTag[A],
Expand All @@ -135,6 +136,7 @@ final class SqlSCollection3[A: Schema, B: Schema, C: Schema](
.and(q.bTag, collB.internal)
.and(q.cTag, collC.internal)
.apply(s"${collA.tfName} join ${collB.tfName} join ${collC.tfName}", sqlTransform)

}

def queryAs[R: Schema](
Expand All @@ -153,4 +155,5 @@ final class SqlSCollection3[A: Schema, B: Schema, C: Schema](
case e: ParseException =>
Query3.typecheck(q).fold(err => throw new RuntimeException(err, e), _ => throw e)
}

}
5 changes: 4 additions & 1 deletion scio-sql/src/main/scala/com/spotify/scio/sql/Query4.scala
Expand Up @@ -100,7 +100,7 @@ object Query4 {
assertConcrete[R](c)

val (schemas1, schemas2, schemas3, schemas4, schemas5) =
FastEval(c)(
c.eval(
c.Expr[(Schema[A], Schema[B], Schema[C], Schema[D], Schema[R])](
q"(${untyped(aSchema)}, ${untyped(bSchema)}, ${untyped(cSchema)}, ${untyped(dSchema)}, ${untyped(rSchema)})"
)
Expand Down Expand Up @@ -130,6 +130,7 @@ final class SqlSCollection4[A: Schema, B: Schema, C: Schema, D: Schema](
c: SCollection[C],
d: SCollection[D]
) {

def query(
q: String,
aTag: TupleTag[A],
Expand Down Expand Up @@ -157,6 +158,7 @@ final class SqlSCollection4[A: Schema, B: Schema, C: Schema, D: Schema](
s"${collA.tfName} join ${collB.tfName} join ${collC.tfName} join ${collD.tfName}",
sqlTransform
)

}

def queryAs[R: Schema](
Expand All @@ -176,4 +178,5 @@ final class SqlSCollection4[A: Schema, B: Schema, C: Schema, D: Schema](
case e: ParseException =>
Query4.typecheck(q).fold(err => throw new RuntimeException(err, e), _ => throw e)
}

}
5 changes: 4 additions & 1 deletion scio-sql/src/main/scala/com/spotify/scio/sql/Query5.scala
Expand Up @@ -107,7 +107,7 @@ object Query5 {
assertConcrete[R](c)

val (schemas1, schemas2, schemas3, schemas4, schemas5, schemas6) =
FastEval(c)(
c.eval(
c.Expr[(Schema[A], Schema[B], Schema[C], Schema[D], Schema[E], Schema[R])](
q"(${untyped(aSchema)}, ${untyped(bSchema)}, ${untyped(cSchema)}, ${untyped(dSchema)}, ${untyped(eSchema)}, ${untyped(rSchema)})"
)
Expand Down Expand Up @@ -139,6 +139,7 @@ final class SqlSCollection5[A: Schema, B: Schema, C: Schema, D: Schema, E: Schem
d: SCollection[D],
e: SCollection[E]
) {

def query(
q: String,
aTag: TupleTag[A],
Expand Down Expand Up @@ -169,6 +170,7 @@ final class SqlSCollection5[A: Schema, B: Schema, C: Schema, D: Schema, E: Schem
s"${collA.tfName} join ${collB.tfName} join ${collC.tfName} join ${collD.tfName} join ${collE.tfName}",
sqlTransform
)

}

def queryAs[R: Schema](
Expand All @@ -190,4 +192,5 @@ final class SqlSCollection5[A: Schema, B: Schema, C: Schema, D: Schema, E: Schem
case e: ParseException =>
Query5.typecheck(q).fold(err => throw new RuntimeException(err, e), _ => throw e)
}

}
5 changes: 4 additions & 1 deletion scio-sql/src/main/scala/com/spotify/scio/sql/Query6.scala
Expand Up @@ -114,7 +114,7 @@ object Query6 {
assertConcrete[R](c)

val (schemas1, schemas2, schemas3, schemas4, schemas5, schemas6, schemas7) =
FastEval(c)(
c.eval(
c.Expr[(Schema[A], Schema[B], Schema[C], Schema[D], Schema[E], Schema[F], Schema[R])](
q"(${untyped(aSchema)}, ${untyped(bSchema)}, ${untyped(cSchema)}, ${untyped(dSchema)}, ${untyped(
eSchema
Expand Down Expand Up @@ -150,6 +150,7 @@ final class SqlSCollection6[A: Schema, B: Schema, C: Schema, D: Schema, E: Schem
e: SCollection[E],
f: SCollection[F]
) {

def query(
q: String,
aTag: TupleTag[A],
Expand Down Expand Up @@ -183,6 +184,7 @@ final class SqlSCollection6[A: Schema, B: Schema, C: Schema, D: Schema, E: Schem
s"${collA.tfName} join ${collB.tfName} join ${collC.tfName} join ${collD.tfName} join ${collE.tfName} join ${collF.tfName}",
sqlTransform
)

}

def queryAs[R: Schema](
Expand All @@ -205,4 +207,5 @@ final class SqlSCollection6[A: Schema, B: Schema, C: Schema, D: Schema, E: Schem
case e: ParseException =>
Query6.typecheck(q).fold(err => throw new RuntimeException(err, e), _ => throw e)
}

}
5 changes: 4 additions & 1 deletion scio-sql/src/main/scala/com/spotify/scio/sql/Query7.scala
Expand Up @@ -128,7 +128,7 @@ object Query7 {
assertConcrete[R](c)

val (schemas1, schemas2, schemas3, schemas4, schemas5, schemas6, schemas7, schemas8) =
FastEval(c)(
c.eval(
c.Expr[
(Schema[A], Schema[B], Schema[C], Schema[D], Schema[E], Schema[F], Schema[G], Schema[R])
](q"(${untyped(aSchema)}, ${untyped(bSchema)}, ${untyped(cSchema)}, ${untyped(dSchema)}, ${untyped(
Expand Down Expand Up @@ -174,6 +174,7 @@ final class SqlSCollection7[
f: SCollection[F],
g: SCollection[G]
) {

def query(
q: String,
aTag: TupleTag[A],
Expand Down Expand Up @@ -210,6 +211,7 @@ final class SqlSCollection7[
s"${collA.tfName} join ${collB.tfName} join ${collC.tfName} join ${collD.tfName} join ${collE.tfName} join ${collF.tfName} join ${collG.tfName}",
sqlTransform
)

}

def queryAs[R: Schema](
Expand All @@ -233,4 +235,5 @@ final class SqlSCollection7[
case e: ParseException =>
Query7.typecheck(q).fold(err => throw new RuntimeException(err, e), _ => throw e)
}

}
5 changes: 4 additions & 1 deletion scio-sql/src/main/scala/com/spotify/scio/sql/Query8.scala
Expand Up @@ -146,7 +146,7 @@ object Query8 {
assertConcrete[R](c)

val (schemas1, schemas2, schemas3, schemas4, schemas5, schemas6, schemas7, schemas8, schemas9) =
FastEval(c)(
c.eval(
c.Expr[
(
Schema[A],
Expand Down Expand Up @@ -214,6 +214,7 @@ final class SqlSCollection8[
g: SCollection[G],
h: SCollection[H]
) {

def query(
q: String,
aTag: TupleTag[A],
Expand Down Expand Up @@ -253,6 +254,7 @@ final class SqlSCollection8[
s"${collA.tfName} join ${collB.tfName} join ${collC.tfName} join ${collD.tfName} join ${collE.tfName} join ${collF.tfName} join ${collG.tfName} join ${collH.tfName}",
sqlTransform
)

}

def queryAs[R: Schema](
Expand All @@ -277,4 +279,5 @@ final class SqlSCollection8[
case e: ParseException =>
Query8.typecheck(q).fold(err => throw new RuntimeException(err, e), _ => throw e)
}

}
5 changes: 4 additions & 1 deletion scio-sql/src/main/scala/com/spotify/scio/sql/Query9.scala
Expand Up @@ -166,7 +166,7 @@ object Query9 {
schemas9,
schemas10
) =
FastEval(c)(
c.eval(
c.Expr[
(
Schema[A],
Expand Down Expand Up @@ -241,6 +241,7 @@ final class SqlSCollection9[
h: SCollection[H],
i: SCollection[I]
) {

def query(
q: String,
aTag: TupleTag[A],
Expand Down Expand Up @@ -283,6 +284,7 @@ final class SqlSCollection9[
s"${collA.tfName} join ${collB.tfName} join ${collC.tfName} join ${collD.tfName} join ${collE.tfName} join ${collF.tfName} join ${collG.tfName} join ${collH.tfName} join ${collI.tfName}",
sqlTransform
)

}

def queryAs[R: Schema](
Expand Down Expand Up @@ -319,4 +321,5 @@ final class SqlSCollection9[
case e: ParseException =>
Query9.typecheck(q).fold(err => throw new RuntimeException(err, e), _ => throw e)
}

}
Expand Up @@ -26,6 +26,7 @@ import com.spotify.scio.schemas.Schema
import org.apache.beam.sdk.values.TupleTag

object SQLBuilders {

private[sql] def from[A](
q: String,
refA: SCollectionRef[A],
Expand Down Expand Up @@ -401,6 +402,7 @@ object SQLBuilders {
udfs: List[Udf]
): SQLBuilder =
l match {

case (refA, aTag) :: Nil =>
from[refA._A](q, refA, aTag, udfs)

Expand Down
Expand Up @@ -22,12 +22,10 @@

package com.spotify.scio.sql

import com.spotify.scio.annotations.experimental
import com.spotify.scio.schemas._
import com.spotify.scio.values.SCollection

trait SqlSCollections {
@experimental
def from[A: Schema](a: SCollection[A]): SqlSCollection1[A] = new SqlSCollection1(a)
def from[A: Schema, B: Schema](a: SCollection[A], b: SCollection[B]): SqlSCollection2[A, B] =
new SqlSCollection2(a, b)
Expand Down
2 changes: 1 addition & 1 deletion scripts/sql.py
Expand Up @@ -204,7 +204,7 @@ def typed{n}Impl[{weak_bounds}, R: c.WeakTypeTag](c: blackbox.Context)(query: c.
assertConcrete[R](c)
val ({schema_tuple_vals}, schemas{n_p}) =
FastEval(c)(
c.eval(
c.Expr[({schemas}, Schema[R])](
q"({infer_schemas}, ${{untyped(rSchema)}})"))
Expand Down

0 comments on commit df4a0bf

Please sign in to comment.