
Improve Serialization #941

Closed
javierluraschi opened this issue Aug 16, 2017 · 21 comments

Comments

@javierluraschi
Collaborator

javierluraschi commented Aug 16, 2017

There are two areas worth considering here:

  1. Improve collect() serialization. This is already implemented as columnar-based collect; however, I believe only for numeric and logical data types.

  2. Implement copy_to() through columnar invoke().

Assuming (2) is on a par with current implementations of copy_to(), we can stop here; otherwise, we would have to explore additional serialization improvements.
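For concreteness, a minimal sketch of the two code paths in question, run against a local connection with column types (numeric, logical, Date) that exercise the serializer; the object names are illustrative only:

library(sparklyr)
library(dplyr)

sc <- spark_connect(master = "local")

df <- data.frame(
  id   = c(1, 2, 3),                          # numeric: covered by the columnar collect
  ok   = c(TRUE, FALSE, TRUE),                # logical: covered by the columnar collect
  when = as.Date(c("2017-08-01", "2017-08-02", "2017-08-03"))  # Date: not yet covered
)

# (2) copy_to() serializes R data into Spark
df_spark <- copy_to(sc, df, "df_spark", overwrite = TRUE)

# (1) collect() deserializes Spark data back into R
df_round_trip <- df_spark %>% collect()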

Related issues:

@JakeRuss

JakeRuss commented Aug 21, 2017

@javierluraschi I know you're hard at work supporting these features, but do you have any estimate for how long before sparklyr will support date (and datetime) column types?

I'm updating a production database via sparklyr now, but the code is fragile because it includes workarounds for managing timestamps, and the rest of the dev team has me weighing the merits of a switch to SparkR. If you think the answer is "soon" then I may be able to stall the refactor and continue using sparklyr (my preference).

@kevinykuo
Collaborator

@JakeRuss We'll be taking a look at this next week to see if we can do something about dates before entirely revamping copy_to(), and we'll report back.

@JakeRuss

JakeRuss commented Sep 1, 2017

@kevinykuo That update sounds great from my end. As soon as it's ready, I'll be in line to test it out. Many thanks!

@javierluraschi
Collaborator Author

@JakeRuss thanks for the feedback. There are two points to consider here:

  1. The current implementation of copy_to does contain a number of serialization issues; however, copy_to was never intended to copy "big data" into the cluster; it is more likely that the data will already be available in the Spark cluster.
  2. There are a few other serialization issues; however, most of the serialization code in sparklyr was actually forked from SparkR. Therefore, the relevant part of this work is to avoid using the original SparkR serializer and use a more robust approach.

If you end up switching to SparkR, I would love to hear feedback from you on which things happen to work in SparkR but not in sparklyr.

We don't have a timeline yet to implement this "Improve Serialization" feature. However, if there is a particular issue that is blocking you, please open a github issue and I'll try to address it very soon without depending on the completion of this work.

@JakeRuss

I appreciate your thoughts, @javierluraschi. My desired workflow may be irrelevant to the copy_to discussion, but I'm not certain, so let me explain a bit more; if this discussion should be moved, I will open another issue.

I mainly use spark_read_jdbc to pull data into Spark from our production database, make a calculation, and then record the results back to another database table. Does this copy_to serialization work affect serialization in spark_read_jdbc? The only remaining pain point between my ideal workflow and what I am using right now is that I can't currently work with timestamp fields. When I use spark_read_jdbc, timestamp fields are brought into Spark as character, and I then convert them to Unix epochs (integers). From there I can handle the calculations and various date manipulations using epoch math before I send the results back to our database. The spark_write_jdbc step also requires a workaround for timestamps, because I have to write the epoch first and then convert that field to a timestamp on the database side.
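For illustration, a minimal sketch of that epoch workaround, assuming an existing connection sc and a table already registered in Spark as "events" with a character column ts_string (the names are hypothetical); unix_timestamp() is a Spark SQL function that sparklyr's dplyr backend passes through to Spark:

library(sparklyr)
library(dplyr)

events <- tbl(sc, "events")

events_epoch <- events %>%
  # unix_timestamp() is evaluated by Spark SQL, not by R
  mutate(ts_epoch = unix_timestamp(ts_string, "yyyy-MM-dd HH:mm:ss")) %>%
  # date manipulations become integer math on the epoch
  mutate(ts_next_day = ts_epoch + 86400L)

# the results are then written back with spark_write_jdbc(), and the epoch
# column is converted to a timestamp on the database side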

I did experiment with a sparkR workflow but there were other issues that also required workarounds using that process too. And the code wasn't as expressive as the dplyr pipes, so I am content to stick with sparklyr and wait for the serialization improvements before I finalize these processes.

Please let me know if I should open a separate serialization issue for spark_read_jdbc and spark_write_jdbc.

@javierluraschi
Collaborator Author

@JakeRuss not at all; copy_to would not affect spark_read_jdbc or spark_write_jdbc. Most of the blocking issues this bug is tracking fall into the copy_to category, which does not affect you.

Scala Date should map to R Date and Scala Timestamp to R POSIXct while collecting; however, there might be additional types to consider for JDBC or other formats that should fall under data conversion as well.

@JakeRuss, what would really help us here would be to get a copy of the schema (not necessarily the data) that you are using, to make sure data round-trips correctly when we get to this work. Something like:

CREATE TABLE COMPANY(
   ID INT PRIMARY KEY     NOT NULL,
   NAME           TEXT    NOT NULL,
   AGE            INT     NOT NULL,
   ADDRESS        CHAR(50),
   SALARY         REAL
);

and I'll make sure this gets validated when the improvements get implemented. Feel free to comment here or open a new issue with details that I can close and link to this one.

Also worth mentioning that I do want to work on this issue as soon as possible. I've been cleaning up old github issues and while no specific serialization issue seems critical, it's now obvious that there are many little ones that make this more urgent. I'll keep you posted!

@JakeRuss

Moved to a new issue, thank you @javierluraschi!

@russellpierce
Contributor

@javierluraschi You mentioned that copy_to was never intended to copy "big data" into the cluster. What is the preferred pathway for large datasets? Getting the data directly into file storage as CSV, Parquet, or another format, and then reading it off file storage?

@javierluraschi
Collaborator Author

javierluraschi commented Sep 22, 2017

@russellpierce yes, for instance with hadoop fs -cp <src> <dest> if you are using Hadoop, etc. See https://hadoop.apache.org/docs/r2.4.1/hadoop-project-dist/hadoop-common/FileSystemShell.html#cp
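Once the file is in cluster storage it can be read straight into Spark rather than going through copy_to; a minimal sketch, where the master, table name, and HDFS path are placeholders:

library(sparklyr)

sc <- spark_connect(master = "yarn-client")

# read the file directly from cluster storage; nothing is serialized from R
flights <- spark_read_parquet(sc, name = "flights", path = "hdfs:///data/flights.parquet")

# CSV and JSON work the same way via spark_read_csv() and spark_read_json()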

I would like to improve copy_to, but in general, since Spark is a big data / big compute solution, it is often the case that the data will already be available in the cluster. That said, more people are also using sparklyr + Spark as a general compute engine, so uploading the data and then doing computations over larger nodes is something we can improve at some point.

Would you mind explaining why the data is not already available in Spark in your case, and how you are using sparklyr? I would love to hear what the use case for copy_to is here, to see how best to help.

@russellpierce
Contributor

We're still feeling out usage patterns. At the moment we have a bunch of pre-existing code that pulls data locally into R and processes it. Previously, at the end of processing we'd just write the results as a CSV. Now we'd like to write the results as Parquet instead. The easiest path I've found to write Parquet from R is to bounce it through a Spark DataFrame, hence using copy_to, or else writing the CSV to the cluster and then loading the CSV (which seemed unnecessarily awkward).
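A minimal sketch of that bounce-through-Spark path; the data frame and output path below are stand-ins:

library(sparklyr)

sc <- spark_connect(master = "local")

# stand-in for the locally processed results
results_df <- data.frame(id = 1:3, score = c(0.12, 0.48, 0.95))

# copy_to() pushes the local data frame into a Spark DataFrame...
results_spark <- copy_to(sc, results_df, "results", overwrite = TRUE)

# ...which can then be written out as Parquet
spark_write_parquet(results_spark, path = "/tmp/results_parquet")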

@javierluraschi
Collaborator Author

#1041 makes data collection improvements for dates and timestamps.

@javierluraschi
Collaborator Author

#1045 makes data collection improvements for fields with NAs.

@JakeRuss

JakeRuss commented Oct 2, 2017

It looks like spark_read_jdbc() is reading in date types correctly now. Will comment once I'm able to test spark_write_jdbc().

Is it expected behavior for TINYINT (a field of 0s and 1s) to be read in as logical?

@doorisajar

Is this also the place to discuss things like expanding spark_apply functions to accept multiple arguments, and then shipping those artifacts (e.g. small R model objects) to the workers along with the package dependencies?

@asantucci

@javierluraschi The issue of date columns being truncated to year when using copy_to is preventing me from adopting sparklyr further. I have data stored in a PostgreSQL database that I would like to import into a sparklyr pipeline.

I tried using spark_read_jdbc but was greeted with an error of the form java.sql.SQLException: No suitable driver, and there is no R-specific documentation that I can find that addresses this. (If you can point me to a resource addressing this, that would be appreciated.)

In an effort to prototype, I attempted to pull a subset of the data into memory in R and then use copy_to to upload the data to a Spark cluster (which, at this point, happens to be local as well, since I am just determining the limitations of existing packages). Of course, I get dates truncated to year, and this is a showstopper.

To give some context, I have actually already built a pipeline that plugs into PostgreSQL using dbplyr and pushes all computations out of memory, the intermediate goal being to generate features which will then be used for modeling. Given that PostgreSQL limits each row to a single page of memory (8060 bytes), I cannot create very "wide" feature sets, and hence have been motivated to switch over to a sparklyr pipeline. The idea from the beginning has been to write all code using basic dplyr transformations so that we can experiment with both Spark and PostgreSQL backends.

@russellpierce
Contributor

russellpierce commented Nov 9, 2017

@asantucci I've been having some success with writing a CSV out to distributed storage and then loading the dataset into Spark via a CSV read (specifying data types where it helps). That bypasses copy_to and any serialization issues we have, and it was mentioned here that copy_to() isn't intended for large data sets. A draft of that, mostly focused on saving data off to Parquet, is here: https://github.com/zapier/parquetr - but it doesn't work entirely yet because it depends on some internal packages for communicating with S3. I think those parts can be self-documenting / fixable for someone who is particularly interested. (Gladly accepting PRs if that issue can be resolved - or for anything else - I haven't been able to dedicate time to it yet.)
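A minimal sketch of that CSV-read path with explicit column types, assuming a file already sitting in distributed storage; the path and column names are hypothetical:

library(sparklyr)

sc <- spark_connect(master = "yarn-client")

# declare the schema up front instead of relying on inference, so numerics
# and timestamps arrive with the intended types
results <- spark_read_csv(
  sc,
  name         = "results",
  path         = "hdfs:///data/results.csv",
  infer_schema = FALSE,
  columns      = c(id = "integer", score = "double", created_at = "timestamp")
)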

@JakeRuss

JakeRuss commented Nov 9, 2017

@asantucci ,

I use sparklyr to pull data out of MySQL and directly into the Spark context, via spark_read_jdbc(). To do this I add a MySQL JDBC driver to the class path via spark_config, like so:

config <- spark_config()
config$`sparklyr.shell.driver-class-path` <- "~/my/local/path/mysql-connector-java-5.1.43/mysql-connector-java-5.1.43-bin.jar"

Maybe there is a PostgreSQL driver you could add instead, which would eliminate the error message. Here?

Also, it looks like Javier has resolved the issues with dates in the development version of sparklyr. I am testing this out on my pipeline right now, as it is critical for me as well.

@javierluraschi are the serialization improvements scheduled to be part of the 0.7 release to CRAN?

@asantucci

@JakeRuss I'm trying your approach as follows:

config <- spark_config()
config$`sparklyr.shell.driver-class-path` <- "valid_path/postgresql-42.14"

sc <- spark_connect(master = 'local', config = config)

spark_read_jdbc(sc, name = tbl_name, options = list(url = 'jdbc:postgresql://...', user = username, password = pword, dbtable = tbl_name))

And I still get the same error: java.sql.SQLException: No suitable driver. Any further thoughts?

@JakeRuss

JakeRuss commented Nov 9, 2017

Let me post more of my code and see if you spot anything different about your set up...

config <- spark_config()
config$`sparklyr.shell.driver-class-path` <- "valid_path/mysql-connector-java-5.1.44/mysql-connector-java-5.1.44-bin.jar"
config$`spark.executor.heartbeatInterval` <- "120000ms"
config$`spark.network.timeout`            <- "240s"
config$`sparklyr.shell.driver-memory`     <- "6G"
config$`spark.executor.memory`            <- "4G" 

sc <- spark_connect(master         = "local",
                     version        = "1.6.0",
                     hadoop_version = 2.6,
                     config         = config)

jdbc_read_url <- paste0("jdbc:mysql://", db_host_ip,":3306/", db_name)

db_table <- sc %>%
  spark_read_jdbc(sc      = .,
                  name    = "spark_tbl_name",  
                  options = list(url      = jdbc_read_url,
                                 user     = db_username,
                                 password = db_password,
                                 dbtable  = "database_tbl_name"),
                  memory  = FALSE)

My jdbc url is formatted like jdbc:mysql://host_ip:3306/database_name. The only difference I see is that I have memory = FALSE, but that shouldn't matter here.

My cursory Google searching for that error message indicates that either the jdbc URL is incorrect, or the driver still isn't found on the class path. Maybe check your valid_path again?

@asantucci

Thank you for the follow-up. It looks like my problem was in fact that I hadn't appended both a port and a database name to the JDBC URL. Now it appears that I am able to connect via DBI!

@javierluraschi
Collaborator Author

We are planning to improve serialization by using Apache Arrow. This work should address many conversion issues by providing a common serialization format between R and Scala that we don't have to maintain in sparklyr. The Arrow work is being tracked in #1457.
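As a rough sketch of how this is expected to look from the user's side (this assumes a later sparklyr release with Arrow support; see #1457 for the actual API), attaching the arrow package before connecting is intended to switch copy_to()/collect() onto the Arrow path:

# sketch only: assumes a sparklyr release with Arrow support
library(arrow)     # attaching arrow is expected to enable Arrow-based serialization
library(sparklyr)
library(dplyr)

sc <- spark_connect(master = "local")

df <- data.frame(id = 1:3, when = as.Date("2017-08-16") + 0:2)

# copy_to() and collect() would then round-trip through Arrow rather than
# the serializer forked from SparkR
df_spark <- copy_to(sc, df, "df_arrow", overwrite = TRUE)
collect(df_spark)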
