Faster workspace execution #220

Merged (11 commits, Feb 3, 2022)
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -7,6 +7,7 @@ Please add changes to "master", preferably ordered by their significance. (Most
### master

- Upgraded to PyTorch Geometric (PyG) 2.0.1. [#206](https://github.com/lynxkite/lynxkite/pull/206)
- The workspace interface is much faster now. [#220](https://github.com/lynxkite/lynxkite/pull/220)

### 4.3.0

1 change: 1 addition & 0 deletions app/com/lynxanalytics/biggraph/controllers/Operation.scala
@@ -304,6 +304,7 @@ abstract class OperationRepository(env: SparkFreeEnvironment) {
def getBoxMetadata(id: String) = getBox(id)._1

def atomicOperationIds = atomicOperations.keys.toSeq.sorted
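// An id that is not in the atomic registry belongs to a custom box, i.e. one backed by a workspace.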
def isCustom(id: String) = !atomicOperations.contains(id)

private def listFolder(user: serving.User, path: String): Seq[String] = {
val entry = DirectoryEntry.fromName(path)
62 changes: 59 additions & 3 deletions app/com/lynxanalytics/biggraph/controllers/Workspace.scala
@@ -206,6 +206,49 @@ case class WorkspaceExecutionContext(
}
}

// This cache is used to avoid re-executing boxes all the time. Execution is fairly
// fast, because it only operates on the "meta" level. But we execute the workspace
// on every little change (e.g., a box was moved), so latency is quite important.
class BoxCache(maxSize: Int)
  extends java.util.LinkedHashMap[String, Map[String, BoxOutputState]]() {
  override def removeEldestEntry(
      eldest: java.util.Map.Entry[String, Map[String, BoxOutputState]]) = {
    size > maxSize
  }
  def getOrElseUpdate(
      boxId: String, // Not part of the key, just for BoxOutputs.
      operationId: String,
      parameters: Map[String, String],
      inputs: Map[String, BoxOutputState],
      parametricParameters: Map[String, String],
      workspaceParameters: Map[String, String])(
      outputs: => Map[BoxOutput, BoxOutputState]): Map[BoxOutput, BoxOutputState] = synchronized {
    val k = key(operationId, parameters, inputs, parametricParameters, workspaceParameters)
    if (this.containsKey(k)) {
      val v = this.get(k)
      v.map { case (k, v) => BoxOutput(boxId, k) -> v }
    } else {
      val v = outputs
      this.put(k, v.map { case (k, v) => k.id -> v })
      v
    }
  }
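  // The key flattens everything that determines an atomic box's outputs into a single
  // "##"-joined string: the operation id, all parameter key/value pairs, and the GUIDs
  // of the input states.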
  private def key(
      operationId: String,
      parameters: Map[String, String],
      inputs: Map[String, BoxOutputState],
      parametricParameters: Map[String, String],
      workspaceParameters: Map[String, String]): String = {
    val s: Seq[String] = Seq(operationId) ++
      parameters.flatMap { case (k, v) => Seq(k, v) } ++
      inputs.flatMap { case (k, v) => Seq(k, v.gUID.toString) } ++
      parametricParameters.flatMap { case (k, v) => Seq(k, v) } ++
      workspaceParameters.flatMap { case (k, v) => Seq(k, v) }
    s.mkString("##")
  }
}
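
Overriding removeEldestEntry is the standard way to turn java.util.LinkedHashMap into a bounded cache: after every insertion the map asks whether its eldest entry should be dropped. A minimal standalone sketch of the mechanism (not from the PR; String keys and values just for brevity):

    class TinyCache(maxSize: Int) extends java.util.LinkedHashMap[String, String] {
      override def removeEldestEntry(eldest: java.util.Map.Entry[String, String]) =
        size > maxSize
    }

    val cache = new TinyCache(2)
    cache.put("a", "1"); cache.put("b", "2"); cache.put("c", "3")
    assert(!cache.containsKey("a")) // "a" was the eldest entry when the size hit 3

Note that the no-argument constructor keeps insertion order, so eviction here is FIFO; passing accessOrder = true to the three-argument LinkedHashMap constructor would make it a true LRU.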

case class Box(
    id: String,
    operationId: String,
@@ -229,9 +272,22 @@
  def execute(
      ctx: WorkspaceExecutionContext,
      inputStates: Map[String, BoxOutputState]): Map[BoxOutput, BoxOutputState] = {
-   val op = getOperation(ctx, inputStates)
-   val outputStates = op.getOutputs
-   outputStates
+   if (ctx.ops.isCustom(operationId)) {
+     // The output of custom boxes is not entirely defined by their inputs and parameters.
+     // The backing workspace could change too! At the same time it's not important to cache
+     // them, since we will cache the atomic boxes that make them up anyway.
+     getOperation(ctx, inputStates).getOutputs
+   } else {
+     ctx.ops.metaGraphManager.boxCache.getOrElseUpdate(
+       id,
+       operationId,
+       parameters,
+       inputStates,
+       parametricParameters,
+       ctx.workspaceParameters) {
+       getOperation(ctx, inputStates).getOutputs
+     }
+   }
  }
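
The payoff: executing the same atomic box twice with identical parameters and inputs should evaluate the expensive by-name outputs block only once. A hypothetical check (box and operation ids invented for the example):

    var evaluations = 0
    val cache = new BoxCache(10)
    def run() = cache.getOrElseUpdate(
      "box1", "sql1", Map("sql" -> "select 1"), Map(), Map(), Map()) {
      evaluations += 1
      Map[BoxOutput, BoxOutputState]()
    }
    run(); run()
    assert(evaluations == 1) // the second call is served from the cache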

def allOutputsWithError(
@@ -390,17 +390,20 @@ class WorkflowOperations(env: SparkFreeEnvironment) extends ProjectOperations(en
    params += Code(
      "sql",
      "SQL",
-     defaultValue = s"select * from $defaultTableName limit 10\n",
+     defaultValue = s"select * from $defaultTableName\n",
      language = "sql",
      enableTableBrowser = true)
-   params += Choice("persist", "Persist result", options = FEOption.yesno)
+   params += Choice("persist", "Persist result", options = FEOption.noyes)
    override def summary = params("summary")
    def enabled = FEStatus.enabled
    def defaultTableName = {
-     val tableNames = this.getInputTables(renaming).keySet.toList.sorted
-     val name = Seq("vertices", inputNames.head, inputNames.head + ".vertices")
-       .find(tableNames.contains(_))
-       .getOrElse(tableNames.head)
+     val first = inputNames.head
+     val state = context.inputs(inputs.head)
+     val name =
+       if (state.isProject) {
+         if (inputNames.length == 1) "vertices"
+         else first + ".vertices"
+       } else first
      val simple = "[a-zA-Z0-9]*".r
      name match {
        case simple() => name
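
In plain terms, the new default is: a single project input exposes its "vertices" table, a project among multiple inputs gets the qualified "<first input>.vertices", and a table input is referenced by its own name. A standalone restatement of the rule (the project check is stubbed with a boolean here):

    def defaultName(inputNames: Seq[String], firstIsProject: Boolean): String =
      if (firstIsProject) {
        if (inputNames.length == 1) "vertices" else inputNames.head + ".vertices"
      } else inputNames.head

    assert(defaultName(Seq("input"), firstIsProject = true) == "vertices")
    assert(defaultName(Seq("one", "two"), firstIsProject = true) == "one.vertices")
    assert(defaultName(Seq("table"), firstIsProject = false) == "table")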
@@ -16,12 +16,15 @@ import scala.reflect.runtime.universe.TypeTag
import com.lynxanalytics.biggraph.{bigGraphLogger => log}
import com.lynxanalytics.biggraph.controllers.CheckpointRepository
import com.lynxanalytics.biggraph.controllers.DirectoryEntry
import com.lynxanalytics.biggraph.controllers.BoxCache
import com.lynxanalytics.biggraph.controllers.Workspace
import com.lynxanalytics.biggraph.graph_util.LoggedEnvironment
import com.lynxanalytics.biggraph.graph_util.Timestamp

class MetaGraphManager(val repositoryPath: String) {
  val checkpointRepo = MetaGraphManager.getCheckpointRepo(repositoryPath)
  val repositoryRoot = new File(repositoryPath).getParent()
  val boxCache = new BoxCache(LoggedEnvironment.envOrElse("KITE_BOX_CACHE_SIZE", "100000").toInt)

  def apply[IS <: InputSignatureProvider, OMDS <: MetaDataSetProvider](
      operation: TypedMetaGraphOp[IS, OMDS],
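
The cache size is read once when the MetaGraphManager starts. A sketch of the same knob using plain sys.env (assuming LoggedEnvironment.envOrElse behaves like sys.env.getOrElse):

    val cacheSize = sys.env.getOrElse("KITE_BOX_CACHE_SIZE", "100000").toInt
    val boxCache = new BoxCache(cacheSize) // roughly 100k cached box outputs by default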
@@ -374,6 +374,9 @@ class BigGraphKryoRegistrator extends KryoRegistrator {
kryo.register(Class.forName("org.apache.spark.sql.types.Decimal$DecimalIsFractional$"))
kryo.register(Class.forName("org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTaskResult"))
kryo.register(Class.forName("org.apache.spark.sql.execution.columnar.DefaultCachedBatch"))
kryo.register(Class.forName("scala.math.Ordering$Reverse"))
kryo.register(Class.forName("org.apache.spark.sql.catalyst.InternalRow$"))
kryo.register(Class.forName("org.apache.spark.sql.catalyst.expressions.NullsFirst$"))

// Add new stuff just above this line! Thanks.
// Adding Foo$mcXXX$sp? It is a type specialization. Register the decoded type instead!
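
These registrations matter most when Kryo is configured to require registration, in which case an unregistered class fails fast instead of silently falling back to slower name-based serialization. A minimal illustration (a fresh Kryo instance, unrelated to the one in this file):

    import com.esotericsoftware.kryo.Kryo

    val kryo = new Kryo()
    kryo.setRegistrationRequired(true) // unregistered classes now throw
    kryo.register(Class.forName("scala.math.Ordering$Reverse"))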
6 changes: 3 additions & 3 deletions dependency-licenses/javascript.md
@@ -499,7 +499,7 @@
│ ├─ flagged-respawn@1.0.1
│ ├─ flat-cache@2.0.1
│ ├─ flush-write-stream@1.1.1
- │ ├─ follow-redirects@1.14.1
+ │ ├─ follow-redirects@1.14.7
│ ├─ for-in@1.0.2
│ ├─ for-own@1.0.0
│ ├─ form-data@2.3.3
@@ -659,7 +659,7 @@
│ ├─ map-cache@0.2.2
│ ├─ map-stream@0.0.7
│ ├─ map-visit@1.0.0
- │ ├─ markdown-it@12.0.6
+ │ ├─ markdown-it@12.3.2
│ ├─ matchdep@2.0.0
│ ├─ mdurl@1.0.1
│ ├─ merge-stream@2.0.0
@@ -680,7 +680,7 @@
│ ├─ ms@2.1.3
│ ├─ mute-stdout@1.0.1
│ ├─ nan@2.14.2
- │ ├─ nanoid@3.1.23
+ │ ├─ nanoid@3.2.0
│ ├─ nanomatch@1.2.13
│ ├─ natural-compare@1.4.0
│ ├─ negotiator@0.6.2