
release commit

commit 9d91adbdbde22e91779b91eb40805f598da5b004 0 parents
@nathanmarz authored
Showing with 39,203 additions and 0 deletions.
  1. +21 −0 .gitignore
  2. +261 −0 LICENSE.html
  3. +26 −0 README.markdown
  4. +178 −0 TODO
  5. +41 −0 bin/build_release.sh
  6. +29 −0 bin/install_zmq.sh
  7. +2 −0  bin/javadoc.sh
  8. +80 −0 bin/storm
  9. +64 −0 conf/defaults.yaml
  10. +10 −0 conf/log4j.properties
  11. +16 −0 conf/storm.yaml.example
  12. +9 −0 log4j/log4j.properties
  13. +32 −0 project.clj
  14. +26 −0 src/clj/backtype/storm/LocalCluster.clj
  15. +38 −0 src/clj/backtype/storm/bootstrap.clj
  16. +71 −0 src/clj/backtype/storm/clojure.clj
  17. +422 −0 src/clj/backtype/storm/cluster.clj
  18. +9 −0 src/clj/backtype/storm/command/config_value.clj
  19. +13 −0 src/clj/backtype/storm/command/kill_topology.clj
  20. +15 −0 src/clj/backtype/storm/command/shell_submission.clj
  21. +139 −0 src/clj/backtype/storm/config.clj
  22. +96 −0 src/clj/backtype/storm/daemon/common.clj
  23. +81 −0 src/clj/backtype/storm/daemon/drpc.clj
  24. +638 −0 src/clj/backtype/storm/daemon/nimbus.clj
  25. +399 −0 src/clj/backtype/storm/daemon/supervisor.clj
  26. +498 −0 src/clj/backtype/storm/daemon/task.clj
  27. +254 −0 src/clj/backtype/storm/daemon/worker.clj
  28. +56 −0 src/clj/backtype/storm/event.clj
  29. +11 −0 src/clj/backtype/storm/log.clj
  30. +33 −0 src/clj/backtype/storm/process_simulator.clj
  31. +306 −0 src/clj/backtype/storm/stats.clj
  32. +432 −0 src/clj/backtype/storm/testing.clj
  33. +168 −0 src/clj/backtype/storm/thrift.clj
  34. +12 −0 src/clj/backtype/storm/tuple.clj
  35. +682 −0 src/clj/backtype/storm/ui/core.clj
  36. +115 −0 src/clj/backtype/storm/ui/helpers.clj
  37. +499 −0 src/clj/backtype/storm/util.clj
  38. +110 −0 src/clj/backtype/storm/zookeeper.clj
  39. +93 −0 src/clj/zilch/mq.clj
  40. +96 −0 src/clj/zilch/virtual_port.clj
  41. +1 −0  src/dev/resources/storm.fy
  42. +1 −0  src/dev/resources/storm.py
  43. +1 −0  src/dev/resources/storm.rb
  44. +10 −0 src/dev/resources/tester.fy
  45. +8 −0 src/dev/resources/tester.py
  46. +10 −0 src/dev/resources/tester.rb
  47. +6 −0 src/genthrift.sh
  48. +299 −0 src/jvm/backtype/storm/Config.java
  49. +6 −0 src/jvm/backtype/storm/Constants.java
  50. +16 −0 src/jvm/backtype/storm/ILocalCluster.java
  51. +96 −0 src/jvm/backtype/storm/StormSubmitter.java
  52. +94 −0 src/jvm/backtype/storm/clojure/ClojureBolt.java
  53. +5 −0 src/jvm/backtype/storm/daemon/Shutdownable.java
  54. +25 −0 src/jvm/backtype/storm/drpc/DRPCScheme.java
  55. +48 −0 src/jvm/backtype/storm/drpc/ReturnResults.java
  56. +6 −0 src/jvm/backtype/storm/drpc/SpoutAdder.java
  57. +320 −0 src/jvm/backtype/storm/generated/AlreadyAliveException.java
  58. +559 −0 src/jvm/backtype/storm/generated/Bolt.java
  59. +747 −0 src/jvm/backtype/storm/generated/BoltStats.java
  60. +583 −0 src/jvm/backtype/storm/generated/ClusterSummary.java
  61. +455 −0 src/jvm/backtype/storm/generated/ComponentCommon.java
  62. +301 −0 src/jvm/backtype/storm/generated/ComponentObject.java
  63. +1,566 −0 src/jvm/backtype/storm/generated/DistributedRPC.java
  64. +412 −0 src/jvm/backtype/storm/generated/ErrorInfo.java
  65. +406 −0 src/jvm/backtype/storm/generated/GlobalStreamId.java
  66. +460 −0 src/jvm/backtype/storm/generated/Grouping.java
  67. +320 −0 src/jvm/backtype/storm/generated/InvalidTopologyException.java
  68. +7,967 −0 src/jvm/backtype/storm/generated/Nimbus.java
  69. +320 −0 src/jvm/backtype/storm/generated/NotAliveException.java
  70. +219 −0 src/jvm/backtype/storm/generated/NullStruct.java
  71. +407 −0 src/jvm/backtype/storm/generated/ShellComponent.java
  72. +509 −0 src/jvm/backtype/storm/generated/SpoutSpec.java
  73. +744 −0 src/jvm/backtype/storm/generated/SpoutStats.java
  74. +417 −0 src/jvm/backtype/storm/generated/StateSpoutSpec.java
  75. +648 −0 src/jvm/backtype/storm/generated/StormTopology.java
  76. +449 −0 src/jvm/backtype/storm/generated/StreamInfo.java
  77. +590 −0 src/jvm/backtype/storm/generated/SupervisorSummary.java
  78. +302 −0 src/jvm/backtype/storm/generated/TaskSpecificStats.java
  79. +667 −0 src/jvm/backtype/storm/generated/TaskStats.java
  80. +906 −0 src/jvm/backtype/storm/generated/TaskSummary.java
  81. +640 −0 src/jvm/backtype/storm/generated/TopologyInfo.java
  82. +685 −0 src/jvm/backtype/storm/generated/TopologySummary.java
  83. +8 −0 src/jvm/backtype/storm/planner/CompoundSpout.java
  84. +8 −0 src/jvm/backtype/storm/planner/CompoundTask.java
  85. +16 −0 src/jvm/backtype/storm/planner/TaskBundle.java
  86. +29 −0 src/jvm/backtype/storm/serialization/FieldSerialization.java
  87. +31 −0 src/jvm/backtype/storm/serialization/ISerialization.java
  88. +207 −0 src/jvm/backtype/storm/serialization/SerializationFactory.java
  89. +49 −0 src/jvm/backtype/storm/serialization/TupleDeserializer.java
  90. +58 −0 src/jvm/backtype/storm/serialization/TupleSerializer.java
  91. +49 −0 src/jvm/backtype/storm/serialization/ValuesDeserializer.java
  92. +54 −0 src/jvm/backtype/storm/serialization/ValuesSerializer.java
  93. +71 −0 src/jvm/backtype/storm/spout/ISpout.java
  94. +13 −0 src/jvm/backtype/storm/spout/ISpoutOutputCollector.java
  95. +15 −0 src/jvm/backtype/storm/spout/RawScheme.java
  96. +11 −0 src/jvm/backtype/storm/spout/Scheme.java
  97. +33 −0 src/jvm/backtype/storm/spout/ShellSpout.java
  98. +104 −0 src/jvm/backtype/storm/spout/SpoutOutputCollector.java
  99. +12 −0 src/jvm/backtype/storm/state/IStateSpout.java
  100. +5 −0 src/jvm/backtype/storm/state/IStateSpoutOutputCollector.java
  101. +8 −0 src/jvm/backtype/storm/state/ISubscribedState.java
  102. +7 −0 src/jvm/backtype/storm/state/ISynchronizeOutputCollector.java
  103. +11 −0 src/jvm/backtype/storm/state/StateSpoutOutputCollector.java
  104. +13 −0 src/jvm/backtype/storm/state/SynchronizeOutputCollector.java
  105. +219 −0 src/jvm/backtype/storm/task/CoordinatedBolt.java
  106. +67 −0 src/jvm/backtype/storm/task/IBolt.java
  107. +12 −0 src/jvm/backtype/storm/task/IInternalOutputCollector.java
  108. +15 −0 src/jvm/backtype/storm/task/IOutputCollector.java
  109. +61 −0 src/jvm/backtype/storm/task/KeyedFairBolt.java
  110. +166 −0 src/jvm/backtype/storm/task/OutputCollector.java
  111. +82 −0 src/jvm/backtype/storm/task/OutputCollectorImpl.java
  112. +194 −0 src/jvm/backtype/storm/task/ShellBolt.java
  113. +313 −0 src/jvm/backtype/storm/task/TopologyContext.java
  114. +8 −0 src/jvm/backtype/storm/testing/AckFailDelegate.java
  115. +35 −0 src/jvm/backtype/storm/testing/AckTracker.java
  116. +97 −0 src/jvm/backtype/storm/testing/BoltTracker.java
  117. +75 −0 src/jvm/backtype/storm/testing/FeederSpout.java
  118. +21 −0 src/jvm/backtype/storm/testing/FixedTuple.java
  119. +103 −0 src/jvm/backtype/storm/testing/FixedTupleSpout.java
  120. +94 −0 src/jvm/backtype/storm/testing/SpoutTracker.java
  121. +45 −0 src/jvm/backtype/storm/testing/TestAggregatesCounter.java
  122. +42 −0 src/jvm/backtype/storm/testing/TestGlobalCount.java
  123. +32 −0 src/jvm/backtype/storm/testing/TestPlannerBolt.java
  124. +52 −0 src/jvm/backtype/storm/testing/TestPlannerSpout.java
  125. +43 −0 src/jvm/backtype/storm/testing/TestWordCounter.java
  126. +59 −0 src/jvm/backtype/storm/testing/TestWordSpout.java
  127. +72 −0 src/jvm/backtype/storm/testing/TrackerAggregator.java
  128. +49 −0 src/jvm/backtype/storm/testing/TupleCaptureBolt.java
  129. +36 −0 src/jvm/backtype/storm/topology/BasicBoltExecutor.java
  130. +42 −0 src/jvm/backtype/storm/topology/BasicOutputCollector.java
  131. +11 −0 src/jvm/backtype/storm/topology/IBasicBolt.java
  132. +8 −0 src/jvm/backtype/storm/topology/IBasicOutputCollector.java
  133. +17 −0 src/jvm/backtype/storm/topology/IComponent.java
  134. +12 −0 src/jvm/backtype/storm/topology/IRichBolt.java
  135. +19 −0 src/jvm/backtype/storm/topology/IRichSpout.java
  136. +8 −0 src/jvm/backtype/storm/topology/IRichStateSpout.java
  137. +24 −0 src/jvm/backtype/storm/topology/InputDeclarer.java
  138. +15 −0 src/jvm/backtype/storm/topology/OutputFieldsDeclarer.java
  139. +36 −0 src/jvm/backtype/storm/topology/OutputFieldsGetter.java
  140. +271 −0 src/jvm/backtype/storm/topology/TopologyBuilder.java
  141. +53 −0 src/jvm/backtype/storm/tuple/Fields.java
  142. +81 −0 src/jvm/backtype/storm/tuple/MessageId.java
  143. +144 −0 src/jvm/backtype/storm/tuple/Tuple.java
  144. +16 −0 src/jvm/backtype/storm/tuple/Values.java
  145. +37 −0 src/jvm/backtype/storm/utils/BufferFileInputStream.java
  146. +27 −0 src/jvm/backtype/storm/utils/CRC32OutputStream.java
  147. +40 −0 src/jvm/backtype/storm/utils/DRPCClient.java
  148. +51 −0 src/jvm/backtype/storm/utils/KeyedRoundRobinQueue.java
  149. +44 −0 src/jvm/backtype/storm/utils/LocalState.java
  150. +44 −0 src/jvm/backtype/storm/utils/NimbusClient.java
  151. +78 −0 src/jvm/backtype/storm/utils/Time.java
  152. +149 −0 src/jvm/backtype/storm/utils/TimeCacheMap.java
  153. +144 −0 src/jvm/backtype/storm/utils/Utils.java
  154. +166 −0 src/jvm/backtype/storm/utils/VersionedStore.java
  155. +358 −0 src/jvm/backtype/storm/utils/WritableUtils.java
  156. +9 −0 src/multilang/fy/storm.fancypack
  157. +163 −0 src/multilang/fy/storm.fy
  158. +143 −0 src/multilang/py/storm.py
  159. +116 −0 src/multilang/rb/storm.rb
  160. 0  src/py/__init__.py
  161. +86 −0 src/py/storm/DistributedRPC-remote
  162. +459 −0 src/py/storm/DistributedRPC.py
  163. +149 −0 src/py/storm/Nimbus-remote
  164. +2,283 −0 src/py/storm/Nimbus.py
  165. +1 −0  src/py/storm/__init__.py
  166. +10 −0 src/py/storm/constants.py
  167. +2,599 −0 src/py/storm/ttypes.py
  168. +174 −0 src/storm.thrift
Sorry, we could not display the entire diff because it was too big.
21 .gitignore
@@ -0,0 +1,21 @@
+/classes
+/lib
+deploy/lib
+deploy/logs
+.emacs-project
+*.jar
+bin/jzmq
+.DS_Store
+pom.xml
+deploy/classes
+*.fyc
+*.rbc
+*.pyc
+CHILD
+CHILDMAKER
+NANNY
+\#project.clj\#
+.\#project.clj
+.lein-failures
+_release
+*.zip
261 LICENSE.html
@@ -0,0 +1,261 @@
+<?xml version="1.0" encoding="ISO-8859-1" ?>
+<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd">
+<html xmlns="http://www.w3.org/1999/xhtml">
+
+<head>
+<meta http-equiv="Content-Type" content="text/html; charset=ISO-8859-1" />
+<title>Eclipse Public License - Version 1.0</title>
+<style type="text/css">
+ body {
+ size: 8.5in 11.0in;
+ margin: 0.25in 0.5in 0.25in 0.5in;
+ tab-interval: 0.5in;
+ }
+ p {
+ margin-left: auto;
+ margin-top: 0.5em;
+ margin-bottom: 0.5em;
+ }
+ p.list {
+ margin-left: 0.5in;
+ margin-top: 0.05em;
+ margin-bottom: 0.05em;
+ }
+ </style>
+
+</head>
+
+<body lang="EN-US">
+<p>Copyright (c) Nathan Marz. All rights reserved.</p>
+<p align=center><b>Eclipse Public License - v 1.0</b></p>
+
+<p>THE ACCOMPANYING PROGRAM IS PROVIDED UNDER THE TERMS OF THIS ECLIPSE
+PUBLIC LICENSE (&quot;AGREEMENT&quot;). ANY USE, REPRODUCTION OR
+DISTRIBUTION OF THE PROGRAM CONSTITUTES RECIPIENT'S ACCEPTANCE OF THIS
+AGREEMENT.</p>
+
+<p><b>1. DEFINITIONS</b></p>
+
+<p>&quot;Contribution&quot; means:</p>
+
+<p class="list">a) in the case of the initial Contributor, the initial
+code and documentation distributed under this Agreement, and</p>
+<p class="list">b) in the case of each subsequent Contributor:</p>
+<p class="list">i) changes to the Program, and</p>
+<p class="list">ii) additions to the Program;</p>
+<p class="list">where such changes and/or additions to the Program
+originate from and are distributed by that particular Contributor. A
+Contribution 'originates' from a Contributor if it was added to the
+Program by such Contributor itself or anyone acting on such
+Contributor's behalf. Contributions do not include additions to the
+Program which: (i) are separate modules of software distributed in
+conjunction with the Program under their own license agreement, and (ii)
+are not derivative works of the Program.</p>
+
+<p>&quot;Contributor&quot; means any person or entity that distributes
+the Program.</p>
+
+<p>&quot;Licensed Patents&quot; mean patent claims licensable by a
+Contributor which are necessarily infringed by the use or sale of its
+Contribution alone or when combined with the Program.</p>
+
+<p>&quot;Program&quot; means the Contributions distributed in accordance
+with this Agreement.</p>
+
+<p>&quot;Recipient&quot; means anyone who receives the Program under
+this Agreement, including all Contributors.</p>
+
+<p><b>2. GRANT OF RIGHTS</b></p>
+
+<p class="list">a) Subject to the terms of this Agreement, each
+Contributor hereby grants Recipient a non-exclusive, worldwide,
+royalty-free copyright license to reproduce, prepare derivative works
+of, publicly display, publicly perform, distribute and sublicense the
+Contribution of such Contributor, if any, and such derivative works, in
+source code and object code form.</p>
+
+<p class="list">b) Subject to the terms of this Agreement, each
+Contributor hereby grants Recipient a non-exclusive, worldwide,
+royalty-free patent license under Licensed Patents to make, use, sell,
+offer to sell, import and otherwise transfer the Contribution of such
+Contributor, if any, in source code and object code form. This patent
+license shall apply to the combination of the Contribution and the
+Program if, at the time the Contribution is added by the Contributor,
+such addition of the Contribution causes such combination to be covered
+by the Licensed Patents. The patent license shall not apply to any other
+combinations which include the Contribution. No hardware per se is
+licensed hereunder.</p>
+
+<p class="list">c) Recipient understands that although each Contributor
+grants the licenses to its Contributions set forth herein, no assurances
+are provided by any Contributor that the Program does not infringe the
+patent or other intellectual property rights of any other entity. Each
+Contributor disclaims any liability to Recipient for claims brought by
+any other entity based on infringement of intellectual property rights
+or otherwise. As a condition to exercising the rights and licenses
+granted hereunder, each Recipient hereby assumes sole responsibility to
+secure any other intellectual property rights needed, if any. For
+example, if a third party patent license is required to allow Recipient
+to distribute the Program, it is Recipient's responsibility to acquire
+that license before distributing the Program.</p>
+
+<p class="list">d) Each Contributor represents that to its knowledge it
+has sufficient copyright rights in its Contribution, if any, to grant
+the copyright license set forth in this Agreement.</p>
+
+<p><b>3. REQUIREMENTS</b></p>
+
+<p>A Contributor may choose to distribute the Program in object code
+form under its own license agreement, provided that:</p>
+
+<p class="list">a) it complies with the terms and conditions of this
+Agreement; and</p>
+
+<p class="list">b) its license agreement:</p>
+
+<p class="list">i) effectively disclaims on behalf of all Contributors
+all warranties and conditions, express and implied, including warranties
+or conditions of title and non-infringement, and implied warranties or
+conditions of merchantability and fitness for a particular purpose;</p>
+
+<p class="list">ii) effectively excludes on behalf of all Contributors
+all liability for damages, including direct, indirect, special,
+incidental and consequential damages, such as lost profits;</p>
+
+<p class="list">iii) states that any provisions which differ from this
+Agreement are offered by that Contributor alone and not by any other
+party; and</p>
+
+<p class="list">iv) states that source code for the Program is available
+from such Contributor, and informs licensees how to obtain it in a
+reasonable manner on or through a medium customarily used for software
+exchange.</p>
+
+<p>When the Program is made available in source code form:</p>
+
+<p class="list">a) it must be made available under this Agreement; and</p>
+
+<p class="list">b) a copy of this Agreement must be included with each
+copy of the Program.</p>
+
+<p>Contributors may not remove or alter any copyright notices contained
+within the Program.</p>
+
+<p>Each Contributor must identify itself as the originator of its
+Contribution, if any, in a manner that reasonably allows subsequent
+Recipients to identify the originator of the Contribution.</p>
+
+<p><b>4. COMMERCIAL DISTRIBUTION</b></p>
+
+<p>Commercial distributors of software may accept certain
+responsibilities with respect to end users, business partners and the
+like. While this license is intended to facilitate the commercial use of
+the Program, the Contributor who includes the Program in a commercial
+product offering should do so in a manner which does not create
+potential liability for other Contributors. Therefore, if a Contributor
+includes the Program in a commercial product offering, such Contributor
+(&quot;Commercial Contributor&quot;) hereby agrees to defend and
+indemnify every other Contributor (&quot;Indemnified Contributor&quot;)
+against any losses, damages and costs (collectively &quot;Losses&quot;)
+arising from claims, lawsuits and other legal actions brought by a third
+party against the Indemnified Contributor to the extent caused by the
+acts or omissions of such Commercial Contributor in connection with its
+distribution of the Program in a commercial product offering. The
+obligations in this section do not apply to any claims or Losses
+relating to any actual or alleged intellectual property infringement. In
+order to qualify, an Indemnified Contributor must: a) promptly notify
+the Commercial Contributor in writing of such claim, and b) allow the
+Commercial Contributor to control, and cooperate with the Commercial
+Contributor in, the defense and any related settlement negotiations. The
+Indemnified Contributor may participate in any such claim at its own
+expense.</p>
+
+<p>For example, a Contributor might include the Program in a commercial
+product offering, Product X. That Contributor is then a Commercial
+Contributor. If that Commercial Contributor then makes performance
+claims, or offers warranties related to Product X, those performance
+claims and warranties are such Commercial Contributor's responsibility
+alone. Under this section, the Commercial Contributor would have to
+defend claims against the other Contributors related to those
+performance claims and warranties, and if a court requires any other
+Contributor to pay any damages as a result, the Commercial Contributor
+must pay those damages.</p>
+
+<p><b>5. NO WARRANTY</b></p>
+
+<p>EXCEPT AS EXPRESSLY SET FORTH IN THIS AGREEMENT, THE PROGRAM IS
+PROVIDED ON AN &quot;AS IS&quot; BASIS, WITHOUT WARRANTIES OR CONDITIONS
+OF ANY KIND, EITHER EXPRESS OR IMPLIED INCLUDING, WITHOUT LIMITATION,
+ANY WARRANTIES OR CONDITIONS OF TITLE, NON-INFRINGEMENT, MERCHANTABILITY
+OR FITNESS FOR A PARTICULAR PURPOSE. Each Recipient is solely
+responsible for determining the appropriateness of using and
+distributing the Program and assumes all risks associated with its
+exercise of rights under this Agreement , including but not limited to
+the risks and costs of program errors, compliance with applicable laws,
+damage to or loss of data, programs or equipment, and unavailability or
+interruption of operations.</p>
+
+<p><b>6. DISCLAIMER OF LIABILITY</b></p>
+
+<p>EXCEPT AS EXPRESSLY SET FORTH IN THIS AGREEMENT, NEITHER RECIPIENT
+NOR ANY CONTRIBUTORS SHALL HAVE ANY LIABILITY FOR ANY DIRECT, INDIRECT,
+INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING
+WITHOUT LIMITATION LOST PROFITS), HOWEVER CAUSED AND ON ANY THEORY OF
+LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
+NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OR
+DISTRIBUTION OF THE PROGRAM OR THE EXERCISE OF ANY RIGHTS GRANTED
+HEREUNDER, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGES.</p>
+
+<p><b>7. GENERAL</b></p>
+
+<p>If any provision of this Agreement is invalid or unenforceable under
+applicable law, it shall not affect the validity or enforceability of
+the remainder of the terms of this Agreement, and without further action
+by the parties hereto, such provision shall be reformed to the minimum
+extent necessary to make such provision valid and enforceable.</p>
+
+<p>If Recipient institutes patent litigation against any entity
+(including a cross-claim or counterclaim in a lawsuit) alleging that the
+Program itself (excluding combinations of the Program with other
+software or hardware) infringes such Recipient's patent(s), then such
+Recipient's rights granted under Section 2(b) shall terminate as of the
+date such litigation is filed.</p>
+
+<p>All Recipient's rights under this Agreement shall terminate if it
+fails to comply with any of the material terms or conditions of this
+Agreement and does not cure such failure in a reasonable period of time
+after becoming aware of such noncompliance. If all Recipient's rights
+under this Agreement terminate, Recipient agrees to cease use and
+distribution of the Program as soon as reasonably practicable. However,
+Recipient's obligations under this Agreement and any licenses granted by
+Recipient relating to the Program shall continue and survive.</p>
+
+<p>Everyone is permitted to copy and distribute copies of this
+Agreement, but in order to avoid inconsistency the Agreement is
+copyrighted and may only be modified in the following manner. The
+Agreement Steward reserves the right to publish new versions (including
+revisions) of this Agreement from time to time. No one other than the
+Agreement Steward has the right to modify this Agreement. The Eclipse
+Foundation is the initial Agreement Steward. The Eclipse Foundation may
+assign the responsibility to serve as the Agreement Steward to a
+suitable separate entity. Each new version of the Agreement will be
+given a distinguishing version number. The Program (including
+Contributions) may always be distributed subject to the version of the
+Agreement under which it was received. In addition, after a new version
+of the Agreement is published, Contributor may elect to distribute the
+Program (including its Contributions) under the new version. Except as
+expressly stated in Sections 2(a) and 2(b) above, Recipient receives no
+rights or licenses to the intellectual property of any Contributor under
+this Agreement, whether expressly, by implication, estoppel or
+otherwise. All rights in the Program not expressly granted under this
+Agreement are reserved.</p>
+
+<p>This Agreement is governed by the laws of the State of New York and
+the intellectual property laws of the United States of America. No party
+to this Agreement will bring a legal action under this Agreement more
+than one year after the cause of action arose. Each party waives its
+rights to a jury trial in any resulting litigation.</p>
+
+</body>
+
+</html>
26 README.markdown
@@ -0,0 +1,26 @@
+Storm is a distributed realtime computation system. Similar to how Hadoop provides a set of general primitives for doing batch processing, Storm provides a set of general primitives for doing realtime computation. Storm is simple, can be used with any programming language, and is a lot of fun to use!
+
+## Documentation
+
+Documentation and tutorials can be found on the [Storm wiki](http://github.com/nathanmarz/storm/wiki).
+
+## Getting help
+
+Feel free to ask questions on Storm's mailing list: http://groups.google.com/group/storm-user
+
+You can also come to the #storm-user room on [freenode](http://freenode.net/). You can usually find a Storm developer there to help you out.
+
+## License
+
+The use and distribution terms for this software are covered by the
+Eclipse Public License 1.0 (http://opensource.org/licenses/eclipse-1.0.php)
+which can be found in the file LICENSE.html at the root of this distribution.
+By using this software in any fashion, you are agreeing to be bound by
+the terms of this license.
+You must not remove this notice, or any other, from this software.
+
+## Contributors
+
+* Nathan Marz ([@nathanmarz](http://twitter.com/nathanmarz))
+* Jason Jackson ([@jason_j](http://twitter.com/jason_j))
+* Christopher Bertels ([@bakkdoor](http://twitter.com/bakkdoor))
178 TODO
@@ -0,0 +1,178 @@
+Use cases:
+
+1. number of steps between 2 people in a graph (topology with cycles?)
+
+
+#################
+
+* Repackage jzmq and zmq as a leiningen "native dep"
+ - this might be good, since the native dep can package builds for all different systems/os's?
+
+
+* Deploy design:
+
+- storm swap {name} {jar} {class}
+- it's allowed to use resources equal to current running topology plus number of free resources
+- starts in deactivated mode
+- add TOPOLOGY_STARTUP_TIME config for the delay until nimbus activates a topology after launching it
+- for swap, after the startup time, deactivate the other topology, wait the TOPOLOGY_MESSAGE_TIMEOUT_SECS, and then activate the other topology
+- should be able to decrease the message timeout for killing or swapping (add optional thrift parameter) -- or just make it part of the config?
+- add killWithOptions, swap, swapWithOptions
+
+* Storm UI, stats, debugging, diagnosis tools
+-- need to be able to hide system streams/components from the calculations (another query param and should be default)
+-- need to optimize (slowness is probably on nimbus end of querying zk, consider adding heartbeat caching into nimbus)
+-- add margins
+-- add titles so it's easier to distinguish the various pages
+-- right align all table columns except for the leftmost
+
+* Unit test the core pieces that have stabilized their APIs
+
+- process simulator
+- virtual ports
+- supervisor
+- utils
+- test worker/tasks
+
+* implement pseudo-distributed mode -- this is for testing the distributed parts of the code
+ - perhaps i can use pallet/vmfest for this
+
+* Need integration tests that run on an actual storm cluster (scp code/process code/zookeeper code not tested in unit tests)
+
+* bolts with none grouping can be pushed into a bolt. e.g. A -> B -> C
+ A -> D -> E
+
+If A -> B and A -> D are shuffle grouping = none, and B -> C and D -> E are not, then both can be run in A, B's branch goes to C and D's branch goes to E
+
+
+* Failure design
+
+Add fail method to outputcollector
+Fail sends fail message to Acker for those anchors, which sends fail message back to spout.
+Whenever spout fails a tuple, it emits it in its failure stream...
+
+Add fail method to drpc... Causes blocked thread to throw exception
+
+* Have worker heartbeat with its task ids, nimbus verifies - if wrong, reassign tasks?
+- detect and ignore stray tasks
+Each worker can choose a unique id for itself when heartbeating
+- nimbus deletes those that aren't in topology
+
+* Subscriptions design
+
+-- new kind of spout: "subscription spout"
+    --> goal is to sync its data across the tasks that subscribe to its streams
+ --> after doing a grouping, remembers what task it sent the tuple to (regardless of grouping). if a task dies, it knows its subscriptions and asks to be resynced
+ --> normal operation is to push to tasks, but pull done when a task starts up (b/c previous task died or something)
+ --> need to be able to add tuples to subscription or take tuples away (this is protocol with who you're subscribing to - e.g. rocket)
+ --> subscriptions can only happen in a spout because it requires persistent state
+ --> when subscription spout task dies, it polls the source (e.g. rocket) for all the subscription info
+ --> ideally you'd set things up to have one subscription spout per rocket server
+ --> TODO: Need some way to delete subscriptions -> part of tuple or extra metadata on tuple (extra metadata seems cleaner)
+ --> add isSubscription() method to Tuple as well as a getSubscriptionType() [which returns ADD or REMOVE]
+ --> when a spout starts up, it also needs to push all of its subscription info
+ --> acks are irrelevant for subscription tuples -- how should acks be managed as an abstraction?
+ -- maybe the synchronized state is done for you -- you just access the state directly and receive a callback whenever it changes?
+ -- so don't use tuples...
+ --> subscriptions break all the abstractions, perhaps I should generalize spouts and factor acking as a library on top of storm. subscriptions would just be another kind of library? -> no, it seems to break abstractions anyway (like keeping task -> tuples in memory)
+ --> maybe call it "syncspout"
+ --> if just do syncing (don't expose tuples directly?)
+ --> have a "SubscribedState" class that takes care of indexing/etc. --> expose it through topologycontext?
+ -- need a way to distinguish between states of different streams
+ -- has "add" and "remove" methods
+ -- bolt can give a statemanager object that implements add and remove in the prepare method
+ -- add(Tuple tuple)
+ -- remove(Tuple tuple)
+ --> synchronize protocol (when spout or source of data dies):
+ --> send how many tuples are going to be sent
+ --> send the tuples
+ --> OR: pack everything together into a single message (could be hard b/c where tuples are supposed to go is abstracted away)
+ --> tie everything together with a unique ID
+ --> once task receives everything, has info needed to remove tuples
+ --> statespout should do long-polling with timeout
+ --> to do subscriptions, the state should contain something like [url, subscriber]. some bolt appends subscriber to tuples, group by subscriber, and send info back
+    --> how to do fields grouping with an even distribution?
+ --> ********* tasks need to block on startup until they're synchronized *********
+ --> send sync messages in a loop until it's synchronized
+ --> add a task.synchronize.poll.freq.secs config (default to 10 seconds)
+ --> need to buffer other messages as topology is waiting for synchronization messages (use disk?)
+ --> could use acking system to know if a piece of state gets fully synchronized and communicate this with user
+ --> perhaps expose this through a special stream? (the state status stream -> similar to failure streams)
+ --> should be able to do updates of existing state
+ --> use case: have a knob that you can set externally
+ --> this isn't really any better than just using zookeeper directly
+
+
+_myState = context.setSubscribedState(_myState)
+
+StateSpout {
+ //does a timeout long poll and emits new add or remove state tuples (add and remove on the output collector)
+ nextTuple(StateSpoutOutputCollector) //collector has add and remove methods add(id, tuple). remove(id)
+ //emits all the tuples into the output collector (in the background, will also send ids and counts to tasks so they know how to synchronize)
+ //called on startup
+ //collector can have a synchronize method in case the source of data (e.g., rocket) craps out
+ synchronize(SynchronizationOutputCollector) //collector only has add(id, tuple) method
+}
+
+//task startup (in prepare method) [this is automatic]
+for(int taskId: statespoutids) {
+ emitDirect(SYNC_STREAM, tuple())
+}
+
+statespout synchronization():
+ id = uuid()
+  //getAllStateTuplesFromSource calls synchronize on the spout to get the tuples
+ for(Tuple t: getAllStateTuplesFromSource()) {
+ List tasks = emit(cons(id, t));
+ .. keep track of id -> tasks -> count
+ for(task: all output tasks) {
+ emitDirect(task, id, count)
+ }
+ }
+
+for synchronization to work, task needs to keep track of which tasks sent it tuples, and compare against only that set on synchronization
+
+Need a way to propagate information back up the topology - "subscriptions"
+e.g. browser -> rocket -> bolt -> bolt -> bolt.
+
+example: #retweets for a subscribed set of tweet ids
+
+storm topology
+
+ -> tweet spout (A) -> group on original id -> count (B) -> rocket
+
+subscriptions: rocket -> count (B) tweet id (need to group) -> spout (need to go to all)
+
+-- how does it work when stuff dies downstream or upstream? do people ask what the subscriptions are? or do you push your subscriptions up? a combination?
+
+-- maybe subscriptions are a "constant" spout? e.g., continuously emits and refreshes to make sure every task has the tuple. this seems amorphous and hard to implement... nimbus would need to refire all constant spouts whenever there's a reassignment that affects the flow of data. subscriptions seem more natural
+
+-- subscriptions are a special kind of stream that is driven by being asked to send it. e.g., rocket is a spout that emits subscription/unsubscription tuples. they only send it when they get something new, or are asked what all the subscriptions are
+
+-- maybe you just need a system stream to know when tasks are created. when you see that a downstream task is created, you know to fire subscriptions to it if it's subscribed to your subscriptions stream? - how does this interplay with all the grouping types... you almost want to do a grouping and only send to the tasks that would have received it. spouts would need to be able to subscribe to streams as well
+
+(use 'backtype.storm.testing)
+;;(start-simulating-time!)
+(def cluster (mk-local-storm-cluster))
+(use 'backtype.storm.bootstrap) (bootstrap)
+(import '[backtype.storm.testing TestWordCounter TestWordSpout TestGlobalCount TestAggregatesCounter])
+(def spout (feeder-spout ["word"]))
+(def topology (thrift/mk-topology
+ {1 (thrift/mk-spout-spec spout :parallelism-hint 3)}
+ {2 (thrift/mk-bolt-spec {1 ["word"]} (TestWordCounter.) :parallelism-hint 4)
+ 3 (thrift/mk-bolt-spec {1 :global} (TestGlobalCount.))
+ 4 (thrift/mk-bolt-spec {2 :global} (TestAggregatesCounter.))
+ }))
+(submit-local-topology (:nimbus cluster) "test" {TOPOLOGY-WORKERS 4 TOPOLOGY-DEBUG true} topology)
+
+
+* clean up project
+ - remove log4j dir and instead generate it in the deploy (it's only used in bin/storm -> create a console one and put into bin/)
+ - include system component / stream information in the topologycontext and clean up system specific code all over the place
+
+* Very rare errors
+
+weird nullptr exceptions:
+(tasks i) on send-fn
+no virtual port socket for outbound task (in worker)
+
41 bin/build_release.sh
@@ -0,0 +1,41 @@
+#!/bin/bash
+
+RELEASE=`head -1 project.clj | awk '{print $3}' | sed -e 's/\"//' | sed -e 's/\"//'`
+
+echo Making release $RELEASE
+
+DIR=_release/storm-$RELEASE
+
+rm -rf _release
+export LEIN_ROOT=1
+rm *jar
+lein clean
+lein deps
+lein compile
+mv conf/log4j.properties conf/storm.log.properties
+lein jar
+mv conf/storm.log.properties conf/log4j.properties
+mkdir -p $DIR
+mkdir $DIR/lib
+cp storm*jar $DIR/
+cp lib/*.jar $DIR/lib
+
+cp -R log4j $DIR/
+mkdir $DIR/logs
+
+mkdir $DIR/conf
+cp conf/storm.yaml.example $DIR/conf/storm.yaml
+
+cp -R src/ui/public $DIR/
+
+cp -R bin $DIR/
+
+cp README.markdown $DIR/
+cp LICENSE.html $DIR/
+
+cd _release
+zip -r storm-$RELEASE.zip *
+cd ..
+mv _release/storm-*.zip .
+rm -rf _release
+
29 bin/install_zmq.sh
@@ -0,0 +1,29 @@
+#!/bin/bash
+export JAVA_HOME=$(/usr/libexec/java_home)
+
+if [ ! -d "$JAVA_HOME/include" ]; then
+ echo "
+Looks like you're missing your 'include' directory. If you're using Mac OS X, you'll need to install the Java dev package.
+
+- Navigate to http://goo.gl/D8lI
+- Click the Java tab on the right
+- Install the appropriate version and try again.
+"
+ exit -1;
+fi
+
+#install zeromq
+wget http://download.zeromq.org/historic/zeromq-2.1.7.tar.gz
+tar -xzf zeromq-2.1.7.tar.gz
+cd zeromq-2.1.7
+./configure
+make
+sudo make install
+
+#install jzmq (both native and into local maven cache)
+git clone https://github.com/nathanmarz/jzmq.git
+cd jzmq
+./autogen.sh
+./configure
+make
+sudo make install
2  bin/javadoc.sh
@@ -0,0 +1,2 @@
+mkdir -p doc
+javadoc -d doc/ `find src -name "*.java" | grep -v generated`
80 bin/storm
@@ -0,0 +1,80 @@
+#!/usr/bin/python
+
+import os
+import sys
+import random
+import subprocess as sub
+
+CONF_DIR = os.path.expanduser("~/.storm")
+STORM_DIR = "/".join(os.path.abspath( __file__ ).split("/")[:-2])
+
+def get_jars_full(adir):
+ files = os.listdir(adir)
+ ret = []
+ for f in files:
+ if f.endswith(".jar"):
+ ret.append(adir + "/" + f)
+ return ret
+
+def get_classpath(extrajars):
+ ret = get_jars_full(STORM_DIR)
+ ret.extend(get_jars_full(STORM_DIR + "/lib"))
+ ret.extend(extrajars)
+ return ":".join(ret)
+
+def confvalue(name):
+ cp = get_classpath([])
+ command = ["java", "-client", "-cp", cp, "backtype.storm.command.config_value", name]
+ p = sub.Popen(command,stdout=sub.PIPE)
+ output, errors = p.communicate()
+ lines = output.split("\n")
+ for line in lines:
+ tokens = line.split(" ")
+ if tokens[0] == "VALUE:":
+ return tokens[1]
+
+def exec_storm_class(klass, jvmtype="-server", childopts="", extrajars=[], args=[], prefix=""):
+ nativepath = confvalue("java.library.path")
+ command = prefix + " java " + jvmtype + " -Djava.library.path=" + nativepath + " " + childopts + " -cp " + get_classpath(extrajars) + " " + klass + " " + " ".join(args)
+ print "Running: " + command
+ os.system(command)
+
+COMMAND = sys.argv[1]
+ARGS = sys.argv[2:]
+
+def jar(jarfile, klass, *args):
+ exec_storm_class(
+ klass,
+ childopts="-Dlog4j.configuration=storm.log.properties",
+ jvmtype="-client",
+ extrajars=[jarfile, CONF_DIR, STORM_DIR + "/bin"],
+ args=args,
+ prefix="export STORM_JAR=" + jarfile + ";")
+
+def kill(name):
+ exec_storm_class("backtype.storm.command.kill_topology", args=[name], jvmtype="-client", extrajars=[CONF_DIR, STORM_DIR + "/bin"], childopts="-Dlog4j.configuration=storm.log.properties")
+
+def shell(resourcesdir, command, *args):
+ tmpjarpath = "stormshell" + str(random.randint(0, 10000000)) + ".jar"
+ os.system("jar cf %s %s" % (tmpjarpath, resourcesdir))
+ runnerargs = [tmpjarpath, command]
+ runnerargs.extend(args)
+ exec_storm_class("backtype.storm.command.shell_submission", args=runnerargs, jvmtype="-client", extrajars=[CONF_DIR], childopts="-Dlog4j.configuration=storm.log.properties")
+ os.system("rm " + tmpjarpath)
+
+def nimbus():
+ childopts = confvalue("nimbus.childopts") + " -Dlogfile.name=nimbus.log"
+ exec_storm_class("backtype.storm.daemon.nimbus", jvmtype="-server", extrajars=[STORM_DIR + "/log4j", STORM_DIR + "/conf"], childopts=childopts)
+
+def supervisor():
+    childopts = confvalue("supervisor.childopts") + " -Dlogfile.name=supervisor.log"
+ exec_storm_class("backtype.storm.daemon.supervisor", jvmtype="-server", extrajars=[STORM_DIR + "/log4j", STORM_DIR + "/conf"], childopts=childopts)
+
+def ui():
+ childopts = "-Xmx768m -Dlogfile.name=ui.log"
+ exec_storm_class("backtype.storm.ui.core", jvmtype="-server", childopts=childopts, extrajars=[STORM_DIR + "/log4j", STORM_DIR, STORM_DIR + "/conf"])
+
+
+COMMANDS = {"jar": jar, "kill": kill, "shell": shell, "nimbus": nimbus, "ui": ui, "supervisor": supervisor}
+
+COMMANDS[COMMAND](*ARGS)
64 conf/defaults.yaml
@@ -0,0 +1,64 @@
+########### These all have default values as shown
+########### Additional configuration goes into storm.yaml
+
+java.library.path: "/usr/local/lib:/opt/local/lib:/usr/lib"
+
+### storm.* configs are general configurations
+# the local dir is where jars are kept
+storm.local.dir: "/mnt/storm"
+storm.zookeeper.port: 2181
+storm.zookeeper.root: "/storm"
+storm.zookeeper.session.timeout: 10000
+storm.cluster.mode: "distributed" # can be distributed or local
+
+### nimbus.* configs are for the master
+nimbus.thrift.port: 6627
+nimbus.childopts: "-Xmx1024m"
+nimbus.task.timeout.secs: 30
+nimbus.supervisor.timeout.secs: 60
+nimbus.monitor.freq.secs: 10
+nimbus.task.launch.secs: 90
+nimbus.reassign: true
+nimbus.file.copy.expiration.secs: 600
+
+### supervisor.* configs are for node supervisors
+# Define the number of workers that can be run on this machine. Each worker is assigned a port to use for communication
+supervisor.slots.ports:
+ - 6700
+ - 6701
+ - 6702
+ - 6703
+supervisor.childopts: "-Xmx1024m"
+#how long supervisor will wait to ensure that a worker process is started
+supervisor.worker.start.timeout.secs: 20
+#how long between heartbeats until supervisor considers that worker dead and tries to restart it
+supervisor.worker.timeout.secs: 25
+#how frequently the supervisor checks on the status of the processes it's monitoring and restarts if necessary
+supervisor.monitor.frequency.secs: 3
+#how frequently the supervisor heartbeats to the cluster state (for nimbus)
+supervisor.heartbeat.frequency.secs: 5
+supervisor.enable: true
+
+### worker.* configs are for task workers
+worker.childopts: "-Xmx768m"
+worker.heartbeat.frequency.secs: 1
+
+task.heartbeat.frequency.secs: 3
+task.refresh.poll.secs: 10
+
+zmq.threads: 1
+zmq.linger.millis: 5000
+
+### topology.* configs are for specific executing storms
+topology.debug: false
+topology.optimize: true
+topology.workers: 1
+topology.ackers: 1
+# maximum amount of time a message has to complete before it's considered failed
+topology.message.timeout.secs: 30
+topology.skip.missing.serializations: false
+topology.max.task.parallelism: null
+topology.max.spout.pending: null
+topology.state.synchronization.timeout.secs: 60
+topology.stats.sample.rate: 0.05
+
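The topology.* entries above are cluster-wide defaults; an individual topology can override them in the conf map it is submitted with. A hypothetical sketch, reusing the REPL session from the TODO above (the TOPOLOGY-* symbols are assumed to be the Clojure config vars brought into scope by the bootstrap; TOPOLOGY-MESSAGE-TIMEOUT-SECS in particular is an assumed name for topology.message.timeout.secs):

;; sketch: per-topology overrides of the topology.* defaults above
(submit-local-topology (:nimbus cluster) "test"
  {TOPOLOGY-WORKERS 4
   TOPOLOGY-DEBUG true
   TOPOLOGY-MESSAGE-TIMEOUT-SECS 60} ;; assumed var name
  topology)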
10 conf/log4j.properties
@@ -0,0 +1,10 @@
+#This file should be deleted when deployed to server (workaround to leiningen classpath putting dev resources on path)
+#This file is needed for tests
+
+log4j.rootLogger=INFO, A1
+
+log4j.appender.A1=org.apache.log4j.ConsoleAppender
+
+log4j.appender.A1.layout=org.apache.log4j.PatternLayout
+log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
+log4j.category.org.apache.zookeeper=warn
16 conf/storm.yaml.example
@@ -0,0 +1,16 @@
+########### These MUST be filled in for a storm configuration
+# storm.zookeeper.servers:
+# - "server1"
+# - "server2"
+#
+# nimbus.host: "nimbus"
+#
+#
+# ##### These may optionally be filled in:
+#
+## Map of tokens to a serialization class. Tokens less than 32 are reserved by Storm.
+## Tokens are written on the wire to identify the field.
+# topology.serializations:
+# 33: "org.mycompany.MyObjectSerialization"
+# 34: "org.mycompany.MyOtherObjectSerialization"
+
9 log4j/log4j.properties
@@ -0,0 +1,9 @@
+log4j.rootLogger=INFO, A1
+
+
+log4j.appender.A1 = org.apache.log4j.DailyRollingFileAppender
+log4j.appender.A1.File = logs/${logfile.name}
+log4j.appender.A1.Append = true
+log4j.appender.A1.DatePattern = '.'yyy-MM-dd
+log4j.appender.A1.layout = org.apache.log4j.PatternLayout
+log4j.appender.A1.layout.ConversionPattern = %d{yyyy-MM-dd HH:mm:ss} %c{1} [%p] %m%n
32 project.clj
@@ -0,0 +1,32 @@
+(defproject storm "0.5.0"
+ :source-path "src/clj"
+ :test-path "test/clj"
+ :java-source-path "src/jvm"
+ :javac-options {:debug "true" :fork "true"}
+ :resources-path "conf"
+ :dev-resources-path "src/dev"
+ :dependencies [[org.clojure/clojure "1.2.0"]
+ [org.clojure/clojure-contrib "1.2.0"]
+ [commons-io "1.4"]
+ [org.apache.commons/commons-exec "1.1"]
+ [jvyaml "1.0.0"]
+ [backtype/thriftjava "1.0.0"]
+ [clj-time "0.3.0"]
+ [log4j/log4j "1.2.16"]
+ [org.apache.zookeeper/zookeeper "3.3.2"]
+ [backtype/jzmq "2.1.0"]
+ [com.googlecode.json-simple/json-simple "1.1"]
+ [compojure "0.6.4"]
+ [hiccup "0.3.6"]
+ [ring/ring-jetty-adapter "0.3.11"]
+ ]
+ :uberjar-exclusions [#"META-INF.*"]
+ :dev-dependencies [
+ [swank-clojure "1.2.1"]
+ [lein-ring "0.4.5"]
+ ]
+ :jvm-opts ["-Djava.library.path=/usr/local/lib:/opt/local/lib:/usr/lib"]
+ :ring {:handler backtype.storm.ui.core/app}
+ :extra-classpath-dirs ["src/ui"]
+ :aot :all
+)
26 src/clj/backtype/storm/LocalCluster.clj
@@ -0,0 +1,26 @@
+(ns backtype.storm.LocalCluster
+ (:use [backtype.storm testing])
+ (:gen-class
+ :init init
+ :implements [backtype.storm.ILocalCluster]
+ :constructors {[] []}
+ :state state ))
+
+(defn -init []
+ (let [ret (mk-local-storm-cluster)]
+ [[] ret]
+ ))
+
+(defn -submitTopology [this name conf topology]
+ (submit-local-topology (:nimbus (. this state))
+ name
+ conf
+ topology))
+
+(defn -shutdown [this]
+ (kill-local-storm-cluster (. this state))
+ )
+
+(defn -killTopology [this name]
+ (.killTopology (:nimbus (. this state)) name)
+ )
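A hypothetical usage sketch of the gen-class above, limited to the methods it defines (-submitTopology, -killTopology, -shutdown); topology and the TOPOLOGY-DEBUG config symbol are assumed to be set up as in the TODO session earlier:

;; sketch: driving LocalCluster through its Java-facing methods
(import 'backtype.storm.LocalCluster)
(def cluster (LocalCluster.))
(.submitTopology cluster "test" {TOPOLOGY-DEBUG true} topology)
;; ... let the topology run ...
(.killTopology cluster "test")
(.shutdown cluster)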
38 src/clj/backtype/storm/bootstrap.clj
@@ -0,0 +1,38 @@
+(ns backtype.storm.bootstrap)
+
+(defmacro bootstrap []
+ '(do
+ (import (quote [backtype.storm Constants]))
+ (import (quote [backtype.storm.testing FeederSpout TestPlannerBolt TestPlannerSpout AckFailDelegate AckTracker]))
+ (import (quote [backtype.storm.utils Utils LocalState Time TimeCacheMap
+ TimeCacheMap$ExpiredCallback BufferFileInputStream]))
+ (import (quote [backtype.storm.serialization TupleSerializer TupleDeserializer]))
+ (import (quote [backtype.storm.spout ISpout SpoutOutputCollector ISpoutOutputCollector ShellSpout]))
+ (import (quote [backtype.storm.tuple Tuple Fields MessageId]))
+ (import (quote [backtype.storm.task IBolt IOutputCollector
+ OutputCollector OutputCollectorImpl IInternalOutputCollector
+ TopologyContext ShellBolt
+ CoordinatedBolt CoordinatedBolt$SourceArgs KeyedFairBolt]))
+ (import (quote [backtype.storm.daemon Shutdownable]))
+ (use (quote [backtype.storm config util log clojure]))
+ (use (quote [clojure.contrib.seq :only [find-first]]))
+ (require (quote [backtype.storm [thrift :as thrift] [cluster :as cluster]
+ [event :as event] [process-simulator :as psim]]))
+ (require (quote [clojure.set :as set]))
+ (require (quote [zilch [mq :as mq]]))
+ (require (quote [zilch [virtual-port :as mqvp]]))
+ (require (quote [backtype.storm [stats :as stats]]))
+ (import (quote [org.apache.log4j PropertyConfigurator Logger]))
+
+ (import (quote [backtype.storm.generated Nimbus Nimbus$Processor Nimbus$Iface StormTopology ShellComponent
+ NotAliveException AlreadyAliveException InvalidTopologyException
+ ClusterSummary TopologyInfo TopologySummary TaskSummary TaskStats TaskSpecificStats
+ SpoutStats BoltStats ErrorInfo SupervisorSummary]))
+ (import (quote [backtype.storm.daemon.common StormBase Assignment
+ TaskInfo SupervisorInfo WorkerHeartbeat TaskHeartbeat]))
+ (import (quote [java.io File FileOutputStream FileInputStream]))
+ (import (quote [java.util List Random Map HashMap]))
+ (import (quote [org.apache.commons.io FileUtils]))
+ (import (quote [java.util ArrayList]))
+ (mq/zeromq-imports)
+ ))
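For reference, this macro is what the REPL session in the TODO relies on to pull the whole API into scope:

;; same two lines as in the TODO session above
(use 'backtype.storm.bootstrap)
(bootstrap)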
71 src/clj/backtype/storm/clojure.clj
@@ -0,0 +1,71 @@
+(ns backtype.storm.clojure
+ (:use backtype.storm.bootstrap)
+ (:import [backtype.storm.generated StreamInfo])
+ (:import [backtype.storm.tuple Tuple])
+ (:import [backtype.storm.task OutputCollector])
+ (:import backtype.storm.clojure.ClojureBolt)
+ (:require [backtype.storm [thrift :as thrift]]))
+
+(defn direct-stream [fields]
+ (StreamInfo. fields true))
+
+(defn clojure-bolt* [output-spec fn-var & args]
+ (let [m (meta fn-var)]
+ (ClojureBolt. (str (:ns m)) (str (:name m)) args (thrift/mk-output-spec output-spec))
+ ))
+
+(defmacro clojure-bolt [output-spec fn-sym & args]
+ `(clojure-bolt* ~output-spec (var ~fn-sym) ~@args))
+
+(defmacro defbolt [name output-spec [tuple-sym collector-sym] & body]
+ (let [worker-name (symbol (str name "__"))]
+ `(do
+ (defn ~worker-name []
+ (fn [^Tuple ~tuple-sym ^OutputCollector ~collector-sym]
+ ~@body
+ ))
+ (def ~name (clojure-bolt ~output-spec ~worker-name))
+ )))
+
+(defn hint [sym class-sym]
+ (with-meta sym {:tag class-sym})
+ )
+
+(defmulti hinted-args (fn [kw args] kw))
+
+(defmethod hinted-args :prepare [_ [conf context collector]]
+ [(hint conf 'java.util.Map)
+ (hint context 'backtype.storm.task.TopologyContext)
+   (hint collector 'backtype.storm.task.OutputCollector)]
+ )
+
+(defmethod hinted-args :execute [_ [tuple collector]]
+ [(hint tuple 'backtype.storm.tuple.Tuple)
+ (hint collector 'backtype.storm.task.OutputCollector)]
+ )
+
+(defmethod hinted-args :cleanup [_ [collector]]
+ [(hint collector 'backtype.storm.task.OutputCollector)]
+ )
+
+(defmacro defboltfull [name output-spec & kwargs]
+ (let [opts (apply hash-map kwargs)
+ worker-name (symbol (str name "__"))
+ let-bindings (:let opts)
+ hof-args (:params opts)
+ definer (if hof-args
+ `(defn ~name [& args#]
+ (apply clojure-bolt* ~output-spec (var ~worker-name) args#))
+ `(def ~name (clojure-bolt ~output-spec ~worker-name)))
+ fns (select-keys opts [:prepare :execute :cleanup])
+ fns (into {}
+ (for [[fn-kw [args & impl]] fns]
+ [fn-kw `(fn ~(hinted-args fn-kw args) ~@impl)]
+ ))]
+ `(do
+ (defn ~worker-name [~@hof-args]
+ (let [~@let-bindings]
+ ~fns
+ ))
+ ~definer
+ )))
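A hypothetical sketch of the defbolt macro defined above; the Tuple.getString and OutputCollector emit/ack calls are assumptions about the Java classes under src/jvm, which this truncated diff does not show:

;; sketch: a bolt that appends "!" to the incoming word, anchoring the emit on the input tuple
(defbolt exclaim ["word"] [tuple collector]
  (.emit collector tuple [(str (.getString tuple 0) "!")])
  (.ack collector tuple))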
422 src/clj/backtype/storm/cluster.clj
@@ -0,0 +1,422 @@
+(ns backtype.storm.cluster
+ (:import [org.apache.zookeeper.data Stat])
+ (:import [backtype.storm.utils Utils])
+ (:use [backtype.storm util log config])
+ (:use [clojure.contrib.core :only [dissoc-in]])
+ (:require [backtype.storm [zookeeper :as zk]])
+ )
+
+(defprotocol ClusterState
+ (set-ephemeral-node [this path data])
+ (delete-node [this path])
+ (set-data [this path data]) ;; if node does not exist, create persistent with this data
+ (get-data [this path watch?])
+ (get-children [this path watch?])
+ (mkdirs [this path])
+ (close [this])
+ (register [this callback])
+ (unregister [this id])
+ )
+
+(defn mk-distributed-cluster-state [conf]
+ (let [zk (zk/mk-client (mk-zk-connect-string (assoc conf STORM-ZOOKEEPER-ROOT "/")))]
+ (zk/mkdirs zk (conf STORM-ZOOKEEPER-ROOT))
+ (.close zk)
+ )
+ (let [callbacks (atom {})
+ active (atom true)
+ mk-zk #(zk/mk-client (mk-zk-connect-string conf)
+ (conf STORM-ZOOKEEPER-SESSION-TIMEOUT)
+ %)
+ zk (atom nil)
+ watcher (fn this [state type path]
+ (when @active
+ (when-not (= :connected state)
+ (log-message "Zookeeper disconnected. Attempting to reconnect")
+ (reset! zk (mk-zk this))
+ )
+ (when-not (= :none type)
+ (doseq [callback (vals @callbacks)]
+ (callback type path))))
+ )]
+ (reset! zk (mk-zk watcher))
+ (reify
+ ClusterState
+ (register [this callback]
+ (let [id (uuid)]
+ (swap! callbacks assoc id callback)
+ id
+ ))
+ (unregister [this id]
+ (swap! callbacks dissoc id))
+ (set-ephemeral-node [this path data]
+ (zk/mkdirs @zk (parent-path path))
+ (if (zk/exists @zk path false)
+ (zk/set-data @zk path data) ; should verify that it's ephemeral
+ (zk/create-node @zk path data :ephemeral)
+ ))
+
+ (set-data [this path data]
+ ;; note: this does not turn off any existing watches
+ (if (zk/exists @zk path false)
+ (zk/set-data @zk path data)
+ (do
+ (zk/mkdirs @zk (parent-path path))
+ (zk/create-node @zk path data :persistent)
+ )))
+
+ (delete-node [this path]
+ (zk/delete-recursive @zk path)
+ )
+
+ (get-data [this path watch?]
+ (zk/get-data @zk path watch?)
+ )
+
+ (get-children [this path watch?]
+ (zk/get-children @zk path watch?))
+
+ (mkdirs [this path]
+ (zk/mkdirs @zk path))
+
+ (close [this]
+ (reset! active false)
+ (.close @zk))
+ )))
+
+(defprotocol StormClusterState
+ (assignments [this callback])
+ (assignment-info [this storm-id callback])
+ (active-storms [this])
+ (storm-base [this storm-id callback])
+
+ (task-storms [this])
+ (task-ids [this storm-id])
+ (task-info [this storm-id task-id])
+ (task-heartbeat [this storm-id task-id]) ;; returns nil if doesn't exist
+ (supervisors [this callback])
+ (supervisor-info [this supervisor-id]) ;; returns nil if doesn't exist
+
+ (setup-heartbeats! [this storm-id])
+ (teardown-heartbeats! [this storm-id])
+ (teardown-task-errors! [this storm-id])
+ (heartbeat-storms [this])
+ (task-error-storms [this])
+ (heartbeat-tasks [this storm-id])
+
+ (set-task! [this storm-id task-id info])
+ (task-heartbeat! [this storm-id task-id info])
+ (remove-task-heartbeat! [this storm-id task-id])
+ (supervisor-heartbeat! [this supervisor-id info])
+ (activate-storm! [this storm-id storm-base])
+ (deactivate-storm! [this storm-id])
+ (set-assignment! [this storm-id info])
+ (remove-storm! [this storm-id])
+ (report-task-error [this storm-id task-id error])
+ (task-errors [this storm-id task-id])
+
+ (disconnect [this])
+ )
+
+
+(def ASSIGNMENTS-ROOT "assignments")
+(def TASKS-ROOT "tasks")
+(def CODE-ROOT "code")
+(def STORMS-ROOT "storms")
+(def SUPERVISORS-ROOT "supervisors")
+(def TASKBEATS-ROOT "taskbeats")
+(def TASKERRORS-ROOT "taskerrors")
+
+(def ASSIGNMENTS-SUBTREE (str "/" ASSIGNMENTS-ROOT))
+(def TASKS-SUBTREE (str "/" TASKS-ROOT))
+(def STORMS-SUBTREE (str "/" STORMS-ROOT))
+(def SUPERVISORS-SUBTREE (str "/" SUPERVISORS-ROOT))
+(def TASKBEATS-SUBTREE (str "/" TASKBEATS-ROOT))
+(def TASKERRORS-SUBTREE (str "/" TASKERRORS-ROOT))
+
+(defn supervisor-path [id]
+ (str SUPERVISORS-SUBTREE "/" id))
+
+(defn assignment-path [id]
+ (str ASSIGNMENTS-SUBTREE "/" id))
+
+(defn storm-path [id]
+ (str STORMS-SUBTREE "/" id))
+
+(defn storm-task-root [storm-id]
+ (str TASKS-SUBTREE "/" storm-id))
+
+(defn task-path [storm-id task-id]
+ (str (storm-task-root storm-id) "/" task-id))
+
+(defn taskbeat-storm-root [storm-id]
+ (str TASKBEATS-SUBTREE "/" storm-id))
+
+(defn taskbeat-path [storm-id task-id]
+ (str (taskbeat-storm-root storm-id) "/" task-id))
+
+(defn taskerror-storm-root [storm-id]
+ (str TASKERRORS-SUBTREE "/" storm-id))
+
+(defn taskerror-path [storm-id task-id]
+ (str (taskerror-storm-root storm-id) "/" task-id))
+
+
+(defn- issue-callback! [cb-atom]
+ (let [cb @cb-atom]
+ (reset! cb-atom nil)
+ (when cb
+ (cb))
+ ))
+
+(defn- issue-map-callback! [cb-atom id]
+ (let [cb (@cb-atom id)]
+ (swap! cb-atom dissoc id)
+ (when cb
+ (cb id))
+ ))
+
+(defn- maybe-deserialize [ser]
+ (when ser
+ (Utils/deserialize ser)))
+
+(defstruct TaskError :error :time-secs)
+
+(defn mk-storm-cluster-state [cluster-state-spec]
+ (let [[solo? cluster-state] (if (satisfies? ClusterState cluster-state-spec)
+ [false cluster-state-spec]
+ [true (mk-distributed-cluster-state cluster-state-spec)])
+ assignment-info-callback (atom {})
+ supervisors-callback (atom nil)
+ assignments-callback (atom nil)
+ storm-base-callback (atom {})
+ state-id (register
+ cluster-state
+ (fn [type path]
+ (let [[subtree & args] (tokenize-path path)]
+ (condp = subtree
+ ASSIGNMENTS-ROOT (if (empty? args)
+ (issue-callback! assignments-callback)
+ (issue-map-callback! assignment-info-callback (first args)))
+ SUPERVISORS-ROOT (issue-callback! supervisors-callback)
+ STORMS-ROOT (issue-map-callback! storm-base-callback (first args))
+ ;; this should never happen
+ (halt-process! 30 "Unknown callback for subtree " subtree args)
+ )
+ )))]
+ (doseq [p [ASSIGNMENTS-SUBTREE TASKS-SUBTREE STORMS-SUBTREE SUPERVISORS-SUBTREE TASKBEATS-SUBTREE TASKERRORS-SUBTREE]]
+ (mkdirs cluster-state p))
+ (reify
+ StormClusterState
+
+ (assignments [this callback]
+ (when callback
+ (reset! assignments-callback callback))
+ (get-children cluster-state ASSIGNMENTS-SUBTREE (not-nil? callback)))
+
+ (assignment-info [this storm-id callback]
+ (when callback
+ (swap! assignment-info-callback assoc storm-id callback))
+ (maybe-deserialize (get-data cluster-state (assignment-path storm-id) (not-nil? callback)))
+ )
+
+ (active-storms [this]
+ (get-children cluster-state STORMS-SUBTREE false)
+ )
+
+ (heartbeat-storms [this]
+ (get-children cluster-state TASKBEATS-SUBTREE false)
+ )
+
+ (task-error-storms [this]
+ (get-children cluster-state TASKERRORS-SUBTREE false)
+ )
+
+ (heartbeat-tasks [this storm-id]
+ (get-children cluster-state (taskbeat-storm-root storm-id) false)
+ )
+
+ (task-storms [this]
+ (get-children cluster-state TASKS-SUBTREE false)
+ )
+
+ (task-ids [this storm-id]
+ (map parse-int
+ (get-children cluster-state (storm-task-root storm-id) false)
+ ))
+
+ (task-info [this storm-id task-id]
+ (maybe-deserialize (get-data cluster-state (task-path storm-id task-id) false))
+ )
+
+ ;; TODO: add a callback here so that nimbus can respond immediately when it goes down?
+ (task-heartbeat [this storm-id task-id]
+ (maybe-deserialize (get-data cluster-state (taskbeat-path storm-id task-id) false))
+ )
+
+ (supervisors [this callback]
+ (when callback
+ (reset! supervisors-callback callback))
+ (get-children cluster-state SUPERVISORS-SUBTREE (not-nil? callback))
+ )
+
+ (supervisor-info [this supervisor-id]
+ (maybe-deserialize (get-data cluster-state (supervisor-path supervisor-id) false))
+ )
+
+ (set-task! [this storm-id task-id info]
+ (set-data cluster-state (task-path storm-id task-id) (Utils/serialize info))
+ )
+
+ (task-heartbeat! [this storm-id task-id info]
+ (set-ephemeral-node cluster-state (taskbeat-path storm-id task-id) (Utils/serialize info))
+ )
+
+ (remove-task-heartbeat! [this storm-id task-id]
+ (delete-node cluster-state (taskbeat-path storm-id task-id))
+ )
+
+ (setup-heartbeats! [this storm-id]
+ (mkdirs cluster-state (taskbeat-storm-root storm-id)))
+
+ (teardown-heartbeats! [this storm-id]
+ (delete-node cluster-state (taskbeat-storm-root storm-id)))
+
+ (teardown-task-errors! [this storm-id]
+ (delete-node cluster-state (taskerror-storm-root storm-id)))
+
+ (supervisor-heartbeat! [this supervisor-id info]
+ (set-ephemeral-node cluster-state (supervisor-path supervisor-id) (Utils/serialize info))
+ )
+
+ (activate-storm! [this storm-id storm-base]
+ (set-data cluster-state (storm-path storm-id) (Utils/serialize storm-base))
+ )
+
+ (storm-base [this storm-id callback]
+ (when callback
+ (swap! storm-base-callback assoc storm-id callback))
+ (maybe-deserialize (get-data cluster-state (storm-path storm-id) (not-nil? callback)))
+ )
+
+ (deactivate-storm! [this storm-id]
+ (delete-node cluster-state (storm-path storm-id))
+ )
+
+ (set-assignment! [this storm-id info]
+ (set-data cluster-state (assignment-path storm-id) (Utils/serialize info))
+ )
+
+ (remove-storm! [this storm-id]
+ ;; rmr the task related info. must remove assignment last
+ (delete-node cluster-state (storm-task-root storm-id))
+ (delete-node cluster-state (assignment-path storm-id))
+ )
+
+ (report-task-error [this storm-id task-id error]
+ (let [path (taskerror-path storm-id task-id)
+ _ (mkdirs cluster-state path)
+ children (get-children cluster-state path false)
+ times (sort (map #(Integer/parseInt %) children))
+ ]
+ (if (>= (count times) 10)
+ (delete-node cluster-state (str path "/" (first times)))
+ )
+ (set-data cluster-state
+ (str path "/" (current-time-secs))
+ (.getBytes ^String (stringify-error error)))
+ ))
+
+ (task-errors [this storm-id task-id]
+ (let [path (taskerror-path storm-id task-id)
+ _ (mkdirs cluster-state path)
+ children (get-children cluster-state path false)
+ errors (dofor [c children]
+ (let [^bytes v (get-data cluster-state (str path "/" c) false)]
+ (when v
+ (struct TaskError (String. v) (Integer/parseInt c))
+ )))
+ ]
+ (->> (filter not-nil? errors)
+ (sort-by :time-secs)
+ )
+ ))
+
+ (disconnect [this]
+ (unregister cluster-state state-id)
+ (when solo?
+ (close cluster-state)))
+ )))
+
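+;; For illustration only, a minimal usage sketch of the StormClusterState protocol implemented
+;; above (assumes a conf map as produced by read-storm-config; the storm id here is hypothetical):
+;;
+;; (let [state (mk-storm-cluster-state conf)]
+;;   ;; subscribe to assignment changes; the callback fires on any change under the assignments subtree
+;;   (.assignments state (fn [& _] (println "assignments changed")))
+;;   ;; read one assignment without registering a callback
+;;   (.assignment-info state "storm-id-1" nil)
+;;   (.disconnect state))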
+;; daemons have a single thread that will respond to events
+;; start with initialize event
+;; callbacks add events to the thread's queue
+
+;; keeps an in-memory cache of the state, but only for what the client subscribes to. Any subscription is automatically kept in sync, and the client is notified whenever there is a change.
+;; the master gives orders through state, and clients record their status in state (ephemerally)
+
+;; master tells nodes what workers to launch
+
+;; the master writes this. supervisors and workers subscribe to it to understand the complete topology. each storm is a map from nodes to workers to tasks to ports; whenever the topology changes, everyone is notified
+;; the master includes the timestamp of each assignment so that each worker can be given an appropriate amount of time to start up
+;; /assignments/{storm id}
+
+;; static per-task info: which tasks they talk to, etc. (immutable until shutdown)
+;; everyone reads this in full to understand structure
+;; /tasks/{storm id}/{task id} ; just contains bolt id
+
+
+;; supervisors send heartbeats here, master doesn't subscribe but checks asynchronously
+;; /supervisors/status/{ephemeral node ids} ;; node metadata such as port ranges are kept here
+
+;; tasks send heartbeats here, master doesn't subscribe, just checks asynchronously
+;; /taskbeats/{storm id}/{ephemeral task id}
+
+;; contains data about whether the storm is started or not; tasks and workers subscribe to a specific storm here to know when to shut down
+;; master manipulates
+;; /storms/{storm id}
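+;; For illustration: the path helper functions used above map directly onto this layout,
+;; e.g. (assignment-path storm-id) addresses the /assignments/{storm id} node and
+;; (taskbeat-path storm-id task-id) addresses the /taskbeats/{storm id}/{task id} node.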
+
+
+
+;; Zookeeper flows:
+
+;; Master:
+;; job submit:
+;; 1. read which nodes are available
+;; 2. set up the worker/{storm}/{task} stuff (static)
+;; 3. set assignments
+;; 4. start storm - necessary in case master goes down, when goes back up can remember to take down the storm (2 states: on or off)
+
+;; Monitoring (or by checking when nodes go down or heartbeats aren't received):
+;; 1. read assignment
+;; 2. see which tasks/nodes are up
+;; 3. make new assignment to fix any problems
+;; 4. if a storm exists but is not taken down fully, ensure that storm takedown is launched (step by step remove tasks and finally remove assignments)
+
+
+;; the master's only possible watches are on ephemeral nodes and tasks, and maybe not even those
+
+;; Supervisor:
+;; 1. monitor /storms/* and assignments
+;; 2. local state about which workers are local
+;; 3. when storm is on, check that workers are running locally & start/kill if different than assignments
+;; 4. when storm is off, monitor tasks for workers - when they all die or stop heartbeating, kill the process and clean up
+
+;; Worker:
+;; 1. On startup, start the tasks if the storm is on
+
+;; Task:
+;; 1. monitor assignments, reroute when assignments change
+;; 2. monitor storm (when storm turns off, error if assignments change) - take down tasks as master turns them off
+
+
+
+;; locally on the supervisor: workers write pids locally on startup, and the supervisor deletes them on shutdown (associates pid with worker name)
+;; supervisor periodically checks to make sure processes are alive
+;; {rootdir}/workers/{storm id}/{worker id} ;; contains pid inside
+
+;; all tasks in a worker share the same cluster state
+;; workers, supervisors, and tasks subscribes to storm to know when it's started or stopped
+;; on stopped, master removes records in order (tasks need to subscribe to themselves to see if they disappear)
+;; when a master removes a worker, the supervisor should kill it (and escalate to kill -9)
+;; on shutdown, tasks subscribe to tasks that send data to them to wait for them to die. when node disappears, they can die
9 src/clj/backtype/storm/command/config_value.clj
@@ -0,0 +1,9 @@
+(ns backtype.storm.command.config-value
+ (:use [backtype.storm config log])
+ (:gen-class))
+
+
+(defn -main [^String name]
+ (let [conf (read-storm-config)]
+ (println "VALUE:" (conf name))
+ ))
13 src/clj/backtype/storm/command/kill_topology.clj
@@ -0,0 +1,13 @@
+(ns backtype.storm.command.kill-topology
+ (:use [backtype.storm thrift config log])
+ (:gen-class))
+
+
+(defn -main [^String name]
+ (let [conf (read-storm-config)
+ host (conf NIMBUS-HOST)
+ port (conf NIMBUS-THRIFT-PORT)]
+ (with-nimbus-connection [nimbus host port]
+ (.killTopology nimbus name)
+ (log-message "Killed storm: " name)
+ )))
15 src/clj/backtype/storm/command/shell_submission.clj
@@ -0,0 +1,15 @@
+(ns backtype.storm.command.shell-submission
+ (:import [backtype.storm StormSubmitter])
+ (:use [backtype.storm thrift util config log])
+ (:require [clojure.string :as str])
+ (:gen-class))
+
+
+(defn -main [^String tmpjarpath & args]
+ (let [conf (read-storm-config)
+ host (conf NIMBUS-HOST)
+ port (conf NIMBUS-THRIFT-PORT)
+ jarpath (StormSubmitter/submitJar conf tmpjarpath)
+ args (concat args [host port jarpath])]
+ (exec-command! (str/join " " args))
+ ))
139 src/clj/backtype/storm/config.clj
@@ -0,0 +1,139 @@
+(ns backtype.storm.config
+ (:import [org.jvyaml YAML])
+ (:import [java.io FileReader File])
+ (:import [backtype.storm Config])
+ (:import [backtype.storm.utils Utils LocalState])
+ (:import [org.apache.commons.io FileUtils])
+ (:require [clojure.contrib [str-utils2 :as str]])
+ (:use [backtype.storm util])
+ )
+
+(def RESOURCES-SUBDIR "resources")
+
+;; define Clojure constants for every configuration parameter
+(doseq [f (seq (.getFields Config))]
+ (let [name (.getName f)
+ new-name (.replace (.toUpperCase name) "_" "-")]
+ (eval
+ `(def ~(symbol new-name) (. Config ~(symbol name))))
+ ))
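+;; e.g. for the Config field NIMBUS_HOST the form above generates roughly:
+;; (def NIMBUS-HOST (. Config NIMBUS_HOST))
+;; so each Java config constant is also available as an idiomatic Clojure var (and as a conf map key).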
+
+(defn cluster-mode [conf & args]
+ (keyword (conf STORM-CLUSTER-MODE)))
+
+(defn local-mode? [conf]
+ (let [mode (conf STORM-CLUSTER-MODE)]
+ (condp = mode
+ "local" true
+ "distributed" false
+ (throw (IllegalArgumentException.
+ (str "Illegal cluster mode in conf: " mode)))
+ )))
+
+(defn sampling-rate [conf]
+ (->> (conf TOPOLOGY-STATS-SAMPLE-RATE)
+ (/ 1)
+ int))
+
+(defn mk-stats-sampler [conf]
+ (even-sampler (sampling-rate conf)))
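+;; Illustrative check (hypothetical conf value): with topology.stats.sample.rate set to 0.05,
+;; (sampling-rate {TOPOLOGY-STATS-SAMPLE-RATE 0.05}) ;; => 20
+;; i.e. roughly one out of every twenty events is sampled for stats.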
+
+; storm.zookeeper.servers:
+; - "server1"
+; - "server2"
+; - "server3"
+; nimbus.host: "master"
+;
+; ########### These all have default values as shown
+;
+; ### storm.* configs are general configurations
+; # the local dir is where jars are kept
+; storm.local.dir: "/mnt/storm"
+; storm.zookeeper.port: 2181
+; storm.zookeeper.root: "/storm"
+
+(defn mk-zk-connect-string [conf]
+ (let [servers (conf STORM-ZOOKEEPER-SERVERS)
+ port (conf STORM-ZOOKEEPER-PORT)
+ root (conf STORM-ZOOKEEPER-ROOT)]
+ (str
+ (str/join ","
+ (for [s servers]
+ (str s ":" port)))
+ root)
+ ))
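+;; For example (hypothetical conf values):
+;; (mk-zk-connect-string {STORM-ZOOKEEPER-SERVERS ["zk1" "zk2"]
+;;                        STORM-ZOOKEEPER-PORT 2181
+;;                        STORM-ZOOKEEPER-ROOT "/storm"})
+;; ;; => "zk1:2181,zk2:2181/storm"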
+
+(defn read-default-config []
+ (clojurify-structure (Utils/readDefaultConfig)))
+
+(defn read-storm-config []
+ (clojurify-structure (Utils/readStormConfig)))
+
+(defn read-yaml-config [name]
+ (clojurify-structure (Utils/findAndReadConfigFile name true)))
+
+(defn master-stormdist-root [conf storm-id]
+ (str (conf STORM-LOCAL-DIR) "/stormdist/" storm-id))
+
+(defn master-stormjar-path [stormroot]
+ (str stormroot "/stormjar.jar"))
+
+(defn master-stormcode-path [stormroot]
+ (str stormroot "/stormcode.ser"))
+
+(defn master-stormconf-path [stormroot]
+ (str stormroot "/stormconf.ser"))
+
+(defn master-inbox [conf]
+ (let [ret (str (conf STORM-LOCAL-DIR) "/inbox")]
+ (FileUtils/forceMkdir (File. ret))
+ ret ))
+
+(defn supervisor-stormdist-root
+ ([conf] (str (conf STORM-LOCAL-DIR) "/stormdist"))
+ ([conf storm-id]
+ (str (supervisor-stormdist-root conf) "/" storm-id)))
+
+(defn supervisor-stormjar-path [stormroot]
+ (str stormroot "/stormjar.jar"))
+
+(defn supervisor-stormcode-path [stormroot]
+ (str stormroot "/stormcode.ser"))
+
+(defn supervisor-stormconf-path [stormroot]
+ (str stormroot "/stormconf.ser"))
+
+(defn supervisor-tmp-dir [conf]
+ (let [ret (str (conf STORM-LOCAL-DIR) "/tmp")]
+ (FileUtils/forceMkdir (File. ret))
+ ret ))
+
+(defn supervisor-storm-resources-path [stormroot]
+ (str stormroot "/" RESOURCES-SUBDIR))
+
+(defn ^LocalState supervisor-state [conf]
+ (LocalState. (str (conf STORM-LOCAL-DIR) "/localstate")))
+
+
+(defn worker-root
+ ([conf]
+ (str (conf STORM-LOCAL-DIR) "/workers"))
+ ([conf id]
+ (str (worker-root conf) "/" id)))
+
+(defn worker-pids-root
+ [conf id]
+ (str (worker-root conf id) "/pids"))
+
+(defn worker-pid-path [conf id pid]
+ (str (worker-pids-root conf id) "/" pid))
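+;; For example (hypothetical values):
+;; (worker-pid-path {STORM-LOCAL-DIR "/mnt/storm"} "worker-uuid" 1234)
+;; ;; => "/mnt/storm/workers/worker-uuid/pids/1234"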
+
+(defn worker-heartbeats-root
+ [conf id]
+ (str (worker-root conf id) "/heartbeats"))
+
+;; workers heartbeat here with pid and timestamp
+;; if supervisor stops receiving heartbeat, it kills and restarts the process
+;; in local mode, keep a global map of ids to threads for simulating process management
+(defn ^LocalState worker-state [conf id]
+ (LocalState. (worker-heartbeats-root conf id)))
96 src/clj/backtype/storm/daemon/common.clj
@@ -0,0 +1,96 @@
+(ns backtype.storm.daemon.common
+ (:use [clojure.contrib.seq-utils :only [find-first]])
+ (:use [backtype.storm log config util])
+ )
+
+(def ACKER-COMPONENT-ID -1)
+(def ACKER-INIT-STREAM-ID -1)
+(def ACKER-ACK-STREAM-ID -2)
+(def ACKER-FAIL-STREAM-ID -3)
+
+
+(defn system-component? [id]
+ (< id 0))
+
+;; the task id is the virtual port
+;; node->host is here so that tasks know who to talk to just from assignment
+;; this avoids the situation where a node goes down and tasks don't know what to do information-wise
+(defrecord Assignment [master-code-dir node->host task->node+port task->start-time-secs])
+
+(defrecord StormBase [storm-name launch-time-secs])
+
+(defrecord SupervisorInfo [time-secs hostname worker-ports uptime-secs])
+
+(defrecord TaskInfo [component-id])
+
+(defprotocol DaemonCommon
+ (waiting? [this]))
+
+(def LS-WORKER-HEARTBEAT "worker-heartbeat")
+
+;; LocalState constants
+(def LS-ID "supervisor-id")
+(def LS-LOCAL-ASSIGNMENTS "local-assignments")
+(def LS-APPROVED-WORKERS "approved-workers")
+
+
+
+(defrecord WorkerHeartbeat [time-secs storm-id task-ids port])
+
+;; should include stats in here
+;; TODO: want to know how many it has processed from every source
+;; component/stream pair
+;; TODO: want to know how many it has emitted to every stream
+(defrecord TaskStats [^long processed
+ ^long acked
+ ^long emitted
+ ^long transferred
+ ^long failed])
+
+(defrecord TaskHeartbeat [time-secs uptime-secs stats])
+
+(defn new-task-stats []
+ (TaskStats. 0 0 0 0 0))
+
+;; technically this is only active task ids
+(defn storm-task-ids [storm-cluster-state storm-id]
+ (keys (:task->node+port (.assignment-info storm-cluster-state storm-id nil))))
+
+(defn storm-task-info
+ "Returns map from task -> component id"
+ [storm-cluster-state storm-id]
+ (let [task-ids (.task-ids storm-cluster-state storm-id)]
+ (into {}
+ (dofor [id task-ids]
+ [id (:component-id (.task-info storm-cluster-state storm-id id))]
+ ))))
+
+(defn get-storm-id [storm-cluster-state storm-name]
+ (let [active-storms (.active-storms storm-cluster-state)]
+ (find-first
+ #(= storm-name (:storm-name (.storm-base storm-cluster-state % nil)))
+ active-storms)
+ ))
+
+(defn topology-bases [storm-cluster-state]
+ (let [active-topologies (.active-storms storm-cluster-state)]
+ (into {}
+ (dofor [id active-topologies]
+ [id (.storm-base storm-cluster-state id nil)]
+ ))
+ ))
+
+(defn validate-distributed-mode! [conf]
+ (if (local-mode? conf)
+ (throw
+ (IllegalArgumentException. "Cannot start server in local mode!"))))
+
+(defmacro defserverfn [name & body]
+ `(let [exec-fn# (fn ~@body)]
+ (defn ~name [& args#]
+ (try
+ (apply exec-fn# args#)
+ (catch Throwable t#
+ (log-error t# "Error on initialization of server " ~(str name))
+ (halt-process! 13 "Error on initialization")
+ )))))
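+;; For illustration: (defserverfn my-daemon [conf] ...) -- with my-daemon a hypothetical name --
+;; defines my-daemon so that any Throwable escaping the body is logged and the process is halted
+;; with exit code 13; the nimbus service-handler below is defined this way.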
81 src/clj/backtype/storm/daemon/drpc.clj
@@ -0,0 +1,81 @@
+(ns backtype.storm.daemon.drpc
+ (:import [org.apache.thrift.server THsHaServer THsHaServer$Options])
+ (:import [org.apache.thrift.protocol TBinaryProtocol TBinaryProtocol$Factory])
+ (:import [org.apache.thrift TException])
+ (:import [org.apache.thrift.transport TNonblockingServerTransport TNonblockingServerSocket])
+ (:import [backtype.storm.generated DistributedRPC DistributedRPC$Iface DistributedRPC$Processor])
+ (:import [java.util.concurrent Semaphore])
+ (:import [backtype.storm.drpc SpoutAdder])
+ (:import [java.net InetAddress])
+ (:use [backtype.storm bootstrap])
+ (:gen-class))
+
+(bootstrap)
+
+
+(def DEFAULT-PORT 3772) ; "drpc"
+(def REQUEST-TIMEOUT-SECS 600)
+(def TIMEOUT-CHECK-SECS 60)
+
+;; TODO: change this to use TimeCacheMap
+(defn service-handler [^SpoutAdder spout-adder port]
+ (let [ctr (atom 0)
+ id->sem (atom {})
+ id->result (atom {})
+ id->start (atom {})
+ cleanup (fn [id] (swap! id->sem dissoc id)
+ (swap! id->result dissoc id)
+ (swap! id->start dissoc id))
+ my-ip (.getHostAddress (InetAddress/getLocalHost))
+ ]
+ (async-loop
+ (fn []
+ (doseq [[id start] @id->start]
+ (when (> (time-delta start) REQUEST-TIMEOUT-SECS)
+ (if-let [sem (@id->sem id)]
+ (.release sem))
+ (cleanup id)
+ ))
+ TIMEOUT-CHECK-SECS
+ ))
+ (reify DistributedRPC$Iface
+ (^String execute [this ^String function ^String args]
+ (let [id (str (swap! ctr (fn [v] (mod (inc v) 1000000000))))
+ ^Semaphore sem (Semaphore. 0)
+ return-info (to-json {"ip" my-ip "port" port "id" id})
+ ]
+ (swap! id->start assoc id (current-time-secs))
+ (swap! id->sem assoc id sem)
+ (.add spout-adder function args return-info)
+ (.acquire sem)
+ (let [result (@id->result id)]
+ (cleanup id)
+ result
+ )))
+ (^void result [this ^String id ^String result]
+ (let [^Semaphore sem (@id->sem id)]
+ (when sem
+ (swap! id->result assoc id result)
+ (.release sem)
+ )))
+ )))
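+;; Illustrative flow: execute registers a fresh Semaphore under a generated request id, hands the
+;; request to the SpoutAdder together with return-info (this server's ip, port, and id), and then
+;; blocks on the semaphore; when the topology later calls result with that same id, the stored
+;; result string is released back to the blocked execute call and the request state is cleaned up.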
+
+(defn launch-server!
+ ([spout-adder]
+ (launch-server! DEFAULT-PORT spout-adder))
+ ([port spout-adder]
+ (let [options (THsHaServer$Options.)
+ _ (set! (. options maxWorkerThreads) 64)
+ service-handler (service-handler spout-adder port)
+ server (THsHaServer.
+ (DistributedRPC$Processor. service-handler)
+ (TNonblockingServerSocket. port)
+ (TBinaryProtocol$Factory.) options)]
+ (.addShutdownHook (Runtime/getRuntime) (Thread. (fn [] (.stop server))))
+ (log-message "Starting Distributed RPC server...")
+ (.serve server))))
+
+(defn -main [spout-adder-class & args]
+ (let [form (concat ['new (symbol spout-adder-class)] args)]
+ (launch-server! (eval form))
+ ))
638 src/clj/backtype/storm/daemon/nimbus.clj
@@ -0,0 +1,638 @@
+(ns backtype.storm.daemon.nimbus
+ (:import [org.apache.thrift.server THsHaServer THsHaServer$Options])
+ (:import [org.apache.thrift.protocol TBinaryProtocol TBinaryProtocol$Factory])
+ (:import [org.apache.thrift TException])
+ (:import [org.apache.thrift.transport TNonblockingServerTransport TNonblockingServerSocket])
+ (:use [backtype.storm bootstrap])
+ (:use [backtype.storm.daemon common])
+ (:gen-class))
+
+(bootstrap)
+
+(defmulti setup-jar cluster-mode)
+
+
+;; Master:
+;; job submit:
+;; 1. read which nodes are available
+;; 2. set up the worker/{storm}/{task} stuff (static)
+;; 3. set assignments
+;; 4. start storm - necessary in case master goes down, when goes back up can remember to take down the storm (2 states: on or off)
+
+;; Monitoring (or by checking when nodes go down or heartbeats aren't received):
+;; 1. read assignment
+;; 2. see which tasks/nodes are up
+;; 3. make new assignment to fix any problems
+;; 4. if a storm exists but is not taken down fully, ensure that storm takedown is launched (step by step remove tasks and finally remove assignments)
+
+(defn- assigned-slots
+ "Returns a map from node-id to a set of ports"
+ [storm-cluster-state]
+ (let [assignments (.assignments storm-cluster-state nil)
+ ]
+ (defaulted
+ (apply merge-with set/union
+ (for [a assignments
+ [_ [node port]] (-> (.assignment-info storm-cluster-state a nil) :task->node+port)]
+ {node #{port}}
+ ))
+ {})
+ ))
+
+(defn- all-supervisor-info
+ ([storm-cluster-state] (all-supervisor-info storm-cluster-state nil))
+ ([storm-cluster-state callback]
+ (let [supervisor-ids (.supervisors storm-cluster-state callback)]
+ (into {}
+ (mapcat
+ (fn [id]
+ (if-let [info (.supervisor-info storm-cluster-state id)]
+ [[id info]]
+ ))
+ supervisor-ids))
+ )))
+
+(defn- available-slots
+ [conf storm-cluster-state callback]
+ (let [supervisor-ids (.supervisors storm-cluster-state callback)
+ supervisor-infos (all-supervisor-info storm-cluster-state callback)
+ ;; TODO: this is broken. need to maintain a map since last time
+                             ;; supervisor heartbeats, as is done for tasks
+ ;; maybe it's ok to trust ephemeral nodes here?
+ ;;[[id info]]
+ ;; (when (< (time-delta (:time-secs info))
+ ;; (conf NIMBUS-SUPERVISOR-TIMEOUT-SECS))
+ ;; [[id info]]
+ ;; )
+ all-slots (map-val (comp set :worker-ports) supervisor-infos)
+ existing-slots (assigned-slots storm-cluster-state)
+ ]
+ [(map-val :hostname supervisor-infos)
+ (mapcat
+ (fn [[id slots]]
+ (for [s (set/difference slots (existing-slots id))]
+ [id s]))
+ all-slots)
+ ]))
+
+(defn state-spout-parallelism [state-spout-spec]
+ (-> state-spout-spec .get_common thrift/parallelism-hint))
+
+(defn- spout-parallelism [spout-spec]
+ (if (.is_distributed spout-spec)
+ (-> spout-spec .get_common thrift/parallelism-hint)
+ 1 ))
+
+(defn bolt-parallelism [bolt-spec]
+ (let [hint (-> bolt-spec .get_common thrift/parallelism-hint)
+ fully-global? (every?
+ thrift/global-grouping?
+ (vals (.get_inputs bolt-spec)))]
+ (if fully-global?
+ 1
+ hint
+ )))
+
+(defn- optimize-topology [topology]
+ ;; TODO: create new topology by collapsing bolts into CompoundSpout
+ ;; and CompoundBolt
+ ;; need to somehow maintain stream/component ids inside tuples
+ topology)
+
+(defn mk-task-maker [max-parallelism parallelism-func id-counter]
+ (fn [[component-id spec]]
+ (let [parallelism (parallelism-func spec)
+ parallelism (if max-parallelism (min parallelism max-parallelism) parallelism)
+ num-tasks (max 1 parallelism)]
+ (for-times num-tasks
+ [(id-counter) component-id])
+ )))
+
+(defn- setup-storm-code [conf storm-id tmp-jar-location storm-conf topology]
+ (let [stormroot (master-stormdist-root conf storm-id)]
+ (FileUtils/forceMkdir (File. stormroot))
+ (FileUtils/cleanDirectory (File. stormroot))
+ (setup-jar conf tmp-jar-location stormroot)
+ (FileUtils/writeByteArrayToFile (File. (master-stormcode-path stormroot)) (Utils/serialize topology))
+ (FileUtils/writeByteArrayToFile (File. (master-stormconf-path stormroot)) (Utils/serialize storm-conf))
+ ))
+
+
+(defn- read-storm-conf [conf storm-id]
+ (let [stormroot (master-stormdist-root conf storm-id)]
+ (merge conf
+ (Utils/deserialize
+ (FileUtils/readFileToByteArray
+ (File. (master-stormconf-path stormroot))
+ )))))
+
+(defn- read-storm-topology [conf storm-id]
+ (let [stormroot (master-stormdist-root conf storm-id)]
+ (Utils/deserialize
+ (FileUtils/readFileToByteArray
+ (File. (master-stormcode-path stormroot))
+ ))))
+
+
+(defn max-message-timeout-time [conf storm-ids]
+ (apply max
+ (for [id storm-ids]
+ ((read-storm-conf conf id) TOPOLOGY-MESSAGE-TIMEOUT-SECS)
+ )))
+
+
+(defn task-dead? [conf storm-cluster-state storm-id task-id]
+ (let [info (.task-heartbeat storm-cluster-state storm-id task-id)]
+ (or (not info)
+ (> (time-delta (:time-secs info))
+ (conf NIMBUS-TASK-TIMEOUT-SECS)))
+ ))
+
+;; public so it can be mocked in tests
+(defn mk-task-component-assignments [conf storm-id]
+ (let [storm-conf (read-storm-conf conf storm-id)
+ max-parallelism (storm-conf TOPOLOGY-MAX-TASK-PARALLELISM)
+ topology (read-storm-topology conf storm-id)
+ slots-to-use (storm-conf TOPOLOGY-WORKERS)
+ counter (mk-counter)
+ tasks (concat
+ (mapcat (mk-task-maker max-parallelism bolt-parallelism counter)
+ (.get_bolts topology))
+ (mapcat (mk-task-maker max-parallelism spout-parallelism counter)
+ (.get_spouts topology))
+ (mapcat (mk-task-maker max-parallelism state-spout-parallelism counter)
+ (.get_state_spouts topology))
+ (repeatedly (storm-conf TOPOLOGY-ACKERS)
+ (fn [] [(counter) ACKER-COMPONENT-ID]))
+ )]
+ (into {}
+ tasks)
+ ))
+
+(defn- setup-storm-static [conf storm-id storm-cluster-state]
+ (doseq [[task-id component-id] (mk-task-component-assignments conf storm-id)]
+ (.set-task! storm-cluster-state storm-id task-id (TaskInfo. component-id))
+ ))
+
+
+;; Does not assume that clocks are synchronized. Task heartbeat is only used so that
+;; nimbus knows when it's received a new heartbeat. All timing is done by nimbus and
+;; tracked through task-heartbeats-cache
+(defn- alive-tasks [conf storm-id storm-cluster-state task-ids task-start-times task-heartbeats-cache]
+ (doall
+ (filter
+ (fn [task-id]
+ (let [heartbeat (.task-heartbeat storm-cluster-state storm-id task-id)
+ reported-time (:time-secs heartbeat)
+ {last-nimbus-time :nimbus-time
+ last-reported-time :task-reported-time} (get-in @task-heartbeats-cache
+ [storm-id task-id])
+ task-start-time (get task-start-times task-id)
+ nimbus-time (if (or (not last-nimbus-time)
+ (not= last-reported-time reported-time))
+ (current-time-secs)
+ last-nimbus-time
+ )
+ ]
+ (swap! task-heartbeats-cache
+ assoc-in [storm-id task-id]
+ {:nimbus-time nimbus-time
+ :task-reported-time reported-time})
+ (if (and task-start-time
+ (or
+ (< (time-delta task-start-time)
+ (conf NIMBUS-TASK-LAUNCH-SECS))
+ (not nimbus-time)
+ (< (time-delta nimbus-time)
+ (conf NIMBUS-TASK-TIMEOUT-SECS))
+ ))
+ true
+ (do
+ (log-message "Task " storm-id ":" task-id " timed out")
+ false)
+ )))
+ task-ids
+ )))
+
+(defn- keeper-slots [existing-slots num-task-ids num-workers]
+ (if (= 0 num-workers)
+ {}
+ (let [distribution (atom (integer-divided num-task-ids num-workers))
+ keepers (atom {})]
+ (doseq [[node+port task-list] existing-slots :let [task-count (count task-list)]]
+ (when (pos? (get @distribution task-count 0))
+ (swap! keepers assoc node+port task-list)
+ (swap! distribution update-in [task-count] dec)
+ ))
+ @keepers
+ )))
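+;; For illustration (assuming integer-divided returns a map of tasks-per-slot -> number of such
+;; slots, e.g. 10 tasks over 3 workers giving {3 2, 4 1}): an existing slot is kept only while its
+;; current task count still matches one of the needed bucket sizes; all other slots are freed for
+;; reassignment below.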
+
+
+(defn sort-slots [all-slots]
+ (let [split-up (vals (group-by first all-slots))]
+ (apply interleave-all split-up)
+ ))
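+;; For example (hypothetical slots): (sort-slots [["node1" 6701] ["node1" 6702] ["node2" 6701]])
+;; groups the slots by node and interleaves them, yielding something like
+;; (["node1" 6701] ["node2" 6701] ["node1" 6702]), so reassignments spread across nodes rather
+;; than filling up one node first.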
+
+;; NEW NOTES
+;; only assign to supervisors who are there and haven't timed out
+;; need to reassign workers with tasks that have timed out (will this make it brittle?)
+;; need to read in the topology and storm-conf from disk
+;; if no slots available and no slots used by this storm, just skip and do nothing
+;; otherwise, package rest of tasks into available slots (up to how much it needs)
+
+;; in the future could allocate tasks intelligently (so that "close" tasks reside on same machine)
+
+
+;; TODO: slots that have dead task should be reused as long as supervisor is active
+
+;; public so it can be mocked out
+(defn compute-new-task->node+port [conf storm-id existing-assignment storm-cluster-state available-slots callback task-heartbeats-cache]
+ (let [existing-assigned (reverse-map (:task->node+port existing-assignment))
+ storm-conf (read-storm-conf conf storm-id)
+ all-task-ids (set (.task-ids storm-cluster-state storm-id))
+ alive-ids (set (alive-tasks conf storm-id storm-cluster-state
+ all-task-ids (:task->start-time-secs existing-assignment) task-heartbeats-cache))
+ alive-assigned (filter-val (partial every? alive-ids) existing-assigned)
+ alive-node-ids (map first (keys alive-assigned))
+ total-slots-to-use (min (storm-conf TOPOLOGY-WORKERS)
+ (+ (count available-slots) (count alive-assigned)))
+ keep-assigned (keeper-slots alive-assigned (count all-task-ids) total-slots-to-use)
+ freed-slots (keys (apply dissoc alive-assigned (keys keep-assigned)))
+ reassign-slots (take (- total-slots-to-use (count keep-assigned))
+ (sort-slots (concat available-slots freed-slots)))
+ reassign-ids (sort (set/difference all-task-ids (set (apply concat (vals keep-assigned)))))
+ reassignment (into {}
+ (map vector
+ reassign-ids
+                                         ;; for some reason it goes into an infinite loop without limiting the repeat-seq
+ (repeat-seq (count reassign-ids) reassign-slots)))
+ stay-assignment (into {} (mapcat (fn [[node+port task-ids]] (for [id task-ids] [id node+port])) keep-assigned))]
+ (when-not (empty? reassignment)
+ (log-message "Reassigning " storm-id " to " total-slots-to-use " slots")
+ (log-message "Reassign ids: " (vec reassign-ids))
+ (log-message "Available slots: " (pr-str available-slots))
+ )
+ (merge stay-assignment reassignment)
+ ))
+
+
+(defn changed-ids [task->node+port new-task->node+port]
+ (let [slot-assigned (reverse-map task->node+port)
+ new-slot-assigned (reverse-map new-task->node+port)
+ brand-new-slots (map-diff slot-assigned new-slot-assigned)]
+ (apply concat (vals brand-new-slots))
+ ))
+
+;; get existing assignment (just the task->node+port map) -> default to {}
+;; filter out ones which have a task timeout
+;; figure out available slots on cluster. add to that the used valid slots to get total slots. figure out how many tasks should be in each slot (e.g., 4, 4, 4, 5)
+;; only keep existing slots that satisfy one of those counts; for the rest, reassign their tasks across the remaining slots
+;; edge case for slots with no task timeout but with supervisor timeout... just treat these as valid slots that can be reassigned to. worst comes to worst, the task will time out and won't be assigned here next time around
+(defn- mk-assignments [conf storm-id storm-cluster-state callback task-heartbeats-cache]
+ (log-debug "Determining assignment for " storm-id)
+ (let [existing-assignment (.assignment-info storm-cluster-state storm-id nil)
+ [node->host available-slots] (available-slots conf storm-cluster-state callback)
+ task->node+port (compute-new-task->node+port conf storm-id existing-assignment
+ storm-cluster-state available-slots callback task-heartbeats-cache)
+ all-node->host (merge (:node->host existing-assignment) node->host)
+ reassign-ids (changed-ids (:task->node+port existing-assignment) task->node+port)
+ now-secs (current-time-secs)
+ start-times (merge (:task->start-time-secs existing-assignment)
+ (into {}
+ (for [id reassign-ids]
+ [id now-secs]
+ )))
+
+ assignment (Assignment.
+ (master-stormdist-root conf storm-id)
+ (select-keys all-node->host (map first (vals task->node+port)))
+ task->node+port
+ start-times
+ )
+ ]
+ ;; tasks figure out what tasks to talk to by looking at topology at runtime
+ ;; only log/set when there's been a change to the assignment
+ (if (= existing-assignment assignment)
+ (log-debug "Assignment for " storm-id " hasn't changed")
+ (do
+ (log-message "Setting new assignment for storm id " storm-id ": " (pr-str assignment))
+ (.set-assignment! storm-cluster-state storm-id assignment)
+ ))
+ ))
+
+(defn- start-storm [storm-name storm-cluster-state storm-id]
+ (log-message "Activating " storm-name ": " storm-id)
+ (.activate-storm! storm-cluster-state storm-id (StormBase. storm-name (current-time-secs)))
+ )
+
+;; Master:
+;; job submit:
+;; 1. read which nodes are available
+;; 2. set up the worker/{storm}/{task} stuff (static)
+;; 3. set assignments
+;; 4. start storm - necessary in case master goes down, when goes back up can remember to take down the storm (2 states: on or off)
+
+(defn storm-active? [storm-cluster-state storm-name]
+ (not-nil? (get-storm-id storm-cluster-state storm-name)))
+
+(defn inactive-storm-ids [storm-cluster-state]
+ (let [assigned-ids (set (.assignments storm-cluster-state nil))
+ active-ids (set (.active-storms storm-cluster-state))]
+ (set/difference assigned-ids active-ids)
+ ))
+
+(defn cleanup-storm-ids [conf storm-cluster-state]
+ (let [heartbeat-ids (set (.heartbeat-storms storm-cluster-state))
+ error-ids (set (.task-error-storms storm-cluster-state))
+ assigned-ids (set (.assignments storm-cluster-state nil))
+ storm-ids (set/difference (set/union heartbeat-ids error-ids) assigned-ids)]
+ (filter
+ (fn [storm-id]
+ (every?
+ (partial task-dead? conf storm-cluster-state storm-id)
+ (.heartbeat-tasks storm-cluster-state storm-id)
+ ))
+ storm-ids
+ )))
+
+(defn validate-topology! [topology]
+ (let [bolt-ids (keys (.get_bolts topology))
+ spout-ids (keys (.get_spouts topology))
+ state-spout-ids (keys (.get_state_spouts topology))
+ common (any-intersection bolt-ids spout-ids state-spout-ids)]
+ (when-not (empty? common)
+ (throw
+ (InvalidTopologyException.
+ (str "Cannot use same component id for both spout and bolt: " (vec common))
+ )))
+ (when-not (every? #(> % 0) (concat bolt-ids spout-ids state-spout-ids))
+ (throw
+ (InvalidTopologyException.
+ "All component ids must be positive")))
+ ;; TODO: validate that every declared stream is positive
+ ))
+
+(defn file-cache-map [conf]
+ (TimeCacheMap.
+ (int (conf NIMBUS-FILE-COPY-EXPIRATION-SECS))
+ (reify TimeCacheMap$ExpiredCallback
+ (expire [this id stream]
+ (.close stream)
+ ))
+ ))
+
+(defserverfn service-handler [conf]
+ (let [submitted-count (atom 0)
+ active (atom true)
+ conf (merge (read-storm-config) conf) ;; useful when testing
+ storm-cluster-state (cluster/mk-storm-cluster-state conf)
+ [event-manager cleanup-manager :as managers] [(event/event-manager false) (event/event-manager false)]
+ inbox (master-inbox conf)
+ storm-submit-lock (Object.)
+ task-heartbeats-cache (atom {}) ; map from storm id -> task id -> {:nimbus-time :task-reported-time}
+ downloaders (file-cache-map conf)
+ uploaders (file-cache-map conf)
+ uptime (uptime-computer)
+
+ cleanup-fn (fn []
+ (let [to-kill-ids (locking storm-submit-lock (inactive-storm-ids storm-cluster-state))]
+ (when-not (empty? to-kill-ids)
+ (let [sleep-amt (max-message-timeout-time conf to-kill-ids)]
+ (log-message "Waiting for " sleep-amt " seconds to kill topologies " (pr-str to-kill-ids))
+ ;; sleep to let the storm finish processing whatever messages are still inside it
+ (sleep-secs sleep-amt)
+ (doseq [id to-kill-ids]
+                                   ;; technically a supervisor could still think there's an assignment and try to download it
+ ;; this will cause supervisor to go down and come back up... eventually it should sync
+ ;; TODO: removing code locally should be done separately (since topology that doesn't start will still have code)
+ (rmr (master-stormdist-root conf id))
+ (.remove-storm! storm-cluster-state id))
+ (log-message "Killed topologies: " to-kill-ids))))
+ (let [to-cleanup-ids (locking storm-submit-lock (cleanup-storm-ids conf storm-cluster-state))]
+ (when-not (empty? to-cleanup-ids)
+ (doseq [id to-cleanup-ids]
+ (.teardown-heartbeats! storm-cluster-state id)
+ (.teardown-task-errors! storm-cluster-state id)
+ (swap! task-heartbeats-cache dissoc id)
+ )
+ (log-message "Cleaned up topology task heartbeats: " (pr-str to-cleanup-ids))
+ )))
+ reassign-fn (fn this []
+ (when (conf NIMBUS-REASSIGN)
+ (locking storm-submit-lock
+ (let [callback (fn [& ignored] (.add event-manager this))
+ active-storm-ids (.active-storms storm-cluster-state)]
+ (doseq [storm-id active-storm-ids]
+ (let [base (.storm-base storm-cluster-state storm-id nil)]
+ (mk-assignments conf storm-id storm-cluster-state callback task-heartbeats-cache)))
+ ))))
+ threads [(async-loop
+ (fn []
+ (.add event-manager reassign-fn)
+ (.add cleanup-manager cleanup-fn)
+ (when @active (conf NIMBUS-MONITOR-FREQ-SECS))
+ ))
+ ]]
+
+ (reify Nimbus$Iface
+ (^void submitTopology
+ [this ^String storm-name ^String uploadedJarLocation ^String serializedConf ^StormTopology topology]
+ (when (storm-active? storm-cluster-state storm-name)
+ (throw (AlreadyAliveException. storm-name)))
+ (validate-topology! topology)
+ (swap! submitted-count inc)
+ (let [storm-id (str storm-name "-" @submitted-count "-" (current-time-secs))
+ storm-conf (from-json serializedConf)
+ storm-conf (assoc storm-conf STORM-ID storm-id)
+
+ total-storm-conf (merge conf storm-conf)
+ topology (if (total-storm-conf TOPOLOGY-OPTIMIZE) (optimize-topology topology) topology)]
+ (log-message "Received topology submission for " storm-name " with conf " storm-conf)
+ (setup-storm-code conf storm-id uploadedJarLocation storm-conf topology)
+          ;; protects against multiple storms being submitted at once and the cleanup thread killing a storm in between
+ ;; assignment and starting the storm
+ (locking storm-submit-lock
+ (.setup-heartbeats! storm-cluster-state storm-id)
+ (setup-storm-static conf storm-id storm-cluster-state)
+ (mk-assignments conf storm-id storm-cluster-state (fn [& ignored] (.add event-manager reassign-fn)) task-heartbeats-cache)
+ (start-storm storm-name storm-cluster-state storm-id))
+ ))
+
+ (^void killTopology [this ^String storm-name]
+ (let [storm-id (get-storm-id storm-cluster-state storm-name)]
+ (when-not storm-id
+ (throw (NotAliveException. storm-name)))
+ (.deactivate-storm! storm-cluster-state storm-id)
+ (.add cleanup-manager cleanup-fn)
+ (log-message "Deactivated " storm-name " and scheduled to be killed")
+ ))
+
+ (beginFileUpload [this]
+ (let [fileloc (str inbox "/stormjar-" (uuid) ".jar")]
+ (.put uploaders fileloc (FileOutputStream. fileloc))
+ (log-message "Uploading file from client to " fileloc)
+ fileloc
+ ))
+
+ (^void uploadChunk [this ^String location ^bytes chunk]
+ (let [^FileOutputStream os (.get uploaders location)]
+ (when-not os
+ (throw (RuntimeException.
+ "File for that location does not exist (or timed out)")))
+ (.write os chunk)
+ (.put uploaders location os)
+ ))
+
+ (^void finishFileUpload [this ^String location]
+ (let [^FileOutputStream os (.get uploaders location)]
+ (when-not os
+ (throw (RuntimeException.
+ "File for that location does not exist (or timed out)")))
+ (.close os)
+ (log-message "Finished uploading file from client: " location)
+ (.remove uploaders location)
+ ))
+
+ (^String beginFileDownload [this ^String file]
+ (let [is (BufferFileInputStream. file)
+ id (uuid)]
+ (.put downloaders id is)
+ id
+ ))
+
+ (^bytes downloadChunk [this ^String id]
+ (let [^BufferFileInputStream is (.get downloaders id)]
+ (when-not is
+ (throw (RuntimeException.
+ "Could not find input stream for that id")))
+ (let [ret (.read is)]
+ (.put downloaders id is)
+ (when (empty? ret)
+ (.remove downloaders id))
+ ret
+ )))
+
+ (^String getTopologyConf [this ^String id]
+ (to-json (read-storm-conf conf id)))
+
+ (^StormTopology getTopology [this ^String id]
+ (read-storm-topology conf id))
+
+ (^ClusterSummary getClusterInfo [this]
+ (let [assigned (assigned-slots storm-cluster-state)
+ supervisor-infos (all-supervisor-info storm-cluster-state)
+ supervisor-summaries (dofor [[id info] supervisor-infos]
+ (let [ports (set (:worker-ports info))
+ ]
+ (SupervisorSummary. (:hostname info)
+ (:uptime-secs info)
+ (count ports)
+ (count (assigned id)))
+ ))
+ nimbus-uptime (uptime)
+ bases (topology-bases storm-cluster-state)
+ topology-summaries (dofor [[id base] bases]
+ (let [assignment (.assignment-info storm-cluster-state id nil)]
+ (TopologySummary. id
+ (:storm-name base)
+ (-> (:task->node+port assignment)
+ keys
+ count)
+ (-> (:task->node+port assignment)
+ vals
+ set
+ count)
+ (time-delta (:launch-time-secs base))
+ )
+ ))
+ ]
+ (ClusterSummary. supervisor-summaries
+ nimbus-uptime
+ topology-summaries)
+ ))
+
+ (^TopologyInfo getTopologyInfo [this ^String storm-id]
+ (let [task-info (storm-task-info storm-cluster-state storm-id)
+ base (.storm-base storm-cluster-state storm-id nil)
+ assignment (.assignment-info storm-cluster-state storm-id nil)
+ task-summaries (dofor [[task component] task-info]
+ (let [[node port] (get-in assignment [:task->node+port task])
+ host (-> assignment :node->host (get node))
+ heartbeat (.task-heartbeat storm-cluster-state storm-id task)
+ errors (.task-errors storm-cluster-state storm-id task)
+ errors (dofor [e errors] (ErrorInfo. (:error e) (:time-secs e)))
+ stats (:stats heartbeat)
+ stats (if stats
+ (stats/thriftify-task-stats stats))]
+ (doto
+ (TaskSummary. task
+ component
+ host
+ port
+ (nil-to-zero
+ (:uptime-secs heartbeat))
+ errors
+ )
+ (.set_stats stats))
+ ))
+ ]
+ (TopologyInfo. storm-id
+ (:storm-name base)
+ (time-delta (:launch-time-secs base))
+ task-summaries
+ )
+ ))
+