Skip to content

Commit

Permalink
Still documenting
Browse files Browse the repository at this point in the history
  • Loading branch information
berewt committed Apr 1, 2023
1 parent 2b664cf commit 27d3e9c
Showing 1 changed file with 42 additions and 19 deletions.
61 changes: 42 additions & 19 deletions marconi-core/src/Marconi/Core/Experiment.hs
Expand Up @@ -31,10 +31,11 @@ What's include in this module:
Contrary to the original Marconi design, indexers don't have a mandatory memory representation.
In this module @desc@ is the descriptor of an indexer,
it's usually an uninhabited type used to define the corresponding type families.
In this module @desc@ is the descriptor of an indexer.
It's usually an uninhabited type used to define the corresponding type families.
The idea behind @desc@ is that you may want to provide different indexer for the same descriptor,
or to provide an indexer implementation that works for different descriptors.
Decoupling the descriptor enables such a decoupling.
-}
Expand Down Expand Up @@ -90,6 +91,12 @@ data TimedEvent desc =

makeLenses 'TimedEvent

-- | The base class of an indexer.
-- The indexer should provide two main functionality: indexing events, and providing its last synchronisation point.
--
-- @indexer@ the indexer implementation type
-- @desc@ the descriptor of the indexer, fixing the @Event@ and @Point@ types
-- @m@ the monad in which our indexer operates
class Monad m => IsIndex indexer desc m where

-- | index an event at a given point in time
Expand All @@ -103,8 +110,14 @@ class Monad m => IsIndex indexer desc m where

-- | Last sync of the indexer
lastSyncPoint :: indexer desc -> m (Maybe (Point desc))
{-# MINIMAL index, lastSyncPoint #-}

-- | The indexer can answer a Query to produce the corresponding result of that query
-- | The indexer can answer a Query to produce the corresponding result of that query.
--
-- * @indexer@ is the indexer implementation type
-- * @desc@ the descriptor of the indexer, fixing the @Point@ types
-- * @query@ the type of query we want to answer fixing the @Result@ type
-- * @m@ the monad in which our indexer operates
class Queryable indexer desc query m where

-- | Query an indexer at a given point in time
Expand All @@ -117,6 +130,9 @@ class Queryable indexer desc query m where


-- | We can reset an indexer to a previous `Point`
-- * @indexer@ is the indexer implementation type
-- * @desc@ the descriptor of the indexer, fixing the @Point@ types
-- * @m@ the monad in which our indexer operates
class Rewindable indexer desc m where

rewind :: Ord (Point desc) => Point desc -> indexer desc -> m (Maybe (indexer desc))
Expand All @@ -125,6 +141,10 @@ class Rewindable indexer desc m where
-- The main purpose is to speed up query processing.
-- If the indexer is 'Rewindable', 'Aggregable' can't 'rewind' behind the 'aggregationPoint',
-- the idea is to call 'aggregate' on points that can't be rollbacked anymore.
--
-- * @indexer@ is the indexer implementation type
-- * @desc@ the descriptor of the indexer, fixing the @Point@ types
-- * @m@ the monad in which our indexer operates
class Aggregable indexer desc m where

-- Aggregate events of the indexer up to a given point in time
Expand All @@ -139,12 +159,12 @@ class Resumable indexer desc m where
-- | Last sync of the indexer
syncPoints :: Ord (Point desc) => indexer desc -> m [Point desc]

-- * Base runIndexers

-- * Base runIndexers

-- ** Full in-memory indexer, backed by a list

-- | Full in memory indexer, it uses list because I was too lazy to port the 'Vector' implementation.
-- | A Full in memory indexer, it uses list because I was too lazy to port the 'Vector' implementation.
-- If we wanna move to these indexer, we should switch the implementation to the 'Vector' one.
data InMemory desc = InMemory
{ _events :: ![TimedEvent desc] -- ^ Stored 'Event', associated with their history 'Point'
Expand Down Expand Up @@ -203,8 +223,8 @@ makeLenses 'InDatabase
-- ** Mixed indexer

-- | An indexer that has at most '_blocksInMemory' events in memory and put the older one in database.
-- The query interface for this indexer will alwys go through the database first and then aggregate
-- results presents in memory.
-- The query interface for this indexer will alwyas go through the database first and then aggregate
-- results present in memory.
--
-- @mem@ the indexer that handle old events, when we need to remove stuff from memory
-- @store@ the indexer that handle the most recent events
Expand All @@ -217,15 +237,15 @@ data MixedIndexer mem store desc = MixedIndexer
makeLenses 'MixedIndexer

-- | The indexer can take a result and complete it with its events
-- It's useful for the in memory part of a 'MixedIndexer', as it specify
-- It's used by the in-memory part of a 'MixedIndexer' to specify
-- how we can complete the database result with the memory content.
class ResumableResult indexer desc result m where

resumeResult :: result -> indexer desc -> m result



-- | Flush the in-memory events to the database, keeping track of the latest index
-- | Flush all the in-memory events to the database, keeping track of the latest index
flush ::
( Monad m
, IsIndex store desc m
Expand Down Expand Up @@ -284,7 +304,6 @@ instance

-- * Indexer transformer: Add effects


-- ** Tracer Add tracing to an existing indexer

-- | Remarkable events of an indexer
Expand Down Expand Up @@ -326,11 +345,12 @@ instance
traceWith (indexer' ^. tracer) (Rollback p)
pure $ Just indexer'


-- ** Runners

-- | A runner encapsulate an indexer in an opaque type.
-- It allows us to manipulate seamlessly a list of indexers that has different types, as long as the implement the
-- required interfaces.
-- It allows us to manipulate seamlessly a list of indexers that has different types
-- as long as they implement the required interfaces.
data RunnerM m input point =
forall indexer desc.
( IsIndex indexer desc m
Expand Down Expand Up @@ -360,7 +380,7 @@ createRunner ix rb f = do
mvar <- STM.atomically $ STM.newTMVar ix
pure (mvar, Runner mvar rb f)

-- | The runner start waiting fo new event and process them as they come
-- | The runner starts waiting for new events and process them as they come
startRunner :: Ord point => TChan input -> QSemN -> Runner input point -> IO ()
startRunner chan tokens (Runner ix isRollback extractEvent) = do
chan' <- STM.atomically $ STM.dupTChan chan
Expand All @@ -374,10 +394,12 @@ startRunner chan tokens (Runner ix isRollback extractEvent) = do

unlockCoordinator = Con.signalQSemN tokens 1

fresherThan evt p = maybe True (< evt ^. point) p

indexEvent timedEvent = do
indexer <- STM.atomically $ STM.takeTMVar ix
indexerLastPoint <- lastSyncPoint indexer
if maybe True (< timedEvent ^. point) indexerLastPoint
if timedEvent `fresherThan` indexerLastPoint
then do
indexer' <- index timedEvent indexer
STM.atomically $ STM.putTMVar ix indexer'
Expand All @@ -400,7 +422,7 @@ startRunner chan tokens (Runner ix isRollback extractEvent) = do

-- | A coordinator synchronises the event processing of a list of indexers.
-- A coordinator is itself is an indexer.
-- It means that we can create a tree of indexer, with coordinators that partially process the data at each branch,
-- It means that we can create a tree of indexer, with coordinators that partially process the data at each node,
-- and with concrete indexers at the leaves.
data Coordinator input point = Coordinator
{ _lastSync :: !(Maybe point) -- ^ the last common sync point for the runners
Expand All @@ -413,6 +435,9 @@ data Coordinator input point = Coordinator
makeLenses 'Coordinator

-- | Get the common syncPoints of a group or runners
--
-- Important note : the syncpoints are sensible to rewind. It means that the result of this function may be invalid if
-- the indexer is rewinded.
runnerSyncPoints :: Ord point => [Runner input point] -> IO [point]
runnerSyncPoints [] = pure []
runnerSyncPoints (r:rs) = do
Expand All @@ -423,10 +448,8 @@ runnerSyncPoints (r:rs) = do

getSyncPoints :: Ord point => Runner input point -> IO [point]
getSyncPoints (Runner ix _ _) = do
indexer <- STM.atomically $ STM.takeTMVar ix
res <- syncPoints indexer
STM.atomically $ STM.putTMVar ix indexer
pure res
indexer <- STM.atomically $ STM.readTMVar ix
syncPoints indexer

-- | create a coordinator with started runners
start :: Ord point => [Runner input point] -> IO (Coordinator input point)
Expand Down

0 comments on commit 27d3e9c

Please sign in to comment.