
Commit

jfj
amwhite committed Mar 26, 2014
1 parent c0d59e2 commit 056e33f
Showing 4 changed files with 340 additions and 58 deletions.
102 changes: 102 additions & 0 deletions src/test/R/test-join-mr.R
@@ -0,0 +1,102 @@
## This file contains tests that run a simple map-reduce job
## asynchronously and then join it.

context("Interrupting a simple mr job")

test_that("test rhinit", {
rhinit()
})

test.dir <- file.path(rhoptions()$HADOOP.TMP.FOLDER, "rhipeTest")

test_that("clean rhoptions()$HADOOP.TMP.FOLDER/rhipeTest and set working directory", {
if(rhexists(test.dir))
rhdel(test.dir)

rhmkdir(test.dir)
hdfs.setwd(test.dir)
})

test_that("simple mr job setup", {
# dummy set of data
permute <- sample(1:150, 150)
splits <- split(permute, rep(1:3, 50))
irisSplit <- lapply(seq_along(splits), function(x) {
list(x, iris[splits[[x]],])
})

if(rhexists("irisData"))
rhdel("irisData")
rhwrite(irisSplit, file="irisData")
})

## Function to busy-wait for x seconds
pause <- function(x) {
  s <- Sys.time()
  while (as.numeric(difftime(Sys.time(), s, units = "secs")) < x) {}
}
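## e.g. pause(2) blocks the R process for roughly two seconds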

test_that("start simple mr job asynchronously, then join it", {
# map code for computing range
rangeMap <- rhmap({
by(r, r$Species, function(x) {
rhcollect(
as.character(x$Species[1]),
range(x$Sepal.Length)
)
})
})
expect_true("rhmr-map" %in% class(rangeMap))

# reduce code for computing max
rangeReduce <- expression(
pre = {
rng <- c(Inf, -Inf)
},
reduce = {
a <- reduce.key
rx <- unlist(reduce.values)
rng <- c(min(rng[1], rx, na.rm = TRUE), max(rng[2], rx, na.rm = TRUE))
},
post = {
rhcollect(reduce.key, rng)
}
)

# if irisMax already exists, delete it before starting job
if (rhexists("irisMax")) { rhdel("irisMax") }

# set up the job
job <- rhwatch(
map = rangeMap,
reduce = rangeReduce,
input = "irisData",
output = "irisMax",
noeval=TRUE
)

# run the job
jobtoken <- rhex(job, async=TRUE)

# join the job (wait for it to complete)
res <- rhjoin(jobtoken)

# test rhofolder() function
expect_equal(rhofolder(job), file.path(hdfs.getwd(), "irisMax"),
  info = "rhofolder returns correct value")

# get output and test for correctness
irisMax <- rhread("irisMax")

irisMax <- do.call(rbind, lapply(irisMax, function(x) {
data.frame(species=x[[1]], min=x[[2]][1], max=x[[2]][2], stringsAsFactors=FALSE)
}))
irisMax <- irisMax[order(irisMax$species),]

expectedMax <- as.numeric(by(iris, iris$Species, function(x) max(x$Sepal.Length)))

expect_equivalent(irisMax$max, expectedMax, info = "result of mr job is not correct")

})

# several parameters of rhwatch to test (readback, mapred, combiner, different input/output formats)
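
## A possible follow-up test for the readback argument, included here only as a
## sketch (not part of the original commit). It assumes rhwatch() accepts
## readback = FALSE, meaning the output is not read back automatically and must
## be fetched with rhread(); countMap, countReduce and "irisCount" are
## illustrative names, not existing objects.
test_that("run mr job with readback=FALSE and read output manually", {
  # trivial map/reduce: emit the row count of each input block under key 1
  countMap <- rhmap({
    rhcollect(1, nrow(r))
  })
  countReduce <- expression(
    pre = { total <- 0 },
    reduce = { total <- total + sum(unlist(reduce.values)) },
    post = { rhcollect(reduce.key, total) }
  )

  if (rhexists("irisCount")) rhdel("irisCount")

  rhwatch(
    map = countMap,
    reduce = countReduce,
    input = "irisData",
    output = "irisCount",
    readback = FALSE
  )

  # output was not read back, so read it explicitly and check the total row count
  out <- rhread("irisCount")
  expect_equal(out[[1]][[2]], 150)
})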

84 changes: 84 additions & 0 deletions src/test/R/test-mr-with-combiner.R
@@ -0,0 +1,84 @@
## This file contains tests that perform a simple map-reduce job
## that includes a combiner step.

context("Simple mr job with combiner")

test_that("test rhinit", {
rhinit()
})

test.dir <- file.path(rhoptions()$HADOOP.TMP.FOLDER, "rhipeTest")

test_that("clean rhoptions()$HADOOP.TMP.FOLDER/rhipeTest and set working directory", {
if(rhexists(test.dir))
rhdel(test.dir)

rhmkdir(test.dir)
hdfs.setwd(test.dir)
})

test_that("simple mr job setup", {
# dummy set of data
permute <- sample(1:150, 150)
splits <- split(permute, rep(1:3, 50))
irisSplit <- lapply(seq_along(splits), function(x) {
list(x, iris[splits[[x]],])
})

if(rhexists("irisData"))
rhdel("irisData")
rhwrite(irisSplit, file="irisData")
})

test_that("run mr job with combiner=TRUE", {
# map code for computing range
rangeMap <- rhmap({
by(r, r$Species, function(x) {
rhcollect(
as.character(x$Species[1]),
range(x$Sepal.Length)
)
})
})
expect_true("rhmr-map" %in% class(rangeMap))

# reduce code for computing max
rangeReduce <- expression(
pre = {
rng <- c(Inf, -Inf)
},
reduce = {
a <- reduce.key
rx <- unlist(reduce.values)
rng <- c(min(rng[1], rx, na.rm = TRUE), max(rng[2], rx, na.rm = TRUE))
},
post = {
rhcollect(reduce.key, rng)
}
)

# execute the job
res <- try(rhwatch(
map = rangeMap,
reduce = rangeReduce,
input = "irisData",
output = "irisMax",
combiner = TRUE
))

expect_true(!inherits(res, "try-error"),
label = "mr job ran successfully")

res <- do.call(rbind, lapply(res, function(x) {
data.frame(species=x[[1]], min=x[[2]][1], max=x[[2]][2], stringsAsFactors=FALSE)
}))
res <- res[order(res$species),]

resTest <- as.numeric(by(iris, iris$Species, function(x) max(x$Sepal.Length)))

expect_equivalent(res$max, resTest, info = "result of mr job is correct")

})

# several parameters of rhwatch to test (readback, mapred, combiner, different input/output formats)
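
## A possible follow-up test for the mapred argument, included here only as a
## sketch (not part of the original commit). It assumes rhwatch() forwards a
## named list of Hadoop settings through mapred (here forcing one reducer via
## mapred.reduce.tasks); sumMap, sumReduce and "irisRowCount" are illustrative
## names, not existing objects.
test_that("run mr job with a single reducer via mapred", {
  # trivial map/reduce: sum the row counts of all input blocks
  sumMap <- rhmap({
    rhcollect("rows", nrow(r))
  })
  sumReduce <- expression(
    pre = { n <- 0 },
    reduce = { n <- n + sum(unlist(reduce.values)) },
    post = { rhcollect(reduce.key, n) }
  )

  if (rhexists("irisRowCount")) rhdel("irisRowCount")

  res <- try(rhwatch(
    map = sumMap,
    reduce = sumReduce,
    input = "irisData",
    output = "irisRowCount",
    mapred = list(mapred.reduce.tasks = 1)
  ))

  expect_true(!inherits(res, "try-error"),
    label = "mr job with mapred settings ran successfully")
})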

59 changes: 59 additions & 0 deletions src/test/R/test-serialization.R
@@ -0,0 +1,59 @@
context("Test serialization and de-serialization")

test_that("test rhinit", {
rhinit()
})

test.dir <- file.path(rhoptions()$HADOOP.TMP.FOLDER, "rhipeTest")

test_that("clean rhoptions()$HADOOP.TMP.FOLDER/rhipeTest and set working directory", {
if(rhexists(test.dir))
rhdel(test.dir)

rhmkdir(test.dir)
hdfs.setwd(test.dir)
})

test_that("serialization and de-serialization in R memory", {
# dummy set of data
permute <- sample(1:150, 150)
splits <- split(permute, rep(1:3, 50))
irisSplit <- lapply(seq_along(splits), function(x) {
list(x, iris[splits[[x]],])
})
sample.string <- "this is a test of the serialization and de-serialization functions in Rhipe"

all.data <- list(iris, irisSplit, sample.string)

serialized <- rhsz(all.data)

unserialized <- rhuz(serialized)

expect_equivalent(all.data, unserialized)
})

test_that("deserialize data serialized in java"), {
#de-serialize something serialized in Java
java.opts <- rhuz(rhoptions()$server$rhmropts())
expect_true(data.class(java.opts) == "list")
expect_true(all(sapply(java.opts, data.class) == "character"))
})

test_that("use rhwrite/rhread which use serialization", {
# dummy set of data
permute <- sample(1:150, 150)
splits <- split(permute, rep(1:3, 50))
irisSplit <- lapply(seq_along(splits), function(x) {
list(x, iris[splits[[x]],])
})
rhwrite(irisSplit, "serializationTest")
new.data <- rhread("serializationTest")

expect_equivalent(irisSplit, new.data)
})
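
## A small additional round-trip check, included here only as a sketch (not
## part of the original commit). It assumes rhsz() returns the serialized
## object as a raw byte vector, which rhuz() then decodes.
test_that("rhsz output is a raw vector and survives repeated round trips", {
  obj <- list(letters, iris[1:5, ], list(nested = 1:10))
  ser <- rhsz(obj)
  expect_true(is.raw(ser))

  # a second serialize/deserialize cycle should reproduce the same object
  expect_equivalent(rhuz(rhsz(rhuz(ser))), obj)
})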