Skip to content

Commit

Permalink
Tracing is a indexer transformer
Browse files Browse the repository at this point in the history
  • Loading branch information
berewt committed Apr 1, 2023
1 parent 42792f0 commit 095dbb2
Showing 1 changed file with 39 additions and 11 deletions.
50 changes: 39 additions & 11 deletions marconi-core/src/Marconi/Core/Experiment.hs
Expand Up @@ -25,6 +25,7 @@ What's include in this module:
- Base type classes to define an indexer, its query interface, and the required plumbing to handle rollback
- A full in-memory indexer (naive), an indexer that compose it with a SQL layer for persistence
- Tracing, as a modifier to an existing indexer (it allows us to opt in for traces if we want, indexer by indexer)
- A coordinator for indexers, that can be exposed as an itdexer itself
Expand Down Expand Up @@ -232,6 +233,40 @@ data IndexerNotification desc
| Process !(Point desc)
| Issue !(Event desc)

-- | A tracer modifier that adds tracing to an existing indexer
data WithTracer m indexer desc =
WithTracer
{ _tracedIndexer :: !(indexer desc)
, _tracer :: !(Tracer m (IndexerNotification desc))
}

makeLenses 'WithTracer

instance
(Monad m, IsIndex index desc m) =>
IsIndex (WithTracer m index) desc m where

insert point event indexer = do
res <- tracedIndexer (insert point event) indexer
traceWith (indexer ^. tracer) (Issue event)
traceWith (indexer ^. tracer) (Process point)
pure res

lastSyncPoint = lastSyncPoint . view tracedIndexer

instance
( Monad m
, Rewindable index desc m
) => Rewindable (WithTracer m index) desc m where

rewind p indexer = do
res <- runMaybeT $ tracedIndexer (MaybeT . rewind p) indexer
maybe (pure Nothing) traceSuccessfulRewind res
where
traceSuccessfulRewind indexer' = do
traceWith (indexer' ^. tracer) (Rollback p)
pure $ Just indexer'

-- | A runner encapsulate an indexer in an opaque type, that allows to plug different indexers to the same stream of
-- input data
data RunnerM m input point =
Expand All @@ -241,11 +276,8 @@ data RunnerM m input point =
{ runnerState :: !(TMVar (indexer desc))
, identifyRollback :: !(input -> m (Maybe (Point desc)))
, extractEvent :: !(input -> m (Maybe (Point desc, Event desc)))
, tracer :: !(Tracer m (IndexerNotification desc))
}

makeLenses 'Runner

type Runner = RunnerM IO

-- | create a runner for an indexer, retuning the runner and the 'MVar' it's using internally
Expand All @@ -254,15 +286,14 @@ createRunner ::
indexer desc ->
(input -> IO (Maybe point)) ->
(input -> IO (Maybe (point, Event desc))) ->
Tracer IO (IndexerNotification desc) ->
IO (TMVar (indexer desc), Runner input point)
createRunner ix rb f tr = do
createRunner ix rb f = do
mvar <- STM.atomically $ STM.newTMVar ix
pure (mvar, Runner mvar rb f tr)
pure (mvar, Runner mvar rb f)

-- | The runner start waiting fo new event and process them as they come
startRunner :: Ord point => TChan input -> QSemN -> Runner input point -> IO ()
startRunner chan tokens (Runner ix isRollback extractEvent tracer) = do
startRunner chan tokens (Runner ix isRollback extractEvent) = do
chan' <- STM.atomically $ STM.dupTChan chan
input <- STM.atomically $ STM.readTChan chan'
forever $ do
Expand All @@ -289,8 +320,6 @@ startRunner chan tokens (Runner ix isRollback extractEvent tracer) = do
Nothing -> pure ()
Just (point, event) -> do
indexEvent point event
traceWith tracer (Issue event)
traceWith tracer (Process point)

handleRollback p = do
indexer <- STM.atomically $ STM.takeTMVar ix
Expand All @@ -299,7 +328,6 @@ startRunner chan tokens (Runner ix isRollback extractEvent tracer) = do
(STM.atomically $ STM.putTMVar ix indexer)
(STM.atomically . STM.putTMVar ix)
mindexer
traceWith tracer (Rollback p)

data Coordinator input point = Coordinator
{ _lastSync :: !(Maybe point) -- ^ the last common sync point for the runners
Expand All @@ -321,7 +349,7 @@ runnerSyncPoints (r:rs) = do
where

getSyncPoints :: Ord point => Runner input point -> IO [point]
getSyncPoints (Runner ix _ _ _) = do
getSyncPoints (Runner ix _ _) = do
indexer <- STM.atomically $ STM.takeTMVar ix
res <- syncPoints indexer
STM.atomically $ STM.putTMVar ix indexer
Expand Down

0 comments on commit 095dbb2

Please sign in to comment.