Skip to content

Commit

Permalink
TimedEvent is more explicit than tuple
Browse files Browse the repository at this point in the history
  • Loading branch information
berewt committed Apr 1, 2023
1 parent 0cadf52 commit 255c587
Showing 1 changed file with 38 additions and 28 deletions.
66 changes: 38 additions & 28 deletions marconi-core/src/Marconi/Core/Experiment.hs
Expand Up @@ -39,7 +39,7 @@ module Marconi.Core.Experiment where

import Control.Concurrent (QSemN)
import Control.Concurrent qualified as Con
import Control.Lens (makeLenses, view, (%~), (&), (<<.~), (?~), (^.))
import Control.Lens (folded, makeLenses, view, (%~), (&), (<<.~), (?~), (^.), (^..))
import Control.Monad (forever, guard)
import Control.Tracer (Tracer, traceWith)

Expand All @@ -66,17 +66,24 @@ type family Point desc
type family Event desc
data family Result query

data TimedEvent desc =
TimedEvent
{ _point :: !(Point desc)
, _event :: !(Event desc)
}

makeLenses 'TimedEvent

class Monad m => IsIndex indexer desc m where

-- | index an event at a given point in time
index :: Eq (Point desc) =>
Point desc -> Event desc -> indexer desc -> m (indexer desc)
TimedEvent desc -> indexer desc -> m (indexer desc)

-- | Index a bunch of points, associated to their event, in an indexer
indexAll :: (Eq (Point desc), Foldable f) =>
f (Point desc, Event desc) -> indexer desc -> m (indexer desc)
indexAll = flip (foldrM (uncurry index))
f (TimedEvent desc) -> indexer desc -> m (indexer desc)
indexAll = flip (foldrM index)

-- | Last sync of the indexer
lastSyncPoint :: indexer desc -> m (Maybe (Point desc))
Expand Down Expand Up @@ -120,17 +127,17 @@ class Resumable indexer desc m where
-- | Full in memory indexer, it uses list because I was too lazy to port the 'Vector' implementation.
-- If we wanna move to these indexer, we should switch the implementation to the 'Vector' one.
data InMemory desc = InMemory
{ _events :: ![(Point desc, Event desc)] -- ^ Stored 'Event', associated with their history 'Point'
{ _events :: ![TimedEvent desc] -- ^ Stored 'Event', associated with their history 'Point'
, _latest :: !(Maybe (Point desc)) -- ^ Ease access to the latest datapoint
}

makeLenses 'InMemory

instance (Monad m) => IsIndex InMemory desc m where

index p e ix = pure $ ix
& events %~ ((p, e):)
& latest ?~ p
index timedEvent ix = pure $ ix
& events %~ (timedEvent:)
& latest ?~ (timedEvent ^. point)

lastSyncPoint = pure . view latest

Expand All @@ -149,13 +156,13 @@ instance Applicative m => Rewindable InMemory desc m where
cleanRecentEvents = events %~ dropWhile isEventAfterRollback
isIndexBeforeRollback :: InMemory desc -> Bool
isIndexBeforeRollback x = maybe True (p >=) $ x ^. latest
isEventAfterRollback :: (Point desc, a) -> Bool
isEventAfterRollback = (p <) . fst
isEventAfterRollback :: TimedEvent desc -> Bool
isEventAfterRollback = (p <) . view point

instance Applicative m => Resumable InMemory desc m where

syncPoints ix = let
indexPoints = fst <$> (ix ^. events)
indexPoints = ix ^.. events . folded . point
-- if the latest point of the index is not a stored event, we add it to the list of points
addLatestIfNeeded Nothing ps = ps
addLatestIfNeeded (Just p) [] = [p]
Expand Down Expand Up @@ -205,14 +212,14 @@ instance
, IsIndex store desc m
) => IsIndex (MixedIndexer InMemory store) desc m where

index point e indexer = do
index timedEvent indexer = do
let maxMemSize = fromIntegral $ indexer ^. slotsInMemory
currentSize = length (indexer ^. inMemory . events)
if currentSize >= maxMemSize
then do
indexer' <- flush indexer
inMemory (index point e) indexer'
else inMemory (index point e) indexer
inMemory (index timedEvent) indexer'
else inMemory (index timedEvent) indexer

lastSyncPoint = lastSyncPoint . view inMemory

Expand Down Expand Up @@ -246,8 +253,7 @@ instance
-- | Remarkable events of an indexer
data IndexerNotification desc
= Rollback !(Point desc)
| Process !(Point desc)
| Issue !(Event desc)
| Index !(TimedEvent desc)

-- | A tracer modifier that adds tracing to an existing indexer
data WithTracer m indexer desc =
Expand All @@ -262,10 +268,9 @@ instance
(Monad m, IsIndex index desc m) =>
IsIndex (WithTracer m index) desc m where

index point event indexer = do
res <- tracedIndexer (index point event) indexer
traceWith (indexer ^. tracer) (Issue event)
traceWith (indexer ^. tracer) (Process point)
index timedEvent indexer = do
res <- tracedIndexer (index timedEvent) indexer
traceWith (indexer ^. tracer) $ Index timedEvent
pure res

lastSyncPoint = lastSyncPoint . view tracedIndexer
Expand Down Expand Up @@ -295,7 +300,7 @@ data RunnerM m input point =
Runner
{ runnerState :: !(TMVar (indexer desc))
, identifyRollback :: !(input -> m (Maybe (Point desc)))
, extractEvent :: !(input -> m (Maybe (Point desc, Event desc)))
, extractEvent :: !(input -> m (Maybe (TimedEvent desc)))
}

type Runner = RunnerM IO
Expand All @@ -308,7 +313,7 @@ createRunner ::
, point ~ Point desc) =>
indexer desc ->
(input -> IO (Maybe point)) ->
(input -> IO (Maybe (point, Event desc))) ->
(input -> IO (Maybe (TimedEvent desc))) ->
IO (TMVar (indexer desc), Runner input point)
createRunner ix rb f = do
mvar <- STM.atomically $ STM.newTMVar ix
Expand All @@ -328,21 +333,21 @@ startRunner chan tokens (Runner ix isRollback extractEvent) = do

unlockCoordinator = Con.signalQSemN tokens 1

indexEvent p e = do
indexEvent timedEvent = do
indexer <- STM.atomically $ STM.takeTMVar ix
indexerLastPoint <- lastSyncPoint indexer
if maybe True (< p) indexerLastPoint
if maybe True (< timedEvent ^. point) indexerLastPoint
then do
indexer' <- index p e indexer
indexer' <- index timedEvent indexer
STM.atomically $ STM.putTMVar ix indexer'
else STM.atomically $ STM.putTMVar ix indexer

handleInsert input = do
me <- extractEvent input
case me of
Nothing -> pure ()
Just (point, event) -> do
indexEvent point event
Just timedEvent -> do
indexEvent timedEvent

handleRollback p = do
indexer <- STM.atomically $ STM.takeTMVar ix
Expand Down Expand Up @@ -411,7 +416,8 @@ makeLenses 'CoordinatorIndex
-- A coordinator can be consider as an indexer that forwards the input to its runner
instance IsIndex CoordinatorIndex desc IO where

index point event = coordinator $ \x -> step (const point) x event
index timedEvent = coordinator $
\x -> step (const $ timedEvent ^. point) x $ timedEvent ^. event

lastSyncPoint indexer = pure $ indexer ^. coordinator . lastSync

Expand Down Expand Up @@ -441,3 +447,7 @@ instance Rewindable CoordinatorIndex desc IO where
(STM.atomically (STM.putTMVar runnerState indexer) $> Nothing)
((Just r <$) . STM.atomically . STM.putTMVar runnerState)
res

-- There is no point in providing a 'Queryable' interface for 'CoordinatorIndex' though,
-- as it's sole interest would be to get the latest synchronisation points,
-- but 'query' requires a 'Point' to provide a result.

0 comments on commit 255c587

Please sign in to comment.