Commit
Merge pull request #63 from axel22/master
Typos fixed, some additions to the pc-overview docs
heathermiller committed Mar 27, 2012
2 parents 2e97cbc + 79c7c9e commit 31f558d
Showing 8 changed files with 482 additions and 259 deletions.
9 changes: 5 additions & 4 deletions overviews/parallel-collections/architecture.md
@@ -52,7 +52,8 @@ subsets of elements of the whole parallel collection. And similar to normal
In general, collections are partitioned using `Splitter`s into subsets of
roughly the same size. In cases where more arbitrarily-sized partitions are
required, in particular on parallel sequences, a `PreciseSplitter` is used,
-which inherits `Splitter`.
+which inherits `Splitter` and additionally implements a precise split method,
+`psplit`.
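
For reference, a minimal sketch of the shape this implies (the authoritative
declaration lives in `scala.collection.parallel`; treat this as an
approximation, not the exact source):

    trait PreciseSplitter[+T] extends Splitter[T] {
      // Split the remaining elements into subsets of exactly the
      // requested sizes (the precise split method mentioned above).
      def psplit(sizes: Int*): Seq[PreciseSplitter[T]]
    }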

### Combiners

@@ -86,12 +87,12 @@ regular collections framework's corresponding traits, as shown below.

[<img src="{{ site.baseurl }}/resources/images/parallel-collections-hierarchy.png" width="550">]({{ site.baseurl }}/resources/images/parallel-collections-hierarchy.png)

-<center><b>Hierarchy of Scala's Collections and Parallel Collections Libraries<b></center>
+<center><b>Hierarchy of Scala's Collections and Parallel Collections Libraries</b></center>

<br/>

The goal is of course to integrate parallel collections as tightly as possible
with sequential collections, so as to allow for straightforward substitution
-of sequntial and parallel collections.
+of sequential and parallel collections.

In order to be able to have a reference to a collection which may be either
sequential or parallel (such that it's possible to "toggle" between a parallel
126 changes: 103 additions & 23 deletions overviews/parallel-collections/concrete-parallel-collections.md

Large diffs are not rendered by default.

60 changes: 32 additions & 28 deletions overviews/parallel-collections/configuration.md
@@ -10,22 +10,25 @@ num: 7

## Task support

-Parallel collections are modular in the way operations are scheduled. Each parallel
-collection is parametrized with a task support object which is responsible for
-scheduling and load-balancing tasks to processors.
-
-The task support object internally keeps a reference to a thread pool implementation
-and decides how and when tasks are split into smaller tasks.
-To learn more about the internals of how exactly this is done, see the tech report \[[1][1]\].
-
-There are currently a few task support implementations available for parallel collections.
-The `ForkJoinTaskSupport` uses a fork-join pool internally and is used by default on JVM 1.6 or greater.
-The less efficient `ThreadPoolTaskSupport` is a fallback for JVM 1.5 and JVMs that do not support
-the fork join pools. The `ExecutionContextTaskSupport` uses the default execution context implementation
-found in `scala.concurrent`, and it reuses the thread pool used in
-`scala.concurrent` (this is either a fork join pool or a thread pool executor, depending on the JVM version).
-The execution context task support is set to each parallel collection by default, so parallel collections
-reuse the same fork-join pool as the future API.
+Parallel collections are modular in the way operations are scheduled. Each
+parallel collection is parametrized with a task support object which is
+responsible for scheduling and load-balancing tasks to processors.
+
+The task support object internally keeps a reference to a thread pool
+implementation and decides how and when tasks are split into smaller tasks. To
+learn more about the internals of how exactly this is done, see the tech
+report \[[1][1]\].
+
+There are currently a few task support implementations available for parallel
+collections. The `ForkJoinTaskSupport` uses a fork-join pool internally and is
+used by default on JVM 1.6 or greater. The less efficient
+`ThreadPoolTaskSupport` is a fallback for JVM 1.5 and JVMs that do not support
+the fork join pools. The `ExecutionContextTaskSupport` uses the default
+execution context implementation found in `scala.concurrent`, and it reuses
+the thread pool used in `scala.concurrent` (this is either a fork join pool or
+a thread pool executor, depending on the JVM version). The execution context
+task support is set to each parallel collection by default, so parallel
+collections reuse the same fork-join pool as the future API.

Here is a way to change the task support of a parallel collection:

@@ -41,33 +44,34 @@ Here is a way to change the task support of a parallel collection:
    scala> pc map { _ + 1 }
    res0: scala.collection.parallel.mutable.ParArray[Int] = ParArray(2, 3, 4)

-The above sets the parallel collection to use a fork-join pool with parallelism level 2.
-To set the parallel collection to use a thread pool executor:
+The above sets the parallel collection to use a fork-join pool with
+parallelism level 2. To set the parallel collection to use a thread pool
+executor:

    scala> pc.tasksupport = new ThreadPoolTaskSupport()
    pc.tasksupport: scala.collection.parallel.TaskSupport = scala.collection.parallel.ThreadPoolTaskSupport@1d914a39

    scala> pc map { _ + 1 }
    res1: scala.collection.parallel.mutable.ParArray[Int] = ParArray(2, 3, 4)

-When a parallel collection is serialized, the task support field is omitted from serialization.
-When deserializing a parallel collection, the task support field is set to the default value - the execution context task support.
+When a parallel collection is serialized, the task support field is omitted
+from serialization. When deserializing a parallel collection, the task support
+field is set to the default value-- the execution context task support.

-To implement a custom task support, extend the `TaskSupport` trait and implement the following methods:
+To implement a custom task support, extend the `TaskSupport` trait and
+implement the following methods:

    def execute[R, Tp](task: Task[R, Tp]): () => R

    def executeAndWaitResult[R, Tp](task: Task[R, Tp]): R

    def parallelismLevel: Int

-The `execute` method schedules a task asynchronously and returns a future to wait on the result of the computation.
-The `executeAndWait` method does the same, but only returns when the task is completed.
-The `parallelismLevel` simply returns the targeted number of cores that the task support uses to schedule tasks.
+The `execute` method schedules a task asynchronously and returns a future to
+wait on the result of the computation. The `executeAndWaitResult` method does
+the same, but only returns when the task is completed. The `parallelismLevel`
+simply returns the targeted number of cores that the task support uses to
+schedule tasks.
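
For illustration, a minimal sketch that routes a parallel collection through a
custom thread pool via the execution context task support (the constructor
argument of `ExecutionContextTaskSupport` and the REPL output shown here are
assumptions; consult the API docs for the precise signature):

    scala> import scala.collection.parallel._
    scala> import scala.concurrent.ExecutionContext
    scala> import java.util.concurrent.Executors

    scala> val pc = mutable.ParArray(1, 2, 3)

    scala> // wrap a fixed pool of 4 threads in an execution context
    scala> pc.tasksupport = new ExecutionContextTaskSupport(
         |   ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(4)))

    scala> pc map { _ + 1 }
    res2: scala.collection.parallel.mutable.ParArray[Int] = ParArray(2, 3, 4)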


## References
18 changes: 9 additions & 9 deletions overviews/parallel-collections/conversions.md
@@ -11,23 +11,23 @@ num: 3
## Converting between sequential and parallel collections

Every sequential collection can be converted to its parallel variant
-using the `par` method. Certain sequential collections have their
-direct parallel counterparts. For these collections the conversion is
-efficient - it occurs in constant time, since both the sequential and
+using the `par` method. Certain sequential collections have a
+direct parallel counterpart. For these collections the conversion is
+efficient-- it occurs in constant time, since both the sequential and
the parallel collection have the same data-structural representation
-(one exception are mutable hash maps and hash sets which are slightly
+(one exception is mutable hash maps and hash sets, which are slightly
more expensive to convert the first time `par` is called, but
subsequent invocations of `par` take constant time). It should be
noted that for mutable collections, changes in the sequential collection are
visible in its parallel counterpart if they share the underlying data-structure.

-| sequential | parallel |
+| Sequential | Parallel |
| ------------- | -------------- |
| **mutable** | |
| `Array` | `ParArray` |
| `HashMap` | `ParHashMap` |
| `HashSet` | `ParHashSet` |
-| `Ctrie` | `ParCtrie` |
+| `TrieMap` | `ParTrieMap` |
| **immutable** | |
| `Vector` | `ParVector` |
| `Range` | `ParRange` |
@@ -43,9 +43,9 @@ vector.

Every parallel collection can be converted to its sequential variant
using the `seq` method. Converting a parallel collection to a
-sequential collection is always efficient - it takes constant
+sequential collection is always efficient-- it takes constant
time. Calling `seq` on a mutable parallel collection yields a
-sequential collection which is backed by the same store - updates to
+sequential collection which is backed by the same store-- updates to
one collection will be visible in the other one.
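
A short sketch of that sharing (the exact sequential type returned by `seq`
and the REPL output are illustrative assumptions):

    scala> val pc = scala.collection.parallel.mutable.ParArray(1, 2, 3)

    scala> val sc = pc.seq   // constant time, backed by the same array

    scala> sc(0) = 100       // update through the sequential view

    scala> pc                // the change is visible in the parallel one
    res0: scala.collection.parallel.mutable.ParArray[Int] = ParArray(100, 2, 3)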


@@ -61,7 +61,7 @@ into a `ParX` collection.

Here is a summary of all conversion methods:

-| method | return type |
+| Method | Return Type |
| -------------- | -------------- |
| `toArray` | `Array` |
| `toList` | `List` |
56 changes: 48 additions & 8 deletions overviews/parallel-collections/ctries.md
@@ -18,17 +18,24 @@ parallel counterparts. The only difference between the two is that the
former traverses its elements sequentially, whereas the latter does so in
parallel.

-This is a nice property that allows you to write some algorithms more easily.
+This is a nice property that allows you to write some algorithms more
+easily. Typically, these are algorithms that process a dataset of elements
+iteratively, in which different elements need a different number of
+iterations to be processed.
+
+The following example computes the square roots of a set of numbers. Each
+iteration updates the square root value. Numbers whose square roots have
+converged are removed from the map.

    case class Entry(num: Double) {
      var sqrt = num
    }

    val length = 50000

    // prepare the list
    val entries = (1 until length) map { num => Entry(num.toDouble) }
-    val results = ParConcurrentTrieMap()
+    val results = ParTrieMap[Double, Entry]()
    for (e <- entries) results += ((e.num, e))

    // compute square roots
@@ -41,8 +48,25 @@ are removed from the map.
      }
    }

+Note that in the above Babylonian method square root computation
+(\[[3][3]\]) some numbers may converge much faster than others. For
+this reason, we want to remove them from `results` so that only those
+elements that need to be worked on are traversed.
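
For reference, a single Babylonian update step looks like this (a sketch of
the iteration rule from \[[3][3]\], not necessarily the exact code elided
above):

    // One Babylonian (Heron's) step: the estimate converges
    // quadratically towards math.sqrt(num).
    def babylonianStep(sqrt: Double, num: Double): Double =
      0.5 * (sqrt + num / sqrt)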

Another example is the breadth-first search algorithm, which iteratively expands the nodefront
-until either it finds some path to the target or there are no more nodes to expand.
+until either it finds some path to the target or there are no more
+nodes to expand. We define a node on a 2D map as a tuple of
+`Int`s. We define the `map` as a 2D array of booleans which denote
+whether the respective slot is occupied or not. We then declare two concurrent trie
+maps-- `open` which contains all the nodes which have to be
+expanded (the nodefront), and `closed` which contains all the nodes which have already
+been expanded. We want to start the search from the corners of the map and
+find a path to the center of the map-- we initialize the `open` map
+with appropriate nodes. Then we iteratively expand all the nodes in
+the `open` map in parallel until there are no more nodes to expand.
+Each time a node is expanded, it is removed from the `open` map and
+placed in the `closed` map.
+Once done, we output the path from the target to the initial node.

    val length = 1000

@@ -63,8 +87,8 @@ until either it finds some path to the target or there are no more nodes to expand.

    // open list - the nodefront
    // closed list - nodes already processed
-    val open = ParConcurrentTrieMap[Node, Parent]()
-    val closed = ParConcurrentTrieMap[Node, Parent]()
+    val open = ParTrieMap[Node, Parent]()
+    val closed = ParTrieMap[Node, Parent]()

    // add a couple of starting positions
    open((0, 0)) = null
@@ -97,13 +121,22 @@ until either it finds some path to the target or there are no more nodes to expand.
    }
    println()

-The concurrent trie data structure supports a linearizable, lock-free, constant
-time, lazily evaluated snapshot operation. The snapshot operation merely creates
+The concurrent tries also support a linearizable, lock-free, constant
+time `snapshot` operation. This operation creates a new concurrent
+trie with all the elements at a specific point in time, thus in effect
+capturing the state of the trie at a specific point.
+The `snapshot` operation merely creates
a new root for the concurrent trie. Subsequent updates lazily rebuild the part of
the concurrent trie relevant to the update and leave the rest of the concurrent trie
intact. First of all, this means that the snapshot operation by itself is not expensive
since it does not copy the elements. Second, since the copy-on-write optimization copies
only parts of the concurrent trie, subsequent modifications scale horizontally.
+The `readOnlySnapshot` method is slightly more efficient than the
+`snapshot` method, but returns a read-only map which cannot be
+modified. Concurrent tries also support a linearizable, constant-time
+`clear` operation based on the snapshot mechanism.
To learn more about how concurrent tries and snapshots work, see \[[1][1]\] and \[[2][2]\].
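
As a quick illustration of those semantics, a hedged sketch using the
sequential concurrent trie (the `scala.collection.concurrent.TrieMap` name and
the REPL output are assumptions based on the description above):

    scala> import scala.collection.concurrent.TrieMap

    scala> val ct = TrieMap("a" -> 1, "b" -> 2)

    scala> val snap = ct.snapshot()   // constant time, lazily evaluated

    scala> ct("c") = 3                // later updates are invisible to the snapshot

    scala> snap.get("c")
    res0: Option[Int] = None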

The iterators for concurrent tries are based on snapshots. Before the iterator
object gets created, a snapshot of the concurrent trie is taken, so the iterator
@@ -119,7 +152,14 @@ of the `size` method to amortized logarithmic time. In effect, this means that a
the size only for those branches of the trie which have been modified since the last `size` call.
Additionally, size computation for parallel concurrent tries is performed in parallel.

-The concurrent tries also support a linearizable, lock-free, constant time `clear` operation.


## References

1. [Cache-Aware Lock-Free Concurrent Hash Tries][1]
2. [Concurrent Tries with Efficient Non-Blocking Snapshots][2]
+3. [Methods of computing square roots][3]

[1]: http://infoscience.epfl.ch/record/166908/files/ctries-techreport.pdf "Ctries-techreport"
[2]: http://lampwww.epfl.ch/~prokopec/ctries-snapshot.pdf "Ctries-snapshot"
+[3]: http://en.wikipedia.org/wiki/Methods_of_computing_square_roots#Babylonian_method "babylonian-method"
