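# Prepare Rheem

1. Load the relevant modules: Rheem's core, its platform plugins, and the underlying platforms.
2. Import the relevant packages and create a Rheem configuration.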

In [1]:
val cwd = "/Users/basti/Work/Notebooks/boss-2016/examples"
val rheemVersion = "0.2.0"

// Repositories for resolving SNAPSHOT versions: the local Maven repository and Sonatype snapshots.
Seq(
  "file:///Users/basti/.m2/repository",
  "https://oss.sonatype.org/content/repositories/snapshots"
).foreach(repository => classpath.addRepository(repository))


// Rheem's core functionality (core, api) followed by its platform plugins, all in the same version.
Seq(
  "rheem-core", "rheem-api",
  "rheem-java", "rheem-spark", "rheem-basic", "rheem-graphchi", "rheem-sqlite3"
).foreach(module => classpath.add("org.qcri.rheem" % module % rheemVersion))

// The platforms themselves, followed by the profiling utility used by Rheem.
Seq(
  ("org.apache.hadoop", "hadoop-common", "2.2.0"),
  ("org.apache.hadoop", "hadoop-hdfs", "2.2.0"),
  ("org.apache.spark", "spark-core_2.11", "1.6.1"),
  ("de.hpi.isg", "profiledb-store", "0.1.1")
).foreach { case (group, artifact, version) => classpath.add(group % artifact % version) }

Adding 13 artifact(s)
Adding 4 artifact(s)
Adding 0 artifact(s)
Adding 1 artifact(s)
Adding 0 artifact(s)
Adding 37 artifact(s)
Adding 2 artifact(s)
Adding 30 artifact(s)
Adding 3 artifact(s)
Adding 84 artifact(s)
Adding 0 artifact(s)


[36mcwd[0m: [32mString[0m = [32m"/Users/basti/Work/Notebooks/boss-2016/examples"[0m
[36mrheemVersion[0m: [32mString[0m = [32m"0.2.0"[0m

In [2]:
// Import relevant packages.
import org.qcri.rheem.api._
import org.qcri.rheem.basic.data._
import org.qcri.rheem.core.api._
import org.qcri.rheem.core.util._
import org.qcri.rheem.core.function.FunctionDescriptor._
import org.qcri.rheem.basic.RheemBasics
import org.qcri.rheem.java.Java
import org.qcri.rheem.spark.Spark
import org.qcri.rheem.sqlite3.Sqlite3
import de.hpi.isg.profiledb.store.model._

import scala.collection.JavaConversions._
import scala.collection.mutable

// Load the Rheem configuration from the rheem.properties file located in `cwd`.
val conf = {
    val propertiesUrl = s"file://$cwd/rheem.properties"
    new Configuration(propertiesUrl)
}

// Little utility to show some execution metadata.
/**
  * Prints execution metadata recorded in an [[Experiment]]: the optimization time, the execution
  * time together with the lower/upper estimate Rheem recorded for it, and, for every partial
  * execution (sorted by ID), its measured time, its estimate, and the operators it comprised.
  *
  * Time measurements that are missing from the experiment are printed as zero durations.
  *
  * @param experiment the experiment whose measurements are printed
  */
def printStats(experiment: Experiment): Unit = {
    import org.qcri.rheem.core.profiling.PartialExecutionMeasurement
    import scala.collection.JavaConverters._

    val measurements = experiment.getMeasurements.asScala.toSeq

    // Millis of the time measurement with the given ID, or 0 if there is none. Matching on the type
    // (instead of casting) avoids a ClassCastException if another kind of measurement shares the ID.
    def getTime(id: String): Long = measurements
        .collectFirst { case measurement: TimeMeasurement if measurement.getId == id => measurement.getMillis }
        .getOrElse(0L)

    val optTime = getTime("Optimization")
    val execTime = getTime("Execution")
    val loEstTime = getTime("Estimate 1 (lower)")
    val hiEstTime = getTime("Estimate 1 (upper)")
    val executions = measurements
        .collect { case execution: PartialExecutionMeasurement => execution }
        .sortBy(_.getId)

    println("Statistics")
    println("==========")
    println(s"The optimization took ${Formats.formatDuration(optTime)}.")
    println(s"The execution took ${Formats.formatDuration(execTime)}, " +
            s"while Rheem estimated ${Formats.formatDuration(loEstTime)} to ${Formats.formatDuration(hiEstTime)}.")
    for (execution <- executions) {
        println(s"* ${Formats.formatDuration(execution.getExecutionMillis)} " +
                s"(est. ${execution.getEstimatedExecutionMillis}): ${execution.getOperators.mkString(", ")}")
    }
}

SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/Users/basti/.coursier/cache/v1/https/repo1.maven.org/maven2/org/slf4j/slf4j-simple/1.7.13/slf4j-simple-1.7.13.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/Users/basti/.coursier/cache/v1/https/repo1.maven.org/maven2/org/slf4j/slf4j-log4j12/1.6.1/slf4j-log4j12-1.6.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/Users/basti/.m2/repository/org/slf4j/slf4j-log4j12/1.7.10/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.SimpleLoggerFactory]


[32mimport [36morg.qcri.rheem.api._[0m
[32mimport [36morg.qcri.rheem.basic.data._[0m
[32mimport [36morg.qcri.rheem.core.api._[0m
[32mimport [36morg.qcri.rheem.core.util._[0m
[32mimport [36morg.qcri.rheem.core.function.FunctionDescriptor._[0m
[32mimport [36morg.qcri.rheem.basic.RheemBasics[0m
[32mimport [36morg.qcri.rheem.java.Java[0m
[32mimport [36morg.qcri.rheem.spark.Spark[0m
[32mimport [36morg.qcri.rheem.sqlite3.Sqlite3[0m
[32mimport [36mde.hpi.isg.profiledb.store.model._[0m
[32mimport [36mscala.collection.JavaConversions._[0m
[32mimport [36mscala.collection.mutable[0m
[36mconf[0m: [32morg[0m.[32mqcri[0m.[32mrheem[0m.[32mcore[0m.[32mapi[0m.[32mConfiguration[0m = Configuration[file:///Users/basti/Work/Notebooks/boss-2016/examples/rheem.properties]
defined [32mfunction [36mprintStats[0m

# WordCount

This example counts how often each word occurs in a text file and prints the most frequent words.

In [3]:
locally {
    import java.util.Locale
    import org.qcri.rheem.core.optimizer.ProbabilisticDoubleInterval

    // Rheem may pick the Java or the Spark platform for each operator.
    val rheemContext = new RheemContext(conf)
        .withPlugin(Java.basicPlugin)
        .withPlugin(Spark.basicPlugin)
    val planBuilder = new PlanBuilder(rheemContext)
    val experiment = new Experiment("exp01", new Subject("wordcount", "v1.0"))

    val inputUrl = "file:///Users/basti/Work/Data/text/odyssey.txt"
    // Optimizer hint: a line is expected to hold 5 to 15 words (0.9 presumably being the
    // probability that this interval is correct).
    val wordsPerLine = new ProbabilisticDoubleInterval(5, 15, 0.9)

    val result = planBuilder
      // Do some set up.
      .withJobName(s"WordCount ($inputUrl)")
      .withUdfJarsOf(this.getClass)
      .withExperiment(experiment)

      // Read the text file.
      .readTextFile(inputUrl).withName("Load file")

      // Split each line by non-word characters.
      .flatMap(_.split("\\W+"), selectivity = wordsPerLine).withName("Split words")

      // Filter empty tokens (e.g., produced by a line starting with a non-word character).
      .filter(_.nonEmpty, selectivity = 0.99).withName("Filter empty words")

      // Attach counter to each word. Locale.ROOT keeps the lower-casing independent of the JVM's
      // default locale (e.g., "I" must not become the Turkish dotless "ı").
      .map(word => (word.toLowerCase(Locale.ROOT), 1)).withName("To lower case, add counter")

      // Sum up counters for every word.
      .reduceByKey(_._1, (c1, c2) => (c1._1, c1._2 + c2._2)).withName("Add counters")
      // Optimizer hint: about 1% of the words are expected to be distinct.
      .withCardinalityEstimator((in: Long) => math.round(in * 0.01))

      // Execute the plan and collect the results.
      .collect()

    println(s"$inputUrl has ${result.size} different words, for instance:")
    // Sort by descending count and break ties alphabetically, so the output is deterministic.
    result.toSeq
        .sortBy { case (word, count) => (-count, word) }
        .take(50)
        .foreach { case (word, count) => println(s"'$word' appears $count times") }

    println()
    printStats(experiment)
}

file:///Users/basti/Work/Data/text/odyssey.txt has 7848 different words, for instance:
'the' appears 7076 times
'and' appears 5406 times
'of' appears 3639 times
'to' appears 3605 times
'a' appears 2052 times
'i' appears 2023 times
'you' appears 1985 times
'in' appears 1930 times
'he' appears 1893 times
'for' appears 1433 times
'his' appears 1343 times
'as' appears 1316 times
'that' appears 1316 times
'it' appears 1284 times
'with' appears 1249 times
'him' appears 1079 times
'was' appears 1065 times
'is' appears 971 times
'they' appears 971 times
'on' appears 934 times
'me' appears 921 times
'had' appears 891 times
'all' appears 887 times
'my' appears 884 times
'have' appears 880 times
'but' appears 876 times
'them' appears 795 times
'not' appears 795 times
'will' appears 737 times
'so' appears 714 times
'this' appears 693 times
'her' appears 673 times
'when' appears 667 times
'ulysses' appears 655 times
'your' appears 651 times
'from' appears 647 times
'she' appears 633 times
'who' app



# SINDY

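This algorithm loads tuples from a SQLite database, splits them into cells (i.e., value–column pairs), and then detects *inclusion dependencies (INDs)* among the columns: a column A is included in a column B if every value of A also appears in B.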

In [4]:
locally {

    // Parameters
    val jdbcUrl = "jdbc:sqlite:/Users/basti/Work/Temp/Rheem/tpch.db"
    val schemaDefPath = "/Users/basti/Work/Temp/Rheem/tpch-schema.txt"
    // Tables to analyze, each with the columns to consider (Seq("*") = all columns of the table).
    val tables = Seq(("REGION", Seq("*")), ("NATION", Seq("*")), ("CUSTOMER", Seq("C_CUSTKEY", "C_NATIONKEY")))


    // Define some of the more complex UDFs.
    /**
      * UDF to create cells from a [[Record]]: one (value, column ID) pair per field, where the
      * field at index `i` gets the column ID `offset + i`.
      *
      * @param offset the column ID offset for the input [[Record]]s
      */
    class CellCreator(val offset: Int) extends SerializableFunction[Record, java.lang.Iterable[(String, Int)]] {

        override def apply(record: Record): java.lang.Iterable[(String, Int)] = {
            val cells = new java.util.ArrayList[(String, Int)](record.size)
            for (index <- 0 until record.size) cells.add((record.getString(index), offset + index))
            cells
        }
    }

    /**
      * UDF to merge the column IDs of two cells (that share the same value) into a duplicate-free array.
      *
      * It keeps no state between calls, so it is safe no matter how an engine shares the instance.
      */
    class CellMerger extends SerializableBinaryOperator[(String, Array[Int])] {

        override def apply(cell1: (String, Array[Int]), cell2: (String, Array[Int])): (String, Array[Int]) =
            (cell1._1, (cell1._2 ++ cell2._2).distinct)
    }

    /**
      * UDF to create IND candidates from a cell group: for every column ID in the group, emits that
      * column together with all other column IDs of the group as its referenced columns.
      */
    class IndCandidateGenerator extends SerializableFunction[(String, Array[Int]), java.lang.Iterable[(Int, Array[Int])]] {

        override def apply(cellGroup: (String, Array[Int])): java.lang.Iterable[(Int, Array[Int])] = {
            val columnIds = cellGroup._2
            val result = new java.util.ArrayList[(Int, Array[Int])](columnIds.length)
            for (i <- columnIds.indices) {
                // All column IDs except the one at position i.
                val refColumnIds = new Array[Int](columnIds.length - 1)
                java.lang.System.arraycopy(columnIds, 0, refColumnIds, 0, i)
                java.lang.System.arraycopy(columnIds, i + 1, refColumnIds, i, refColumnIds.length - i)
                result.add((columnIds(i), refColumnIds))
            }
            result
        }
    }

    /**
      * UDF to merge two IND candidates of the same dependent column by intersecting their
      * referenced column IDs. Stateless, like [[CellMerger]].
      */
    class IndCandidateMerger extends SerializableBinaryOperator[(Int, Array[Int])] {

        override def apply(indc1: (Int, Array[Int]), indc2: (Int, Array[Int])): (Int, Array[Int]) = {
            val refs1 = indc1._2.toSet
            (indc1._1, indc2._2.filter(refs1.contains))
        }
    }

    // Load the schema definition: one `TABLE[COL1,COL2,...]` or `TABLE[*]` specifier per non-blank line.
    val tableRegex = """(\w+)\[((?:\w+(?:,\w+)*)|(?:\*))\]""".r
    val schemaSource = scala.io.Source.fromFile(schemaDefPath)
    val schema = try {
        schemaSource.getLines()
            .map(_.trim)
            .filter(_.nonEmpty)
            .map {
                case tableRegex(table, columns) => (table, columns.split(',').toSeq)
                case other => sys.error(s"Illegal table specifier: $other")
            }
            .toMap // Forces the lazy iterator before the source is closed.
    } finally {
        schemaSource.close()
    }

    // Looks up the schema of a table, failing with a descriptive message for unknown tables.
    def schemaOf(table: String): Seq[String] =
        schema.getOrElse(table, sys.error(s"No schema definition for table $table in $schemaDefPath."))

    // Prepare the RheemContext.
    val rheemContext = new RheemContext(conf)
        .withPlugin(Java.basicPlugin)
        .withPlugin(Spark.basicPlugin)
        .withPlugin(Sqlite3.plugin)
    rheemContext.getConfiguration.setProperty("rheem.sqlite3.jdbc.url", jdbcUrl)
    val experiment = new Experiment("exp02", new Subject("sindy", "v1.0"))
    // Ship the UDF classes defined above in case Rheem picks a platform that needs them, as in the other examples.
    val planBuilder = new PlanBuilder(rheemContext)
        .withUdfJarsOf(this.getClass)
        .withExperiment(experiment)

    // Create cells from the various tables. Column IDs are assigned consecutively across tables.
    var offset = 0
    val columnsById = mutable.Map[Int, String]()
    val allCells = tables
      .map { case (table, columns) =>
        val tableColumns = schemaOf(table)

        // Handle the special case where columns == "*".
        val resolvedColumns = if (columns == Seq("*")) tableColumns else columns

        // Read the cells from the specified table/columns.
        val loadedRecords = planBuilder
          .readTable(new org.qcri.rheem.sqlite3.operators.Sqlite3TableSource(table, tableColumns: _*))
          .withName(s"Load $table")

        // If requested, project to the relevant fields. Compare in order (not as sets): the column IDs
        // below are assigned by position in resolvedColumns, so the record fields must follow that order.
        val records =
          if (resolvedColumns != tableColumns) loadedRecords.projectRecords(resolvedColumns).withName(s"Project $table")
          else loadedRecords

        // Create the cells, finally.
        val cells = records
          .flatMapJava(new CellCreator(offset), selectivity = resolvedColumns.size.toDouble)
          .withName(s"Create cells for $table")

        // Maintain some helper data structures.
        resolvedColumns.zipWithIndex.foreach { case (column, index) => columnsById(offset + index) = s"$table[$column]" }
        offset += resolvedColumns.size

        cells
      }
      .reduce(_ union _)

    // Do the rest of the SINDY logic on the cells.
    val rawInds = allCells
      .map(cell => (cell._1, Array(cell._2))).withName("Prepare cell merging")
      .reduceByKeyJava(toSerializableFunction(_._1), new CellMerger).withName("Merge cells")
      .flatMapJava(new IndCandidateGenerator).withName("Generate IND candidate sets")
      .reduceByKeyJava(toSerializableFunction(_._1), new IndCandidateMerger).withName("Merge IND candidate sets")
      .filter(_._2.length > 0).withName("Filter empty candidate sets")
      .collect()

    // Make the results readable.
    val inds = rawInds.map {
      case (dep, refs) => (columnsById(dep), refs.map(columnsById).toSeq)
    }

    println(s"Found ${inds.map(_._2.length).sum} INDs:")
    inds.foreach { case (dep, refs) => println(s"$dep is included in ${refs.mkString(" and ")}.") }

    println()
    printStats(experiment)
}

Found 8 INDs:
REGION[R_REGIONKEY] is included in NATION[N_REGIONKEY] and NATION[N_NATIONKEY] and CUSTOMER[C_NATIONKEY].
NATION[N_NATIONKEY] is included in CUSTOMER[C_NATIONKEY].
NATION[N_REGIONKEY] is included in REGION[R_REGIONKEY] and NATION[N_NATIONKEY] and CUSTOMER[C_NATIONKEY].
CUSTOMER[C_NATIONKEY] is included in NATION[N_NATIONKEY].

Statistics
The optimization took 0:00:00.882.
The execution took 0:00:00.726, while Rheem estimated 0:00:00.386 to 0:00:04.310.
* 0:00:00.587 (est. (0:00:00.329 .. 0:00:00.789, p=5.00%)): Sqlite3TableSource[Load REGION], SqlToStream[convert out@Sqlite3TableSource[Load REGION]], JavaFlatMap[Create cells for REGION], Sqlite3TableSource[Load NATION], SqlToStream[convert out@Sqlite3TableSource[Load NATION]], JavaFlatMap[Create cells for NATION], JavaUnionAll[2->1, id=726800f8], Sqlite3TableSource[Load CUSTOMER], Sqlite3Projection[Project CUSTOMER], SqlToStream[convert out@Sqlite3Projection[Project CUSTOMER]], JavaFlatMap[Create cells for CUSTOMER], Java



# PageRank

The following Rheem code loads an RDF triple file and creates a graph from it by encoding vertices as IDs and edges as pairs of vertex IDs. The resulting graph is fed into PageRank, and the computed page ranks are finally joined back with the vertex names to make them readable.

In [5]:
locally {
    // Provides the pageRank operator used below.
    import org.qcri.rheem.api.graph._

    val inputUrl = "file:///Users/basti/Work/Data/rdf/dbpedia/page-links-en-uris_de.sample_5pc.nt"
    val numIterations = 10

    // Initialize.
    val rheemCtx = new RheemContext(conf)
        .withPlugin(Java.basicPlugin)
        .withPlugin(RheemBasics.graphPlugin)
    val experiment = new Experiment("exp03", new Subject("pagerank", "v1.0"))
    val planBuilder = new PlanBuilder(rheemCtx)
      .withJobName(s"PageRank ($inputUrl, $numIterations iterations)")
      .withUdfJarsOf(this.getClass)
      .withExperiment(experiment)

    // Read and parse the input file.
    val edges = planBuilder
      .readTextFile(inputUrl).withName("Load file")
      // Drop comments and blank lines. N-Triples allows blank lines, and the parser below would fail
      // on them (indexOf/lastIndexOf return -1, so charAt/substring throw).
      .filter(line => line.trim.nonEmpty && !line.startsWith("#"), selectivity = 1.0).withName("Filter comments")
      .map { raw =>
            // Find the first two spaces: Odds are that these separate subject, predicate and object.
            val firstSpacePos = raw.indexOf(' ')
            val secondSpacePos = raw.indexOf(' ', firstSpacePos + 1)

            // Find the end position: the final '.' minus any spaces in front of it.
            var stopPos = raw.lastIndexOf('.')
            while (raw.charAt(stopPos - 1) == ' ') stopPos -= 1

            (raw.substring(0, firstSpacePos),
             raw.substring(firstSpacePos + 1, secondSpacePos),
             raw.substring(secondSpacePos + 1, stopPos))
      }.withName("Parse triples")
      .map { case (s, p, o) => (s, o) }.withName("Discard predicate")

    // Create vertex IDs.
    val vertexIds = edges
      .flatMap(edge => Seq(edge._1, edge._2)).withName("Extract vertices")
      .distinct.withName("Distinct vertices")
      .zipWithId.withName("Add vertex IDs")

    // Encode the edges with the vertex IDs
    type VertexId = org.qcri.rheem.basic.data.Tuple2[Vertex, String]
    val idEdges = edges
      .join[VertexId, String](_._1, vertexIds, _.field1).withName("Join source vertex IDs")
      .map { linkAndVertexId =>
        (linkAndVertexId.field1.field0, linkAndVertexId.field0._2)
      }.withName("Set source vertex ID")
      .join[VertexId, String](_._2, vertexIds, _.field1).withName("Join target vertex IDs")
      .map(linkAndVertexId => new Edge(linkAndVertexId.field0._1, linkAndVertexId.field1.field0)).withName("Set target vertex ID")

    // Run the PageRank.
    // Note: org.qcri.rheem.api.graph._ must be imported for this to work.
    val pageRanks = idEdges.pageRank(numIterations)

    // Make the page ranks readable.
    val result = pageRanks
      .join[VertexId, Long](_.field0, vertexIds, _.field0).withName("Join page ranks with vertex IDs")
      .map(joinTuple => (joinTuple.field1.field1, joinTuple.field0.field1)).withName("Make page ranks readable")
      .collect()

    println(s"Calculate page ranks for ${result.size} vertices:")
    result.toSeq.sortBy(-_._2).take(50).foreach(pr => println(s"${pr._1} has a page rank of ${pr._2}."))
    println()
    printStats(experiment)
}

Calculate page ranks for 659133 vertices:
<http://dbpedia.org/resource/United_States> has a page rank of 0.0010866255.
<http://dbpedia.org/resource/France> has a page rank of 5.982427E-4.
<http://dbpedia.org/resource/Germany> has a page rank of 5.957829E-4.
<http://dbpedia.org/resource/World_War_II> has a page rank of 5.1534176E-4.
<http://dbpedia.org/resource/Category:American_people> has a page rank of 4.3901728E-4.
<http://dbpedia.org/resource/Italy> has a page rank of 3.9436592E-4.
<http://dbpedia.org/resource/English_language> has a page rank of 3.8891463E-4.
<http://dbpedia.org/resource/United_Kingdom> has a page rank of 3.4536282E-4.
<http://dbpedia.org/resource/Switzerland> has a page rank of 3.365571E-4.
<http://dbpedia.org/resource/World_War_I> has a page rank of 3.1882824E-4.
<http://dbpedia.org/resource/Paris> has a page rank of 3.0596784E-4.
<http://dbpedia.org/resource/New_York_City> has a page rank of 3.0100156E-4.
<http://dbpedia.org/resource/Category:German_people> has

