
Implement Arrow #1611

Merged: 132 commits into master on Nov 2, 2018
Conversation

@javierluraschi (Collaborator) commented on Jul 20, 2018:

Support for Apache Arrow in sparklyr.

# Install this PR
devtools::install_github("apache/arrow", subdir = "r", ref = "dc5df8f")
devtools::install_github("rstudio/sparklyr")

# Initialize Data
df <- data.frame(y = runif(10^5, 0, 1))

# Initialize sparklyr
library(sparklyr)
sc <- spark_connect(master = "local", version = "2.3.1")

Benchmarks

For completeness, SparkR is also included in the benchmarks; it is initialized as follows:

# Initialize SparkR
Sys.setenv(SPARK_HOME = sparklyr::spark_home_dir("2.3.1"))
library(SparkR, lib.loc = c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib")))
sess <- sparkR.session(master = "local[*]")

Copying

copy_benchmark <- microbenchmark::microbenchmark(
    arrow = {
        library(arrow)
        sparklyr_df <<- dplyr::copy_to(sc, df, memory = TRUE, overwrite = TRUE)
        dplyr::count(sparklyr_df)
    },
    sparklyr = {
        if ("arrow" %in% .packages()) detach("package:arrow")
        sparklyr_df <<- dplyr::copy_to(sc, df, memory = TRUE, overwrite = TRUE)
        dplyr::count(sparklyr_df)
    },
    sparkr = {
        sparkr_df <<- SparkR::cache(SparkR::as.DataFrame(df))
        SparkR::count(sparkr_df)
    },
    times = 10
)

ggplot2::autoplot(copy_benchmark)

copy-benchmark

Collecting

collect_benchmark <- microbenchmark::microbenchmark(
    arrow = {
        library(arrow)
        sparklyr_local <<- dplyr::collect(sparklyr_df)
    },
    sparklyr = {
        if ("arrow" %in% .packages()) detach("package:arrow")
        sparklyr_local <<- dplyr::collect(sparklyr_df)
    },
    sparkr = {
        sparkr_local <<- SparkR::collect(sparkr_df)
    },
    times = 10
)

ggplot2::autoplot(collect_benchmark)

collect-benchmark

Running the collect benchmark with 10^6 rows shows clear improvements with arrow:

df_large <- data.frame(y = runif(10^6, 0, 1))
sparklyr_large <<- dplyr::copy_to(sc, df_large, memory = TRUE, overwrite = TRUE)

collect_large_benchmark <- microbenchmark::microbenchmark(
    arrow = {
        library(arrow)
        sparklyr_local <<- dplyr::collect(sparklyr_large)
    },
    sparklyr = {
        if ("arrow" %in% .packages()) detach("package:arrow")
        sparklyr_local <<- dplyr::collect(sparklyr_large)
    },
    times = 10
)

ggplot2::autoplot(collect_large_benchmark)

collect-large-benchmark

spark_apply()

r_benchmark <- microbenchmark::microbenchmark(
    arrow = {
        library(arrow)
        spark_apply(sparklyr_df, ~ .x / 1.2, columns = list(x = "numeric"),
                    env = list(R_ENABLE_JIT = "0"), memory = FALSE) %>%
            dplyr::count() %>%
            dplyr::collect()
    },
    sparklyr = {
        if ("arrow" %in% .packages()) detach("package:arrow")
        spark_apply(sparklyr_df, ~ .x / 1.2, columns = list(x = "numeric"),
                    memory = FALSE) %>%
            dplyr::count() %>%
            dplyr::collect()
    },
    sparkr = {
        dapply(sparkr_df, function(x) x / 1.2, structType(structField("y", "double"))) %>% SparkR::count()
    },
    times = 10, control = list(order = "block")
)

ggplot2::autoplot(r_benchmark)

r-benchmark

Note that the JIT was turned off, since it adds a bit of overhead to spark_apply() in this particular example. Here is a detailed comparison of arrow with the JIT enabled and disabled:

jit_benchmark <- microbenchmark::microbenchmark(
    jit_off = {
        library(arrow)
        spark_apply(sparklyr_df, ~ .x / 1.2, columns = list(x = "numeric"),
                    env = list(R_ENABLE_JIT = "0"), memory = FALSE) %>%
            dplyr::count() %>%
            dplyr::collect()
    },
    jit_on = {
        library(arrow)
        spark_apply(sparklyr_df, ~ .x / 1.2, columns = list(x = "numeric"),
                    memory = FALSE) %>%
            dplyr::count() %>%
            dplyr::collect()
    },
    times = 10
)

ggplot2::autoplot(jit_benchmark)

jit-benchmark

Here is a profile measuring where time is spent while running spark_apply(); loading arrow appears to take about 260 ms, which could be worth investigating further at some point:

[screenshot: profile of time spent while running spark_apply()]

For comparison, the equivalent operation in Scala:

def time[R](block: => R): R = {
    val t0 = System.currentTimeMillis()
    val result = block    // call-by-name
    val t1 = System.currentTimeMillis()
    println("Elapsed time: " + (t1 - t0) + "ms")
    result
}

val data = spark.range(1,100000,1).cache

time { data.map(_ / 1.2).count() }

Elapsed time: 174ms
res16: Long = 99999
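For reference, the Scala time helper above can be sketched in base R as well; the helper name time_it below is made up for this example, and no Spark connection is needed to run it:

```r
# A base-R analogue of the Scala time() helper above (a sketch; the
# name time_it is invented for this illustration).
time_it <- function(expr) {
  t0 <- Sys.time()
  result <- expr  # lazy argument: the expression is evaluated here
  t1 <- Sys.time()
  elapsed_ms <- round(as.numeric(difftime(t1, t0, units = "secs")) * 1000)
  cat("Elapsed time:", elapsed_ms, "ms\n")
  result
}

# Usage: time a simple vectorized computation comparable in size to
# the Scala example (no Spark required for the helper itself)
res <- time_it(sum((1:99999) / 1.2))
```

Because R evaluates function arguments lazily, the expression is only run when `expr` is first used inside the helper, mirroring Scala's call-by-name parameter.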

Tests

Using the performance results from the Travis runs, we can compare execution times with and without arrow as follows:

library(dplyr)

data_sparklyr <- read.csv("~/RStudio/temp/sparklyr-perf-tests-spark.txt") %>%
  pull() %>%
  stringr::str_match(., "(.*) ([0-9]+\\.?[0-9]*)") %>%
  as.data.frame() %>%
  transmute(test = trimws(V2), serializer = "sparklyr", time = as.numeric(trimws(V3)))

data_arrow <- read.csv("~/RStudio/temp/sparklyr-perf-tests-arrow.txt") %>%
  pull() %>%
  stringr::str_match(., "(.*) ([0-9]+\\.?[0-9]*)") %>%
  as.data.frame() %>%
  transmute(test = trimws(V2), serializer = "arrow", time = as.numeric(trimws(V3)))
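As a quick sanity check of the parsing pattern used above, here is a minimal base-R example on made-up sample lines (the sample lines are invented; only the regex matches the code above):

```r
# Hypothetical sample lines in the "test-name time" format parsed above
lines <- c("spark_apply basic 12.5", "copy_to large 3.75")

# Same pattern as the stringr::str_match() calls above, via base R:
# group 1 captures the test name, group 2 the trailing number
m <- regmatches(lines, regexec("(.*) ([0-9]+\\.?[0-9]*)", lines))
tests <- vapply(m, function(x) trimws(x[2]), character(1))
times <- vapply(m, function(x) as.numeric(trimws(x[3])), numeric(1))

tests  # "spark_apply basic" "copy_to large"
times  # 12.5 3.75
```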

library(ggplot2)
bind_rows(data_sparklyr, data_arrow) %>%
  ggplot(aes(x=test, y=time, fill = serializer)) +
    geom_bar(stat='identity', position='dodge') +
    theme(axis.text.x = element_blank(), axis.ticks.x=element_blank())

sparklyr-arrow-tests

bind_rows(data_sparklyr, data_arrow) %>%
  tidyr::spread(serializer, time) %>%
  summarise(arrow = sum(arrow), sparklyr = sum(sparklyr))
#>      arrow sparklyr
#> 1 1406.354 1475.308

Overall, the arrow tests execute faster than the default sparklyr serializer. The Travis tests use only small datasets, but they help ensure that unnecessary overhead is not being introduced.

@javierluraschi merged commit 78bbe0a into master on Nov 2, 2018
@wesm commented on Nov 11, 2018:

huzzah!

@wesm commented on Nov 11, 2018:

@javierluraschi would you be interested in doing a write up for the Apache Arrow blog about this work, including all the benchmark results?

@javierluraschi (Collaborator, Author) replied:
@wesm yes, for sure. However, I don't consider this work complete yet, mostly due to arrow_data.R#L21: I'm currently turning off arrow for the unsupported data types. Dates are almost figured out, but nested data is still missing. I'm also investigating larger copy/collect use cases by tweaking batch sizes.

So, we could write a "preliminary results" post on your blog mentioning these caveats and the current state of this work, or we could wait until we push everything to CRAN, which is probably a couple of months away, or do both posts.

What's your take?

@wesm commented on Nov 12, 2018:

I recommend a blog post much sooner, as a means of also drumming up community involvement.

@javierluraschi (Collaborator, Author) replied:
@wesm Makes sense. How do I send you a blog post?

@wesm commented on Nov 12, 2018:

You can do it as a pull request to the site/ directory in the Arrow repo.

@javierluraschi (Collaborator, Author) replied:

@wesm here is a draft post: apache/arrow#3001
