Skip to content

Commit

Permalink
[SPARK-23806] Broadcast.unpersist can cause fatal exception when used…
Browse files Browse the repository at this point in the history
… with dynamic allocation apache#20924

What changes were proposed in this pull request?

ignore errors when you are waiting for a broadcast.unpersist. This is handling it the same way as doing rdd.unpersist in https://issues.apache.org/jira/browse/SPARK-22618
How was this patch tested?

Patch was tested manually against a couple jobs that exhibit this behavior, with the change the application no longer dies due to this and just prints the warning.

Please review http://spark.apache.org/contributing.html before opening a pull request.
  • Loading branch information
zzcclp committed Dec 6, 2018
1 parent 7e340c8 commit 96ee540
Showing 1 changed file with 9 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -189,11 +189,15 @@ class BlockManagerMasterEndpoint(
val requiredBlockManagers = blockManagerInfo.values.filter { info =>
removeFromDriver || !info.blockManagerId.isDriver
}
Future.sequence(
requiredBlockManagers.map { bm =>
bm.slaveEndpoint.ask[Int](removeMsg)
}.toSeq
)
val futures = requiredBlockManagers.map { bm =>
bm.slaveEndpoint.ask[Int](removeMsg).recover {
case e: IOException =>
logWarning(s"Error trying to remove broadcast $broadcastId", e)
0 // zero blocks were removed
}
}.toSeq

Future.sequence(futures)
}

private def removeBlockManager(blockManagerId: BlockManagerId) {
Expand Down

0 comments on commit 96ee540

Please sign in to comment.