Apply functions from SparkR #81

Closed
ElianoMarques opened this Issue Jul 6, 2016 · 47 comments

@ElianoMarques

How would we do something similar to the apply functions in SparkR?

dapply(x, func, schema)
spark.lapply(sc, list, func)

Documentation can be found here.

Thank you
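
For context, here is a minimal SparkR sketch of the kind of call being asked about (illustrative only; dapply runs an R function over each partition of a SparkDataFrame and requires an output schema):

library(SparkR)
sparkR.session(master = "local")

df <- createDataFrame(iris)
schema <- structType(structField("count", "integer"))

# dapply: apply an R function to each partition; the function receives a
# plain R data.frame and must return rows matching the declared schema.
counted <- dapply(df, function(x) data.frame(count = nrow(x)), schema)
head(collect(counted))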

@javierluraschi

javierluraschi (Member) commented Jul 6, 2016

@ElianoMarques Arbitrary R code is currently not supported in sparklyr. However, we do have plans to bring this in the near future. I'll keep this issue open to provide updates.

@javierluraschi javierluraschi added this to the feature requests milestone Jul 7, 2016

@ElianoMarques

ElianoMarques commented Jul 7, 2016

Thanks for the reply.

@ElianoMarques

ElianoMarques commented Oct 11, 2016

@javierluraschi So for example, let's say you're interested in fitting a particular ML model on a partition or a particular node from R, but that particular algorithm is not available in either the Spark ML or H2O frameworks? The solution could require classic ML plus ensembling.
There are a few ML approaches out there that don't have a one-to-one equivalent in Spark ML or H2O but are very valuable: TensorFlow (great project you guys started), SOMs, time series, etc. Other areas where this could help are simulation, automation of model selection, and the list goes on.

@kevinykuo

kevinykuo (Collaborator) commented Oct 15, 2016

Just wanted to throw out the possibility of incorporating purrr functions... will have to think about how everything plays together.

@ElianoMarques

ElianoMarques commented Oct 15, 2016

Thanks @kykuo, that would be great.

@javierluraschi

javierluraschi (Member) commented Oct 31, 2016

Received mail with another request to add support for parallel execution through dplyr's do(): https://cran.r-project.org/web/packages/dplyr/dplyr.pdf ("Do arbitrary operations on a tbl." section). Noted! Enabling this feature would open up a bunch of interesting scenarios.

@hafen

hafen commented Nov 2, 2016

+1000 for arbitrary R code execution through do!

@doorisajar

doorisajar commented Nov 8, 2016

group_by() %>% do() would enable a wide range of possibilities. The SparkR UDF implementations have constraints that prevent them from meeting my needs.
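
To make the request concrete, here is a minimal local (non-Spark) sketch of the group_by() %>% do() idiom; the ask in this thread is for the same pattern to run on Spark partitions (mtcars and lm are just placeholders):

library(dplyr)

# Classic dplyr do(): run arbitrary R code once per group of a local data
# frame, collecting the results (here, one fitted model per group) in a
# list-column.
per_cyl <- mtcars %>%
  group_by(cyl) %>%
  do(model = lm(mpg ~ wt, data = .))

per_cyl$model[[1]]  # an ordinary lm object for the first group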

@javierluraschi

javierluraschi (Member) commented Nov 10, 2016

@AjarKeen could you share some examples of current limitations with SparkR UDF to make sure this gets implemented without them?

@doorisajar

doorisajar commented Nov 10, 2016

Sure! The SparkR UDF implementation comes with some pretty hard constraints.

  1. SparkR UDFs are only allowed to receive 2-3 arguments: a key, an R data frame, and (in some cases) a schema for the output. Your function definition can have other arguments if they have defaults, but you can't pass those arguments yourself at runtime.
  2. These UDFs are only allowed to return a data frame. This makes it very hard to implement e.g. custom model training methods where the thing you want to return for each group is a model object, not a data frame. (I tried flattening my objects into DFs to reconstruct them later, but that didn't work well at all.)
  3. The environment of the UDF has no access to the R environment from which the SparkR *apply function was called. The only way I've been able to get access to package functions inside my SparkR UDF is to :: them. (Some things from the originating R environment do get passed along: sourcing .R files that contain functions does make those functions available inside the UDF. But I've also tried defining variables in my originating R environment and asking for them in my UDF to get around items 1 and 4, and that didn't work.)
  4. Side effects are difficult to achieve. The R instance that runs the UDF is invoked by another user -- e.g. on AWS in YARN mode, system("whoami") inside my UDF identifies the user as 'yarn', which means no privileges to write to the filesystem outside of the Hadoop scratch directory, no privileges to copy objects to AWS S3, etc, etc.
  5. Some things seem to get lost in translation. I implemented ... in my model training function to pass in a formula which then gets parsed by the actual learner, which works in a non-Spark local setting. When I put this inside a SparkR UDF, however, the character vector doesn't get passed down to the learner correctly. I can't be more specific than this because I haven't been able to identify the cause of the problem. :(

The actual thing I want to do is group a Spark DataFrame by a key, train a custom R model (comprised of multiple custom submodels) for each group, and then either get the model object back for each group, or save it somewhere. I can do that now in a limited way, but once my model becomes at all complex, I start hitting the limitations I described above and it stops working.

@cosmincatalin

cosmincatalin commented Nov 11, 2016

Just an opinion: due to the distributed nature of Spark, I find it really challenging, but also really awesome, if the do function gets implemented at least on par with the SparkR *apply functions.

Issue number 1 above is a valid feature request, but it can be worked around by modifying the data frame argument in advance to contain runtime information. Not pretty, but it would work.

Issue number 2 would mean restructuring the way sparklyr works, which is based on data frames, and that is also what one would expect.

Issue number 3 touches a little on how Spark works: depending on the type of objects you have in your parent scope, using them in the closure might be outright impossible because of serialization issues. Just being able to use some outer-scope variables is pretty cool in my world.

Issue number 4 looks like it is unrelated to SparkR.

@hadley

hadley (Member) commented Nov 11, 2016

FWIW, I no longer consider do() to be the way forward for arbitrary code execution. I think list-cols + mutate + a multidplyr approach (or parallel purrr) is a more likely way forward.
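
For readers unfamiliar with that idiom, here is a minimal local sketch of the list-cols + mutate approach (plain tidyverse on a local data frame; the open question in this thread is how to run the map() step on Spark workers):

library(dplyr)
library(tidyr)
library(purrr)

# Nest each group into a list-column, then mutate() with map() to fit one
# model per group. Everything here runs locally; distributing the map()
# call is what this issue is about.
by_cyl <- mtcars %>%
  group_by(cyl) %>%
  nest() %>%
  mutate(model = map(data, ~ lm(mpg ~ wt, data = .x)))

by_cyl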

@doorisajar

doorisajar commented Nov 11, 2016

I'm all for best practices. I'm also willing to contribute to this effort if there's anything useful I can do, because if we can get UDFs to work in a more flexible/robust and R-like way than SparkR does, it will be tremendously useful for my job. For example, I'd be happy to run distributed tests on AWS for new sparklyr features such as this one.

@akzaidi

akzaidi commented Nov 11, 2016

Happy to help with Azure on HDI tests as well :)

Just my two cents: while the list-cols and mutate() approach might be more pleasing from a tidy perspective, I think it'll generally be hard to have nested DataFrames in Spark without at least breaking the API and resorting to RDDs. I think a do() or purrr::map() approach would be a lot easier and would still maintain the proper functional aspects of developing UDFs.

@doorisajar

doorisajar commented Nov 11, 2016

I think map is a very smart approach, since it would mirror the Scala and Python APIs.

@hafen

hafen commented Nov 12, 2016

Here are some thoughts / ideas supporting @hadley's comments (I think!).

I think the approach that would be the easiest to implement and would hit the most use cases is to stick with Spark DataFrames. But in my opinion, any kind of UDF support automatically means support for list-columns / arbitrary R objects stored in columns of a Spark DataFrame. However, I think there is an easily-within-reach approach to this, provided the following assumptions regarding nested data frames / list-columns:

  1. There is never any need to do certain SQL-like operations (e.g. via filter, arrange) on the nested data frames or list-column objects (operations that only make sense for atomic data structures).
  2. Pretty much all list-column objects we create are probably going to be things that only R understands (e.g. model fits like lm objects, etc.). Otherwise why would we be creating them? Therefore we don't care if other languages can't handle these columns because even if they could they wouldn't know what to do with them.
  3. Related to (2), the use of list-columns / nested data frames will probably typically be for intermediate data structures that we manipulate in various ways with the ultimate goal to get back to a dataset or result that can be stored back into a cross-platform friendly DataFrame or read back in to R
  4. Given 1-3, users should be entirely free to specify any kind of R object for a list-column and not have to care about specifying its schema (requiring anything like this would be a nightmare and people wouldn't use it)

Given these assumptions, and the case to stick with Spark DataFrames, I would propose the following for any specified mutation (thinking of both nest->mutate and group_by->summarise):

  • If a mutation uses only data and operations that can be understood by SparkSQL, handle it as usual
  • If a mutation has a UDF (basically anything outside supported SparkSQL operations), pipe the input columns it requires into an R RPC for each group and process it
    • If the result of this mutation is a standard SparkSQL data type, treat it as such
    • If the result is an R nested data frame or list-column, store a serialized version (binary/string type? - hopefully can be done efficiently!) and accept that this column is now relegated to subsequent R UDF computation

There are details of the RPC such as how all the data is gathered together for each group that will have some challenges. The way we are used to thinking of "group_by + custom mutate" is that our mutation function expects to operate on all the data for a group. We can't guarantee that all the data for a group will be of manageable size with larger distributed data. And it's too much (and often impossible) to ask the user to specify a mutate function that can be iteratively updated with new data. So I wonder if some kind of analyze tool can be provided to help users understand if their grouping + UDF will be feasible or not. On the flipside there can also be the case where groups are tiny and the number of groups insanely huge, where you would want to take a multidplyr approach mentioned by Hadley and send chunks of groups into an RPC.

Given functionality for list-columns, UDF support, and group_by, I think the vast majority of distributed computations and data structures can be provided for, continuing under the dplyr / tidy paradigm that people are familiar with and without having to resort to RDDs, MapReduce-like stuff, etc., although I may be naively missing something.

I used to think sticking to data frames and SQL-like operations was severely limiting, but with UDF support and list-columns, I've come around (see here).
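
As a local illustration of the "store a serialized version" bullet above (just a sketch of the round trip in base R, not how sparklyr ultimately represents such columns): any R object can be turned into a raw vector with serialize() and recovered with unserialize(), which is the kind of payload a binary-typed column would carry.

# Round-trip an arbitrary R object (here an lm fit) through a raw vector,
# i.e. the kind of value a binary column could store for later R-only use.
fit <- lm(mpg ~ wt, data = mtcars)

payload  <- serialize(fit, connection = NULL)  # raw vector of bytes
restored <- unserialize(payload)

all.equal(coef(fit), coef(restored))  # TRUE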

@hadley

hadley (Member) commented Nov 12, 2016

@hafen that aligns very closely with my thinking :)

@codingbutstillalive

codingbutstillalive commented Nov 14, 2016

Hi!

Sparklyr is a wonderful approach to Spark. I have to admit that I do not understand everything that was discussed here, but I do want to point out that even in R, not every interesting data item is a data frame. For example, I often have to do text processing, where I need to handle large collections of text documents and split them into tokens and the like. I think this is a strength of the Scala and Python interfaces for Spark, which let you operate on text files in much the same way as on CSV files. That is what I can see from the map().reduce() commands in Scala, for example in "val parsed = noheader.map(line => parse(line))".

From the O'Reilly book, Advanced analytics in Spark, I understood that constructs like:

"val merged = nas.reduce((n1, n2) => { n1.zip(n2).map { case (a, b) => a.merge(b) } })"

are extremely useful and, most of all, efficient. Will we exclude such flexibility in R with the discussed design decisions, if sparklyr becomes a standard? I honestly do not understand enough of the details to assess it.

I understood that sparklyr wants to implement the concept of tidy data frames; on the other hand, my impression is that you are also aiming to become THE Spark interface for R. I wonder whether this can work if you neglect data structures other than tabular ones.

I might be wrong with that impression. Please correct me, if I am. I do not want to disturb the discussions, just give a perspective from a newbie on the topic.

Best regards, Sebastian

@hadley

hadley (Member) commented Nov 15, 2016

I think we'd need both interfaces for tidy data frames and lists/vectors. (But see http://tidytextmining.com for handling text data with data frames)

@ElianoMarques

ElianoMarques commented Jan 15, 2017

@javierluraschi any expectation of when we could have this in sparklyr? We are about to start a piece of work and might have to adopt SparkR given that this feature is there. It would be a shame given the investment made in getting up to speed with sparklyr across the team.

@zero323

zero323 commented Jan 22, 2017

@cosmincatalin

Issue number 4 looks like it is unrelated to SparkR.

To some extent it is highly related to the Spark execution model. Any kind of side effects can be extremely hard to manage in complex pipelines. What happens when you enable speculative execution? How do you handle task failures? How do you deal with normal task re-execution? If you want to allow for side effects, there is a lot of low-level thinking required to do it right.

That being said, AFAIK the local user on YARN is configurable.

@AjarKeen

SparkR UDFs are only allowed to receive 2-3 arguments: a key, an R data frame, and (in some cases) a schema for the output. Your function definition can have other arguments if they have defaults, but you can't pass those arguments yourself at runtime.

You can easily use currying / partial application for that so it is hardly a limitation.

schema <- structType(
  structField("species", "string"),
  structField("max_s_width", "double"))

f <- function(i) function(k, x) {
  dplyr::summarize(dplyr::group_by(x, Species), max(Sepal_Width * i))
}

as.DataFrame(iris) %>% gapply("Species", f(3), schema) %>% head

Some things seem to get lost in translation. I implemented ... in my model training function to pass in a formula which then gets parsed by the actual learner, which works in a non-Spark local setting. When I put this inside a SparkR UDF, however, the character vector doesn't get passed down to the learner correctly

This probably deserves a JIRA ticket, but in general determining the minimal set of objects that have to be serialized as part of the closure can be tricky. The more opaque your code is, the harder it gets.

@javierluraschi

javierluraschi (Member) commented Feb 1, 2017

@ElianoMarques no updates on this yet, but I do understand the urgency of this request. Thanks for the kind reminder to get this going, I will update this issue when we get this started.

@codingbutstillalive

codingbutstillalive commented Feb 2, 2017

That's great! I am also still hoping for it. Would be a really great addition.

@codingbutstillalive

codingbutstillalive commented Mar 1, 2017

Want to keep this alive. It's still urgently needed.

@codingbutstillalive

codingbutstillalive commented Mar 13, 2017

I wonder. Are you guys working on this, or did you give it up?

@javierluraschi

javierluraschi (Member) commented Mar 14, 2017

@codingbutstillalive we will keep it alive. I have a prototype repo to investigate this; it shouldn't be that hard to get basic parallelization working: https://github.com/javierluraschi/sparkworker - My plan is to get this working there and then merge it back into sparklyr. It's not usable at this point in time; ping me back in a couple of weeks.

@codingbutstillalive

codingbutstillalive commented Mar 14, 2017

That's wonderful news. I appreciate that. Good luck and success!

@ElianoMarques

ElianoMarques commented Mar 16, 2017

Awesome news @javierluraschi. Thanks for that.

@codingbutstillalive

codingbutstillalive commented Mar 29, 2017

I hope you are still motivated ;)

@codingbutstillalive

codingbutstillalive commented Apr 6, 2017

Seems to be more difficult than expected, I suppose. Right?

@kevinushey

kevinushey (Contributor) commented Apr 10, 2017

We're still working on this; I think we'll be able to accomplish something in the near future but given that we're balancing a large number of responsibilities it's possible this will still take a little longer than expected.

@codingbutstillalive

codingbutstillalive commented Apr 13, 2017

I hope for the best.

@codingbutstillalive

codingbutstillalive commented Apr 20, 2017

Just to let you know: once more, I was asked to switch from SparkR to sparklyr for our data science courses, but I had to refuse because of this missing functionality. It's a pity.

@chezou

chezou (Contributor) commented Apr 20, 2017

Hey @codingbutstillalive, don't be so noisy. sparklyr is OSS. If you need some functions, why not contribute them?

For example, there is an implementation of the concept:
https://github.com/javierluraschi/sparkworker

It's a sparklyr plugin, so why don't you try it and report any problems?

I'm not a committer of sparklyr, but it would be nice for us to contribute to the project 😉

@codingbutstillalive

codingbutstillalive commented Apr 21, 2017

@chezou I consider the requested functionality mandatory (it should be part of the core functions). Plugins should be reserved for convenience or special-purpose functions, IMHO.

@codingbutstillalive

codingbutstillalive commented Apr 21, 2017

@chezou By the way, it's the sparkworker plugin that we were talking about. It was supposed to address the problem of the missing apply functions; see the comments from March 14 above.

@francoisjehl

francoisjehl commented May 17, 2017

Hi @javierluraschi, I've been playing a bit with your extension. I was wondering if you have thought about using a managed JVM-based engine such as Renjin for R code execution, instead of relying on forking Rscript. In my company we run Spark mostly within YARN, and we try to keep dependencies on the physical workers as low as possible (in other words: we'll never install R on our 1000+ Hadoop data nodes :) )

@chezou

chezou (Contributor) commented May 17, 2017

I haven't tried it, but I guess we can use this kind of trick: https://www.continuum.io/blog/developer-blog/conda-spark

The basic idea is to distribute the R binary with a conda environment and set spark.r.command:
https://github.com/javierluraschi/sparkworker/blob/master/java/process.scala#L28

I guess this idea may help in your situation.
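
To make that concrete, here is a minimal sketch of pointing Spark at a shipped R binary from the sparklyr side; the property name spark.r.command comes from the links above, while the path and master are hypothetical and the relocated environment must already be available on every worker:

library(sparklyr)

# Point Spark at an Rscript binary shipped inside a relocated conda
# environment (hypothetical path; it must exist on the workers).
config <- spark_config()
config$spark.r.command <- "./r_env/bin/Rscript"

sc <- spark_connect(master = "yarn-client", config = config)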

@codingbutstillalive

codingbutstillalive commented May 19, 2017

I see from Javier's recent commits to sparkworker and the activity here that the topic has been revived. That is fantastic, guys; thanks for your efforts.

@hafen

hafen commented May 19, 2017

I don't recall if I have already mentioned this or not, and maybe you are already doing something similar, but one approach we created and highly advocate with the use of RHIPE is to create an archive of your local R environment and all its dependencies and ship it to the nodes using Hadoop's distributed cache (I would assume Spark's broadcast variables could be used similarly).

With this approach you don't need to worry about having R and the correct packages installed on all the nodes. Further, you only need to create and broadcast the archive whenever you have significantly updated your R environment. Otherwise, it's cached and available for subsequent jobs.

Here's the source for this functionality: https://github.com/delta-rho/RHIPE/blob/master/src/main/R/bashRhipeArchive.R. The downsides are that it requires bash and assumes the same architecture on all nodes.

@javierluraschi

javierluraschi (Member) commented May 21, 2017

@francoisjehl Thanks for the comment. What would it take to convince you that it's worth installing R on each machine? Python is probably installed, isn't it? Renjin looks great as an engine, but the challenge would be to bring all the packages to run in it; I'm sure you are aware that popular CRAN packages use native code, etc., so those packages would be hard to port to Renjin. That said, I wouldn't be opposed to supporting Renjin at some point for those clusters that are locked from installing additional binaries; I'll keep this in mind.

@hafen Hi, I don't think you've mentioned it. But I agree, we might end up having to do something like RHIPE: https://github.com/delta-rho/RHIPE/blob/master/src/main/R/bashRhipeArchive.R#L49-L55 - So far, I'm assuming each node has the right packages; once that works, we can tackle this case, which I'm sure will be pretty common.

@codingbutstillalive

codingbutstillalive commented May 21, 2017

From my practical experience, it is possible (but difficult) to convince the cluster admin to install R on each cluster node, but definitely not all the packages that are needed. This could become a bottleneck then.

I think Javier has a good point. The RHIPE approach is a very good way to go once the general operations are running. I didn't imagine the whole thing would have so many dependencies. Now I understand how difficult it is. Great job, guys.

@hafen

hafen commented May 23, 2017

Sounds like a good approach. Yes, the main driver for shipping to the nodes is not necessarily the base R installation but the constant updating of packages that no cluster admin will ever be able to keep up with (not to mention different users with different environments). It turns out to be much hairier than just copying packages because of possible shared-library dependencies, etc. (hence the rest of the fun stuff in the code I referenced).

@javierluraschi

javierluraschi (Member) commented Jun 5, 2017

Here is the initial Implement R Workers PR to support a distributed apply function.

Notice that, currently, this requires your cluster to have the right packages installed, or you can manually install them inside the closure. I've opened #729 to track and implement something similar to what @hafen described.

I think it would be healthy to close this issue and open new suggestions, say support for purrr::map, dplyr integration, etc., in separate issues.

Thanks for all the feedback and for sticking around while this got implemented.
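
For readers landing here later, a minimal sketch of the distributed apply interface this work introduced in sparklyr (the function name spark_apply and the trivial closure are illustrative; see the PR and the current sparklyr documentation for the authoritative API):

library(sparklyr)

sc <- spark_connect(master = "local")
iris_tbl <- copy_to(sc, iris, "iris_spark")

# Run an arbitrary R closure against each partition of the Spark DataFrame.
# The closure receives a plain R data frame and must return one.
spark_apply(iris_tbl, function(df) {
  data.frame(rows_in_partition = nrow(df))
})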

@codingbutstillalive

codingbutstillalive commented Jun 5, 2017

Congratulations!!! :-)

@hafen

hafen commented Jun 9, 2017

Excellent! I'm excited to try this out.
