Skip to content

Commit

Permalink
[SPARK-25299] Shuffle locations api (#517)
Browse files Browse the repository at this point in the history
Implements the shuffle locations API as part of SPARK-25299.

This adds an additional field to all `MapStatus` objects: a `MapShuffleLocations` that indicates where a task's map output is stored. This field is optional, and implementations of the pluggable shuffle writers and readers can ignore it.

This API is designed for future plugin implementations that want the driver to store metadata about where shuffle blocks are stored.

There are a few caveats to this design:

- We originally wanted to remove the `BlockManagerId` from `MapStatus` entirely and replace it with this object. However, doing this proves to be very difficult, as many places use the block manager ID for other kinds of shuffle data bookkeeping. As a result, we concede to storing the block manager ID redundantly here. However, the overhead should be minimal: because we cache block manager ids and default map shuffle locations, the two fields in `MapStatus` should point to the same object on the heap. Thus we add `O(M)` storage overhead on the driver, where for each map status we're storing an additional pointer to the same on-heap object. We will run benchmarks against the TPC-DS workload to see if there are significant performance repercussions for this implementation.

- `KryoSerializer` expects `CompressedMapStatus` and `HighlyCompressedMapStatus` to be serialized via reflection, so originally all fields of these classes needed to be registered with Kryo. However, the `MapShuffleLocations` is now pluggable. We believe, however, that Kryo was previously defaulting to Java serialization anyway, so we now explicitly tell Kryo to use `ExternalizableSerializer` to deal with these objects. There's a small hack in the serialization protocol that attempts to avoid serializing the same `BlockManagerId` twice in the case that the map shuffle locations object is a `DefaultMapShuffleLocations`.
  • Loading branch information
mccheah authored and bulldozer-bot[bot] committed Apr 19, 2019
1 parent 8b021b3 commit 16caee4
Show file tree
Hide file tree
Showing 29 changed files with 463 additions and 125 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.api.shuffle;

import org.apache.spark.annotation.Experimental;

import java.io.Serializable;

/**
 * Represents metadata about where shuffle blocks were written in a single map task.
 * <p>
 * This is optionally returned by shuffle writers. The inner shuffle locations may
 * be accessed by shuffle readers. Shuffle locations are only necessary when the
 * location of shuffle blocks needs to be managed by the driver; shuffle plugins
 * may choose to use an external database or other metadata management systems to
 * track the locations of shuffle blocks instead.
 * <p>
 * Implementations must be serializable, since this interface extends {@link Serializable}.
 */
@Experimental
public interface MapShuffleLocations extends Serializable {

/**
 * Get the location for a given shuffle block written by this map task.
 *
 * @param reduceId identifies which of this map task's shuffle blocks to look up
 *                 (presumably the reduce partition id; TODO confirm against callers)
 * @return the location of that block, as an implementation-specific {@link ShuffleLocation}
 */
ShuffleLocation getLocationForBlock(int reduceId);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.api.shuffle;

/**
 * Marker interface representing a location of a shuffle block. Implementations of shuffle readers
 * and writers are expected to cast this down to an implementation-specific representation.
 * <p>
 * The interface declares no methods of its own. Instances are handed out by
 * {@code MapShuffleLocations#getLocationForBlock(int)}.
 */
public interface ShuffleLocation {
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.io.IOException;

import org.apache.spark.annotation.Experimental;
import org.apache.spark.api.java.Optional;

/**
* :: Experimental ::
Expand All @@ -31,7 +32,7 @@
public interface ShuffleMapOutputWriter {
ShufflePartitionWriter getNextPartitionWriter() throws IOException;

void commitAllPartitions() throws IOException;
Optional<MapShuffleLocations> commitAllPartitions() throws IOException;

void abort(Throwable error) throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import java.nio.channels.WritableByteChannel;
import javax.annotation.Nullable;

import org.apache.spark.api.java.Optional;
import org.apache.spark.api.shuffle.MapShuffleLocations;
import scala.None$;
import scala.Option;
import scala.Product2;
Expand Down Expand Up @@ -134,8 +136,11 @@ public void write(Iterator<Product2<K, V>> records) throws IOException {
try {
if (!records.hasNext()) {
partitionLengths = new long[numPartitions];
mapOutputWriter.commitAllPartitions();
mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths);
Optional<MapShuffleLocations> blockLocs = mapOutputWriter.commitAllPartitions();
mapStatus = MapStatus$.MODULE$.apply(
blockManager.shuffleServerId(),
blockLocs.orNull(),
partitionLengths);
return;
}
final SerializerInstance serInstance = serializer.newInstance();
Expand Down Expand Up @@ -168,8 +173,11 @@ public void write(Iterator<Product2<K, V>> records) throws IOException {
}

partitionLengths = writePartitionedData(mapOutputWriter);
mapOutputWriter.commitAllPartitions();
mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths);
Optional<MapShuffleLocations> mapLocations = mapOutputWriter.commitAllPartitions();
mapStatus = MapStatus$.MODULE$.apply(
blockManager.shuffleServerId(),
mapLocations.orNull(),
partitionLengths);
} catch (Exception e) {
try {
mapOutputWriter.abort(e);
Expand All @@ -178,6 +186,10 @@ public void write(Iterator<Product2<K, V>> records) throws IOException {
}
throw e;
}
mapStatus = MapStatus$.MODULE$.apply(
blockManager.shuffleServerId(),
DefaultMapShuffleLocations.get(blockManager.shuffleServerId()),
partitionLengths);
}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.shuffle.sort;

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;

import org.apache.spark.api.shuffle.MapShuffleLocations;
import org.apache.spark.api.shuffle.ShuffleLocation;
import org.apache.spark.storage.BlockManagerId;

import java.util.Objects;

/**
 * {@link MapShuffleLocations} implementation that is backed by a single {@link BlockManagerId}.
 * <p>
 * Every block of the map output resolves to the same location, so an instance also acts as the
 * {@link ShuffleLocation} it returns from {@link #getLocationForBlock(int)}.
 */
public class DefaultMapShuffleLocations implements MapShuffleLocations, ShuffleLocation {

  /**
   * Cache of instances keyed by block manager id, used by {@link #get(BlockManagerId)}.
   * Its maximum size is borrowed from the BlockManagerId's own cache (around 1MB according to
   * the original author's estimate), which should be feasible.
   */
  private static final LoadingCache<BlockManagerId, DefaultMapShuffleLocations>
      INSTANCES_BY_BLOCK_MANAGER_ID = newInstanceCache();

  private final BlockManagerId location;

  /**
   * Wraps the given block manager id directly, without consulting the cache.
   *
   * @param blockManagerId the block manager id this object reports as its location
   */
  public DefaultMapShuffleLocations(BlockManagerId blockManagerId) {
    this.location = blockManagerId;
  }

  /**
   * Returns the cached instance for the given block manager id, creating it on first use.
   *
   * @param blockManagerId the block manager id to look up
   * @return the instance associated with {@code blockManagerId} in the cache
   */
  public static DefaultMapShuffleLocations get(BlockManagerId blockManagerId) {
    return INSTANCES_BY_BLOCK_MANAGER_ID.getUnchecked(blockManagerId);
  }

  /**
   * Returns this object for every block: the location does not depend on {@code reduceId}.
   */
  @Override
  public ShuffleLocation getLocationForBlock(int reduceId) {
    return this;
  }

  /**
   * @return the block manager id this object was constructed with
   */
  public BlockManagerId getBlockManagerId() {
    return this.location;
  }

  /**
   * Two instances are equal when they wrap equal block manager ids.
   */
  @Override
  public boolean equals(Object other) {
    if (!(other instanceof DefaultMapShuffleLocations)) {
      return false;
    }
    DefaultMapShuffleLocations that = (DefaultMapShuffleLocations) other;
    return Objects.equals(that.location, this.location);
  }

  /**
   * Hash of the wrapped block manager id, or 0 when it is null.
   */
  @Override
  public int hashCode() {
    return this.location == null ? 0 : this.location.hashCode();
  }

  /** Builds the size-bounded cache whose loader wraps each key in a new instance. */
  private static LoadingCache<BlockManagerId, DefaultMapShuffleLocations> newInstanceCache() {
    CacheLoader<BlockManagerId, DefaultMapShuffleLocations> loader =
        new CacheLoader<BlockManagerId, DefaultMapShuffleLocations>() {
          @Override
          public DefaultMapShuffleLocations load(BlockManagerId blockManagerId) {
            return new DefaultMapShuffleLocations(blockManagerId);
          }
        };
    return CacheBuilder.newBuilder()
        .maximumSize(BlockManagerId.blockManagerIdCacheSize())
        .build(loader);
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import java.nio.channels.WritableByteChannel;
import java.util.Iterator;

import org.apache.spark.api.java.Optional;
import org.apache.spark.api.shuffle.MapShuffleLocations;
import scala.Option;
import scala.Product2;
import scala.collection.JavaConverters;
Expand Down Expand Up @@ -221,6 +223,7 @@ void closeAndWriteOutput() throws IOException {
final ShuffleMapOutputWriter mapWriter = shuffleWriteSupport
.createMapOutputWriter(shuffleId, mapId, partitioner.numPartitions());
final long[] partitionLengths;
Optional<MapShuffleLocations> mapLocations;
try {
try {
partitionLengths = mergeSpills(spills, mapWriter);
Expand All @@ -231,7 +234,7 @@ void closeAndWriteOutput() throws IOException {
}
}
}
mapWriter.commitAllPartitions();
mapLocations = mapWriter.commitAllPartitions();
} catch (Exception e) {
try {
mapWriter.abort(e);
Expand All @@ -240,7 +243,10 @@ void closeAndWriteOutput() throws IOException {
}
throw e;
}
mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths);
mapStatus = MapStatus$.MODULE$.apply(
blockManager.shuffleServerId(),
mapLocations.orNull(),
partitionLengths);
}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,6 @@ public ShuffleWriteSupport writes() {
throw new IllegalStateException(
"Executor components must be initialized before getting writers.");
}
return new DefaultShuffleWriteSupport(sparkConf, blockResolver);
return new DefaultShuffleWriteSupport(sparkConf, blockResolver, blockManager.shuffleServerId());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@
import java.io.OutputStream;
import java.nio.channels.FileChannel;

import org.apache.spark.api.java.Optional;
import org.apache.spark.api.shuffle.MapShuffleLocations;
import org.apache.spark.shuffle.sort.DefaultMapShuffleLocations;
import org.apache.spark.storage.BlockManagerId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -49,6 +53,7 @@ public class DefaultShuffleMapOutputWriter implements ShuffleMapOutputWriter {
private final int bufferSize;
private int currPartitionId = 0;
private long currChannelPosition;
private final BlockManagerId shuffleServerId;

private final File outputFile;
private File outputTempFile;
Expand All @@ -61,11 +66,13 @@ public DefaultShuffleMapOutputWriter(
int shuffleId,
int mapId,
int numPartitions,
BlockManagerId shuffleServerId,
ShuffleWriteMetricsReporter metrics,
IndexShuffleBlockResolver blockResolver,
SparkConf sparkConf) {
this.shuffleId = shuffleId;
this.mapId = mapId;
this.shuffleServerId = shuffleServerId;
this.metrics = metrics;
this.blockResolver = blockResolver;
this.bufferSize =
Expand All @@ -90,10 +97,11 @@ public ShufflePartitionWriter getNextPartitionWriter() throws IOException {
}

@Override
public void commitAllPartitions() throws IOException {
public Optional<MapShuffleLocations> commitAllPartitions() throws IOException {
cleanUp();
File resolvedTmp = outputTempFile != null && outputTempFile.isFile() ? outputTempFile : null;
blockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, resolvedTmp);
return Optional.of(DefaultMapShuffleLocations.get(shuffleServerId));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,21 @@
import org.apache.spark.api.shuffle.ShuffleMapOutputWriter;
import org.apache.spark.api.shuffle.ShuffleWriteSupport;
import org.apache.spark.shuffle.IndexShuffleBlockResolver;
import org.apache.spark.storage.BlockManagerId;

public class DefaultShuffleWriteSupport implements ShuffleWriteSupport {

private final SparkConf sparkConf;
private final IndexShuffleBlockResolver blockResolver;
private final BlockManagerId shuffleServerId;

public DefaultShuffleWriteSupport(
SparkConf sparkConf,
IndexShuffleBlockResolver blockResolver) {
IndexShuffleBlockResolver blockResolver,
BlockManagerId shuffleServerId) {
this.sparkConf = sparkConf;
this.blockResolver = blockResolver;
this.shuffleServerId = shuffleServerId;
}

@Override
Expand All @@ -41,7 +45,7 @@ public ShuffleMapOutputWriter createMapOutputWriter(
int mapId,
int numPartitions) {
return new DefaultShuffleMapOutputWriter(
shuffleId, mapId, numPartitions,
shuffleId, mapId, numPartitions, shuffleServerId,
TaskContext.get().taskMetrics().shuffleWriteMetrics(), blockResolver, sparkConf);
}
}
Loading

0 comments on commit 16caee4

Please sign in to comment.