Skip to content

Commit

Permalink
Propose a solution to delay processing
Browse files Browse the repository at this point in the history
  • Loading branch information
berewt committed Apr 1, 2023
1 parent 819eed9 commit af54dc6
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 1 deletion.
1 change: 1 addition & 0 deletions marconi-core/marconi-core.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ library
hs-source-dirs: src
build-depends:
, base >=4.7 && <5
, containers
, contra-tracer
, lens
, mtl
Expand Down
59 changes: 58 additions & 1 deletion marconi-core/src/Marconi/Core/Experiment.hs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ module Marconi.Core.Experiment where

import Control.Concurrent (QSemN)
import Control.Concurrent qualified as Con
import Control.Lens (filtered, folded, makeLenses, view, (%~), (&), (<<.~), (?~), (^.), (^..), (^?))
import Control.Lens (filtered, folded, makeLenses, view, (%~), (&), (+~), (.~), (<<.~), (?~), (^.), (^..), (^?))
import Control.Monad (forever, guard)
import Control.Monad.Except (MonadError (catchError, throwError))
import Control.Tracer (Tracer, traceWith)
Expand All @@ -52,6 +52,8 @@ import Control.Monad.Trans.Maybe (MaybeT (MaybeT, runMaybeT))
import Data.Foldable (foldlM, foldrM, traverse_)
import Data.Functor (($>))
import Data.List (intersect)
import Data.Sequence (Seq, ViewR (EmptyR, (:>)), viewr, (<|))
import Data.Sequence qualified as Seq
import Data.Text (Text)


Expand Down Expand Up @@ -308,6 +310,8 @@ data ProcessedInput event
= Rollback !(Point event)
| Index !(TimedEvent event)

-- * Runners

-- | A runner encapsulate an indexer in an opaque type.
-- It allows us to manipulate seamlessly a list of indexers that has different types
-- as long as they implement the required interfaces.
Expand Down Expand Up @@ -579,3 +583,56 @@ instance
pure $ Just indexer'


-- ** Delaying insert

-- | When indexing computation is expensive, you may want to delay it to avoid expensive rollback
-- 'WithDelay' buffers events before sending them to the underlying indexer.
-- Buffered events are sent when the buffers overflows.
data WithDelay index event =
WithDelay
{ _bufferedIndexer :: !(index event)
, _bufferCapacity :: !Word
, _bufferSize :: !Word
, _buffer :: !(Seq (TimedEvent event))
}

makeLenses 'WithDelay

isFull :: WithDelay indexer event -> Bool
isFull b = (b ^. bufferSize) >= (b ^. bufferCapacity)

instance
(Monad m, IsIndex index event m) =>
IsIndex (WithDelay index) event m where

index timedEvent indexer = let
bufferEvent = (bufferSize +~ 1) . (buffer %~ (timedEvent <|))
pushAndGetOldest b = case viewr b of
EmptyR -> (timedEvent, b)
(buffer' :> e') -> (e', timedEvent <| buffer')
in do
if not $ isFull indexer
then pure $ bufferEvent indexer
else do
let b = indexer ^. buffer
(oldest, buffer') = pushAndGetOldest b
res <- bufferedIndexer (index oldest) indexer
pure $ res & buffer .~ buffer'

lastSyncPoint = lastSyncPoint . view bufferedIndexer

instance
( Monad m
, Rewindable index event m
, Ord (Point event)
) => Rewindable (WithDelay index) event m where

rewind p indexer = let
rewindWrappedIndexer p' = bufferedIndexer (MaybeT . rewind p') indexer
resetBuffer = (bufferSize .~ 0) . (buffer .~ Seq.empty)
(after, before) = Seq.spanl ((p <=) . view point) $ indexer ^. buffer
in if Seq.null before
then fmap resetBuffer <$> runMaybeT (rewindWrappedIndexer p)
else pure $ pure $ indexer & buffer .~ after & bufferSize .~ fromIntegral (Seq.length after)


0 comments on commit af54dc6

Please sign in to comment.