-
Notifications
You must be signed in to change notification settings - Fork 86
/
Server.hs
156 lines (140 loc) · 4.96 KB
/
Server.hs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
{-# LANGUAGE UndecidableInstances #-}
module Hydra.API.Server where
import Hydra.Prelude hiding (TVar, readTVar, seq)
import Cardano.Ledger.Core (PParams)
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)
import Control.Concurrent.STM.TChan (newBroadcastTChanIO, writeTChan)
import Control.Concurrent.STM.TVar (modifyTVar', newTVarIO)
import Control.Exception (IOException)
import Hydra.API.APIServerLog (APIServerLog (..))
import Hydra.API.ClientInput (ClientInput)
import Hydra.API.HTTPServer (httpApp)
import Hydra.API.Projection (Projection (..), mkProjection)
import Hydra.API.ServerOutput (
HeadStatus (Idle),
ServerOutput,
TimedServerOutput (..),
projectHeadStatus,
projectInitializingHeadId,
projectSnapshotUtxo,
)
import Hydra.API.WSServer (nextSequenceNumber, wsApp)
import Hydra.Cardano.Api (LedgerEra)
import Hydra.Chain (
Chain (..),
IsChainState,
)
import Hydra.Chain.Direct.State ()
import Hydra.Logging (Tracer, traceWith)
import Hydra.Network (IP, PortNumber)
import Hydra.Party (Party)
import Hydra.Persistence (PersistenceIncremental (..))
import Network.HTTP.Types (status500)
import Network.Wai (responseLBS)
import Network.Wai.Handler.Warp (
defaultSettings,
runSettings,
setBeforeMainLoop,
setHost,
setOnException,
setOnExceptionResponse,
setPort,
)
import Network.Wai.Handler.WebSockets (websocketsOr)
import Network.WebSockets (
defaultConnectionOptions,
)
-- | Handle providing a means for sending server outputs to clients.
--
-- It is parameterised by the transaction type @tx@ carried in the
-- 'ServerOutput' and by the monad @m@ in which sending takes place.
newtype Server tx m = Server
  { sendOutput :: ServerOutput tx -> m ()
  -- ^ Send some output to all connected clients.
  }
-- | Callback for receiving client inputs: an action in @m@ that is given a
-- 'ClientInput' for transactions of type @tx@.
type ServerCallback tx m = ClientInput tx -> m ()
-- | A type tying both receiving input and sending output into a /Component/.
--
-- The first argument is the 'ServerCallback' used for receiving client inputs;
-- the second is a continuation that is given the 'Server' handle for sending
-- outputs and whose result @a@ is the result of the whole component.
type ServerComponent tx m a = ServerCallback tx m -> (Server tx m -> m a) -> m a
-- | Run the API server as a 'ServerComponent' in 'IO'.
--
-- Previously persisted 'TimedServerOutput's are loaded and used to seed the
-- head status, snapshot UTxO and head id projections as well as the in-memory
-- history. A warp server is then run on the given host and port, combining
-- 'wsApp' and 'httpApp' through 'websocketsOr', and raced (via 'race_') against
-- the given @action@. The @action@ is only started once the server signalled
-- that it is about to enter its main loop.
--
-- The 'Server' handle passed to @action@ timestamps and numbers every output,
-- records it in the history and the persistence, updates all projections and
-- finally writes it to the broadcast channel handed to 'wsApp'.
--
-- Any 'IOException' escaping this component is re-thrown as a
-- 'RunServerException' carrying the host and port.
withAPIServer ::
  forall tx.
  IsChainState tx =>
  IP ->
  PortNumber ->
  Party ->
  PersistenceIncremental (TimedServerOutput tx) IO ->
  Tracer IO APIServerLog ->
  Chain tx IO ->
  PParams LedgerEra ->
  ServerComponent tx IO ()
withAPIServer host port party PersistenceIncremental{loadAll, append} tracer chain pparams callback action =
  handle onIOException $ do
    responseChannel <- newBroadcastTChanIO
    storedEvents <- loadAll
    -- Initialize our read models from the stored events
    let storedOutputs = output <$> storedEvents
    headStatusP <- mkProjection Idle storedOutputs projectHeadStatus
    snapshotUtxoP <- mkProjection Nothing storedOutputs projectSnapshotUtxo
    headIdP <- mkProjection Nothing storedOutputs projectInitializingHeadId
    -- NOTE: the in-memory history is a reversed list (new entries are consed
    -- onto the front) whereas events are stored in order on disk, hence the
    -- 'reverse'.
    history <- newTVarIO (reverse storedEvents)
    (notifyServerRunning, waitForServerRunning) <- setupServerNotification
    let warpSettings =
          defaultSettings
            & setHost (fromString $ show host)
            & setPort (fromIntegral port)
            & setOnException (\_ e -> traceWith tracer $ APIConnectionError{reason = show e})
            & setOnExceptionResponse (responseLBS status500 [] . show)
            & setBeforeMainLoop notifyServerRunning
        application =
          websocketsOr
            defaultConnectionOptions
            (wsApp party tracer history callback headStatusP snapshotUtxoP responseChannel)
            (httpApp tracer chain pparams (getLatest headIdP))
        runServer = do
          traceWith tracer (APIServerStarted port)
          runSettings warpSettings application
        server =
          Server
            { sendOutput = \serverOutput -> do
                timedOutput <- appendToHistory history serverOutput
                atomically $ do
                  update headStatusP serverOutput
                  update snapshotUtxoP serverOutput
                  update headIdP serverOutput
                  writeTChan responseChannel timedOutput
            }
    race_ runServer (waitForServerRunning >> action server)
 where
  -- Stamp an output with the current time and the next sequence number, put
  -- it at the front of the in-memory history, persist it and return it.
  appendToHistory history serverOutput = do
    now <- getCurrentTime
    stamped <- atomically $ do
      seqNo <- nextSequenceNumber history
      let entry = TimedServerOutput{output = serverOutput, time = now, seq = seqNo}
      modifyTVar' history (entry :)
      pure entry
    append stamped
    pure stamped

  -- Re-throw an 'IOException' with host and port added as context.
  onIOException ioException =
    throwIO RunServerException{ioException, host, port}
-- | An 'IOException' enriched with the 'IP' and 'PortNumber' of the server as
-- context. 'withAPIServer' throws this in place of the original 'IOException'.
data RunServerException = RunServerException
  { ioException :: IOException
  -- ^ The underlying exception that was caught.
  , host :: IP
  -- ^ Host the API server was configured with.
  , port :: PortNumber
  -- ^ Port the API server was configured with.
  }
  deriving stock (Show)

instance Exception RunServerException
-- | Action used to signal that the server is running; see
-- 'setupServerNotification'.
type NotifyServerRunning = IO ()

-- | Action used to wait for the server to be running; see
-- 'setupServerNotification'.
type WaitForServer = IO ()
-- | Create a notifier / waiter pair sharing one initially empty 'MVar', used to
-- ensure that something only runs after the server is actually listening.
--
-- The first action puts @()@ into the 'MVar'; the second one takes it out
-- again, blocking until a value is available.
setupServerNotification :: IO (NotifyServerRunning, WaitForServer)
setupServerNotification =
  mkActions <$> newEmptyMVar
 where
  mkActions signal = (putMVar signal (), takeMVar signal)