This repository has been archived by the owner on Nov 29, 2021. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 22
/
Operations.hs
177 lines (162 loc) · 7.21 KB
/
Operations.hs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeFamilies #-}
module Eventful.Store.Sql.Operations
( SqlEventStoreConfig (..)
, sqlEventStoreReader
, sqlGlobalEventStoreReader
, sqlGetProjectionIds
, sqlGetStreamEvents
, sqlMaxEventVersion
, sqlStoreEvents
, unsafeSqlStoreGlobalStreamEvents
) where
import Control.Monad.Reader
import Data.Foldable (for_)
import Data.Maybe (listToMaybe, maybe)
import Data.Monoid ((<>))
import Data.Text (Text)
import Database.Persist
import Database.Persist.Sql
import Eventful.Store.Class
import Eventful.UUID
import Eventful.Store.Sql.Orphans as X ()
-- | Describes how the SQL operations in this module map between a persistent
-- @entity@ (one row per event) and the event-store vocabulary: stream 'UUID',
-- 'EventVersion', global 'SequenceNumber', and the @serialized@ event payload.
data SqlEventStoreConfig entity serialized
  = SqlEventStoreConfig
  { sqlEventStoreConfigSequenceMakeEntity :: UUID -> EventVersion -> serialized -> entity
    -- ^ Build a row from a stream UUID, an event version, and the serialized event.
  , sqlEventStoreConfigMakeKey :: SequenceNumber -> Key entity
    -- ^ Key manipulation: turn a global sequence number into the row's key.
  , sqlEventStoreConfigUnKey :: Key entity -> SequenceNumber
    -- ^ Key manipulation: recover the global sequence number from a row's key.
  , sqlEventStoreConfigUUID :: entity -> UUID
    -- ^ Record function: the stream UUID stored in a row.
  , sqlEventStoreConfigVersion :: entity -> EventVersion
    -- ^ Record function: the event version stored in a row.
  , sqlEventStoreConfigData :: entity -> serialized
    -- ^ Record function: the serialized event stored in a row.
  , sqlEventStoreConfigSequenceNumberField :: EntityField entity (Key entity)
    -- ^ 'EntityField' for the key (sequence number) column.
  , sqlEventStoreConfigUUIDField :: EntityField entity UUID
    -- ^ 'EntityField' for the stream UUID column.
  , sqlEventStoreConfigVersionField :: EntityField entity EventVersion
    -- ^ 'EntityField' for the event version column.
  , sqlEventStoreConfigDataField :: EntityField entity serialized
    -- ^ 'EntityField' for the serialized event column.
  }
-- | Wrap 'sqlGetStreamEvents' for the given config in an 'EventStoreReader',
-- giving a reader over a single stream's versioned events.
sqlEventStoreReader
  :: (MonadIO m, PersistEntity entity, PersistEntityBackend entity ~ SqlBackend)
  => SqlEventStoreConfig entity serialized
  -> VersionedEventStoreReader (SqlPersistT m) serialized
sqlEventStoreReader = EventStoreReader . sqlGetStreamEvents
-- | Wrap 'sqlGetAllEventsInRange' for the given config in an
-- 'EventStoreReader', giving a reader over the global event stream.
sqlGlobalEventStoreReader
  :: (MonadIO m, PersistEntity entity, PersistEntityBackend entity ~ SqlBackend)
  => SqlEventStoreConfig entity serialized
  -> GlobalEventStoreReader (SqlPersistT m) serialized
sqlGlobalEventStoreReader = EventStoreReader . sqlGetAllEventsInRange
-- | Convert a stored row (key plus value) into a 'GlobalStreamEvent': the row
-- key becomes the global sequence number and the row value becomes the
-- nested versioned event.
sqlEventToGlobalStream
  :: SqlEventStoreConfig entity serialized
  -> Entity entity
  -> GlobalStreamEvent serialized
sqlEventToGlobalStream config (Entity rowKey row) =
  StreamEvent () sequenceNumber versionedEvent
  where
    sequenceNumber = sqlEventStoreConfigUnKey config rowKey
    versionedEvent = sqlEventToVersioned config row
-- | Convert a stored row value into a 'VersionedStreamEvent' by reading its
-- UUID, version, and serialized payload through the config's record functions.
sqlEventToVersioned
  :: SqlEventStoreConfig entity serialized
  -> entity
  -> VersionedStreamEvent serialized
sqlEventToVersioned config row =
  StreamEvent streamUuid version payload
  where
    streamUuid = sqlEventStoreConfigUUID config row
    version = sqlEventStoreConfigVersion config row
    payload = sqlEventStoreConfigData config row
-- | Return the distinct stream UUIDs present in the events table.
--
-- Runs @SELECT DISTINCT \<uuid column\> FROM \<events table\>@ via 'rawSql' and
-- unwraps each 'Single' result.
sqlGetProjectionIds
  :: (MonadIO m, PersistEntity entity, PersistEntityBackend entity ~ SqlBackend)
  => SqlEventStoreConfig entity serialized
  -> SqlPersistT m [UUID]
sqlGetProjectionIds SqlEventStoreConfig{..} =
  fmap unSingle <$> rawSql ("SELECT DISTINCT " <> uuidFieldName <> " FROM " <> tableName) []
  where
    -- A throwaway entity is built only so 'tableDBName' can resolve the table
    -- name; the @undefined@ payload is presumably never forced (TODO confirm).
    (DBName tableName) = tableDBName (sqlEventStoreConfigSequenceMakeEntity nil 0 undefined)
    -- Must be the UUID column: the rows are decoded as 'UUID', so selecting
    -- the sequence-number column here would yield the wrong values.
    (DBName uuidFieldName) = fieldDBName sqlEventStoreConfigUUIDField
-- | Load the events of one stream (selected by the range's UUID key),
-- restricted by the range's start and limit, ordered ascending by the
-- sequence-number field.
--
-- * 'StartQueryAt' adds a lower bound on the version field.
-- * 'StopQueryAt' adds an upper bound on the version field.
-- * 'MaxNumberOfEvents' adds a 'LimitTo' select option.
sqlGetStreamEvents
  :: (MonadIO m, PersistEntity entity, PersistEntityBackend entity ~ SqlBackend)
  => SqlEventStoreConfig entity serialized
  -> QueryRange UUID EventVersion
  -> SqlPersistT m [VersionedStreamEvent serialized]
sqlGetStreamEvents config QueryRange{..} =
  fmap (sqlEventToVersioned config . entityVal) <$> selectList filters selectOpts
  where
    versionField = sqlEventStoreConfigVersionField config

    streamFilter = sqlEventStoreConfigUUIDField config ==. queryRangeKey

    lowerBound =
      case queryRangeStart of
        StartFromBeginning -> []
        StartQueryAt start -> [versionField >=. start]

    upperBound =
      case queryRangeLimit of
        NoQueryLimit -> []
        MaxNumberOfEvents _ -> []
        StopQueryAt stop -> [versionField <=. stop]

    limitOpts =
      case queryRangeLimit of
        NoQueryLimit -> []
        MaxNumberOfEvents maxNum -> [LimitTo maxNum]
        StopQueryAt _ -> []

    filters = concat [[streamFilter], lowerBound, upperBound]
    selectOpts = Asc (sqlEventStoreConfigSequenceNumberField config) : limitOpts
-- | Load events from the global stream, restricted by the range's start and
-- limit, ordered ascending by the sequence-number field.
--
-- * 'StartQueryAt' adds a lower bound on the sequence-number field.
-- * 'StopQueryAt' adds an upper bound on the sequence-number field.
-- * 'MaxNumberOfEvents' adds a 'LimitTo' select option.
sqlGetAllEventsInRange
  :: (MonadIO m, PersistEntity entity, PersistEntityBackend entity ~ SqlBackend)
  => SqlEventStoreConfig entity serialized
  -> QueryRange () SequenceNumber
  -> SqlPersistT m [GlobalStreamEvent serialized]
sqlGetAllEventsInRange config QueryRange{..} =
  fmap (sqlEventToGlobalStream config) <$> selectList filters selectOpts
  where
    sequenceField = sqlEventStoreConfigSequenceNumberField config
    toKey = sqlEventStoreConfigMakeKey config

    lowerBound =
      case queryRangeStart of
        StartFromBeginning -> []
        StartQueryAt start -> [sequenceField >=. toKey start]

    upperBound =
      case queryRangeLimit of
        NoQueryLimit -> []
        MaxNumberOfEvents _ -> []
        StopQueryAt stop -> [sequenceField <=. toKey stop]

    limitOpts =
      case queryRangeLimit of
        NoQueryLimit -> []
        MaxNumberOfEvents maxNum -> [LimitTo maxNum]
        StopQueryAt _ -> []

    filters = lowerBound ++ upperBound
    selectOpts = Asc sequenceField : limitOpts
-- | Run the caller-supplied max-version query for one stream.
--
-- The second argument builds the SQL text from the table name, UUID column
-- name, and version column name (in that order); the stream UUID is passed as
-- the query's single parameter. The first returned row is used, and the
-- result is 0 when the query yields no rows.
sqlMaxEventVersion
  :: (MonadIO m, PersistEntity entity, PersistEntityBackend entity ~ SqlBackend)
  => SqlEventStoreConfig entity serialized
  -> (DBName -> DBName -> DBName -> Text)
  -> UUID
  -> SqlPersistT m EventVersion
sqlMaxEventVersion config maxVersionSql uuid =
  firstVersionOrZero <$> rawSql query [toPersistValue uuid]
  where
    firstVersionOrZero = maybe 0 unSingle . listToMaybe
    query = maxVersionSql table uuidColumn versionColumn
    -- A throwaway entity is built only so 'tableDBName' can resolve the table name.
    table = tableDBName (sqlEventStoreConfigSequenceMakeEntity config nil 0 undefined)
    uuidColumn = fieldDBName (sqlEventStoreConfigUUIDField config)
    versionColumn = fieldDBName (sqlEventStoreConfigVersionField config)
-- | Append serialized events to the stream identified by the given UUID.
--
-- The stream's current max version is read with 'sqlMaxEventVersion', the new
-- events are numbered consecutively from the next version, the optional lock
-- command (built from the table name) is executed, and the rows are inserted.
-- Returns the stream's version after the insert.
sqlStoreEvents
  :: (MonadIO m, PersistEntity entity, PersistEntityBackend entity ~ SqlBackend)
  => SqlEventStoreConfig entity serialized
  -> Maybe (Text -> Text)
  -> (DBName -> DBName -> DBName -> Text)
  -> UUID
  -> [serialized]
  -> SqlPersistT m EventVersion
sqlStoreEvents config mLockCommand maxVersionSql uuid events = do
  currentVersion <- sqlMaxEventVersion config maxVersionSql uuid
  let
    newVersions = [currentVersion + 1 ..]
    newEntities = zipWith (makeEntity uuid) newVersions events
  -- NB: The events table must be locked before inserting, otherwise the
  -- global sequence numbers may not increase monotonically over time.
  for_ mLockCommand $ \mkLockSql ->
    rawExecute (mkLockSql tableName) []
  insertMany_ newEntities
  return (currentVersion + EventVersion (length events))
  where
    makeEntity = sqlEventStoreConfigSequenceMakeEntity config
    -- A throwaway entity is built only so 'tableDBName' can resolve the table name.
    DBName tableName = tableDBName (makeEntity nil 0 undefined)
-- | Insert already-sequenced 'GlobalStreamEvent's directly into a SQL event
-- store. Handy when moving events between event stores, or when the events
-- were generated outside of the current SQL event store.
--
-- Each row's key is built from the event's own sequence number and its
-- UUID/version are taken as given; no max-version lookup or table lock is
-- performed here.
unsafeSqlStoreGlobalStreamEvents
  :: (MonadIO m, PersistEntity entity, PersistEntityBackend entity ~ SqlBackend)
  => SqlEventStoreConfig entity serialized
  -> [GlobalStreamEvent serialized]
  -> SqlPersistT m ()
unsafeSqlStoreGlobalStreamEvents config events =
  insertEntityMany (map toRow events)
  where
    toRow (StreamEvent () sequenceNumber (StreamEvent streamUuid version payload)) =
      let rowKey = sqlEventStoreConfigMakeKey config sequenceNumber
          rowVal = sqlEventStoreConfigSequenceMakeEntity config streamUuid version payload
      in Entity rowKey rowVal