From fd4bb9e6bf8c0ef824c2515665abc3156c6d9963 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Sat, 9 May 2015 18:07:44 -0700 Subject: [PATCH] Use own ByteBufferOutputStream rather than Kryo's --- .../unsafe/ByteBufferOutputStream.java | 46 +++++++++++++++++++ .../unsafe/UnsafeShuffleExternalSorter.java | 2 +- .../shuffle/unsafe/UnsafeShuffleSorter.java | 2 +- .../shuffle/unsafe/UnsafeShuffleWriter.java | 2 - 4 files changed, 48 insertions(+), 4 deletions(-) create mode 100644 core/src/main/java/org/apache/spark/shuffle/unsafe/ByteBufferOutputStream.java diff --git a/core/src/main/java/org/apache/spark/shuffle/unsafe/ByteBufferOutputStream.java b/core/src/main/java/org/apache/spark/shuffle/unsafe/ByteBufferOutputStream.java new file mode 100644 index 0000000000000..3410cd2911ebe --- /dev/null +++ b/core/src/main/java/org/apache/spark/shuffle/unsafe/ByteBufferOutputStream.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.shuffle.unsafe; + +import java.io.IOException; +import java.io.OutputStream; +import java.nio.ByteBuffer; + +class ByteBufferOutputStream extends OutputStream { + + private final ByteBuffer byteBuffer; + + public ByteBufferOutputStream(ByteBuffer byteBuffer) { + this.byteBuffer = byteBuffer; + } + + @Override + public void write(int b) throws IOException { + byteBuffer.put((byte) b); + } + + @Override + public void write(byte[] b) throws IOException { + byteBuffer.put(b); + } + + @Override + public void write(byte[] b, int off, int len) throws IOException { + byteBuffer.put(b, off, len); + } +} diff --git a/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleExternalSorter.java b/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleExternalSorter.java index 6ca3e09e3e439..892a78796335b 100644 --- a/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleExternalSorter.java +++ b/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleExternalSorter.java @@ -53,7 +53,7 @@ * spill files. Instead, this merging is performed in {@link UnsafeShuffleWriter}, which uses a * specialized merge procedure that avoids extra serialization/deserialization. */ -public final class UnsafeShuffleExternalSorter { +final class UnsafeShuffleExternalSorter { private final Logger logger = LoggerFactory.getLogger(UnsafeShuffleExternalSorter.class); diff --git a/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleSorter.java b/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleSorter.java index d9ffe9a44fec7..d15da8a7ee126 100644 --- a/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleSorter.java +++ b/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleSorter.java @@ -21,7 +21,7 @@ import org.apache.spark.util.collection.Sorter; -public final class UnsafeShuffleSorter { +final class UnsafeShuffleSorter { private final Sorter sorter; private static final class SortComparator implements Comparator { diff --git a/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriter.java index b442162946afb..d20c19547adf1 100644 --- a/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriter.java +++ b/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriter.java @@ -28,7 +28,6 @@ import scala.reflect.ClassTag; import scala.reflect.ClassTag$; -import com.esotericsoftware.kryo.io.ByteBufferOutputStream; import com.google.common.annotations.VisibleForTesting; import com.google.common.io.ByteStreams; import com.google.common.io.Closeables; @@ -75,7 +74,6 @@ public class UnsafeShuffleWriter extends ShuffleWriter { private UnsafeShuffleExternalSorter sorter = null; private byte[] serArray = null; private ByteBuffer serByteBuffer; - // TODO: we should not depend on this class from Kryo; copy its source or find an alternative private SerializationStream serOutputStream; /**