Permalink
Browse files

Fixed indexing bugs similar to the ones in the mos-shuffle-parallel branch.

  • Loading branch information...
1 parent f4d0e91 commit 071edf6eb3647477f003e70dddd4d9cef2fee4b4 Mosharaf Chowdhury committed Dec 22, 2010
View
@@ -1 +1 @@
--Dspark.shuffle.class=spark.CustomParallelLocalFileShuffle -Dspark.blockedLocalFileShuffle.maxRxConnections=2 -Dspark.blockedLocalFileShuffle.blockSize=256 -Dspark.blockedLocalFileShuffle.minKnockInterval=50 -Dspark.parallelLocalFileShuffle.maxRxConnections=2 -Dspark.parallelLocalFileShuffle.maxTxConnections=4 -Dspark.parallelLocalFileShuffle.minKnockInterval=50 -Dspark.parallelLocalFileShuffle.maxKnockInterval=5000
+-Dspark.shuffle.class=spark.HttpBlockedLocalFileShuffle -Dspark.blockedLocalFileShuffle.maxRxConnections=2 -Dspark.blockedLocalFileShuffle.blockSize=256 -Dspark.blockedLocalFileShuffle.minKnockInterval=50 -Dspark.parallelLocalFileShuffle.maxRxConnections=2 -Dspark.parallelLocalFileShuffle.maxTxConnections=4 -Dspark.parallelLocalFileShuffle.minKnockInterval=50 -Dspark.parallelLocalFileShuffle.maxKnockInterval=5000
@@ -201,12 +201,12 @@ extends Shuffle[K, V, C] with Logging {
override def run: Unit = {
try {
// TODO: Everything will break if BLOCKNUM is not correctly received
- // First get the BLOCKNUM file if totalBlocksInSplit(inputId) is unknown
+ // First get the BLOCKNUM file if totalBlocksInSplit(splitIndex) is unknown
if (totalBlocksInSplit(inputId) == -1) {
val url = "%s/shuffle/%d/%d/BLOCKNUM-%d".format(serverUri, shuffleId,
inputId, myId)
val inputStream = new ObjectInputStream(new URL(url).openStream())
- totalBlocksInSplit(inputId) =
+ totalBlocksInSplit(splitIndex) =
inputStream.readObject().asInstanceOf[Int]
inputStream.close()
}
@@ -239,10 +239,10 @@ extends Shuffle[K, V, C] with Logging {
logInfo("Reading " + url + " took " + readTime + " millis.")
// Reception completed. Update stats.
- hasBlocksInSplit(inputId) = hasBlocksInSplit(inputId) + 1
+ hasBlocksInSplit(splitIndex) = hasBlocksInSplit(splitIndex) + 1
// Split has been received only if all the blocks have been received
- if (hasBlocksInSplit(inputId) == totalBlocksInSplit(inputId)) {
+ if (hasBlocksInSplit(splitIndex) == totalBlocksInSplit(splitIndex)) {
hasSplitsBitVector.synchronized {
hasSplitsBitVector.set(splitIndex)
}
@@ -194,22 +194,22 @@ extends Shuffle[K, V, C] with Logging {
override def run: Unit = {
try {
- // First get the INDEX file if totalBlocksInSplit(inputId) is unknown
- if (totalBlocksInSplit(inputId) == -1) {
+ // First get the INDEX file if totalBlocksInSplit(splitIndex) is unknown
+ if (totalBlocksInSplit(splitIndex) == -1) {
val url = "%s/shuffle/%d/%d/INDEX-%d".format(serverUri, shuffleId,
inputId, myId)
val inputStream = new ObjectInputStream(new URL(url).openStream())
try {
while (true) {
- blocksInSplit(inputId) +=
+ blocksInSplit(splitIndex) +=
inputStream.readObject().asInstanceOf[Long]
}
} catch {
case e: EOFException => {}
}
- totalBlocksInSplit(inputId) = blocksInSplit(inputId).size
+ totalBlocksInSplit(splitIndex) = blocksInSplit(splitIndex).size
inputStream.close()
}
@@ -220,11 +220,11 @@ extends Shuffle[K, V, C] with Logging {
url.openConnection().asInstanceOf[HttpURLConnection]
// Set the range to download
- val blockStartsAt = hasBlocksInSplit(inputId) match {
+ val blockStartsAt = hasBlocksInSplit(splitIndex) match {
case 0 => 0
- case _ => blocksInSplit(inputId)(hasBlocksInSplit(inputId) - 1) + 1
+ case _ => blocksInSplit(splitIndex)(hasBlocksInSplit(splitIndex) - 1) + 1
}
- val blockEndsAt = blocksInSplit(inputId)(hasBlocksInSplit(inputId))
+ val blockEndsAt = blocksInSplit(splitIndex)(hasBlocksInSplit(splitIndex))
httpConnection.setRequestProperty("Range",
"bytes=" + blockStartsAt + "-" + blockEndsAt)
@@ -261,10 +261,10 @@ extends Shuffle[K, V, C] with Logging {
httpConnection.disconnect()
// Reception completed. Update stats.
- hasBlocksInSplit(inputId) = hasBlocksInSplit(inputId) + 1
+ hasBlocksInSplit(splitIndex) = hasBlocksInSplit(splitIndex) + 1
// Split has been received only if all the blocks have been received
- if (hasBlocksInSplit(inputId) == totalBlocksInSplit(inputId)) {
+ if (hasBlocksInSplit(splitIndex) == totalBlocksInSplit(splitIndex)) {
hasSplitsBitVector.synchronized {
hasSplitsBitVector.set(splitIndex)
}

0 comments on commit 071edf6

Please sign in to comment.