Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Unsafe shuffle writer support in RSS #53

Open
wants to merge 4 commits into
base: master
Choose a base branch
from

Conversation

mayurdb
Copy link
Collaborator

@mayurdb mayurdb commented Jul 23, 2021

Key traits

  • Stores the map output data in serialized form
  • Buffers the data in memory as much as possible. Chunk the data before sending it to RSS servers.
  • Avoids any extra copying of the data before spill
  • Uses Java’s unsafe apis to acquire memory for storing the serialized record
  • After receiving a record, it is serialized and stored at an offset in the already acquired page in memory.
  • A tuple of (PartitionId, Memory location where record is stored) is maintained in an Array
  • Array is sorted by the partition before spilling. Once sorted, data is read from the accompanying pointer location

Details

Implementation uses Java's unsafe APIs for acquiring large chunks of memory. Tuple of partition Id and memory location where a record is stored in memory is stored in an array. The advantage of doing this is that the data can be sorted by just sorting the metadata array on. Data before spilling is read into chunks of configurable size and sent over the network. Similar to the above two approaches, this approach also interfaces with the TMM to acquire more memory for storing records or for expanding the metadata array.

Open Source Spark has already implemented the unsafe shuffle writer and currently gets used for most executing of the shuffle writes. Components from Spark’s implementation around memory allocation, storing data in memory and metadata based sort were reused in the implementation. Logic around spilling triggers and around reading data from the memory had to be changed to be compatible and more importantly performant with RSS.

image

Performance Numbers
image

TODOs:

  1. This is still a WIP and needs refactoring to be production ready
  2. ShuffleWithAggregationTest needs to be fixed as some UTs are failing with the records read/written checks
  3. The existing Scala UTs are passing. New UTs need to be added
  4. RadixSort, RssPackedRecordPointer, RssShuffleInMemorySorter, ShuffleSortDataFormat are just copy pasted from Spark code. They are package private in Spark but even after creating same package structure in RSS, they throw IllegalAccessError at runtime. This needs to be fixed, so that we don't need to add those classes
  5. General code cleanup

(cherry picked from commit a876e216c65f9381f6f2e551d701db978db995ee)
Copy link

@vladhlinsky vladhlinsky left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @mayurdb.
This looks like a great improvement!
I understand that this is a WIP PR, but, please, let me leave a few comments from my side.
Thanks!

.createWithDefault(true)
val unsafeShuffleWriterBufferSize: ConfigEntry[Long] =
ConfigBuilder("spark.shuffle.rss.unsafe.writer.bufferSize")
.doc("Use unsafe shuffle writer")

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like the description should be updated, as it is the same as for the previous config entry.

ConfigBuilder("spark.shuffle.rss.unsafe.writer.bufferSize")
.doc("Use unsafe shuffle writer")
.longConf
.createWithDefault(5*1024l*1024)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Spark uses bytesConf for memory sizes, so users could specify values as bytes (b), kibibytes (k), mebibytes (m), etc. E.g. 50b, 100k, or 250m. It's also possible to omit a suffix. Should we also use bytesConf, as it's done for example for spark.memory.offHeap.size?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

its a good suggestion , we should consider this

var totalBufferedSize: Long = 0
var totalSendDataBlockTime: Long = 0

private val writerBufferSize = conf.get(RssOpts.unsafeShuffleWriterBufferSize);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like writerBufferSize is not used.

private val serializerInstance = serializer.newInstance()

def addRecord(partitionId: Int, record: Product2[K, V]): Seq[(Int, Array[Byte])] = {
var totalLookUpTime = 0L

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like totalLookUpTime is only being updated, but never read.


private var recordsWrittenCount: Int = 0

private var totalSerializationTime: Long = 0l

private var totalMemoryFethcWaitTime: Long = 0l

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: totalMemoryFethcWaitTime => totalMemoryFetchWaitTime


private final int initialSize;

public RssShuffleInMemorySorter(MemoryConsumer consumer, int initialSize, boolean useRadixSort, boolean a) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The latest argument boolean a is not used.

@vladhlinsky
Copy link

vladhlinsky commented Jul 27, 2021

RadixSort, RssPackedRecordPointer, RssShuffleInMemorySorter, ShuffleSortDataFormat are just copy pasted from Spark code. They are package private in Spark but even after creating same package structure in RSS, they throw IllegalAccessError at runtime. This needs to be fixed, so that we don't need to add those classes

@mayurdb, I've deleted RadixSort, RssPackedRecordPointer, RssShuffleInMemorySorter, ShuffleSortDataFormat and modified RssShuffleExternalSorter to use Spark's classes. This worked fine in my environment with Spark version 2.4.3 and 2.4.4.

Tested locally in the spark-shell as follows:

...
scala> sc.parallelize(1 to 10, 2).map(x => (x % 3, x)).groupByKey(3).collect()
res0: Array[(Int, Iterable[Int])] = Array((0,CompactBuffer(6, 9, 3)), (1,CompactBuffer(7, 10, 1, 4)), (2,CompactBuffer(8, 2, 5)))

scala> sc.parallelize(1 to 10, 2).map(x => (x % 3, x)).reduceByKey((val1, _) => val1).collect()
res1: Array[(Int, Int)] = Array((0,6), (2,8), (1,7))

The IllegalAccessError could be thrown if the definition of a class has incompatibly changed. I think this may indicate that the Spark version used to compile RSS jars differs from the version, which is available on cluster nodes.
Please, correct me If I'm missing something.

@mayurdb
Copy link
Collaborator Author

mayurdb commented Jul 27, 2021

RadixSort, RssPackedRecordPointer, RssShuffleInMemorySorter, ShuffleSortDataFormat are just copy pasted from Spark code. They are package private in Spark but even after creating same package structure in RSS, they throw IllegalAccessError at runtime. This needs to be fixed, so that we don't need to add those classes

@mayurdb, I've deleted RadixSort, RssPackedRecordPointer, RssShuffleInMemorySorter, ShuffleSortDataFormat and modified RssShuffleExternalSorter to use Spark's classes. This worked fine in my environment with Spark version 2.4.3 and 2.4.4.

Tested locally in the spark-shell as follows:

...
scala> sc.parallelize(1 to 10, 2).map(x => (x % 3, x)).groupByKey(3).collect()
res0: Array[(Int, Iterable[Int])] = Array((0,CompactBuffer(6, 9, 3)), (1,CompactBuffer(7, 10, 1, 4)), (2,CompactBuffer(8, 2, 5)))

scala> sc.parallelize(1 to 10, 2).map(x => (x % 3, x)).reduceByKey((val1, _) => val1).collect()
res1: Array[(Int, Int)] = Array((0,6), (2,8), (1,7))

The IllegalAccessError could be thrown if the definition of a class has incompatibly changed. I think this may indicate that the Spark version used to compile RSS jars differs from the version, which is available on cluster nodes.
Please, correct me If I'm missing something.

Oh that's great. These classes are just package private, so ideally they should just work for all the cases where the package structure is replicated. Also, the compile and runtime Spark jars will be different in most of the cases while using RSS.

I actually haven't looked into the issue as I just wanted to try this out and get the performance number first. I will check the details of the IllegalAccessError and get back

assert(sorter != null);
final K key = record._1();
final int partitionId = partitioner.getPartition(key);
serBuffer.reset();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I remember some serializer like JavaSerializer may use object reference in serialized stream, each serialized object inside the serialized stream will not be independent. Thus getting the bytes from MyByteArrayOutputStream each time after adding key/value may not be always safe.

KyroSerializer does not use object reference (if I remember correctly), and will be safe here.

Maybe add some check here to make sure the serializer is KyroSerializer?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should always use kyroSerializer, can we just fail if we are not using Kyro

@mabansal mabansal self-requested a review July 30, 2021 00:34
Copy link
Collaborator

@mabansal mabansal left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems like needs to be updated

import org.apache.spark.unsafe.Platform;
import org.apache.spark.unsafe.array.LongArray;

public class RadixSort {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought you have removed this class and using the spark class, why this is still showing up here

import org.apache.spark.unsafe.array.LongArray;

public class RadixSort {

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we just remove this class and use spark classes

import org.apache.spark.util.Utils;


final class RssShuffleExternalSorter extends MemoryConsumer {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we please add some comments here , why this class is being added and what is the functionality for that


private long sizeThreshold;

long numRecords = 0l;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why this is not private

private MemoryBlock currentPage = null;
private long pageCursor = -1;

long bytesWritten = 0l;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

these should be private as well

// Keep track of success so we know if we encountered an exception
// We do this rather than a standard try/catch/re-throw to handle
// generic throwables.
// TODO: Fix recordsWritten
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you please add what we need to fix or just fix it and remove this TODO

ConfigBuilder("spark.shuffle.rss.unsafe.writer.bufferSize")
.doc("Use unsafe shuffle writer")
.longConf
.createWithDefault(5*1024l*1024)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

its a good suggestion , we should consider this


private val enableMapSideAggregation = shuffleDependency.mapSideCombine && conf.get(RssOpts.enableMapSideAggregation)
val enableMapSideAggregation = shuffleDependency.mapSideCombine && conf.get(RssOpts.enableMapSideAggregation)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we make them private

import org.apache.spark.memory.{MemoryManager, MemoryMode}
import org.apache.spark.storage.BlockId

class RssTestMemoryManager(conf: SparkConf)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add some context here for the class


private val writeClientCloseLock = new Object()

var totalCompressionTime: Long = 0
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

these all variables should be private

@mayurdb
Copy link
Collaborator Author

mayurdb commented Aug 3, 2021

RadixSort, RssPackedRecordPointer, RssShuffleInMemorySorter, ShuffleSortDataFormat are just copy pasted from Spark code. They are package private in Spark but even after creating same package structure in RSS, they throw IllegalAccessError at runtime. This needs to be fixed, so that we don't need to add those classes

@mayurdb, I've deleted RadixSort, RssPackedRecordPointer, RssShuffleInMemorySorter, ShuffleSortDataFormat and modified RssShuffleExternalSorter to use Spark's classes. This worked fine in my environment with Spark version 2.4.3 and 2.4.4.

Tested locally in the spark-shell as follows:

...
scala> sc.parallelize(1 to 10, 2).map(x => (x % 3, x)).groupByKey(3).collect()
res0: Array[(Int, Iterable[Int])] = Array((0,CompactBuffer(6, 9, 3)), (1,CompactBuffer(7, 10, 1, 4)), (2,CompactBuffer(8, 2, 5)))

scala> sc.parallelize(1 to 10, 2).map(x => (x % 3, x)).reduceByKey((val1, _) => val1).collect()
res1: Array[(Int, Int)] = Array((0,6), (2,8), (1,7))

The IllegalAccessError could be thrown if the definition of a class has incompatibly changed. I think this may indicate that the Spark version used to compile RSS jars differs from the version, which is available on cluster nodes.
Please, correct me If I'm missing something.

@vladhlinsky did you run these commands from within the Intellij/Any other IDE or ran a spark-submit externally and passed the RSS jars? Also, if the command was ran externally, can you please confirm if RSS was used?

These classes are package private in Spark. To be able to access them, we will need same package structure and also both the interface and implementation should be loaded by same class loader. Looks like I'm hitting the second issue.

@mayurdb
Copy link
Collaborator Author

mayurdb commented Aug 3, 2021

RadixSort, RssPackedRecordPointer, RssShuffleInMemorySorter, ShuffleSortDataFormat are just copy pasted from Spark code. They are package private in Spark but even after creating same package structure in RSS, they throw IllegalAccessError at runtime. This needs to be fixed, so that we don't need to add those classes

@mayurdb, I've deleted RadixSort, RssPackedRecordPointer, RssShuffleInMemorySorter, ShuffleSortDataFormat and modified RssShuffleExternalSorter to use Spark's classes. This worked fine in my environment with Spark version 2.4.3 and 2.4.4.
Tested locally in the spark-shell as follows:

...
scala> sc.parallelize(1 to 10, 2).map(x => (x % 3, x)).groupByKey(3).collect()
res0: Array[(Int, Iterable[Int])] = Array((0,CompactBuffer(6, 9, 3)), (1,CompactBuffer(7, 10, 1, 4)), (2,CompactBuffer(8, 2, 5)))

scala> sc.parallelize(1 to 10, 2).map(x => (x % 3, x)).reduceByKey((val1, _) => val1).collect()
res1: Array[(Int, Int)] = Array((0,6), (2,8), (1,7))

The IllegalAccessError could be thrown if the definition of a class has incompatibly changed. I think this may indicate that the Spark version used to compile RSS jars differs from the version, which is available on cluster nodes.
Please, correct me If I'm missing something.

@vladhlinsky did you run these commands from within the Intellij/Any other IDE or ran a spark-submit externally and passed the RSS jars? Also, if the command was ran externally, can you please confirm if RSS was used?

These classes are package private in Spark. To be able to access them, we will need same package structure and also both the interface and implementation should be loaded by same class loader. Looks like I'm hitting the second issue.

scala> Class.forName("org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter").getClassLoader()
res19: ClassLoader = sun.misc.Launcher$AppClassLoader@36aa7bc2

// Class for RSS code base
scala>  Class.forName("org.apache.spark.shuffle.sort.RssShuffleExternalSorter").getClassLoader()
res20: ClassLoader = scala.reflect.internal.util.ScalaClassLoader$URLClassLoader@2634d000```

@vladhlinsky
Copy link

vladhlinsky commented Aug 6, 2021

@mayurdb, I ran a spark-shell locally and verified that RSS was used, but in my case classes were loaded by the same class loader, so looks like it's really the cause of the issue:

scala> Class.forName("org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter").getClassLoader()
res2: ClassLoader = sun.misc.Launcher$AppClassLoader@2f0e140b

scala> Class.forName("org.apache.spark.shuffle.sort.RssShuffleExternalSorter").getClassLoader()
res3: ClassLoader = sun.misc.Launcher$AppClassLoader@2f0e140b

@YutingWang98
Copy link

Hi @mayurdb, we have also been experiencing memory and map stage latency issues using Rss. We plan to test and work on this implementation as well. Wondering if you have any updates about this PR that you can share with us. Many thanks :)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

5 participants