Browse files

Version 1.0.0

  • Loading branch information...
0 parents commit fb261e8950659837b9839c6f12f5407cbcd0441e @cpettitt cpettitt committed Sep 18, 2012
Showing with 15,062 additions and 0 deletions.
  1. +6 −0 .gitignore
  2. +1 −0 .reviewboardrc
  3. +142 −0 CHANGELOG
  4. +255 −0 LICENSE
  5. +74 −0 README.md
  6. +199 −0 build.xml
  7. +60 −0 example/com/linkedin/parseq/example/common/AbstractExample.java
  8. +44 −0 example/com/linkedin/parseq/example/common/ErrorMockRequest.java
  9. +91 −0 example/com/linkedin/parseq/example/common/ExampleUtil.java
  10. +27 −0 example/com/linkedin/parseq/example/common/MockRequest.java
  11. +58 −0 example/com/linkedin/parseq/example/common/MockService.java
  12. +42 −0 example/com/linkedin/parseq/example/common/SimpleMockRequest.java
  13. +185 −0 example/com/linkedin/parseq/example/composite/MergeSortExample.java
  14. +165 −0 example/com/linkedin/parseq/example/composite/TimeBoundSearchExample.java
  15. +121 −0 example/com/linkedin/parseq/example/composite/TwoStageFanoutExample.java
  16. +27 −0 example/com/linkedin/parseq/example/composite/classifier/Classification.java
  17. +26 −0 example/com/linkedin/parseq/example/composite/classifier/Classifier.java
  18. +73 −0 example/com/linkedin/parseq/example/composite/classifier/ClassifierDriver.java
  19. +209 −0 example/com/linkedin/parseq/example/composite/classifier/ClassifierPlanFactory.java
  20. +38 −0 example/com/linkedin/parseq/example/composite/classifier/ConnectedClassifier.java
  21. +36 −0 example/com/linkedin/parseq/example/composite/classifier/DefaultClassifier.java
  22. +58 −0 example/com/linkedin/parseq/example/composite/classifier/Network.java
  23. +40 −0 example/com/linkedin/parseq/example/composite/classifier/NetworkClassifier.java
  24. +38 −0 example/com/linkedin/parseq/example/composite/classifier/SelfClassifier.java
  25. +43 −0 example/com/linkedin/parseq/example/composite/classifier/TruthMapClassifier.java
  26. +27 −0 example/com/linkedin/parseq/example/composite/classifier/client/Client.java
  27. +31 −0 example/com/linkedin/parseq/example/composite/classifier/client/Request.java
  28. +40 −0 example/com/linkedin/parseq/example/composite/classifier/client/impl/AbstractRequest.java
  29. +68 −0 example/com/linkedin/parseq/example/composite/classifier/client/impl/ClientImpl.java
  30. +44 −0 example/com/linkedin/parseq/example/composite/classifier/client/impl/GetNetworkRequest.java
  31. +57 −0 example/com/linkedin/parseq/example/composite/classifier/client/impl/TruthMapRequest.java
  32. +81 −0 example/com/linkedin/parseq/example/simple/BranchExecutedExample.java
  33. +80 −0 example/com/linkedin/parseq/example/simple/BranchSkippedExample.java
  34. +51 −0 example/com/linkedin/parseq/example/simple/ErrorPropagationExample.java
  35. +51 −0 example/com/linkedin/parseq/example/simple/ErrorRecoveryExample.java
  36. +51 −0 example/com/linkedin/parseq/example/simple/FanInExample.java
  37. +42 −0 example/com/linkedin/parseq/example/simple/FanOutExample.java
  38. +44 −0 example/com/linkedin/parseq/example/simple/TimeoutWithErrorExample.java
  39. +6 −0 ivy-tracevis.xml
  40. +25 −0 ivy.xml
  41. +37 −0 pub-version
  42. +44 −0 src/com/linkedin/parseq/ActionTask.java
  43. +34 −0 src/com/linkedin/parseq/After.java
  44. +419 −0 src/com/linkedin/parseq/BaseTask.java
  45. +48 −0 src/com/linkedin/parseq/CallableTask.java
  46. +36 −0 src/com/linkedin/parseq/Cancellable.java
  47. +64 −0 src/com/linkedin/parseq/Context.java
  48. +37 −0 src/com/linkedin/parseq/DelayedExecutor.java
  49. +48 −0 src/com/linkedin/parseq/EarlyFinishException.java
  50. +225 −0 src/com/linkedin/parseq/Engine.java
  51. +145 −0 src/com/linkedin/parseq/EngineBuilder.java
  52. +30 −0 src/com/linkedin/parseq/EngineShutdownException.java
  53. +66 −0 src/com/linkedin/parseq/MultiException.java
  54. +44 −0 src/com/linkedin/parseq/ParTask.java
  55. +149 −0 src/com/linkedin/parseq/ParTaskImpl.java
  56. +29 −0 src/com/linkedin/parseq/Priority.java
  57. +90 −0 src/com/linkedin/parseq/SeqTask.java
  58. +110 −0 src/com/linkedin/parseq/Task.java
  59. +39 −0 src/com/linkedin/parseq/TaskLog.java
  60. +159 −0 src/com/linkedin/parseq/Tasks.java
  61. +88 −0 src/com/linkedin/parseq/TimeoutWithErrorTask.java
  62. +38 −0 src/com/linkedin/parseq/internal/AfterPromises.java
  63. +42 −0 src/com/linkedin/parseq/internal/ArgumentUtil.java
  64. +40 −0 src/com/linkedin/parseq/internal/CancellableScheduledFuture.java
  65. +247 −0 src/com/linkedin/parseq/internal/ContextImpl.java
  66. +100 −0 src/com/linkedin/parseq/internal/FIFOPriorityQueue.java
  67. +71 −0 src/com/linkedin/parseq/internal/InternalUtil.java
  68. +25 −0 src/com/linkedin/parseq/internal/Prioritizable.java
  69. +24 −0 src/com/linkedin/parseq/internal/PrioritizableRunnable.java
  70. +86 −0 src/com/linkedin/parseq/internal/SerialExecutor.java
  71. +158 −0 src/com/linkedin/parseq/internal/TaskLogImpl.java
  72. +48 −0 src/com/linkedin/parseq/promise/CountDownPromiseListener.java
  73. +87 −0 src/com/linkedin/parseq/promise/DelegatingPromise.java
  74. +125 −0 src/com/linkedin/parseq/promise/Promise.java
  75. +37 −0 src/com/linkedin/parseq/promise/PromiseException.java
  76. +37 −0 src/com/linkedin/parseq/promise/PromiseListener.java
  77. +34 −0 src/com/linkedin/parseq/promise/PromiseResolvedException.java
  78. +35 −0 src/com/linkedin/parseq/promise/PromiseUnresolvedException.java
  79. +81 −0 src/com/linkedin/parseq/promise/Promises.java
  80. +47 −0 src/com/linkedin/parseq/promise/PropagateResultListener.java
  81. +41 −0 src/com/linkedin/parseq/promise/SettablePromise.java
  82. +169 −0 src/com/linkedin/parseq/promise/SettablePromiseImpl.java
  83. +81 −0 src/com/linkedin/parseq/trace/Related.java
  84. +33 −0 src/com/linkedin/parseq/trace/Relationship.java
  85. +68 −0 src/com/linkedin/parseq/trace/RelationshipBuilder.java
  86. +50 −0 src/com/linkedin/parseq/trace/ResultType.java
  87. +202 −0 src/com/linkedin/parseq/trace/ShallowTrace.java
  88. +177 −0 src/com/linkedin/parseq/trace/ShallowTraceBuilder.java
  89. +102 −0 src/com/linkedin/parseq/trace/Trace.java
  90. +36 −0 src/com/linkedin/parseq/trace/TraceBuilder.java
  91. +142 −0 src/com/linkedin/parseq/trace/TraceBuilderImpl.java
  92. +115 −0 src/com/linkedin/parseq/trace/TraceImpl.java
  93. +251 −0 src/com/linkedin/parseq/trace/TraceRelationshipBuilder.java
  94. +46 −0 src/com/linkedin/parseq/trace/codec/TraceCodec.java
  95. +80 −0 src/com/linkedin/parseq/trace/codec/json/JsonTraceCodec.java
  96. +135 −0 src/com/linkedin/parseq/trace/codec/json/JsonTraceDeserializer.java
  97. +145 −0 src/com/linkedin/parseq/trace/codec/json/JsonTraceSerializer.java
  98. +82 −0 test/com/linkedin/parseq/BaseEngineTest.java
  99. +158 −0 test/com/linkedin/parseq/EngineTest.java
  100. +317 −0 test/com/linkedin/parseq/ListLogger.java
  101. +54 −0 test/com/linkedin/parseq/ListLoggerFactory.java
  102. +273 −0 test/com/linkedin/parseq/TestContext.java
  103. +272 −0 test/com/linkedin/parseq/TestTaskLogging.java
  104. +439 −0 test/com/linkedin/parseq/TestTaskStates.java
  105. +682 −0 test/com/linkedin/parseq/TestTasks.java
  106. +98 −0 test/com/linkedin/parseq/TestUtil.java
  107. +25 −0 test/com/linkedin/parseq/ThrowingRunnable.java
  108. +125 −0 test/com/linkedin/parseq/internal/TestFIFOPriorityQueue.java
  109. +103 −0 test/com/linkedin/parseq/promise/TestDelegatingPromise.java
  110. +85 −0 test/com/linkedin/parseq/promise/TestPromises.java
  111. +326 −0 test/com/linkedin/parseq/promise/TestSettablePromise.java
  112. +115 −0 test/com/linkedin/parseq/trace/TestRelationshipBuilder.java
  113. +88 −0 test/com/linkedin/parseq/trace/TestShallowTraceBuilder.java
  114. +530 −0 test/com/linkedin/parseq/trace/TestTaskToTrace.java
  115. +238 −0 test/com/linkedin/parseq/trace/TestTraceBuilder.java
  116. +509 −0 test/com/linkedin/parseq/trace/codec/json/TestJsonTraceCodec.java
  117. +4 −0 test/log4j.properties
  118. +17 −0 tools/tracevis/Makefile
  119. +22 −0 tools/tracevis/README
  120. +51 −0 tools/tracevis/css/table.css
  121. +40 −0 tools/tracevis/css/waterfall.css
  122. +19 −0 tools/tracevis/index.js
  123. +297 −0 tools/tracevis/js/graphviz.js
  124. +112 −0 tools/tracevis/js/table.js
  125. +392 −0 tools/tracevis/js/trace.js
  126. +164 −0 tools/tracevis/js/waterfall.js
  127. +32 −0 tools/tracevis/lib/d3.v2.min.js
  128. +662 −0 tools/tracevis/lib/dig.js
  129. +21 −0 tools/tracevis/lib/dig.min.js
  130. +20 −0 tools/tracevis/package.json
  131. +286 −0 tools/tracevis/test/graphviz-test.js
  132. +81 −0 tools/tracevis/test/testUtil.js
  133. +251 −0 tools/tracevis/test/trace-test.js
  134. +139 −0 tools/tracevis/trace.html
  135. +1 −0 version.properties
6 .gitignore
@@ -0,0 +1,6 @@
+.idea
+*.iml
+build
+dist
+out
+node_modules
1 .reviewboardrc
@@ -0,0 +1 @@
+REPOSITORY="parseq"
142 CHANGELOG
@@ -0,0 +1,142 @@
+v1.0.0
+======
+
+ * Minor performance improvements.
+
+v0.4.5
+======
+
+ * Add visualization support to hide all parent tasks.
+ * PromiseListener.onResolved(..) now has the Promise that was completed as a
+ parameter.
+
+v0.4.4
+======
+
+ * Trace output now contains relationship of potential parents. A potential
+ parent relationship is defined as a parent task running a child task that has
+ already been completed.
+ * Dag visualization now includes:
+ * Dash edge between a potential parent to an potential child task.
+ * Dash edge between a predecessor to a successor task that has been
+ completed.
+ * Solid edge between a potential parent to an potential child sink task.
+
+v0.4.3
+======
+
+ * No changes
+
+v0.4.2
+======
+
+ * Dist target now includes a tarball for tracevis.
+ * Added support for system hidden task with support for visualization. User
+ defined hidden task should use ShallowTrace.setHidden(...).
+
+v0.4.1
+======
+
+ * Misc code hygiene improvements
+
+v0.4.0
+======
+
+ * BACKWARD INCOMPATIBLE:
+ * Removed Tasks.async. Use BaseTask instead.
+ * Renamed Tasks.sync to Tasks.callable.
+ * Removed Tasks.value which was only used for test purposes.
+ * Removed getStartNanos() and getEndNanos() from Task. Instead use
+ Task.getShallowTrace() which returns a ShallowTrace that contains the
+ getStartNanos() and getEndNanos().
+ * Engine creation has changed:
+ * Replace:
+ new Engine(taskExecutor, timerExecutor)
+ * With:
+ new EngineBuilder()
+ .setTaskExecutor(taskExecutor)
+ .setTimerExecutor(timerExecutor)
+ .build();
+ * Engine.awaitTermination(...) provides a mechanism to wait for the engine
+ to shutdown.
+ * Visualization improvements / fixes:
+ * Table: use a textarea for values
+ * Waterfall: use nanosecond precision for laying out bars
+ * Don't include value.toString() in task name for value task.
+ * Logging
+ * We now provide three loggers to collect task information at runtime:
+ * com.linkedin.parseq.Engine:all - logs all tasks
+ * com.linkedin.parseq.Engine:root - logs root tasks only
+ * com.linkedin.parseq.Engine:planClass=xyz - When xyz is a root
+ class it and all of its descendants are logged.
+ * We provide two log levels with these loggers:
+ * DEBUG - logs task name and result type
+ * TRACE - logs task name, result type, and value
+ * Added Tasks.seq to support Iterable<Tasks<?>> as a parameter.
+ * Added support for hidden trace to indicate if it should be displayed in
+ the visualization.
+ * Added attributes to trace such that additional information can be added.
+ * Added TraceBuilder to support customize Trace information.
+
+v0.3.0
+======
+
+ * Added support for priorities to Tasks. Task priorities only influence
+ ordering in a particular context.
+ * Remove the existing waterfall trace visualization and replace it with a
+ more scalable and interactive javascript visualization.
+ * Rename BaseTask.run(...) to BaseTask.contextRun(...) and
+ BaseTask.doRun(...) to BaseTask.run(...) to better match the method
+ purpose.
+ * Replace existing trace printers with javascript based equivalents.
+ * Dag trace now includes the start time of the task.
+ * Added par(Iterable<Task<T>> tasks) to the Tasks class. The new par(...) will
+ return the result of each of the supplied tasks. The ParTask will fail
+ if any of the executed task fails. Additional methods are available for
+ ParTask:
+ * getTasks() for the set of tasks related to ParTask.
+ * getSuccessful() to get the values of successfully executed tasks.
+ * Updated par(...) to return ParTask.
+
+v0.2.1
+======
+
+ * Misc code hygiene improvements
+
+v0.2.0
+======
+
+ * Trace improvements
+ * Rename TaskTrace to Trace
+ * Move JsonTraceCodec to com.linkedin.parseq.trace.codec.json
+ * Move trace printers to com.linkedin.parseq.trace.printer
+ * More compact JSON serialization for traces:
+ * Don't include null or empty fields
+ * Render each trace once
+ * Set up edges independently of the traces
+ * Tasks now use null to indicate no value for startNanos and endNanos
+ * Traces are now fully immutable and thread-safe. Use TraceBuilders to
+ create new traces or to copy and edit existing ones.
+
+ * Task construction improvments
+ * Remove TaskDefs (use Tasks.action, Tasks.sync, Tasks.async, and new
+ BaseTask()) in their place
+ * Added ValueTask - returns a pre-determined value upon execution
+ * Remove Named. The Tasks factory methods take a name and Task has a
+ constructor that takes a name. If the empty Task constructor is used
+ then getName() will return the value of toString() unless it has been
+ overridden.
+
+v0.1.0
+======
+
+ * Promises and Tasks now take Throwables instead of Exceptions. This
+ provides better interoperability with frameworks like Pegasus and Play.
+ * Trace creation has changed:
+ * Old: TaskTraces.convertToTaskTrace(trace)
+ * New: task.getTrace()
+ * Remove/rename "assembler"
+ * Move com.linkedin.parseq.assembler.trace to com.linkedin.parseq.trace
+ * Move all classes in com.linkedin.parseq.assember to com.linkedin.parseq
+ * Rename Assembler to Engine
+ * Replace references to "assembler" with "engine"
255 LICENSE
@@ -0,0 +1,255 @@
+
+ Apache License
+ Version 2.0, January 2004
+ http://www.apache.org/licenses/
+
+ TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+ 1. Definitions.
+
+ "License" shall mean the terms and conditions for use, reproduction,
+ and distribution as defined by Sections 1 through 9 of this document.
+
+ "Licensor" shall mean the copyright owner or entity authorized by
+ the copyright owner that is granting the License.
+
+ "Legal Entity" shall mean the union of the acting entity and all
+ other entities that control, are controlled by, or are under common
+ control with that entity. For the purposes of this definition,
+ "control" means (i) the power, direct or indirect, to cause the
+ direction or management of such entity, whether by contract or
+ otherwise, or (ii) ownership of fifty percent (50%) or more of the
+ outstanding shares, or (iii) beneficial ownership of such entity.
+
+ "You" (or "Your") shall mean an individual or Legal Entity
+ exercising permissions granted by this License.
+
+ "Source" form shall mean the preferred form for making modifications,
+ including but not limited to software source code, documentation
+ source, and configuration files.
+
+ "Object" form shall mean any form resulting from mechanical
+ transformation or translation of a Source form, including but
+ not limited to compiled object code, generated documentation,
+ and conversions to other media types.
+
+ "Work" shall mean the work of authorship, whether in Source or
+ Object form, made available under the License, as indicated by a
+ copyright notice that is included in or attached to the work
+ (an example is provided in the Appendix below).
+
+ "Derivative Works" shall mean any work, whether in Source or Object
+ form, that is based on (or derived from) the Work and for which the
+ editorial revisions, annotations, elaborations, or other modifications
+ represent, as a whole, an original work of authorship. For the purposes
+ of this License, Derivative Works shall not include works that remain
+ separable from, or merely link (or bind by name) to the interfaces of,
+ the Work and Derivative Works thereof.
+
+ "Contribution" shall mean any work of authorship, including
+ the original version of the Work and any modifications or additions
+ to that Work or Derivative Works thereof, that is intentionally
+ submitted to Licensor for inclusion in the Work by the copyright owner
+ or by an individual or Legal Entity authorized to submit on behalf of
+ the copyright owner. For the purposes of this definition, "submitted"
+ means any form of electronic, verbal, or written communication sent
+ to the Licensor or its representatives, including but not limited to
+ communication on electronic mailing lists, source code control systems,
+ and issue tracking systems that are managed by, or on behalf of, the
+ Licensor for the purpose of discussing and improving the Work, but
+ excluding communication that is conspicuously marked or otherwise
+ designated in writing by the copyright owner as "Not a Contribution."
+
+ "Contributor" shall mean Licensor and any individual or Legal Entity
+ on behalf of whom a Contribution has been received by Licensor and
+ subsequently incorporated within the Work.
+
+ 2. Grant of Copyright License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ copyright license to reproduce, prepare Derivative Works of,
+ publicly display, publicly perform, sublicense, and distribute the
+ Work and such Derivative Works in Source or Object form.
+
+ 3. Grant of Patent License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ (except as stated in this section) patent license to make, have made,
+ use, offer to sell, sell, import, and otherwise transfer the Work,
+ where such license applies only to those patent claims licensable
+ by such Contributor that are necessarily infringed by their
+ Contribution(s) alone or by combination of their Contribution(s)
+ with the Work to which such Contribution(s) was submitted. If You
+ institute patent litigation against any entity (including a
+ cross-claim or counterclaim in a lawsuit) alleging that the Work
+ or a Contribution incorporated within the Work constitutes direct
+ or contributory patent infringement, then any patent licenses
+ granted to You under this License for that Work shall terminate
+ as of the date such litigation is filed.
+
+ 4. Redistribution. You may reproduce and distribute copies of the
+ Work or Derivative Works thereof in any medium, with or without
+ modifications, and in Source or Object form, provided that You
+ meet the following conditions:
+
+ (a) You must give any other recipients of the Work or
+ Derivative Works a copy of this License; and
+
+ (b) You must cause any modified files to carry prominent notices
+ stating that You changed the files; and
+
+ (c) You must retain, in the Source form of any Derivative Works
+ that You distribute, all copyright, patent, trademark, and
+ attribution notices from the Source form of the Work,
+ excluding those notices that do not pertain to any part of
+ the Derivative Works; and
+
+ (d) If the Work includes a "NOTICE" text file as part of its
+ distribution, then any Derivative Works that You distribute must
+ include a readable copy of the attribution notices contained
+ within such NOTICE file, excluding those notices that do not
+ pertain to any part of the Derivative Works, in at least one
+ of the following places: within a NOTICE text file distributed
+ as part of the Derivative Works; within the Source form or
+ documentation, if provided along with the Derivative Works; or,
+ within a display generated by the Derivative Works, if and
+ wherever such third-party notices normally appear. The contents
+ of the NOTICE file are for informational purposes only and
+ do not modify the License. You may add Your own attribution
+ notices within Derivative Works that You distribute, alongside
+ or as an addendum to the NOTICE text from the Work, provided
+ that such additional attribution notices cannot be construed
+ as modifying the License.
+
+ You may add Your own copyright statement to Your modifications and
+ may provide additional or different license terms and conditions
+ for use, reproduction, or distribution of Your modifications, or
+ for any such Derivative Works as a whole, provided Your use,
+ reproduction, and distribution of the Work otherwise complies with
+ the conditions stated in this License.
+
+ 5. Submission of Contributions. Unless You explicitly state otherwise,
+ any Contribution intentionally submitted for inclusion in the Work
+ by You to the Licensor shall be under the terms and conditions of
+ this License, without any additional terms or conditions.
+ Notwithstanding the above, nothing herein shall supersede or modify
+ the terms of any separate license agreement you may have executed
+ with Licensor regarding such Contributions.
+
+ 6. Trademarks. This License does not grant permission to use the trade
+ names, trademarks, service marks, or product names of the Licensor,
+ except as required for reasonable and customary use in describing the
+ origin of the Work and reproducing the content of the NOTICE file.
+
+ 7. Disclaimer of Warranty. Unless required by applicable law or
+ agreed to in writing, Licensor provides the Work (and each
+ Contributor provides its Contributions) on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ implied, including, without limitation, any warranties or conditions
+ of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+ PARTICULAR PURPOSE. You are solely responsible for determining the
+ appropriateness of using or redistributing the Work and assume any
+ risks associated with Your exercise of permissions under this License.
+
+ 8. Limitation of Liability. In no event and under no legal theory,
+ whether in tort (including negligence), contract, or otherwise,
+ unless required by applicable law (such as deliberate and grossly
+ negligent acts) or agreed to in writing, shall any Contributor be
+ liable to You for damages, including any direct, indirect, special,
+ incidental, or consequential damages of any character arising as a
+ result of this License or out of the use or inability to use the
+ Work (including but not limited to damages for loss of goodwill,
+ work stoppage, computer failure or malfunction, or any and all
+ other commercial damages or losses), even if such Contributor
+ has been advised of the possibility of such damages.
+
+ 9. Accepting Warranty or Additional Liability. While redistributing
+ the Work or Derivative Works thereof, You may choose to offer,
+ and charge a fee for, acceptance of support, warranty, indemnity,
+ or other liability obligations and/or rights consistent with this
+ License. However, in accepting such obligations, You may act only
+ on Your own behalf and on Your sole responsibility, not on behalf
+ of any other Contributor, and only if You agree to indemnify,
+ defend, and hold each Contributor harmless for any liability
+ incurred by, or claims asserted against, such Contributor by reason
+ of your accepting any such warranty or additional liability.
+
+ END OF TERMS AND CONDITIONS
+
+ APPENDIX: How to apply the Apache License to your work.
+
+ To apply the Apache License to your work, attach the following
+ boilerplate notice, with the fields enclosed by brackets "[]"
+ replaced with your own identifying information. (Don't include
+ the brackets!) The text should be enclosed in the appropriate
+ comment syntax for the file format. We also recommend that a
+ file or class name and description of purpose be included on the
+ same "printed page" as the copyright notice for easier
+ identification within third-party archives.
+
+ Copyright 2012 LinkedIn Corporation
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+
+------------------------------------------------------------------------------
+License for d3
+------------------------------------------------------------------------------
+Copyright (c) 2012, Michael Bostock
+All rights reserved.
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions are met:
+
+* Redistributions of source code must retain the above copyright notice, this
+ list of conditions and the following disclaimer.
+
+* Redistributions in binary form must reproduce the above copyright notice,
+ this list of conditions and the following disclaimer in the documentation
+ and/or other materials provided with the distribution.
+
+* The name Michael Bostock may not be used to endorse or promote products
+ derived from this software without specific prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+DISCLAIMED. IN NO EVENT SHALL MICHAEL BOSTOCK BE LIABLE FOR ANY DIRECT,
+INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
+BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
+OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
+NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
+EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+------------------------------------------------------------------------------
+License for dig.js
+------------------------------------------------------------------------------
+Copyright (c) 2012 Chris Pettitt
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in
+all copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+THE SOFTWARE.
74 README.md
@@ -0,0 +1,74 @@
+ParSeq
+======
+
+ParSeq is a framework that makes it easier to write and maintain fast, scalable
+applications in Java.
+
+Some of the key benefits of ParSeq include:
+
+* Parallelization of asynchronous operations (such as IO)
+* Serialized execution for non-blocking computation
+* Code reuse via task composition
+* Simple error propagation and recovery
+* Execution tracing and visualization
+
+The remainder of this README provides a very basic introduction to ParSeq. We
+recommend looking at the ParSeq wiki for more details.
+
+Key Concepts
+------------
+
+In ParSeq, we have a few basic concepts:
+
+* *Promise*: like a Java Future, a Promise allows the user to get the result of
+ an asynchronous computation. However, a Promise allows the user to wait for
+ the result asynchronously instead of requiring a blocking `get` call.
+* *Task*: a Task is an action that can be scheduled with an Engine (see below).
+ Tasks can be sequenced using `seq` and `par` (see below).
+* *par*: composes a group of Tasks that can be executed in parallel.
+* *seq*: composes an ordered list of Tasks that will be executed sequentially.
+* *Engine*: a pool of workers that executes Tasks.
+
+Example Usage
+-------------
+
+For this example, suppose we want to get the home page for a few popular
+browsers in parallel and then combine them. In ParSeq, we would code this up as
+follows:
+
+
+ final Task<String> google = httpClient.fetch("http://www.google.com");
+ final Task<String> bing = httpClient.fetch("http://www.bing.com");
+ final Task<String> yahoo = httpClient.fetch("http://www.yahoo.com");
+
+ final Task<String> combination = new BaseTask<String>() {
+ public Promise<String> run(Context ctx) {
+ String googleStr = google.get();
+ String bingStr = bing.get();
+ String yahooStr = yahoo.get();
+
+ // Build some combination out of the above three strings.
+
+ return result;
+ }
+ };
+
+ Task<String> tasks = Tasks.seq(Tasks.par(google, bing, yahoo),
+ combination);
+ engine.run(tasks);
+
+This will first fetch the URL for various pages in parallel and after they have
+all been retrieved it will combine them using the `combination` task.
+
+For many more examples, please see the `example` module in the source code.
+
+What Next?
+----------
+
+To learn more about ParSeq, please visit our Wiki.
+
+License
+-------
+
+ParSeq is licensed under the terms of the [Apache License, Version
+2.0](http://www.apache.org/licenses/LICENSE-2.0).
199 build.xml
@@ -0,0 +1,199 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns:ivy="antlib:org.apache.ivy.ant" name="parseq" default="build" basedir=".">
+ <description>ParSeq Project</description>
+
+ <property name="example.dir" value="${basedir}/example" />
+ <property name="src.dir" value="${basedir}/src" />
+ <property name="test.dir" value="${basedir}/test" />
+ <property name="tools.dir" value="${basedir}/tools" />
+ <property name="build.dir" value="${basedir}/build" />
+ <property name="example.build.dir" value="${build.dir}/example" />
+ <property name="src.build.dir" value="${build.dir}/src" />
+ <property name="lib.build.dir" value="${build.dir}/lib" />
+ <property name="test.build.dir" value="${build.dir}/test" />
+ <property name="test.result.dir" value="${test.build.dir}/reports"/>
+ <property name="doc.build.dir" value="${build.dir}/doc"/>
+ <property name="dist.dir" value="${basedir}/dist" />
+
+ <!-- IVY configuration -->
+ <property name="ivy.jar.version" value="2.2.0" />
+ <property name="ivy.jar.file" value="${build.dir}/ivy.jar" />
+ <property name="ivy.jar.url" value="http://repo1.maven.org/maven2/org/apache/ivy/ivy/${ivy.jar.version}/ivy-${ivy.jar.version}.jar" />
+ <!-- This has special meaning for the ivy:retrieve task: it points to the location the JARs will be placed -->
+ <property name="ivy.lib.dir" value="${lib.build.dir}"/>
+
+ <property name="javac.source" value="1.5" />
+ <property name="javac.target" value="1.5" />
+ <!-- Enables/disables inclusion of debug symbols in .class files. -->
+ <property name="debug" value="true" />
+
+ <path id="build.classpath">
+ <pathelement location="${src.build.dir}"/>
+ <fileset dir="${lib.build.dir}/build" includes="*.jar"/>
+ </path>
+
+ <path id="test.classpath">
+ <path refid="build.classpath"/>
+ <pathelement location="${test.build.dir}"/>
+ <fileset dir="${lib.build.dir}/test" includes="*.jar"/>
+ </path>
+
+ <target name="bootstrap" unless="bootstrap.done">
+ <mkdir dir="${lib.build.dir}"/>
+
+ <!-- Pull down IVY and load taskdefs -->
+ <get src="${ivy.jar.url}" dest="${ivy.jar.file}" usetimestamp="true" />
+ <taskdef resource="org/apache/ivy/ant/antlib.xml" uri="antlib:org.apache.ivy.ant">
+ <classpath>
+ <fileset file="${ivy.jar.file}" />
+ </classpath>
+ </taskdef>
+
+ <ivy:resolve file="ivy.xml" conf="bootstrap"/>
+ <ivy:retrieve pattern="${ivy.lib.dir}/[conf]/[artifact].[ext]"/>
+
+ <!-- load testng taskdefs -->
+ <taskdef resource="testngtasks">
+ <classpath>
+ <fileset file="${lib.build.dir}/bootstrap/testng.jar"/>
+ </classpath>
+ </taskdef>
+
+ <property name="bootstrap.done" value="true"/>
+ </target>
+
+ <target name="version-props" unless="version">
+ <loadproperties srcFile="${basedir}/version.properties"/>
+ </target>
+
+ <target name="prepare" depends="bootstrap, version-props">
+ <mkdir dir="${example.build.dir}" />
+ <mkdir dir="${src.build.dir}" />
+ <mkdir dir="${lib.build.dir}" />
+ <mkdir dir="${test.build.dir}" />
+ <mkdir dir="${test.result.dir}"/>
+ <mkdir dir="${doc.build.dir}" />
+ <mkdir dir="${dist.dir}" />
+
+ <ivy:resolve file="ivy.xml" conf="*"/>
+ <ivy:retrieve pattern="${ivy.lib.dir}/[conf]/[artifact].[ext]"/>
+ </target>
+
+ <target name="build-src" depends="prepare" description="Builds src and example files.">
+ <javac includeantruntime="false" destdir="${src.build.dir}" source="${javac.source}" target="${javac.target}" debug="${debug}">
+ <src path="${src.dir}"/>
+ <classpath refid="build.classpath"/>
+ <compilerarg value="-Xlint"/>
+ </javac>
+ <javac includeantruntime="false" destdir="${example.build.dir}" source="${javac.source}" target="${javac.target}" debug="${debug}">
+ <src path="${example.dir}"/>
+ <classpath refid="build.classpath"/>
+ <compilerarg value="-Xlint"/>
+ </javac>
+ </target>
+
+ <target name="jar-src" depends="build-src" description="Creates a JAR for src classes">
+ <make-jar destfile="${dist.dir}/${ant.project.name}-${version}.jar" basedir="${src.build.dir}" />
+ </target>
+
+ <target name="zip-src" depends="prepare" description="Creates a ZIP for the src files">
+ <make-jar destfile="${dist.dir}/${ant.project.name}-${version}-sources.jar" basedir="${src.dir}" />
+ </target>
+
+ <target name="tar-tracevis" depends="prepare" description="Creates a tarball of the tracevis tool">
+ <tar destfile="${dist.dir}/${ant.project.name}-tracevis-${version}.tar.gz" compression="gzip">
+ <tarfileset dir="${tools.dir}/tracevis" prefix="tracevis" />
+ </tar>
+ </target>
+
+ <target name="doc" description="Build the javadoc." depends="build-src">
+ <javadoc destdir="${doc.build.dir}">
+ <packageset dir="${src.dir}"/>
+ <classpath refid="build.classpath"/>
+ </javadoc>
+ </target>
+
+ <target name="zip-doc" depends="doc" description="Creates a ZIP for the javadoc files">
+ <make-jar destfile="${dist.dir}/${ant.project.name}-${version}-javadoc.jar" basedir="${doc.build.dir}" />
+ </target>
+
+ <target name="build-test" depends="build-src" description="Builds test files.">
+ <javac includeantruntime="false" destdir="${test.build.dir}" source="${javac.source}" target="${javac.target}" debug="${debug}">
+ <src path="${test.dir}"/>
+ <classpath refid="test.classpath"/>
+ <compilerarg value="-Xlint"/>
+ </javac>
+ <copy file="${test.dir}/log4j.properties" todir="${test.build.dir}"/>
+ </target>
+
+ <target name="test" depends="test-unit, test-tracevis"
+ description="Runs the unit and integration tests. Set property no.test.tracevis to skip visualization tests."/>
+
+ <target name="test-unit" depends="build-test" description="Runs the unit tests.">
+ <testng haltonfailure="true" outputDir="${test.result.dir}">
+ <classfileset dir="${test.build.dir}" includes="**/Test*.class"/>
+ <classpath refid="test.classpath"/>
+ </testng>
+ </target>
+
+ <target name="build" depends="build-src,build-test" description="Builds everything" />
+
+ <target name="dist" depends="jar-src, zip-src, tar-tracevis, zip-doc"
+ description="Builds all distributables to ${dist.dir}">
+ <ivy:deliver deliverpattern="${dist.dir}/[artifact]-[revision].[ext]" pubrevision="${version}" validate="true"/>
+ <ivy:resolve file="ivy-tracevis.xml" conf="default"/>
+ <ivy:deliver deliverpattern="${dist.dir}/tracevis-[artifact]-[revision].[ext]" pubrevision="${version}" validate="true"/>
+ </target>
+
+ <target name="clean" description="Cleans build and dist dirs">
+ <delete dir="${build.dir}" />
+ <delete dir="${dist.dir}" />
+ <make dir="${tools.dir}/tracevis" >
+ <args>
+ <arg value="clean" />
+ </args>
+ </make>
+ </target>
+
+ <target name="test-tracevis" unless="no.test.tracevis" description="Run visualization tests">
+ <make dir="${tools.dir}/tracevis">
+ <args>
+ <arg value="test" />
+ </args>
+ </make>
+ </target>
+
+ <macrodef name="make-jar">
+ <attribute name="destfile"/>
+ <attribute name="basedir"/>
+ <sequential>
+ <git-sha1 outputproperty="sha1"/>
+ <jar destfile="@{destfile}" basedir="@{basedir}">
+ <manifest>
+ <attribute name="Implementation-Version" value="${sha1}"/>
+ </manifest>
+ </jar>
+ </sequential>
+ </macrodef>
+
+ <macrodef name="make">
+ <attribute name="dir" />
+ <element name="args" />
+ <sequential>
+ <exec executable="make" failonerror="true" dir="@{dir}">
+ <args/>
+ </exec>
+ </sequential>
+ </macrodef>
+
+ <macrodef name="git-sha1">
+ <attribute name="outputproperty"/>
+ <sequential>
+ <exec executable="git" dir="${basedir}" outputproperty="@{outputproperty}">
+ <arg value="show-ref"/>
+ <arg value="-s"/>
+ <arg value="HEAD"/>
+ </exec>
+ </sequential>
+ </macrodef>
+</project>
60 example/com/linkedin/parseq/example/common/AbstractExample.java
@@ -0,0 +1,60 @@
+/*
+ * Copyright 2012 LinkedIn, Inc
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package com.linkedin.parseq.example.common;
+
+import com.linkedin.parseq.Engine;
+import com.linkedin.parseq.EngineBuilder;
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+
+/**
+ * @author Chris Pettitt (cpettitt@linkedin.com)
+ */
+public abstract class AbstractExample
+{
+ private volatile ScheduledExecutorService _serviceScheduler;
+
+ public void runExample() throws Exception
+ {
+ _serviceScheduler = Executors.newScheduledThreadPool(2);
+ final int numCores = Runtime.getRuntime().availableProcessors();
+ final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(numCores + 1);
+ final Engine engine = new EngineBuilder()
+ .setTaskExecutor(scheduler)
+ .setTimerScheduler(scheduler)
+ .build();
+ try
+ {
+ doRunExample(engine);
+ }
+ finally
+ {
+ engine.shutdown();
+ scheduler.shutdownNow();
+ _serviceScheduler.shutdown();
+ _serviceScheduler = null;
+ }
+ }
+
+ protected abstract void doRunExample(Engine engine) throws Exception;
+
+ protected <T> MockService<T> getService()
+ {
+ return new MockService<T>(_serviceScheduler);
+ }
+}
44 example/com/linkedin/parseq/example/common/ErrorMockRequest.java
@@ -0,0 +1,44 @@
+/*
+ * Copyright 2012 LinkedIn, Inc
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package com.linkedin.parseq.example.common;
+
+/**
+ * @author Chris Pettitt (cpettitt@linkedin.com)
+ */
+public class ErrorMockRequest<T> implements MockRequest<T>
+{
+ private final long _latency;
+ private final Exception _error;
+
+ public ErrorMockRequest(final long latency, final Exception error)
+ {
+ _latency = latency;
+ _error = error;
+ }
+
+ public long getLatency()
+ {
+ return _latency;
+ }
+
+ @Override
+ public T getResult() throws Exception
+ {
+ throw _error;
+ }
+
+}
91 example/com/linkedin/parseq/example/common/ExampleUtil.java
@@ -0,0 +1,91 @@
+/*
+ * Copyright 2012 LinkedIn, Inc
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package com.linkedin.parseq.example.common;
+
+import com.linkedin.parseq.BaseTask;
+import com.linkedin.parseq.Context;
+import com.linkedin.parseq.Task;
+import com.linkedin.parseq.promise.Promise;
+import com.linkedin.parseq.trace.Trace;
+import com.linkedin.parseq.trace.codec.json.JsonTraceCodec;
+
+import java.io.IOException;
+import java.util.Random;
+
+/**
+ * @author Chris Pettitt (cpettitt@linkedin.com)
+ */
+public class ExampleUtil
+{
+ private static final Random RANDOM = new Random();
+ private static final int DEFAULT_LATENCY_MEAN = 100;
+ private static final int DEFAULT_LATENCY_STDDEV = 50;
+ private static final int LATENCY_MIN = 10;
+
+ private ExampleUtil() {}
+
+ public static <RES> Task<RES> callService(final String name,
+ final MockService<RES> service,
+ final MockRequest<RES> request)
+ {
+ return new BaseTask<RES>(name) {
+ @Override
+ protected Promise<RES> run(final Context context) throws Exception
+ {
+ return service.call(request);
+ }
+ };
+ }
+
+ public static Task<String> fetchUrl(final MockService<String> httpClient,
+ final String url)
+ {
+ final long mean = DEFAULT_LATENCY_MEAN;
+ final long stddev = DEFAULT_LATENCY_STDDEV;
+ final long latency = Math.max(LATENCY_MIN, (int)(RANDOM.nextGaussian() * stddev + mean));
+ return callService("fetch[url=" + url + "]", httpClient, new SimpleMockRequest<String>(latency, "HTTP response for " + url));
+ }
+
+ public static Task<String> fetch404Url(final MockService<String> httpClient,
+ final String url)
+ {
+ final long mean = DEFAULT_LATENCY_MEAN;
+ final long stddev = DEFAULT_LATENCY_STDDEV;
+ final long latency = Math.max(LATENCY_MIN, (int)(RANDOM.nextGaussian() * stddev + mean));
+ return callService("fetch[url=" + url + "]", httpClient, new ErrorMockRequest<String>(latency, new Exception("404")));
+ }
+
+ public static void printTracingResults(final Task<?> task)
+ {
+ final Trace trace = task.getTrace();
+
+ System.out.println();
+ System.out.println();
+ System.out.println("JSON Trace:");
+
+ try
+ {
+ new JsonTraceCodec().encode(trace, System.out);
+ }
+ catch (IOException e)
+ {
+ System.err.println("Failed to encode JSON");
+ e.printStackTrace();
+ }
+ System.out.println();
+ }
+}
27 example/com/linkedin/parseq/example/common/MockRequest.java
@@ -0,0 +1,27 @@
+/*
+ * Copyright 2012 LinkedIn, Inc
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package com.linkedin.parseq.example.common;
+
+/**
+ * @author Chris Pettitt (cpettitt@linkedin.com)
+ */
+public interface MockRequest<RES>
+{
+ long getLatency();
+
+ RES getResult() throws Exception;
+}
58 example/com/linkedin/parseq/example/common/MockService.java
@@ -0,0 +1,58 @@
+/*
+ * Copyright 2012 LinkedIn, Inc
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package com.linkedin.parseq.example.common;
+
+import com.linkedin.parseq.promise.Promise;
+import com.linkedin.parseq.promise.Promises;
+import com.linkedin.parseq.promise.SettablePromise;
+
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * @author Chris Pettitt (cpettitt@linkedin.com)
+ */
+public class MockService<RES>
+{
+ private final ScheduledExecutorService _scheduler;
+
+ public MockService(ScheduledExecutorService scheduler)
+ {
+ _scheduler = scheduler;
+ }
+
+ public Promise<RES> call(final MockRequest<RES> request)
+ {
+ final SettablePromise<RES> promise = Promises.settable();
+ _scheduler.schedule(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ try
+ {
+ promise.done(request.getResult());
+ }
+ catch (Exception e)
+ {
+ promise.fail(e);
+ }
+ }
+ }, request.getLatency(), TimeUnit.MILLISECONDS);
+ return promise;
+ }
+}
42 example/com/linkedin/parseq/example/common/SimpleMockRequest.java
@@ -0,0 +1,42 @@
+/*
+ * Copyright 2012 LinkedIn, Inc
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package com.linkedin.parseq.example.common;
+
+/**
+ * @author Chris Pettitt (cpettitt@linkedin.com)
+ */
+public class SimpleMockRequest<RES> implements MockRequest<RES>
+{
+ private final long _latency;
+ private final RES _result;
+
+ public SimpleMockRequest(final long latency, final RES result)
+ {
+ _latency = latency;
+ _result = result;
+ }
+
+ public long getLatency()
+ {
+ return _latency;
+ }
+
+ public RES getResult()
+ {
+ return _result;
+ }
+}
185 example/com/linkedin/parseq/example/composite/MergeSortExample.java
@@ -0,0 +1,185 @@
+/*
+ * Copyright 2012 LinkedIn, Inc
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package com.linkedin.parseq.example.composite;
+
+import com.linkedin.parseq.BaseTask;
+import com.linkedin.parseq.Context;
+import com.linkedin.parseq.Engine;
+import com.linkedin.parseq.Task;
+import com.linkedin.parseq.example.common.AbstractExample;
+import com.linkedin.parseq.example.common.ExampleUtil;
+import com.linkedin.parseq.promise.Promise;
+import com.linkedin.parseq.promise.Promises;
+
+import java.util.Arrays;
+import java.util.Random;
+import java.util.concurrent.Callable;
+
+import static com.linkedin.parseq.Tasks.callable;
+
+/**
+ * The merge sort example demonstrates how branching and recursive plan
+ * execution work. It is not intended as a model for doing parallel
+ * computation!
+ *
+ * @author Chris Pettitt (cpettitt@linkedin.com)
+ */
+public class MergeSortExample extends AbstractExample
+{
+ public static void main(String[] args) throws Exception
+ {
+ new MergeSortExample().runExample();
+ }
+
+ @Override
+ protected void doRunExample(final Engine engine) throws Exception
+ {
+ final int[] toSort = createRandomArray(10, new Random());
+
+ final Task<int[]> mergeSort = new MergeSortPlan(toSort);
+ engine.run(mergeSort);
+ mergeSort.await();
+
+ System.out.println("Before sort: " + Arrays.toString(toSort));
+ System.out.println("After sort: " + Arrays.toString(mergeSort.get()));
+
+ ExampleUtil.printTracingResults(mergeSort);
+ }
+
+ private static int[] createRandomArray(final int arraySize, final Random random)
+ {
+ final int[] nums = new int[arraySize];
+ for (int i = 0; i < arraySize; i++)
+ {
+ nums[i] = random.nextInt();
+ }
+ return nums;
+ }
+
+ private static class MergeSortPlan extends BaseTask<int[]>
+ {
+ private final int[] _toSort;
+ private final Range _range;
+
+ public MergeSortPlan(final int[] toSort)
+ {
+ this(toSort, new Range(0, toSort.length));
+ }
+
+ private MergeSortPlan(final int[] toSort, final Range range)
+ {
+ super("MergeSort " + range);
+ _toSort = toSort;
+ _range = range;
+ }
+
+ @Override
+ public Promise<int[]> run(final Context ctx)
+ {
+ if (_range.size() == 0)
+ {
+ return Promises.value(new int[0]);
+ }
+ else if (_range.size() == 1)
+ {
+ return Promises.value(new int[] {_toSort[_range.start()]});
+ }
+ else
+ {
+ // Neither base case applied, so recursively split this problem into
+ // smaller problems and then merge the results.
+ final Range fstRange = _range.firstHalf();
+ final Range sndRange = _range.secondHalf();
+ final Task<int[]> fst = new MergeSortPlan(_toSort, fstRange);
+ final Task<int[]> snd = new MergeSortPlan(_toSort, sndRange);
+ final Task<int[]> merge = mergePlan(fstRange, fst, sndRange, snd);
+ ctx.after(fst, snd).run(merge);
+ ctx.run(fst, snd);
+ return merge;
+ }
+ }
+
+ private Task<int[]> mergePlan(final Range fstRange,
+ final Promise<int[]> fstPromise,
+ final Range sndRange,
+ final Promise<int[]> sndPromise)
+ {
+ return callable("Merge " + fstRange + " " + sndRange, new Callable<int[]>()
+ {
+ @Override
+ public int[] call() throws Exception
+ {
+ final int[] fst = fstPromise.get();
+ final int[] snd = sndPromise.get();
+ final int[] results = new int[fst.length + snd.length];
+ for (int i = 0, l = 0, r = 0; i < results.length; i++)
+ {
+ if (l == fst.length)
+ results[i] = snd[r++];
+ else if (r == snd.length)
+ results[i] = fst[l++];
+ else
+ results[i] = fst[l] < snd[r] ? fst[l++] : snd[r++];
+ }
+ return results;
+ }
+ });
+ }
+ }
+
+ private static class Range
+ {
+ private final int _start;
+ private final int _end;
+
+ public Range(int start, int end)
+ {
+ _start = start;
+ _end = end;
+ }
+
+ public int start()
+ {
+ return _start;
+ }
+
+ public Range firstHalf()
+ {
+ return new Range(_start, midpoint());
+ }
+
+ public Range secondHalf()
+ {
+ return new Range(midpoint(), _end);
+ }
+
+ public int size()
+ {
+ return _end - _start;
+ }
+
+ public String toString()
+ {
+ return "[" + _start + "," + _end + ")";
+ }
+
+ private int midpoint()
+ {
+ return (_end - _start) / 2 + _start;
+ }
+ }
+}
165 example/com/linkedin/parseq/example/composite/TimeBoundSearchExample.java
@@ -0,0 +1,165 @@
+/*
+ * Copyright 2012 LinkedIn, Inc
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package com.linkedin.parseq.example.composite;
+
+import com.linkedin.parseq.BaseTask;
+import com.linkedin.parseq.Context;
+import com.linkedin.parseq.Engine;
+import com.linkedin.parseq.Task;
+import com.linkedin.parseq.example.common.AbstractExample;
+import com.linkedin.parseq.example.common.MockService;
+import com.linkedin.parseq.example.common.SimpleMockRequest;
+import com.linkedin.parseq.promise.Promise;
+import com.linkedin.parseq.promise.Promises;
+import com.linkedin.parseq.promise.SettablePromise;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import static com.linkedin.parseq.Tasks.action;
+import static com.linkedin.parseq.Tasks.seq;
+import static com.linkedin.parseq.example.common.ExampleUtil.callService;
+import static com.linkedin.parseq.example.common.ExampleUtil.printTracingResults;
+
+/**
+ * @author Chris Pettitt (cpettitt@linkedin.com)
+ */
+public class TimeBoundSearchExample extends AbstractExample
+{
+ // How long it takes to get a response for each request
+ private static final long[] REQUEST_LATENCIES = new long[] {175, 67, 30, 20, 177, 350};
+
+ // How long the engine will wait for index number of responses
+ private static final long[] WAIT_TIMES = new long[] {400, 300, 200, 100, 0};
+
+ public static void main(String[] args) throws Exception
+ {
+ new TimeBoundSearchExample().runExample();
+ }
+
+ @Override
+ protected void doRunExample(final Engine engine) throws Exception
+ {
+ final MockService<Integer> service = getService();
+
+ final SearchTask example = new SearchTask(service);
+
+ System.out.printf("This com.linkedin.asm.example will issue %d parallel requests\n", REQUEST_LATENCIES.length);
+ System.out.println();
+ for (int i = 0; i < REQUEST_LATENCIES.length; i++)
+ {
+ System.out.printf("Request %d will take %3dms to complete\n", i, REQUEST_LATENCIES[i]);
+ }
+
+ System.out.println();
+ System.out.println("Latency rules:");
+ System.out.println("--------------");
+ for (int i = 0; i < WAIT_TIMES.length; i++)
+ {
+ System.out.printf("Finish if received %d responses after %3dms\n", i, WAIT_TIMES[i]);
+ }
+
+ System.out.println();
+ System.out.println("Execution:");
+ System.out.println("----------");
+ final long startMillis = System.currentTimeMillis();
+ engine.run(example);
+ example.await();
+ final long endMillis = System.currentTimeMillis();
+
+ System.out.println("Responses: " + example.get());
+ System.out.println("Execution time: " + (endMillis - startMillis) + "ms");
+
+ printTracingResults(example);
+ }
+
+ private static class SearchTask extends BaseTask<List<Integer>>
+ {
+ private final MockService<Integer> _service;
+ private final List<Integer> _responses = new ArrayList<Integer>();
+ private final SettablePromise<List<Integer>> _result = Promises.settable();
+
+ private long _startMillis;
+
+ public SearchTask(final MockService<Integer> service)
+ {
+ super("search");
+ _service = service;
+ }
+
+ @Override
+ public Promise<List<Integer>> run(final Context ctx)
+ {
+ // Save the start time so we can determine when to finish
+ _startMillis = System.currentTimeMillis();
+
+ // Set up timeouts for responses
+ long lastWaitTime = Integer.MAX_VALUE;
+ for (final long waitTime : WAIT_TIMES)
+ {
+ if (waitTime < lastWaitTime && waitTime > 0)
+ {
+ ctx.createTimer(waitTime, TimeUnit.MILLISECONDS, checkDone());
+ lastWaitTime = waitTime;
+ }
+ }
+
+ // Issue requests
+ for (int i = 0; i < REQUEST_LATENCIES.length; i++)
+ {
+ final long requestLatency = REQUEST_LATENCIES[i];
+ final Task<Integer> callSvc =
+ callService("subSearch[" + i + "]",
+ _service,
+ new SimpleMockRequest<Integer>(requestLatency, i));
+
+ ctx.run(seq(callSvc, addResponse(callSvc), checkDone()));
+ }
+
+ return _result;
+ }
+
+ private Task<?> checkDone()
+ {
+ return action("checkDone", new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ final int index = Math.min(WAIT_TIMES.length - 1, _responses.size());
+ if (WAIT_TIMES[index] + _startMillis <= System.currentTimeMillis())
+ {
+ _result.done(_responses);
+ }
+ }
+ });
+ }
+
+ private Task<?> addResponse(final Promise<Integer> response)
+ {
+ return action("addResponse", new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ _responses.add(response.get());
+ }
+ });
+ }
+ }
+}
121 example/com/linkedin/parseq/example/composite/TwoStageFanoutExample.java
@@ -0,0 +1,121 @@
+/*
+ * Copyright 2012 LinkedIn, Inc
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package com.linkedin.parseq.example.composite;
+
+import com.linkedin.parseq.BaseTask;
+import com.linkedin.parseq.Context;
+import com.linkedin.parseq.Engine;
+import com.linkedin.parseq.Task;
+import com.linkedin.parseq.example.common.AbstractExample;
+import com.linkedin.parseq.example.common.MockService;
+import com.linkedin.parseq.promise.Promise;
+
+import java.util.concurrent.Callable;
+
+import static com.linkedin.parseq.Tasks.action;
+import static com.linkedin.parseq.Tasks.callable;
+import static com.linkedin.parseq.Tasks.par;
+import static com.linkedin.parseq.Tasks.seq;
+import static com.linkedin.parseq.example.common.ExampleUtil.fetchUrl;
+import static com.linkedin.parseq.example.common.ExampleUtil.printTracingResults;
+
+/**
+ * @author Chris Pettitt (cpettitt@linkedin.com)
+ */
+public class TwoStageFanoutExample extends AbstractExample
+{
+ public static void main(String[] args) throws Exception
+ {
+ new TwoStageFanoutExample().runExample();
+ }
+
+ @Override
+ protected void doRunExample(final Engine engine) throws Exception
+ {
+ final MockService<String> httpClient = getService();
+
+ final FanoutTask fanout = new FanoutTask(httpClient);
+ final Task<?> printResults = action("printResults", new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ System.out.println(fanout.get());
+ printTracingResults(fanout);
+ }
+ });
+
+ engine.run(seq(fanout, printResults));
+
+ printResults.await();
+ }
+
+ private static class FanoutTask extends BaseTask<String>
+ {
+ private final MockService<String> _httpClient;
+ private final StringBuilder _result = new StringBuilder();
+
+ private FanoutTask(final MockService<String> httpClient)
+ {
+ super("TwoStageFanout");
+ _httpClient = httpClient;
+ }
+
+ @Override
+ public Promise<String> run(final Context ctx)
+ {
+ final Task<String> twoStage =
+ seq(par(fetchAndLog("http://www.bing.com"),
+ fetchAndLog("http://www.yahoo.com")),
+ fetchAndLog("http://www.google.com"),
+ buildResult());
+ ctx.run(twoStage);
+ return twoStage;
+ }
+
+ private Task<String> buildResult()
+ {
+ return callable("buildResult", new Callable<String>()
+ {
+ @Override
+ public String call()
+ {
+ return _result.toString();
+ }
+ });
+ }
+
+ private Task<?> fetchAndLog(final String url)
+ {
+ final Task<String> fetch = fetchUrl(_httpClient, url);
+ final Task<?> logResult = logResult(url, fetch);
+ return seq(fetch, logResult);
+ }
+
+ private Task<?> logResult(final String url, final Promise<String> promise)
+ {
+ return action("logResult[" + url + "]", new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ _result.append(String.format("%10s => %s\n", url, promise.get()));
+ }
+ });
+ }
+ }
+}
27 example/com/linkedin/parseq/example/composite/classifier/Classification.java
@@ -0,0 +1,27 @@
+/*
+ * Copyright 2012 LinkedIn, Inc
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package com.linkedin.parseq.example.composite.classifier;
+
+/**
+ * @author Chris Pettitt (cpettitt@linkedin.com)
+ */
+public enum Classification
+{
+ FULL_VISIBILITY,
+ PARTIAL_VISIBILITY,
+ NO_VISIBILITY
+}
26 example/com/linkedin/parseq/example/composite/classifier/Classifier.java
@@ -0,0 +1,26 @@
+/*
+ * Copyright 2012 LinkedIn, Inc
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package com.linkedin.parseq.example.composite.classifier;
+
+/**
+ * @author Chris Pettitt
+ * @version $Revision$
+ */
+public interface Classifier
+{
+ Classification classify(long vieweeId);
+}
73 example/com/linkedin/parseq/example/composite/classifier/ClassifierDriver.java
@@ -0,0 +1,73 @@
+/*
+ * Copyright 2012 LinkedIn, Inc
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package com.linkedin.parseq.example.composite.classifier;
+
+import com.linkedin.parseq.Engine;
+import com.linkedin.parseq.EngineBuilder;
+import com.linkedin.parseq.Task;
+import com.linkedin.parseq.example.common.ExampleUtil;
+import com.linkedin.parseq.example.composite.classifier.client.Client;
+import com.linkedin.parseq.example.composite.classifier.client.impl.ClientImpl;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+
+/**
+ * @author Chris Pettitt (cpettitt@linkedin.com)
+ */
+public class ClassifierDriver
+{
+ public static void main(String[] args) throws InterruptedException
+ {
+ final long viewerId = 0;
+
+ final Set<Long> unclassified = new HashSet<Long>();
+ for (long i = 0; i < 20; i++) {
+ unclassified.add(i);
+ }
+
+ final ScheduledExecutorService serviceScheduler = Executors.newSingleThreadScheduledExecutor();
+ final Client restLiClient = new ClientImpl(serviceScheduler);
+
+ final int numCores = Runtime.getRuntime().availableProcessors();
+ final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(numCores + 1);
+ final Engine engine = new EngineBuilder()
+ .setTaskExecutor(scheduler)
+ .setTimerScheduler(scheduler)
+ .build();
+
+ final ClassifierPlanFactory classifier = new ClassifierPlanFactory(restLiClient);
+ try
+ {
+ final Task<Map<Long, Classification>> classifications = classifier.classify(viewerId, unclassified);
+ engine.run(classifications);
+ classifications.await();
+ System.out.println(classifications.get());
+
+ ExampleUtil.printTracingResults(classifications);
+ }
+ finally
+ {
+ serviceScheduler.shutdownNow();
+ engine.shutdown();
+ scheduler.shutdownNow();
+ }
+ }
+}
209 example/com/linkedin/parseq/example/composite/classifier/ClassifierPlanFactory.java
@@ -0,0 +1,209 @@
+/*
+ * Copyright 2012 LinkedIn, Inc
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package com.linkedin.parseq.example.composite.classifier;
+
+import com.linkedin.parseq.BaseTask;
+import com.linkedin.parseq.Context;
+import com.linkedin.parseq.Task;
+import com.linkedin.parseq.example.composite.classifier.client.Client;
+import com.linkedin.parseq.example.composite.classifier.client.Request;
+import com.linkedin.parseq.example.composite.classifier.client.impl.GetNetworkRequest;
+import com.linkedin.parseq.example.composite.classifier.client.impl.TruthMapRequest;
+import com.linkedin.parseq.promise.Promise;
+import com.linkedin.parseq.promise.Promises;
+import com.linkedin.parseq.promise.SettablePromise;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import static com.linkedin.parseq.Tasks.action;
+import static com.linkedin.parseq.Tasks.par;
+import static com.linkedin.parseq.Tasks.seq;
+
+/**
+ * @author Chris Pettitt (cpettitt@linkedin.com)
+ */
+public class ClassifierPlanFactory
+{
+ private final Client _client;
+
+ public ClassifierPlanFactory(final Client client)
+ {
+ _client = client;
+ }
+
+ public Task<Map<Long, Classification>> classify(final long viewerId,
+ final Set<Long> vieweeIds)
+ {
+ return new ClassifierPlan(viewerId, vieweeIds);
+ }
+
+ private class ClassifierPlan extends BaseTask<Map<Long, Classification>>
+ {
+ private final long _viewerId;
+ private final Set<Long> _unclassified;
+ private final Map<Long, Classification> _classified = new HashMap<Long, Classification>();
+ private final SettablePromise<Map<Long, Classification>> _result = Promises.settable();
+
+ private ClassifierPlan(final long viewerId,
+ final Set<Long> unclassified)
+ {
+ super("ClassifierPlan[viewerId=" + viewerId + "]");
+ _viewerId = viewerId;
+ _unclassified = new HashSet<Long>(unclassified);
+ }
+
+ @Override
+ public Promise<Map<Long, Classification>> run(final Context ctx)
+ {
+ // Network data is shared across classifiers, so we create it here
+ final Task<Network> network = clientRequestTask(new GetNetworkRequest(_viewerId));
+
+ // CLASSIFIERS
+
+ // Self classification
+ // Treat self as a special case. If we can classify all requested profiles
+ // as self, then there is no need to make a remote call.
+ final Task<?> selfClassifier = classifyTask(new SelfClassifier(_viewerId));
+
+ // Full visibility classification
+ final Task<?> directlyConnectedClassifier = connectedClassifyTask(network);
+ final Task<?> invitedToGroupClassifier = truthMapQueryClassifyTask("GroupInvited", 1, Classification.FULL_VISIBILITY);
+ final Task<?> messagedClassifier = truthMapQueryClassifyTask("Messaged", 2, Classification.FULL_VISIBILITY);
+
+ // Partial visibility classification
+ final Task<?> inNetworkClassifier = networkClassifyTask(network);
+ final Task<?> sharesGroupClassifier = truthMapQueryClassifyTask("CommonGroups", 4, Classification.PARTIAL_VISIBILITY);
+
+ // Default visibility (i.e. no visibility)
+ final Task<?> defaultClassifier = classifyTask(DefaultClassifier.instance());
+
+ // If we time out then we run local filters and return
+ ctx.createTimer(1, TimeUnit.SECONDS, defaultClassifier);
+
+ // ORDERING
+ final Task<?> ordering =
+ seq(selfClassifier,
+ par(seq(network, directlyConnectedClassifier),
+ invitedToGroupClassifier,
+ messagedClassifier),
+ par(inNetworkClassifier,
+ sharesGroupClassifier),
+ defaultClassifier);
+ ctx.run(ordering);
+
+ return _result;
+ }
+
+ private Task<?> classifyTask(final Classifier classifier)
+ {
+ return action(classifier.getClass().getSimpleName(), new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ doClassify(classifier);
+ }
+ });
+ }
+
+ private Task<?> truthMapQueryClassifyTask(final String name,
+ final int remainder,
+ final Classification classification)
+ {
+ final Task<Map<Long, Boolean>> svcCall =
+ clientRequestTask(new TruthMapRequest("get" + name, remainder, _unclassified));
+
+ final Task<?> classifyResult = truthMapClassifyTask(name, classification, svcCall);
+
+ return seq(svcCall, classifyResult);
+ }
+
+ private Task<?> truthMapClassifyTask(final String name,
+ final Classification classification,
+ final Promise<Map<Long, Boolean>> result)
+ {
+ return action(name + "Classifier", new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ doClassify(new TruthMapClassifier(classification, result.get()));
+ }
+ });
+ }
+
+ private <T> Task<T> clientRequestTask(final Request<T> request)
+ {
+ return new BaseTask<T>(request.getName())
+ {
+ @Override
+ protected Promise<? extends T> run(final Context context) throws Exception
+ {
+ return _client.sendRequest(request);
+ }
+ };
+ }
+
+ private Task<?> connectedClassifyTask(final Task<Network> network)
+ {
+ return action("ConnectedClassifier", new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ doClassify(new ConnectedClassifier(network.get()));
+ }
+ });
+ }
+
+ private Task<?> networkClassifyTask(final Task<Network> network)
+ {
+ return action("NetworkClassifier", new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ doClassify(new NetworkClassifier(network.get()));
+ }
+ });
+ }
+
+ private void doClassify(final Classifier classifier)
+ {
+ for (Iterator<Long> it = _unclassified.iterator(); it.hasNext(); )
+ {
+ final long vieweeId = it.next();
+ final Classification classification = classifier.classify(vieweeId);
+ if (classification != null)
+ {
+ it.remove();
+ _classified.put(vieweeId, classification);
+ }
+ }
+
+ if (_unclassified.isEmpty())
+ {
+ _result.done(_classified);
+ }
+ }
+ }
+}
38 example/com/linkedin/parseq/example/composite/classifier/ConnectedClassifier.java
@@ -0,0 +1,38 @@
+/*
+ * Copyright 2012 LinkedIn, Inc
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package com.linkedin.parseq.example.composite.classifier;
+
+/**
+ * @author Chris Pettitt (cpettitt@linkedin.com)
+ */
+public class ConnectedClassifier implements Classifier
+{
+ private final Network _subnet;
+
+ public ConnectedClassifier(final Network subnet)
+ {
+ _subnet = subnet;
+ }
+
+ @Override
+ public Classification classify(final long vieweeId)
+ {
+ return _subnet != null && Network.Distance.D1.equals(_subnet.getDistance(vieweeId))
+ ? Classification.FULL_VISIBILITY
+ : null;
+ }
+}
36 example/com/linkedin/parseq/example/composite/classifier/DefaultClassifier.java
@@ -0,0 +1,36 @@
+/*
+ * Copyright 2012 LinkedIn, Inc
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package com.linkedin.parseq.example.composite.classifier;
+
+/**
+ * @author Chris Pettitt (cpettitt@linkedin.com)
+ */
+public class DefaultClassifier implements Classifier
+{
+ private static DefaultClassifier INSTANCE = new DefaultClassifier();
+
+ public static DefaultClassifier instance()
+ {
+ return INSTANCE;
+ }
+
+ @Override
+ public Classification classify(final long vieweeId)
+ {
+ return Classification.NO_VISIBILITY;
+ }
+}
58 example/com/linkedin/parseq/example/composite/classifier/Network.java
@@ -0,0 +1,58 @@
+/*
+ * Copyright 2012 LinkedIn, Inc
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package com.linkedin.parseq.example.composite.classifier;
+
+/**
+ * @author Chris Pettitt (cpettitt@linkedin.com)
+ */
+public class Network
+{
+ public static enum Distance
+ {
+ NOT_IN_NETWORK,
+ SELF,
+ D1,
+ D2,
+ D3
+ }
+
+ private final long memberId;
+
+ public Network(long memberId)
+ {
+ this.memberId = memberId;
+ }
+
+ public Distance getDistance(final long memberId)
+ {
+ if (memberId == this.memberId)
+ return Distance.SELF;
+
+ final long group = memberId % 10;
+ switch ((int) group)
+ {
+ case 10:
+ return Distance.D3;
+ case 9:
+ return Distance.D2;
+ case 8:
+ return Distance.D1;
+ default:
+ return Distance.NOT_IN_NETWORK;
+ }
+ }
+}
40 example/com/linkedin/parseq/example/composite/classifier/NetworkClassifier.java
@@ -0,0 +1,40 @@
+/*
+ * Copyright 2012 LinkedIn, Inc
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package com.linkedin.parseq.example.composite.classifier;
+
+/**
+ * @author Chris Pettitt (cpettitt@linkedin.com)
+ */
+public class NetworkClassifier implements Classifier
+{
+ private final Network _subnet;
+
+ public NetworkClassifier(final Network subnet)
+ {
+ _subnet = subnet;
+ }
+
+ @Override
+ public Classification classify(final long vieweeId)
+ {
+ final Network.Distance distance = _subnet.getDistance(vieweeId);
+
+ return Network.Distance.D2.equals(distance) || Network.Distance.D3.equals(distance)
+ ? Classification.PARTIAL_VISIBILITY
+ : null;
+ }
+}
38 example/com/linkedin/parseq/example/composite/classifier/SelfClassifier.java
@@ -0,0 +1,38 @@
+/*
+ * Copyright 2012 LinkedIn, Inc
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package com.linkedin.parseq.example.composite.classifier;
+
+/**
+ * @author Chris Pettitt (cpettitt@linkedin.com)
+ */
+public class SelfClassifier implements Classifier
+{
+ private final long viewerId;
+
+ public SelfClassifier(final long viewerId)
+ {
+ this.viewerId = viewerId;
+ }
+
+ @Override
+ public Classification classify(final long vieweeId)
+ {
+ return vieweeId == viewerId
+ ? Classification.FULL_VISIBILITY
+ : null;
+ }
+}
43 example/com/linkedin/parseq/example/composite/classifier/TruthMapClassifier.java
@@ -0,0 +1,43 @@
+/*
+ * Copyright 2012 LinkedIn, Inc
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package com.linkedin.parseq.example.composite.classifier;
+
+import java.util.Map;
+
+/**
+ * @author Chris Pettitt (cpettitt@linkedin.com)
+ */
+public class TruthMapClassifier implements Classifier
+{
+ private final Classification _classification;
+ private final Map<Long, Boolean> _truthMap;
+
+ public TruthMapClassifier(Classification classification, Map<Long, Boolean> truthMap)
+ {
+ _classification = classification;
+ _truthMap = truthMap;
+ }
+
+ @Override
+ public Classification classify(final long vieweeId)
+ {
+ final Boolean b = _truthMap.get(vieweeId);
+ return b != null && b
+ ? _classification
+ : null;
+ }
+}
27 example/com/linkedin/parseq/example/composite/classifier/client/Client.java
@@ -0,0 +1,27 @@
+/*
+ * Copyright 2012 LinkedIn, Inc
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at