Permalink
Browse files

Initial version of Joinads pre-processor (for the 'docase' syntax) an…

…d samples (parsers, parallel programming, maybe monad, resumptions).
  • Loading branch information...
0 parents commit 7f66b81b54aff80c2e460902a6b055bec52bb921 unknown committed Jul 16, 2011
@@ -0,0 +1,3 @@
+*.hi
+*.o
+*.exe
@@ -0,0 +1,335 @@
+{-# LANGUAGE RankNTypes, NamedFieldPuns, BangPatterns,
+ ExistentialQuantification
+ #-}
+{-# OPTIONS_GHC -Wall -fno-warn-name-shadowing -fwarn-unused-imports #-}
+
+-- | This module provides a monad @Par@, for speeding up pure
+-- computations using parallel processors. It cannot be used for
+-- speeding up computations that use IO (for that, see
+-- @Control.Concurrent@). The result of a given @Par@ computation is
+-- always the same - ie. it is deterministic, but the computation may
+-- be performed more quickly if there are processors available to
+-- share the work.
+--
+-- For example, the following program fragment computes the values of
+-- @(f x)@ and @(g x)@ in parallel, and returns a pair of their results:
+--
+-- > runPar $ do
+-- > fx <- pval (f x) -- start evaluating (f x)
+-- > gx <- pval (g x) -- start evaluating (g x)
+-- > a <- get fx -- wait for fx
+-- > b <- get gx -- wait for gx
+-- > return (a,b) -- return results
+--
+-- @Par@ can be used for specifying pure parallel computations in
+-- which the order of the computation is not known beforehand.
+-- The programmer specifies how information flows from one
+-- part of the computation to another, but not the order in which
+-- computations will be evaluated at runtime. Information flow is
+-- described using "variables" called @IVar@s, which support 'put' and
+-- 'get' operations. For example, suppose you have a problem that
+-- can be expressed as a network with four nodes, where @b@ and @c@
+-- require the value of @a@, and @d@ requires the value of @b@ and @c@:
+--
+-- > a
+-- > / \
+-- > b c
+-- > \ /
+-- > d
+--
+-- Then you could express this in the @Par@ monad like this:
+--
+-- > runPar $ do
+-- > [a,b,c,d] <- sequence [new,new,new,new]
+-- > fork $ do x <- get a; put b (x+1)
+-- > fork $ do x <- get a; put c (x+2)
+-- > fork $ do x <- get b; y <- get c; put d (x+y)
+-- > fork $ do put a (3 :: Int)
+-- > get d
+--
+-- The result of the above computation is always 9. The 'get' operation
+-- waits until its input is available; multiple 'put's to the same
+-- @IVar@ are not allowed, and result in a runtime error. Values
+-- stored in @IVar@s are usually fully evaluated (although there are
+-- ways provided to pass lazy values if necessary).
+--
+-- In the above example, @b@ and @c@ will be evaluated in parallel.
+-- In practice the work involved at each node is too small here to see
+-- the benefits of parallelism though: typically each node should
+-- involve much more work. The granularity is completely under your
+-- control - too small and the overhead of the @Par@ monad will
+-- outweigh any parallelism benefits, whereas if the nodes are too
+-- large then there might not be enough parallelism to use all the
+-- available processors.
+--
+-- Unlike @Control.Parallel@, in @Control.Monad.Par@ parallelism is
+-- not combined with laziness, so sharing and granulairty are
+-- completely under the control of the programmer. New units of
+-- parallel work are only created by @fork@, @par@, and a few other
+-- combinators.
+--
+-- The implementation is based on a work-stealing scheduler that
+-- divides the work as evenly as possible between the available
+-- processors at runtime.
+--
+
+module Control.Monad.Par (
+ -- * The @Par@ monad
+ Par,
+ runPar,
+ runPar_,
+ fork,
+ block, both, forkWith,
+
+ -- * Communication: @IVar@s
+ IVar,
+ new, newFull, newFull_,
+ newBlocking,
+ get,
+ put, put_,
+
+ -- * Operations
+ pval,
+ spawn, spawn_,
+ parMap, parMapM,
+ parMapReduceRangeThresh, parMapReduceRange,
+ InclusiveRange(..),
+ parFor,
+
+ -- * Cancellation
+ CancelToken, newCancelToken,
+ setCancelToken, getCancelToken, cancel,
+ withCancel, wrapCancel
+ ) where
+
+import Control.Monad.Par.Internal
+import Control.DeepSeq
+import Data.Traversable
+import Control.Monad as M hiding (mapM, sequence, join)
+import Prelude hiding (mapM, sequence, head,tail)
+
+import GHC.Conc (numCapabilities)
+
+-- -----------------------------------------------------------------------------
+
+withCancel :: Par a -> Par a
+withCancel m = do
+ r <- new
+ tok <- newCancelToken
+ forkWith tok (m >>= put_ r)
+ fr <- get r
+ cancel tok
+ return fr
+
+block :: Par a
+block = Par $ \tok _ -> Trace tok Done
+
+-- | forks a computation to happen in parallel. The forked
+-- computation may exchange values with other computations using
+-- @IVar@s.
+fork :: Par () -> Par ()
+fork p = Par $ \tok c -> Trace tok $ Fork (runCont p tok (\_ _ -> Trace tok Done)) (c tok ())
+
+-- | forks a computation to happen in parallel. The forked
+-- computation may exchange values with other computations using
+-- @IVar@s. The forked computation will carry the specified cancellation
+-- token (which can be used to cancel the computation from other running
+-- processes)
+forkWith :: CancelToken -> Par () -> Par ()
+forkWith token p = Par $ \origTok c ->
+ let forkTok = Just token in
+ Trace origTok
+ (Fork (runCont p forkTok (\_ _ -> Trace forkTok Done))
+ (c origTok ()))
+
+-- | starts both computations in parallel and runs the continuation
+-- two times when they each finish
+-- > both a b >> c == both (a >> c) (b >> c)
+both :: Par a -> Par a -> Par a
+both a b = Par $ \tok c -> Trace tok (Fork (runCont a tok c) (runCont b tok c))
+
+-- -----------------------------------------------------------------------------
+-- Derived functions
+
+-- | Like 'spawn', but the result is only head-strict, not fully-strict.
+spawn_ :: Par a -> Par (IVar a)
+spawn_ p = do
+ r <- new
+ fork (p >>= put_ r)
+ return r
+
+-- | Like 'fork', but returns a @IVar@ that can be used to query the
+-- result of the forked computataion.
+--
+-- > spawn p = do
+-- > r <- new
+-- > fork (p >>= put r)
+-- > return r
+--
+spawn :: NFData a => Par a -> Par (IVar a)
+spawn p = do
+ r <- new
+ fork (p >>= put r)
+ return r
+
+-- | equivalent to @spawn . return@
+pval :: NFData a => a -> Par (IVar a)
+pval a = spawn (return a)
+
+-- -----------------------------------------------------------------------------
+-- Parallel maps over Traversable data structures
+
+-- | Applies the given function to each element of a data structure
+-- in parallel (fully evaluating the results), and returns a new data
+-- structure containing the results.
+--
+-- > parMap f xs = mapM (pval . f) xs >>= mapM get
+--
+-- @parMap@ is commonly used for lists, where it has this specialised type:
+--
+-- > parMap :: NFData b => (a -> b) -> [a] -> Par [b]
+--
+parMap :: (Traversable t, NFData b) => (a -> b) -> t a -> Par (t b)
+parMap f xs = mapM (pval . f) xs >>= mapM get
+
+-- | Like 'parMap', but the function is a @Par@ monad operation.
+--
+-- > parMapM f xs = mapM (spawn . f) xs >>= mapM get
+--
+parMapM :: (Traversable t, NFData b) => (a -> Par b) -> t a -> Par (t b)
+parMapM f xs = mapM (spawn . f) xs >>= mapM get
+
+{-# SPECIALISE parMap :: (NFData b) => (a -> b) -> [a] -> Par [b] #-}
+{-# SPECIALISE parMapM :: (NFData b) => (a -> Par b) -> [a] -> Par [b] #-}
+
+
+-- TODO: Perhaps should introduce a class for the "splittable range" concept.
+data InclusiveRange = InclusiveRange Int Int
+
+-- | Computes a binary map\/reduce over a finite range. The range is
+-- recursively split into two, the result for each half is computed in
+-- parallel, and then the two results are combined. When the range
+-- reaches the threshold size, the remaining elements of the range are
+-- computed sequentially.
+--
+-- For example, the following is a parallel implementation of
+--
+-- > foldl (+) 0 (map (^2) [1..10^6])
+--
+-- > parMapReduceRangeThresh 100 (InclusiveRange 1 (10^6))
+-- > (\x -> return (x^2))
+-- > (\x y -> return (x+y))
+-- > 0
+--
+parMapReduceRangeThresh
+ :: NFData a
+ => Int -- ^ threshold
+ -> InclusiveRange -- ^ range over which to calculate
+ -> (Int -> Par a) -- ^ compute one result
+ -> (a -> a -> Par a) -- ^ combine two results (associative)
+ -> a -- ^ initial result
+ -> Par a
+
+parMapReduceRangeThresh threshold (InclusiveRange min max) fn binop init
+ = loop min max
+ where
+ loop min max
+ | max - min <= threshold =
+ let mapred a b = do x <- fn b;
+ result <- a `binop` x
+ return result
+ in foldM mapred init [min..max]
+
+ | otherwise = do
+ let mid = min + ((max - min) `quot` 2)
+ rght <- spawn $ loop (mid+1) max
+ l <- loop min mid
+ r <- get rght
+ l `binop` r
+
+-- How many tasks per process should we aim for. Higher numbers
+-- improve load balance but put more pressure on the scheduler.
+auto_partition_factor :: Int
+auto_partition_factor = 4
+
+-- | \"Auto-partitioning\" version of 'parMapReduceRangeThresh' that chooses the threshold based on
+-- the size of the range and the number of processors..
+parMapReduceRange :: NFData a => InclusiveRange -> (Int -> Par a) -> (a -> a -> Par a) -> a -> Par a
+parMapReduceRange (InclusiveRange start end) fn binop init =
+ loop (length segs) segs
+ where
+ segs = splitInclusiveRange (auto_partition_factor * numCapabilities) (start,end)
+ loop 1 [(st,en)] =
+ let mapred a b = do x <- fn b;
+ result <- a `binop` x
+ return result
+ in foldM mapred init [st..en]
+ loop n segs =
+ let half = n `quot` 2
+ (left,right) = splitAt half segs in
+ do l <- spawn$ loop half left
+ r <- loop (n-half) right
+ l' <- get l
+ l' `binop` r
+
+
+-- TODO: A version that works for any splittable input domain. In this case
+-- the "threshold" is a predicate on inputs.
+-- parMapReduceRangeGeneric :: (inp -> Bool) -> (inp -> Maybe (inp,inp)) -> inp ->
+
+
+-- Experimental:
+
+-- | Parallel for-loop over an inclusive range. Semantically equivalent
+-- to
+--
+-- > parFor (InclusiveRange n m) f = forM_ [n..m] f
+--
+-- except that the implementation will split the work into an
+-- unspecified number of subtasks in an attempt to gain parallelism.
+-- The exact number of subtasks is chosen at runtime, and is probably
+-- a small multiple of the available number of processors.
+--
+-- Strictly speaking the semantics of 'parFor' depends on the
+-- number of processors, and its behaviour is therefore not
+-- deterministic. However, a good rule of thumb is to not have any
+-- interdependencies between the elements; if this rule is followed
+-- then @parFor@ has deterministic semantics. One easy way to follow
+-- this rule is to only use 'put' or 'put_' in @f@, never 'get'.
+
+parFor :: InclusiveRange -> (Int -> Par ()) -> Par ()
+parFor (InclusiveRange start end) body =
+ do
+ let run (x,y) = for_ x (y+1) body
+ range_segments = splitInclusiveRange (4*numCapabilities) (start,end)
+
+ vars <- M.forM range_segments (\ pr -> spawn_ (run pr))
+ M.mapM_ get vars
+ return ()
+
+splitInclusiveRange :: Int -> (Int, Int) -> [(Int, Int)]
+splitInclusiveRange pieces (start,end) =
+ map largepiece [0..remain-1] ++
+ map smallpiece [remain..pieces-1]
+ where
+ len = end - start + 1 -- inclusive [start,end]
+ (portion, remain) = len `quotRem` pieces
+ largepiece i =
+ let offset = start + (i * (portion + 1))
+ in (offset, offset + portion)
+ smallpiece i =
+ let offset = start + (i * portion) + remain
+ in (offset, offset + portion - 1)
+
+-- My own forM for numeric ranges (not requiring deforestation optimizations).
+-- Inclusive start, exclusive end.
+{-# INLINE for_ #-}
+for_ :: Monad m => Int -> Int -> (Int -> m ()) -> m ()
+for_ start end _fn | start > end = error "for_: start is greater than end"
+for_ start end fn = loop start
+ where
+ loop !i | i == end = return ()
+ | otherwise = do fn i; loop (i+1)
+
+
+
Oops, something went wrong.

0 comments on commit 7f66b81

Please sign in to comment.