Reliable spark crasher on current (June 25, 2017) development version of Sparklyr #783

Open
JohnMount opened this Issue Jun 25, 2017 · 13 comments


This is essentially re-opening Sparklyr issue 721.

I installed the development versions of dplyr, dbplyr, and sparklyr with devtools just now (June 25, 2017) and tried to run the code below. After about 10 repeats of the "rep" loop it hard-locks the Spark cluster (just a local cluster) with excessive GC load (I got a message from the Spark web UI once, and have pasted it below). This is a re-worked example that should have no leaks if the dplyr::db_drop_table() steps are working, and should not have long dependency chains if the dplyr::compute() steps are working. Each repetition simulates rbinding about 12 tables for a final row count of less than 400 rows. So even though we are repeating hundreds of steps, they are all supposed to be affordable steps.

# devtools::install_github("tidyverse/dplyr")
# devtools::install_github("tidyverse/dbplyr")
# devtools::install_github("rstudio/sparklyr")
# See also: https://github.com/rstudio/sparklyr/issues/721
suppressPackageStartupMessages(library("dplyr"))

devtools::session_info()

# Session info ---------------------------------------------------------------------------------------------------------------------------------------
#  setting  value
#  version  R version 3.4.0 (2017-04-21)
#  system   x86_64, darwin15.6.0
#  ui       RStudio (1.0.143)
#  language (EN)
#  collate  en_US.UTF-8
#  tz       America/Los_Angeles
#  date     2017-06-25
#
# Packages -------------------------------------------------------------------------------------------------------------------------------------------
#  package    * version    date       source
#  assertthat   0.2.0      2017-04-11 CRAN (R 3.4.0)
#  backports    1.1.0      2017-05-22 CRAN (R 3.4.0)
#  base       * 3.4.0      2017-04-21 local
#  base64enc    0.1-3      2015-07-28 CRAN (R 3.4.0)
#  bindr        0.1        2016-11-13 cran (@0.1)
#  bindrcpp     0.2        2017-06-17 CRAN (R 3.4.0)
#  broom        0.4.2      2017-02-13 CRAN (R 3.4.0)
#  compiler     3.4.0      2017-04-21 local
#  curl         2.6        2017-04-27 CRAN (R 3.4.0)
#  datasets   * 3.4.0      2017-04-21 local
#  DBI          0.7        2017-06-18 CRAN (R 3.4.0)
#  dbplyr       1.0.0.9000 2017-06-25 Github (tidyverse/dbplyr@59875c3)
#  devtools     1.13.2     2017-06-02 CRAN (R 3.4.0)
#  digest       0.6.12     2017-01-27 CRAN (R 3.4.0)
#  dplyr        0.7.1.9000 2017-06-24 Github (tidyverse/dplyr@4bb35fb)
#  foreign      0.8-69     2017-06-21 CRAN (R 3.4.0)
#  git2r        0.18.0     2017-01-01 CRAN (R 3.4.0)
#  glue         1.1.1      2017-06-21 CRAN (R 3.4.0)
#  graphics   * 3.4.0      2017-04-21 local
#  grDevices  * 3.4.0      2017-04-21 local
#  grid         3.4.0      2017-04-21 local
#  htmltools    0.3.6      2017-04-28 CRAN (R 3.4.0)
#  httpuv       1.3.3      2015-08-04 CRAN (R 3.4.0)
#  httr         1.2.1      2016-07-03 CRAN (R 3.4.0)
#  jsonlite     1.5        2017-06-01 CRAN (R 3.4.0)
#  knitr        1.16       2017-05-18 CRAN (R 3.4.0)
#  lattice      0.20-35    2017-03-25 CRAN (R 3.4.0)
#  lazyeval     0.2.0      2016-06-12 CRAN (R 3.4.0)
#  magrittr     1.5        2014-11-22 CRAN (R 3.4.0)
#  memoise      1.1.0      2017-04-21 CRAN (R 3.4.0)
#  methods    * 3.4.0      2017-04-21 local
#  mime         0.5        2016-07-07 CRAN (R 3.4.0)
#  mnormt       1.5-5      2016-10-15 CRAN (R 3.4.0)
#  nlme         3.1-131    2017-02-06 CRAN (R 3.4.0)
#  parallel     3.4.0      2017-04-21 local
#  pkgconfig    2.0.1      2017-03-21 cran (@2.0.1)
#  plyr         1.8.4      2016-06-08 CRAN (R 3.4.0)
#  psych        1.7.5      2017-05-03 CRAN (R 3.4.0)
#  R6           2.2.2      2017-06-17 CRAN (R 3.4.0)
#  Rcpp         0.12.11    2017-05-22 CRAN (R 3.4.0)
#  reshape2     1.4.2      2016-10-22 CRAN (R 3.4.0)
#  rlang        0.1.1.9000 2017-06-22 Github (tidyverse/rlang@a97e7fa)
#  rprojroot    1.2        2017-01-16 CRAN (R 3.4.0)
#  rstudioapi   0.6        2016-06-27 CRAN (R 3.4.0)
#  shiny        1.0.3      2017-04-26 CRAN (R 3.4.0)
#  sparklyr     0.5.6-9004 2017-06-25 Github (rstudio/sparklyr@faaeb68)
#  stats      * 3.4.0      2017-04-21 local
#  stringi      1.1.5      2017-04-07 CRAN (R 3.4.0)
#  stringr      1.2.0      2017-02-18 CRAN (R 3.4.0)
#  tibble       1.3.3      2017-05-28 CRAN (R 3.4.0)
#  tidyr        0.6.3      2017-05-15 CRAN (R 3.4.0)
#  tools        3.4.0      2017-04-21 local
#  utils      * 3.4.0      2017-04-21 local
#  withr        1.0.2      2016-06-20 CRAN (R 3.4.0)
#  xtable       1.8-2      2016-02-05 CRAN (R 3.4.0)

sc <- sparklyr::spark_connect(version='2.0.2',
                              master = "local")
print(sc)


# $master
# [1] "local[4]"
#
# $method
# [1] "shell"
#
# $app_name
# [1] "sparklyr"
#
# $config
# $config$sparklyr.cores.local
# [1] 4
#
# $config$spark.sql.shuffle.partitions.local
# [1] 4
#
# $config$spark.env.SPARK_LOCAL_IP.local
# [1] "127.0.0.1"
#
# $config$sparklyr.csv.embedded
# [1] "^1.*"
#
# $config$`sparklyr.shell.driver-class-path`
# [1] ""
#
# attr(,"config")
# [1] "default"
# attr(,"file")
# [1] "/Library/Frameworks/R.framework/Versions/3.4/Resources/library/sparklyr/conf/config-template.yml"
#
# $spark_home
# [1] "/Users/johnmount/Library/Caches/spark/spark-2.0.2-bin-hadoop2.7"
#
# $backend
# A connection with
# description "->localhost:49632"
# class       "sockconn"
# mode        "wb"
# text        "binary"
# opened      "opened"
# can read    "yes"
# can write   "yes"
#
# $monitor
# A connection with
# description "->localhost:8880"
# class       "sockconn"
# mode        "rb"
# text        "binary"
# opened      "opened"
# can read    "yes"
# can write   "yes"
#
# $output_file
# [1] "/var/folders/7q/h_jp2vj131g5799gfnpzhdp80000gn/T//Rtmp3oin5Z/file5f5591c5712_spark.log"
#
# $spark_context
# <jobj[6]>
#   class org.apache.spark.SparkContext
#   org.apache.spark.SparkContext@284b61f2
#
# $java_context
# <jobj[7]>
#   class org.apache.spark.api.java.JavaSparkContext
#   org.apache.spark.api.java.JavaSparkContext@45f23727
#
# attr(,"class")
# [1] "spark_connection"       "spark_shell_connection" "DBIConnection"



#' Compute union_all of tables.  Cut down from \code{replyr::replyr_union_all()} for debugging.
#'
#' @param sc remote data source tables are on (and where to copy-to and work), NULL for local tables.
#' @param tabA not-NULL table with at least 1 row on sc data source, and columns \code{c("car", "fact", "value")}.
#' @param tabB not-NULL table with at least 1 row on same data source as tabA and columns \code{c("car", "fact", "value")}.
#' @return table with all rows of tabA and tabB (union_all).
#'
#' @export
example_union_all <- function(sc, tabA, tabB) {
  cols <- intersect(colnames(tabA), colnames(tabB))
  expectedCols <- c("car", "fact", "value")
  if((length(cols)!=length(expectedCols)) ||
     (!all.equal(cols, expectedCols))) {
    stop(paste("example_union_all: column set must be exactly",
               paste(expectedCols, collapse = ', ')))
  }
  mergeColName <- 'exampleunioncol'
  # build a 2-row table to control the union
  controlTable <- data.frame(exampleunioncol= c('a', 'b'),
                             stringsAsFactors = FALSE)
  if(!is.null(sc)) {
    controlTable <- copy_to(sc, controlTable,
                            temporary=TRUE)
  }
  # decorate left and right tables for the merge
  tabA <- tabA %>%
    select(one_of(cols)) %>%
    mutate(exampleunioncol = as.character('a'))
  tabB <- tabB %>%
    select(one_of(cols)) %>%
    mutate(exampleunioncol = as.character('b'))
  # do the merges
  joined <- controlTable %>%
    left_join(tabA, by=mergeColName) %>%
    left_join(tabB, by=mergeColName, suffix = c('_a', '_b'))
  # coalesce the values
  joined <- joined %>%
    mutate(car = ifelse(exampleunioncol=='a', car_a, car_b))
  joined <- joined %>%
    mutate(fact = ifelse(exampleunioncol=='a', fact_a, fact_b))
  joined <- joined %>%
    mutate(value = ifelse(exampleunioncol=='a', value_a, value_b))
  joined %>%
    select(one_of(cols))
}


mtcars2 <- mtcars %>%
  mutate(car = row.names(mtcars))

frameList <- mtcars2 %>%
  tidyr::gather(key='fact', value='value', -car) %>%
  split(., .$fact)

frameListS <- lapply(names(frameList),
                     function(ni) {
                       copy_to(sc, frameList[[ni]], ni)
                     }
)

count <- 1
for(rep in 1:20) {
  print(paste('start rep', rep, base::date()))
  nm <- paste('tmp', count, sep='_')
  count <- count + 1
  res <- compute(frameListS[[1]], name=nm)
  for(i in (2:length(frameListS))) {
    print(paste(' start phase', rep, i, base::date()))
    oi <- frameListS[[i]]
    res <- example_union_all(sc, res, oi)
    prevNM <- nm
    nm <- paste('tmp', count, sep='_')
    count <- count + 1
    res <- compute(res, name=nm)
    dplyr::db_drop_table(sc, prevNM)
    print(paste(' done phase', rep, i, base::date()))
  }
  print(head(res))
  dplyr::db_drop_table(sc, nm)
  print(paste('done rep', rep, base::date()))
}

# Code above should have no resource leaks (the db_drop_tables should match the computes).
# It is only building a final table of 352 rows (essentially binding rows on a remote source).

# Hard-locked Spark cluster on rep 10 step 9 or so (so may need more reps!).

# HTTP ERROR 500
#
# Problem accessing /jobs/. Reason:
#
#     Server Error
# Caused by:
#
# java.lang.OutOfMemoryError: GC overhead limit exceeded
# 	at java.util.Arrays.copyOfRange(Arrays.java:3664)
# 	at java.lang.String.<init>(String.java:207)
# 	at java.lang.StringBuilder.toString(StringBuilder.java:407)
# 	at java.net.URLStreamHandler.parseURL(URLStreamHandler.java:249)
# 	at sun.net.www.protocol.file.Handler.parseURL(Handler.java:67)
# 	at java.net.URL.<init>(URL.java:615)
# 	at java.net.URL.<init>(URL.java:483)
# 	at sun.misc.URLClassPath$FileLoader.getResource(URLClassPath.java:1222)
# 	at sun.misc.URLClassPath$FileLoader.findResource(URLClassPath.java:1212)
# 	at sun.misc.URLClassPath.findResource(URLClassPath.java:188)
# 	at java.net.URLClassLoader$2.run(URLClassLoader.java:569)
# 	at java.net.URLClassLoader$2.run(URLClassLoader.java:567)
# 	at java.security.AccessController.doPrivileged(Native Method)
# 	at java.net.URLClassLoader.findResource(URLClassLoader.java:566)
# 	at java.lang.ClassLoader.getResource(ClassLoader.java:1093)
# 	at java.lang.ClassLoader.getResource(ClassLoader.java:1088)
# 	at java.net.URLClassLoader.getResourceAsStream(URLClassLoader.java:232)
# 	at org.apache.xerces.parsers.SecuritySupport$6.run(Unknown Source)
# 	at java.security.AccessController.doPrivileged(Native Method)
# 	at org.apache.xerces.parsers.SecuritySupport.getResourceAsStream(Unknown Source)
# 	at org.apache.xerces.parsers.ObjectFactory.findJarServiceProvider(Unknown Source)
# 	at org.apache.xerces.parsers.ObjectFactory.createObject(Unknown Source)
# 	at org.apache.xerces.parsers.ObjectFactory.createObject(Unknown Source)
# 	at org.apache.xerces.parsers.SAXParser.<init>(Unknown Source)
# 	at org.apache.xerces.parsers.SAXParser.<init>(Unknown Source)
# 	at org.apache.xerces.jaxp.SAXParserImpl$JAXPSAXParser.<init>(Unknown Source)
# 	at org.apache.xerces.jaxp.SAXParserImpl.<init>(Unknown Source)
# 	at org.apache.xerces.jaxp.SAXParserFactoryImpl.newSAXParser(Unknown Source)
# 	at scala.xml.factory.XMLLoader$class.parser(XMLLoader.scala:30)
# 	at scala.xml.XML$.parser(XML.scala:60)
# 	at scala.xml.factory.XMLLoader$class.loadString(XMLLoader.scala:60)
# 	at scala.xml.XML$.loadString(XML.scala:60)
# Powered by Jetty://
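
As a side check on the no-leak claim above, one could list the registered temporary tables after each rep; this is a hedged sketch (not part of the original report) that assumes dplyr::src_tbls() is implemented for the sparklyr connection:

# Hedged leak check (assumption: src_tbls() lists the Spark temporary views on sc).
# After a completed rep, only the tables created by copy_to() should remain.
print(dplyr::src_tbls(sc))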

JohnMount referenced this issue Jun 25, 2017: Spark crasher #721 (Closed)

JohnMount commented Jun 25, 2017

I noticed the example was leaking the 2-row control table. I added code to clean that up and it crashed again. The modified code is below. I could not get all of the error message as I lost the R session (screenshot "untitled" attached).

# devtools::install_github("tidyverse/dplyr")
# devtools::install_github("tidyverse/dbplyr")
# devtools::install_github("rstudio/sparklyr")
# See also: https://github.com/rstudio/sparklyr/issues/721
suppressPackageStartupMessages(library("dplyr"))

devtools::session_info()


sc <- sparklyr::spark_connect(version='2.0.2',
                              master = "local")
print(sc)



#' Compute union_all of tables.  Cut down from \code{replyr::replyr_union_all()} for debugging.
#'
#' @param sc remote data source tables are on (and where to copy-to and work), NULL for local tables.
#' @param tabA not-NULL table with at least 1 row on sc data source, and columns \code{c("car", "fact", "value")}.
#' @param tabB not-NULL table with at least 1 row on same data source as tabA and columns \code{c("car", "fact", "value")}.
#' @param tempName name for temp table
#' @return table with all rows of tabA and tabB (union_all).
#'
#' @export
example_union_all <- function(sc, tabA, tabB, tempName) {
  cols <- intersect(colnames(tabA), colnames(tabB))
  expectedCols <- c("car", "fact", "value")
  if((length(cols)!=length(expectedCols)) ||
     (!all.equal(cols, expectedCols))) {
    stop(paste("example_union_all: column set must be exactly",
               paste(expectedCols, collapse = ', ')))
  }
  mergeColName <- 'exampleunioncol'
  # build a 2-row table to control the union
  controlTable <- data.frame(exampleunioncol= c('a', 'b'),
                             stringsAsFactors = FALSE)
  if(!is.null(sc)) {
    controlTable <- copy_to(sc, controlTable,
                            name= tempName,
                            temporary=TRUE)
  }
  # decorate left and right tables for the merge
  tabA <- tabA %>%
    select(one_of(cols)) %>%
    mutate(exampleunioncol = as.character('a'))
  tabB <- tabB %>%
    select(one_of(cols)) %>%
    mutate(exampleunioncol = as.character('b'))
  # do the merges
  joined <- controlTable %>%
    left_join(tabA, by=mergeColName) %>%
    left_join(tabB, by=mergeColName, suffix = c('_a', '_b'))
  # coalesce the values
  joined <- joined %>%
    mutate(car = ifelse(exampleunioncol=='a', car_a, car_b))
  joined <- joined %>%
    mutate(fact = ifelse(exampleunioncol=='a', fact_a, fact_b))
  joined <- joined %>%
    mutate(value = ifelse(exampleunioncol=='a', value_a, value_b))
  joined %>%
    select(one_of(cols))
}


mtcars2 <- mtcars %>%
  mutate(car = row.names(mtcars))

frameList <- mtcars2 %>%
  tidyr::gather(key='fact', value='value', -car) %>%
  split(., .$fact)

frameListS <- lapply(names(frameList),
                     function(ni) {
                       copy_to(sc, frameList[[ni]], ni)
                     }
)

count <- 1
for(rep in 1:20) {
  print(paste('start rep', rep, base::date()))
  nm <- paste('tmp', count, sep='_')
  count <- count + 1
  res <- compute(frameListS[[1]], name=nm)
  for(i in (2:length(frameListS))) {
    print(paste(' start phase', rep, i, base::date()))
    oi <- frameListS[[i]]
    nm2 <- paste('ctmp', count, sep='_')
    count <- count + 1
    res <- example_union_all(sc, res, oi, nm2)
    prevNM <- nm
    nm <- paste('tmp', count, sep='_')
    count <- count + 1
    res <- compute(res, name=nm)
    dplyr::db_drop_table(sc, nm2)
    dplyr::db_drop_table(sc, prevNM)
    print(paste(' done phase', rep, i, base::date()))
  }
  print(head(res))
  dplyr::db_drop_table(sc, nm)
  print(paste('done rep', rep, base::date()))
}

JohnMount commented Jun 25, 2017

I tried it as R Markdown. It failed, but I didn't get any messages back. I am attaching the example as a .Rmd file. It doesn't knit, but it is easy to run the chunk to see the issue.

SparkCrasher.Rmd.txt
log4j.spark.log.txt
derby.log.txt


kevinykuo commented Jun 26, 2017 (Collaborator)

@JohnMount Thanks for this reprex! I'll try to provide more details when I get a chance but I think you can resolve this by persisting with storage level MEMORY_ONLY_SER, i.e. instead of compute(), call sdf_persist() and sdf_register() to assign a name.
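
One reading of that suggestion, as a drop-in for compute(res, name = nm) in the loop above, is roughly the following sketch (not code from the thread; it assumes the sdf_register(x, name) and sdf_persist(x, storage.level) signatures):

# Hedged sketch: register the lazy result under an explicit name, then persist
# it with the serialized storage level instead of materializing via compute().
res <- res %>%
  sparklyr::sdf_register(name = nm) %>%
  sparklyr::sdf_persist(storage.level = "MEMORY_ONLY_SER")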


JohnMount commented Jun 26, 2017

Thanks @kevinykuo. I am actually just preparing that now. If I sdf_persist() and then sdf_register(), will dplyr::db_drop_table() be how I free the thing, or is there another call?
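
For reference, a hedged sketch of the two cleanup calls in question (not an answer from the thread; it assumes sparklyr::tbl_uncache(sc, name) unpersists a cached table and dplyr::db_drop_table(sc, name) removes the temporary view):

# Hedged sketch: two ways one might release a named intermediate result.
sparklyr::tbl_uncache(sc, prevNM)   # unpersist the cached data for the table
dplyr::db_drop_table(sc, prevNM)    # drop the temporary view/table itself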


javierluraschi commented Jun 26, 2017 (Member)

@JohnMount Since this is an OutOfMemoryError, I would also try increasing memory:

config <- spark_config()
config[["sparklyr.shell.driver-memory"]] <- "8G"
sc <- sparklyr::spark_connect(version='2.0.2',
                              master = "local", config = config)

and also replace compute() with sdf_checkpoint() as suggested in the previous issue.

I'm interested in fixing the R session crash; however, this might require a fix to R, so the easiest workaround is to avoid critical exceptions in Spark (easier said than done).
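
A hedged sketch of the checkpoint variant (not code from the comment; it assumes spark_set_checkpoint_dir() and sdf_checkpoint(x, eager) are available in the installed sparklyr):

# Checkpointing writes the intermediate result to reliable storage and truncates
# its lineage; a checkpoint directory must be set first.
sparklyr::spark_set_checkpoint_dir(sc, file.path(tempdir(), "spark-checkpoints"))
res <- res %>%
  sparklyr::sdf_checkpoint(eager = TRUE) %>%
  sparklyr::sdf_register(name = nm)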


kevinykuo commented Jun 26, 2017 (Collaborator)

@JohnMount When I looked at this example, the DAGs didn't blow up like they did in the previous issue, so sdf_checkpoint() might not help since its main use is to truncate the lineage. The problem here is likely that you're creating too many intermediate objects, causing the GC to hang. Caching serialized data is more of a band-aid here; you should try to avoid "churning" datasets in your application.

Here are a couple resources:
https://databricks.com/blog/2015/05/28/tuning-java-garbage-collection-for-spark-applications.html
https://spark.apache.org/docs/latest/tuning.html#garbage-collection-tuning

BTW, are you trying to re-implement sdf_bind_rows()?
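
Relating to the GC-tuning links above, one way to pass JVM options through sparklyr is via the sparklyr.shell.* config entries, which map to spark-submit flags; a hedged sketch (not from the thread, and assuming --driver-java-options is honored for a local master):

# Hedged sketch: larger driver heap plus G1 GC with verbose GC logging.
config <- sparklyr::spark_config()
config[["sparklyr.shell.driver-memory"]] <- "8G"
config[["sparklyr.shell.driver-java-options"]] <- "-XX:+UseG1GC -XX:+PrintGCDetails"
sc <- sparklyr::spark_connect(master = "local", version = "2.0.2", config = config)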


JohnMount commented Jun 26, 2017

Yup, it is a generic brute-force implementation of bind_rows() (for databases in addition to Spark). I will probably special-case it to call sdf_bind_rows() on Spark.

That being said, it is a bit of a small/medium test. It is only binding about 30 tiny tables together 10 times. So I am still using it as a stand-in for complicated client work (which is usually about 10 large tables).
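
For comparison, a hedged sketch of the Spark special case (assuming sdf_bind_rows() accepts a list of Spark DataFrames, as dplyr::bind_rows() does for data frames):

# Hedged sketch: the whole inner union loop collapses to one call on Spark,
# since all tables in frameListS share the c("car", "fact", "value") columns.
res <- sparklyr::sdf_bind_rows(frameListS)
head(res)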


javierluraschi commented Jun 27, 2017 (Member)

@JohnMount did the additional memory suggestion help in any way? If you try the latest version of sparklyr, I'm hoping that at least we can avoid the R session crash with this fix: #787. We'll send a few more suggestions your way.

I agree that using sdf_bind_rows() would be ideal, but it also seems like Spark/sparklyr shouldn't crash here. It is a bit of a stretch use case, but I'm happy to keep investigating a bit further to see what can be optimized.


JohnMount commented Jun 28, 2017

@javierluraschi The additional memory did not help. The cluster went to 100% Java CPU with 8GB allocated (so I think it did see the config). I am maintaining the example here.

I think we are on the same page on this example (and I very much appreciate that): I want to use this code fragment as an acid test on Spark clusters and installs. At some point you are always going to hit a stretch use case with actual production tasks, so it is good to have some experience getting a practice task fixed up.


JohnMount commented Jun 28, 2017

@javierluraschi , I am now reliably getting a segfault on the original (more complicated, packaged) source of this example.

Would you be able to try the following?

devtools::install_github('rstudio/sparklyr')
devtools::install_github('tidyverse/dplyr')
devtools::install_github('tidyverse/dbplyr')
devtools::install_github('WinVector/replyr')

I'll end up modifying replyr soon, so the commit I am interested in testing is: 54f17cdb944a3ba32fd2fcaa4c4fc4222105044b.

And then try to knit the R Markdown file found here.

I know this involves the replyr package, but that is a pure R package that only calls dplyr and the like (so it is triggering the segfault, but I don't think it can be the actual source of the glitch).

 *** caught segfault ***
address 0x0, cause 'unknown'

Traceback:
 1: r_replyr_bind_rows(lst, colnames, eagerTempRemoval, TRUE, makeTempNameGenerator("bind_rows_priv"),     tempNameGenerator)
 2: replyr_bind_rows(rlist, tempNameGenerator = tempNameGenerator)
 3: replyr_moveValuesToRows(., nameForNewKeyColumn = "fact", nameForNewValueColumn = "value",     columnsToTakeFrom = colnames(mtcars), nameForNewClassColumn = "class")
 4: function_list[[i]](value)
 5: freduce(value, `_function_list`)
 6: `_fseq`(`_lhs`)
 7: eval(quote(`_fseq`(`_lhs`)), env, env)
 8: eval(quote(`_fseq`(`_lhs`)), env, env)
 9: withVisible(eval(quote(`_fseq`(`_lhs`)), env, env))
10: mtcars2 %>% replyr_moveValuesToRows(nameForNewKeyColumn = "fact",     nameForNewValueColumn = "value", columnsToTakeFrom = colnames(mtcars),     nameForNewClassColumn = "class") %>% arrange(car, fact)
11: eval(expr, envir, enclos)
12: eval(expr, envir, enclos)
13: withVisible(eval(expr, envir, enclos))
14: withCallingHandlers(withVisible(eval(expr, envir, enclos)), warning = wHandler,     error = eHandler, message = mHandler)
15: handle(ev <- withCallingHandlers(withVisible(eval(expr, envir,     enclos)), warning = wHandler, error = eHandler, message = mHandler))
16: timing_fn(handle(ev <- withCallingHandlers(withVisible(eval(expr,     envir, enclos)), warning = wHandler, error = eHandler, message = mHandler)))
17: evaluate_call(expr, parsed$src[[i]], envir = envir, enclos = enclos,     debug = debug, last = i == length(out), use_try = stop_on_error !=         2L, keep_warning = keep_warning, keep_message = keep_message,     output_handler = output_handler, include_timing = include_timing)
18: evaluate(code, envir = env, new_device = FALSE, keep_warning = !isFALSE(options$warning),     keep_message = !isFALSE(options$message), stop_on_error = if (options$error &&         options$include) 0L else 2L, output_handler = knit_handlers(options$render,         options))
19: in_dir(input_dir(), evaluate(code, envir = env, new_device = FALSE,     keep_warning = !isFALSE(options$warning), keep_message = !isFALSE(options$message),     stop_on_error = if (options$error && options$include) 0L else 2L,     output_handler = knit_handlers(options$render, options)))
20: block_exec(params)
21: call_block(x)
22: process_group.block(group)
23: process_group(group)
24: withCallingHandlers(if (tangle) process_tangle(group) else process_group(group),     error = function(e) {        setwd(wd)        cat(res, sep = "\n", file = output %n% "")        message("Quitting from lines ", paste(current_lines(i),             collapse = "-"), " (", knit_concord$get("infile"),             ") ")    })
25: process_file(text, output)
26: knitr::knit(knit_input, knit_output, envir = envir, quiet = quiet,     encoding = encoding)
27: rmarkdown::render("/Users/johnmount/Documents/work/Examples/SparkTips/replyr/replyrExamples.Rmd",     encoding = "UTF-8")
An irrecoverable exception occurred. R is aborting now ...

JohnMount commented Jun 29, 2017

I am preparing to switch my bind rows to use sparklyr::sdf_bind_rows() when on Spark, which looks good now that sparklyr issue 804 is closed. Thanks! However, as we discussed, the 54f17cdb944a3ba32fd2fcaa4c4fc4222105044b commit of replyr is a good shake-down example that you may want to run down to root causes.


AkhilNairAmey commented Jul 10, 2017 (Contributor)

I've also seen some consistent segfaults crop up in the last three weeks that weren't there before (I haven't changed any code, just updated to dplyr 0.7.0 from 0.5.0.900x and sparklyr 0.5.6 from 0.5.4). Still investigating the issue on my end, but I'm fairly sure the root lives somewhere between sparklyr and dplyr. Will report back if I find any more.

*** caught segfault ***
address 0x55efc90b4003, cause 'memory not mapped'

I've also seen the cause 'unknown' reported from the same code.



MZLABS commented Sep 20, 2017

May be related to sparklyr issue 1026.

kevinykuo removed the sdf label Jan 17, 2018
