[TEST] Run all Travis tests with Arrow enabled #1727

Closed
wants to merge 96 commits into from

Commits (96)
fa4e6a8
start with naive conversion using feather
javierluraschi Jul 19, 2018
6b66774
use scala helper to load binary rdd holding arrow data
javierluraschi Jul 19, 2018
0427aee
support arrowconverters using public spark api
javierluraschi Jul 20, 2018
67c6ad7
make use of converters from arrow_copy_to
javierluraschi Jul 20, 2018
556fc88
fix typos while calling arrow converter from r
javierluraschi Jul 20, 2018
66c4699
add interface for python arrow serializer to compare with ease
javierluraschi Jul 20, 2018
aa19a71
reuse known schema instead of relying on arrow's file schema
javierluraschi Jul 20, 2018
9396a4f
use proper arrow batch writer since batches are expected in java arro…
javierluraschi Jul 21, 2018
60d3898
avoid __index_level_0__ while converting to arrow
javierluraschi Jul 23, 2018
ee23b1a
use internal rows to match python's arrow converter
javierluraschi Jul 23, 2018
7f2fbed
use sparklyr's invoke to properly match method arguments
javierluraschi Jul 24, 2018
31b4bb4
fix reticulate reference in arrow poc
javierluraschi Jul 24, 2018
960da2f
add arrow as remote
javierluraschi Oct 2, 2018
e1bd21e
make use of new arrow serializer and default to this
javierluraschi Oct 2, 2018
04933d8
simplify arrow serializers to only use the R serializer
javierluraschi Oct 2, 2018
6a6ce05
enable arrow upstream serialization in sdf_copy_to, dplyr and dbi
javierluraschi Oct 2, 2018
7060a60
rebuild jars and sources
javierluraschi Oct 2, 2018
b3db8d0
add tobatchiterator for arrowconverters
javierluraschi Oct 3, 2018
6ab9709
rebuild jars
javierluraschi Oct 3, 2018
961d49f
complete arrow serialization to enable raw collection
javierluraschi Oct 4, 2018
825c498
add message headers with schema to arrow collection
javierluraschi Oct 4, 2018
5703b4d
rebuild jars
javierluraschi Oct 4, 2018
273ffee
make use of arrow's record_batch() pr to improve performance
javierluraschi Oct 10, 2018
53222f6
rebuild jars
javierluraschi Oct 10, 2018
c8e81b5
support for collecting using arrow
javierluraschi Oct 10, 2018
6017892
enable arrow collection
javierluraschi Oct 10, 2018
0e05606
support for arrow collection in spark_apply scala codebase
javierluraschi Oct 12, 2018
c8ab71a
support for arrow collection in spark_apply R codebase
javierluraschi Oct 12, 2018
49a9182
rebuild docs and sources
javierluraschi Oct 12, 2018
56d1da1
fix null pointer exception while processing distributed map due to se…
javierluraschi Oct 12, 2018
14641a3
fix typo in refactored schema check
javierluraschi Oct 12, 2018
170cf4d
fix schema retrieval from arrow refactoring
javierluraschi Oct 12, 2018
5291ee1
fix missing sc exception in R worker script
javierluraschi Oct 12, 2018
88a4b82
fix additional typos from spark_apply() refactoring
javierluraschi Oct 12, 2018
0b2a109
proper support for passing timezone into spark_apply with arrow
javierluraschi Oct 12, 2018
845af1a
rebuild jars and sources
javierluraschi Oct 12, 2018
6752c8c
support enabling arrow in r worker
javierluraschi Oct 12, 2018
11b4852
use worker invokes in workers code
javierluraschi Oct 12, 2018
0b85c55
rebuild jars
javierluraschi Oct 12, 2018
0858229
fix parameter call order in r worker with arrow
javierluraschi Oct 12, 2018
385c2c1
use entire record batch while streaming bindings are implemented
javierluraschi Oct 13, 2018
bfcb31e
fix schema retrieval while using spark_apply with arrow
javierluraschi Oct 13, 2018
92ceb45
disable arrow for int64 while type not implemented in arrow
javierluraschi Oct 13, 2018
9cc9d4a
rebuild jars and sources
javierluraschi Oct 13, 2018
3f2b505
fix regression in spark_apply() failing to write callstack to log
javierluraschi Oct 15, 2018
279ae06
spark_apply() should not attempt to cancel jobs which reports wrong e…
javierluraschi Oct 15, 2018
646f0b3
support in spark_apply() for constructors
javierluraschi Oct 16, 2018
fc0e7c2
support for returning data using arrow in spark_apply()
javierluraschi Oct 16, 2018
451eedf
rebuild jars and sources
javierluraschi Oct 16, 2018
d09fb7f
use rtools to compile stringi sources while not in cran
javierluraschi Oct 16, 2018
696e22d
fix r cmd check warnings
javierluraschi Oct 16, 2018
1c1aff1
fix connection issue using older sparklyr package with newer spark ve…
javierluraschi Oct 17, 2018
8317e02
support for turning arrow and jit on/off in spark_apply
javierluraschi Oct 17, 2018
eae6bfe
transition to use new streaming bindings pr
javierluraschi Oct 17, 2018
affa947
rebuild jars and sources
javierluraschi Oct 17, 2018
6ec92be
remove remote to avoid travis dependencies
javierluraschi Oct 17, 2018
e82006c
fix r cmd check warnings
javierluraschi Oct 18, 2018
fdace3a
rebuild jars and sources
javierluraschi Oct 18, 2018
548ef09
support for factors in copy_to and apply within arrow
javierluraschi Oct 18, 2018
27c7379
rebuild jars and sources
javierluraschi Oct 18, 2018
87093b8
support in spark_apply() for groupby using arrow in scala
javierluraschi Oct 18, 2018
a39fc62
support in spark_apply() for groupby using arrow in R
javierluraschi Oct 18, 2018
75dd6af
enable groupby over arrow only on spark 2.3 or newer
javierluraschi Oct 18, 2018
74442b7
rebuild jars and sources
javierluraschi Oct 18, 2018
b765ec0
fix r cmd warning
javierluraschi Oct 18, 2018
d9e85c9
add missing livy arrow sources
javierluraschi Oct 19, 2018
1cf11c7
rebuild livy sources
javierluraschi Oct 19, 2018
1a24eac
private classes not compatible with livy connections
javierluraschi Oct 19, 2018
0334064
rebuild livy sources
javierluraschi Oct 19, 2018
8542a62
add support for arrow in travis
javierluraschi Oct 18, 2018
a4d9e75
use addons to install arrow binaries
javierluraschi Oct 19, 2018
082d8fc
fix typos and clean settings
javierluraschi Oct 19, 2018
d0ccae9
correct spacing for travis apt source line
javierluraschi Oct 19, 2018
20bd4bd
fix script check in travis arrow installer
javierluraschi Oct 19, 2018
94838cd
one more fix to arrow travis installer
javierluraschi Oct 19, 2018
6aa72bc
install devtools package
javierluraschi Oct 19, 2018
fa425d5
fix typo while installing devtools for arrow in travis
javierluraschi Oct 19, 2018
16ea9b3
enable arrow in travis tests
javierluraschi Oct 19, 2018
ebae3bb
fix iris copy test under arrow
javierluraschi Oct 19, 2018
115f29f
qualify utils class for livy
javierluraschi Oct 19, 2018
d8f1402
rebuild jars and sources
javierluraschi Oct 19, 2018
d41a53f
split arrowbatchstreamwriter into its own file
javierluraschi Oct 20, 2018
235ee87
enable arrow transfer with livy
javierluraschi Oct 20, 2018
e08e4c9
rebuild jars and sources
javierluraschi Oct 20, 2018
a5b9ef2
Revert "qualify utils class for livy"
javierluraschi Oct 20, 2018
535c399
rebuild jars and sources
javierluraschi Oct 20, 2018
9eb6084
make arrow library call in tests dynamic to avoid suggests
javierluraschi Oct 20, 2018
41202e2
fix livy connections under spark_apply() in spark 2.3
javierluraschi Oct 20, 2018
8aa5b2b
better label for arrow travis environment
javierluraschi Oct 20, 2018
a5181ad
enable copy_to and collect using arrow and livy
javierluraschi Oct 22, 2018
3981419
disable spark apply packages distribution by default in livy
javierluraschi Oct 22, 2018
9fe8727
add log entry to connections pane under livy
javierluraschi Oct 22, 2018
8019437
support for spark_apply in livy with arrow
javierluraschi Oct 22, 2018
b0f1b85
rebuild jars and sources
javierluraschi Oct 22, 2018
8ed1a85
attempt to enable all tests under arrow
javierluraschi Oct 22, 2018
d405af2
disable non-arrow tests while troubleshooting
javierluraschi Oct 22, 2018
3 changes: 3 additions & 0 deletions .travis.R
@@ -10,6 +10,9 @@ if (length(args) == 0) {
source("testthat.R")
} else if (args[[1]] == "--coverage") {
covr::codecov()
} else if (args[[1]] == "--arrow") {
install.packages("devtools")
devtools::install_github("apache/arrow", subdir = "r")
} else {
stop("Unsupported arguments")
}
30 changes: 13 additions & 17 deletions .travis.yml
@@ -14,30 +14,26 @@ r_packages:

matrix:
include:
- name: "Spark 1.6.3 (R 3.2, openjdk7)"
r: 3.2
env:
- SPARK_VERSION="1.6.3"
- JAVA_VERSION=openjdk7
- name: "Spark 2.2.1 (R oldrel, oraclejdk8)"
r: oldrel
env:
- SPARK_VERSION="2.2.1"
- JAVA_VERSION=oraclejdk8
- name: "Spark 2.3.1 (R release, openjdk8)"
r: release
env:
- SPARK_VERSION="2.3.1"
- JAVA_VERSION=openjdk8
- name: "Livy 0.5.0 (R release, openjdk8)"
- name: "Arrow (R release, openjdk8)"
r: release
env:
- LIVY_VERSION="0.5.0"
- R_ARROW="true"
- JAVA_VERSION=openjdk8
addons:
apt:
sources:
- sourceline: deb https://packages.red-data-tools.org/ubuntu/ trusty universe
key_url: https://packages.red-data-tools.org/ubuntu/red-data-tools-keyring.gpg
packages:
- apt-transport-https
- lsb-release
- libarrow-dev
- libarrow-glib-dev

before_install:
- jdk_switcher use $JAVA_VERSION
- echo $JAVA_HOME
- if [[ $R_ARROW == "true" ]]; then Rscript .travis.R --arrow; fi

script:
- |
2 changes: 2 additions & 0 deletions NEWS.md
@@ -1,5 +1,7 @@
# Sparklyr 0.9.2 (unreleased)

- Support for launching Livy logs from the connections pane.

- Removed `overwrite` parameter in `spark_read_table()` (#1698).

- Fix regression preventing using R 3.2 (#1695).
65 changes: 65 additions & 0 deletions R/arrow_data.R
@@ -0,0 +1,65 @@
arrow_enabled <- function(sc) {
spark_config_value(sc, "sparklyr.arrow", "package:arrow" %in% search())
}

arrow_batch <- function(df)
{
record_batch <- get("record_batch", envir = as.environment(asNamespace("arrow")))
write_record_batch <- get("write_record_batch", envir = as.environment(asNamespace("arrow")))

record <- record_batch(df)
write_record_batch(record, raw())
}

arrow_read_stream <- function(stream)
{
record_batch_stream_reader <- get("record_batch_stream_reader", envir = as.environment(asNamespace("arrow")))
read_record_batch <- get("read_record_batch", envir = as.environment(asNamespace("arrow")))

reader <- record_batch_stream_reader(stream)
record_entry <- read_record_batch(reader)

entries <- list()
while (!record_entry$is_null()) {
entries[[length(entries) + 1]] <- tibble::as_tibble(record_entry)
record_entry <- read_record_batch(reader)
}

entries
}

arrow_copy_to <- function(sc, df, parallelism = 8L, serializer = "arrow")
{
# replace factors with characters
if (any(sapply(df, is.factor))) {
df <- dplyr::as_data_frame(lapply(df, function(x) if(is.factor(x)) as.character(x) else x))
}

# serialize to arrow
bytes <- arrow_batch(df)

# create batches data frame
batches <- list(bytes)

# build schema
schema <- spark_data_build_types(sc, lapply(df, class))

# load arrow file in scala
rdd <- invoke_static(sc, "sparklyr.ArrowHelper", "javaRddFromBinaryBatches", spark_context(sc), batches, parallelism)
sdf <- invoke_static(sc, "sparklyr.ArrowConverters", "toDataFrame", rdd, schema, spark_session(sc))

sdf
}

arrow_collect <- function(tbl, ...)
{
sc <- spark_connection(tbl)
sdf <- spark_dataframe(tbl)
session <- spark_session(sc)

time_zone <- spark_session(sc) %>% invoke("sessionState") %>% invoke("conf") %>% invoke("sessionLocalTimeZone")

invoke_static(sc, "sparklyr.ArrowConverters", "toArrowBatchRdd", sdf, session, time_zone) %>%
arrow_read_stream() %>%
dplyr::bind_rows()
}
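These helpers are internal, but together they form the new copy/collect path: attaching the arrow package makes arrow_enabled() return TRUE by default, copying then serializes through arrow_batch() and ArrowConverters, and collecting streams record batches back through arrow_read_stream(). A minimal round-trip sketch, assuming this branch of sparklyr and the apache/arrow R bindings are installed (connection details are illustrative):

    library(sparklyr)
    library(arrow)   # attaching arrow makes arrow_enabled(sc) default to TRUE

    sc <- spark_connect(master = "local")

    # copies iris through the arrow serializer rather than the CSV paths
    iris_tbl <- dplyr::copy_to(sc, iris, overwrite = TRUE)

    # collection streams arrow record batches back and binds them with
    # dplyr::bind_rows() via arrow_collect()
    iris_local <- dplyr::collect(iris_tbl)

    spark_disconnect(sc)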
1 change: 1 addition & 0 deletions R/config_settings.R
@@ -8,6 +8,7 @@ spark_config_settings <- function() {
sparklyr.apply.packages = "Configures default value for packages parameter in spark_apply().",
sparklyr.apply.rlang = "Experimental feature. Turns on improved serialization for spark_apply().",
sparklyr.apply.schema.infer = "Number of rows collected to infer schema when column types specified in spark_apply().",
sparklyr.arrow = "Use Apache Arrow to serialize data?",
sparklyr.backend.interval = "Total seconds sparklyr will check on a backend operation.",
sparklyr.backend.timeout = "Total seconds before sparklyr will give up waiting for a backend operation to complete.",
sparklyr.connect.aftersubmit = "R function to call after spark-submit executes.",
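Besides attaching the arrow package, the serializer can be forced either way through the connection config; a short sketch (setting name taken from this diff, other values illustrative):

    config <- spark_config()
    config$sparklyr.arrow <- TRUE    # FALSE keeps the existing CSV serializers
    sc <- spark_connect(master = "local", config = config)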
6 changes: 6 additions & 0 deletions R/connection_viewer.R
@@ -23,6 +23,12 @@ spark_actions <- function(scon) {
callback = function() {
utils::browseURL(file.path(scon$master, "ui"))
}
),
"Log" = list(
icon = file.path(icons, "spark-log.png"),
callback = function() {
utils::browseURL(file.path(scon$master, "ui", "session", scon$sessionId, "log"))
}
)
)
)
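The new action opens the Livy session log in the browser; for reference, the URL it assembles looks like the following (master and session id are illustrative):

    file.path("http://livy-server:8998", "ui", "session", "0", "log")
    # [1] "http://livy-server:8998/ui/session/0/log"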
2 changes: 1 addition & 1 deletion R/core_invoke.R
@@ -111,7 +111,7 @@ core_invoke_method <- function(sc, static, object, method, ...)
backend <- core_invoke_socket(sc)
connection_name <- core_invoke_socket_name(sc)

if (!identical(object, "Handler")) {
if (!identical(object, "Handler") && getOption("sparklyr.connection.cancellable", TRUE)) {
# if connection still running, sync to valid state
if (identical(sc$state$status[[connection_name]], "running"))
core_invoke_sync(sc)
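The additional guard makes the running-state sync opt-out; a sketch of turning it off for contexts where cancelling jobs does not apply, such as worker-side invokes (option name taken from this diff):

    # skip the sync/cancellation handshake for invokes in this session
    options(sparklyr.connection.cancellable = FALSE)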
4 changes: 3 additions & 1 deletion R/core_worker_config.R
@@ -5,6 +5,7 @@ worker_config_serialize <- function(config) {
spark_config_value(config, "sparklyr.worker.gateway.address", "localhost"),
if (isTRUE(config$profile)) "TRUE" else "FALSE",
if (isTRUE(config$schema)) "TRUE" else "FALSE",
if (isTRUE(config$arrow)) "TRUE" else "FALSE",
sep = ";"
)
}
@@ -17,6 +18,7 @@ worker_config_deserialize <- function(raw) {
sparklyr.gateway.port = as.integer(parts[[2]]),
sparklyr.gateway.address = parts[[3]],
profile = as.logical(parts[[4]]),
schema = as.logical(parts[[5]])
schema = as.logical(parts[[5]]),
arrow = as.logical(parts[[6]])
)
}
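The worker handshake string gains a trailing ";"-separated field carrying the arrow flag; a sketch of how the worker side would consume it (handshake_line is hypothetical; the real value is produced by worker_config_serialize() on the driver):

    config <- worker_config_deserialize(handshake_line)
    if (isTRUE(config$arrow)) {
      # the worker exchanges task data as arrow record batches instead of
      # the default R serialization
    }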
28 changes: 20 additions & 8 deletions R/data_copy.R
@@ -115,12 +115,16 @@ spark_serialize_csv_scala <- function(sc, df, columns, repartition) {
invoke(hive_context(sc), "createDataFrame", rdd, structType)
}

spark_serialize_arrow <- function(sc, df, columns, repartition) {
arrow_copy_to(sc, df)
}

spark_data_copy <- function(
sc,
df,
name,
repartition,
serializer = getOption("sparklyr.copy.serializer", "csv_file")) {
serializer = NULL) {

if (!is.numeric(repartition)) {
stop("The repartition parameter must be an integer")
@@ -130,12 +134,19 @@ spark_data_copy <- function(
stop("Using a local file to copy data is not supported for remote clusters")
}

serializer <- ifelse(is.null(serializer),
ifelse(spark_connection_is_local(sc) ||
spark_connection_is_yarn_client(sc),
"csv_file_scala",
"csv_string"),
serializer)
serializer <- ifelse(
is.null(serializer),
ifelse(
arrow_enabled(sc),
"arrow",
ifelse(
spark_connection_is_local(sc) || spark_connection_is_yarn_client(sc),
"csv_file_scala",
getOption("sparklyr.copy.serializer", "csv_string")
)
),
serializer
)

# Spark unfortunately has a number of issues with '.'s in column names, e.g.
#
@@ -159,7 +170,8 @@
serializers <- list(
"csv_file" = spark_serialize_csv_file,
"csv_string" = spark_serialize_csv_string,
"csv_file_scala" = spark_serialize_csv_scala
"csv_file_scala" = spark_serialize_csv_scala,
"arrow" = spark_serialize_arrow
)

df <- serializers[[serializer]](sc, df, columns, repartition)
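The nested ifelse() gives the serializer a new precedence: an explicit argument wins, then arrow whenever arrow_enabled(sc) is TRUE, then the Scala CSV path for local or yarn-client connections, and finally the sparklyr.copy.serializer option. A standalone sketch of that ordering (choose_serializer is illustrative, not a sparklyr function):

    choose_serializer <- function(explicit, arrow_on, local_or_yarn_client) {
      if (!is.null(explicit)) return(explicit)
      if (arrow_on) return("arrow")
      if (local_or_yarn_client) return("csv_file_scala")
      getOption("sparklyr.copy.serializer", "csv_string")
    }

    choose_serializer(NULL, arrow_on = TRUE,  local_or_yarn_client = TRUE)   # "arrow"
    choose_serializer(NULL, arrow_on = FALSE, local_or_yarn_client = FALSE)  # "csv_string"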
2 changes: 1 addition & 1 deletion R/dbi_spark_table.R
@@ -30,7 +30,7 @@ setMethod("dbReadTable", c("spark_connection", "character"),


setMethod("dbListTables", "spark_connection", function(conn) {
df <- df_from_sql(conn, "SHOW TABLES")
df <- df_from_sql(conn, "SHOW TABLES", arrow = FALSE)

tableNames <- df$tableName
filtered <- grep("^sparklyr_tmp_", tableNames, invert = TRUE, value = TRUE)
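Passing arrow = FALSE pins table listing to the non-arrow collect path regardless of the sparklyr.arrow setting; usage is unchanged (assumes the open connection sc from the sketches above):

    DBI::dbListTables(sc)   # filtered table names, collected without arrow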
3 changes: 1 addition & 2 deletions R/install_spark_versions.R
@@ -124,7 +124,7 @@ spark_versions <- function(latest = TRUE) {
if (dir.exists(maybeDir)) {
fileName <- basename(maybeDir)
m <- regmatches(fileName, regexec(spark_versions_file_pattern(), fileName))[[1]]
if (length(m) > 2) list(spark = m[[2]], hadoop = m[[3]]) else NULL
if (length(m) > 2) list(spark = m[[2]], hadoop = m[[3]], pattern = fileName) else NULL
}
})
),
@@ -134,7 +134,6 @@

newRow <- c(row, installed = TRUE)
newRow$base <- ""
newRow$pattern <- ""
newRow$download <- ""
newRow$default <- FALSE
newRow$hadoop_default <- FALSE
13 changes: 10 additions & 3 deletions R/livy_connection.R
@@ -697,6 +697,9 @@ livy_load_scala_sources <- function(sc) {
"serializer.scala",
"stream.scala",
"repartition.scala",
"arrowhelper.scala",
"arrowbatchstreamwriter.scala",
"arrowconverters.scala",
"applyutils.scala",
"classutils.scala",
"fileutils.scala",
@@ -762,15 +765,19 @@ initialize_connection.livy_connection <- function(sc) {
tryCatch({
livy_load_scala_sources(sc)

session <- NULL
sc$state$spark_context <- tryCatch({
session <<- invoke_static(
session <- tryCatch({
invoke_static(
sc,
"org.apache.spark.sql.SparkSession",
"builder"
) %>%
invoke("getOrCreate")
},
error = function(e) {
NULL
})

sc$state$spark_context <- tryCatch({
invoke(session, "sparkContext")
},
error = function(e) {
3 changes: 3 additions & 0 deletions R/livy_sources.R
@@ -11,6 +11,9 @@ livy_sources_included <- function() {
"/repartition\\.scala",
"/tracker\\.scala",
"/livyutils\\.scala",
"/arrowhelper\\.scala",
"/arrowbatchstreamwriter\\.scala",
"/arrowconverters\\.scala",
"/applyutils\\.scala",
"/classutils\\.scala",
"/fileutils\\.scala",
8 changes: 4 additions & 4 deletions R/sdf_sql.R
@@ -1,11 +1,11 @@

df_from_sql <- function(sc, sql) {
df_from_sql <- function(sc, sql, arrow = TRUE) {
sdf <- invoke(hive_context(sc), "sql", as.character(sql))
df_from_sdf(sc, sdf)
df_from_sdf(sc, sdf, arrow = arrow)
}

df_from_sdf <- function(sc, sdf, take = -1) {
sdf_collect(sdf)
df_from_sdf <- function(sc, sdf, take = -1, arrow = TRUE) {
sdf_collect(sdf, arrow = arrow)
}

#' Spark DataFrame from SQL
3 changes: 3 additions & 0 deletions R/sdf_wrapper.R
@@ -93,10 +93,13 @@ sdf_read_column <- function(x, column) {
#'
#' @export
sdf_collect <- function(object, ...) {
args <- list(...)
sc <- spark_connection(object)

if (sdf_is_streaming(object))
sdf_collect_stream(object, ...)
else if (arrow_enabled(sc) && !identical(args$arrow, FALSE))
arrow_collect(object, ...)
else
sdf_collect_static(object, ...)
}
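With this dispatch, sdf_collect() prefers arrow whenever the connection has it enabled, while an explicit arrow = FALSE forces the pre-existing static path; a sketch reusing iris_tbl from the round-trip example above:

    sdf_collect(iris_tbl)                 # arrow_collect() when arrow_enabled(sc) is TRUE
    sdf_collect(iris_tbl, arrow = FALSE)  # force sdf_collect_static()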