Skip to content

Commit

Permalink
Add process isolation to benchmarks
Browse files Browse the repository at this point in the history
  • Loading branch information
Ryan Trinkle committed Apr 16, 2016
1 parent 3a7fb30 commit 8397284
Show file tree
Hide file tree
Showing 6 changed files with 316 additions and 46 deletions.
209 changes: 209 additions & 0 deletions bench-cbits/checkCapability.c
@@ -0,0 +1,209 @@
#include <Rts.h>

// The InCall structure represents either a single in-call from C to
// Haskell, or a worker thread.
typedef struct InCall_ {
StgTSO * tso; // the bound TSO (or NULL for a worker)

StgTSO * suspended_tso; // the TSO is stashed here when we
// make a foreign call (NULL otherwise);

Capability *suspended_cap; // The capability that the
// suspended_tso is on, because
// we can't read this from the TSO
// without owning a Capability in the
// first place.

SchedulerStatus rstat; // return status
StgClosure ** ret; // return value

struct Task_ *task;

// When a Haskell thread makes a foreign call that re-enters
// Haskell, we end up with another Task associated with the
// current thread. We have to remember the whole stack of InCalls
// associated with the current Task so that we can correctly
// save & restore the InCall on entry to and exit from Haskell.
struct InCall_ *prev_stack;

// Links InCalls onto suspended_ccalls, spare_incalls
struct InCall_ *prev;
struct InCall_ *next;
} InCall;

typedef struct Task_ {
#if defined(THREADED_RTS)
OSThreadId id; // The OS Thread ID of this task

Condition cond; // used for sleeping & waking up this task
Mutex lock; // lock for the condition variable

// this flag tells the task whether it should wait on task->cond
// or just continue immediately. It's a workaround for the fact
// that signalling a condition variable doesn't do anything if the
// thread is already running, but we want it to be sticky.
rtsBool wakeup;
#endif

// This points to the Capability that the Task "belongs" to. If
// the Task owns a Capability, then task->cap points to it. If
// the task does not own a Capability, then either (a) if the task
// is a worker, then task->cap points to the Capability it belongs
// to, or (b) it is returning from a foreign call, then task->cap
// points to the Capability with the returning_worker queue that this
// this Task is on.
//
// When a task goes to sleep, it may be migrated to a different
// Capability. Hence, we always check task->cap on wakeup. To
// syncrhonise between the migrater and the migratee, task->lock
// must be held when modifying task->cap.
struct Capability_ *cap;

// The current top-of-stack InCall
struct InCall_ *incall;

nat n_spare_incalls;
struct InCall_ *spare_incalls;

rtsBool worker; // == rtsTrue if this is a worker Task
rtsBool stopped; // this task has stopped or exited Haskell

// So that we can detect when a finalizer illegally calls back into Haskell
rtsBool running_finalizers;

// Links tasks on the returning_tasks queue of a Capability, and
// on spare_workers.
struct Task_ *next;

// Links tasks on the all_tasks list; need ACQUIRE_LOCK(&all_tasks_mutex)
struct Task_ *all_next;
struct Task_ *all_prev;

} Task;

struct Capability_ {
// State required by the STG virtual machine when running Haskell
// code. During STG execution, the BaseReg register always points
// to the StgRegTable of the current Capability (&cap->r).
StgFunTable f;
StgRegTable r;

nat no; // capability number.

// The Task currently holding this Capability. This task has
// exclusive access to the contents of this Capability (apart from
// returning_tasks_hd/returning_tasks_tl).
// Locks required: cap->lock.
Task *running_task;

// true if this Capability is running Haskell code, used for
// catching unsafe call-ins.
rtsBool in_haskell;

// Has there been any activity on this Capability since the last GC?
nat idle;

rtsBool disabled;

// The run queue. The Task owning this Capability has exclusive
// access to its run queue, so can wake up threads without
// taking a lock, and the common path through the scheduler is
// also lock-free.
StgTSO *run_queue_hd;
StgTSO *run_queue_tl;

// Tasks currently making safe foreign calls. Doubly-linked.
// When returning, a task first acquires the Capability before
// removing itself from this list, so that the GC can find all
// the suspended TSOs easily. Hence, when migrating a Task from
// the returning_tasks list, we must also migrate its entry from
// this list.
InCall *suspended_ccalls;

// One mutable list per generation, so we don't need to take any
// locks when updating an old-generation thunk. This also lets us
// keep track of which closures this CPU has been mutating, so we
// can traverse them using the right thread during GC and avoid
// unnecessarily moving the data from one cache to another.
bdescr **mut_lists;
bdescr **saved_mut_lists; // tmp use during GC

// block for allocating pinned objects into
bdescr *pinned_object_block;
// full pinned object blocks allocated since the last GC
bdescr *pinned_object_blocks;

// per-capability weak pointer list associated with nursery (older
// lists stored in generation object)
StgWeak *weak_ptr_list_hd;
StgWeak *weak_ptr_list_tl;

// Context switch flag. When non-zero, this means: stop running
// Haskell code, and switch threads.
int context_switch;

// Interrupt flag. Like the context_switch flag, this also
// indicates that we should stop running Haskell code, but we do
// *not* switch threads. This is used to stop a Capability in
// order to do GC, for example.
//
// The interrupt flag is always reset before we start running
// Haskell code, unlike the context_switch flag which is only
// reset after we have executed the context switch.
int interrupt;

// Total words allocated by this cap since rts start
// See [Note allocation accounting] in Storage.c
W_ total_allocated;

#if defined(THREADED_RTS)
// Worker Tasks waiting in the wings. Singly-linked.
Task *spare_workers;
nat n_spare_workers; // count of above

// This lock protects:
// running_task
// returning_tasks_{hd,tl}
// wakeup_queue
// inbox
Mutex lock;

// Tasks waiting to return from a foreign call, or waiting to make
// a new call-in using this Capability (NULL if empty).
// NB. this field needs to be modified by tasks other than the
// running_task, so it requires cap->lock to modify. A task can
// check whether it is NULL without taking the lock, however.
Task *returning_tasks_hd; // Singly-linked, with head/tail
Task *returning_tasks_tl;

// Messages, or END_TSO_QUEUE.
// Locks required: cap->lock
Message *inbox;

SparkPool *sparks;

// Stats on spark creation/conversion
SparkCounters spark_stats;
#if !defined(mingw32_HOST_OS)
// IO manager for this cap
int io_manager_control_wr_fd;
#endif
#endif

// Per-capability STM-related data
StgTVarWatchQueue *free_tvar_watch_queues;
StgInvariantCheckQueue *free_invariant_check_queues;
StgTRecChunk *free_trec_chunks;
StgTRecHeader *free_trec_headers;
nat transaction_tokens;
} // typedef Capability is defined in RtsAPI.h
// We never want a Capability to overlap a cache line with anything
// else, so round it up to a cache line size:
#ifndef mingw32_HOST_OS
ATTRIBUTE_ALIGNED(64)
#endif
;

HsBool myCapabilityHasOtherRunnableThreads() {
return rts_unsafeGetMyCapability()->run_queue_hd == END_TSO_QUEUE ? HS_BOOL_FALSE : HS_BOOL_TRUE;
}
132 changes: 95 additions & 37 deletions bench/RunAll.hs
@@ -1,4 +1,4 @@
{-# LANGUAGE ConstraintKinds, TypeSynonymInstances, BangPatterns, ScopedTypeVariables, TupleSections, GADTs, RankNTypes, FlexibleInstances, FlexibleContexts, MultiParamTypeClasses, GeneralizedNewtypeDeriving #-}
{-# LANGUAGE ConstraintKinds, TypeSynonymInstances, BangPatterns, ScopedTypeVariables, TupleSections, GADTs, RankNTypes, FlexibleInstances, FlexibleContexts, MultiParamTypeClasses, GeneralizedNewtypeDeriving, ForeignFunctionInterface, ViewPatterns, TemplateHaskell, PatternSynonyms #-}
{-# OPTIONS_GHC -fno-warn-orphans #-}

module Main where
Expand All @@ -22,14 +22,39 @@ import System.IO
import System.Mem
import Prelude

import Data.IORef
import System.Mem.Weak
import Data.Function
import Control.Concurrent
import Data.Time.Clock
import GHC.Stats
import Data.Monoid
import Control.Arrow
import Control.Monad
import Data.Bool
import qualified GHC.Event as GHC
import Control.Concurrent.STM
import Data.Int
import Control.Exception
import Text.Read
import System.Environment
import Debug.Trace.LocationTH
import Control.Monad.Trans
import System.Process

import Unsafe.Coerce

import Data.Map (Map)
import qualified Data.Map as Map

type MonadReflexHost' t m = (MonadReflexHost t m, MonadIORef m, MonadIORef (HostFrame t))


setupFiring :: (MonadReflexHost t m, MonadIORef m) => Plan t (Event t a) -> m (Ignore (EventHandle t a), Schedule t)
setupFiring :: (MonadReflexHost t m, MonadIORef m) => Plan t (Event t a) -> m (EventHandle t a, Schedule t)
setupFiring p = do
(e, s) <- runPlan p
h <- subscribeEvent e
return (Ignore h, s)
return (h, s)

-- Hack to avoid the NFData constraint for EventHandle which is a synonym
newtype Ignore a = Ignore a
Expand All @@ -46,41 +71,74 @@ instance NFData (Firing t) where
rnf !(Firing _ _) = ()

-- Measure the running time
benchFiring :: (MonadReflexHost' t m, MonadSample t m) => (forall a. m a -> IO a) -> (String, TestCase) -> Benchmark
benchFiring runHost (name, TestE p) = env setup (\e -> bench name $ whnfIO $ run e) where
run (Ignore h, s) = runHost (readSchedule s (readEvent' h)) >> performGC
setup = runHost $ setupFiring p

benchFiring runHost (name, TestB p) = env setup (\e -> bench name $ whnfIO $ run e) where
run (b, s) = runHost (readSchedule s (sample b)) >> performGC
setup = runHost $ do
benchFiring :: forall t m. (MonadReflexHost' t m, MonadSample t m) => (forall a. m a -> IO a) -> TestCase -> Int -> IO ()
benchFiring runHost tc n = runHost $ do
let runIterations :: m a -> m ()
runIterations test = replicateM_ (10*n) $ do
result <- test
liftIO $ evaluate result
case tc of
TestE p -> do
(h, s) <- setupFiring p
runIterations $ readSchedule s $ readEvent' h
TestB p -> do
(b, s) <- runPlan p
return (b, makeDense s)
runIterations $ readSchedule (makeDense s) $ sample b

waitForFinalizers :: IO ()
waitForFinalizers = do
performGC
x <- getCurrentTime
isFinalized <- newIORef False
mkWeakPtr x $ Just $ writeIORef isFinalized True
performGC
fix $ \loop -> do
f <- readIORef isFinalized
if f then return () else do
threadDelay 1
loop

benchmarks :: [(String, Int -> IO ())]
benchmarks = implGroup "spider" runSpiderHost cases
where
implGroup :: (MonadReflexHost' t m, MonadSample t m) => String -> (forall a. m a -> IO a) -> [(String, TestCase)] -> [(String, Int -> IO ())]
implGroup name runHost = group name . fmap (second (benchFiring runHost))
group name = fmap $ first ((name <> "/") <>)
sub n frames = group ("subscribing " ++ show (n, frames)) $ Focused.subscribing n frames
firing n = group ("firing " <> show n) $ Focused.firing n
merging n = group ("merging " <> show n) $ Focused.merging n
dynamics n = group ("dynamics " <> show n) $ Focused.dynamics n
cases = concat
[ sub 100 40
, dynamics 100
, dynamics 1000
, firing 1000
, firing 10000
, merging 10
, merging 50
, merging 100
, merging 200
]

pattern RunTestCaseFlag = "--run-test-case"

spawnBenchmark :: String -> Benchmark
spawnBenchmark name = Benchmark name $ Benchmarkable $ \n -> do
self <- getExecutablePath
callProcess self [RunTestCaseFlag, name, show n, "+RTS", "-N1"]

foreign import ccall unsafe "myCapabilityHasOtherRunnableThreads" myCapabilityHasOtherRunnableThreads :: IO Bool

main :: IO ()
main = do
hSetBuffering stdout LineBuffering
defaultMainWith (defaultConfig { timeLimit = 10, csvFile = Just "dmap-original.csv" })
[ benchImpl "spider" runSpiderHost
]

benchImpl :: (MonadReflexHost' t m, MonadSample t m) => String -> (forall a. m a -> IO a) -> Benchmark
benchImpl name runHost = bgroup name [ sub 100 40
, dynamics 100
, dynamics 1000
, firing 1000
, firing 10000
, merging 10
, merging 50
, merging 100
, merging 200]
where
sub n frames = runGroup ("subscribing " ++ show (n, frames)) $ Focused.subscribing n frames
firing n = runGroup ("firing " ++ show n) $ Focused.firing n
merging n = runGroup ("merging " ++ show n) $ Focused.merging n
dynamics n = runGroup ("dynamics " ++ show n) $ Focused.dynamics n

runGroup name' benchmarks = bgroup name' (benchFiring runHost <$> benchmarks)



args <- getArgs
case args of
RunTestCaseFlag : t -> case t of
[name, readMaybe -> Just count] -> do
case lookup name benchmarks of
Just testCase -> testCase count
performGC
fix $ \loop -> bool (return ()) (yield >> loop) =<< myCapabilityHasOtherRunnableThreads
return ()
_ -> $failure "--run-test-case: expected test name and iteration count to follow"
_ -> defaultMainWith (defaultConfig { timeLimit = 20, csvFile = Just "dmap-original.csv", reportFile = Just "report.html" }) $ fmap (spawnBenchmark . fst) benchmarks
4 changes: 2 additions & 2 deletions default.nix
Expand Up @@ -2,7 +2,7 @@
, exception-transformers, haskell-src-exts, haskell-src-meta
, MemoTrie, mtl, primitive, ref-tf, semigroups, stdenv, syb
, template-haskell, these, transformers, transformers-compat
, criterion, deepseq, split, stm
, criterion, deepseq, split, stm, loch-th
}:
mkDerivation {
pname = "reflex";
Expand All @@ -16,7 +16,7 @@ mkDerivation {
testHaskellDepends = [
base containers dependent-map MemoTrie mtl ref-tf
# Benchmark dependencies
criterion deepseq split stm
criterion deepseq split stm loch-th
];
homepage = "https://github.com/reflex-frp/reflex";
description = "Higher-order Functional Reactive Programming";
Expand Down

0 comments on commit 8397284

Please sign in to comment.