Skip to content

Commit

Permalink
Merge pull request #1 from jdreaver/master
Browse files Browse the repository at this point in the history
pull latest.
  • Loading branch information
tjweir committed Jan 23, 2019
2 parents 5672297 + 3f0c604 commit 4127e77
Show file tree
Hide file tree
Showing 11 changed files with 240 additions and 36 deletions.
2 changes: 1 addition & 1 deletion circle.yml
@@ -1,7 +1,7 @@
machine:
environment:
PATH: "$HOME/.local/bin:$PATH"
STACK_VERSION: 1.5.1
STACK_VERSION: 1.6.1
_JAVA_OPTIONS: "-Xms512m -Xmx1024m"
AWS_ACCESS_KEY_ID: dummy
AWS_SECRET_ACCESS_KEY: dummy
Expand Down
4 changes: 2 additions & 2 deletions eventful-core/src/Eventful/CommandHandler.hs
Expand Up @@ -55,8 +55,8 @@ applyCommandHandler writer reader (CommandHandler handler proj) uuid command = d
let events = handler streamProjectionState command
mError <- storeEvents writer uuid (ExactPosition streamProjectionPosition) events
case mError of
(Just err) -> error $ "TODO: Create CommandHandler restart logic. " ++ show err
Nothing -> return events
Left err -> error $ "TODO: Create CommandHandler restart logic. " ++ show err
Right _ -> return events

-- | Use a pair of 'Serializer's to wrap a 'CommandHandler' with event type @event@
-- and command type @command@ so it uses the @serializedEvent@ and
Expand Down
8 changes: 4 additions & 4 deletions eventful-core/src/Eventful/EventBus.hs
Expand Up @@ -31,14 +31,14 @@ storeAndPublishEvents
-> UUID
-> ExpectedPosition EventVersion
-> [event]
-> m (Maybe (EventWriteError EventVersion))
-> m (Either (EventWriteError EventVersion) EventVersion)
storeAndPublishEvents store handlers uuid expectedVersion events = do
result <- storeEvents store uuid expectedVersion events
case result of
Just err -> return $ Just err
Nothing -> do
Left err -> return $ Left err
Right vers -> do
-- NB: If a handler stores events, then its events will be published
-- before the events of the next handler. That is, we will be storing
-- events generated by handlers in depth-first order.
mapM_ (\handler -> mapM_ (handler uuid) events) handlers
return Nothing
return $ Right vers
16 changes: 8 additions & 8 deletions eventful-core/src/Eventful/Store/Class.hs
Expand Up @@ -56,7 +56,7 @@ type GlobalEventStoreReader m event = EventStoreReader () SequenceNumber m (Glob
-- | An 'EventStoreWriter' is a function to write some events of type @event@
-- to an event store in some monad @m@.
newtype EventStoreWriter key position m event
= EventStoreWriter { storeEvents :: key -> ExpectedPosition position -> [event] -> m (Maybe (EventWriteError position)) }
= EventStoreWriter { storeEvents :: key -> ExpectedPosition position -> [event] -> m (Either (EventWriteError position) EventVersion) }

instance Contravariant (EventStoreWriter key position m) where
contramap f (EventStoreWriter writer) = EventStoreWriter $ \vers uuid -> writer vers uuid . fmap f
Expand Down Expand Up @@ -99,8 +99,8 @@ data EventWriteError position
transactionalExpectedWriteHelper
:: (Monad m, Ord position, Num position)
=> (key -> m position)
-> (key -> [event] -> m ())
-> key -> ExpectedPosition position -> [event] -> m (Maybe (EventWriteError position))
-> (key -> [event] -> m EventVersion)
-> key -> ExpectedPosition position -> [event] -> m (Either (EventWriteError position) EventVersion)
transactionalExpectedWriteHelper getLatestVersion' storeEvents' key expected =
go expected getLatestVersion' storeEvents' key
where
Expand All @@ -113,15 +113,15 @@ transactionalExpectedWriteHelper'
:: (Monad m)
=> Maybe (position -> Bool)
-> (key -> m position)
-> (key -> [event] -> m ())
-> key -> [event] -> m (Maybe (EventWriteError position))
-> (key -> [event] -> m EventVersion)
-> key -> [event] -> m (Either (EventWriteError position) EventVersion)
transactionalExpectedWriteHelper' Nothing _ storeEvents' uuid events =
storeEvents' uuid events >> return Nothing
storeEvents' uuid events >>= return . Right
transactionalExpectedWriteHelper' (Just f) getLatestVersion' storeEvents' uuid events = do
latestVersion <- getLatestVersion' uuid
if f latestVersion
then storeEvents' uuid events >> return Nothing
else return $ Just $ EventStreamNotAtExpectedVersion latestVersion
then storeEvents' uuid events >>= return . Right
else return $ Left $ EventStreamNotAtExpectedVersion latestVersion

-- | Changes the monad an 'EventStoreReader' runs in. This is useful to run
-- event stores in another 'Monad' while forgetting the original 'Monad'.
Expand Down
3 changes: 2 additions & 1 deletion eventful-dynamodb/src/Eventful/Store/DynamoDB.hs
Expand Up @@ -155,7 +155,7 @@ storeDynamoEvents
=> DynamoDBEventStoreConfig serialized
-> UUID
-> [serialized]
-> m ()
-> m EventVersion
storeDynamoEvents config@DynamoDBEventStoreConfig{..} uuid events = do
latestVersion <- latestEventVersion config uuid

Expand All @@ -170,6 +170,7 @@ storeDynamoEvents config@DynamoDBEventStoreConfig{..} uuid events = do
, (dynamoDBEventStoreConfigVersionAttributeName, attributeValue & avN ?~ T.pack (show version))
, (dynamoDBEventStoreConfigEventAttributeName, dynamoDBEventStoreConfigSerializedToValue event)
]
return $ latestVersion + (EventVersion $ length events)

-- | Helpful function to create the events table. If a table already exists
-- with the same name, then this function just uses that one. Note, there are
Expand Down
26 changes: 15 additions & 11 deletions eventful-memory/src/Eventful/Store/Memory.hs
Expand Up @@ -54,7 +54,11 @@ tvarEventStoreWriter :: TVar (EventMap event) -> VersionedEventStoreWriter STM e
tvarEventStoreWriter tvar = EventStoreWriter $ transactionalExpectedWriteHelper getLatestVersion storeEvents'
where
getLatestVersion uuid = flip latestEventVersion uuid <$> readTVar tvar
storeEvents' uuid events = modifyTVar' tvar (\store -> storeEventMap store uuid events)
storeEvents' uuid events = do
store <- readTVar tvar
let (store', vers) = storeEventMap store uuid events
writeTVar tvar store'
return vers

-- | Analog of 'tvarEventStoreReader' for a 'GlobalEventStoreReader'
tvarGlobalEventStoreReader :: TVar (EventMap event) -> GlobalEventStoreReader STM event
Expand Down Expand Up @@ -96,12 +100,12 @@ embeddedStateEventStoreWriter
embeddedStateEventStoreWriter getMap setMap = EventStoreWriter $ transactionalExpectedWriteHelper getLatestVersion storeEvents'
where
getLatestVersion uuid = flip latestEventVersion uuid <$> gets getMap
storeEvents' uuid events = modify' (modifyStore uuid events)
modifyStore uuid events state' =
let
store = getMap state'
store' = storeEventMap store uuid events
in setMap state' store'
storeEvents' uuid events = do
state' <- get
let store = getMap state'
let (store', vers) = storeEventMap store uuid events
put $ setMap state' store'
return vers

-- | Analogous to 'embeddedStateEventStore' for a 'GlobalStreamEventStore'.
embeddedStateGlobalEventStoreReader
Expand Down Expand Up @@ -150,11 +154,11 @@ lookupGlobalEvents (QueryRange () start limit) (EventMap _ globalEvents) = event
(StartQueryAt startSeq) -> startSeq

storeEventMap
:: EventMap event -> UUID -> [event] -> EventMap event
:: EventMap event -> UUID -> [event] -> (EventMap event, EventVersion)
storeEventMap store@(EventMap uuidMap globalEvents) uuid events =
let
versStart = latestEventVersion store uuid + 1
streamEvents = zipWith (StreamEvent uuid) [versStart..] events
versStart = latestEventVersion store uuid
streamEvents = zipWith (StreamEvent uuid) [versStart + 1..] events
newMap = Map.insertWith (flip (><)) uuid (Seq.fromList streamEvents) uuidMap
globalEvents' = globalEvents >< Seq.fromList streamEvents
in EventMap newMap globalEvents'
in (EventMap newMap globalEvents', versStart + (EventVersion $ length events))
1 change: 1 addition & 0 deletions eventful-postgresql/postgres-event-store-bench/.gitignore
@@ -0,0 +1 @@
pgbench-script.sql
@@ -0,0 +1,197 @@
#!/usr/bin/env bash

set -eu;

# Benchmark various configurations of postgres as an event store.

export PGHOST=${PGHOST:-localhost}
export PGDATABASE=${PGDATABASE:-postgres_event_store_bench}
export PGUSER=${PGUSER:-postgres}
export PGPASSWORD=${PGPASSWORD:-password}

PSQL="psql --no-psqlrc --quiet"

NUM_CLIENTS=${NUM_CLIENTS:-4}
BENCH_TIME_SECONDS=${BENCH_TIME_SECONDS:-10}
PGBENCH="pgbench --no-vacuum $PGDATABASE --client $NUM_CLIENTS --jobs $NUM_CLIENTS -f pgbench-script.sql --time $BENCH_TIME_SECONDS --report-latencies"

SYNCHRONOUS_COMMIT=${SYNCHRONOUS_COMMIT:-off}
TRACK_COMMIT_TIMESTAMP=${TRACK_COMMIT_TIMESTAMP:-false}

SMALL_EVENT='{"type":"mytype","value":"hello"}'
LARGE_EVENT='{"type":"mytype","value":"hello","a":1,"b":2,"c":3,"d":true,"e":"Mary had a little lamb her fleece was white as snow and everywhere that mary went her lamb was sure to go","f":true,"g":false}'

# Spin up a docker container to run postgres tests
if [ "$(docker ps -aq -f name=postgres_event_store_bench)" ]; then
docker stop postgres_event_store_bench
docker rm postgres_event_store_bench
fi
docker run \
--name postgres_event_store_bench \
-e "POSTGRES_DB=$PGDATABASE" \
-p 5432:5432/tcp \
-d postgres:10 \
postgres \
-c "synchronous_commit=$SYNCHRONOUS_COMMIT" \
-c "track_commit_timestamp=$TRACK_COMMIT_TIMESTAMP"
# -c shared_buffers=2000MB \
# -c wal_buffers=16MB \
# -c work_mem=20MB \
# -c wal_writer_delay=2s

# Wait for postgres to be available
until nc -z "$PGHOST" 5432; do
echo "waiting for postgres..."
sleep 1
done

# Give time for postgres to start
sleep 10

recreate_db() {
local real_pgdatabase="$PGDATABASE"
PGDATABASE=postgres $PSQL -c "DROP DATABASE IF EXISTS $real_pgdatabase;"
PGDATABASE=postgres $PSQL -c "CREATE DATABASE $real_pgdatabase;"
}

# Test insertion speed with simple schema and full table lock
recreate_db
$PSQL <<EOF
CREATE TABLE events (
sequence_number serial PRIMARY KEY,
event jsonb NOT NULL
-- created_at timestamp with time zone default now() NOT NULL
);
EOF

cat <<EOF > pgbench-script.sql
BEGIN;
LOCK events IN EXCLUSIVE MODE;
INSERT INTO events (event) VALUES ('$SMALL_EVENT');
COMMIT;
EOF

echo "Full table lock"
$PGBENCH

# Schema with logical logs embedded (UUID and version)
recreate_db
$PSQL <<EOF
CREATE EXTENSION IF NOT EXISTS "uuid-ossp";
CREATE TABLE events (
sequence_number serial PRIMARY KEY,
log_id uuid NOT NULL,
version int NOT NULL,
event jsonb NOT NULL,
UNIQUE (log_id, version)
);
EOF

# TODO: I think the main bottleneck here is running uuid_generate_v4(). We
# wouldn't have to do this in a real system because we would already know the
# UUID.

cat <<EOF > pgbench-script.sql
BEGIN;
LOCK events IN EXCLUSIVE MODE;
INSERT INTO events (log_id, version, event) VALUES (uuid_generate_v4(), 0, '$SMALL_EVENT');
COMMIT;
EOF

echo "Full table lock, complex schema, random insertion"
$PGBENCH

# Schema with UUID but no versions
recreate_db
$PSQL <<EOF
CREATE EXTENSION IF NOT EXISTS "uuid-ossp";
CREATE TABLE events (
sequence_number serial PRIMARY KEY,
log_id uuid NOT NULL,
event jsonb NOT NULL
);
CREATE INDEX events_log_id ON events (log_id);
EOF

cat <<EOF > pgbench-script.sql
BEGIN;
LOCK events IN EXCLUSIVE MODE;
INSERT INTO events (log_id, event) VALUES ('00000000-0000-0000-0000-000000000000', '$SMALL_EVENT');
COMMIT;
EOF

echo "Full table lock, UUID but no version"
$PGBENCH

# Test insertion with trigger as sequence number
recreate_db
$PSQL <<EOF
CREATE TABLE events (
id serial PRIMARY KEY,
sequence_number bigint,
event jsonb NOT NULL,
UNIQUE (sequence_number)
);
CREATE SEQUENCE events_sequence_number_seq
START WITH 1
INCREMENT BY 1
NO MINVALUE
NO MAXVALUE
CACHE 1;
CREATE OR REPLACE FUNCTION update_events_sequence_number()
RETURNS trigger AS
\$BODY\$
BEGIN
PERFORM pg_advisory_lock(1);
UPDATE events
SET sequence_number = nextval('events_sequence_number_seq'::regclass)
WHERE id = NEW.id;
RETURN NEW;
END;
\$BODY\$
LANGUAGE 'plpgsql';
CREATE TRIGGER update_events_sequence_number
AFTER INSERT ON events
FOR EACH ROW
EXECUTE PROCEDURE update_events_sequence_number();
EOF

cat <<EOF > pgbench-script.sql
BEGIN;
INSERT INTO events (event) VALUES ('$SMALL_EVENT');
COMMIT;
EOF

echo "Trigger with advisory lock"
$PGBENCH

# TODO: Test with millions (billions?) of rows already inserted
# TODO: Insert multiple rows per transaction
# TODO: BRIN indexes
# TODO: Query and streaming throughput

# Notes of stuff I've discovered so far:

# * Setting synchronous_commit=off makes all the difference in the world. It
# increases insert throughput by like 10 times. During one test, the simple
# full table lock version went from 400 tps to 4000 tps on my laptop. Note that
# this means we will see data loss of up to 3 times wal_writer_delay, and if
# this is acceptable or not is domain specific. Also, in most systems like
# Kinesis and Kafka an ACK of the event means it is stored durably.

# * I'm not sure if in the trigger version the advisory lock on the sequence is
# necessary to achieve monotoinc reads. My gut says it is. With the advisory
# lock it is definitely slower than just a full table lock.

# * (Needs more testing) Event size doesn't have too much of an effect on write
# speed. That means if you have large events then the total bytes per second
# when using postgres could be fairly large. Also, if you batch event writes
# (like you probably would anyway with something like Kafka or Kinesis), then
# the total events per second and bytes per second is potentially event larger.
3 changes: 2 additions & 1 deletion eventful-sql-common/src/Eventful/Store/Sql/Operations.hs
Expand Up @@ -147,14 +147,15 @@ sqlStoreEvents
-> (DBName -> DBName -> DBName -> Text)
-> UUID
-> [serialized]
-> SqlPersistT m ()
-> SqlPersistT m EventVersion
sqlStoreEvents config@SqlEventStoreConfig{..} mLockCommand maxVersionSql uuid events = do
versionNum <- sqlMaxEventVersion config maxVersionSql uuid
let entities = zipWith (sqlEventStoreConfigSequenceMakeEntity uuid) [versionNum + 1..] events
-- NB: We need to take a lock on the events table or else the global sequence
-- numbers may not increase monotonically over time.
for_ mLockCommand $ \lockCommand -> rawExecute (lockCommand tableName) []
insertMany_ entities
return $ versionNum + (EventVersion $ length events)
where
(DBName tableName) = tableDBName (sqlEventStoreConfigSequenceMakeEntity nil 0 undefined)

Expand Down
14 changes: 7 additions & 7 deletions eventful-test-helpers/src/Eventful/TestHelpers.hs
Expand Up @@ -194,21 +194,21 @@ eventStoreSpec (EventStoreRunner withStore) = do
(,) <$>
storeEvents writer nil StreamExists [Added 1] <*>
storeEvents writer nil (ExactPosition 0) [Added 1]
err1 `shouldBe` Just (EventStreamNotAtExpectedVersion (-1))
err2 `shouldBe` Just (EventStreamNotAtExpectedVersion (-1))
err1 `shouldBe` Left (EventStreamNotAtExpectedVersion (-1))
err2 `shouldBe` Left (EventStreamNotAtExpectedVersion (-1))

it "should be able to store events starting with an empty stream" $ do
withStore (\writer _ -> storeEvents writer nil NoStream [Added 1]) `shouldReturn` Nothing
withStore (\writer _ -> storeEvents writer nil NoStream [Added 1]) `shouldReturn` Right 0

it "should reject storing events sometimes with a stream" $ do
(err1, err2, err3) <- withStore $ \writer _ ->
(,,) <$>
storeEvents writer nil NoStream [Added 1] <*>
storeEvents writer nil NoStream [Added 1] <*>
storeEvents writer nil (ExactPosition 1) [Added 1]
err1 `shouldBe` Nothing
err2 `shouldBe` Just (EventStreamNotAtExpectedVersion 0)
err3 `shouldBe` Just (EventStreamNotAtExpectedVersion 0)
err1 `shouldBe` Right 0
err2 `shouldBe` Left (EventStreamNotAtExpectedVersion 0)
err3 `shouldBe` Left (EventStreamNotAtExpectedVersion 0)

it "should accepts storing events sometimes with a stream" $ do
errors <- withStore $ \writer _ ->
Expand All @@ -218,7 +218,7 @@ eventStoreSpec (EventStoreRunner withStore) = do
, storeEvents writer nil (ExactPosition 1) [Added 1]
, storeEvents writer nil StreamExists [Added 1]
]
errors `shouldBe` [Nothing, Nothing, Nothing, Nothing]
errors `shouldBe` [Right 0, Right 1, Right 2, Right 3]

newtype GlobalStreamEventStoreRunner m =
GlobalStreamEventStoreRunner
Expand Down

0 comments on commit 4127e77

Please sign in to comment.