Skip to content

Commit

Permalink
Add workflow
Browse files Browse the repository at this point in the history
  • Loading branch information
xerial committed Jan 13, 2017
1 parent fce7bf4 commit 39dec58
Show file tree
Hide file tree
Showing 10 changed files with 587 additions and 0 deletions.
2 changes: 2 additions & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ lazy val wvletCore =
wvletLog,
"org.wvlet" %% "object-schema" % "1.0",
"org.msgpack" % "msgpack-core" % "0.8.7",
"org.scala-lang" % "scala-reflect" % scalaVersion.value,
"org.scala-lang" % "scalap" % scalaVersion.value,
"org.scala-lang.modules" %% "scala-parser-combinators" % "1.0.4"
)
).dependsOn(wvletTest % "test->compile")
Expand Down
74 changes: 74 additions & 0 deletions wvlet-core/src/main/scala/wvlet/workflow/GraphvizWriter.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package wvlet.workflow

import java.io.{OutputStream, PrintStream}

class GraphvizWriter(out: OutputStream, options: Map[String, String]) {

def this(out: OutputStream) = this(out, Map.empty)

private val g = new PrintStream(out, true, "UTF-8")
private var indentLevel = 0

if (!options.isEmpty) {
g.println(s"graph {${options.map(p => s"${p._1}=${p._2}").mkString(", ")}}")
}

def digraph(graphName: String = "G")(w: GraphvizWriter => Unit): GraphvizWriter = {
g.println("digraph %s {".format(graphName))
indentLevel += 1
w(this)
indentLevel -= 1
g.println("}")
this
}

def newline {
g.println
}

def indent {
for (i <- 0 until indentLevel) {
g.print(" ")
}
}

private def toString(options: Map[String, String]) =
"[%s]".format(options.map(p => "%s=%s".format(p._1, p._2)).mkString(", "))

def node(nodeName: String, options: Map[String, String] = Map.empty): GraphvizWriter = {
indent
g.print("\"%s\"".format(nodeName))
if (!options.isEmpty) {
g.print(" %s".format(toString(options)))
}
g.println(";")
this
}

def arrow(srcNodeID: String, destNodeID: String, options: Map[String, String] = Map.empty): GraphvizWriter = {
indent
g.print("\"%s\" -> \"%s\"".format(srcNodeID, destNodeID))
if (!options.isEmpty) {
g.print(" %s".format(toString(options)))
}
g.println(";")
this
}

def flush = g.flush

def close = g.close
}
117 changes: 117 additions & 0 deletions wvlet-core/src/main/scala/wvlet/workflow/Task.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package wvlet.workflow

import wvlet.log.LogSupport
import wvlet.workflow.macros.{CodeBlock, CodeRef}

import scala.collection.mutable
import scala.language.dynamics

sealed trait TaskState {
def isDone: Boolean
}

object TaskState {

case object Queued extends TaskState {
def isDone = false
}

case object Running extends TaskState {
def isDone = false
}

case object Finished extends TaskState {
def isDone = true
}

case object Failed extends TaskState {
def isDone = true
}


}

object TaskConfig {
def empty = new TaskConfig(mutable.Map.empty)

def apply(params: (String, Any)*) = new TaskConfig(mutable.Map(params: _*))
}

object TaskContext {
val empty = TaskContext(TaskConfig.empty)
}

case class TaskContext(config: TaskConfig)

case class TaskConfig(param: mutable.Map[String, Any] = mutable.Map.empty) extends Dynamic {
/**
* Merge two config parameters and return a new TaskConfig object.
* @param other
* @return
*/
def merge(other: TaskConfig): TaskConfig = new TaskConfig(this.param ++ other.param)

def selectDynamic(key: String) = param(key)

private[wvlet] def updateDynamic(key: String)(v: Any) { param.put(key, v) }

override def toString = param.map { case (k, v) => s"${ k }:${ v }" }.mkString(", ")
}

/**
* Task is a special case of Flow which does not pass any data object
*/
class Task(val name: String,
val config: TaskConfig,
var dependencies: Seq[Task] = Seq.empty) { self =>
//
// def apply() : Unit = { runSolely() }
//
// /**
// * Run this task locally without processing its dependent tasks
// */
// def runSolely(context: TaskContext = TaskContext.empty): Unit = block.runSolely(context)

/**
* Add dependencies to this task.
*
* @param task
* @return
*/
def dependsOn(task: Task*): self.type = { dependencies ++= task.seq; self }

def shortName = Option(name.split("\\.")).map(a => a(a.length - 1)).getOrElse(name)

override def toString() = s"Task(${ shortName }, ${ config })"

def objectId: String = "%8x".format(super.hashCode())
}

object Task extends LogSupport {

def apply(name: String, config: TaskConfig = TaskConfig())(body: => Unit): Task = new Task(name, config, Seq.empty[Task])

def newTask(codeRef: CodeRef, body: => Unit, config: TaskConfig = TaskConfig.empty): Task =
new Task(codeRef.name, config.merge(TaskConfig("codeRef" -> codeRef)), Seq.empty)

def newTaskWithContext(codeRef: CodeRef, config: TaskConfig = TaskConfig.empty): Task =
new Task(codeRef.name, config.merge(TaskConfig("codeRef" -> codeRef)), Seq.empty)

def fromCommand(codeRef: CodeRef, command: String): Task =
newTaskWithContext(codeRef, TaskConfig("command" -> command))

}

85 changes: 85 additions & 0 deletions wvlet-core/src/main/scala/wvlet/workflow/TaskGraph.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package wvlet.workflow

import java.io.ByteArrayOutputStream
import java.nio.charset.StandardCharsets

object TaskGraph {

def apply(leaf: Task): TaskGraph = {

var numNodes = 0
val idTable = scala.collection.mutable.Map[Task, Int]()
val edgeTable = scala.collection.mutable.Set[(Int, Int)]()
def getId(s: Task) = idTable.getOrElseUpdate(s, {numNodes += 1; numNodes - 1})

def traverse(s: Task, visited: Set[Int]): Unit = {
val id = getId(s)
if (!visited.contains(id)) {
val updated = visited + id
for (in <- s.dependencies) {
val sourceId = getId(in)
edgeTable += ((id, sourceId))
traverse(in, updated)
}
}
}

traverse(leaf, Set.empty)

val nodes = idTable.toSeq.sortBy(_._2).map(_._1)
val edges = for ((id, lst) <- edgeTable.toSeq.groupBy(_._1)) yield id -> lst.map(_._2).sorted
TaskGraph(nodes, edges)
}
}

/**
*
*/
case class TaskGraph(nodes: Seq[Task], dependencies: Map[Int, Seq[Int]]) {

override def toString = {
val out = new StringBuilder
out.append("[nodes]\n")
for ((n, i) <- nodes.zipWithIndex) {
out.append(f" [$i:${n.hashCode()}%08x] ${n.name} := [${n.shortName}]\n")
}
out.append("[dependencies]\n")
for ((n, id) <- nodes.zipWithIndex; dep <- dependencies.get(id)) {
out.append(s" $id <- ${dep.mkString(", ")}\n")
}
out.result()
}

def toGraphViz: String = {
val s = new ByteArrayOutputStream()
val g = new GraphvizWriter(s, Map("fontname" -> "Roboto"))

g.digraph("G") {
dg =>
for ((n, id) <- nodes.zipWithIndex) {
val label = s"${n.shortName}"
dg
.node(id.toString, Map("label" -> label, "shape" -> "box", "color" -> "\"#5cc2c9\"", "fontcolor" -> "white", "style" -> "filled"))
}
for ((srcId, destIdLst: Seq[Int]) <- dependencies; dstId <- destIdLst) {
dg.arrow(dstId.toString, srcId.toString)
}
}
g.close
new String(s.toByteArray, StandardCharsets.UTF_8)
}

}
25 changes: 25 additions & 0 deletions wvlet-core/src/main/scala/wvlet/workflow/WorkflowException.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package wvlet.workflow

/**
*
*/
object WorkflowException {
def NA = {
val t = new Throwable
val caller = t.getStackTrace()(1)
throw new Exception(s"${caller.getMethodName} (${caller.getFileName}:${caller.getLineNumber})")
}
}
32 changes: 32 additions & 0 deletions wvlet-core/src/main/scala/wvlet/workflow/macros/CodeBlock.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package wvlet.workflow.macros

/**
* This class is used for wrapping a code block since case class does not allow having
* a code block as a named parameter.
*
* @param body
*/
class CodeBlock[Context](body: Context => Unit, lazyF0: Option[LazyF0[Unit]] = None) {
def this(body: Context => Unit) = this(body, None)

def this(body: => Unit) = this({ context : Context => body }, Some(LazyF0[Unit](body)))

def runSolely(context:Context) = body(context)

def codeBlockClass: Class[_] = lazyF0.map(_.functionClass).getOrElse(body.getClass)

def codeBlockInstance: AnyRef = lazyF0.map(_.functionInstance).getOrElse(body)
}

0 comments on commit 39dec58

Please sign in to comment.