Merge branch 'master' into streaming

Conflicts:
	.gitignore
commit c89af0a7f9eebce22dfe2bb4d8b1676ec7f760f6 (2 parents: 48c7e37 + 78ffe16)
@tdas authored
Showing with 13,250 additions and 3,293 deletions.
  1. +1 −0  .gitignore
  2. +7 −2 README.md
  3. +39 −2 bagel/pom.xml
  4. +89 −8 bagel/src/main/scala/spark/bagel/Bagel.scala
  5. +19 −0 bagel/src/test/scala/bagel/BagelSuite.scala
  6. +6 −4 bin/spark-daemon.sh
  7. +1 −1  bin/spark-daemons.sh
  8. +1 −1  bin/start-master.sh
  9. +3 −2 bin/start-slave.sh
  10. +10 −1 bin/start-slaves.sh
  11. +1 −1  bin/stop-master.sh
  12. +11 −1 bin/stop-slaves.sh
  13. +1 −0  conf/spark-env.sh.template
  14. +86 −6 core/pom.xml
  15. +3 −0  core/src/hadoop1/scala/org/apache/hadoop/mapred/HadoopMapRedUtil.scala
  16. +3 −0  core/src/hadoop1/scala/org/apache/hadoop/mapreduce/HadoopMapReduceUtil.scala
  17. +23 −0 core/src/hadoop1/scala/spark/deploy/SparkHadoopUtil.scala
  18. +13 −0 core/src/hadoop2-yarn/scala/org/apache/hadoop/mapred/HadoopMapRedUtil.scala
  19. +13 −0 core/src/hadoop2-yarn/scala/org/apache/hadoop/mapreduce/HadoopMapReduceUtil.scala
  20. +63 −0 core/src/hadoop2-yarn/scala/spark/deploy/SparkHadoopUtil.scala
  21. +329 −0 core/src/hadoop2-yarn/scala/spark/deploy/yarn/ApplicationMaster.scala
  22. +77 −0 core/src/hadoop2-yarn/scala/spark/deploy/yarn/ApplicationMasterArguments.scala
  23. +272 −0 core/src/hadoop2-yarn/scala/spark/deploy/yarn/Client.scala
  24. +105 −0 core/src/hadoop2-yarn/scala/spark/deploy/yarn/ClientArguments.scala
  25. +171 −0 core/src/hadoop2-yarn/scala/spark/deploy/yarn/WorkerRunnable.scala
  26. +547 −0 core/src/hadoop2-yarn/scala/spark/deploy/yarn/YarnAllocationHandler.scala
  27. +42 −0 core/src/hadoop2-yarn/scala/spark/scheduler/cluster/YarnClusterScheduler.scala
  28. +3 −0  core/src/hadoop2/scala/org/apache/hadoop/mapred/HadoopMapRedUtil.scala
  29. +3 −0  core/src/hadoop2/scala/org/apache/hadoop/mapreduce/HadoopMapReduceUtil.scala
  30. +23 −0 core/src/hadoop2/scala/spark/deploy/SparkHadoopUtil.scala
  31. +72 −0 core/src/main/java/spark/network/netty/FileClient.java
  32. +24 −0 core/src/main/java/spark/network/netty/FileClientChannelInitializer.java
  33. +43 −0 core/src/main/java/spark/network/netty/FileClientHandler.java
  34. +86 −0 core/src/main/java/spark/network/netty/FileServer.java
  35. +25 −0 core/src/main/java/spark/network/netty/FileServerChannelInitializer.java
  36. +65 −0 core/src/main/java/spark/network/netty/FileServerHandler.java
  37. +12 −0 core/src/main/java/spark/network/netty/PathResolver.java
  38. +25 −4 core/src/main/scala/spark/BlockStoreShuffleFetcher.scala
  39. +1 −1  core/src/main/scala/spark/CacheManager.scala
  40. +15 −8 core/src/main/scala/spark/ClosureCleaner.scala
  41. +3 −1 core/src/main/scala/spark/Dependency.scala
  42. +16 −9 core/src/main/scala/spark/FetchFailedException.scala
  43. +7 −5 core/src/main/scala/spark/HadoopWriter.scala
  44. +18 −11 core/src/main/scala/spark/KryoSerializer.scala
  45. +4 −0 core/src/main/scala/spark/Logging.scala
  46. +63 −52 core/src/main/scala/spark/MapOutputTracker.scala
  47. +109 −27 core/src/main/scala/spark/PairRDDFunctions.scala
  48. +29 −0 core/src/main/scala/spark/Partitioner.scala
  49. +218 −23 core/src/main/scala/spark/RDD.scala
  50. +11 −4 core/src/main/scala/spark/RDDCheckpointData.scala
  51. +17 −12 core/src/main/scala/spark/SequenceFileRDDFunctions.scala
  52. +6 −1 core/src/main/scala/spark/ShuffleFetcher.scala
  53. +1 −1  core/src/main/scala/spark/SizeEstimator.scala
  54. +109 −53 core/src/main/scala/spark/SparkContext.scala
  55. +75 −15 core/src/main/scala/spark/SparkEnv.scala
  56. +7 −2 core/src/main/scala/spark/TaskContext.scala
  57. +13 −3 core/src/main/scala/spark/TaskEndReason.scala
  58. +173 −38 core/src/main/scala/spark/Utils.scala
  59. +6 −0 core/src/main/scala/spark/api/java/JavaDoubleRDD.scala
  60. +52 −10 core/src/main/scala/spark/api/java/JavaPairRDD.scala
  61. +17 −6 core/src/main/scala/spark/api/java/JavaRDD.scala
  62. +49 −1 core/src/main/scala/spark/api/java/JavaRDDLike.scala
  63. +2 −2 core/src/main/scala/spark/api/java/JavaSparkContext.scala
  64. +11 −0 core/src/main/scala/spark/api/java/function/FlatMapFunction2.scala
  65. +39 −41 core/src/main/scala/spark/api/python/PythonRDD.scala
  66. +95 −0 core/src/main/scala/spark/api/python/PythonWorkerFactory.scala
  67. +3 −2 core/src/main/scala/spark/deploy/ApplicationDescription.scala
  68. +17 −4 core/src/main/scala/spark/deploy/DeployMessage.scala
  69. +3 −2 core/src/main/scala/spark/deploy/JsonProtocol.scala
  70. +4 −4 core/src/main/scala/spark/deploy/LocalSparkCluster.scala
  71. +10 −4 core/src/main/scala/spark/deploy/client/Client.scala
  72. +1 −1  core/src/main/scala/spark/deploy/client/ClientListener.scala
  73. +2 −2 core/src/main/scala/spark/deploy/client/TestClient.scala
  74. +4 −2 core/src/main/scala/spark/deploy/master/ApplicationInfo.scala
  75. +24 −16 core/src/main/scala/spark/deploy/master/Master.scala
  76. +12 −5 core/src/main/scala/spark/deploy/master/MasterArguments.scala
  77. +2 −2 core/src/main/scala/spark/deploy/master/MasterWebUI.scala
  78. +9 −0 core/src/main/scala/spark/deploy/master/WorkerInfo.scala
  79. +4 −2 core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala
  80. +20 −19 core/src/main/scala/spark/deploy/worker/Worker.scala
  81. +10 −3 core/src/main/scala/spark/deploy/worker/WorkerArguments.scala
  82. +5 −4 core/src/main/scala/spark/deploy/worker/WorkerWebUI.scala
  83. +84 −62 core/src/main/scala/spark/executor/Executor.scala
  84. +10 −6 core/src/main/scala/spark/executor/MesosExecutorBackend.scala
  85. +35 −6 core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala
  86. +83 −0 core/src/main/scala/spark/executor/TaskMetrics.scala
  87. +94 −0 core/src/main/scala/spark/network/BufferMessage.scala
  88. +214 −77 core/src/main/scala/spark/network/Connection.scala
  89. +389 −160 core/src/main/scala/spark/network/ConnectionManager.scala
  90. +21 −0 core/src/main/scala/spark/network/ConnectionManagerId.scala
  91. +17 −161 core/src/main/scala/spark/network/Message.scala
  92. +25 −0 core/src/main/scala/spark/network/MessageChunk.scala
  93. +58 −0 core/src/main/scala/spark/network/MessageChunkHeader.scala
  94. +57 −0 core/src/main/scala/spark/network/netty/FileHeader.scala
  95. +101 −0 core/src/main/scala/spark/network/netty/ShuffleCopier.scala
  96. +53 −0 core/src/main/scala/spark/network/netty/ShuffleSender.scala
  97. +2 −7 core/src/main/scala/spark/rdd/BlockRDD.scala
  98. +21 −12 core/src/main/scala/spark/rdd/CheckpointRDD.scala
  99. +42 −8 core/src/main/scala/spark/rdd/CoGroupedRDD.scala
  100. +2 −2 core/src/main/scala/spark/rdd/CoalescedRDD.scala
  101. +16 −0 core/src/main/scala/spark/rdd/EmptyRDD.scala
  102. +16 −24 core/src/main/scala/spark/rdd/HadoopRDD.scala
  103. +103 −0 core/src/main/scala/spark/rdd/JdbcRDD.scala
  104. +8 −2 core/src/main/scala/spark/rdd/NewHadoopRDD.scala
  105. +22 −5 core/src/main/scala/spark/rdd/PipedRDD.scala
  106. +8 −4 core/src/main/scala/spark/rdd/ShuffledRDD.scala
  107. +41 −41 core/src/main/scala/spark/rdd/SubtractedRDD.scala
  108. +138 −0 core/src/main/scala/spark/rdd/ZippedPartitionsRDD.scala
  109. +30 −11 core/src/main/scala/spark/rdd/ZippedRDD.scala
  110. +4 −1 core/src/main/scala/spark/scheduler/ActiveJob.scala
  111. +84 −40 core/src/main/scala/spark/scheduler/DAGScheduler.scala
  112. +13 −2 core/src/main/scala/spark/scheduler/DAGSchedulerEvent.scala
  113. +156 −0 core/src/main/scala/spark/scheduler/InputFormatInfo.scala
  114. +306 −0 core/src/main/scala/spark/scheduler/JobLogger.scala
  115. +9 −1 core/src/main/scala/spark/scheduler/ResultTask.scala
  116. +49 −15 core/src/main/scala/spark/scheduler/ShuffleMapTask.scala
  117. +178 −0 core/src/main/scala/spark/scheduler/SparkListener.scala
  118. +61 −0 core/src/main/scala/spark/scheduler/SplitInfo.scala
  119. +2 −2 core/src/main/scala/spark/scheduler/Stage.scala
  120. +12 −0 core/src/main/scala/spark/scheduler/StageInfo.scala
  121. +5 −2 core/src/main/scala/spark/scheduler/Task.scala
  122. +5 −2 core/src/main/scala/spark/scheduler/TaskResult.scala
  123. +4 −0 core/src/main/scala/spark/scheduler/TaskScheduler.scala
  124. +7 −1 core/src/main/scala/spark/scheduler/TaskSchedulerListener.scala
  125. +9 −2 core/src/main/scala/spark/scheduler/TaskSet.scala
  126. +333 −33 core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
  127. +747 −0 core/src/main/scala/spark/scheduler/cluster/ClusterTaskSetManager.scala
  128. +104 −0 core/src/main/scala/spark/scheduler/cluster/Pool.scala
  129. +27 −0 core/src/main/scala/spark/scheduler/cluster/Schedulable.scala
  130. +115 −0 core/src/main/scala/spark/scheduler/cluster/SchedulableBuilder.scala
  131. +64 −0 core/src/main/scala/spark/scheduler/cluster/SchedulingAlgorithm.scala
  132. +7 −0 core/src/main/scala/spark/scheduler/cluster/SchedulingMode.scala
  133. +5 −4 core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
  134. +5 −2 core/src/main/scala/spark/scheduler/cluster/StandaloneClusterMessage.scala
  135. +20 −17 core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
  136. +7 −1 core/src/main/scala/spark/scheduler/cluster/TaskInfo.scala
  137. +9 −421 core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
  138. +1 −1  core/src/main/scala/spark/scheduler/cluster/WorkerOffer.scala
  139. +154 −66 core/src/main/scala/spark/scheduler/local/LocalScheduler.scala
  140. +172 −0 core/src/main/scala/spark/scheduler/local/LocalTaskSetManager.scala
  141. +12 −28 core/src/main/scala/spark/serializer/Serializer.scala
  142. +45 −0 core/src/main/scala/spark/serializer/SerializerManager.scala
  143. +5 −0 core/src/main/scala/spark/storage/BlockException.scala
  144. +10 −0 core/src/main/scala/spark/storage/BlockFetchTracker.scala
  145. +330 −0 core/src/main/scala/spark/storage/BlockFetcherIterator.scala
  146. +308 −269 core/src/main/scala/spark/storage/BlockManager.scala
  147. +43 −19 core/src/main/scala/spark/storage/BlockManagerId.scala
  148. +25 −37 core/src/main/scala/spark/storage/BlockManagerMaster.scala
  149. +101 −114 core/src/main/scala/spark/storage/BlockManagerMasterActor.scala
  150. +7 −4 core/src/main/scala/spark/storage/BlockManagerMessages.scala
  151. +7 −1 core/src/main/scala/spark/storage/BlockManagerSlaveActor.scala
  152. +13 −10 core/src/main/scala/spark/storage/BlockManagerUI.scala
  153. +9 −19 core/src/main/scala/spark/storage/BlockManagerWorker.scala
  154. +1 −0  core/src/main/scala/spark/storage/BlockMessageArray.scala
  155. +50 −0 core/src/main/scala/spark/storage/BlockObjectWriter.scala
  156. +157 −33 core/src/main/scala/spark/storage/DiskStore.scala
  157. +5 −3 core/src/main/scala/spark/storage/MemoryStore.scala
  158. +50 −0 core/src/main/scala/spark/storage/ShuffleBlockManager.scala
  159. +2 −6 core/src/main/scala/spark/storage/StorageLevel.scala
  160. +28 −19 core/src/main/scala/spark/storage/StorageUtils.scala
  161. +2 −3 core/src/main/scala/spark/storage/ThreadingTest.scala
  162. +13 −6 core/src/main/scala/spark/util/AkkaUtils.scala
  163. +45 −0 core/src/main/scala/spark/util/BoundedPriorityQueue.scala
  164. +25 −0 core/src/main/scala/spark/util/CompletionIterator.scala
  165. +65 −0 core/src/main/scala/spark/util/Distribution.scala
  166. +71 −0 core/src/main/scala/spark/util/NextIterator.scala
  167. +16 −10 core/src/main/scala/spark/util/StatCounter.scala
  168. +8 −0 core/src/main/scala/spark/util/TimeStampedHashMap.scala
  169. +5 −7 core/src/main/twirl/spark/deploy/master/app_details.scala.html
  170. +1 −1  core/src/main/twirl/spark/deploy/master/executor_row.scala.html
  171. +1 −1  core/src/main/twirl/spark/deploy/master/index.scala.html
  172. +1 −1  core/src/main/twirl/spark/deploy/master/worker_row.scala.html
  173. +1 −1  core/src/main/twirl/spark/deploy/worker/index.scala.html
  174. +1 −1  core/src/main/twirl/spark/storage/worker_table.scala.html
  175. +14 −0 core/src/test/resources/fairscheduler.xml
  176. +82 −7 core/src/test/scala/spark/DistributedSuite.scala
  177. +46 −0 core/src/test/scala/spark/FileSuite.scala
  178. +94 −1 core/src/test/scala/spark/JavaAPISuite.java
  179. +1 −0  core/src/test/scala/spark/KryoSerializerSuite.scala
  180. +2 −1  core/src/test/scala/spark/LocalSparkContext.scala
  181. +50 −42 core/src/test/scala/spark/MapOutputTrackerSuite.scala
  182. +23 −6 core/src/test/scala/spark/PartitioningSuite.scala
  183. +39 −0 core/src/test/scala/spark/PipedRDDSuite.scala
  184. +159 −1 core/src/test/scala/spark/RDDSuite.scala
  185. +17 −0 core/src/test/scala/spark/ShuffleNettySuite.scala
  186. +160 −3 core/src/test/scala/spark/ShuffleSuite.scala
  187. +34 −0 core/src/test/scala/spark/ZippedPartitionsSuite.scala
  188. +56 −0 core/src/test/scala/spark/rdd/JdbcRDDSuite.scala
  189. +250 −0 core/src/test/scala/spark/scheduler/ClusterSchedulerSuite.scala
  190. +185 −445 core/src/test/scala/spark/scheduler/DAGSchedulerSuite.scala
  191. +105 −0 core/src/test/scala/spark/scheduler/JobLoggerSuite.scala
  192. +206 −0 core/src/test/scala/spark/scheduler/LocalSchedulerSuite.scala
  193. +85 −0 core/src/test/scala/spark/scheduler/SparkListenerSuite.scala
  194. +81 −18 core/src/test/scala/spark/storage/BlockManagerSuite.scala
  195. +25 −0 core/src/test/scala/spark/util/DistributionSuite.scala
  196. +68 −0 core/src/test/scala/spark/util/NextIteratorSuite.scala
  197. +1 −1  docs/README.md
  198. +3 −3 docs/_config.yml
  199. +20 −3 docs/_layouts/global.html
  200. +4 −4 docs/_plugins/copy_api_dirs.rb
  201. +66 −0 docs/building-with-maven.md
  202. +7 −0 docs/configuration.md
  203. +15 −15 docs/css/bootstrap.css
  204. +1 −1  docs/css/bootstrap.min.css
  205. +10 −7 docs/ec2-scripts.md
  206. +8 −10 docs/index.md
  207. +1 −1  docs/java-programming-guide.md
  208. +2 −2 docs/python-programming-guide.md
  209. +25 −22 docs/quick-start.md
  210. +49 −13 docs/running-on-yarn.md
  211. +12 −4 docs/scala-programming-guide.md
  212. +16 −17 docs/streaming-programming-guide.md
  213. +5 −5 docs/tuning.md
  214. +1 −1  ec2/README
  215. +25 −12 ec2/spark_ec2.py
  216. +91 −3 examples/pom.xml
  217. +5 −1 examples/src/main/java/spark/examples/JavaHdfsLR.java
  218. +114 −0 examples/src/main/java/spark/examples/JavaKMeans.java
  219. +114 −0 examples/src/main/java/spark/examples/JavaLogQuery.java
  220. +48 −0 examples/src/main/java/spark/examples/JavaSparkPi.java
  221. +3 −2 examples/src/main/java/spark/examples/JavaTC.java
  222. +3 −2 examples/src/main/java/spark/examples/JavaWordCount.java
  223. +2 −1  examples/src/main/java/spark/streaming/examples/JavaFlumeEventCount.java
  224. +2 −2 examples/src/main/java/spark/streaming/examples/JavaNetworkWordCount.java
  225. +2 −1  examples/src/main/java/spark/streaming/examples/JavaQueueStream.java
  226. +6 −4 examples/src/main/scala/spark/examples/BroadcastTest.scala
  227. +196 −0 examples/src/main/scala/spark/examples/CassandraTest.scala
  228. +3 −2 examples/src/main/scala/spark/examples/ExceptionHandlingTest.scala
  229. +3 −2 examples/src/main/scala/spark/examples/GroupByTest.scala
  230. +35 −0 examples/src/main/scala/spark/examples/HBaseTest.scala
  231. +2 −1  examples/src/main/scala/spark/examples/HdfsTest.scala
  232. +3 −1 examples/src/main/scala/spark/examples/LocalALS.scala
  233. +72 −69 examples/src/main/scala/spark/examples/LocalKMeans.scala
  234. +3 −0  examples/src/main/scala/spark/examples/LocalLR.scala
  235. +3 −1 examples/src/main/scala/spark/examples/LogQuery.scala
  236. +13 −9 examples/src/main/scala/spark/examples/MultiBroadcastTest.scala
  237. +4 −3 examples/src/main/scala/spark/examples/SimpleSkewedGroupByTest.scala
  238. +8 −7 examples/src/main/scala/spark/examples/SkewedGroupByTest.scala
  239. +32 −30 examples/src/main/scala/spark/examples/SparkALS.scala
  240. +12 −2 examples/src/main/scala/spark/examples/SparkHdfsLR.scala
  241. +6 −1 examples/src/main/scala/spark/examples/SparkKMeans.scala
  242. +5 −1 examples/src/main/scala/spark/examples/SparkLR.scala
  243. +3 −1 examples/src/main/scala/spark/examples/SparkPi.scala
  244. +2 −2 examples/src/main/scala/spark/examples/SparkTC.scala
  245. +2 −1  examples/src/main/scala/spark/streaming/examples/ActorWordCount.scala
  246. +2 −1  examples/src/main/scala/spark/streaming/examples/FlumeEventCount.scala
  247. +2 −1  examples/src/main/scala/spark/streaming/examples/HdfsWordCount.scala
  248. +2 −2 examples/src/main/scala/spark/streaming/examples/KafkaWordCount.scala
  249. +2 −1  examples/src/main/scala/spark/streaming/examples/NetworkWordCount.scala
  250. +4 −3 examples/src/main/scala/spark/streaming/examples/QueueStream.scala
  251. +3 −2 examples/src/main/scala/spark/streaming/examples/RawNetworkGrep.scala
  252. +3 −2 examples/src/main/scala/spark/streaming/examples/TwitterAlgebirdCMS.scala
  253. +2 −1  examples/src/main/scala/spark/streaming/examples/TwitterAlgebirdHLL.scala
  254. +2 −1  examples/src/main/scala/spark/streaming/examples/TwitterPopularTags.scala
  255. +3 −2 examples/src/main/scala/spark/streaming/examples/ZeroMQWordCount.scala
  256. +3 −2 examples/src/main/scala/spark/streaming/examples/clickstream/PageViewStream.scala
  257. +104 −14 pom.xml
  258. +92 −31 project/SparkBuild.scala
  259. +1 −1  project/build.properties
  260. +6 −4 project/plugins.sbt
  261. +1 −1  python/examples/transitive_closure.py
  262. +158 −0 python/pyspark/daemon.py
  263. +10 −10 python/pyspark/join.py
  264. +29 −29 python/pyspark/rdd.py
  265. +4 −0 python/pyspark/serializers.py
  266. +43 −0 python/pyspark/tests.py
  267. +34 −21 python/pyspark/worker.py
  268. +57 −2 repl-bin/pom.xml
  269. +70 −13 repl/pom.xml
  270. +1 −2  repl/src/main/scala/spark/repl/ExecutorClassLoader.scala
  271. +10 −3 repl/src/main/scala/spark/repl/SparkILoop.scala
  272. +1 −0  repl/src/test/scala/spark/repl/ReplSuite.scala
  273. +83 −48 run
  274. +25 −1 run2.cmd
  275. +1 −1  sbt/sbt
  276. +45 −2 streaming/pom.xml
  277. +27 −7 streaming/src/main/scala/spark/streaming/Checkpoint.scala
  278. +1 −1  streaming/src/main/scala/spark/streaming/DStreamGraph.scala
  279. +31 −11 streaming/src/main/scala/spark/streaming/StreamingContext.scala
  280. +54 −1 streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala
  281. +1 −0  streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala
  282. +1 −1  streaming/src/main/scala/spark/streaming/dstream/QueueInputDStream.scala
  283. +9 −35 streaming/src/main/scala/spark/streaming/dstream/SocketInputDStream.scala
  284. +7 −2 streaming/src/main/scala/spark/streaming/util/MasterFailureTest.scala
  285. +1 −1  streaming/src/main/scala/spark/streaming/util/RawTextSender.scala
  286. +5 −2 streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala
  287. +3 −1 streaming/src/test/scala/spark/streaming/CheckpointSuite.scala
  288. +16 −11 streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala
  289. +1 −0  streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala
1  .gitignore
@@ -37,3 +37,4 @@ dependency-reduced-pom.xml
.ensime
.ensime_lucene
checkpoint
+derby.log
9 README.md
@@ -12,11 +12,16 @@ This README file only contains basic setup instructions.
## Building
-Spark requires Scala 2.9.2. The project is built using Simple Build Tool (SBT),
-which is packaged with it. To build Spark and its example programs, run:
+Spark requires Scala 2.9.2 (Scala 2.10 is not yet supported). The project is
+built using Simple Build Tool (SBT), which is packaged with it. To build
+Spark and its example programs, run:
sbt/sbt package
+Spark also supports building using Maven. If you would like to build using Maven,
+see the [instructions for building Spark with Maven](http://spark-project.org/docs/latest/building-with-maven.html)
+in the Spark documentation.
+
To run Spark, you will need to have Scala's bin directory in your `PATH`, or
you will need to set the `SCALA_HOME` environment variable to point to where
you've installed Scala. Scala must be accessible through one of these
41 bagel/pom.xml
@@ -3,8 +3,8 @@
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.spark-project</groupId>
- <artifactId>parent</artifactId>
- <version>0.7.0-SNAPSHOT</version>
+ <artifactId>spark-parent</artifactId>
+ <version>0.8.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
@@ -102,5 +102,42 @@
</plugins>
</build>
</profile>
+ <profile>
+ <id>hadoop2-yarn</id>
+ <dependencies>
+ <dependency>
+ <groupId>org.spark-project</groupId>
+ <artifactId>spark-core</artifactId>
+ <version>${project.version}</version>
+ <classifier>hadoop2-yarn</classifier>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-client</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-api</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-common</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ </dependencies>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <configuration>
+ <classifier>hadoop2-yarn</classifier>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
</profiles>
</project>
97 bagel/src/main/scala/spark/bagel/Bagel.scala
@@ -4,8 +4,37 @@ import spark._
import spark.SparkContext._
import scala.collection.mutable.ArrayBuffer
+import storage.StorageLevel
object Bagel extends Logging {
+ val DEFAULT_STORAGE_LEVEL = StorageLevel.MEMORY_AND_DISK
+
+ /**
+ * Runs a Bagel program.
+ * @param sc [[spark.SparkContext]] to use for the program.
+ * @param vertices vertices of the graph represented as an RDD of (Key, Vertex) pairs. Often the Key will be
+ * the vertex id.
+ * @param messages initial set of messages represented as an RDD of (Key, Message) pairs. Often this will be an
+ * empty array, i.e. sc.parallelize(Array[(K, Message)]()).
+ * @param combiner [[spark.bagel.Combiner]] combines multiple individual messages to a given vertex into one
+ * message before sending (which often involves network I/O).
+ * @param aggregator [[spark.bagel.Aggregator]] performs a reduce across all vertices after each superstep,
+ * and provides the result to each vertex in the next superstep.
+ * @param partitioner [[spark.Partitioner]] partitions values by key
+ * @param numPartitions number of partitions across which to split the graph.
+ * Default is the default parallelism of the SparkContext
+ * @param storageLevel [[spark.storage.StorageLevel]] to use for caching of intermediate RDDs in each superstep.
+ * Defaults to caching in memory.
+ * @param compute function that takes a Vertex, optional set of (possibly combined) messages to the Vertex,
+ * optional Aggregator and the current superstep,
+ * and returns a set of (Vertex, outgoing Messages) pairs
+ * @tparam K key
+ * @tparam V vertex type
+ * @tparam M message type
+ * @tparam C combiner
+ * @tparam A aggregator
+ * @return an RDD of (K, V) pairs representing the graph after completion of the program
+ */
def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest,
C: Manifest, A: Manifest](
sc: SparkContext,
@@ -14,7 +43,8 @@ object Bagel extends Logging {
combiner: Combiner[M, C],
aggregator: Option[Aggregator[V, A]],
partitioner: Partitioner,
- numPartitions: Int
+ numPartitions: Int,
+ storageLevel: StorageLevel = DEFAULT_STORAGE_LEVEL
)(
compute: (V, Option[C], Option[A], Int) => (V, Array[M])
): RDD[(K, V)] = {
@@ -32,8 +62,9 @@ object Bagel extends Logging {
val combinedMsgs = msgs.combineByKey(
combiner.createCombiner _, combiner.mergeMsg _, combiner.mergeCombiners _, partitioner)
val grouped = combinedMsgs.groupWith(verts)
+ val superstep_ = superstep // Create a read-only copy of superstep for capture in closure
val (processed, numMsgs, numActiveVerts) =
- comp[K, V, M, C](sc, grouped, compute(_, _, aggregated, superstep))
+ comp[K, V, M, C](sc, grouped, compute(_, _, aggregated, superstep_), storageLevel)
val timeTaken = System.currentTimeMillis - startTime
logInfo("Superstep %d took %d s".format(superstep, timeTaken / 1000))
@@ -50,6 +81,7 @@ object Bagel extends Logging {
verts
}
+ /** Runs a Bagel program with no [[spark.bagel.Aggregator]] and the default storage level */
def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest, C: Manifest](
sc: SparkContext,
vertices: RDD[(K, V)],
@@ -59,12 +91,29 @@ object Bagel extends Logging {
numPartitions: Int
)(
compute: (V, Option[C], Int) => (V, Array[M])
+ ): RDD[(K, V)] = run(sc, vertices, messages, combiner, numPartitions, DEFAULT_STORAGE_LEVEL)(compute)
+
+ /** Runs a Bagel program with no [[spark.bagel.Aggregator]] */
+ def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest, C: Manifest](
+ sc: SparkContext,
+ vertices: RDD[(K, V)],
+ messages: RDD[(K, M)],
+ combiner: Combiner[M, C],
+ partitioner: Partitioner,
+ numPartitions: Int,
+ storageLevel: StorageLevel
+ )(
+ compute: (V, Option[C], Int) => (V, Array[M])
): RDD[(K, V)] = {
run[K, V, M, C, Nothing](
- sc, vertices, messages, combiner, None, partitioner, numPartitions)(
+ sc, vertices, messages, combiner, None, partitioner, numPartitions, storageLevel)(
addAggregatorArg[K, V, M, C](compute))
}
+ /**
+ * Runs a Bagel program with no [[spark.bagel.Aggregator]], default [[spark.HashPartitioner]]
+ * and default storage level
+ */
def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest, C: Manifest](
sc: SparkContext,
vertices: RDD[(K, V)],
@@ -73,13 +122,29 @@ object Bagel extends Logging {
numPartitions: Int
)(
compute: (V, Option[C], Int) => (V, Array[M])
+ ): RDD[(K, V)] = run(sc, vertices, messages, combiner, numPartitions, DEFAULT_STORAGE_LEVEL)(compute)
+
+ /** Runs a Bagel program with no [[spark.bagel.Aggregator]] and the default [[spark.HashPartitioner]]*/
+ def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest, C: Manifest](
+ sc: SparkContext,
+ vertices: RDD[(K, V)],
+ messages: RDD[(K, M)],
+ combiner: Combiner[M, C],
+ numPartitions: Int,
+ storageLevel: StorageLevel
+ )(
+ compute: (V, Option[C], Int) => (V, Array[M])
): RDD[(K, V)] = {
val part = new HashPartitioner(numPartitions)
run[K, V, M, C, Nothing](
- sc, vertices, messages, combiner, None, part, numPartitions)(
+ sc, vertices, messages, combiner, None, part, numPartitions, storageLevel)(
addAggregatorArg[K, V, M, C](compute))
}
+ /**
+ * Runs a Bagel program with no [[spark.bagel.Aggregator]], default [[spark.HashPartitioner]],
+ * [[spark.bagel.DefaultCombiner]] and the default storage level
+ */
def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest](
sc: SparkContext,
vertices: RDD[(K, V)],
@@ -87,10 +152,24 @@ object Bagel extends Logging {
numPartitions: Int
)(
compute: (V, Option[Array[M]], Int) => (V, Array[M])
- ): RDD[(K, V)] = {
+ ): RDD[(K, V)] = run(sc, vertices, messages, numPartitions, DEFAULT_STORAGE_LEVEL)(compute)
+
+ /**
+ * Runs a Bagel program with no [[spark.bagel.Aggregator]], the default [[spark.HashPartitioner]]
+ * and [[spark.bagel.DefaultCombiner]]
+ */
+ def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest](
+ sc: SparkContext,
+ vertices: RDD[(K, V)],
+ messages: RDD[(K, M)],
+ numPartitions: Int,
+ storageLevel: StorageLevel
+ )(
+ compute: (V, Option[Array[M]], Int) => (V, Array[M])
+ ): RDD[(K, V)] = {
val part = new HashPartitioner(numPartitions)
run[K, V, M, Array[M], Nothing](
- sc, vertices, messages, new DefaultCombiner(), None, part, numPartitions)(
+ sc, vertices, messages, new DefaultCombiner(), None, part, numPartitions, storageLevel)(
addAggregatorArg[K, V, M, Array[M]](compute))
}
@@ -117,7 +196,8 @@ object Bagel extends Logging {
private def comp[K: Manifest, V <: Vertex, M <: Message[K], C](
sc: SparkContext,
grouped: RDD[(K, (Seq[C], Seq[V]))],
- compute: (V, Option[C]) => (V, Array[M])
+ compute: (V, Option[C]) => (V, Array[M]),
+ storageLevel: StorageLevel
): (RDD[(K, (V, Array[M]))], Int, Int) = {
var numMsgs = sc.accumulator(0)
var numActiveVerts = sc.accumulator(0)
@@ -135,7 +215,7 @@ object Bagel extends Logging {
numActiveVerts += 1
Some((newVert, newMsgs))
- }.cache
+ }.persist(storageLevel)
// Force evaluation of processed RDD for accurate performance measurements
processed.foreach(x => {})
@@ -166,6 +246,7 @@ trait Aggregator[V, A] {
def mergeAggregators(a: A, b: A): A
}
+/** Default combiner that simply appends messages together (i.e. performs no aggregation) */
class DefaultCombiner[M: Manifest] extends Combiner[M, Array[M]] with Serializable {
def createCombiner(msg: M): Array[M] =
Array(msg)
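
Taken together, the new overloads let callers trade memory for disk between supersteps. A minimal usage sketch, not part of this commit, with hypothetical vertex and message types standing in for real ones:

    import spark.SparkContext
    import spark.bagel.{Bagel, Vertex, Message}
    import spark.storage.StorageLevel

    // Hypothetical types for illustration only.
    class PageVertex(val active: Boolean, val rank: Double) extends Vertex with Serializable
    class RankMessage(val targetId: String, val value: Double) extends Message[String] with Serializable

    val sc = new SparkContext("local", "bagel-demo")
    val verts = sc.parallelize(Seq(("a", new PageVertex(true, 1.0))))
    val msgs = sc.parallelize(Seq.empty[(String, RankMessage)])

    // Spill intermediate superstep RDDs to disk instead of the default
    // MEMORY_AND_DISK by passing the new storageLevel argument.
    val result = Bagel.run(sc, verts, msgs, sc.defaultParallelism, StorageLevel.DISK_ONLY) {
      (self: PageVertex, inbox: Option[Array[RankMessage]], superstep: Int) =>
        (new PageVertex(superstep < 10, self.rank), Array.empty[RankMessage])
    }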
19 bagel/src/test/scala/bagel/BagelSuite.scala
@@ -7,6 +7,7 @@ import org.scalatest.time.SpanSugar._
import scala.collection.mutable.ArrayBuffer
import spark._
+import storage.StorageLevel
class TestVertex(val active: Boolean, val age: Int) extends Vertex with Serializable
class TestMessage(val targetId: String) extends Message[String] with Serializable
@@ -22,6 +23,7 @@ class BagelSuite extends FunSuite with Assertions with BeforeAndAfter with Timeo
}
// To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
System.clearProperty("spark.driver.port")
+ System.clearProperty("spark.hostPort")
}
test("halting by voting") {
@@ -79,4 +81,21 @@ class BagelSuite extends FunSuite with Assertions with BeforeAndAfter with Timeo
}
}
}
+
+ test("using non-default persistence level") {
+ failAfter(10 seconds) {
+ sc = new SparkContext("local", "test")
+ val verts = sc.parallelize((1 to 4).map(id => (id.toString, new TestVertex(true, 0))))
+ val msgs = sc.parallelize(Array[(String, TestMessage)]())
+ val numSupersteps = 50
+ val result =
+ Bagel.run(sc, verts, msgs, sc.defaultParallelism, StorageLevel.DISK_ONLY) {
+ (self: TestVertex, msgs: Option[Array[TestMessage]], superstep: Int) =>
+ (new TestVertex(superstep < numSupersteps - 1, self.age + 1), Array[TestMessage]())
+ }
+ for ((id, vert) <- result.collect) {
+ assert(vert.age === numSupersteps)
+ }
+ }
+ }
}
10 bin/spark-daemon.sh
@@ -30,7 +30,7 @@
# SPARK_NICENESS The scheduling priority for daemons. Defaults to 0.
##
-usage="Usage: spark-daemon.sh [--config <conf-dir>] [--hosts hostlistfile] (start|stop) <spark-command> <args...>"
+usage="Usage: spark-daemon.sh [--config <conf-dir>] [--hosts hostlistfile] (start|stop) <spark-command> <spark-instance-number> <args...>"
# if no args specified, show usage
if [ $# -le 1 ]; then
@@ -48,6 +48,8 @@ startStop=$1
shift
command=$1
shift
+instance=$1
+shift
spark_rotate_log ()
{
@@ -92,10 +94,10 @@ if [ "$SPARK_PID_DIR" = "" ]; then
fi
# some variables
-export SPARK_LOGFILE=spark-$SPARK_IDENT_STRING-$command-$HOSTNAME.log
+export SPARK_LOGFILE=spark-$SPARK_IDENT_STRING-$command-$instance-$HOSTNAME.log
export SPARK_ROOT_LOGGER="INFO,DRFA"
-log=$SPARK_LOG_DIR/spark-$SPARK_IDENT_STRING-$command-$HOSTNAME.out
-pid=$SPARK_PID_DIR/spark-$SPARK_IDENT_STRING-$command.pid
+log=$SPARK_LOG_DIR/spark-$SPARK_IDENT_STRING-$command-$instance-$HOSTNAME.out
+pid=$SPARK_PID_DIR/spark-$SPARK_IDENT_STRING-$command-$instance.pid
# Set default scheduling priority
if [ "$SPARK_NICENESS" = "" ]; then
2  bin/spark-daemons.sh
@@ -2,7 +2,7 @@
# Run a Spark command on all slave hosts.
-usage="Usage: spark-daemons.sh [--config confdir] [--hosts hostlistfile] [start|stop] command args..."
+usage="Usage: spark-daemons.sh [--config confdir] [--hosts hostlistfile] [start|stop] command instance-number args..."
# if no args specified, show usage
if [ $# -le 1 ]; then
2  bin/start-master.sh
@@ -32,4 +32,4 @@ if [ "$SPARK_PUBLIC_DNS" = "" ]; then
fi
fi
-"$bin"/spark-daemon.sh start spark.deploy.master.Master --ip $SPARK_MASTER_IP --port $SPARK_MASTER_PORT --webui-port $SPARK_MASTER_WEBUI_PORT
+"$bin"/spark-daemon.sh start spark.deploy.master.Master 1 --ip $SPARK_MASTER_IP --port $SPARK_MASTER_PORT --webui-port $SPARK_MASTER_WEBUI_PORT
5 bin/start-slave.sh
@@ -6,9 +6,10 @@ bin=`cd "$bin"; pwd`
# Set SPARK_PUBLIC_DNS so slaves can be linked in master web UI
if [ "$SPARK_PUBLIC_DNS" = "" ]; then
# If we appear to be running on EC2, use the public address by default:
- if [[ `hostname` == *ec2.internal ]]; then
+ # NOTE: ec2-metadata is installed on Amazon Linux AMI. Check based on that and hostname
+ if command -v ec2-metadata > /dev/null || [[ `hostname` == *ec2.internal ]]; then
export SPARK_PUBLIC_DNS=`wget -q -O - http://instance-data.ec2.internal/latest/meta-data/public-hostname`
fi
fi
-"$bin"/spark-daemon.sh start spark.deploy.worker.Worker $1
+"$bin"/spark-daemon.sh start spark.deploy.worker.Worker "$@"
11 bin/start-slaves.sh
@@ -21,4 +21,13 @@ fi
echo "Master IP: $SPARK_MASTER_IP"
# Launch the slaves
-exec "$bin/slaves.sh" cd "$SPARK_HOME" \; "$bin/start-slave.sh" spark://$SPARK_MASTER_IP:$SPARK_MASTER_PORT
+if [ "$SPARK_WORKER_INSTANCES" = "" ]; then
+ exec "$bin/slaves.sh" cd "$SPARK_HOME" \; "$bin/start-slave.sh" 1 spark://$SPARK_MASTER_IP:$SPARK_MASTER_PORT
+else
+ if [ "$SPARK_WORKER_WEBUI_PORT" = "" ]; then
+ SPARK_WORKER_WEBUI_PORT=8081
+ fi
+ for ((i=0; i<$SPARK_WORKER_INSTANCES; i++)); do
+ "$bin/slaves.sh" cd "$SPARK_HOME" \; "$bin/start-slave.sh" $(( $i + 1 )) spark://$SPARK_MASTER_IP:$SPARK_MASTER_PORT --webui-port $(( $SPARK_WORKER_WEBUI_PORT + $i ))
+ done
+fi
2  bin/stop-master.sh
@@ -7,4 +7,4 @@ bin=`cd "$bin"; pwd`
. "$bin/spark-config.sh"
-"$bin"/spark-daemon.sh stop spark.deploy.master.Master
+"$bin"/spark-daemon.sh stop spark.deploy.master.Master 1
12 bin/stop-slaves.sh
@@ -7,4 +7,14 @@ bin=`cd "$bin"; pwd`
. "$bin/spark-config.sh"
-"$bin"/spark-daemons.sh stop spark.deploy.worker.Worker
+if [ -f "${SPARK_CONF_DIR}/spark-env.sh" ]; then
+ . "${SPARK_CONF_DIR}/spark-env.sh"
+fi
+
+if [ "$SPARK_WORKER_INSTANCES" = "" ]; then
+ "$bin"/spark-daemons.sh stop spark.deploy.worker.Worker 1
+else
+ for ((i=0; i<$SPARK_WORKER_INSTANCES; i++)); do
+ "$bin"/spark-daemons.sh stop spark.deploy.worker.Worker $(( $i + 1 ))
+ done
+fi
1  conf/spark-env.sh.template
@@ -12,6 +12,7 @@
# - SPARK_WORKER_CORES, to set the number of cores to use on this machine
# - SPARK_WORKER_MEMORY, to set how much memory to use (e.g. 1000m, 2g)
# - SPARK_WORKER_PORT / SPARK_WORKER_WEBUI_PORT
+# - SPARK_WORKER_INSTANCES, to set the number of worker instances/processes to be spawned on every slave machine
#
# Finally, Spark also relies on the following variables, but these can be set
# on just the *master* (i.e. in your driver program), and will automatically
92 core/pom.xml
@@ -3,8 +3,8 @@
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.spark-project</groupId>
- <artifactId>parent</artifactId>
- <version>0.7.0-SNAPSHOT</version>
+ <artifactId>spark-parent</artifactId>
+ <version>0.8.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
@@ -32,8 +32,8 @@
<artifactId>compress-lzf</artifactId>
</dependency>
<dependency>
- <groupId>asm</groupId>
- <artifactId>asm-all</artifactId>
+ <groupId>org.ow2.asm</groupId>
+ <artifactId>asm</artifactId>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
@@ -73,7 +73,7 @@
</dependency>
<dependency>
<groupId>cc.spray</groupId>
- <artifactId>spray-json_${scala.version}</artifactId>
+ <artifactId>spray-json_2.9.2</artifactId>
</dependency>
<dependency>
<groupId>org.tomdz.twirl</groupId>
@@ -81,14 +81,27 @@
</dependency>
<dependency>
<groupId>com.github.scala-incubator.io</groupId>
- <artifactId>scala-io-file_${scala.version}</artifactId>
+ <artifactId>scala-io-file_2.9.2</artifactId>
</dependency>
<dependency>
<groupId>org.apache.mesos</groupId>
<artifactId>mesos</artifactId>
</dependency>
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-all</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ </dependency>
<dependency>
+ <groupId>org.apache.derby</groupId>
+ <artifactId>derby</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
<groupId>org.scalatest</groupId>
<artifactId>scalatest_${scala.version}</artifactId>
<scope>test</scope>
@@ -275,5 +288,72 @@
</plugins>
</build>
</profile>
+ <profile>
+ <id>hadoop2-yarn</id>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-client</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-api</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-common</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-client</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ </dependencies>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>build-helper-maven-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>add-source</id>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>add-source</goal>
+ </goals>
+ <configuration>
+ <sources>
+ <source>src/main/scala</source>
+ <source>src/hadoop2-yarn/scala</source>
+ </sources>
+ </configuration>
+ </execution>
+ <execution>
+ <id>add-scala-test-sources</id>
+ <phase>generate-test-sources</phase>
+ <goals>
+ <goal>add-test-source</goal>
+ </goals>
+ <configuration>
+ <sources>
+ <source>src/test/scala</source>
+ </sources>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <configuration>
+ <classifier>hadoop2-yarn</classifier>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
</profiles>
</project>
3  core/src/hadoop1/scala/org/apache/hadoop/mapred/HadoopMapRedUtil.scala
@@ -4,4 +4,7 @@ trait HadoopMapRedUtil {
def newJobContext(conf: JobConf, jobId: JobID): JobContext = new JobContext(conf, jobId)
def newTaskAttemptContext(conf: JobConf, attemptId: TaskAttemptID): TaskAttemptContext = new TaskAttemptContext(conf, attemptId)
+
+ def newTaskAttemptID(jtIdentifier: String, jobId: Int, isMap: Boolean, taskId: Int, attemptId: Int) = new TaskAttemptID(jtIdentifier,
+ jobId, isMap, taskId, attemptId)
}
3  core/src/hadoop1/scala/org/apache/hadoop/mapreduce/HadoopMapReduceUtil.scala
@@ -6,4 +6,7 @@ trait HadoopMapReduceUtil {
def newJobContext(conf: Configuration, jobId: JobID): JobContext = new JobContext(conf, jobId)
def newTaskAttemptContext(conf: Configuration, attemptId: TaskAttemptID): TaskAttemptContext = new TaskAttemptContext(conf, attemptId)
+
+ def newTaskAttemptID(jtIdentifier: String, jobId: Int, isMap: Boolean, taskId: Int, attemptId: Int) = new TaskAttemptID(jtIdentifier,
+ jobId, isMap, taskId, attemptId)
}
23 core/src/hadoop1/scala/spark/deploy/SparkHadoopUtil.scala
@@ -0,0 +1,23 @@
+package spark.deploy
+import org.apache.hadoop.conf.Configuration
+
+
+/**
+ * Contains util methods to interact with Hadoop from spark.
+ */
+object SparkHadoopUtil {
+
+ def getUserNameFromEnvironment(): String = {
+ // defaulting to -D ...
+ System.getProperty("user.name")
+ }
+
+ def runAsUser(func: (Product) => Unit, args: Product) {
+
+ // Add support, if exists - for now, simply run func !
+ func(args)
+ }
+
+  // Return an appropriate (subclass of) Configuration. Creating a config can initialize some Hadoop subsystems.
+ def newConfiguration(): Configuration = new Configuration()
+}
13 core/src/hadoop2-yarn/scala/org/apache/hadoop/mapred/HadoopMapRedUtil.scala
@@ -0,0 +1,13 @@
+
+package org.apache.hadoop.mapred
+
+import org.apache.hadoop.mapreduce.TaskType
+
+trait HadoopMapRedUtil {
+ def newJobContext(conf: JobConf, jobId: JobID): JobContext = new JobContextImpl(conf, jobId)
+
+ def newTaskAttemptContext(conf: JobConf, attemptId: TaskAttemptID): TaskAttemptContext = new TaskAttemptContextImpl(conf, attemptId)
+
+ def newTaskAttemptID(jtIdentifier: String, jobId: Int, isMap: Boolean, taskId: Int, attemptId: Int) =
+ new TaskAttemptID(jtIdentifier, jobId, if (isMap) TaskType.MAP else TaskType.REDUCE, taskId, attemptId)
+}
13 core/src/hadoop2-yarn/scala/org/apache/hadoop/mapreduce/HadoopMapReduceUtil.scala
@@ -0,0 +1,13 @@
+package org.apache.hadoop.mapreduce
+
+import org.apache.hadoop.conf.Configuration
+import task.{TaskAttemptContextImpl, JobContextImpl}
+
+trait HadoopMapReduceUtil {
+ def newJobContext(conf: Configuration, jobId: JobID): JobContext = new JobContextImpl(conf, jobId)
+
+ def newTaskAttemptContext(conf: Configuration, attemptId: TaskAttemptID): TaskAttemptContext = new TaskAttemptContextImpl(conf, attemptId)
+
+ def newTaskAttemptID(jtIdentifier: String, jobId: Int, isMap: Boolean, taskId: Int, attemptId: Int) =
+ new TaskAttemptID(jtIdentifier, jobId, if (isMap) TaskType.MAP else TaskType.REDUCE, taskId, attemptId)
+}
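
These two traits are the version shim: each build profile (hadoop1, hadoop2, hadoop2-yarn) provides its own source tree with identical signatures, so core code compiles against one API regardless of which Hadoop constructors sit underneath. A sketch of a call site, illustrative only, as it would look under the hadoop1 profile:

    import org.apache.hadoop.mapred.{JobConf, TaskAttemptContext, HadoopMapRedUtil}

    // Mix in the profile-selected trait instead of invoking the
    // version-specific TaskAttemptID/TaskAttemptContext constructors.
    class ExampleWriter(conf: JobConf) extends HadoopMapRedUtil {
      val attemptId = newTaskAttemptID("jt_201305010000", 1, true, 0, 0)
      val context: TaskAttemptContext = newTaskAttemptContext(conf, attemptId)
    }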
63 core/src/hadoop2-yarn/scala/spark/deploy/SparkHadoopUtil.scala
@@ -0,0 +1,63 @@
+package spark.deploy
+
+import collection.mutable.HashMap
+import org.apache.hadoop.security.UserGroupInformation
+import org.apache.hadoop.yarn.conf.YarnConfiguration
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
+import java.security.PrivilegedExceptionAction
+
+/**
+ * Contains util methods to interact with Hadoop from spark.
+ */
+object SparkHadoopUtil {
+
+ val yarnConf = newConfiguration()
+
+ def getUserNameFromEnvironment(): String = {
+ // defaulting to env if -D is not present ...
+ val retval = System.getProperty(Environment.USER.name, System.getenv(Environment.USER.name))
+
+ // If nothing found, default to user we are running as
+ if (retval == null) System.getProperty("user.name") else retval
+ }
+
+ def runAsUser(func: (Product) => Unit, args: Product) {
+ runAsUser(func, args, getUserNameFromEnvironment())
+ }
+
+ def runAsUser(func: (Product) => Unit, args: Product, user: String) {
+
+ // println("running as user " + jobUserName)
+
+ UserGroupInformation.setConfiguration(yarnConf)
+ val appMasterUgi: UserGroupInformation = UserGroupInformation.createRemoteUser(user)
+ appMasterUgi.doAs(new PrivilegedExceptionAction[AnyRef] {
+ def run: AnyRef = {
+ func(args)
+ // no return value ...
+ null
+ }
+ })
+ }
+
+ // Note that all params which start with SPARK are propagated all the way through, so if in yarn mode, this MUST be set to true.
+ def isYarnMode(): Boolean = {
+ val yarnMode = System.getProperty("SPARK_YARN_MODE", System.getenv("SPARK_YARN_MODE"))
+ java.lang.Boolean.valueOf(yarnMode)
+ }
+
+ // Set an env variable indicating we are running in YARN mode.
+ // Note that anything with SPARK prefix gets propagated to all (remote) processes
+ def setYarnMode() {
+ System.setProperty("SPARK_YARN_MODE", "true")
+ }
+
+ def setYarnMode(env: HashMap[String, String]) {
+ env("SPARK_YARN_MODE") = "true"
+ }
+
+  // Return an appropriate (subclass of) Configuration. Creating a config can initialize some Hadoop subsystems.
+  // Always create a new config; don't reuse yarnConf.
+ def newConfiguration(): Configuration = new YarnConfiguration(new Configuration())
+}
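
Note the calling convention: runAsUser threads its arguments through as a single Product so they survive the UserGroupInformation.doAs boundary. A hedged sketch of a call site (names below are illustrative, not from this commit):

    import spark.deploy.SparkHadoopUtil

    // Illustrative only: the same call compiles against the hadoop1 variant,
    // which simply invokes the function without switching users.
    def boot(args: Product) {
      val (jar, mainClass) = (args.productElement(0), args.productElement(1))
      println("booting " + mainClass + " from " + jar)
    }

    SparkHadoopUtil.runAsUser(boot, ("app.jar", "example.Main"))

    if (SparkHadoopUtil.isYarnMode()) {
      // Fresh YarnConfiguration per call, per the warning above not to reuse yarnConf.
      val conf = SparkHadoopUtil.newConfiguration()
    }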
329 core/src/hadoop2-yarn/scala/spark/deploy/yarn/ApplicationMaster.scala
@@ -0,0 +1,329 @@
+package spark.deploy.yarn
+
+import java.net.Socket
+import java.util.concurrent.CopyOnWriteArrayList
+import java.util.concurrent.atomic.{AtomicInteger, AtomicReference}
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.net.NetUtils
+import org.apache.hadoop.yarn.api._
+import org.apache.hadoop.yarn.api.records._
+import org.apache.hadoop.yarn.api.protocolrecords._
+import org.apache.hadoop.yarn.conf.YarnConfiguration
+import org.apache.hadoop.yarn.ipc.YarnRPC
+import org.apache.hadoop.yarn.util.{ConverterUtils, Records}
+import scala.collection.JavaConversions._
+import spark.{SparkContext, Logging, Utils}
+import org.apache.hadoop.security.UserGroupInformation
+import java.security.PrivilegedExceptionAction
+
+class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) extends Logging {
+
+ def this(args: ApplicationMasterArguments) = this(args, new Configuration())
+
+ private var rpc: YarnRPC = YarnRPC.create(conf)
+ private var resourceManager: AMRMProtocol = null
+ private var appAttemptId: ApplicationAttemptId = null
+ private var userThread: Thread = null
+ private val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
+
+ private var yarnAllocator: YarnAllocationHandler = null
+
+ def run() {
+
+ // Initialization
+ val jobUserName = Utils.getUserNameFromEnvironment()
+ logInfo("running as user " + jobUserName)
+
+ // run as user ...
+ UserGroupInformation.setConfiguration(yarnConf)
+ val appMasterUgi: UserGroupInformation = UserGroupInformation.createRemoteUser(jobUserName)
+ appMasterUgi.doAs(new PrivilegedExceptionAction[AnyRef] {
+ def run: AnyRef = {
+ runImpl()
+ return null
+ }
+ })
+ }
+
+ private def runImpl() {
+
+ appAttemptId = getApplicationAttemptId()
+ resourceManager = registerWithResourceManager()
+ val appMasterResponse: RegisterApplicationMasterResponse = registerApplicationMaster()
+
+ // Compute number of threads for akka
+ val minimumMemory = appMasterResponse.getMinimumResourceCapability().getMemory()
+
+ if (minimumMemory > 0) {
+ val mem = args.workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD
+ val numCore = (mem / minimumMemory) + (if (0 != (mem % minimumMemory)) 1 else 0)
+
+ if (numCore > 0) {
+ // do not override - hits https://issues.apache.org/jira/browse/HADOOP-8406
+ // TODO: Uncomment when hadoop is on a version which has this fixed.
+ // args.workerCores = numCore
+ }
+ }
+
+ // Workaround until hadoop moves to something which has
+ // https://issues.apache.org/jira/browse/HADOOP-8406
+ // ignore result
+ // This does not, unfortunately, always work reliably ... but alleviates the bug a lot of times
+ // Hence args.workerCores = numCore disabled above. Any better option ?
+ // org.apache.hadoop.io.compress.CompressionCodecFactory.getCodecClasses(conf)
+
+ ApplicationMaster.register(this)
+ // Start the user's JAR
+ userThread = startUserClass()
+
+ // This a bit hacky, but we need to wait until the spark.driver.port property has
+ // been set by the Thread executing the user class.
+ waitForSparkMaster()
+
+ // Allocate all containers
+ allocateWorkers()
+
+ // Wait for the user class to Finish
+ userThread.join()
+
+ // Finish the ApplicationMaster
+ finishApplicationMaster()
+ // TODO: Exit based on success/failure
+ System.exit(0)
+ }
+
+ private def getApplicationAttemptId(): ApplicationAttemptId = {
+ val envs = System.getenv()
+ val containerIdString = envs.get(ApplicationConstants.AM_CONTAINER_ID_ENV)
+ val containerId = ConverterUtils.toContainerId(containerIdString)
+ val appAttemptId = containerId.getApplicationAttemptId()
+ logInfo("ApplicationAttemptId: " + appAttemptId)
+ return appAttemptId
+ }
+
+ private def registerWithResourceManager(): AMRMProtocol = {
+ val rmAddress = NetUtils.createSocketAddr(yarnConf.get(
+ YarnConfiguration.RM_SCHEDULER_ADDRESS,
+ YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS))
+ logInfo("Connecting to ResourceManager at " + rmAddress)
+ return rpc.getProxy(classOf[AMRMProtocol], rmAddress, conf).asInstanceOf[AMRMProtocol]
+ }
+
+ private def registerApplicationMaster(): RegisterApplicationMasterResponse = {
+ logInfo("Registering the ApplicationMaster")
+ val appMasterRequest = Records.newRecord(classOf[RegisterApplicationMasterRequest])
+ .asInstanceOf[RegisterApplicationMasterRequest]
+ appMasterRequest.setApplicationAttemptId(appAttemptId)
+ // Setting this to master host,port - so that the ApplicationReport at client has some sensible info.
+ // Users can then monitor stderr/stdout on that node if required.
+ appMasterRequest.setHost(Utils.localHostName())
+ appMasterRequest.setRpcPort(0)
+ // What do we provide here ? Might make sense to expose something sensible later ?
+ appMasterRequest.setTrackingUrl("")
+ return resourceManager.registerApplicationMaster(appMasterRequest)
+ }
+
+ private def waitForSparkMaster() {
+ logInfo("Waiting for spark driver to be reachable.")
+ var driverUp = false
+ while(!driverUp) {
+ val driverHost = System.getProperty("spark.driver.host")
+ val driverPort = System.getProperty("spark.driver.port")
+ try {
+ val socket = new Socket(driverHost, driverPort.toInt)
+ socket.close()
+ logInfo("Master now available: " + driverHost + ":" + driverPort)
+ driverUp = true
+ } catch {
+ case e: Exception =>
+ logError("Failed to connect to driver at " + driverHost + ":" + driverPort)
+ Thread.sleep(100)
+ }
+ }
+ }
+
+ private def startUserClass(): Thread = {
+ logInfo("Starting the user JAR in a separate Thread")
+ val mainMethod = Class.forName(args.userClass, false, Thread.currentThread.getContextClassLoader)
+ .getMethod("main", classOf[Array[String]])
+ val t = new Thread {
+ override def run() {
+ // Copy
+ var mainArgs: Array[String] = new Array[String](args.userArgs.size())
+ args.userArgs.copyToArray(mainArgs, 0, args.userArgs.size())
+ mainMethod.invoke(null, mainArgs)
+ }
+ }
+ t.start()
+ return t
+ }
+
+ private def allocateWorkers() {
+ logInfo("Waiting for spark context initialization")
+
+ try {
+ var sparkContext: SparkContext = null
+ ApplicationMaster.sparkContextRef.synchronized {
+ var count = 0
+ while (ApplicationMaster.sparkContextRef.get() == null) {
+ logInfo("Waiting for spark context initialization ... " + count)
+ count = count + 1
+ ApplicationMaster.sparkContextRef.wait(10000L)
+ }
+ sparkContext = ApplicationMaster.sparkContextRef.get()
+ assert(sparkContext != null)
+ this.yarnAllocator = YarnAllocationHandler.newAllocator(yarnConf, resourceManager, appAttemptId, args, sparkContext.preferredNodeLocationData)
+ }
+
+
+ logInfo("Allocating " + args.numWorkers + " workers.")
+ // Wait until all containers have finished
+ // TODO: This is a bit ugly. Can we make it nicer?
+ // TODO: Handle container failure
+ while(yarnAllocator.getNumWorkersRunning < args.numWorkers &&
+ // If user thread exists, then quit !
+ userThread.isAlive) {
+
+ this.yarnAllocator.allocateContainers(math.max(args.numWorkers - yarnAllocator.getNumWorkersRunning, 0))
+ ApplicationMaster.incrementAllocatorLoop(1)
+ Thread.sleep(100)
+ }
+ } finally {
+      // in case of exceptions, etc. - ensure that count is at least ALLOCATOR_LOOP_WAIT_COUNT,
+ // so that the loop (in ApplicationMaster.sparkContextInitialized) breaks
+ ApplicationMaster.incrementAllocatorLoop(ApplicationMaster.ALLOCATOR_LOOP_WAIT_COUNT)
+ }
+ logInfo("All workers have launched.")
+
+ // Launch a progress reporter thread, else app will get killed after expiration (def: 10mins) timeout
+ if (userThread.isAlive){
+ // ensure that progress is sent before YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS elapse.
+
+ val timeoutInterval = yarnConf.getInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 120000)
+      // must be <= timeoutInterval / 2.
+      // On the other hand, also ensure that we are reasonably responsive without causing too many requests to RM,
+      // so at least 1 minute or timeoutInterval / 10 - whichever is higher.
+ val interval = math.min(timeoutInterval / 2, math.max(timeoutInterval/ 10, 60000L))
+ launchReporterThread(interval)
+ }
+ }
+
+ // TODO: We might want to extend this to allocate more containers in case they die !
+ private def launchReporterThread(_sleepTime: Long): Thread = {
+ val sleepTime = if (_sleepTime <= 0 ) 0 else _sleepTime
+
+ val t = new Thread {
+ override def run() {
+ while (userThread.isAlive){
+ val missingWorkerCount = args.numWorkers - yarnAllocator.getNumWorkersRunning
+ if (missingWorkerCount > 0) {
+ logInfo("Allocating " + missingWorkerCount + " containers to make up for (potentially ?) lost containers")
+ yarnAllocator.allocateContainers(missingWorkerCount)
+ }
+ else sendProgress()
+ Thread.sleep(sleepTime)
+ }
+ }
+ }
+ // setting to daemon status, though this is usually not a good idea.
+ t.setDaemon(true)
+ t.start()
+ logInfo("Started progress reporter thread - sleep time : " + sleepTime)
+ return t
+ }
+
+ private def sendProgress() {
+ logDebug("Sending progress")
+ // simulated with an allocate request with no nodes requested ...
+ yarnAllocator.allocateContainers(0)
+ }
+
+ /*
+ def printContainers(containers: List[Container]) = {
+ for (container <- containers) {
+ logInfo("Launching shell command on a new container."
+ + ", containerId=" + container.getId()
+ + ", containerNode=" + container.getNodeId().getHost()
+ + ":" + container.getNodeId().getPort()
+ + ", containerNodeURI=" + container.getNodeHttpAddress()
+ + ", containerState" + container.getState()
+ + ", containerResourceMemory"
+ + container.getResource().getMemory())
+ }
+ }
+ */
+
+ def finishApplicationMaster() {
+ val finishReq = Records.newRecord(classOf[FinishApplicationMasterRequest])
+ .asInstanceOf[FinishApplicationMasterRequest]
+ finishReq.setAppAttemptId(appAttemptId)
+ // TODO: Check if the application has failed or succeeded
+ finishReq.setFinishApplicationStatus(FinalApplicationStatus.SUCCEEDED)
+ resourceManager.finishApplicationMaster(finishReq)
+ }
+
+}
+
+object ApplicationMaster {
+ // number of times to wait for the allocator loop to complete.
+ // each loop iteration waits for 100ms, so maximum of 3 seconds.
+ // This is to ensure that we have reasonable number of containers before we start
+ // TODO: Currently, task to container is computed once (TaskSetManager) - which need not be optimal as more
+ // containers are available. Might need to handle this better.
+ private val ALLOCATOR_LOOP_WAIT_COUNT = 30
+ def incrementAllocatorLoop(by: Int) {
+ val count = yarnAllocatorLoop.getAndAdd(by)
+ if (count >= ALLOCATOR_LOOP_WAIT_COUNT){
+ yarnAllocatorLoop.synchronized {
+ // to wake threads off wait ...
+ yarnAllocatorLoop.notifyAll()
+ }
+ }
+ }
+
+ private val applicationMasters = new CopyOnWriteArrayList[ApplicationMaster]()
+
+ def register(master: ApplicationMaster) {
+ applicationMasters.add(master)
+ }
+
+ val sparkContextRef: AtomicReference[SparkContext] = new AtomicReference[SparkContext](null)
+ val yarnAllocatorLoop: AtomicInteger = new AtomicInteger(0)
+
+ def sparkContextInitialized(sc: SparkContext): Boolean = {
+ var modified = false
+ sparkContextRef.synchronized {
+ modified = sparkContextRef.compareAndSet(null, sc)
+ sparkContextRef.notifyAll()
+ }
+
+ // Add a shutdown hook - as a best case effort in case users do not call sc.stop or do System.exit
+ // Should not really have to do this, but it helps yarn to evict resources earlier.
+    // not to mention, prevent the Client from declaring failure even though we exited properly.
+ if (modified) {
+ Runtime.getRuntime().addShutdownHook(new Thread with Logging {
+ // This is not just to log, but also to ensure that log system is initialized for this instance when we actually are 'run'
+ logInfo("Adding shutdown hook for context " + sc)
+ override def run() {
+ logInfo("Invoking sc stop from shutdown hook")
+ sc.stop()
+ // best case ...
+ for (master <- applicationMasters) master.finishApplicationMaster
+ }
+ } )
+ }
+
+    // Wait for initialization to complete and at least 'some' nodes to get allocated
+ yarnAllocatorLoop.synchronized {
+ while (yarnAllocatorLoop.get() <= ALLOCATOR_LOOP_WAIT_COUNT){
+ yarnAllocatorLoop.wait(1000L)
+ }
+ }
+ modified
+ }
+
+ def main(argStrings: Array[String]) {
+ val args = new ApplicationMasterArguments(argStrings)
+ new ApplicationMaster(args).run()
+ }
+}
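
One subtlety in the object above: in YARN mode the user's SparkContext is expected to call ApplicationMaster.sparkContextInitialized(sc), which parks until the allocator loop has ticked ALLOCATOR_LOOP_WAIT_COUNT times (30 rounds at 100 ms, roughly 3 seconds). A condensed, standalone restatement of that wait/notify latch, as a sketch rather than the actual implementation:

    import java.util.concurrent.atomic.AtomicInteger

    val loopCount = new AtomicInteger(0)
    val WAIT_COUNT = 30 // mirrors ALLOCATOR_LOOP_WAIT_COUNT

    // Allocator side: tick once per allocation round, waking waiters at the threshold.
    def tick(by: Int) {
      if (loopCount.getAndAdd(by) >= WAIT_COUNT) {
        loopCount.synchronized { loopCount.notifyAll() }
      }
    }

    // User-thread side: block until enough rounds have run (the finally block
    // above force-counts on error, so this cannot hang forever).
    def awaitRounds() {
      loopCount.synchronized {
        while (loopCount.get() <= WAIT_COUNT) { loopCount.wait(1000L) }
      }
    }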
77 core/src/hadoop2-yarn/scala/spark/deploy/yarn/ApplicationMasterArguments.scala
@@ -0,0 +1,77 @@
+package spark.deploy.yarn
+
+import spark.util.IntParam
+import collection.mutable.ArrayBuffer
+
+class ApplicationMasterArguments(val args: Array[String]) {
+ var userJar: String = null
+ var userClass: String = null
+ var userArgs: Seq[String] = Seq[String]()
+ var workerMemory = 1024
+ var workerCores = 1
+ var numWorkers = 2
+
+ parseArgs(args.toList)
+
+ private def parseArgs(inputArgs: List[String]): Unit = {
+ val userArgsBuffer = new ArrayBuffer[String]()
+
+ var args = inputArgs
+
+ while (! args.isEmpty) {
+
+ args match {
+ case ("--jar") :: value :: tail =>
+ userJar = value
+ args = tail
+
+ case ("--class") :: value :: tail =>
+ userClass = value
+ args = tail
+
+ case ("--args") :: value :: tail =>
+ userArgsBuffer += value
+ args = tail
+
+ case ("--num-workers") :: IntParam(value) :: tail =>
+ numWorkers = value
+ args = tail
+
+ case ("--worker-memory") :: IntParam(value) :: tail =>
+ workerMemory = value
+ args = tail
+
+ case ("--worker-cores") :: IntParam(value) :: tail =>
+ workerCores = value
+ args = tail
+
+ case Nil =>
+ if (userJar == null || userClass == null) {
+ printUsageAndExit(1)
+ }
+
+ case _ =>
+ printUsageAndExit(1, args)
+ }
+ }
+
+ userArgs = userArgsBuffer.readOnly
+ }
+
+ def printUsageAndExit(exitCode: Int, unknownParam: Any = null) {
+ if (unknownParam != null) {
+ System.err.println("Unknown/unsupported param " + unknownParam)
+ }
+ System.err.println(
+ "Usage: spark.deploy.yarn.ApplicationMaster [options] \n" +
+ "Options:\n" +
+ " --jar JAR_PATH Path to your application's JAR file (required)\n" +
+ " --class CLASS_NAME Name of your application's main class (required)\n" +
+ " --args ARGS Arguments to be passed to your application's main class.\n" +
+ " Mutliple invocations are possible, each will be passed in order.\n" +
+ " --num-workers NUM Number of workers to start (Default: 2)\n" +
+ " --worker-cores NUM Number of cores for the workers (Default: 1)\n" +
+ " --worker-memory MEM Memory per Worker (e.g. 1000M, 2G) (Default: 1G)\n")
+ System.exit(exitCode)
+ }
+}
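Both argument parsers in this patch use the same idiom: treat the argument array as a List and pattern-match a flag together with its value, rebinding the variable to the tail. A stripped-down sketch of the idiom follows; the flag set, object name, and recursive structure here are illustrative rather than taken from the patch.

    // Stripped-down sketch of the List-pattern-match parsing idiom used above.
    object MiniArgParser {
      @annotation.tailrec
      def parse(args: List[String], acc: Map[String, String] = Map.empty): Map[String, String] =
        args match {
          case "--jar" :: value :: tail   => parse(tail, acc + ("jar" -> value))   // flag + its value
          case "--class" :: value :: tail => parse(tail, acc + ("class" -> value))
          case Nil                        => acc                                   // all consumed
          case unknown :: _               => sys.error("Unknown/unsupported param " + unknown)
        }
    }

    // e.g. MiniArgParser.parse(List("--jar", "app.jar", "--class", "Main"))
    //      ==> Map("jar" -> "app.jar", "class" -> "Main")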
272 core/src/hadoop2-yarn/scala/spark/deploy/yarn/Client.scala
@@ -0,0 +1,272 @@
+package spark.deploy.yarn
+
+import java.net.{InetSocketAddress, URI}
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
+import org.apache.hadoop.net.NetUtils
+import org.apache.hadoop.yarn.api._
+import org.apache.hadoop.yarn.api.records._
+import org.apache.hadoop.yarn.api.protocolrecords._
+import org.apache.hadoop.yarn.client.YarnClientImpl
+import org.apache.hadoop.yarn.conf.YarnConfiguration
+import org.apache.hadoop.yarn.ipc.YarnRPC
+import scala.collection.mutable.HashMap
+import scala.collection.JavaConversions._
+import spark.{Logging, Utils}
+import org.apache.hadoop.yarn.util.{Apps, Records, ConverterUtils}
+import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
+import spark.deploy.SparkHadoopUtil
+
+class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl with Logging {
+
+ def this(args: ClientArguments) = this(new Configuration(), args)
+
+ var rpc: YarnRPC = YarnRPC.create(conf)
+ val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
+
+ def run() {
+ init(yarnConf)
+ start()
+ logClusterResourceDetails()
+
+ val newApp = super.getNewApplication()
+ val appId = newApp.getApplicationId()
+
+ verifyClusterResources(newApp)
+ val appContext = createApplicationSubmissionContext(appId)
+ val localResources = prepareLocalResources(appId, "spark")
+ val env = setupLaunchEnv(localResources)
+ val amContainer = createContainerLaunchContext(newApp, localResources, env)
+
+ appContext.setQueue(args.amQueue)
+ appContext.setAMContainerSpec(amContainer)
+ appContext.setUser(args.amUser)
+
+ submitApp(appContext)
+
+ monitorApplication(appId)
+ System.exit(0)
+ }
+
+
+ def logClusterResourceDetails() {
+ val clusterMetrics: YarnClusterMetrics = super.getYarnClusterMetrics
+ logInfo("Got Cluster metric info from ASM, numNodeManagers=" + clusterMetrics.getNumNodeManagers)
+
+ val queueInfo: QueueInfo = super.getQueueInfo(args.amQueue)
+ logInfo("Queue info .. queueName=" + queueInfo.getQueueName + ", queueCurrentCapacity=" + queueInfo.getCurrentCapacity +
+ ", queueMaxCapacity=" + queueInfo.getMaximumCapacity + ", queueApplicationCount=" + queueInfo.getApplications.size +
+ ", queueChildQueueCount=" + queueInfo.getChildQueues.size)
+ }
+
+
+ def verifyClusterResources(app: GetNewApplicationResponse) = {
+ val maxMem = app.getMaximumResourceCapability().getMemory()
+ logInfo("Max mem capabililty of resources in this cluster " + maxMem)
+
+ // If the cluster does not have enough memory resources, exit.
+ val requestedMem = (args.amMemory + YarnAllocationHandler.MEMORY_OVERHEAD) + args.numWorkers * args.workerMemory
+ if (requestedMem > maxMem) {
+ logError("Cluster cannot satisfy memory resource request of " + requestedMem)
+ System.exit(1)
+ }
+ }
+
+ def createApplicationSubmissionContext(appId: ApplicationId): ApplicationSubmissionContext = {
+ logInfo("Setting up application submission context for ASM")
+ val appContext = Records.newRecord(classOf[ApplicationSubmissionContext])
+ appContext.setApplicationId(appId)
+ appContext.setApplicationName("Spark")
+ return appContext
+ }
+
+ def prepareLocalResources(appId: ApplicationId, appName: String): HashMap[String, LocalResource] = {
+ logInfo("Preparing Local resources")
+    val localResources = HashMap[String, LocalResource]()
+ // Upload Spark and the application JAR to the remote file system
+ // Add them as local resources to the AM
+ val fs = FileSystem.get(conf)
+ Map("spark.jar" -> System.getenv("SPARK_JAR"), "app.jar" -> args.userJar, "log4j.properties" -> System.getenv("SPARK_LOG4J_CONF"))
+ .foreach { case(destName, _localPath) =>
+ val localPath: String = if (_localPath != null) _localPath.trim() else ""
+ if (! localPath.isEmpty()) {
+ val src = new Path(localPath)
+        val pathSuffix = appName + "/" + appId.getId() + "/" + destName
+ val dst = new Path(fs.getHomeDirectory(), pathSuffix)
+ logInfo("Uploading " + src + " to " + dst)
+ fs.copyFromLocalFile(false, true, src, dst)
+ val destStatus = fs.getFileStatus(dst)
+
+ val amJarRsrc = Records.newRecord(classOf[LocalResource]).asInstanceOf[LocalResource]
+ amJarRsrc.setType(LocalResourceType.FILE)
+ amJarRsrc.setVisibility(LocalResourceVisibility.APPLICATION)
+ amJarRsrc.setResource(ConverterUtils.getYarnUrlFromPath(dst))
+ amJarRsrc.setTimestamp(destStatus.getModificationTime())
+ amJarRsrc.setSize(destStatus.getLen())
+        localResources(destName) = amJarRsrc
+ }
+ }
+    return localResources
+ }
+
+ def setupLaunchEnv(localResources: HashMap[String, LocalResource]): HashMap[String, String] = {
+ logInfo("Setting up the launch environment")
+ val log4jConfLocalRes = localResources.getOrElse("log4j.properties", null)
+
+ val env = new HashMap[String, String]()
+ Apps.addToEnvironment(env, Environment.USER.name, args.amUser)
+
+ // If log4j present, ensure ours overrides all others
+ if (log4jConfLocalRes != null) Apps.addToEnvironment(env, Environment.CLASSPATH.name, "./")
+
+ Apps.addToEnvironment(env, Environment.CLASSPATH.name, "./*")
+ Apps.addToEnvironment(env, Environment.CLASSPATH.name, "$CLASSPATH")
+ Client.populateHadoopClasspath(yarnConf, env)
+ SparkHadoopUtil.setYarnMode(env)
+ env("SPARK_YARN_JAR_PATH") =
+ localResources("spark.jar").getResource().getScheme.toString() + "://" +
+ localResources("spark.jar").getResource().getFile().toString()
+ env("SPARK_YARN_JAR_TIMESTAMP") = localResources("spark.jar").getTimestamp().toString()
+ env("SPARK_YARN_JAR_SIZE") = localResources("spark.jar").getSize().toString()
+
+ env("SPARK_YARN_USERJAR_PATH") =
+ localResources("app.jar").getResource().getScheme.toString() + "://" +
+ localResources("app.jar").getResource().getFile().toString()
+ env("SPARK_YARN_USERJAR_TIMESTAMP") = localResources("app.jar").getTimestamp().toString()
+ env("SPARK_YARN_USERJAR_SIZE") = localResources("app.jar").getSize().toString()
+
+ if (log4jConfLocalRes != null) {
+ env("SPARK_YARN_LOG4J_PATH") =
+ log4jConfLocalRes.getResource().getScheme.toString() + "://" + log4jConfLocalRes.getResource().getFile().toString()
+ env("SPARK_YARN_LOG4J_TIMESTAMP") = log4jConfLocalRes.getTimestamp().toString()
+ env("SPARK_YARN_LOG4J_SIZE") = log4jConfLocalRes.getSize().toString()
+ }
+
+ // Add each SPARK-* key to the environment
+ System.getenv().filterKeys(_.startsWith("SPARK")).foreach { case (k,v) => env(k) = v }
+ return env
+ }
+
+ def userArgsToString(clientArgs: ClientArguments): String = {
+ val prefix = " --args "
+ val args = clientArgs.userArgs
+ val retval = new StringBuilder()
+ for (arg <- args){
+ retval.append(prefix).append(" '").append(arg).append("' ")
+ }
+
+ retval.toString
+ }
+
+ def createContainerLaunchContext(newApp: GetNewApplicationResponse,
+ localResources: HashMap[String, LocalResource],
+ env: HashMap[String, String]): ContainerLaunchContext = {
+ logInfo("Setting up container launch context")
+ val amContainer = Records.newRecord(classOf[ContainerLaunchContext])
+ amContainer.setLocalResources(localResources)
+ amContainer.setEnvironment(env)
+
+ val minResMemory: Int = newApp.getMinimumResourceCapability().getMemory()
+
+ var amMemory = ((args.amMemory / minResMemory) * minResMemory) +
+ (if (0 != (args.amMemory % minResMemory)) minResMemory else 0) - YarnAllocationHandler.MEMORY_OVERHEAD
+
+ // Extra options for the JVM
+ var JAVA_OPTS = ""
+
+ // Add Xmx for am memory
+ JAVA_OPTS += "-Xmx" + amMemory + "m "
+
+    // These CMS options are gated behind SPARK_USE_CONC_INCR_GC so that people can opt in if required. Remove this once the cpuset version is pushed out.
+    // Context: the default GC for server-class machines ends up using all cores for GC, so if there are multiple containers on the same
+    // node, Spark's GC affects the performance of all other containers (which can themselves be other Spark containers).
+    // Instead of forcing this on, rely on YARN cpusets to ensure Spark behaves 'properly' in multi-tenant environments. It is unclear how
+    // the default Java GC behaves when limited to a subset of cores on a node.
+ if (env.isDefinedAt("SPARK_USE_CONC_INCR_GC") && java.lang.Boolean.parseBoolean(env("SPARK_USE_CONC_INCR_GC"))) {
+      // In our experiments, the (default) throughput collector had severe performance ramifications on multi-tenant machines
+ JAVA_OPTS += " -XX:+UseConcMarkSweepGC "
+ JAVA_OPTS += " -XX:+CMSIncrementalMode "
+ JAVA_OPTS += " -XX:+CMSIncrementalPacing "
+ JAVA_OPTS += " -XX:CMSIncrementalDutyCycleMin=0 "
+ JAVA_OPTS += " -XX:CMSIncrementalDutyCycle=10 "
+ }
+ if (env.isDefinedAt("SPARK_JAVA_OPTS")) {
+ JAVA_OPTS += env("SPARK_JAVA_OPTS") + " "
+ }
+
+ // Command for the ApplicationMaster
+ val commands = List[String]("java " +
+ " -server " +
+ JAVA_OPTS +
+ " spark.deploy.yarn.ApplicationMaster" +
+ " --class " + args.userClass +
+ " --jar " + args.userJar +
+ userArgsToString(args) +
+ " --worker-memory " + args.workerMemory +
+ " --worker-cores " + args.workerCores +
+ " --num-workers " + args.numWorkers +
+ " 1> " + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout" +
+ " 2> " + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr")
+ logInfo("Command for the ApplicationMaster: " + commands(0))
+ amContainer.setCommands(commands)
+
+ val capability = Records.newRecord(classOf[Resource]).asInstanceOf[Resource]
+ // Memory for the ApplicationMaster
+ capability.setMemory(args.amMemory + YarnAllocationHandler.MEMORY_OVERHEAD)
+ amContainer.setResource(capability)
+
+ return amContainer
+ }
+
+ def submitApp(appContext: ApplicationSubmissionContext) = {
+ // Submit the application to the applications manager
+ logInfo("Submitting application to ASM")
+ super.submitApplication(appContext)
+ }
+
+ def monitorApplication(appId: ApplicationId): Boolean = {
+ while(true) {
+ Thread.sleep(1000)
+ val report = super.getApplicationReport(appId)
+
+ logInfo("Application report from ASM: \n" +
+ "\t application identifier: " + appId.toString() + "\n" +
+ "\t appId: " + appId.getId() + "\n" +
+ "\t clientToken: " + report.getClientToken() + "\n" +
+ "\t appDiagnostics: " + report.getDiagnostics() + "\n" +
+ "\t appMasterHost: " + report.getHost() + "\n" +
+ "\t appQueue: " + report.getQueue() + "\n" +
+ "\t appMasterRpcPort: " + report.getRpcPort() + "\n" +
+ "\t appStartTime: " + report.getStartTime() + "\n" +
+ "\t yarnAppState: " + report.getYarnApplicationState() + "\n" +
+ "\t distributedFinalState: " + report.getFinalApplicationStatus() + "\n" +
+ "\t appTrackingUrl: " + report.getTrackingUrl() + "\n" +
+ "\t appUser: " + report.getUser()
+ )
+
+ val state = report.getYarnApplicationState()
+ val dsStatus = report.getFinalApplicationStatus()
+ if (state == YarnApplicationState.FINISHED ||
+ state == YarnApplicationState.FAILED ||
+ state == YarnApplicationState.KILLED) {
+ return true
+ }
+ }
+ return true
+ }
+}
+
+object Client {
+ def main(argStrings: Array[String]) {
+ val args = new ClientArguments(argStrings)
+ SparkHadoopUtil.setYarnMode()
+ new Client(args).run
+ }
+
+ // Based on code from org.apache.hadoop.mapreduce.v2.util.MRApps
+ def populateHadoopClasspath(conf: Configuration, env: HashMap[String, String]) {
+ for (c <- conf.getStrings(YarnConfiguration.YARN_APPLICATION_CLASSPATH)) {
+ Apps.addToEnvironment(env, Environment.CLASSPATH.name, c.trim)
+ }
+ }
+}
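One step in createContainerLaunchContext above deserves a worked example: YARN hands out memory in multiples of the cluster's minimum resource capability, so the requested AM memory is rounded up to the next multiple before the overhead is carved out of the heap. The sketch below reproduces that arithmetic with illustrative numbers; the 384 MB overhead is a hypothetical stand-in for YarnAllocationHandler.MEMORY_OVERHEAD, whose actual value is defined in that file.

    // Worked sketch of the AM heap computation in createContainerLaunchContext.
    object AmMemoryExample extends App {
      val minResMemory = 1024     // cluster's minimum allocation granularity, in MB
      val amMemoryArg  = 1536     // what --master-memory asked for, in MB
      val overhead     = 384      // stand-in for YarnAllocationHandler.MEMORY_OVERHEAD

      // Round up to the next multiple of the minimum allocation ...
      val rounded = ((amMemoryArg / minResMemory) * minResMemory) +
        (if (amMemoryArg % minResMemory != 0) minResMemory else 0)    // 2048

      // ... then leave headroom for non-heap memory inside the container.
      val amHeapMb = rounded - overhead                               // 1664
      println("-Xmx" + amHeapMb + "m")                                // prints -Xmx1664m
    }

Note the asymmetry in the patch: the container ask itself (capability.setMemory) is args.amMemory plus the overhead, while the -Xmx value is the rounded figure minus the overhead, so the JVM heap always leaves room for non-heap usage inside the granted container.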
105 core/src/hadoop2-yarn/scala/spark/deploy/yarn/ClientArguments.scala
@@ -0,0 +1,105 @@
+package spark.deploy.yarn
+
+import spark.util.MemoryParam
+import spark.util.IntParam
+import collection.mutable.{ArrayBuffer, HashMap}
+import spark.scheduler.{InputFormatInfo, SplitInfo}
+
+// TODO: Add code and support for ensuring that YARN resource 'asks' are location-aware!
+class ClientArguments(val args: Array[String]) {
+ var userJar: String = null
+ var userClass: String = null
+ var userArgs: Seq[String] = Seq[String]()
+ var workerMemory = 1024
+ var workerCores = 1
+ var numWorkers = 2
+ var amUser = System.getProperty("user.name")
+ var amQueue = System.getProperty("QUEUE", "default")
+ var amMemory: Int = 512
+ // TODO
+ var inputFormatInfo: List[InputFormatInfo] = null
+
+ parseArgs(args.toList)
+
+ private def parseArgs(inputArgs: List[String]): Unit = {
+ val userArgsBuffer: ArrayBuffer[String] = new ArrayBuffer[String]()
+ val inputFormatMap: HashMap[String, InputFormatInfo] = new HashMap[String, InputFormatInfo]()
+
+ var args = inputArgs
+
+ while (! args.isEmpty) {
+
+ args match {
+ case ("--jar") :: value :: tail =>
+ userJar = value
+ args = tail
+
+ case ("--class") :: value :: tail =>
+ userClass = value
+ args = tail
+
+ case ("--args") :: value :: tail =>
+ userArgsBuffer += value
+ args = tail
+
+ case ("--master-memory") :: MemoryParam(value) :: tail =>
+ amMemory = value
+ args = tail
+
+ case ("--num-workers") :: IntParam(value) :: tail =>
+ numWorkers = value
+ args = tail
+
+ case ("--worker-memory") :: MemoryParam(value) :: tail =>
+ workerMemory = value
+ args = tail
+
+ case ("--worker-cores") :: IntParam(value) :: tail =>
+ workerCores = value
+ args = tail
+
+ case ("--user") :: value :: tail =>
+ amUser = value
+ args = tail
+
+ case ("--queue") :: value :: tail =>
+ amQueue = value
+ args = tail
+
+ case Nil =>
+ if (userJar == null || userClass == null) {
+ printUsageAndExit(1)
+ }
+
+ case _ =>
+ printUsageAndExit(1, args)
+ }
+ }
+
+ userArgs = userArgsBuffer.readOnly
+ inputFormatInfo = inputFormatMap.values.toList
+ }
+
+
+ def printUsageAndExit(exitCode: Int, unknownParam: Any = null) {
+ if (unknownParam != null) {
+ System.err.println("Unknown/unsupported param " + unknownParam)
+ }
+ System.err.println(
+ "Usage: spark.deploy.yarn.Client [options] \n" +
+ "Options:\n" +
+ " --jar JAR_PATH Path to your application's JAR file (required)\n" +
+ " --class CLASS_NAME Name of your application's main class (required)\n" +
+ " --args ARGS Arguments to be passed to your application's main class.\n" +
+ " Mutliple invocations are possible, each will be passed in order.\n" +
+ " --num-workers NUM Number of workers to start (Default: 2)\n" +
+ " --worker-cores NUM Number of cores for the workers (Default: 1). This is unsused right now.\n" +
+ " --master-memory MEM Memory for Master (e.g. 1000M, 2G) (Default: 512 Mb)\n" +
+ " --worker-memory MEM Memory per Worker (e.g. 1000M, 2G) (Default: 1G)\n" +
+ " --queue QUEUE The hadoop queue to use for allocation requests (Default: 'default')\n" +
+ " --user USERNAME Run the ApplicationMaster (and slaves) as a different user\n"
+ )
+ System.exit(exitCode)
+ }
+
+}
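The parsers in this patch destructure flag values through extractor objects, spark.util.IntParam and spark.util.MemoryParam, whose bodies are not part of this diff. The sketch below shows how such extractors are typically written; IntParamSketch and MemoryParamSketch are hypothetical stand-ins, and the real implementations in spark.util may differ in detail.

    // Hedged sketch of extractors in the spirit of spark.util.IntParam / MemoryParam.
    object IntParamSketch {
      def unapply(str: String): Option[Int] =
        try Some(str.toInt) catch { case _: NumberFormatException => None }
    }

    object MemoryParamSketch {
      // Accepts strings like "512", "1000M", or "2G" and yields megabytes.
      def unapply(str: String): Option[Int] = {
        val s = str.trim.toUpperCase
        try {
          if (s.endsWith("G")) Some(s.stripSuffix("G").toInt * 1024)
          else if (s.endsWith("M")) Some(s.stripSuffix("M").toInt)
          else Some(s.toInt)
        } catch { case _: NumberFormatException => None }
      }
    }

    // Used in a match exactly like the parser above:
    //   case "--worker-memory" :: MemoryParamSketch(mb) :: tail => workerMemory = mb

Returning Option lets a malformed value simply fail the pattern, so the parser's catch-all case reports it as an unknown/unsupported param.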
171 core/src/hadoop2-yarn/scala/spark/deploy/yarn/WorkerRunnable.scala
@@ -0,0 +1,171 @@
+package spark.deploy.yarn
+
+import java.net.URI
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
+import org.apache.hadoop.net.NetUtils
+import org.apache.hadoop.security.UserGroupInformation
+import org.apache.hadoop.yarn.api._
+import org.apache.hadoop.yarn.api.records._
+import org.apache.hadoop.yarn.api.protocolrecords._
+import org.apache.hadoop.yarn.conf.YarnConfiguration
+import org.apache.hadoop.yarn.ipc.YarnRPC
+import org.apache.hadoop.yarn.util.{Apps, ConverterUtils, Records}
+import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
+
+import scala.collection.JavaConversions._
+import scala.collection.mutable.HashMap
+
+import spark.{Logging, Utils}
+
+class WorkerRunnable(container: Container, conf: Configuration, masterAddress: String,
+ slaveId: String, hostname: String, workerMemory: Int, workerCores: Int)
+ extends Runnable with Logging {
+
+ var rpc: YarnRPC = YarnRPC.create(conf)
+ var cm: ContainerManager = null
+ val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
+
+ def run = {
+ logInfo("Starting Worker Container")
+ cm = connectToCM
+ startContainer
+ }
+
+ def startContainer = {
+ logInfo("Setting up ContainerLaunchContext")
+
+ val ctx = Records.newRecord(classOf[ContainerLaunchContext])
+ .asInstanceOf[ContainerLaunchContext]
+
+ ctx.setContainerId(container.getId())
+ ctx.setResource(container.getResource())
+ val localResources = prepareLocalResources
+ ctx.setLocalResources(localResources)
+
+ val env = prepareEnvironment
+ ctx.setEnvironment(env)
+
+ // Extra options for the JVM
+ var JAVA_OPTS = ""
+ // Set the JVM memory
+ val workerMemoryString = workerMemory + "m"
+ JAVA_OPTS += "-Xms" + workerMemoryString + " -Xmx" + workerMemoryString + " "
+ if (env.isDefinedAt("SPARK_JAVA_OPTS")) {
+ JAVA_OPTS += env("SPARK_JAVA_OPTS") + " "
+ }
+    // Commented out for now so that people can refer to the properties if required. Remove it once the cpuset version is pushed out.
+    // Context: the default GC for server-class machines ends up using all cores for GC, so if there are multiple containers on the same
+    // node, Spark's GC affects the performance of all other containers (which can themselves be other Spark containers).
+    // Instead of using this, rely on YARN cpusets to ensure Spark behaves 'properly' in multi-tenant environments. It is unclear how
+    // the default Java GC behaves when limited to a subset of cores on a node.
+/*
+ else {
+ // If no java_opts specified, default to using -XX:+CMSIncrementalMode
+      // Other modes/configs might be set in SPARK_JAVA_OPTS, so we don't want to mess with it.
+      // In our experiments, the (default) throughput collector had severe performance ramifications on multi-tenant machines.
+ // The options are based on
+ // http://www.oracle.com/technetwork/java/gc-tuning-5-138395.html#0.0.0.%20When%20to%20Use%20the%20Concurrent%20Low%20Pause%20Collector|outline
+ JAVA_OPTS += " -XX:+UseConcMarkSweepGC "
+ JAVA_OPTS += " -XX:+CMSIncrementalMode "
+ JAVA_OPTS += " -XX:+CMSIncrementalPacing "
+ JAVA_OPTS += " -XX:CMSIncrementalDutyCycleMin=0 "
+ JAVA_OPTS += " -XX:CMSIncrementalDutyCycle=10 "
+ }
+*/
+
+ ctx.setUser(UserGroupInformation.getCurrentUser().getShortUserName())
+ val commands = List[String]("java " +
+ " -server " +
+      // Kill the JVM if an OOM is raised, leveraging YARN's failure handling to trigger rescheduling.
+      // Not killing the task leaves various aspects of the worker and (to some extent) the JVM in an inconsistent state.
+      // TODO: If the OOM is not recoverable by rescheduling on a different node, do 'something' to fail the job, akin to blacklisting trackers in mapred?
+ " -XX:OnOutOfMemoryError='kill %p' " +
+ JAVA_OPTS +
+ " spark.executor.StandaloneExecutorBackend " +
+ masterAddress + " " +
+ slaveId + " " +
+ hostname + " " +
+ workerCores +
+ " 1> " + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout" +
+ " 2> " + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr")
+ logInfo("Setting up worker with commands: " + commands)
+ ctx.setCommands(commands)
+
+ // Send the start request to the ContainerManager
+ val startReq = Records.newRecord(classOf[StartContainerRequest])
+ .asInstanceOf[StartContainerRequest]
+ startReq.setContainerLaunchContext(ctx)
+ cm.startContainer(startReq)
+ }
+
+
+ def prepareLocalResources: HashMap[String, LocalResource] = {
+ logInfo("Preparing Local resources")
+    val localResources = HashMap[String, LocalResource]()
+
+ // Spark JAR
+ val sparkJarResource = Records.newRecord(classOf[LocalResource]).asInstanceOf[LocalResource]
+ sparkJarResource.setType(LocalResourceType.FILE)
+ sparkJarResource.setVisibility(LocalResourceVisibility.APPLICATION)
+ sparkJarResource.setResource(ConverterUtils.getYarnUrlFromURI(
+ new URI(System.getenv("SPARK_YARN_JAR_PATH"))))
+ sparkJarResource.setTimestamp(System.getenv("SPARK_YARN_JAR_TIMESTAMP").toLong)
+ sparkJarResource.setSize(System.getenv("SPARK_YARN_JAR_SIZE").toLong)
+ locaResources("spark.jar") = sparkJarResource
+ // User JAR
+ val userJarResource = Records.newRecord(classOf[LocalResource]).asInstanceOf[LocalResource]
+ userJarResource.setType(LocalResourceType.FILE)
+ userJarResource.setVisibility(LocalResourceVisibility.APPLICATION)
+ userJarResource.setResource(ConverterUtils.getYarnUrlFromURI(
+ new URI(System.getenv("SPARK_YARN_USERJAR_PATH"))))
+ userJarResource.setTimestamp(System.getenv("SPARK_YARN_USERJAR_TIMESTAMP").toLong)
+ userJarResource.setSize(System.getenv("SPARK_YARN_USERJAR_SIZE").toLong)
+ locaResources("app.jar") = userJarResource
+
+ // Log4j conf - if available
+ if (System.getenv("SPARK_YARN_LOG4J_PATH") != null) {
+ val log4jConfResource = Records.newRecord(classOf[LocalResource]).asInstanceOf[LocalResource]
+ log4jConfResource.setType(LocalResourceType.FILE)
+ log4jConfResource.setVisibility(LocalResourceVisibility.APPLICATION)
+ log4jConfResource.setResource(ConverterUtils.getYarnUrlFromURI(
+ new URI(System.getenv("SPARK_YARN_LOG4J_PATH"))))
+ log4jConfResource.setTimestamp(System.getenv("SPARK_YARN_LOG4J_TIMESTAMP").toLong)
+ log4jConfResource.setSize(System.getenv("SPARK_YARN_LOG4J_SIZE").toLong)
+ locaResources("log4j.properties") = log4jConfResource
+ }
+
+
+ logInfo("Prepared Local resources " + locaResources)
+ return locaResources
+ }
+
+ def prepareEnvironment: HashMap[String, String] = {
+ val env = new HashMap[String, String]()
+ // should we add this ?
+ Apps.addToEnvironment(env, Environment.USER.name, Utils.getUserNameFromEnvironment())
+
+ // If log4j present, ensure ours overrides all others
+ if (System.getenv("SPARK_YARN_LOG4J_PATH") != null) {
+ // Which is correct ?
+ Apps.addToEnvironment(env, Environment.CLASSPATH.name, "./log4j.properties")
+ Apps.addToEnvironment(env, Environment.CLASSPATH.name, "./")
+ }
+
+ Apps.addToEnvironment(env, Environment.CLASSPATH.name, "./*")
+ Apps.addToEnvironment(env, Environment.CLASSPATH.name, "$CLASSPATH")
+ Client.populateHadoopClasspath(yarnConf, env)
+
+ System.getenv().filterKeys(_.startsWith("SPARK")).foreach { case (k,v) => env(k) = v }
+ return env
+ }
+
+ def connectToCM: ContainerManager = {
+ val cmHostPortStr = container.getNodeId().getHost() + ":" + container.getNodeId().getPort()
+ val cmAddress = NetUtils.createSocketAddr(cmHostPortStr)
+ logInfo("Connecting to ContainerManager at " + cmHostPortStr)
+ return rpc.getProxy(classOf[ContainerManager], cmAddress, conf).asInstanceOf[ContainerManager]
+ }
+
+}
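Notice the contract between the two files: Client.setupLaunchEnv publishes each shipped file's URI, timestamp, and size as SPARK_YARN_*_PATH/_TIMESTAMP/_SIZE environment variables, and WorkerRunnable.prepareLocalResources reads them back to register identical LocalResource entries for every container. A hypothetical helper that would factor out that repetition is sketched below; it is not in the patch, but uses only YARN API calls that appear in it.

    import java.net.URI
    import org.apache.hadoop.yarn.api.records.{LocalResource, LocalResourceType, LocalResourceVisibility}
    import org.apache.hadoop.yarn.util.{ConverterUtils, Records}

    // Hypothetical helper: rebuild a LocalResource from the SPARK_YARN_*_PATH /
    // _TIMESTAMP / _SIZE triple that the Client exported into the container's environment.
    object ResourceEnv {
      def resourceFromEnv(prefix: String): Option[LocalResource] =
        Option(System.getenv(prefix + "_PATH")).map { path =>
          val rsrc = Records.newRecord(classOf[LocalResource])
          rsrc.setType(LocalResourceType.FILE)
          rsrc.setVisibility(LocalResourceVisibility.APPLICATION)
          rsrc.setResource(ConverterUtils.getYarnUrlFromURI(new URI(path)))
          rsrc.setTimestamp(System.getenv(prefix + "_TIMESTAMP").toLong)
          rsrc.setSize(System.getenv(prefix + "_SIZE").toLong)
          rsrc
        }
    }

    // e.g. ResourceEnv.resourceFromEnv("SPARK_YARN_JAR")   -> Some(resource for spark.jar)
    //      ResourceEnv.resourceFromEnv("SPARK_YARN_LOG4J") -> None when no log4j conf was shipped

The timestamp and size are not optional metadata: the NodeManager verifies them against the file in HDFS before localizing, so both sides must agree byte for byte.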
547 core/src/hadoop2-yarn/scala/spark/deploy/yarn/YarnAllocationHandler.scala
@@ -0,0 +1,547 @@
+package spark.deploy.yarn
+
+import spark.{Logging, Utils}
+import spark.scheduler.SplitInfo
+import scala.collection
+import org.apache.hadoop.yarn.api.records.{AMResponse, ApplicationAttemptId, ContainerId, Priority, Resource, ResourceRequest, ContainerStatus, Container}
+import spark.scheduler.cluster.{ClusterScheduler, StandaloneSchedulerBackend}
+import org.apache.hadoop.yarn.api.protocolrecords.{AllocateRequest, AllocateResponse}
+import org.apache.hadoop.yarn.util.{RackResolver, Records}
+import java.util.concurrent.{CopyOnWriteArrayList, ConcurrentHashMap}
+import java.util.concurrent.atomic.AtomicInteger
+import org.apache.hadoop.yarn.api.AMRMProtocol
+import collection.JavaConversions._
+import collection.mutable.{ArrayBuffer, HashMap, HashSet}
+import org.apache.hadoop.conf.Configuration
+import java.util.{Collections, Set => JSet}
+import java.lang.{Boolean => JBoolean}
+
+object AllocationType extends Enumeration ("HOST", "RACK", "ANY") {
+ type AllocationType = Value
+ val HOST, RACK, ANY = Value
+}
+
+// Too many params? Refactor it 'somehow'?
+// Needs to be thread-safe.
+// Need to refactor this to make it 'cleaner': right now, all computation is reactive, and it should be made
+// more proactive and decoupled.
+// Note that right now, we assume all node asks are uniform in terms of capabilities and priority.
+// Refer to http://developer.yahoo.com/blogs/hadoop/posts/2011/03/mapreduce-nextgen-scheduler/ for more info
+// on how containers are requested.
+private[yarn] class YarnAllocationHandler(val conf: Configuration, val resourceManager: AMRMProtocol,
+ val appAttemptId: ApplicationAttemptId,
+ val maxWorkers: Int, val workerMemory: Int, val workerCores: Int,
+ val preferredHostToCount: Map[String, Int],
+ val preferredRackToCount: Map[String, Int])
+ extends Logging {
+
+
+  // These three are locked on allocatedHostToContainersMap. Complementary data structures:
+  // allocatedHostToContainersMap: containers which are running, keyed as host -> Set[ContainerId]
+  // allocatedContainerToHostMap: container-to-host mapping
+ private val allocatedHostToContainersMap = new HashMap[String, collection.mutable.Set[ContainerId]]()
+ private val allocatedContainerToHostMap = new HashMap[ContainerId, String]()
+  // allocatedRackCount is populated ONLY if an allocation happens (or decremented if it is an already-allocated node).
+  // As with the two data structures above, it is tightly coupled with them and is to be locked on allocatedHostToContainersMap.
+ private val allocatedRackCount = new HashMap[String, Int]()
+
+ // containers which have been released.
+ private val releasedContainerList = new CopyOnWriteArrayList[ContainerId]()
+ // containers to be released in next request to RM
+ private val pendingReleaseContainers = new ConcurrentHashMap[ContainerId, Boolean]
+
+ private val numWorkersRunning = new AtomicInteger()
+ // Used to generate a unique id per worker
+ private val workerIdCounter = new AtomicInteger()
+ private val lastResponseId = new AtomicInteger()
+
+ def getNumWorkersRunning: Int = numWorkersRunning.intValue
+
+
+ def isResourceConstraintSatisfied(container: Container): Boolean = {
+ container.getResource.getMemory >= (workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD)
+ }
+
+ def allocateContainers(workersToRequest: Int) {
+    // From what I understand, we need to send the request only once ... but for now, not modifying this much.
+
+ // Keep polling the Resource Manager for containers
+ val amResp = allocateWorkerResources(workersToRequest).getAMResponse
+
+ val _allocatedContainers = amResp.getAllocatedContainers()
+ if (_allocatedContainers.size > 0) {
+
+
+ logDebug("Allocated " + _allocatedContainers.size + " containers, current count " +
+ numWorkersRunning.get() + ", to-be-released " + releasedContainerList +
+ ", pendingReleaseContainers : " + pendingReleaseContainers)
+ logDebug("Cluster Resources: " + amResp.getAvailableResources)
+
+ val hostToContainers = new HashMap[String, ArrayBuffer[Container]]()
+
+      // Ignore containers that do not satisfy the resource constraints
+ for (container <- _allocatedContainers) {
+ if (isResourceConstraintSatisfied(container)) {
+ // allocatedContainers += container
+
+ val host = container.getNodeId.getHost
+ val containers = hostToContainers.getOrElseUpdate(host, new ArrayBuffer[Container]())
+
+ containers += container
+ }
+ // Add all ignored containers to released list
+ else releasedContainerList.add(container.getId())
+ }
+
+ // Find the appropriate containers to use
+      // Slightly non-trivial groupBy, I guess ...
+ val dataLocalContainers = new HashMap[String, ArrayBuffer[Container]]()
+ val rackLocalContainers = new HashMap[String, ArrayBuffer[Container]]()
+ val offRackContainers = new HashMap[String, ArrayBuffer[Container]]()
+
+      for (candidateHost <- hostToContainers.keySet) {
+ val maxExpectedHostCount = preferredHostToCount.getOrElse(candidateHost, 0)
+ val requiredHostCount = maxExpectedHostCount - allocatedContainersOnHost(candidateHost)
+
+ var remainingContainers = hostToContainers.get(candidateHost).getOrElse(null)
+ assert(remainingContainers != null)
+
+ if (requiredHostCount >= remainingContainers.size){
+ // Since we got <= required containers, add all to dataLocalContainers
+ dataLocalContainers.put(candidateHost, remainingContainers)
+ // all consumed
+ remainingContainers =