Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Initial commit

  • Loading branch information...
commit 25fe06165bb9b6915e526d452eee2c9aa61ec270 1 parent e6ef753
@szegedi szegedi authored
Showing with 16,694 additions and 0 deletions.
  1. +7 −0 .classpath
  2. +23 −0 .project
  3. +3 −0  .scala_dependencies
  4. +12 −0 .settings/org.eclipse.jdt.core.prefs
  5. +71 −0 ant/bootstrap.xml
  6. +20 −0 ant/clean.xml
  7. +111 −0 ant/compile.xml
  8. +35 −0 ant/docs.xml
  9. +126 −0 ant/package.xml
  10. +55 −0 ant/prepare.xml
  11. +56 −0 ant/test.xml
  12. +6 −0 build.xml
  13. +21 −0 ivy/ivy.xml
  14. +32 −0 ivy/ivysettings.xml
  15. +531 −0 src/main/java/com/twitter/actors/FJTask.java
  16. +996 −0 src/main/java/com/twitter/actors/FJTaskRunner.java
  17. +714 −0 src/main/java/com/twitter/actors/FJTaskRunnerGroup.java
  18. +18 −0 src/main/java/com/twitter/actors/IFJTaskRunnerGroup.java
  19. +25 −0 src/main/java/com/twitter/actors/LinkedNode.java
  20. +185 −0 src/main/java/com/twitter/actors/LinkedQueue.java
  21. +32 −0 src/main/java/com/twitter/actors/threadpool/AbstractCollection.java
  22. +293 −0 src/main/java/com/twitter/actors/threadpool/AbstractExecutorService.java
  23. +170 −0 src/main/java/com/twitter/actors/threadpool/AbstractQueue.java
  24. +811 −0 src/main/java/com/twitter/actors/threadpool/Arrays.java
  25. +210 −0 src/main/java/com/twitter/actors/threadpool/AtomicInteger.java
  26. +35 −0 src/main/java/com/twitter/actors/threadpool/Callable.java
  27. +34 −0 src/main/java/com/twitter/actors/threadpool/CancellationException.java
  28. +98 −0 src/main/java/com/twitter/actors/threadpool/CompletionService.java
  29. +65 −0 src/main/java/com/twitter/actors/threadpool/ExecutionException.java
  30. +112 −0 src/main/java/com/twitter/actors/threadpool/Executor.java
  31. +178 −0 src/main/java/com/twitter/actors/threadpool/ExecutorCompletionService.java
  32. +331 −0 src/main/java/com/twitter/actors/threadpool/ExecutorService.java
  33. +667 −0 src/main/java/com/twitter/actors/threadpool/Executors.java
  34. +143 −0 src/main/java/com/twitter/actors/threadpool/Future.java
  35. +311 −0 src/main/java/com/twitter/actors/threadpool/FutureTask.java
  36. +28 −0 src/main/java/com/twitter/actors/threadpool/Perf.java
  37. +191 −0 src/main/java/com/twitter/actors/threadpool/Queue.java
  38. +62 −0 src/main/java/com/twitter/actors/threadpool/RejectedExecutionException.java
  39. +34 −0 src/main/java/com/twitter/actors/threadpool/RejectedExecutionHandler.java
  40. +24 −0 src/main/java/com/twitter/actors/threadpool/RunnableFuture.java
  41. +41 −0 src/main/java/com/twitter/actors/threadpool/ThreadFactory.java
  42. +1,968 −0 src/main/java/com/twitter/actors/threadpool/ThreadPoolExecutor.java
  43. +38 −0 src/main/java/com/twitter/actors/threadpool/TimeoutException.java
  44. +85 −0 src/main/java/com/twitter/actors/threadpool/helpers/FIFOWaitQueue.java
  45. +29 −0 src/main/java/com/twitter/actors/threadpool/helpers/NanoTimer.java
  46. +66 −0 src/main/java/com/twitter/actors/threadpool/helpers/ThreadHelpers.java
  47. +344 −0 src/main/java/com/twitter/actors/threadpool/helpers/Utils.java
  48. +147 −0 src/main/java/com/twitter/actors/threadpool/helpers/WaitQueue.java
  49. +191 −0 src/main/java/com/twitter/actors/threadpool/locks/CondVar.java
  50. +435 −0 src/main/java/com/twitter/actors/threadpool/locks/Condition.java
  51. +147 −0 src/main/java/com/twitter/actors/threadpool/locks/FIFOCondVar.java
  52. +328 −0 src/main/java/com/twitter/actors/threadpool/locks/Lock.java
  53. +104 −0 src/main/java/com/twitter/actors/threadpool/locks/ReadWriteLock.java
  54. +960 −0 src/main/java/com/twitter/actors/threadpool/locks/ReentrantLock.java
  55. +1,340 −0 src/main/java/com/twitter/actors/threadpool/locks/ReentrantReadWriteLock.java
  56. +37 −0 src/main/scala/com/twitter/actors/AbstractActor.scala
  57. +966 −0 src/main/scala/com/twitter/actors/Actor.scala
  58. +29 −0 src/main/scala/com/twitter/actors/ActorGC.scala
  59. +39 −0 src/main/scala/com/twitter/actors/ActorProxy.scala
  60. +185 −0 src/main/scala/com/twitter/actors/Channel.scala
  61. +47 −0 src/main/scala/com/twitter/actors/Debug.scala
  62. +202 −0 src/main/scala/com/twitter/actors/FJTaskScheduler2.scala
  63. +132 −0 src/main/scala/com/twitter/actors/Future.scala
  64. +66 −0 src/main/scala/com/twitter/actors/InputChannel.scala
  65. +206 −0 src/main/scala/com/twitter/actors/MessageQueue.scala
  66. +49 −0 src/main/scala/com/twitter/actors/OutputChannel.scala
  67. +113 −0 src/main/scala/com/twitter/actors/Reaction.scala
  68. +317 −0 src/main/scala/com/twitter/actors/Scheduler.scala
  69. +49 −0 src/main/scala/com/twitter/actors/SchedulerAdapter.scala
  70. +173 −0 src/main/scala/com/twitter/actors/TickedScheduler.scala
  71. +36 −0 src/main/scala/com/twitter/actors/remote/FreshNameCreator.scala
  72. +53 −0 src/main/scala/com/twitter/actors/remote/JavaSerializer.scala
  73. +149 −0 src/main/scala/com/twitter/actors/remote/NetKernel.scala
  74. +187 −0 src/main/scala/com/twitter/actors/remote/Proxy.scala
  75. +133 −0 src/main/scala/com/twitter/actors/remote/RemoteActor.scala
  76. +57 −0 src/main/scala/com/twitter/actors/remote/Serializer.scala
  77. +23 −0 src/main/scala/com/twitter/actors/remote/Service.scala
  78. +277 −0 src/main/scala/com/twitter/actors/remote/TcpService.scala
  79. +70 −0 src/main/scala/com/twitter/actors/scheduler/TerminationMonitor.scala
View
7 .classpath
@@ -0,0 +1,7 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<classpath>
+ <classpathentry kind="src" path="src/main/scala"/>
+ <classpathentry kind="con" path="ch.epfl.lamp.sdt.launching.SCALA_CONTAINER"/>
+ <classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.launching.macosx.MacOSXType/JVM 1.6.0"/>
+ <classpathentry kind="output" path="bin"/>
+</classpath>
View
23 .project
@@ -0,0 +1,23 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<projectDescription>
+ <name>twitterActors</name>
+ <comment></comment>
+ <projects>
+ </projects>
+ <buildSpec>
+ <buildCommand>
+ <name>org.eclipse.jdt.core.javabuilder</name>
+ <arguments>
+ </arguments>
+ </buildCommand>
+ <buildCommand>
+ <name>ch.epfl.lamp.sdt.core.scalabuilder</name>
+ <arguments>
+ </arguments>
+ </buildCommand>
+ </buildSpec>
+ <natures>
+ <nature>org.eclipse.jdt.core.javanature</nature>
+ <nature>ch.epfl.lamp.sdt.core.scalanature</nature>
+ </natures>
+</projectDescription>
View
3  .scala_dependencies
@@ -0,0 +1,3 @@
+/Users/aszegedi/Documents/projects/twitterActors/bin:/Applications/eclipse 3.6/configuration/org.eclipse.osgi/bundles/308/1/.cp/lib/scala-library.jar:/Applications/eclipse 3.6/configuration/org.eclipse.osgi/bundles/308/1/.cp/lib/scala-dbc.jar:/Applications/eclipse 3.6/configuration/org.eclipse.osgi/bundles/308/1/.cp/lib/scala-swing.jar:/System/Library/Frameworks/JavaVM.framework/Versions/1.6.0/Classes/classes.jar:/System/Library/Frameworks/JavaVM.framework/Versions/1.6.0/Classes/ui.jar:/System/Library/Frameworks/JavaVM.framework/Versions/1.6.0/Classes/jsse.jar:/System/Library/Frameworks/JavaVM.framework/Versions/1.6.0/Classes/jce.jar:/System/Library/Frameworks/JavaVM.framework/Versions/1.6.0/Classes/charsets.jar:/System/Library/Java/Extensions/AppleScriptEngine.jar:/System/Library/Java/Extensions/dns_sd.jar:/System/Library/Java/Extensions/j3daudio.jar:/System/Library/Java/Extensions/j3dcore.jar:/System/Library/Java/Extensions/j3dutils.jar:/System/Library/Java/Extensions/jai_codec.jar:/System/Library/Java/Extensions/jai_core.jar:/System/Library/Java/Extensions/mlibwrapper_jai.jar:/System/Library/Java/Extensions/MRJToolkit.jar:/System/Library/Java/Extensions/QTJava.zip:/System/Library/Java/Extensions/vecmath.jar:/System/Library/Frameworks/JavaVM.framework/Versions/1.6.0/Home/lib/ext/apple_provider.jar:/System/Library/Frameworks/JavaVM.framework/Versions/1.6.0/Home/lib/ext/bcprov-jdk16-145.jar:/System/Library/Frameworks/JavaVM.framework/Versions/1.6.0/Home/lib/ext/dnsns.jar:/System/Library/Frameworks/JavaVM.framework/Versions/1.6.0/Home/lib/ext/localedata.jar:/System/Library/Frameworks/JavaVM.framework/Versions/1.6.0/Home/lib/ext/sunjce_provider.jar:/System/Library/Frameworks/JavaVM.framework/Versions/1.6.0/Home/lib/ext/sunpkcs11.jar
+-------
+-------
View
12 .settings/org.eclipse.jdt.core.prefs
@@ -0,0 +1,12 @@
+#Fri Jul 02 17:36:05 CEST 2010
+eclipse.preferences.version=1
+org.eclipse.jdt.core.compiler.codegen.inlineJsrBytecode=enabled
+org.eclipse.jdt.core.compiler.codegen.targetPlatform=1.6
+org.eclipse.jdt.core.compiler.codegen.unusedLocal=preserve
+org.eclipse.jdt.core.compiler.compliance=1.6
+org.eclipse.jdt.core.compiler.debug.lineNumber=generate
+org.eclipse.jdt.core.compiler.debug.localVariable=generate
+org.eclipse.jdt.core.compiler.debug.sourceFile=generate
+org.eclipse.jdt.core.compiler.problem.assertIdentifier=error
+org.eclipse.jdt.core.compiler.problem.enumIdentifier=error
+org.eclipse.jdt.core.compiler.source=1.6
View
71 ant/bootstrap.xml
@@ -0,0 +1,71 @@
+<project xmlns:ivy="antlib:org.apache.ivy.ant">
+
+ <!-- defaults for all projects -->
+ <property name="source.dir" value="${basedir}/src/main" />
+ <property name="test.source.dir" value="${basedir}/src/test" />
+ <property name="target.dir" value="${basedir}/target" />
+
+ <property environment="env" />
+
+ <property name="ivy.install.version" value="2.1.0" />
+ <property name="ivy.jar.dir" value="${user.home}/.ivy2" />
+ <property name="ivy.jar.file" value="${ivy.jar.dir}/ivy-${ivy.install.version}.jar" />
+ <property name="jsch.install.version" value="0.1.29" />
+ <property name="jsch.jar.file" value="${ivy.jar.dir}/jsch-${jsch.install.version}.jar" />
+
+ <!--
+ download ivy from the web site so that it can be used without being
+ installed. if the file has already been downloaded, we use a rename
+ trick to avoid hitting the website again. (that would be annoying
+ when building offline.)
+ -->
+ <target name="download-ivy" unless="skip.download">
+ <mkdir dir="${ivy.jar.dir}"/>
+ <condition property="ivy.url" value="file:${ivy.jar.file}">
+ <available file="${ivy.jar.file}" />
+ </condition>
+ <property name="ivy.url" value="http://repo1.maven.org/maven2/org/apache/ivy/ivy/${ivy.install.version}/ivy-${ivy.install.version}.jar" />
+ <get src="${ivy.url}" dest="${ivy.jar.file}.download" usetimestamp="true" />
+ <move file="${ivy.jar.file}.download" tofile="${ivy.jar.file}" />
+
+ <condition property="jsch.url" value="file:${jsch.jar.file}">
+ <available file="${jsch.jar.file}" />
+ </condition>
+ <property name="jsch.url" value="http://repo1.maven.org/maven2/jsch/jsch/${jsch.install.version}/jsch-${jsch.install.version}.jar" />
+ <get src="${jsch.url}" dest="${jsch.jar.file}.download" usetimestamp="true" />
+ <move file="${jsch.jar.file}.download" tofile="${jsch.jar.file}" />
+ </target>
+
+ <!-- import ivy's ant tasks -->
+ <target name="install-ivy" depends="download-ivy">
+ <path id="ivy.lib.path">
+ <fileset dir="${ivy.jar.dir}" includes="ivy-${ivy.install.version}.jar jsch-${jsch.install.version}.jar"/>
+ </path>
+ <taskdef resource="org/apache/ivy/ant/antlib.xml" uri="antlib:org.apache.ivy.ant" classpathref="ivy.lib.path" />
+ </target>
+
+ <!-- define filename-friendly names for the OS variants -->
+ <condition property="os.libsname" value="osx">
+ <os name="Mac OS X" />
+ </condition>
+ <condition property="os.jni.ext" value="jnilib">
+ <os name="Mac OS X" />
+ </condition>
+ <condition property="os.libsname" value="linux">
+ <os name="Linux" />
+ </condition>
+ <condition property="os.jni.ext" value="so">
+ <os name="Linux" />
+ </condition>
+
+ <!-- where to look for the ivy config -->
+ <property name="ivy.dep.file" value="${basedir}/ivy/ivy.xml" />
+ <property name="ivy.settings.file" value="${basedir}/ivy/ivysettings.xml" />
+
+ <import file="clean.xml" />
+ <import file="prepare.xml" />
+ <import file="compile.xml" />
+ <import file="test.xml" />
+ <import file="docs.xml" />
+ <import file="package.xml" />
+</project>
View
20 ant/clean.xml
@@ -0,0 +1,20 @@
+<project xmlns:ivy="antlib:org.apache.ivy.ant">
+
+ <target name="clean" depends="init" description="erase built files and targets">
+ <delete dir="${target.dir}" />
+ <delete dir="${dist.dir}" />
+ <!-- i dont think this is really a good idea: -->
+ <!-- delete dir="${ivy.jar.dir}/cache/${ivy.organisation}" /-->
+ </target>
+
+ <target name="clean-ivy" depends="prepare" description="erase ivy cache of downloaded packages">
+ <ivy:cleancache />
+ </target>
+
+ <target name="clean-jni" depends="prepare" description="clean out any built jni targets" if="build.jni">
+ <ant dir="src/main/jni" target="clean" />
+ </target>
+
+ <target name="distclean" depends="clean, clean-jni, clean-ivy" />
+
+</project>
View
111 ant/compile.xml
@@ -0,0 +1,111 @@
+<project xmlns:ivy="antlib:org.apache.ivy.ant">
+
+ <!-- compile old-skool java -->
+
+ <target name="compile-java" if="build.java">
+ <javac srcdir="${source.dir}/java" destdir="${target.dir}/classes">
+ <classpath>
+ <path refid="deps.path" />
+ </classpath>
+ <include name="**/*.java" />
+ </javac>
+ </target>
+
+
+ <!-- compile scala -->
+
+ <target name="compile-scala" if="build.scala">
+ <scalac srcdir="${source.dir}/scala" destdir="${target.dir}/classes" force="changed">
+ <classpath>
+ <path refid="deps.path" />
+ </classpath>
+ <include name="**/*.scala" />
+ </scalac>
+ </target>
+
+
+ <!-- create properties file with build info -->
+
+ <target name="find-git-revision" unless="no.git">
+ <!-- ask git for the current "head" commit-id, for memoizing inside the built jar -->
+ <exec outputproperty="revision" executable="git" failifexecutionfails="false">
+ <arg value="rev-parse" />
+ <arg value="HEAD" />
+ </exec>
+ <exec executable="git" failifexecutionfails="false">
+ <arg value="rev-parse" />
+ <arg value="HEAD" />
+ <redirector outputproperty="revision-short">
+ <outputfilterchain>
+ <tokenfilter>
+ <filetokenizer />
+ <replaceregex pattern="(.{8}).*" replace="\1"/>
+ </tokenfilter>
+ </outputfilterchain>
+ </redirector>
+ </exec>
+ </target>
+
+ <target name="write-build-info" depends="init, find-git-revision" if="ivy.extra.buildpackage">
+ <tstamp>
+ <format property="build.timestamp.time" pattern="yyyyMMdd-HHmmss" />
+ <format property="build.timestamp.date" pattern="yyyyMMdd" />
+ </tstamp>
+ <pathconvert property="build.properties.path">
+ <path location="${ivy.extra.buildpackage}" />
+ <unpackagemapper from="${basedir}/*" to="${target.dir}/classes/*" />
+ </pathconvert>
+ <propertyfile file="${build.properties.path}/build.properties">
+ <entry key="name" value="${ivy.module}" />
+ <entry key="version" value="${ivy.revision}" />
+ <entry key="build_name" value="${build.timestamp.time}" />
+ <entry key="build_revision" value="${revision}" />
+ </propertyfile>
+ </target>
+
+
+ <!-- copy resources needed by tests and jar -->
+
+ <target name="copy-resources">
+ <copy todir="${dist.dir}/libs" flatten="true">
+ <path refid="deps.path" />
+ </copy>
+ <copy todir="${target.dir}/test-classes/" failonerror="false">
+ <fileset dir="${test.source.dir}/resources" />
+ </copy>
+ <copy todir="${target.dir}/classes/" overwrite="true" failonerror="false">
+ <fileset dir="${source.dir}/resources" />
+ </copy>
+ </target>
+
+ <target name="copy-config" if="copy.config">
+ <copy todir="${dist.dir}/config">
+ <fileset dir="${basedir}/config" />
+ </copy>
+ </target>
+
+ <target name="copy-extra-config" if="config.extra">
+ <copy todir="${dist.dir}/config">
+ <fileset dir="${config.extra}" />
+ </copy>
+ </target>
+
+ <target name="copy-extra-libs" if="libs.extra">
+ <copy todir="${dist.dir}/libs">
+ <path refid="libs.extra" />
+ </copy>
+ </target>
+
+ <target name="copy-extra-dist" if="dist.extra">
+ <copy todir="${dist.dir}">
+ <path refid="dist.extra" />
+ </copy>
+ </target>
+
+ <target name="copy-extra" depends="copy-resources, copy-config, copy-extra-config, copy-extra-libs, copy-extra-dist" />
+
+
+ <target name="compile" depends="prepare, find-source, compile-java, compile-scala, write-build-info, copy-extra"
+ description="compile java and scala code" />
+
+</project>
View
35 ant/docs.xml
@@ -0,0 +1,35 @@
+<project xmlns:ivy="antlib:org.apache.ivy.ant">
+
+ <target name="vscaladoc" depends="prepare" unless="skip.docs">
+ <delete dir="${docs.target.dir}/scaladoc" />
+ <mkdir dir="${docs.target.dir}/scaladoc" />
+ <pathconvert property="doc.sources" pathsep=" ">
+ <fileset dir="${source.dir}" includes="**/*.scala" />
+ </pathconvert>
+ <path id="docs.path">
+ <path refid="bootstrap.path" />
+ <pathelement location="${target.dir}/classes" />
+ </path>
+ <echo message="Building vscaladoc..." />
+ <java classname="org.scala_tools.vscaladoc.Main" fork="true" failonerror="true">
+ <classpath>
+ <path refid="bootstrap.path" />
+ </classpath>
+ <arg value="-classpath" />
+ <arg pathref="docs.path" />
+ <arg value="-d" />
+ <arg value="${docs.target.dir}/scaladoc" />
+ <arg value="-sourcepath" />
+ <arg value="${source.dir}/scala" />
+ <arg value="-windowtitle" />
+ <arg value="${ivy.module} ${ivy.revision}" />
+ <arg value="-doctitle" />
+ <arg value="${ivy.module} ${ivy.revision}" />
+ <arg value="-linksource" />
+ <arg line="${doc.sources}" />
+ </java>
+ </target>
+
+ <target name="docs" depends="prepare,vscaladoc" unless="skip.docs" description="build source documentation" />
+
+</project>
View
126 ant/package.xml
@@ -0,0 +1,126 @@
+<project xmlns:ivy="antlib:org.apache.ivy.ant">
+
+ <!-- unzip all the dependent jars into target/ so that the final jar will depend on nothing. kinda evil. -->
+ <target name="pack-deps" if="pack.deps">
+ <pathconvert pathsep="," property="deps.path.list">
+ <path refid="deps.path" />
+ <map from="/" to="" />
+ </pathconvert>
+ <unzip dest="${target.dir}/classes">
+ <fileset dir="/" includes="${deps.path.list}" />
+ </unzip>
+ <delete dir="${target.dir}/classes/META-INF" />
+ </target>
+
+ <target name="make-non-executable-jar" unless="ivy.extra.jarclassname" depends="pack-deps">
+ <jar destfile="${dist.dir}/${jar.name}.jar">
+ <fileset dir="${target.dir}/classes" />
+ </jar>
+ </target>
+
+ <!-- generate a jar that contains all deps inside it, so it can be run with "java -jar" -->
+ <target name="make-executable-jar" if="ivy.extra.jarclassname" depends="copy-extra,pack-deps">
+ <pathconvert refid="deps.path" pathsep=" " property="deps.path.jar-format">
+ <chainedmapper>
+ <flattenmapper />
+ <globmapper from="*" to="libs/*" />
+ </chainedmapper>
+ </pathconvert>
+ <jar destfile="${dist.dir}/${jar.name}.jar">
+ <fileset dir="${target.dir}/classes" />
+ <manifest>
+ <attribute name="Main-Class" value="${ivy.extra.jarclassname}" />
+ <attribute name="Class-Path" value="${deps.path.jar-format}" />
+ </manifest>
+ </jar>
+ </target>
+
+ <target name="generate-scripts" depends="prepare" if="generate.scripts">
+ <pathconvert refid="deps.path" property="classpath" />
+ <pathconvert refid="test.path" property="test.classpath" />
+ <pathconvert refid="deps.path" property="deps.path.dist-format">
+ <chainedmapper>
+ <flattenmapper />
+ <globmapper from="*" to="$${DIST_HOME}/libs/*" />
+ </chainedmapper>
+ </pathconvert>
+
+ <!-- delete dir="${basedir}/target/scripts" /-->
+ <mkdir dir="${dist.dir}/scripts" />
+ <copy todir="${dist.dir}/scripts" overwrite="true">
+ <fileset dir="${basedir}/src/scripts" />
+ <filterset>
+ <filter token="CLASSPATH" value="${classpath}:${target.dir}/classes" />
+ <filter token="TEST_CLASSPATH" value="${test.classpath}:${target.dir}/classes:${target.dir}/test-classes" />
+ <filter token="DIST_CLASSPATH" value="${deps.path.dist-format}:$${DIST_HOME}/${jar.name}.jar" />
+ <filter token="TARGET" value="${target.dir}" />
+ <filter token="DIST_NAME" value="${dist.name}" />
+ </filterset>
+ </copy>
+ <copy todir="${dist.dir}/scripts" overwrite="true" failonerror="false">
+ <fileset dir="${target.dir}/gen-rb" />
+ </copy>
+ <chmod dir="${dist.dir}/scripts" includes="*" perm="ugo+x" />
+ </target>
+
+ <target name="package" depends="test, make-non-executable-jar, make-executable-jar,
+ generate-scripts, docs" description="build complete jar(s) and docs">
+ <ivy:makepom ivyfile="${basedir}/ivy/ivy.xml" pomfile="${dist.dir}/${dist.name}.pom">
+ <mapping conf="*" scope="runtime" />
+ <mapping conf="bootstrap" scope="compile" />
+ <mapping conf="test" scope="compile" />
+ </ivy:makepom>
+ <ivy:deliver conf="*(public)" />
+ <condition property="dist.tarball.name"
+ value="${ivy.module}-${revision-short}.tar.bz2"
+ else="${dist.name}.tar.bz2">
+ <isset property="dist.build_integration" />
+ </condition>
+ <tar destfile="${basedir}/dist/${dist.tarball.name}"
+ basedir="${basedir}/dist" compression="bzip2">
+ <include name="${dist.name}/${jar.name}.jar" />
+ <include name="${dist.name}/*.so" />
+ <include name="${dist.name}/*.jnilib" />
+ <include name="${dist.name}/libs/**" />
+ <include name="${dist.name}/config/**" />
+ <tarfileset dir="${basedir}/dist" mode="755">
+ <include name="${dist.name}/scripts/**" />
+ </tarfileset>
+ </tar>
+ <ivy:publish resolver="local" overwrite="true">
+ <artifacts pattern="${dist.dir}/[artifact]-[revision].[ext]" />
+ </ivy:publish>
+ </target>
+
+ <target name="jars" depends="package" />
+
+ <target name="dist" depends="package" description="create a source distribution zipfile">
+ <copy todir="${dist.dir}" failonerror="false">
+ <fileset dir="${basedir}" includes="INSTALL* LICENSE* README* build.xml" />
+ </copy>
+ <copy todir="${dist.dir}/src">
+ <fileset dir="src" />
+ </copy>
+ <copy todir="${dist.dir}/ivy">
+ <fileset dir="ivy" />
+ </copy>
+ <copy todir="${dist.dir}/ant">
+ <fileset dir="ant" />
+ </copy>
+ <zip destfile="${basedir}/dist/${ivy.module}-${ivy.revision}-src.zip"
+ basedir="${basedir}/dist"
+ includes="${ivy.module}-${ivy.revision}/**"
+ excludes="${ivy.module}-${ivy.revision}/libs/** ${ivy.module}-${ivy.revision}/*.jar" />
+ </target>
+
+ <target name="push" depends="package" description="push built jar(s) to the repository">
+ <condition property="pubrevision"
+ value="${ivy.revision}-${build.timestamp.date}-${revision-short}"
+ else="${ivy.revision}">
+ <isset property="push.build_name" />
+ </condition>
+ <ivy:publish resolver="push" overwrite="true" pubrevision="${pubrevision}" update="true">
+ <artifacts pattern="${dist.dir}/[artifact]-${ivy.revision}.[ext]" />
+ </ivy:publish>
+ </target>
+</project>
View
55 ant/prepare.xml
@@ -0,0 +1,55 @@
+<project xmlns:ivy="antlib:org.apache.ivy.ant">
+
+ <!-- read the ivy files but don't resolve anything yet -->
+ <target name="init" depends="install-ivy">
+ <!-- override some ivy config. must happen before ivy:info even tho it's slightly recursive. -->
+ <property name="ivy.distrib.dir" value="${dist.dir}" />
+ <property name="ivy.deliver.ivy.pattern" value="${dist.dir}/[artifact]-[revision].[ext]" />
+
+ <ivy:info file="${ivy.dep.file}" />
+
+ <condition property="dist.name" value="${ivy.module}" else="${ivy.module}-${ivy.revision}">
+ <isset property="dist.build_integration" />
+ </condition>
+ <property name="jar.name" value="${ivy.module}-${ivy.revision}" />
+ <property name="dist.relative.dir" value="dist/${dist.name}" />
+ <property name="dist.dir" value="${basedir}/${dist.relative.dir}" />
+ <property name="docs.target.dir" value="${dist.dir}/docs" />
+ </target>
+
+ <target name="resolve" depends="init">
+ <ivy:retrieve log="download-only" conf="bootstrap,default,test" pattern="${target.dir}/libs/[conf]/[artifact]-[type].[ext]" />
+ <ivy:cachepath pathid="bootstrap.path" conf="bootstrap" />
+ <ivy:cachepath pathid="deps.path" conf="default" />
+ <ivy:cachepath pathid="test.path" conf="test" />
+ </target>
+
+ <target name="find-source">
+ <pathconvert property="build.java" setonempty="false">
+ <fileset dir="${source.dir}" includes="java/**/*.java" />
+ </pathconvert>
+ <pathconvert property="build.scala" setonempty="false">
+ <fileset dir="${source.dir}" includes="scala/**/*.scala" />
+ </pathconvert>
+ <pathconvert property="build.thrift" setonempty="false">
+ <fileset dir="${source.dir}" includes="thrift/**/*.thrift" />
+ </pathconvert>
+ <pathconvert property="build.jni" setonempty="false">
+ <fileset dir="${source.dir}" includes="jni/build.xml" />
+ </pathconvert>
+ <pathconvert property="copy.config" setonempty="false">
+ <fileset dir="${basedir}" includes="config/**" />
+ </pathconvert>
+ </target>
+
+ <target name="prepare" depends="resolve, find-source" description="bootstrap ivy and scala, and download dependencies">
+ <taskdef resource="scala/tools/ant/antlib.xml" classpathref="bootstrap.path" />
+ <condition property="generate.scripts" value="true">
+ <available file="${basedir}/src/scripts" type="dir" />
+ </condition>
+ <mkdir dir="${target.dir}/classes" />
+ <mkdir dir="${target.dir}/test-classes" />
+ <mkdir dir="${dist.dir}" />
+ </target>
+
+</project>
View
56 ant/test.xml
@@ -0,0 +1,56 @@
+<project xmlns:ivy="antlib:org.apache.ivy.ant">
+
+ <target name="compile-tests" depends="prepare, compile" unless="skip.test" if="ivy.extra.testclass">
+ <scalac srcdir="${test.source.dir}/scala" destdir="${target.dir}/test-classes" force="changed" deprecation="on">
+ <classpath>
+ <path refid="test.path" />
+ <pathelement location="${target.dir}/classes" />
+ </classpath>
+ <include name="**/*.scala" />
+ </scalac>
+ </target>
+
+ <macrodef name="run-test-class">
+ <attribute name="classname" />
+ <sequential>
+ <java classname="scala.tools.nsc.MainGenericRunner" fork="true" failonerror="true">
+ <classpath>
+ <path refid="test.path" />
+ <pathelement location="${target.dir}/classes" />
+ <pathelement location="${target.dir}/test-classes" />
+ </classpath>
+ <syspropertyset>
+ <propertyref builtin="all" />
+ </syspropertyset>
+ <jvmarg value="-Ddist.dir=${dist.dir}" />
+ <jvmarg value="-Djava.library.path=${dist.dir}:${dist.dir}/libs" />
+ <jvmarg value="-Xmx1200m" />
+ <arg value="@{classname}" />
+ </java>
+ </sequential>
+ </macrodef>
+
+ <target name="run-unit-tests" unless="skip.test" if="ivy.extra.testclass">
+ <run-test-class classname="${ivy.extra.testclass}" />
+ </target>
+
+ <target name="run-stress-tests" unless="skip.test" if="ivy.extra.stresstestclass">
+ <run-test-class classname="${ivy.extra.stresstestclass}" />
+ </target>
+
+ <target name="run-check-tests" unless="skip.test" if="ivy.extra.checktestclass">
+ <run-test-class classname="${ivy.extra.checktestclass}" />
+ </target>
+
+ <target name="unit" depends="compile-tests, run-unit-tests"
+ description="compile and run unit tests" />
+
+ <target name="stress" depends="compile-tests, run-stress-tests"
+ description="compile and run stress tests" />
+
+ <target name="check" depends="compile-tests, run-check-tests"
+ description="compile and run check tests" />
+
+ <target name="test" depends="unit" />
+
+</project>
View
6 build.xml
@@ -0,0 +1,6 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project name="twitterActors" default="package">
+ <description> TwitterActors: improved Scala actors library. </description>
+ <property name="skip.docs" value="true" />
+ <import file="ant/bootstrap.xml" />
+</project>
View
21 ivy/ivy.xml
@@ -0,0 +1,21 @@
+<ivy-module version="1.0" xmlns:e="http://ant.apache.org/ivy/extra">
+ <info
+ organisation="com.twitter"
+ module="twitterActors"
+ revision="1.0.0"
+ e:buildpackage="com.twitter.actors"
+ />
+
+ <configurations>
+ <conf name="bootstrap" visibility="private" description="load scala compiler and libraries" />
+ <conf name="default" description="normal build" />
+ <conf name="test" visibility="private" description="build and run tests" />
+ <conf name="compile" description="something broken in specs needs this" />
+ </configurations>
+ <dependencies>
+ <!-- i guess scala-compiler.jar is needed for MainGenericRunner -->
+ <dependency org="org.scala-lang" name="scala-compiler" rev="2.7.7" />
+ <dependency org="org.scala-lang" name="scala-library" rev="2.7.7" />
+ <dependency org="org.scala-tools.testing" name="specs" rev="1.6.2.1" conf="test->*"/>
+ </dependencies>
+</ivy-module>
View
32 ivy/ivysettings.xml
@@ -0,0 +1,32 @@
+<ivysettings>
+ <settings defaultResolver="chain-repos" />
+ <resolvers>
+ <chain name="chain-repos">
+ <filesystem name="local" m2compatible="true" transactional="false" local="true" checkmodified="true">
+ <artifact pattern="${basedir}/../repo/local/[organisation]/[module]/[revision]/[artifact]-[revision].[ext]" />
+ </filesystem>
+ <filesystem name="libs" m2compatible="true" transactional="false" local="true" checkmodified="true">
+ <artifact pattern="${basedir}/libs/[artifact]-[revision].[ext]" />
+ </filesystem>
+
+ <ibiblio name="scala-tools.org" m2compatible="true" root="http://scala-tools.org/repo-releases/"/>
+ <ibiblio name="specs-extra" m2compatible="true" root="http://specs.googlecode.com/svn/maven2/" />
+
+ <ibiblio name="maven2-repository.dev.java.net" m2compatible="true"
+ root="http://download.java.net/maven/2/"/>
+ <ibiblio name="powermock" m2compatible="true" root="http://powermock.googlecode.com/svn/repo/" />
+ <ibiblio name="javassist" m2compatible="true" root="http://repository.jboss.org/maven2/" />
+
+ <!-- can't use poms because commons-io pom is corrupted -->
+ <ibiblio name="maven2" m2compatible="true" usepoms="false"/>
+
+ <ibiblio name="lag.net" m2compatible="true" root="http://www.lag.net/repo/" />
+ <ibiblio name="com.twitter" m2compatible="true" root="http://www.lag.net/nest/" />
+ </chain>
+ <filesystem name="local" m2compatible="true" transactional="false">
+ <artifact pattern="${basedir}/dist/repo/[organisation]/[module]/[revision]/[artifact]-[revision].[ext]" />
+ </filesystem>
+ </resolvers>
+
+ <caches useOrigin="true"/>
+</ivysettings>
View
531 src/main/java/com/twitter/actors/FJTask.java
@@ -0,0 +1,531 @@
+/*
+ File: Task.java
+
+ Originally written by Doug Lea and released into the public domain.
+ This may be used for any purposes whatsoever without acknowledgment.
+ Thanks for the assistance and support of Sun Microsystems Labs,
+ and everyone contributing, testing, and using this code.
+
+ History:
+ Date Who What
+ 7Jan1999 dl first release
+ 14jan1999 dl simplify start() semantics;
+ improve documentation
+ 18Jan1999 dl Eliminate useless time-based waits.
+ 7Mar1999 dl Add reset method,
+ add array-based composite operations
+ 27Apr1999 dl Rename
+*/
+
+package com.twitter.actors;
+
+
+/**
+ * Abstract base class for Fork/Join Tasks.
+ *
+ * <p>
+ * FJTasks are lightweight, stripped-down analogs of Threads.
+ * Many FJTasks share the same pool of Java threads. This is
+ * supported by the FJTaskRunnerGroup and FJTaskRunner classes, that
+ * mainly contain
+ * methods called only internally by FJTasks.
+ * FJTasks support versions of the most common methods found in class Thread,
+ * including start(), yield() and join(). However, they
+ * don't support priorities, ThreadGroups or other bookkeeping
+ * or control methods of class Thread.
+ * <p>
+ * FJTasks should normally be defined by subclassing and adding a run() method.
+ * Alternatively, static inner class <code>Wrap(Runnable r)</code>
+ * can be used to
+ * wrap an existing Runnable object in a FJTask.
+ * <p>
+ * <code>FJTaskRunnerGroup.execute(FJTask)</code> can be used to
+ * initiate a FJTask from a non-FJTask thread.
+ * And <code>FJTaskRunnerGroup.invoke(FJTask)</code> can be used to initiate
+ * a FJTask and then wait for it to complete before returning.
+ * These are the only entry-points from normal threads to FJTasks.
+ * Most FJTask methods themselves may only be called from within running FJTasks.
+ * They throw ClassCastExceptions if they are not,
+ * reflecting the fact that these methods
+ * can only be executed using FJTaskRunner threads, not generic
+ * java.lang.Threads.
+ * <p>
+ * There are three different ways to run a FJTask,
+ * with different scheduling semantics:
+ * <ul>
+ * <li> FJTask.start() (as well as FJTaskRunnerGroup.execute(FJTask))
+ * behaves pretty much like Thread.start(). It enqueues a task to be
+ * run the next time any FJTaskRunner thread is otherwise idle.
+ * It maintains standard FIFO ordering with respect to
+ * the group of worker threads.
+ * <li> FJTask.fork() (as well as the two-task spawning method,
+ * coInvoke(task1, task2), and the array version
+ * coInvoke(FJTask[] tasks)) starts a task
+ * that will be executed in
+ * procedure-call-like LIFO order if executed by the
+ * same worker thread as the one that created it, but is FIFO
+ * with respect to other tasks if it is run by
+ * other worker threads. That is, earlier-forked
+ * tasks are preferred to later-forked tasks by other idle workers.
+ * Fork() is noticeably faster than start(), but can only be
+ * used when these scheduling semantics are acceptable.
+ * <li> FJTask.invoke(FJTask) just executes the run method
+ * of one task from within another. It is the analog of a
+ * direct call.
+ * </ul>
+ * <p>
+ * The main economies of FJTasks stem from the fact that
+ * FJTasks do not support blocking operations of any kind.
+ * FJTasks should just run to completion without
+ * issuing waits or performing blocking IO.
+ * There are several styles for creating the run methods that
+ * execute as tasks, including
+ * event-style methods, and pure computational methods.
+ * Generally, the best kinds of FJTasks are those that in turn
+ * generate other FJTasks.
+ * <p>
+ * There is nothing actually
+ * preventing you from blocking within a FJTask, and very short waits/blocks are
+ * completely well behaved. But FJTasks are not designed
+ * to support arbitrary synchronization
+ * since there is no way to suspend and resume individual tasks
+ * once they have begun executing. FJTasks should also be finite
+ * in duration -- they should not contain infinite loops.
+ * FJTasks that might need to perform a blocking
+ * action, or hold locks for extended periods, or
+ * loop forever can instead create normal
+ * java Thread objects that will do so. FJTasks are just not
+ * designed to support these things.
+ * FJTasks may however yield() control to allow their FJTaskRunner threads
+ * to run other tasks,
+ * and may wait for other dependent tasks via join(). These
+ * are the only coordination mechanisms supported by FJTasks.
+ * <p>
+ * FJTasks, and the FJTaskRunners that execute them are not
+ * intrinsically robust with respect to exceptions.
+ * A FJTask that aborts via an exception does not automatically
+ * have its completion flag (isDone) set.
+ * As with ordinary Threads, an uncaught exception will normally cause
+ * its FJTaskRunner thread to die, which in turn may sometimes
+ * cause other computations being performed to hang or abort.
+ * You can of course
+ * do better by trapping exceptions inside the run methods of FJTasks.
+ * <p>
+ * The overhead differences between FJTasks and Threads are substantial,
+ * especially when using fork() or coInvoke().
+ * FJTasks can be two or three orders of magnitude faster than Threads,
+ * at least when run on JVMs with high-performance garbage collection
+ * (every FJTask quickly becomes garbage) and good native thread support.
+ * <p>
+ * Given these overhead savings, you might be tempted to use FJTasks for
+ * everything you would use a normal Thread to do. Don't. Java Threads
+ * remain better for general purpose thread-based programming. Remember
+ * that FJTasks cannot be used for designs involving arbitrary blocking
+ * synchronization or I/O. Extending FJTasks to support such capabilities
+ * would amount to re-inventing the Thread class, and would make them
+ * less optimal in the contexts that they were designed for.
+ * <p>[<a href="http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html"> Introduction to this package. </a>]
+ * <p>
+ * @see FJTaskRunner
+ * @see FJTaskRunnerGroup
+ **/
+
+public abstract class FJTask implements Runnable {
+
+ /**
+ * The only status information associated with FJTasks is whether
+ * the they are considered to have completed.
+ * It is set true automatically within
+ * FJTaskRunner methods upon completion
+ * of the run method, or manually via cancel.
+ **/
+
+ private volatile boolean done; // = false;
+
+ /**
+ * Return the FJTaskRunner thread running the current FJTask.
+ * Most FJTask methods are just relays to their current
+ * FJTaskRunners, that perform the indicated actions.
+ * @exception ClassCastException if caller thread is not a
+ * running FJTask.
+ **/
+
+ public static FJTaskRunner getFJTaskRunner() {
+ return (FJTaskRunner)(Thread.currentThread());
+ }
+
+ /**
+ * Return the FJTaskRunnerGroup of the thread running the current FJTask.
+ * @exception ClassCastException if caller thread is not a
+ * running FJTask.
+ **/
+ public static IFJTaskRunnerGroup getFJTaskRunnerGroup() {
+ return getFJTaskRunner().getGroup();
+ }
+
+
+ /**
+ * Return true if current task has terminated or been cancelled.
+ * The method is a simple analog of the Thread.isAlive()
+ * method. However, it reports true only when the task has terminated
+ * or has been cancelled. It does not distinguish these two cases.
+ * And there is no way to determine whether a FJTask has been started
+ * or is currently executing.
+ **/
+
+ public final boolean isDone() { return done; }
+
+ /**
+ * Indicate termination. Intended only to be called by FJTaskRunner.
+ * FJTasks themselves should use (non-final) method
+ * cancel() to suppress execution.
+ **/
+
+ protected final void setDone() { done = true; }
+
+ /**
+ * Set the termination status of this task. This simple-minded
+ * analog of Thread.interrupt
+ * causes the task not to execute if it has not already been started.
+ * Cancelling a running FJTask
+ * has no effect unless the run method itself uses isDone()
+ * to probe cancellation and take appropriate action.
+ * Individual run() methods may sense status and
+ * act accordingly, normally by returning early.
+ **/
+
+ public void cancel() { setDone(); }
+
+
+ /**
+ * Clear the termination status of this task.
+ * This method is intended to be used
+ * only as a means to allow task objects to be recycled. It should
+ * be called only when you are sure that the previous
+ * execution of this task has terminated and, if applicable, has
+ * been joined by all other waiting tasks. Usage in any other
+ * context is a very bad idea.
+ **/
+
+ public void reset() { done = false; }
+
+
+ /**
+ * Execute this task. This method merely places the task in a
+ * group-wide scheduling queue.
+ * It will be run
+ * the next time any TaskRunner thread is otherwise idle.
+ * This scheduling maintains FIFO ordering of started tasks
+ * with respect to
+ * the group of worker threads.
+ * @exception ClassCastException if caller thread is not
+ * running in a FJTaskRunner thread.
+ **/
+
+ public void start() { getFJTaskRunnerGroup().executeTask(this); }
+
+
+ /**
+ * Arrange for execution of a strictly dependent task.
+ * The task that will be executed in
+ * procedure-call-like LIFO order if executed by the
+ * same worker thread, but is FIFO with respect to other tasks
+ * forked by this thread when taken by other worker threads.
+ * That is, earlier-forked
+ * tasks are preferred to later-forked tasks by other idle workers.
+ * <p>
+ * Fork() is noticeably
+ * faster than start(). However, it may only
+ * be used for strictly dependent tasks -- generally, those that
+ * could logically be issued as straight method calls without
+ * changing the logic of the program.
+ * The method is optimized for use in parallel fork/join designs
+ * in which the thread that issues one or more forks
+ * cannot continue until at least some of the forked
+ * threads terminate and are joined.
+ * @exception ClassCastException if caller thread is not
+ * running in a FJTaskRunner thread.
+ **/
+
+ public void fork() { getFJTaskRunner().push(this); }
+
+ /**
+ * Allow the current underlying FJTaskRunner thread to process other tasks.
+ * <p>
+ * Spinloops based on yield() are well behaved so long
+ * as the event or condition being waited for is produced via another
+ * FJTask. Additionally, you must never hold a lock
+ * while performing a yield or join. (This is because
+ * multiple FJTasks can be run by the same Thread during
+ * a yield. Since java locks are held per-thread, the lock would not
+ * maintain the conceptual exclusion you have in mind.)
+ * <p>
+ * Otherwise, spinloops using
+ * yield are the main construction of choice when a task must wait
+ * for a condition that it is sure will eventually occur because it
+ * is being produced by some other FJTask. The most common
+ * such condition is built-in: join() repeatedly yields until a task
+ * has terminated after producing some needed results. You can also
+ * use yield to wait for callbacks from other FJTasks, to wait for
+ * status flags to be set, and so on. However, in all these cases,
+ * you should be confident that the condition being waited for will
+ * occur, essentially always because it is produced by
+ * a FJTask generated by the current task, or one of its subtasks.
+ *
+ * @exception ClassCastException if caller thread is not
+ * running in a FJTaskRunner thread.
+ **/
+
+ public static void yield() { getFJTaskRunner().taskYield(); }
+
+ /**
+ * Yield until this task isDone.
+ * Equivalent to <code>while(!isDone()) yield(); </code>
+ * @exception ClassCastException if caller thread is not
+ * running in a FJTaskRunner thread.
+ **/
+
+ public void join() { getFJTaskRunner().taskJoin(this); }
+
+ /**
+ * Immediately execute task t by calling its run method. Has no
+ * effect if t has already been run or has been cancelled.
+ * It is equivalent to calling t.run except that it
+ * deals with completion status, so should always be used
+ * instead of directly calling run.
+ * The method can be useful
+ * when a computation has been packaged as a FJTask, but you just need to
+ * directly execute its body from within some other task.
+ **/
+
+ public static void invoke(FJTask t) {
+ if (!t.isDone()) {
+ t.run();
+ t.setDone();
+ }
+ }
+
+ /**
+ * Fork both tasks and then wait for their completion. It behaves as:
+ * <pre>
+ * task1.fork(); task2.fork(); task2.join(); task1.join();
+ * </pre>
+ * As a simple classic example, here is
+ * a class that computes the Fibonacci function:
+ * <pre>
+ * public class Fib extends FJTask {
+ *
+ * // Computes fibonacci(n) = fibonacci(n-1) + fibonacci(n-2); for n> 1
+ * // fibonacci(0) = 0;
+ * // fibonacci(1) = 1.
+ *
+ * // Value to compute fibonacci function for.
+ * // It is replaced with the answer when computed.
+ * private volatile int number;
+ *
+ * public Fib(int n) { number = n; }
+ *
+ * public int getAnswer() {
+ * if (!isDone()) throw new Error("Not yet computed");
+ * return number;
+ * }
+ *
+ * public void run() {
+ * int n = number;
+ * if (n > 1) {
+ * Fib f1 = new Fib(n - 1);
+ * Fib f2 = new Fib(n - 2);
+ *
+ * coInvoke(f1, f2); // run these in parallel
+ *
+ * // we know f1 and f2 are computed, so just directly access numbers
+ * number = f1.number + f2.number;
+ * }
+ * }
+ *
+ * public static void main(String[] args) { // sample driver
+ * try {
+ * int groupSize = 2; // 2 worker threads
+ * int num = 35; // compute fib(35)
+ * FJTaskRunnerGroup group = new FJTaskRunnerGroup(groupSize);
+ * Fib f = new Fib(num);
+ * group.invoke(f);
+ * int result = f.getAnswer();
+ * System.out.println(" Answer: " + result);
+ * }
+ * catch (InterruptedException ex) {
+ * System.out.println("Interrupted");
+ * }
+ * }
+ * }
+ * </pre>
+ *
+ * @exception ClassCastException if caller thread is not
+ * running in a FJTaskRunner thread.
+ **/
+
+ public static void coInvoke(FJTask task1, FJTask task2) {
+ getFJTaskRunner().coInvoke(task1, task2);
+ }
+
+
+ /**
+ * Fork all tasks in array, and await their completion.
+ * Behaviorally equivalent to:
+ * <pre>
+ * for (int i = 0; i &lt; tasks.length; ++i) tasks[i].fork();
+ * for (int i = 0; i &lt; tasks.length; ++i) tasks[i].join();
+ * </pre>
+ **/
+
+ public static void coInvoke(FJTask[] tasks) {
+ getFJTaskRunner().coInvoke(tasks);
+ }
+
+ /**
+ * A FJTask that holds a Runnable r, and calls r.run when executed.
+ * The class is a simple utilty to allow arbitrary Runnables
+ * to be used as FJTasks.
+ **/
+
+ public static class Wrap extends FJTask {
+ protected final Runnable runnable;
+ public Wrap(Runnable r) { runnable = r; }
+ public void run() { runnable.run(); }
+ }
+
+
+ /**
+ * A <code>new Seq</code>, when executed,
+ * invokes each task provided in the constructor, in order.
+ * The class is a simple utility
+ * that makes it easier to create composite FJTasks.
+ **/
+ public static class Seq extends FJTask {
+ protected final FJTask[] tasks;
+
+ /**
+ * Construct a Seq that, when executed, will process each of the
+ * tasks in the tasks array in order
+ **/
+ public Seq(FJTask[] tasks) {
+ this.tasks = tasks;
+ }
+
+ /**
+ * Two-task constructor, for compatibility with previous release.
+ **/
+ public Seq(FJTask task1, FJTask task2) {
+ this.tasks = new FJTask[] { task1, task2 };
+ }
+
+ public void run() {
+ for (int i = 0; i < tasks.length; ++i) FJTask.invoke(tasks[i]);
+ }
+ }
+
+ /**
+ * Construct and return a FJTask object that, when executed, will
+ * invoke the tasks in the tasks array in array order
+ **/
+
+ public static FJTask seq(FJTask[] tasks) {
+ return new Seq(tasks);
+ }
+
+ /**
+ * A <code>new Par</code>, when executed,
+ * runs the tasks provided in the constructor in parallel using
+ * coInvoke(tasks).
+ * The class is a simple utility
+ * that makes it easier to create composite FJTasks.
+ **/
+ public static class Par extends FJTask {
+ protected final FJTask[] tasks;
+
+ /**
+ * Construct a Seq that, when executed, will process each of the
+ * tasks in the tasks array in parallel
+ **/
+ public Par(FJTask[] tasks) {
+ this.tasks = tasks;
+ }
+
+ /**
+ * Two-task constructor, for compatibility with previous release.
+ **/
+ public Par(FJTask task1, FJTask task2) {
+ this.tasks = new FJTask[] { task1, task2 };
+ }
+
+
+ public void run() {
+ FJTask.coInvoke(tasks);
+ }
+ }
+
+
+ /**
+ * Construct and return a FJTask object that, when executed, will
+ * invoke the tasks in the tasks array in parallel using coInvoke
+ **/
+ public static FJTask par(FJTask[] tasks) {
+ return new Par(tasks);
+ }
+
+ /**
+ * A <code>new Seq2(task1, task2)</code>, when executed,
+ * invokes task1 and then task2, in order.
+ * The class is a simple utility
+ * that makes it easier to create composite Tasks.
+ **/
+ public static class Seq2 extends FJTask {
+ protected final FJTask fst;
+ protected final FJTask snd;
+ public Seq2(FJTask task1, FJTask task2) {
+ fst = task1;
+ snd = task2;
+ }
+ public void run() {
+ FJTask.invoke(fst);
+ FJTask.invoke(snd);
+ }
+ }
+
+ /**
+ * Construct and return a FJTask object that, when executed, will
+ * invoke task1 and task2, in order
+ **/
+
+ public static FJTask seq(FJTask task1, FJTask task2) {
+ return new Seq2(task1, task2);
+ }
+
+ /**
+ * A <code>new Par(task1, task2)</code>, when executed,
+ * runs task1 and task2 in parallel using coInvoke(task1, task2).
+ * The class is a simple utility
+ * that makes it easier to create composite Tasks.
+ **/
+ public static class Par2 extends FJTask {
+ protected final FJTask fst;
+ protected final FJTask snd;
+ public Par2(FJTask task1, FJTask task2) {
+ fst = task1;
+ snd = task2;
+ }
+ public void run() {
+ FJTask.coInvoke(fst, snd);
+ }
+ }
+
+
+ /**
+ * Construct and return a FJTask object that, when executed, will
+ * invoke task1 and task2, in parallel
+ **/
+ public static FJTask par(FJTask task1, FJTask task2) {
+ return new Par2(task1, task2);
+ }
+
+}
View
996 src/main/java/com/twitter/actors/FJTaskRunner.java
@@ -0,0 +1,996 @@
+/*
+ File: FJTaskRunner.java
+
+ Originally written by Doug Lea and released into the public domain.
+ This may be used for any purposes whatsoever without acknowledgment.
+ Thanks for the assistance and support of Sun Microsystems Labs,
+ and everyone contributing, testing, and using this code.
+
+ History:
+ Date Who What
+ 7Jan1999 dl First public release
+ 13Jan1999 dl correct a stat counter update;
+ ensure inactive status on run termination;
+ misc minor cleaup
+ 14Jan1999 dl Use random starting point in scan;
+ variable renamings.
+ 18Jan1999 dl Runloop allowed to die on task exception;
+ remove useless timed join
+ 22Jan1999 dl Rework scan to allow use of priorities.
+ 6Feb1999 dl Documentation updates.
+ 7Mar1999 dl Add array-based coInvoke
+ 31Mar1999 dl Revise scan to remove need for NullTasks
+ 27Apr1999 dl Renamed
+ 23oct1999 dl Earlier detect of interrupt in scanWhileIdling
+ 24nov1999 dl Now works on JVMs that do not properly
+ implement read-after-write of 2 volatiles.
+*/
+
+package com.twitter.actors;
+
+import java.util.Random;
+
+/**
+ * Specialized Thread subclass for running FJTasks.
+ * <p>
+ * Each FJTaskRunner keeps FJTasks in a double-ended queue (DEQ).
+ * Double-ended queues support stack-based operations
+ * push and pop, as well as queue-based operations put and take.
+ * Normally, threads run their own tasks. But they
+ * may also steal tasks from each others DEQs.
+ * <p>
+ * The algorithms are minor variants of those used
+ * in <A href="http://supertech.lcs.mit.edu/cilk/"> Cilk</A> and
+ * <A href="http://www.cs.utexas.edu/users/hood/"> Hood</A>, and
+ * to a lesser extent
+ * <A href="http://www.cs.uga.edu/~dkl/filaments/dist.html"> Filaments</A>,
+ * but are adapted to work in Java.
+ * <p>
+ * The two most important capabilities are:
+ * <ul>
+ * <li> Fork a FJTask:
+ * <pre>
+ * Push task onto DEQ
+ * </pre>
+ * <li> Get a task to run (for example within taskYield)
+ * <pre>
+ * If DEQ is not empty,
+ * Pop a task and run it.
+ * Else if any other DEQ is not empty,
+ * Take ("steal") a task from it and run it.
+ * Else if the entry queue for our group is not empty,
+ * Take a task from it and run it.
+ * Else if current thread is otherwise idling
+ * If all threads are idling
+ * Wait for a task to be put on group entry queue
+ * Else
+ * Yield or Sleep for a while, and then retry
+ * </pre>
+ * </ul>
+ * The push, pop, and put are designed to only ever called by the
+ * current thread, and take (steal) is only ever called by
+ * other threads.
+ * All other operations are composites and variants of these,
+ * plus a few miscellaneous bookkeeping methods.
+ * <p>
+ * Implementations of the underlying representations and operations
+ * are geared for use on JVMs operating on multiple CPUs (although
+ * they should of course work fine on single CPUs as well).
+ * <p>
+ * A possible snapshot of a FJTaskRunner's DEQ is:
+ * <pre>
+ * 0 1 2 3 4 5 6 ...
+ * +-----+-----+-----+-----+-----+-----+-----+--
+ * | | t | t | t | t | | | ... deq array
+ * +-----+-----+-----+-----+-----+-----+-----+--
+ * ^ ^
+ * base top
+ * (incremented (incremented
+ * on take, on push
+ * decremented decremented
+ * on put) on pop)
+ * </pre>
+ * <p>
+ * FJTasks are held in elements of the DEQ.
+ * They are maintained in a bounded array that
+ * works similarly to a circular bounded buffer. To ensure
+ * visibility of stolen FJTasks across threads, the array elements
+ * must be <code>volatile</code>.
+ * Using volatile rather than synchronizing suffices here since
+ * each task accessed by a thread is either one that it
+ * created or one that has never seen before. Thus we cannot
+ * encounter any staleness problems executing run methods,
+ * although FJTask programmers must be still sure to either synch or use
+ * volatile for shared data within their run methods.
+ * <p>
+ * However, since there is no way
+ * to declare an array of volatiles in Java, the DEQ elements actually
+ * hold VolatileTaskRef objects, each of which in turn holds a
+ * volatile reference to a FJTask.
+ * Even with the double-indirection overhead of
+ * volatile refs, using an array for the DEQ works out
+ * better than linking them since fewer shared
+ * memory locations need to be
+ * touched or modified by the threads while using the DEQ.
+ * Further, the double indirection may alleviate cache-line
+ * sharing effects (which cannot otherwise be directly dealt with in Java).
+ * <p>
+ * The indices for the <code>base</code> and <code>top</code> of the DEQ
+ * are declared as volatile. The main contention point with
+ * multiple FJTaskRunner threads occurs when one thread is trying
+ * to pop its own stack while another is trying to steal from it.
+ * This is handled via a specialization of Dekker's algorithm,
+ * in which the popping thread pre-decrements <code>top</code>,
+ * and then checks it against <code>base</code>.
+ * To be conservative in the face of JVMs that only partially
+ * honor the specification for volatile, the pop proceeds
+ * without synchronization only if there are apparently enough
+ * items for both a simultaneous pop and take to succeed.
+ * It otherwise enters a
+ * synchronized lock to check if the DEQ is actually empty,
+ * if so failing. The stealing thread
+ * does almost the opposite, but is set up to be less likely
+ * to win in cases of contention: Steals always run under synchronized
+ * locks in order to avoid conflicts with other ongoing steals.
+ * They pre-increment <code>base</code>, and then check against
+ * <code>top</code>. They back out (resetting the base index
+ * and failing to steal) if the
+ * DEQ is empty or is about to become empty by an ongoing pop.
+ * <p>
+ * A push operation can normally run concurrently with a steal.
+ * A push enters a synch lock only if the DEQ appears full so must
+ * either be resized or have indices adjusted due to wrap-around
+ * of the bounded DEQ. The put operation always requires synchronization.
+ * <p>
+ * When a FJTaskRunner thread has no tasks of its own to run,
+ * it tries to be a good citizen.
+ * Threads run at lower priority while scanning for work.
+ * <p>
+ * If the task is currently waiting
+ * via yield, the thread alternates scans (starting at a randomly
+ * chosen victim) with Thread.yields. This is
+ * well-behaved so long as the JVM handles Thread.yield in a
+ * sensible fashion. (It need not. Thread.yield is so underspecified
+ * that it is legal for a JVM to treat it as a no-op.) This also
+ * keeps things well-behaved even if we are running on a uniprocessor
+ * JVM using a simple cooperative threading model.
+ * <p>
+ * If a thread needing work is
+ * is otherwise idle (which occurs only in the main runloop), and
+ * there are no available tasks to steal or poll, it
+ * instead enters into a sleep-based (actually timed wait(msec))
+ * phase in which it progressively sleeps for longer durations
+ * (up to a maximum of FJTaskRunnerGroup.MAX_SLEEP_TIME,
+ * currently 100ms) between scans.
+ * If all threads in the group
+ * are idling, they further progress to a hard wait phase, suspending
+ * until a new task is entered into the FJTaskRunnerGroup entry queue.
+ * A sleeping FJTaskRunner thread may be awakened by a new
+ * task being put into the group entry queue or by another FJTaskRunner
+ * becoming active, but not merely by some DEQ becoming non-empty.
+ * Thus the MAX_SLEEP_TIME provides a bound for sleep durations
+ * in cases where all but one worker thread start sleeping
+ * even though there will eventually be work produced
+ * by a thread that is taking a long time to place tasks in DEQ.
+ * These sleep mechanics are handled in the FJTaskRunnerGroup class.
+ * <p>
+ * Composite operations such as taskJoin include heavy
+ * manual inlining of the most time-critical operations
+ * (mainly FJTask.invoke).
+ * This opens up a few opportunities for further hand-optimizations.
+ * Until Java compilers get a lot smarter, these tweaks
+ * improve performance significantly enough for task-intensive
+ * programs to be worth the poorer maintainability and code duplication.
+ * <p>
+ * Because they are so fragile and performance-sensitive, nearly
+ * all methods are declared as final. However, nearly all fields
+ * and methods are also declared as protected, so it is possible,
+ * with much care, to extend functionality in subclasses. (Normally
+ * you would also need to subclass FJTaskRunnerGroup.)
+ * <p>
+ * None of the normal java.lang.Thread class methods should ever be called
+ * on FJTaskRunners. For this reason, it might have been nicer to
+ * declare FJTaskRunner as a Runnable to run within a Thread. However,
+ * this would have complicated many minor logistics. And since
+ * no FJTaskRunner methods should normally be called from outside the
+ * FJTask and FJTaskRunnerGroup classes either, this decision doesn't impact
+ * usage.
+ * <p>
+ * You might think that layering this kind of framework on top of
+ * Java threads, which are already several levels removed from raw CPU
+ * scheduling on most systems, would lead to very poor performance.
+ * But on the platforms
+ * tested, the performance is quite good.
+ * <p>[<a href="http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html"> Introduction to this package. </a>]
+ * @see FJTask
+ * @see FJTaskRunnerGroup
+ **/
+
+public class FJTaskRunner extends Thread {
+
+ /** The group of which this FJTaskRunner is a member **/
+ protected final IFJTaskRunnerGroup group;
+
+ /**
+ * Constructor called only during FJTaskRunnerGroup initialization
+ **/
+
+ /*protected*/ public FJTaskRunner(IFJTaskRunnerGroup g) {
+ group = g;
+ victimRNG = new Random(System.identityHashCode(this));
+ runPriority = getPriority();
+ setDaemon(true);
+ }
+
+ /**
+ * Return the FJTaskRunnerGroup of which this thread is a member
+ **/
+
+ protected final IFJTaskRunnerGroup getGroup() { return group; }
+
+
+ /* ------------ DEQ Representation ------------------- */
+
+
+ /**
+ * FJTasks are held in an array-based DEQ with INITIAL_CAPACITY
+ * elements. The DEQ is grown if necessary, but default value is
+ * normally much more than sufficient unless there are
+ * user programming errors or questionable operations generating
+ * large numbers of Tasks without running them.
+ * Capacities must be a power of two.
+ **/
+
+ protected static final int INITIAL_CAPACITY = 4096;
+
+ /**
+ * The maximum supported DEQ capacity.
+ * When exceeded, FJTaskRunner operations throw Errors
+ **/
+
+ protected static final int MAX_CAPACITY = 1 << 30;
+
+ /**
+ * An object holding a single volatile reference to a FJTask.
+ **/
+
+ protected final static class VolatileTaskRef {
+ /** The reference **/
+ protected volatile FJTask ref;
+
+ /** Set the reference **/
+ protected final void put(FJTask r) { ref = r; }
+ /** Return the reference **/
+ protected final FJTask get() { return ref; }
+ /** Return the reference and clear it **/
+ protected final FJTask take() { FJTask r = ref; ref = null; return r; }
+
+ /**
+ * Initialization utility for constructing arrays.
+ * Make an array of given capacity and fill it with
+ * VolatileTaskRefs.
+ **/
+ protected static VolatileTaskRef[] newArray(int cap) {
+ VolatileTaskRef[] a = new VolatileTaskRef[cap];
+ for (int k = 0; k < cap; k++) a[k] = new VolatileTaskRef();
+ return a;
+ }
+
+ }
+
+ /**
+ * The DEQ array.
+ **/
+
+ protected VolatileTaskRef[] deq = VolatileTaskRef.newArray(INITIAL_CAPACITY);
+
+ /** Current size of the task DEQ **/
+ protected int deqSize() { return deq.length; }
+
+ /**
+ * Current top of DEQ. Generally acts just like a stack pointer in an
+ * array-based stack, except that it circularly wraps around the
+ * array, as in an array-based queue. The value is NOT
+ * always kept within <code>0 ... deq.length</code> though.
+ * The current top element is always at <code>top & (deq.length-1)</code>.
+ * To avoid integer overflow, top is reset down
+ * within bounds whenever it is noticed to be out out bounds;
+ * at worst when it is at <code>2 * deq.length</code>.
+ **/
+ protected volatile int top = 0;
+
+
+ /**
+ * Current base of DEQ. Acts like a take-pointer in an
+ * array-based bounded queue. Same bounds and usage as top.
+ **/
+
+ protected volatile int base = 0;
+
+
+ /**
+ * An extra object to synchronize on in order to
+ * achieve a memory barrier.
+ **/
+
+ protected final Object barrier = new Object();
+
+ /* ------------ Other BookKeeping ------------------- */
+
+ /**
+ * Record whether current thread may be processing a task
+ * (i.e., has been started and is not in an idle wait).
+ * Accessed, under synch, ONLY by FJTaskRunnerGroup, but the field is
+ * stored here for simplicity.
+ **/
+
+ /*protected*/ public boolean active = false;
+
+ /** Random starting point generator for scan() **/
+ protected final Random victimRNG;
+
+
+ /** Priority to use while scanning for work **/
+ protected int scanPriority = Thread.MIN_PRIORITY + 1;
+
+ /** Priority to use while running tasks **/
+ protected int runPriority;
+
+ /**
+ * Set the priority to use while scanning.
+ * We do not bother synchronizing access, since
+ * by the time the value is needed, both this FJTaskRunner
+ * and its FJTaskRunnerGroup will
+ * necessarily have performed enough synchronization
+ * to avoid staleness problems of any consequence.
+ **/
+ protected void setScanPriority(int pri) { scanPriority = pri; }
+
+
+ /**
+ * Set the priority to use while running tasks.
+ * Same usage and rationale as setScanPriority.
+ **/
+ protected void setRunPriority(int pri) { runPriority = pri; }
+
+ /**
+ * Compile-time constant for statistics gathering.
+ * Even when set, reported values may not be accurate
+ * since all are read and written without synchronization.
+ **/
+
+
+
+ static final boolean COLLECT_STATS = true;
+ // static final boolean COLLECT_STATS = false;
+
+
+ // for stat collection
+
+ /** Total number of tasks run **/
+ protected int runs = 0;
+
+ /** Total number of queues scanned for work **/
+ protected int scans = 0;
+
+ /** Total number of tasks obtained via scan **/
+ protected int steals = 0;
+
+
+
+ /* -------- Suspending -------- */
+ protected boolean suspending = false;
+
+ synchronized void setSuspending(boolean susp) {
+ suspending = susp;
+ }
+
+ /* ------------ DEQ operations ------------------- */
+
+
+ /**
+ * Push a task onto DEQ.
+ * Called ONLY by current thread.
+ **/
+
+ /*protected*/ public final void push(final FJTask r) {
+ int t = top;
+
+ /*
+ This test catches both overflows and index wraps. It doesn't
+ really matter if base value is in the midst of changing in take.
+ As long as deq length is < 2^30, we are guaranteed to catch wrap in
+ time since base can only be incremented at most length times
+ between pushes (or puts).
+ */
+
+ if (t < (base & (deq.length-1)) + deq.length) {
+
+ deq[t & (deq.length-1)].put(r);
+ top = t + 1;
+ }
+
+ else // isolate slow case to increase chances push is inlined
+ slowPush(r); // check overflow and retry
+ }
+
+
+ /**
+ * Handle slow case for push
+ **/
+
+ protected synchronized void slowPush(final FJTask r) {
+ checkOverflow();
+ push(r); // just recurse -- this one is sure to succeed.
+ }
+
+
+ /**
+ * Enqueue task at base of DEQ.
+ * Called ONLY by current thread.
+ * This method is currently not called from class FJTask. It could be used
+ * as a faster way to do FJTask.start, but most users would
+ * find the semantics too confusing and unpredictable.
+ **/
+
+ protected final synchronized void put(final FJTask r) {
+ for (;;) {
+ int b = base - 1;
+ if (top < b + deq.length) {
+
+ int newBase = b & (deq.length-1);
+ deq[newBase].put(r);
+ base = newBase;
+
+ if (b != newBase) { // Adjust for index underflow
+ int newTop = top & (deq.length-1);
+ if (newTop < newBase) newTop += deq.length;
+ top = newTop;
+ }
+ return;
+ }
+ else {
+ checkOverflow();
+ // ... and retry
+ }
+ }
+ }
+
+ /**
+ * Return a popped task, or null if DEQ is empty.
+ * Called ONLY by current thread.
+ * <p>
+ * This is not usually called directly but is
+ * instead inlined in callers. This version differs from the
+ * cilk algorithm in that pop does not fully back down and
+ * retry in the case of potential conflict with take. It simply
+ * rechecks under synch lock. This gives a preference
+ * for threads to run their own tasks, which seems to
+ * reduce flailing a bit when there are few tasks to run.
+ **/
+
+ protected final FJTask pop() {
+ /*
+ Decrement top, to force a contending take to back down.
+ */
+
+ int t = --top;
+
+ /*
+ To avoid problems with JVMs that do not properly implement
+ read-after-write of a pair of volatiles, we conservatively
+ grab without lock only if the DEQ appears to have at least two
+ elements, thus guaranteeing that both a pop and take will succeed,
+ even if the pre-increment in take is not seen by current thread.
+ Otherwise we recheck under synch.
+ */
+
+ if (base + 1 < t)
+ return deq[t & (deq.length-1)].take();
+ else
+ return confirmPop(t);
+
+ }
+
+
+ /**
+ * Check under synch lock if DEQ is really empty when doing pop.
+ * Return task if not empty, else null.
+ **/
+
+ protected final synchronized FJTask confirmPop(int provisionalTop) {
+ if (base <= provisionalTop)
+ return deq[provisionalTop & (deq.length-1)].take();
+ else { // was empty
+ /*
+ Reset DEQ indices to zero whenever it is empty.
+ This both avoids unnecessary calls to checkOverflow
+ in push, and helps keep the DEQ from accumulating garbage
+ */
+
+ top = base = 0;
+ return null;
+ }
+ }
+
+
+ /**
+ * Take a task from the base of the DEQ.
+ * Always called by other threads via scan()
+ **/
+
+
+ protected final synchronized FJTask take() {
+
+ /*
+ Increment base in order to suppress a contending pop
+ */
+
+ int b = base++;
+
+ if (b < top)
+ return confirmTake(b);
+ else {
+ // back out
+ base = b;
+ return null;
+ }
+ }
+
+
+ /**
+ * double-check a potential take
+ **/
+
+ protected FJTask confirmTake(int oldBase) {
+
+ /*
+ Use a second (guaranteed uncontended) synch
+ to serve as a barrier in case JVM does not
+ properly process read-after-write of 2 volatiles
+ */
+
+ synchronized(barrier) {
+ if (oldBase < top) {
+ /*
+ We cannot call deq[oldBase].take here because of possible races when
+ nulling out versus concurrent push operations. Resulting
+ accumulated garbage is swept out periodically in
+ checkOverflow, or more typically, just by keeping indices
+ zero-based when found to be empty in pop, which keeps active
+ region small and constantly overwritten.
+ */
+
+ return deq[oldBase & (deq.length-1)].get();
+ }
+ else {
+ base = oldBase;
+ return null;
+ }
+ }
+ }
+
+
+ /**
+ * Adjust top and base, and grow DEQ if necessary.
+ * Called only while DEQ synch lock being held.
+ * We don't expect this to be called very often. In most
+ * programs using FJTasks, it is never called.
+ **/
+
+ protected void checkOverflow() {
+ int t = top;
+ int b = base;
+
+ if (t - b < deq.length-1) { // check if just need an index reset
+
+ int newBase = b & (deq.length-1);
+ int newTop = top & (deq.length-1);
+ if (newTop < newBase) newTop += deq.length;
+ top = newTop;
+ base = newBase;
+
+ /*
+ Null out refs to stolen tasks.
+ This is the only time we can safely do it.
+ */
+
+ int i = newBase;
+ while (i != newTop && deq[i].ref != null) {
+ deq[i].ref = null;
+ i = (i - 1) & (deq.length-1);
+ }
+
+ }
+ else { // grow by doubling array
+
+ int newTop = t - b;
+ int oldcap = deq.length;
+ int newcap = oldcap * 2;
+
+ if (newcap >= MAX_CAPACITY)
+ throw new Error("FJTask queue maximum capacity exceeded");
+
+ VolatileTaskRef[] newdeq = new VolatileTaskRef[newcap];
+
+ // copy in bottom half of new deq with refs from old deq
+ for (int j = 0; j < oldcap; ++j) newdeq[j] = deq[b++ & (oldcap-1)];
+
+ // fill top half of new deq with new refs
+ for (int j = oldcap; j < newcap; ++j) newdeq[j] = new VolatileTaskRef();
+
+ deq = newdeq;
+ base = 0;
+ top = newTop;
+ }
+ }
+
+
+ /* ------------ Scheduling ------------------- */
+
+
+ /**
+ * Do all but the pop() part of yield or join, by
+ * traversing all DEQs in our group looking for a task to
+ * steal. If none, it checks the entry queue.
+ * <p>
+ * Since there are no good, portable alternatives,
+ * we rely here on a mixture of Thread.yield and priorities
+ * to reduce wasted spinning, even though these are
+ * not well defined. We are hoping here that the JVM
+ * does something sensible.
+ * @param waitingFor if non-null, the current task being joined
+ **/
+
+ protected void scan(final FJTask waitingFor) {
+
+ FJTask task = null;
+
+ // to delay lowering priority until first failure to steal
+ boolean lowered = false;
+
+ /*
+ Circularly traverse from a random start index.
+
+ This differs slightly from cilk version that uses a random index
+ for each attempted steal.
+ Exhaustive scanning might impede analytic tractablity of
+ the scheduling policy, but makes it much easier to deal with
+ startup and shutdown.
+ */
+
+ FJTaskRunner[] ts = group.getArray();
+ int idx = victimRNG.nextInt(ts.length);
+
+ for (int i = 0; i < ts.length; ++i) {
+
+ FJTaskRunner t = ts[idx];
+ if (++idx >= ts.length) idx = 0; // circularly traverse
+
+ if (t != null && t != this) {
+
+ if (waitingFor != null && waitingFor.isDone()) {
+ break;
+ }
+ else {
+ if (COLLECT_STATS) ++scans;
+ task = t.take();
+ if (task != null) {
+ if (COLLECT_STATS) ++steals;
+ break;
+ }
+ else if (isInterrupted()) {
+ break;
+ }
+ else if (!lowered) { // if this is first fail, lower priority
+ lowered = true;
+ setPriority(scanPriority);
+ }
+ else { // otherwise we are at low priority; just yield
+ yield();
+ }
+ }
+ }
+
+ }
+
+ if (task == null) {
+ if (COLLECT_STATS) ++scans;
+ task = group.pollEntryQueue();
+ if (COLLECT_STATS) if (task != null) ++steals;
+ }
+
+ if (lowered) setPriority(runPriority);
+
+ if (task != null && !task.isDone()) {
+ if (COLLECT_STATS) ++runs;
+ task.run();
+ task.setDone();
+ }
+
+ }
+
+ /**
+ * Same as scan, but called when current thread is idling.
+ * It repeatedly scans other threads for tasks,
+ * sleeping while none are available.
+ * <p>
+ * This differs from scan mainly in that
+ * since there is no reason to return to recheck any
+ * condition, we iterate until a task is found, backing
+ * off via sleeps if necessary.
+ **/
+
+ protected void scanWhileIdling() {
+ FJTask task = null;
+
+ boolean lowered = false;
+ long iters = 0;
+
+ FJTaskRunner[] ts = group.getArray();
+ int idx = victimRNG.nextInt(ts.length);
+
+ do {
+ for (int i = 0; i < ts.length; ++i) {
+
+ FJTaskRunner t = ts[idx];
+ if (++idx >= ts.length) idx = 0; // circularly traverse
+
+ if (t != null && t != this) {
+ if (COLLECT_STATS) ++scans;
+
+ task = t.take();
+ if (task != null) {
+ if (COLLECT_STATS) ++steals;
+ if (lowered) setPriority(runPriority);
+ group.setActive(this);
+ break;
+ }
+ }
+ }
+
+ if (task == null) {
+ if (isInterrupted())
+ return;
+
+ if (COLLECT_STATS) ++scans;
+ task = group.pollEntryQueue();
+
+ if (task != null) {
+ if (COLLECT_STATS) ++steals;
+ if (lowered) setPriority(runPriority);
+ group.setActive(this);
+ }
+ else {
+ ++iters;
+ // Check here for yield vs sleep to avoid entering group synch lock
+ if (iters >= /*group.SCANS_PER_SLEEP*/ 15) {
+ group.checkActive(this, iters);
+ if (isInterrupted())
+ return;
+ }
+ else if (!lowered) {
+ lowered = true;
+ setPriority(scanPriority);
+ }
+ else {
+ yield();
+ }
+ }
+ }
+ } while (task == null);
+
+
+ if (!task.isDone()) {
+ if (COLLECT_STATS) ++runs;
+ task.run();
+ task.setDone();
+ }
+
+ }
+
+ /* ------------ composite operations ------------------- */
+
+
+ /**
+ * Main runloop
+ **/
+
+ public void run() {
+ try{
+ while (!interrupted()) {
+ FJTask task = pop();
+ if (task != null) {
+ if (!task.isDone()) {
+ // inline FJTask.invoke
+ if (COLLECT_STATS) ++runs;
+ task.run();
+ task.setDone();
+ }
+ }
+ else
+ scanWhileIdling();
+ }
+ // check for suspending
+ if (suspending) {
+ synchronized(this) {
+ // move all local tasks to group-wide entry queue
+ for (int i = 0; i < deq.length; ++i) {
+ synchronized(group) {
+ try {
+ FJTask task = (FJTask)deq[i].take();
+ if (task != null)
+ group.getEntryQueue().put(task);
+ } catch (InterruptedException ie) {
+ System.err.println("Suspend: when transferring task to entryQueue: "+ie);
+ }
+ }
+ }
+ }
+ }
+ }
+ finally {
+ group.setInactive(this);
+ }
+ }
+
+ /**
+ * Execute a task in this thread. Generally called when current task
+ * cannot otherwise continue.
+ **/
+
+
+ protected final void taskYield() {
+ FJTask task = pop();
+ if (task != null) {
+ if (!task.isDone()) {
+ if (COLLECT_STATS) ++runs;
+ task.run();
+ task.setDone();
+ }
+ }
+ else
+ scan(null);
+ }
+
+
+ /**
+ * Process tasks until w is done.
+ * Equivalent to <code>while(!w.isDone()) taskYield(); </code>
+ **/
+
+ protected final void taskJoin(final FJTask w) {
+
+ while (!w.isDone()) {
+
+ FJTask task = pop();
+ if (task != null) {
+ if (!task.isDone()) {
+ if (COLLECT_STATS) ++runs;
+ task.run();
+ task.setDone();
+ if (task == w) return; // fast exit if we just ran w
+ }
+ }
+ else
+ scan(w);
+ }
+ }
+
+ /**
+ * A specialized expansion of
+ * <code> w.fork(); invoke(v); w.join(); </code>
+ **/
+
+
+ protected final void coInvoke(final FJTask w, final FJTask v) {
+
+ // inline push
+
+ int t = top;
+ if (t < (base & (deq.length-1)) + deq.length) {
+
+ deq[t & (deq.length-1)].put(w);
+ top = t + 1;
+
+ // inline invoke
+
+ if (!v.isDone()) {
+ if (COLLECT_STATS) ++runs;
+ v.run();
+ v.setDone();
+ }
+
+ // inline taskJoin
+
+ while (!w.isDone()) {
+ FJTask task = pop();
+ if (task != null) {
+ if (!task.isDone()) {
+ if (COLLECT_STATS) ++runs;
+ task.run();
+ task.setDone();
+ if (task == w) return; // fast exit if we just ran w
+ }
+ }
+ else
+ scan(w);
+ }
+ }
+
+ else // handle non-inlinable cases
+ slowCoInvoke(w, v);
+ }
+
+
+ /**
+ * Backup to handle noninlinable cases of coInvoke
+ **/
+
+ protected void slowCoInvoke(final FJTask w, final FJTask v) {
+ push(w); // let push deal with overflow
+ FJTask.invoke(v);
+ taskJoin(w);
+ }
+
+
+ /**
+ * Array-based version of coInvoke
+ **/
+
+ protected final void coInvoke(FJTask[] tasks) {
+ int nforks = tasks.length - 1;
+
+ // inline bulk push of all but one task
+
+ int t = top;
+
+ if (nforks >= 0 && t + nforks < (base & (deq.length-1)) + deq.length) {
+ for (int i = 0; i < nforks; ++i) {
+ deq[t++ & (deq.length-1)].put(tasks[i]);
+ top = t;
+ }
+
+ // inline invoke of one task
+ FJTask v = tasks[nforks];
+ if (!v.isDone()) {
+ if (COLLECT_STATS) ++runs;
+ v.run();
+ v.setDone();
+ }
+
+ // inline taskJoins
+
+ for (int i = 0; i < nforks; ++i) {
+ FJTask w = tasks[i];
+ while (!w.isDone()) {
+
+ FJTask task = pop();
+ if (task != null) {
+ if (!task.isDone()) {
+ if (COLLECT_STATS) ++runs;
+ task.run();
+ task.setDone();
+ }
+ }
+ else
+ scan(w);
+ }
+ }
+ }
+
+ else // handle non-inlinable cases
+ slowCoInvoke(tasks);
+ }
+
+ /**
+ * Backup to handle atypical or noninlinable cases of coInvoke
+ **/
+
+ protected void slowCoInvoke(FJTask[] tasks) {
+ for (int i = 0; i < tasks.length; ++i) push(tasks[i]);
+ for (int i = 0; i < tasks.length; ++i) taskJoin(tasks[i]);
+ }
+
+}
+
View
714 src/main/java/com/twitter/actors/FJTaskRunnerGroup.java
@@ -0,0 +1,714 @@
+/*
+ File: FJTaskRunnerGroup.java
+
+ Originally written by Doug Lea and released into the public domain.
+ This may be used for any purposes whatsoever without acknowledgment.
+ Thanks for the assistance and support of Sun Microsystems Labs,
+ and everyone contributing, testing, and using this code.
+
+ History:
+ Date Who What
+ 7Jan1999 dl First public release
+ 12Jan1999 dl made getActiveCount public; misc minor cleanup.
+ 14Jan1999 dl Added executeTask
+ 20Jan1999 dl Allow use of priorities; reformat stats
+ 6Feb1999 dl Lazy thread starts
+ 27Apr1999 dl Renamed
+*/
+
+package com.twitter.actors;
+
+/**
+ * A stripped down analog of a ThreadGroup used for
+ * establishing and managing FJTaskRunner threads.
+ * ThreadRunnerGroups serve as the control boundary separating
+ * the general world of normal threads from the specialized world
+ * of FJTasks.
+ * <p>
+ * By intent, this class does not subclass java.lang.ThreadGroup, and
+ * does not support most methods found in ThreadGroups, since they
+ * would make no sense for FJTaskRunner threads. In fact, the class
+ * does not deal with ThreadGroups at all. If you want to restrict
+ * a FJTaskRunnerGroup to a particular ThreadGroup, you can create
+ * it from within that ThreadGroup.
+ * <p>
+ * The main contextual parameter for a FJTaskRunnerGroup is
+ * the group size, established in the constructor.
+ * Groups must be of a fixed size.
+ * There is no way to dynamically increase or decrease the number
+ * of threads in an existing group.
+ * <p>
+ * In general, the group size should be equal to the number
+ * of CPUs on the system. (Unfortunately, there is no portable
+ * means of automatically detecting the number of CPUs on a JVM, so there is
+ * no good way to automate defaults.) In principle, when
+ * FJTasks are used for computation-intensive tasks, having only
+ * as many threads as CPUs should minimize bookkeeping overhead
+ * and contention, and so maximize throughput. However, because
+ * FJTaskRunners lie atop Java threads, and in turn operating system
+ * thread support and scheduling policies,
+ * it is very possible that using more threads
+ * than CPUs will improve overall throughput even though it adds
+ * to overhead. This will always be so if FJTasks are I/O bound.
+ * So it may pay to experiment a bit when tuning on particular platforms.
+ * You can also use <code>setRunPriorities</code> to either
+ * increase or decrease the priorities of active threads, which
+ * may interact with group size choice.
+ * <p>
+ * In any case, overestimating group sizes never
+ * seriously degrades performance (at least within reasonable bounds).
+ * You can also use a value
+ * less than the number of CPUs in order to reserve processing
+ * for unrelated threads.
+ * <p>
+ * There are two general styles for using a FJTaskRunnerGroup.
+ * You can create one group per entire program execution, for example
+ * as a static singleton, and use it for all parallel tasks:
+ * <pre>
+ * class Tasks {
+ * static FJTaskRunnerGroup group;
+ * public void initialize(int groupsize) {
+ * group = new FJTaskRunnerGroup(groupSize);
+ * }
+ * // ...
+ * }
+ * </pre>
+ * Alternatively, you can make new groups on the fly and use them only for
+ * particular task sets. This is more flexible,,
+ * and leads to more controllable and deterministic execution patterns,
+ * but it encounters greater overhead on startup. Also, to reclaim
+ * system resources, you should
+ * call <code>FJTaskRunnerGroup.interruptAll</code> when you are done
+ * using one-shot groups. Otherwise, because FJTaskRunners set
+ * <code>Thread.isDaemon</code>