Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Merge branch 'master' of github.com:simonmar/monad-par

  • Loading branch information...
commit 5ae89a6e83ac9f6efb14064ee50dec95dfb4c024 2 parents 359f91b + 24aba0e
@simonmar authored
View
90 Control/Monad/Par/Logging.hs
@@ -0,0 +1,90 @@
+{-# LANGUAGE NamedFieldPuns #-}
+-- A simple interface for logging start/end times and which processor a task runs on.
+
+module Control.Monad.Par.Logging
+ ( LogEntry(..), TaskSeries(..),
+ allTaskSeries,
+ unsafeNewTaskSeries,
+ timePure,
+ nameFromValue,
+ grabAllLogs, printAllLogs
+ )
+where
+
+import Control.Monad
+import Control.DeepSeq
+import Control.Exception
+import Data.IORef
+import Data.Word
+import Data.List
+import Data.Function
+import GHC.Conc
+import System.CPUTime.Rdtsc
+import System.IO.Unsafe
+import System.Mem.StableName
+
+-- import Text.Printf
+
+data LogEntry = LE { start :: {-# UNPACK #-} !Word64
+ , end :: {-# UNPACK #-} !Word64
+ , proc :: {-# UNPACK #-} !ThreadId
+ }
+ deriving Show
+type Log = [LogEntry]
+
+
+-- The String identifies the task series
+data TaskSeries = TS String (IORef Log)
+
+-- Global variable that accumulates all task series.
+allTaskSeries :: IORef [TaskSeries]
+allTaskSeries = unsafePerformIO$ newIORef []
+
+{-# NOINLINE unsafeNewTaskSeries #-}
+unsafeNewTaskSeries :: String -> TaskSeries
+unsafeNewTaskSeries name = unsafePerformIO$
+ do log <- newIORef []
+ let ts = TS name log
+ atomicModifyIORef_ allTaskSeries (ts:)
+ return ts
+
+
+{-# NOINLINE timePure #-}
+-- Time a pure computation, fully evaluate its result.
+timePure :: NFData a => TaskSeries -> a -> a
+timePure (TS _ log) thnk = unsafePerformIO$
+ do proc <- myThreadId
+ start <- rdtsc
+ evaluate (rnf thnk)
+ end <- rdtsc
+ atomicModifyIORef_ log (LE start end proc :)
+ return thnk
+
+{-# NOINLINE nameFromValue #-}
+nameFromValue :: a -> String
+nameFromValue val = unsafePerformIO$
+ do stbl <- makeStableName val
+ return ("Obj_" ++ show (hashStableName stbl))
+
+atomicModifyIORef_ ref fn = atomicModifyIORef ref (\x -> (fn x, ()))
+
+-- Read and reset ALL logs.
+grabAllLogs :: IO [(String, Log)]
+grabAllLogs =
+ do series <- readIORef allTaskSeries
+ -- This is piecewise atomic. We can't get a true snapshot but we
+ -- can grab it as fast as we can:
+ forM series $ \ (TS name log) -> do
+ -- Atomic slice off whats there:
+ ls <- atomicModifyIORef log (\x -> ([],x))
+ return (name,ls)
+
+
+printAllLogs :: IO ()
+printAllLogs =
+ do grab <- grabAllLogs
+ forM_ (sortBy (compare `on` fst) grab) $ \ (name, entries) -> do
+ putStrLn ""
+ forM_ (sortBy (compare `on` start) entries) $ \ LE{start,end,proc} -> do
+-- printf "%s %s %d %d\n" name start end (show proc)
+ putStrLn$ name ++" "++ show proc ++" "++ show start ++" "++ show end
View
34 Control/Monad/Par/Stream.hs
@@ -7,9 +7,11 @@
-- (In the future may want to look into the stream interface used by
-- the stream fusion framework.)
+#define DEBUGSTREAMS
+
module Control.Monad.Par.Stream
(
- streamMap, streamScan
+ streamMap, streamScan, streamFold
, countupWin, generate
, runParList, toListSpin
, measureRate, measureRateList
@@ -40,9 +42,11 @@ import System.CPUTime.Rdtsc
import GHC.Conc as Conc
import System.IO
import GHC.IO (unsafePerformIO, unsafeDupablePerformIO, unsafeInterleaveIO)
-import Debug.Trace
+import Debug.Trace
+import Control.Monad.Par.Logging
+
debugflag = True
--------------------------------------------------------------------------------
@@ -108,13 +112,19 @@ streamMap fn instrm =
-- | Applies a stateful kernel to the stream. Output stream elements match input one-to-one.
-streamScan :: (NFData b, NFData c) =>
+-- streamScan :: (NFData b, NFData c) =>
+streamScan :: (NFData a, NFData b, NFData c) => -- <- TEMP, don't need NFData a in general.
(a -> b -> (a,c)) -> a -> Stream b -> Par (Stream c)
streamScan fn initstate instrm =
do outstrm <- new
fork$ loop initstate instrm outstrm
return outstrm
where
+#ifdef DEBUGSTREAMS
+ -- Create a task log for each unique input stream fed to this function:
+ tasklog = unsafeNewTaskSeries (nameFromValue instrm)
+#endif
+
loop state instrm outstrm =
do
ilst <- get instrm
@@ -122,7 +132,12 @@ streamScan fn initstate instrm =
Null -> put outstrm Null -- End of stream.
Cons h t ->
do newtl <- new
- let (newstate, outp) = fn state h
+ let (newstate, outp) =
+#ifdef DEBUGSTREAMS
+ timePure tasklog$ fn state h
+#else
+ fn state h
+#endif
put outstrm (Cons outp newtl)
loop newstate t newtl
@@ -140,12 +155,21 @@ streamScan fn initstate instrm =
-- streamScan . concat rewrites to streamKernel perhaps...
+
+-- | Reduce a stream to a single value. This function will not return
+-- until it reaches the end-of-stream.
+streamFold :: (a -> b -> a) -> a -> Stream b -> Par a
+streamFold fn acc instrm =
+ do ilst <- get instrm
+ case ilst of
+ Null -> return acc
+ Cons h t -> streamFold fn (fn acc h) t
+
-- | Generate a stream of the given length by applying the function to each index (starting at zero).
--
-- WARNING, this source calls yield, letting other par computations
-- run, but there is no backpressure. Thus if the source runs at a
-- higher rate than its consumer, buffered stream elements accumulate.
-
generate :: NFData a => Int -> (Int -> a) -> Par (Stream a)
-- NOTE: I don't currently know of a good way to do backpressure
-- directly in this system... but here are some other options:
View
2  examples/Makefile
@@ -25,7 +25,7 @@ runtests:
./run_tests.sh $(ALLEXES)
clean:
- rm -f $(ALLEXES) *.o *.hi
+ rm -f $(ALLEXES) *.o *.hi
(cd sumeuler; $(MAKE) clean)
(cd matmult; $(MAKE) clean)
(cd minimax; $(MAKE) clean)
View
2  examples/common.mk
@@ -7,4 +7,4 @@ ifeq (,$(GHC))
endif
ALLPARSRC= ../Control/Monad/Par.hs ../Control/Monad/Par/AList.hs ../Control/Monad/Par/OpenList.hs \
- ../Control/Monad/Par/IList.hs ../Control/Monad/Par/Stream.hs
+ ../Control/Monad/Par/IList.hs ../Control/Monad/Par/Stream.hs ../Control/Monad/Par/Logging.hs
View
65 examples/stream/disjoint_working_sets_pipeline.hs
@@ -36,7 +36,9 @@ import System.CPUTime
import System.CPUTime.Rdtsc
import GHC.Conc as Conc
import GHC.IO (unsafePerformIO, unsafeDupablePerformIO, unsafeInterleaveIO)
+
import Debug.Trace
+import Control.Monad.Par.Logging
import qualified Data.Vector.Unboxed as UV
import Data.Vector.Unboxed hiding ((++))
@@ -69,28 +71,29 @@ monadpar_version (_,numfilters, bufsize, statecoef, numwins) = do
putStrLn$ "Running monad-par version."
let statesize = bufsize * statecoef
- results <- evaluate $ runParAsync$ do
+ results <- evaluate $ runPar$ do
strm1 :: Stream (UV.Vector Double) <- S.generate numwins (\n -> UV.replicate bufsize 0)
-- Make a pipeline of numfilters stages:
let initstate = UV.generate statesize fromIntegral
pipe_end <- C.foldM (\s _ -> streamScan statefulKern initstate s) strm1 [1..numfilters]
sums <- streamMap UV.sum pipe_end
-
+#if 0
return sums
--- summed <- streamMap UV.sum strm_last
--- return summed
-- This is tricky, but two different consumers shouldn't prevent
-- garbage collection.
ls <- toListSpin results
-
-- Just (Cons h _) <- pollIVar results
putStrLn$ "Sum of first window: " ++ show (P.head ls)
forkIO$ measureRateList ls
putStrLn$ "Final sum = "++ show (P.sum ls)
+#else
--- browseStream results
+ streamFold (+) 0 sums
+
+ putStrLn$ "Final sum = "++ show results
+#endif
--------------------------------------------------------------------------------
@@ -198,6 +201,9 @@ main = do
"sparks" -> sparks_version arg_tup
_ -> error$ "unknown version: "++version
+ putStrLn$ "Finally, dumping all logs:"
+ printAllLogs
+
-- It is not necessary to evaluate every element in the case of an unboxed vector.
@@ -221,3 +227,50 @@ print_ msg = trace msg $ return ()
-- }
+{-
+
+Here's what cachegrind says (on 4 core nehalem):
+
+ $ valgrind --tool=cachegrind ./stream/disjoint_working_sets_pipeline monad 4 768 10 1000 +RTS -N4
+ .....
+ [measureRate] current rate: 58 Total elems&time 916 181,988,055,721
+ [measureRate] Hit end of stream after 1000 elements.
+ Final sum = 1.560518243231086e22
+ ==21202==
+ ==21202== I refs: 7,111,462,273
+ ==21202== I1 misses: 374,190
+ ==21202== L2i misses: 298,364
+ ==21202== I1 miss rate: 0.00%
+ ==21202== L2i miss rate: 0.00%
+ ==21202==
+ ==21202== D refs: 3,882,935,974 (3,542,949,529 rd + 339,986,445 wr)
+ ==21202== D1 misses: 14,606,684 ( 9,824,455 rd + 4,782,229 wr)
+ ==21202== L2d misses: 6,774,479 ( 2,088,565 rd + 4,685,914 wr)
+ ==21202== D1 miss rate: 0.3% ( 0.2% + 1.4% )
+ ==21202== L2d miss rate: 0.1% ( 0.0% + 1.3% )
+ ==21202==
+ ==21202== L2 refs: 14,980,874 ( 10,198,645 rd + 4,782,229 wr)
+ ==21202== L2 misses: 7,072,843 ( 2,386,929 rd + 4,685,914 wr)
+ ==21202== L2 miss rate: 0.0% ( 0.0% + 1.3% )
+
+
+Sparks version:
+ Final Sum = 1.560518243231086e22
+ ==21226==
+ ==21226== I refs: 5,898,314,238
+ ==21226== I1 misses: 291,271
+ ==21226== L2i misses: 246,518
+ ==21226== I1 miss rate: 0.00%
+ ==21226== L2i miss rate: 0.00%
+ ==21226==
+ ==21226== D refs: 3,264,359,909 (3,206,394,437 rd + 57,965,472 wr)
+ ==21226== D1 misses: 16,003,068 ( 10,905,138 rd + 5,097,930 wr)
+ ==21226== L2d misses: 9,177,043 ( 4,207,106 rd + 4,969,937 wr)
+ ==21226== D1 miss rate: 0.4% ( 0.3% + 8.7% )
+ ==21226== L2d miss rate: 0.2% ( 0.1% + 8.5% )
+ ==21226==
+ ==21226== L2 refs: 16,294,339 ( 11,196,409 rd + 5,097,930 wr)
+ ==21226== L2 misses: 9,423,561 ( 4,453,624 rd + 4,969,937 wr)
+ ==21226== L2 miss rate: 0.1% ( 0.0% + 8.5% )
+
+ -}
Please sign in to comment.
Something went wrong with that request. Please try again.