91 changes: 58 additions & 33 deletions marconi-core/src/Marconi/Core/Experiment.hs
Expand Up @@ -53,43 +53,47 @@ import Data.Functor (($>))
import Data.List (intersect)
import Database.SQLite.Simple qualified as SQL

-- | A point in time
-- | A point in time, the concrete type of a point is now derived from an indexer descriptor,
-- instead of an event.
-- The reason is that you may not want to always carry around a point when you manipulate an event.
type family Point desc
-- |
-- A an element that you want to capture from a given input. A given point in time will always correspond to an event.
-- As a consequence if a point in time can be associated with no event, wrap a `Maybe` type, if several events need to be
-- associated to the same point in time, wrap a `List`.
-- A an element that you want to capture from a given input.
-- A given point in time will always correspond to an event.
-- As a consequence if a point in time can be associated with no event,
-- wrap it in a `Maybe` type,
-- if several events need to be associated to the same point in time, wrap a `List`, etc.
type family Event desc
data family Result query

class Monad m => IsIndex indexer desc m where

insert :: Eq (Point desc) =>
-- | index an event at a given point in time
index :: Eq (Point desc) =>
Point desc -> Event desc -> indexer desc -> m (indexer desc)

-- | Store a bunch of points, associated to their event, in an indexer
insertAll :: (Eq (Point desc), Foldable f) =>
-- | Index a bunch of points, associated to their event, in an indexer
indexAll :: (Eq (Point desc), Foldable f) =>
f (Point desc, Event desc) -> indexer desc -> m (indexer desc)
insertAll = flip (foldrM (uncurry insert))
indexAll = flip (foldrM (uncurry index))

-- | Last sync of the indexer
lastSyncPoint :: indexer desc -> m (Maybe (Point desc))

-- | The indexer can answer a Query to produce the corresponding result
-- | The indexer can answer a Query to produce the corresponding result of that query
class Queryable indexer desc query m where

-- | Query an indexer at a given point in time
-- It can be read as:
-- "With the knowledge you have at that point in time,
-- what is your answer to this query?"
-- Note: Not sure we shouldn't explicitly handle error when we query an invalid point here
query :: Point desc -> query -> indexer desc -> m (Result query)

-- | The indexer can take a result and complete it with its events
class ResumableResult indexer desc result m where

resumeResult :: result -> indexer desc -> m result

-- | The indexer can be reset to a previous `Point`
-- | We can reset an indexer to a previous `Point`
class Rewindable indexer desc m where

rewind :: Ord (Point desc) => Point desc -> indexer desc -> m (Maybe (indexer desc))
Expand Down Expand Up @@ -124,7 +128,7 @@ makeLenses 'InMemory

instance (Monad m) => IsIndex InMemory desc m where

insert p e ix = pure $ ix
index p e ix = pure $ ix
& events %~ ((p, e):)
& latest ?~ p

Expand Down Expand Up @@ -174,6 +178,14 @@ data MixedIndexer mem store desc = MixedIndexer

makeLenses 'MixedIndexer

-- | The indexer can take a result and complete it with its events
-- It's useful for the in memory part of a 'MixedIndexer', as it specify
-- how we can complete the database result with the memory content.
class ResumableResult indexer desc result m where

resumeResult :: result -> indexer desc -> m result

-- | Flush the in-memory events to the database, keeping track of the latest index
flush ::
Expand All @@ -185,22 +197,22 @@ flush ::
flush indexer = let
flushMemory = inMemory . events <<.~ []
(eventsToFlush, indexer') = flushMemory indexer
in inDatabase (insertAll eventsToFlush) indexer'
in inDatabase (indexAll eventsToFlush) indexer'

( Monad m
, IsIndex InMemory desc m
, IsIndex store desc m
) => IsIndex (MixedIndexer InMemory store) desc m where

insert point e indexer = do
index point e indexer = do
let maxMemSize = fromIntegral $ indexer ^. slotsInMemory
currentSize = length (indexer ^. inMemory . events)
if currentSize >= maxMemSize
then do
indexer' <- flush indexer
inMemory (insert point e) indexer'
else inMemory (insert point e) indexer
inMemory (index point e) indexer'
else inMemory (index point e) indexer

lastSyncPoint = lastSyncPoint . view inMemory

Expand All @@ -212,22 +224,26 @@ instance
rewind p indexer = do
mindexer <- runMaybeT $ inMemory rewindInStore indexer
case mindexer of
Just ix -> if null $ ix ^. inMemory . events
then runMaybeT $ inDatabase rewindInStore ix
else pure $ pure ix -- if there are still event in memory, no need to rewind the database
Just ix -> if not $ null $ ix ^. inMemory . events
then pure $ pure ix -- if there are still events in memory, no need to rewind the database
else runMaybeT $ inDatabase rewindInStore ix
Nothing -> pure Nothing
rewindInStore :: Rewindable index desc m => index desc -> MaybeT m (index desc)
rewindInStore = MaybeT . rewind p

( Monad m , ResumableResult InMemory desc (Result query) m , Queryable store desc query m) =>
( Monad m
, ResumableResult InMemory desc (Result query) m
, Queryable store desc query m
) =>
Queryable (MixedIndexer InMemory store) desc query m where

query valid q indexer = do
res <- query valid q $ indexer ^. inDatabase
resumeResult res $ indexer ^. inMemory

-- | Remarkable events of an indexer
data IndexerNotification desc
= Rollback !(Point desc)
| Process !(Point desc)
Expand All @@ -246,8 +262,8 @@ instance
(Monad m, IsIndex index desc m) =>
IsIndex (WithTracer m index) desc m where

insert point event indexer = do
res <- tracedIndexer (insert point event) indexer
index point event indexer = do
res <- tracedIndexer (index point event) indexer
traceWith (indexer ^. tracer) (Issue event)
traceWith (indexer ^. tracer) (Process point)
pure res
Expand All @@ -271,7 +287,11 @@ instance
-- input data
data RunnerM m input point =
forall indexer desc.
(IsIndex indexer desc m, Resumable indexer desc m, Rewindable indexer desc m, Point desc ~ point) =>
( IsIndex indexer desc m
, Resumable indexer desc m
, Rewindable indexer desc m
, Point desc ~ point
) =>
{ runnerState :: !(TMVar (indexer desc))
, identifyRollback :: !(input -> m (Maybe (Point desc)))
Expand All @@ -282,7 +302,10 @@ type Runner = RunnerM IO

-- | create a runner for an indexer, retuning the runner and the 'MVar' it's using internally
createRunner ::
(IsIndex indexer desc IO, Resumable indexer desc IO, Rewindable indexer desc IO, point ~ Point desc) =>
( IsIndex indexer desc IO
, Resumable indexer desc IO
, Rewindable indexer desc IO
, point ~ Point desc) =>
indexer desc ->
(input -> IO (Maybe point)) ->
(input -> IO (Maybe (point, Event desc))) ->
Expand Down Expand Up @@ -310,7 +333,7 @@ startRunner chan tokens (Runner ix isRollback extractEvent) = do
indexerLastPoint <- lastSyncPoint indexer
if maybe True (< p) indexerLastPoint
then do
indexer' <- insert p e indexer
indexer' <- index p e indexer
STM.atomically $ STM.putTMVar ix indexer'
else STM.atomically $ STM.putTMVar ix indexer

Expand Down Expand Up @@ -388,7 +411,7 @@ makeLenses 'CoordinatorIndex
-- A coordinator can be consider as an indexer that forwards the input to its runner
instance IsIndex CoordinatorIndex desc IO where

insert point event = coordinator $ \x -> step (const point) x event
index point event = coordinator $ \x -> step (const point) x event

lastSyncPoint indexer = pure $ indexer ^. coordinator . lastSync

Expand All @@ -402,7 +425,8 @@ instance Rewindable CoordinatorIndex desc IO where
MaybeT IO (Coordinator (Event desc) (Point desc))
rewindRunners c = do
availableSyncs <- lift $ runnerSyncPoints $ c ^. runners
guard $ p `elem` availableSyncs -- we start by checking if the given point is a valid sync point
-- we start by checking if the given point is a valid sync point
guard $ p `elem` availableSyncs
runners (traverse $ MaybeT . rewindRunner) c

rewindRunner ::
Expand All @@ -412,7 +436,8 @@ instance Rewindable CoordinatorIndex desc IO where
indexer <- STM.atomically $ STM.takeTMVar runnerState
res <- rewind p indexer
-- the Nothing case should not happen as we check that the sync point is a valid one
-- the Nothing case should not happen
-- as we check that the sync point is a valid one
(STM.atomically (STM.putTMVar runnerState indexer) $> Nothing)
((Just r <$) . STM.atomically . STM.putTMVar runnerState)

