/
ApiServer.hs
169 lines (142 loc) · 5.3 KB
/
ApiServer.hs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE CPP #-}
{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TemplateHaskell #-}
-- |
-- Module : Pact.Server.ApiServer
-- Copyright : (C) 2016 Stuart Popejoy
-- License : BSD-style (see the file LICENSE)
-- Maintainer : Stuart Popejoy <stuart@kadena.io>
--
-- Servant server for Pact REST API.
--
module Pact.Server.ApiServer
( runApiServer
, ApiEnv(..), aiLog, aiHistoryChan
, sendHandler, pollHandler, listenHandler, localHandler, versionHandler
) where
import Prelude hiding (log)
import Control.Lens
import Control.Concurrent
import Control.Monad.Reader
import Control.Monad.Trans.Except
import Data.ByteString (ByteString)
import qualified Data.ByteString.Lazy.Char8 as BSL8
import qualified Data.Text as T
import Data.Text.Encoding
import Data.HashSet (HashSet)
import qualified Data.HashSet as HashSet
import qualified Data.HashMap.Strict as HM
import Servant
import Network.Wai.Handler.Warp (run)
import Network.Wai.Middleware.Cors
import Pact.Analyze.Remote.Server (verifyHandler)
import Pact.Server.API
import Pact.Types.Command
import Pact.Types.API
import Pact.Types.Server
import Pact.Types.Version
import Pact.Types.Hash
#if !MIN_VERSION_servant(0,16,0)
type ServerError = ServantErr
#endif
data ApiEnv = ApiEnv
{ _aiLog :: String -> IO ()
, _aiHistoryChan :: HistoryChannel
, _aiInboundPactChan :: InboundPactChan
}
makeLenses ''ApiEnv
type Api a = ReaderT ApiEnv (ExceptT ServerError IO) a
runApiServer :: HistoryChannel -> InboundPactChan -> (String -> IO ()) -> Int -> FilePath -> IO ()
runApiServer histChan inbChan logFn port _logDir = do
logFn $ "[api] starting on port " ++ show port
let conf' = ApiEnv logFn histChan inbChan
run port $ cors (const policy) $ serve pactServerAPI (servantServer conf')
where
policy = Just CorsResourcePolicy
{ corsOrigins = Nothing
, corsMethods = ["GET", "POST"]
, corsRequestHeaders = ["authorization", "content-type"]
, corsExposedHeaders = Nothing
, corsMaxAge = Just $ 60*60*24 -- one day
, corsVaryOrigin = False
, corsRequireOrigin = False
, corsIgnoreFailures = False
}
servantServer :: ApiEnv -> Server PactServerAPI
servantServer conf = apiV1Server conf :<|> verifyHandler :<|> versionHandler
apiV1Server :: ApiEnv -> Server ApiV1API
apiV1Server conf = hoistServer apiV1API nt
(sendHandler :<|> pollHandler :<|> listenHandler :<|> localHandler)
where
nt :: forall a. Api a -> Handler a
nt s = Handler $ runReaderT s conf
sendHandler :: SubmitBatch -> Api RequestKeys
sendHandler (SubmitBatch cmds) = do
when (null cmds) $ die' "Empty Batch"
crs <- forM cmds $ \c -> do
cr@(_,Command{..}) <- buildCmdRpc c
return cr
rks <- mapM queueCmds $ group 8000 crs
pure $ RequestKeys $ concat rks
pollHandler :: Poll -> Api PollResponses
pollHandler (Poll rks) = do
log $ "Polling for " ++ show rks
PossiblyIncompleteResults{..} <- checkHistoryForResult (HashSet.fromList rks)
when (HM.null possiblyIncompleteResults) $ log $ "No results found for poll!" ++ show rks
pure $ pollResultToReponse possiblyIncompleteResults
listenHandler :: ListenerRequest -> Api ListenResponse
listenHandler (ListenerRequest rk) = do
hChan <- view aiHistoryChan
m <- liftIO newEmptyMVar
liftIO $ writeHistory hChan $ RegisterListener (HM.fromList [(rk,m)])
log $ "Registered Listener for: " ++ show rk
res <- liftIO $ readMVar m
case res of
GCed msg -> do
log $ "Listener GCed for: " ++ show rk ++ " because " ++ msg
die' msg
ListenerResult cr -> do
log $ "Listener Serviced for: " ++ show rk
pure (ListenResponse cr)
localHandler :: Command T.Text -> Api (CommandResult Hash)
localHandler commandText = do
let (cmd :: Command ByteString) = fmap encodeUtf8 commandText
mv <- liftIO newEmptyMVar
c <- view aiInboundPactChan
liftIO $ writeInbound c (LocalCmd cmd mv)
r <- liftIO $ takeMVar mv
pure r
versionHandler :: Handler T.Text
versionHandler = pure pactVersion
checkHistoryForResult :: (MonadReader ApiEnv m, MonadIO m) => HashSet RequestKey -> m PossiblyIncompleteResults
checkHistoryForResult rks = do
hChan <- view aiHistoryChan
m <- liftIO newEmptyMVar
liftIO $ writeHistory hChan $ QueryForResults (rks,m)
liftIO $ readMVar m
pollResultToReponse :: HM.HashMap RequestKey (CommandResult Hash) -> PollResponses
pollResultToReponse m = PollResponses m
log :: (MonadReader ApiEnv m, MonadIO m) => String -> m ()
log s = view aiLog >>= \f -> liftIO (f $ "[api]: " ++ s)
die' :: String -> Api t
die' str = throwError err404 { errBody = BSL8.pack str }
buildCmdRpc :: (MonadReader ApiEnv m, MonadIO m) => Command T.Text -> m (RequestKey,Command ByteString)
buildCmdRpc c@Command {..} = do
log $ "Processing command with hash: " ++ show _cmdHash
return (cmdToRequestKey c,fmap encodeUtf8 c)
group :: Int -> [a] -> [[a]]
group _ [] = []
group n l
| n > 0 = take n l : group n (drop n l)
| otherwise = error "Negative n"
queueCmds :: (MonadReader ApiEnv m, MonadIO m) => [(RequestKey,Command ByteString)] -> m [RequestKey]
queueCmds rpcs = do
hc <- view aiHistoryChan
liftIO $ writeHistory hc $ AddNew (map snd rpcs)
return $ fst <$> rpcs