Skip to content

Commit

Permalink
simplify depth extraction in ExploreRunner
Browse files Browse the repository at this point in the history
  • Loading branch information
tribbloid committed Feb 8, 2024
1 parent ab9e99b commit 8a8feff
Show file tree
Hide file tree
Showing 4 changed files with 69 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,23 @@ object ExplorePlan {
executionID: ExeID = nextExeID()
) {

lazy val effectiveRange: Range = {
@transient lazy val effectiveRange: Range = {
require(range.min >= -1, "explore range cannot be lower than -1")
range
}

lazy val maxRange: Int = effectiveRange.max
lazy val minRange: Int = effectiveRange.min
@transient lazy val maxRange: Int = effectiveRange.max
@transient lazy val minRange: Int = effectiveRange.min

// Extractor producing the next depth value for a row:
//  - reads `depthField`, typed as Int, and adds 1;
//  - if that extraction is not defined, falls back to the literal 0.
// The result is aliased to `depthField.!!`.
// NOTE(review): `!!` presumably marks the field as overwriting an existing column — confirm against Field.
@transient lazy val depth_++ : Alias.Impl[FR, Int] = {
Get(depthField)
.typed[Int]
.andMap(_ + 1)
.orElse(
Lit(0)
)
.withAlias(depthField.!!)
}

lazy val includeStateBeforeExplore: Boolean = effectiveRange.contains(-1)
}
Expand Down Expand Up @@ -96,12 +106,14 @@ case class ExplorePlan(
)
}

val depth_0: Resolved[Int] = resolver.include(Lit(0) withAlias _effectiveParams.depthField).head
val depth_++ : Resolved[Int] = resolver
.include(
Get(_effectiveParams.depthField).typed[Int].andMap(_ + 1) withAlias _effectiveParams.depthField.!!
)
.head
// val depth_0: Resolved[Int] = resolver.include(Lit(0) withAlias _effectiveParams.depthField).head
// val depth_++ : Resolved[Int] = resolver
// .include(
// Get(_effectiveParams.depthField).typed[Int].andMap(_ + 1) withAlias _effectiveParams.depthField.!!
// )
// .head

resolver.includeTyped(TypedField(_effectiveParams.depthField, IntegerType)).head

resolver.includeTyped(TypedField(_effectiveParams.ordinalField, ArrayType(IntegerType))).head

Expand Down Expand Up @@ -177,7 +189,7 @@ case class ExplorePlan(
openSetAcc.reset

val stateRDD_+ : RDD[(LocalityGroup, State)] = stateRDD.mapPartitions { itr =>
val runner = ExploreRunner(itr, impl, sameBy, depth_0, depth_++)
val runner = ExploreRunner(itr, impl, sameBy)
val state_+ = runner.run(
_on,
sampler,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,7 @@ import scala.collection.{mutable, MapView}
case class ExploreRunner(
partition: Iterator[(LocalityGroup, State)],
pathPlanningImpl: PathPlanning.Impl,
sameBy: Trace => Any,
depth0: Resolved[Int],
`depth_++`: Resolved[Int]
sameBy: Trace => Any
) extends NOTSerializable {

import dsl._
Expand Down Expand Up @@ -90,21 +88,17 @@ case class ExploreRunner(
.nextOption()
.map { row =>
row
.withCtx(spooky)
.extract(depth0)
}
.getOrElse {

val selected: (LocalityGroup, Vector[DataRow]) = pathPlanningImpl.selectNextOpen(open)
val withDepth = SquashedRow(AgentState(selected._1), selected._2.map(_.withEmptyScope))
val withLineage = SquashedRow(AgentState(selected._1), selected._2.map(_.withEmptyScope))
.withCtx(spooky)
.resetScope
.withCtx(spooky)
.extract(depth_++)
.withLineageIDs

// val transformed = delta.fn(withDepth)
withDepth
withLineage
}
row
}
Expand All @@ -116,6 +110,8 @@ case class ExploreRunner(
selectedRow
}

// Lazily resolves the in-scope `depth_++` expression against a new resolver obtained from
// `pathPlanningImpl.schema`; `.head` takes the first element returned by `include`.
lazy val _depth_++ : Resolved[Int] = pathPlanningImpl.schema.newResolver.include(depth_++).head

protected def executeOnce(
forkExpr: Resolved[Any],
sampler: Sampler[Any],
Expand All @@ -133,11 +129,14 @@ case class ExploreRunner(

this.fetchingInProgressOpt = Some(selectedRow.group)

val transformed = delta.fn(selectedRow)
val deltaApplied = delta
.fn(selectedRow)
.withCtx(spooky)
.extract(_depth_++)

{
// commit transformed data into visited
val data = transformed.dataRows.map(_.self).toVector
val data = deltaApplied.dataRows.map(_.self).toVector

val inRange: Vector[DataRow] = data.flatMap { row =>
val depth = row.getInt(depthField).getOrElse(Int.MaxValue)
Expand All @@ -151,11 +150,11 @@ case class ExploreRunner(
} else None
}

Commit(transformed.group, inRange).intoVisited()
Commit(deltaApplied.group, inRange).intoVisited()
}

{
val forked = transformed
val forked = deltaApplied
.withCtx(spooky)
.extract(forkExpr)
.explodeData(forkExpr.field, ordinalField, forkType, sampler)
Expand All @@ -178,20 +177,10 @@ case class ExploreRunner(

// this will be used to filter dataRows yield by the next fork, it will not affect current transformation

val filtered = grouped
// .mapValues { v =>
// val inRange = v.flatMap { dataRow =>
// val depth = dataRow.getInt(depthField).getOrElse(Int.MaxValue)
// if (depth < minRange) Some(dataRow.copy(isOutOfRange = true))
// else if (depth < maxRange - 1) Some(dataRow)
// else None
// }
// inRange
// }
.filter {
case (_, v) =>
v.nonEmpty
}.toList
val filtered = grouped.filter {
case (_, v) =>
v.nonEmpty
}.toList

filtered.foreach { newOpen: (LocalityGroup, Seq[DataRow]) =>
val trace_+ = newOpen._1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,28 @@ object GenExtractor extends GenExtractorImplicits {
// override def toString = meta.getOrElse("Elem").toString
}

/**
  * Extractor that tries `a` first and falls back to `b` wherever `a` is not defined.
  *
  * @param a the primary extractor
  * @param b the fallback extractor, used for inputs on which `a`'s resolved function is undefined
  */
case class OrElse[T, +R](
    a: GenExtractor[T, R],
    b: GenExtractor[T, R]
) extends GenExtractor[T, R] {

  // resolve to a Spark SQL DataType according to an execution plan;
  // both operands must resolve to the same type, otherwise `require` fails
  override def resolveType(tt: DataType): DataType = {
    val primaryType = a.resolveType(tt)
    val fallbackType = b.resolveType(tt)
    require(primaryType == fallbackType, s"conflicting types: $primaryType and $fallbackType")
    primaryType
  }

  // both operands are resolved eagerly (primary first), then combined so that the
  // fallback is consulted only where the primary partial function is undefined
  override def resolve(tt: DataType): PartialFunction[T, R] = {
    val primary = a.resolve(tt)
    val fallback = b.resolve(tt)
    primary.orElse(fallback)
  }

  override def _args: Seq[GenExtractor[_, _]] = Seq(a, b)
}

case class AndThen[A, B, +C](
a: GenExtractor[A, B],
b: GenExtractor[B, C],
Expand Down Expand Up @@ -194,27 +216,33 @@ trait GenExtractor[T, +R] extends CatalystTypeOps.ImplicitMixin with Product wit
}

// will not rename an already-named Alias.
def withAliasIfMissing(field: Field): Alias[T, R] = {
def withFieldIfMissing(field: Field): Alias[T, R] = {
this match {
case alias: Alias[T, R] => alias
case _ => this.withAlias(field)
}
}

def withForkFieldIfMissing: Alias[T, R] = withAliasIfMissing(Const.defaultForkField)
def withForkFieldIfMissing: Alias[T, R] = withFieldIfMissing(Const.defaultForkField)

// TODO: should be merged into andMap
/**
  * Chains this extractor with `g` by wrapping both in an `AndThen(this, g, meta)`.
  *
  * @param g    extractor applied after this one
  * @param meta optional metadata passed through to `AndThen`; defaults to None
  */
def andEx[R2 >: R, A](g: GenExtractor[R2, A], meta: Option[Any] = None): GenExtractor[T, A] =
AndThen[T, R2, A](this, g, meta)

// TODO: rename to map
/**
  * Applies the plain function `g` after this extractor by delegating to `andEx`.
  * NOTE(review): `g` is presumably lifted to a GenExtractor by an implicit conversion
  * that uses the `TypeTag` context bound — confirm in GenExtractorImplicits.
  *
  * @param g    function applied after this extractor
  * @param meta optional metadata forwarded to `andEx`; defaults to None
  */
def andMap[A: TypeTag](g: R => A, meta: Option[Any] = None): GenExtractor[T, A] = {
andEx(g, meta)
}

// TODO: rename to flatMap
/**
  * Applies the Option-returning function `g` after this extractor; `g` is converted with
  * `GenExtractor.fromOptionFn` and chained through `andEx`.
  *
  * @param g    function applied after this extractor, returning an Option
  * @param meta optional metadata forwarded to `andEx`; defaults to None
  */
def andFlatMap[A: TypeTag](g: R => Option[A], meta: Option[Any] = None): GenExtractor[T, A] = {
andEx(GenExtractor.fromOptionFn(g), meta)
}

/**
  * Combines this extractor with the fallback `g` by wrapping both in `OrElse(this, g)`.
  *
  * @param g fallback extractor; its result type may be a supertype of this extractor's
  */
def orElse[R2 >: R](g: GenExtractor[T, R2]): GenExtractor[T, R2] = {
OrElse(this, g)
}

def andTyped[R2 >: R, A](
g: R2 => A,
resolveType: DataType => DataType,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ class TestDSL extends SpookyBaseSpec with LocalPathDocsFixture {
assert(renamed.asInstanceOf[Alias[_, _]].field.name == "name1")
val renamed2 = renamed as 'name2
assert(renamed2.asInstanceOf[Alias[_, _]].field.name == "name2")
val notRenamed = renamed withAliasIfMissing 'name2
val notRenamed = renamed withFieldIfMissing 'name2
assert(notRenamed.field.name == "name1")
}

Expand Down

0 comments on commit 8a8feff

Please sign in to comment.