
Clean up apache-spark benchmarks #248

Merged 14 commits into master on May 13, 2021

Conversation


@lbulej lbulej commented Apr 30, 2021

This was originally intended as a simple follow-up to #246 (making actual use of the scratch directory), but I ended up reorganizing the code around a common pattern so that the benchmarks all look alike. The following benchmarks are being updated:

  • als
  • chi-square
  • dec-tree
  • gauss-mix
  • log-regression
  • movie-lens
  • naive-bayes
  • page-rank

Common changes

Code organization

  • Avoid relying on method side-effects in the code responsible for preparing the input datasets.
    • The benchmark setup code now follows a common pattern in which prepareInput(), loadData() and ensureCached() methods return values that need to be passed around.
    • This makes the data flow more explicit and easier to follow, in addition to clearly separating preparation and loading.
  • Use more (very) prominent names for the fields representing computation inputs and outputs.
  • Get rid of unnecessary state held between benchmark method calls.
  • Dump the result of the last repetition in benchmark teardown, so that we can find data to validate against.
    • Each benchmark (except page-rank, which already has validation) contains a dumpResult() method.
  • Stop using all caps for constant-like values (follows Scala style).
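The setup pattern described above can be sketched as follows. This is a minimal illustration with placeholder types standing in for RDDs/Datasets, not the benchmarks' actual code: each stage returns a value that is explicitly passed to the next, instead of communicating through mutable fields.

```scala
// Minimal sketch of the common setup pattern (prepareInput/loadData/ensureCached
// mirror the method names from the PR; types and bodies are placeholders).
object SetupPattern {
  type Dataset = Seq[String]

  // Write the input data to the scratch directory and return its location.
  def prepareInput(scratchDir: String): String = s"$scratchDir/input.csv"

  // Load the prepared input into a dataset.
  def loadData(inputFile: String): Dataset = Seq(s"loaded:$inputFile")

  // Make sure the dataset is cached and return it for further use.
  def ensureCached(data: Dataset): Dataset = data

  // The data flow is now explicit: each value is produced, passed on, and returned.
  def setUp(scratchDir: String): Dataset = {
    val inputFile = prepareInput(scratchDir)
    val rawData = loadData(inputFile)
    ensureCached(rawData)
  }
}
```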

Benchmark parameters

  • Unify Spark parallelism level parameters
    • Introduce the spark_executor_count parameter, which determines the number of Spark executor instances. It currently defaults to 4, but making the parameter explicit will (eventually) allow setting it from outside.
  • Rename thread_count to spark_executor_thread_count (to make it clear that it represents the number of threads per Spark executor) and make it available in all Spark benchmarks.
  • Add summary text to benchmark parameters which influence the workload size, e.g., copy_count, input_line_count.
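As a rough illustration of how the two parallelism parameters might combine, a sketch follows. The parameter names come from the PR; treating the total local-mode parallelism as their product is an assumption for illustration only.

```scala
// Hypothetical sketch: combining spark_executor_count and
// spark_executor_thread_count into a Spark local-mode master URL.
// The product interpretation is an assumption, not the PR's actual code.
object SparkParallelism {
  def localMaster(executorCount: Int, threadsPerExecutor: Int): String = {
    require(executorCount > 0 && threadsPerExecutor > 0)
    s"local[${executorCount * threadsPerExecutor}]"
  }
}
```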

Spark context handling

  • Simplify the calls to the setUpSparkContext() method
    • Pass the BenchmarkContext instance to the method so that it can query the spark_executor_count and spark_executor_thread_count parameters, as well as the benchmark's scratch directory, on its own.
    • Query the benchmark's @Name annotation to find out the benchmark's name, instead of having to duplicate it in code.
  • Keep sparkContext in the SparkUtil trait and have the benchmarks inherit the field.
  • Use SparkSession and avoid creating SparkConf and SparkContext directly.
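A configuration sketch of the SparkSession-based setup (assumes Spark on the classpath; the placeholder values stand in for what the PR derives from the @Name annotation, the parallelism parameters, and the scratch directory):

```scala
import org.apache.spark.sql.SparkSession

// Placeholder values -- in the PR these come from the benchmark's @Name
// annotation, the parallelism parameters, and the BenchmarkContext.
val benchmarkName = "als"
val totalThreads = 4
val scratchDir = "scratch/als"

// Build a SparkSession instead of creating SparkConf/SparkContext directly.
val spark = SparkSession.builder
  .appName(benchmarkName)
  .master(s"local[$totalThreads]")
  .config("spark.local.dir", scratchDir)
  .getOrCreate()

// The context is obtained from the session rather than constructed by hand.
val sparkContext = spark.sparkContext
```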

Spark RDD caching

  • The ensureCached() method now sets the RDD or Dataset storage level to MEMORY_ONLY using the persist() method, because the cache() method switched to using MEMORY_AND_DISK in newer Spark versions.

Spark logging

  • Spark loggers are now configured in all Spark benchmarks. For now, they are set to report at the WARN level.
  • Most benchmarks will now output less Spark-related information during startup, because they used to report at the INFO level until switching to the ERROR level on the SparkContext.
  • The movie-lens benchmark may now report a bit more, because it previously reported only at the ERROR level.

Scratch file handling

  • Use the benchmark-specific scratch directory (instead of the hard-coded target/<bench-name> directory) for preparing the input datasets as well as for dumping the computation output after the last iteration.
  • Use JDK-based file IO instead of relying on Apache Commons (FileUtils and IO) for such simple things.
    • JDK 9 and 11 provide an even better selection of file IO methods, but we have to (and can) make do with what's in JDK 8.
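A sketch of what the JDK-8-based replacement for Apache Commons file IO can look like (paths and helper names are illustrative, not the benchmarks' actual code):

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Paths}
import scala.collection.JavaConverters._

// Illustrative helpers using only java.nio.file from JDK 8, instead of
// Apache Commons FileUtils/IOUtils.
object ScratchFiles {
  // Create parent directories as needed and write the lines as UTF-8 text.
  def writeLines(path: String, lines: Seq[String]): Unit = {
    val p = Paths.get(path)
    Files.createDirectories(p.getParent)
    Files.write(p, lines.asJava, StandardCharsets.UTF_8)
  }

  // Read the file back as a list of UTF-8 lines.
  def readLines(path: String): Seq[String] =
    Files.readAllLines(Paths.get(path), StandardCharsets.UTF_8).asScala.toList
}
```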

Benchmark-specific changes

In addition to the common changes above, these are changes made in the individual benchmarks.

als

  • Rename rating_count to user_count, because that's what it actually is.
  • Add the product_count parameter, because it also influences the workload size.
  • Add the als_rank, als_lambda, and als_iterations parameters for the ALS algorithm.
  • Generate the input using a random generator with a fixed seed.
  • Configure ALS to use memory only for intermediate and final RDDs, and set a fixed random seed.
  • Read CSV files using the facilities from SparkUtil.

chi-square

  • Rename the number_count parameter to point_count, because that's what it actually is.
  • Add the component_count parameter (which determines the number of components in a point) to expose the other factor influencing the workload size.

dec-tree

  • Dump a hashable result so that it can be used for validation.

gauss-mix

  • Rename the number_count parameter to point_count, because that's what it actually is.
  • Add the component_count parameter (which determines the number of components in a point) to expose the other factor influencing the workload size.
  • Add the distribution_count parameter, which determines the desired number of clusters.
  • Use a fixed random seed for the GaussianMixture estimator.
    • This may make benchmark runs more repeatable.
  • Dump a result that could be used for validation (after accounting for potential numerical instability).

log-regression

  • Add the max_iterations parameter, which controls the maximum number of iterations of the logistic regression algorithm.
    • This has the same meaning as max_iterations in the gauss-mix benchmark.
  • Dump slightly more information about the result.
  • Add a very rudimentary validation.
    • We should at least check the intercept, but we need a configuration-specific expected value for that.

movie-lens

  • Configure ALS to use memory only for intermediate and final RDDs, and set a fixed random seed.
  • Remove the individual ALS algorithm parameters (and their cartesian-product interpretation) and instead add a CSV-like table of ALS configurations together with the expected RMSE on the validation set.
  • The ALS parameters are chosen to produce different RMSE values, to make it easier to validate the results in the presence of numerical instability.

naive-bayes

  • Add the copy_count parameter to allow controlling the workload size, replacing the hard-coded constant.
    • The (default) value of 8000 results in writing an 800MB input file on benchmark setup.
  • Add test configuration with copy_count set to 5.
  • Add a JMH configuration (so far without parameter tweaks) so that it at least exists.

page-rank

  • Add the max_iterations parameter which controls the number of iterations of the page rank algorithm.
    • It is not exactly the same as the max_iterations parameter in the gauss-mix and log-regression benchmarks, but it does control the number of algorithm iterations.

Things that came up (Spark-related)

  • Do we want to enable checkpointing for all benchmarks?
    • It is currently only used by movie-lens, but why there and not in the others? Maybe remove it from there?
    • Update: It appears that without a checkpoint directory, the movie-lens benchmark (which is not much different from als) kept failing the JMH runs in CI. Super strange.
  • What logging level do we want to use?
    • A few (I think two) benchmarks configure the Spark root logger level to ERROR and the Eclipse Jetty server root logger level to OFF.
    • All benchmarks configure the Spark context log level to ERROR.
    • I think we should go for WARN for a while and determine (and document) what kinds of warnings we are fine with.
  • The choice of machine learning APIs.
    • Some of our benchmarks still use the RDD-based MLlib API, which has been in maintenance mode since Spark 2.0, with the DataFrame-based ML API considered the recommended one.
    • Do we want to port the benchmarks, if possible, or do we keep them for as long as the API is supported?
    • We should probably discuss/track this in a separate issue.

@lbulej lbulej marked this pull request as draft April 30, 2021 15:20
@lbulej lbulej changed the title WiP: Clean up apache-spark benchmarks Clean up apache-spark benchmarks Apr 30, 2021

@farquet farquet left a comment


LGTM!

@lbulej lbulej marked this pull request as ready for review May 10, 2021 01:20

lbulej commented May 10, 2021

The JMH runs of the movie-lens benchmark in the CI have suddenly become brittle. I can't reproduce it locally, but it seems related to the changes that force the ALS algorithm to compute things in memory only (both intermediate and final RDDs), which applies to the als and movie-lens benchmarks. movie-lens appears to be the one affected because it makes 8 ALS runs in one repetition, unlike als.

The restriction on keeping RDDs in memory is certainly up for discussion and can be easily reverted if we don't like it, I just thought that if we can, we should try to avoid disk IO in the benchmarks and instead size heap properly.

As for the brittleness of the JMH runs in the CI -- we run the JMH wrapper with -Xms2500M, which forks another VM with -Xms2500M, and I guess the CI VM may run out of memory because both JVMs will have a 2.5 GB heap (5 GB in total). We should only pass the heap-sizing arguments to the forked VM, and I'm currently testing whether this helps with the JMH runs in CI.


farquet commented May 11, 2021

The restriction on keeping RDDs in memory is certainly up for discussion and can be easily reverted if we don't like it, I just thought that if we can, we should try to avoid disk IO in the benchmarks and instead size heap properly.

It looks like different use cases to me. I really like the fact that we minimize disk IO for benchmarking when we have control over it. But for testing purposes, we may not want to be too memory-greedy and are OK with paying a price there, since performance is not the point. This typically applies to the CI use case.

So my feeling is that the right solution is to expose in-memory as a parameter, which would default to true for all benchmarking configurations and to false for the test one.


lbulej commented May 11, 2021

So my feeling is that the right solution is to expose in-memory as a parameter, which would default to true for all benchmarking configurations and to false for the test one.

You are right, that's something we should do, regardless of the reason for JMH failures.

I'm trying to find out what the root cause is, because I didn't really change movie-lens that much (I have another branch with more substantial refactoring). What I find odd is that it works on OpenJ9, apparently precisely because JMH does not support it.

On OracleJDK and OpenJDK, JMH is probably telling the JVM something that results in a stack overflow. The biggest change is the parsing of a CSV-like parameter string with ALS configurations that produce distinct RMSE values (to yield verifiable results), but the stack overflow happens after that.

lbulej added 14 commits May 12, 2021 10:39
Creating RDDs from Datasets appears to be quite costly, so we should
just read things ourselves while we are using the RDD-based API.
SparkSession appears to be a newer API and it also provides data
readers (even though we don't use them for RDDs).
- Use the sparkContext inherited from SparkUtil so that we don't
  have to handle it explicitly.

- Use the scratch directory to prepare input files and clean up
  the parsing of CSV files.

- Use I/O API from JDK instead of Apache Commons.

- Configure ALS to store intermediate RDDs in memory only and to
  use a fixed random seed.

There are further changes coming, but these are important to
finish the scratch file handling.
- Adds explicit ALS-related parameters

- Configures ALS to only use memory for intermediate and final RDDs,
  and sets a fixed random seed.

- Read CSV using the facilities from SparkUtil
- Removes the individual ALS algorithm parameters (and their
  cartesian-product interpretation) and instead adds a CSV-like table
  of ALS configurations together with expected RMSE.

- Choose ALS parameters that lead to different RMSE values to make it
  easier to validate the results in presence of numerical instability.
Instead, let the `ensureCached()` method do it, thus relying on
a policy defined in a single place.
This might be needed by some benchmarks even though we generally want
to avoid that, because checkpointing leads to IO.
Without the checkpoint directory set, JMH runs of the benchmark
in Travis CI tend to crash with stack overflow.
@lbulej lbulej force-pushed the topic/spark-benchmark-cleanup branch from 0b3b276 to 3004bce on May 12, 2021 15:23

lbulej commented May 12, 2021

Oddly enough, it appears that not configuring a checkpoint directory was the reason for movie-lens failing the JMH runs with a StackOverflowError. Something fishy is going on, because apparently there was enough memory, and interestingly, it only happened in the JMH runs (but not on OpenJ9), not in normal runs. The als benchmark is actually not much different from movie-lens (because it also uses the ALS algorithm), but it did not have the problem. Based on the logging output (INFO level), a stack overflow suddenly occurred in the middle of execution inside Spark. And based on the output, it might have something to do with serialization/deserialization when storing RDD results:

21/05/12 07:15:14 ERROR Executor: Exception in task 2.0 in stage 394.0 (TID 375)
java.lang.StackOverflowError
	at java.io.ObjectInputStream$PeekInputStream.read(ObjectInputStream.java:2620)
	at java.io.ObjectInputStream$PeekInputStream.readFully(ObjectInputStream.java:2636)
	at java.io.ObjectInputStream$BlockDataInputStream.readInt(ObjectInputStream.java:3137)
	at java.io.ObjectInputStream.readHandle(ObjectInputStream.java:1646)
	at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1707)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2000)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
	at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:527)
	at sun.reflect.GeneratedMethodAccessor5.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)

Apparently, the problem in the JMH runs disappeared when I put the checkpoint directory back into the movie-lens setup (checkpointing does I/O, which is why I tried to remove it in the first place, and truth be told, a stack overflow error is not something I would expect in return).

Anyway, it seems we might be ready to merge this, so that we can move on.

@lbulej lbulej merged commit 31d61f1 into master May 13, 2021
@lbulej lbulej deleted the topic/spark-benchmark-cleanup branch May 13, 2021 13:36