
Overview

We aim to implement an abstraction that allows the user to express operations using conventional R verbs, while computations are executed in a distributed fashion. Managing distributed computations is not trivial, however, and the optimal resource allocation depends on the problem at hand. Therefore, we also extend the R API with primitives that are specific to the distributed context.

Abstraction

There are two layers to our abstraction. The darray, dlist and dframe objects are part of the high-level, user-facing tier. Methods on those objects carry out backend-independent tasks, such as sanity checks, argument munging, etc. To carry out the work, those methods delegate to a corresponding derivative of the Backend class. Those derivatives are specific to the backend implementation and are abstractly constructed by the Driver subclass for that implementation. The user selects a backend by enabling a specific driver.
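
To make the delegation pattern concrete, here is a minimal S4 sketch. The class and generic names (Backend, Driver, new_backend, backend_dmapply, dmapply_frontend) are invented for illustration and are not the actual ddR API.

## Illustrative only: these names are inventions, not the actual ddR classes or generics.
setClass("Backend", representation("VIRTUAL"))
setClass("Driver",  representation("VIRTUAL"))

setGeneric("new_backend", function(driver) standardGeneric("new_backend"))
setGeneric("backend_dmapply",
           function(backend, func, ...) standardGeneric("backend_dmapply"))

## High-level tier: backend-independent checks, then delegation.
dmapply_frontend <- function(driver, func, ...) {
  stopifnot(is.function(func))                    # sanity checks, argument munging
  backend_dmapply(new_backend(driver), func, ...)
}

## One concrete driver/backend pair.
setClass("SerialDriver",  contains = "Driver")
setClass("SerialBackend", contains = "Backend")
setMethod("new_backend", "SerialDriver", function(driver) new("SerialBackend"))
setMethod("backend_dmapply", "SerialBackend",
          function(backend, func, ...) Map(func, ...))

dmapply_frontend(new("SerialDriver"), `+`, 1:3, 4:6)   # list(5, 7, 9)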

Distributed primitives

Since we are distributing our computations, we need some way for the user to access and manipulate the mode in which the data are partitioned over the nodes of the cluster. Parameters controlling the partitioning can be passed to the data structure constructors. We also introduce the parts() generic for returning a (lazy) list representation of the grouping of the elements by node.
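
In use this looks roughly like the following. This is a sketch from memory of the ddR user-facing API; the exact constructor arguments and signatures may differ.

library(ddR)
useBackend(parallel)                  # the user picks a backend by enabling its driver

d <- dlist(1, 2, 3, 4, nparts = 2)    # partitioning chosen at construction time

parts(d)                              # lazy list with one element per partition
parts(d, 1)                           # just the first partition

## Operate per partition rather than per element, then bring the result back:
sums <- dmapply(function(chunk) sum(unlist(chunk)), parts(d))
collect(sums)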

Internal Design

Section by Clark Fitzgerald.

Topic first discussed at the September 2, 2016 meeting with Michael Lawrence.

Systems for Distributed Data

There seem to be two broad paradigms for handling large data sets:

1) The Embarrassingly Parallel paradigm maps the same function over a set of data, i.e. lapply in base R (see the short sketch after this list).

  • R's parallel, snow, etc. can potentially scale to large data if the workers have a function generating it, e.g. by reading from disk
  • Hadoop MapReduce
  • Spark RDD
  • DistributedR
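
Here is a toy base-R illustration of paradigm (1) using the parallel package; each worker generates its own piece of data as a stand-in for reading it from disk, so the master never holds the full data set.

library(parallel)

## Embarrassingly parallel: map the same function over the pieces of the data.
cl <- makeCluster(2)
piece_sums <- parLapply(cl, 1:4, function(i) {
  piece <- rnorm(1e5)                 # pretend this chunk was read from disk
  sum(piece)
})
stopCluster(cl)

Reduce(`+`, piece_sums)               # combine the per-piece results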

2) The Key-Value paradigm stores large data sets as (key, value) pairs. This is potentially more sophisticated, since these systems are not restricted to the map paradigm.

  • Redis database
  • Apache Cassandra
  • NetWorkSpaces (NWS) (also Python's Twisted, Linda, tuple spaces, etc.)
  • File Servers. The familiar file directory system.

Note: these paradigms do overlap, and interesting combinations are possible. One relevant example is the Python Bolt project, which implements an N-dimensional array on top of Spark using the key/value idea. Other related systems like MPI don't necessarily fit this model, but MPI is somewhat lower level.
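
To make the key/value idea concrete, here is a toy sketch in plain R, in the spirit of Bolt: a matrix split row-wise into chunks and reassembled from (key, chunk) pairs, with an environment standing in for the key/value store.

## Toy key/value store: an environment holding (key, chunk) pairs.
store <- new.env()
m <- matrix(1:20, nrow = 4)

keys <- c("chunk1", "chunk2")
assign(keys[1], m[1:2, ], envir = store)                 # put(key, chunk)
assign(keys[2], m[3:4, ], envir = store)

m2 <- do.call(rbind, lapply(keys, get, envir = store))   # get(key) and reassemble
identical(m, m2)                                         # TRUE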

If ddR hopes to become an interface from these systems to R, then it makes sense to organize ddR's internal driver code around these two broad use cases.

Current

In the current state of ddR, the work happens through do_dmapply, which has this function signature:

setMethod("do_dmapply",
          signature(driver="parallel.ddR", func="function"),
          function(driver, func, ..., MoreArgs = list(),
                   output.type = c("dlist", "dframe", "darray", "sparse_darray"),
                   nparts = NULL,
                   combine = c("default","c", "rbind", "cbind"))

This complicated do_dmapply function then handles every possible combination of inputs and outputs inside of it. The code for the distributedR backend is 350 lines of nonstandard function construction through string manipulation. Yet distributedR implements the three distributed objects required by ddR, and ddR was tailor-made for distributedR, so one would expect them to fit together seamlessly with a minimum of code.

For the parallel backend as I've rewritten it, only a small portion of the do_dmapply code is specific to the parallel driver:

    allargs <- c(list(cl = driver@cluster,
                      fun = func,
                      MoreArgs = MoreArgs,
                      RECYCLE = FALSE, SIMPLIFY = FALSE),
                 dots)
    answer <- do.call(parallel::clusterMap, allargs)

That means that all do_dmapply really does is find a way to express the computation in the 'Embarrassingly Parallel' framework described above. Indeed, while working on the Spark backend I was able to replace just this code block and pass 90% of the unit tests.
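
One way to make that explicit would be to shrink the backend-specific surface to a single generic. The sketch below is a hypothetical refactor: backend_map is an invented name, and it assumes ddR and its parallel driver class parallel.ddR are already loaded.

## Hypothetical: the only piece a backend must supply is one method for this
## generic; argument handling and output assembly stay in shared ddR code.
setGeneric("backend_map",
           function(driver, func, dots, MoreArgs) standardGeneric("backend_map"))

## The parallel backend's method is essentially the clusterMap() call above.
setMethod("backend_map", "parallel.ddR",
          function(driver, func, dots, MoreArgs) {
            do.call(parallel::clusterMap,
                    c(list(cl = driver@cluster, fun = func, MoreArgs = MoreArgs,
                           RECYCLE = FALSE, SIMPLIFY = FALSE),
                      dots))
          })

## A Spark backend would then only need its own backend_map method.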

The current object-oriented hierarchy of the package looks like this:

DObject
    ParallelObj

So the primary conceptual objects (darrays, dlists, and dframes) are not represented by classes. In my experience, the purpose of object-oriented programming in R or any other language is to use classes to organize data and methods around the primary conceptual objects. The current design leads to code like this:

setMethod("mean", "DObject", function(x,trim=0,na.rm=FALSE,...) {
if(!is.darray(x) && !is.dframe(x)) stop("mean is only supported for DArrays and DFrames")

Alternative

ddR could be redesigned and built on the idea of lazy, lower-level chunks. This would allow much easier integration with the backend systems above. It's also similar to what Michael Kane and Bryan Lewis said in their DSC presentation. Below is a rough sketch of the responsibilities of the new packages. Note that the function signatures can't be exactly this; they're only meant to convey the general idea. We also don't actually need to rename the package!

  • ddR2: the layer used by most users
    • Requires a backend system to provide chunks.
    • Assembles chunks into distributed lists, matrices, and data frames through (key, chunk) pairs.
    • Maintains metadata necessary to make operations like colnames fast.
    • Defines default standard R methods (head, mean, etc.) for all distributed objects
    • Defines convenient user-facing parallel apply functions like dmapply
    • Translates high-level operations into lower-level Map(func, chunks, MoreArgs) calls and sends these functions to the chunks where the data reside.
    • collect triggers evaluation into a local R object, as before
  • spark.ddR2: the storage and execution layer, providing direct access to the chunks from the backend system (Spark in this example).
    • Evaluates Map(func, chunks, MoreArgs) in parallel (or serial!) with a simple Map function
    • Creates new chunked objects.

In summary, the backend provides something like an R list with lapply, while ddR combines the elements of this list into the higher-level R objects.
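
As a purely illustrative sketch of that contract, the generics below plus a trivial in-memory backend show how little the backend needs to supply. The names new_chunks, chunk_map, get_chunk, and LocalBackend are invented here, not a proposed final API.

## Invented names, for illustration only: the minimal contract a backend
## (spark.ddR2, parallel, a serial fallback, ...) would have to satisfy.
setGeneric("new_chunks", function(backend, values) standardGeneric("new_chunks"))
setGeneric("chunk_map",
           function(backend, func, keys, MoreArgs = list()) standardGeneric("chunk_map"))
setGeneric("get_chunk", function(backend, key) standardGeneric("get_chunk"))

## A trivial in-memory backend: an environment of (key, chunk) pairs.
setClass("LocalBackend", representation(store = "environment"))

setMethod("new_chunks", "LocalBackend", function(backend, values) {
  keys <- paste0("chunk", length(ls(backend@store)) + seq_along(values))
  mapply(assign, keys, values, MoreArgs = list(envir = backend@store))
  keys
})
setMethod("chunk_map", "LocalBackend",
          function(backend, func, keys, MoreArgs = list()) {
            results <- lapply(keys, function(k)
              do.call(func, c(list(get(k, envir = backend@store)), MoreArgs)))
            new_chunks(backend, results)            # results stay "remote"
          })
setMethod("get_chunk", "LocalBackend",
          function(backend, key) get(key, envir = backend@store))

## ddR2 would stitch such keys into DList/DArray/DFrame objects, e.g.:
be   <- new("LocalBackend", store = new.env())
keys <- new_chunks(be, list(1:3, 4:6))              # a two-chunk "distributed" list
sums <- chunk_map(be, sum, keys)                    # Map(func, chunks) on the backend
lapply(sums, get_chunk, backend = be)               # collect: list(6, 15)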

The new object-oriented inheritance hierarchy would be this:

DObject
    DList
         Spark.DList  (OPTIONAL)
    DArray
         Spark.DArray (OPTIONAL)
    DFrame
         Spark.DFrame (OPTIONAL)
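
In code, the proposed hierarchy is little more than a handful of setClass calls. This is a sketch only; the slots shown (keys, backend, dims, col_names) are placeholders, not a worked-out design.

## Sketch of the proposed hierarchy; slot definitions are placeholders.
setClass("DObject", representation("VIRTUAL",
                                    keys = "character",   # one key per chunk
                                    backend = "ANY"))
setClass("DList",  contains = "DObject")
setClass("DArray", contains = "DObject", representation(dims = "integer"))
setClass("DFrame", contains = "DObject", representation(col_names = "character"))

## Optional backend-specific subclasses exist only for fast-path dispatch:
setClass("Spark.DArray", contains = "DArray")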

The driver- or backend-specific classes are optional, because once we have Map and (key, chunk) pairs we can build all the other objects by stitching chunks together. The purpose of the driver-specific classes is to extend and accelerate the defaults by dispatching to fast paths. I believe this was the original intent in ddR, although I haven't observed any places in the code where this is used. Here's what it would look like as an exported method in the spark.ddR2 package:

setMethod("mean", signature(x = "Spark.DArray"), function(x) {
  jmean <- sparkapi::invoke(x@backend_object, "FastNativeJavaArrayMean")
  # ... (etc.)
})

Benefits

  • Addresses much of the recent feedback on ddR
  • The complexity of do_dmapply will live in only one place, rather than being repeated for every driver.
  • Clear, simple, minimal responsibilities for driver and backend.
  • Extensible through method dispatch
  • Easier to maintain, e.g. for continued R Consortium work
  • Allows users access at different points for different levels of control, e.g. they can go straight to Map(func, chunks) if they need that.

Costs

  • Significant internal rewrite
  • May break some algorithms