Commit 0ba3fdd
[Minor][SparkR] Minor refactor and removes redundancy related to cleanClosure.

1. Only use `cleanClosure` in the creation of RRDDs. Normally, users and developers do not need to call `cleanClosure` in their function definitions (see the sketch after this list).
2. Remove redundant code (e.g. unnecessary wrapper functions) related to `cleanClosure`.
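
For context, here is a minimal plain-R sketch of what closure cleaning accomplishes. The helper `captureClosure` and the names `threshold` and `keepBig` are illustrative only and are not SparkR's `cleanClosure`; the sketch just shows the idea of copying a function's free variables into a fresh environment so that `serialize()` ships their values to a worker (base `serialize()` records the global environment only by reference, so top-level variables referenced by a UDF would otherwise be missing remotely).

```r
library(codetools)

# Hypothetical helper (NOT SparkR's cleanClosure): copy the free variables a
# function references into a fresh environment attached to the function, so
# that serialize() carries their values along with the function.
captureClosure <- function(fun) {
  freeVars <- findGlobals(fun, merge = FALSE)$variables
  capturedEnv <- new.env(parent = globalenv())
  for (v in freeVars) {
    if (exists(v, envir = environment(fun))) {
      assign(v, get(v, envir = environment(fun)), envir = capturedEnv)
    }
  }
  environment(fun) <- capturedEnv
  fun
}

threshold <- 3                            # free variable at top level
keepBig <- function(xs) xs[xs > threshold]

shipped <- unserialize(serialize(captureClosure(keepBig), connection = NULL))
shipped(c(1, 5, 2, 9))                    # 5 9
```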

Author: hlin09 <hlin09pu@gmail.com>

Closes apache#5495 from hlin09/cleanClosureFix and squashes the following commits:

74ec303 [hlin09] Minor refactor and removes redundancy.
hlin09 authored and shivaram committed Apr 14, 2015
1 parent b45059d commit 0ba3fdd
Showing 2 changed files with 4 additions and 16 deletions.
R/pkg/R/RDD.R: 4 additions & 12 deletions
@@ -85,7 +85,7 @@ setMethod("initialize", "PipelinedRDD", function(.Object, prev, func, jrdd_val)
 
   if (!inherits(prev, "PipelinedRDD") || !isPipelinable(prev)) {
     # This transformation is the first in its stage:
-    .Object@func <- func
+    .Object@func <- cleanClosure(func)
     .Object@prev_jrdd <- getJRDD(prev)
     .Object@env$prev_serializedMode <- prev@env$serializedMode
     # NOTE: We use prev_serializedMode to track the serialization mode of prev_JRDD
@@ -94,7 +94,7 @@ setMethod("initialize", "PipelinedRDD", function(.Object, prev, func, jrdd_val)
     pipelinedFunc <- function(split, iterator) {
       func(split, prev@func(split, iterator))
     }
-    .Object@func <- pipelinedFunc
+    .Object@func <- cleanClosure(pipelinedFunc)
     .Object@prev_jrdd <- prev@prev_jrdd # maintain the pipeline
     # Get the serialization mode of the parent RDD
     .Object@env$prev_serializedMode <- prev@env$prev_serializedMode
@@ -144,17 +144,13 @@ setMethod("getJRDD", signature(rdd = "PipelinedRDD"),
               return(rdd@env$jrdd_val)
             }
 
-            computeFunc <- function(split, part) {
-              rdd@func(split, part)
-            }
-
             packageNamesArr <- serialize(.sparkREnv[[".packages"]],
                                          connection = NULL)
 
             broadcastArr <- lapply(ls(.broadcastNames),
                                    function(name) { get(name, .broadcastNames) })
 
-            serializedFuncArr <- serialize(computeFunc, connection = NULL)
+            serializedFuncArr <- serialize(rdd@func, connection = NULL)
 
             prev_jrdd <- rdd@prev_jrdd
 
@@ -551,11 +547,7 @@ setMethod("mapPartitions",
 setMethod("lapplyPartitionsWithIndex",
           signature(X = "RDD", FUN = "function"),
           function(X, FUN) {
-            FUN <- cleanClosure(FUN)
-            closureCapturingFunc <- function(split, part) {
-              FUN(split, part)
-            }
-            PipelinedRDD(X, closureCapturingFunc)
+            PipelinedRDD(X, FUN)
           })
 
 #' @rdname lapplyPartitionsWithIndex
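
A small plain-R illustration (all names below are made up) of why the wrapper functions removed above were redundant: a two-argument wrapper that only forwards its arguments behaves identically to the wrapped function, so once `cleanClosure` runs inside the `PipelinedRDD` initializer there is nothing left for the wrapper to do; it only adds one more closure to clean and serialize.

```r
# A UDF of the (split, part) shape used for partition functions.
FUN <- function(split, part) lapply(part, function(x) x * split)

# The kind of forwarding wrapper the commit removes.
wrapper <- function(split, part) {
  FUN(split, part)
}

identical(wrapper(2, list(1, 2, 3)), FUN(2, list(1, 2, 3)))  # TRUE
```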
R/pkg/R/pairRDD.R: 0 additions & 4 deletions
@@ -694,10 +694,6 @@ setMethod("cogroup",
             for (i in 1:rddsLen) {
               rdds[[i]] <- lapply(rdds[[i]],
                                   function(x) { list(x[[1]], list(i, x[[2]])) })
-              # TODO(hao): As issue [SparkR-142] mentions, the right value of i
-              # will not be captured into UDF if getJRDD is not invoked.
-              # It should be resolved together with that issue.
-              getJRDD(rdds[[i]])  # Capture the closure.
             }
             union.rdd <- Reduce(unionRDD, rdds)
             group.func <- function(vlist) {
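
The workaround deleted above guarded against a standard R pitfall: closures built inside a loop all share the loop's environment, so a function applied lazily later sees the final value of `i` rather than the value at creation time. Calling `getJRDD` inside the loop serialized the closure eagerly as a stopgap; with `cleanClosure` now applied in the `PipelinedRDD` initializer, the current value of `i` is captured when the RDD is created, so the eager call is no longer needed. A minimal plain-R sketch of the pitfall (no Spark involved; all names below are illustrative):

```r
makeTaggers <- function() {
  lazy <- list()
  eager <- list()
  for (i in 1:3) {
    # Shares the loop environment: `i` is looked up only when the closure runs.
    lazy[[i]] <- function(x) list(i, x)
    # Copies the current value of `i` into a per-iteration environment,
    # roughly what closure cleaning at RDD-creation time achieves.
    eager[[i]] <- local({
      j <- i
      function(x) list(j, x)
    })
  }
  list(lazy = lazy, eager = eager)
}

taggers <- makeTaggers()
taggers$lazy[[1]]("a")   # list(3, "a") -- wrong tag; `i` finished the loop at 3
taggers$eager[[1]]("a")  # list(1, "a") -- tag captured at creation time
```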

