diff --git a/src/Simplex/Messaging/Notifications/Server.hs b/src/Simplex/Messaging/Notifications/Server.hs index ae3fd1772f..f92c97a506 100644 --- a/src/Simplex/Messaging/Notifications/Server.hs +++ b/src/Simplex/Messaging/Notifications/Server.hs @@ -15,9 +15,7 @@ module Simplex.Messaging.Notifications.Server where import Control.Concurrent.STM (stateTVar) import Control.Logger.Simple import Control.Monad.Except -import Control.Monad.IO.Unlift (MonadUnliftIO) import Control.Monad.Reader -import Crypto.Random (MonadRandom) import Data.ByteString.Char8 (ByteString) import qualified Data.ByteString.Char8 as B import Data.Functor (($>)) @@ -57,15 +55,17 @@ import UnliftIO.Directory (doesFileExist, renameFile) import UnliftIO.Exception import UnliftIO.STM -runNtfServer :: (MonadRandom m, MonadUnliftIO m) => NtfServerConfig -> m () +runNtfServer :: NtfServerConfig -> IO () runNtfServer cfg = do started <- newEmptyTMVarIO runNtfServerBlocking started cfg -runNtfServerBlocking :: (MonadRandom m, MonadUnliftIO m) => TMVar Bool -> NtfServerConfig -> m () +runNtfServerBlocking :: TMVar Bool -> NtfServerConfig -> IO () runNtfServerBlocking started cfg = runReaderT (ntfServer cfg started) =<< newNtfServerEnv cfg -ntfServer :: forall m. (MonadUnliftIO m, MonadReader NtfEnv m) => NtfServerConfig -> TMVar Bool -> m () +type M a = ReaderT NtfEnv IO a + +ntfServer :: NtfServerConfig -> TMVar Bool -> M () ntfServer cfg@NtfServerConfig {transports} started = do restoreServerStats s <- asks subscriber @@ -74,30 +74,30 @@ ntfServer cfg@NtfServerConfig {transports} started = do void . forkIO $ resubscribe s subs raceAny_ (ntfSubscriber s : ntfPush ps : map runServer transports <> serverStatsThread_ cfg) `finally` stopServer where - runServer :: (ServiceName, ATransport) -> m () + runServer :: (ServiceName, ATransport) -> M () runServer (tcpPort, ATransport t) = do serverParams <- asks tlsServerParams runTransportServer started tcpPort serverParams (runClient t) - runClient :: Transport c => TProxy c -> c -> m () + runClient :: Transport c => TProxy c -> c -> M () runClient _ h = do kh <- asks serverIdentity liftIO (runExceptT $ ntfServerHandshake h kh supportedNTFServerVRange) >>= \case Right th -> runNtfClientTransport th Left _ -> pure () - stopServer :: m () + stopServer :: M () stopServer = do withNtfLog closeStoreLog saveServerStats asks (smpSubscribers . subscriber) >>= readTVarIO >>= mapM_ (\SMPSubscriber {subThreadId} -> readTVarIO subThreadId >>= mapM_ (liftIO . deRefWeak >=> mapM_ killThread)) - serverStatsThread_ :: NtfServerConfig -> [m ()] + serverStatsThread_ :: NtfServerConfig -> [M ()] serverStatsThread_ NtfServerConfig {logStatsInterval = Just interval, logStatsStartTime, serverStatsLogFile} = [logServerStats logStatsStartTime interval serverStatsLogFile] serverStatsThread_ _ = [] - logServerStats :: Int -> Int -> FilePath -> m () + logServerStats :: Int -> Int -> FilePath -> M () logServerStats startAt logInterval statsFilePath = do initialDelay <- (startAt -) . fromIntegral . (`div` 1000000_000000) . diffTimeToPicoseconds . utctDayTime <$> liftIO getCurrentTime liftIO $ putStrLn $ "server stats log enabled: " <> statsFilePath @@ -138,7 +138,7 @@ ntfServer cfg@NtfServerConfig {transports} started = do ] threadDelay interval -resubscribe :: (MonadUnliftIO m, MonadReader NtfEnv m) => NtfSubscriber -> Map NtfSubscriptionId NtfSubData -> m () +resubscribe :: NtfSubscriber -> Map NtfSubscriptionId NtfSubData -> M () resubscribe NtfSubscriber {newSubQ} subs = do d <- asks $ resubscribeDelay . config forM_ subs $ \sub@NtfSubData {} -> @@ -147,11 +147,11 @@ resubscribe NtfSubscriber {newSubQ} subs = do threadDelay d liftIO $ logInfo "SMP connections resubscribed" -ntfSubscriber :: forall m. (MonadUnliftIO m, MonadReader NtfEnv m) => NtfSubscriber -> m () +ntfSubscriber :: NtfSubscriber -> M () ntfSubscriber NtfSubscriber {smpSubscribers, newSubQ, smpAgent = ca@SMPClientAgent {msgQ, agentQ}} = do raceAny_ [subscribe, receiveSMP, receiveAgent] where - subscribe :: m () + subscribe :: M () subscribe = forever $ atomically (readTBQueue newSubQ) >>= \case @@ -159,7 +159,7 @@ ntfSubscriber NtfSubscriber {smpSubscribers, newSubQ, smpAgent = ca@SMPClientAge SMPSubscriber {newSubQ = subscriberSubQ} <- getSMPSubscriber smpServer atomically $ writeTQueue subscriberSubQ sub - getSMPSubscriber :: SMPServer -> m SMPSubscriber + getSMPSubscriber :: SMPServer -> M SMPSubscriber getSMPSubscriber smpServer = atomically (TM.lookup smpServer smpSubscribers) >>= maybe createSMPSubscriber pure where @@ -170,7 +170,7 @@ ntfSubscriber NtfSubscriber {smpSubscribers, newSubQ, smpAgent = ca@SMPClientAge atomically . writeTVar subThreadId $ Just tId pure sub - runSMPSubscriber :: SMPSubscriber -> m () + runSMPSubscriber :: SMPSubscriber -> M () runSMPSubscriber SMPSubscriber {newSubQ = subscriberSubQ} = forever $ atomically (peekTQueue subscriberSubQ) @@ -188,7 +188,7 @@ ntfSubscriber NtfSubscriber {smpSubscribers, newSubQ, smpAgent = ca@SMPClientAge PCENetworkError -> pure () _ -> void . atomically $ readTQueue subscriberSubQ - receiveSMP :: m () + receiveSMP :: M () receiveSMP = forever $ do (srv, _, _, ntfId, msg) <- atomically $ readTBQueue msgQ let smpQueue = SMPQueueNtf srv ntfId @@ -227,7 +227,7 @@ ntfSubscriber NtfSubscriber {smpSubscribers, newSubQ, smpAgent = ca@SMPClientAge where showServer' = decodeLatin1 . strEncode . host - handleSubError :: SMPQueueNtf -> ProtocolClientError -> m () + handleSubError :: SMPQueueNtf -> ProtocolClientError -> M () handleSubError smpQueue = \case PCEProtocolError AUTH -> updateSubStatus smpQueue NSAuth PCEProtocolError e -> updateErr "SMP error " e @@ -240,7 +240,7 @@ ntfSubscriber NtfSubscriber {smpSubscribers, newSubQ, smpAgent = ca@SMPClientAge PCEResponseTimeout -> pure () PCENetworkError -> pure () where - updateErr :: Show e => ByteString -> e -> m () + updateErr :: Show e => ByteString -> e -> M () updateErr errType e = updateSubStatus smpQueue . NSErr $ errType <> bshow e updateSubStatus smpQueue status = do @@ -252,7 +252,7 @@ ntfSubscriber NtfSubscriber {smpSubscribers, newSubQ, smpAgent = ca@SMPClientAge withNtfLog $ \sl -> logSubscriptionStatus sl ntfSubId status ) -ntfPush :: forall m. (MonadUnliftIO m, MonadReader NtfEnv m) => NtfPushServer -> m () +ntfPush :: NtfPushServer -> M () ntfPush s@NtfPushServer {pushQ} = forever $ do (tkn@NtfTknData {ntfTknId, token = DeviceToken pp _, tknStatus}, ntf) <- atomically (readTBQueue pushQ) liftIO $ logDebug $ "sending push notification to " <> T.pack (show pp) @@ -275,7 +275,7 @@ ntfPush s@NtfPushServer {pushQ} = forever $ do _ -> liftIO $ logError "bad notification token status" where - deliverNotification :: PushProvider -> NtfTknData -> PushNotification -> m (Either PushProviderError ()) + deliverNotification :: PushProvider -> NtfTknData -> PushNotification -> M (Either PushProviderError ()) deliverNotification pp tkn@NtfTknData {ntfTknId, tknStatus} ntf = do deliver <- liftIO $ getPushClient s pp liftIO (runExceptT $ deliver tkn ntf) >>= \case @@ -289,17 +289,17 @@ ntfPush s@NtfPushServer {pushQ} = forever $ do PPTokenInvalid -> updateTknStatus NTInvalid >> err e PPPermanentError -> err e where - retryDeliver :: m (Either PushProviderError ()) + retryDeliver :: M (Either PushProviderError ()) retryDeliver = do deliver <- liftIO $ newPushClient s pp liftIO (runExceptT $ deliver tkn ntf) >>= either err (pure . Right) - updateTknStatus :: NtfTknStatus -> m () + updateTknStatus :: NtfTknStatus -> M () updateTknStatus status = do atomically $ writeTVar tknStatus status withNtfLog $ \sl -> logTokenStatus sl ntfTknId status err e = logError (T.pack $ "Push provider error (" <> show pp <> "): " <> show e) $> Left e -runNtfClientTransport :: (Transport c, MonadUnliftIO m, MonadReader NtfEnv m) => THandle c -> m () +runNtfClientTransport :: Transport c => THandle c -> M () runNtfClientTransport th@THandle {sessionId} = do qSize <- asks $ clientQSize . config ts <- liftIO getSystemTime @@ -307,15 +307,15 @@ runNtfClientTransport th@THandle {sessionId} = do s <- asks subscriber ps <- asks pushServer expCfg <- asks $ inactiveClientExpiration . config - raceAny_ ([send th c, client c s ps, receive th c] <> disconnectThread_ c expCfg) - `finally` clientDisconnected c + raceAny_ ([liftIO $ send th c, client c s ps, receive th c] <> disconnectThread_ c expCfg) + `finally` liftIO (clientDisconnected c) where - disconnectThread_ c expCfg = maybe [] ((: []) . disconnectTransport th c activeAt) expCfg + disconnectThread_ c expCfg = maybe [] ((: []) . liftIO . disconnectTransport th c activeAt) expCfg -clientDisconnected :: MonadUnliftIO m => NtfServerClient -> m () +clientDisconnected :: NtfServerClient -> IO () clientDisconnected NtfServerClient {connected} = atomically $ writeTVar connected False -receive :: (Transport c, MonadUnliftIO m, MonadReader NtfEnv m) => THandle c -> NtfServerClient -> m () +receive :: Transport c => THandle c -> NtfServerClient -> M () receive th NtfServerClient {rcvQ, sndQ, activeAt} = forever $ do ts <- tGet th forM_ ts $ \t@(_, _, (corrId, entId, cmdOrError)) -> do @@ -330,7 +330,7 @@ receive th NtfServerClient {rcvQ, sndQ, activeAt} = forever $ do where write q t = atomically $ writeTBQueue q t -send :: (Transport c, MonadUnliftIO m) => THandle c -> NtfServerClient -> m () +send :: Transport c => THandle c -> NtfServerClient -> IO () send h@THandle {thVersion = v} NtfServerClient {sndQ, sessionId, activeAt} = forever $ do t <- atomically $ readTBQueue sndQ void . liftIO $ tPut h [(Nothing, encodeTransmission v sessionId t)] @@ -341,8 +341,7 @@ send h@THandle {thVersion = v} NtfServerClient {sndQ, sessionId, activeAt} = for data VerificationResult = VRVerified NtfRequest | VRFailed -verifyNtfTransmission :: - forall m. (MonadUnliftIO m, MonadReader NtfEnv m) => SignedTransmission NtfCmd -> NtfCmd -> m VerificationResult +verifyNtfTransmission :: SignedTransmission NtfCmd -> NtfCmd -> M VerificationResult verifyNtfTransmission (sig_, signed, (corrId, entId, _)) cmd = do st <- asks store case cmd of @@ -384,7 +383,7 @@ verifyNtfTransmission (sig_, signed, (corrId, entId, _)) cmd = do where verifiedTknCmd t c = VRVerified (NtfReqCmd SToken (NtfTkn t) (corrId, entId, c)) verifiedSubCmd s c = VRVerified (NtfReqCmd SSubscription (NtfSub s) (corrId, entId, c)) - verifyToken :: Maybe NtfTknData -> (NtfTknData -> VerificationResult) -> m VerificationResult + verifyToken :: Maybe NtfTknData -> (NtfTknData -> VerificationResult) -> M VerificationResult verifyToken t_ positiveVerificationResult = pure $ case t_ of Just t@NtfTknData {tknVerifyKey} -> @@ -392,17 +391,17 @@ verifyNtfTransmission (sig_, signed, (corrId, entId, _)) cmd = do then positiveVerificationResult t else VRFailed _ -> maybe False (dummyVerifyCmd signed) sig_ `seq` VRFailed - verifyToken' :: Maybe NtfTknData -> VerificationResult -> m VerificationResult + verifyToken' :: Maybe NtfTknData -> VerificationResult -> M VerificationResult verifyToken' t_ = verifyToken t_ . const -client :: forall m. (MonadUnliftIO m, MonadReader NtfEnv m) => NtfServerClient -> NtfSubscriber -> NtfPushServer -> m () +client :: NtfServerClient -> NtfSubscriber -> NtfPushServer -> M () client NtfServerClient {rcvQ, sndQ} NtfSubscriber {newSubQ, smpAgent = ca} NtfPushServer {pushQ, intervalNotifiers} = forever $ atomically (readTBQueue rcvQ) >>= processCommand >>= atomically . writeTBQueue sndQ where - processCommand :: NtfRequest -> m (Transmission NtfResponse) + processCommand :: NtfRequest -> M (Transmission NtfResponse) processCommand = \case NtfReqNew corrId (ANE SToken newTkn@(NewNtfTkn _ _ dhPubKey)) -> do logDebug "TNEW - new token" @@ -531,28 +530,28 @@ client NtfServerClient {rcvQ, sndQ} NtfSubscriber {newSubQ, smpAgent = ca} NtfPu incNtfStat subDeleted pure NROk PING -> pure NRPong - getId :: m NtfEntityId + getId :: M NtfEntityId getId = getRandomBytes =<< asks (subIdBytes . config) - getRegCode :: m NtfRegCode + getRegCode :: M NtfRegCode getRegCode = NtfRegCode <$> (getRandomBytes =<< asks (regCodeBytes . config)) - getRandomBytes :: Int -> m ByteString + getRandomBytes :: Int -> M ByteString getRandomBytes n = do gVar <- asks idsDrg atomically (C.pseudoRandomBytes n gVar) - cancelInvervalNotifications :: NtfTokenId -> m () + cancelInvervalNotifications :: NtfTokenId -> M () cancelInvervalNotifications tknId = atomically (TM.lookupDelete tknId intervalNotifiers) >>= mapM_ (uninterruptibleCancel . action) -withNtfLog :: (MonadUnliftIO m, MonadReader NtfEnv m) => (StoreLog 'WriteMode -> IO a) -> m () +withNtfLog :: (StoreLog 'WriteMode -> IO a) -> M () withNtfLog action = liftIO . mapM_ action =<< asks storeLog -incNtfStat :: (MonadUnliftIO m, MonadReader NtfEnv m) => (NtfServerStats -> TVar Int) -> m () +incNtfStat :: (NtfServerStats -> TVar Int) -> M () incNtfStat statSel = do stats <- asks serverStats atomically $ modifyTVar (statSel stats) (+ 1) -saveServerStats :: (MonadUnliftIO m, MonadReader NtfEnv m) => m () +saveServerStats :: M () saveServerStats = asks (serverStatsBackupFile . config) >>= mapM_ (\f -> asks serverStats >>= atomically . getNtfServerStatsData >>= liftIO . saveStats f) @@ -562,7 +561,7 @@ saveServerStats = B.writeFile f $ strEncode stats logInfo "server stats saved" -restoreServerStats :: (MonadUnliftIO m, MonadReader NtfEnv m) => m () +restoreServerStats :: M () restoreServerStats = asks (serverStatsBackupFile . config) >>= mapM_ restoreStats where restoreStats f = whenM (doesFileExist f) $ do diff --git a/src/Simplex/Messaging/Server.hs b/src/Simplex/Messaging/Server.hs index 8bdafaf043..f65a9b69cd 100644 --- a/src/Simplex/Messaging/Server.hs +++ b/src/Simplex/Messaging/Server.hs @@ -87,7 +87,7 @@ import UnliftIO.STM -- | Runs an SMP server using passed configuration. -- -- See a full server here: https://github.com/simplex-chat/simplexmq/blob/master/apps/smp-server/Main.hs -runSMPServer :: (MonadRandom m, MonadUnliftIO m) => ServerConfig -> m () +runSMPServer :: ServerConfig -> IO () runSMPServer cfg = do started <- newEmptyTMVarIO runSMPServerBlocking started cfg @@ -96,10 +96,12 @@ runSMPServer cfg = do -- -- This function uses passed TMVar to signal when the server is ready to accept TCP requests (True) -- and when it is disconnected from the TCP socket once the server thread is killed (False). -runSMPServerBlocking :: (MonadRandom m, MonadUnliftIO m) => TMVar Bool -> ServerConfig -> m () +runSMPServerBlocking :: TMVar Bool -> ServerConfig -> IO () runSMPServerBlocking started cfg = newEnv cfg >>= runReaderT (smpServer started) -smpServer :: forall m. (MonadUnliftIO m, MonadReader Env m) => TMVar Bool -> m () +type M a = ReaderT Env IO a + +smpServer :: TMVar Bool -> M () smpServer started = do s <- asks server cfg@ServerConfig {transports} <- asks config @@ -112,7 +114,7 @@ smpServer started = do ) `finally` (withLog closeStoreLog >> saveServerMessages >> saveServerStats) where - runServer :: (ServiceName, ATransport) -> m () + runServer :: (ServiceName, ATransport) -> M () runServer (tcpPort, ATransport t) = do serverParams <- asks tlsServerParams runTransportServer started tcpPort serverParams (runClient t) @@ -123,12 +125,12 @@ smpServer started = do (Server -> TBQueue (QueueId, Client)) -> (Server -> TMap QueueId Client) -> (Client -> TMap QueueId s) -> - (s -> m ()) -> - m () + (s -> IO ()) -> + M () serverThread s subQ subs clientSubs unsub = forever $ do atomically updateSubscribers $>>= endPreviousSubscriptions - >>= mapM_ unsub + >>= liftIO . mapM_ unsub where updateSubscribers :: STM (Maybe (QueueId, Client)) updateSubscribers = do @@ -140,17 +142,17 @@ smpServer started = do yes <- readTVar $ connected c' pure $ if yes then Just (qId, c') else Nothing TM.lookupInsert qId clnt (subs s) $>>= clientToBeNotified - endPreviousSubscriptions :: (QueueId, Client) -> m (Maybe s) + endPreviousSubscriptions :: (QueueId, Client) -> M (Maybe s) endPreviousSubscriptions (qId, c) = do void . forkIO . atomically $ writeTBQueue (sndQ c) [(CorrId "", qId, END)] atomically $ TM.lookupDelete qId (clientSubs c) - expireMessagesThread_ :: ServerConfig -> [m ()] + expireMessagesThread_ :: ServerConfig -> [M ()] expireMessagesThread_ ServerConfig {messageExpiration = Just msgExp} = [expireMessages msgExp] expireMessagesThread_ _ = [] - expireMessages :: ExpirationConfig -> m () + expireMessages :: ExpirationConfig -> M () expireMessages expCfg = do ms <- asks msgStore quota <- asks $ msgQueueQuota . config @@ -163,12 +165,12 @@ smpServer started = do atomically (getMsgQueue ms rId quota) >>= atomically . (`deleteExpiredMsgs` old) - serverStatsThread_ :: ServerConfig -> [m ()] + serverStatsThread_ :: ServerConfig -> [M ()] serverStatsThread_ ServerConfig {logStatsInterval = Just interval, logStatsStartTime, serverStatsLogFile} = [logServerStats logStatsStartTime interval serverStatsLogFile] serverStatsThread_ _ = [] - logServerStats :: Int -> Int -> FilePath -> m () + logServerStats :: Int -> Int -> FilePath -> M () logServerStats startAt logInterval statsFilePath = do initialDelay <- (startAt -) . fromIntegral . (`div` 1000000_000000) . diffTimeToPicoseconds . utctDayTime <$> liftIO getCurrentTime liftIO $ putStrLn $ "server stats log enabled: " <> statsFilePath @@ -189,7 +191,7 @@ smpServer started = do hPutStrLn h $ intercalate "," [iso8601Show $ utctDay fromTime', show qCreated', show qSecured', show qDeleted', show msgSent', show msgRecv', dayCount ps, weekCount ps, monthCount ps] threadDelay interval - runClient :: Transport c => TProxy c -> c -> m () + runClient :: Transport c => TProxy c -> c -> M () runClient _ h = do kh <- asks serverIdentity smpVRange <- asks $ smpServerVRange . config @@ -197,24 +199,24 @@ smpServer started = do Right th -> runClientTransport th Left _ -> pure () -runClientTransport :: (Transport c, MonadUnliftIO m, MonadReader Env m) => THandle c -> m () +runClientTransport :: Transport c => THandle c -> M () runClientTransport th@THandle {thVersion, sessionId} = do q <- asks $ tbqSize . config ts <- liftIO getSystemTime c <- atomically $ newClient q thVersion sessionId ts s <- asks server expCfg <- asks $ inactiveClientExpiration . config - raceAny_ ([send th c, client c s, receive th c] <> disconnectThread_ c expCfg) + raceAny_ ([liftIO $ send th c, client c s, receive th c] <> disconnectThread_ c expCfg) `finally` clientDisconnected c where - disconnectThread_ c (Just expCfg) = [disconnectTransport th c activeAt expCfg] + disconnectThread_ c (Just expCfg) = [liftIO $ disconnectTransport th c activeAt expCfg] disconnectThread_ _ _ = [] -clientDisconnected :: (MonadUnliftIO m, MonadReader Env m) => Client -> m () +clientDisconnected :: Client -> M () clientDisconnected c@Client {subscriptions, connected} = do atomically $ writeTVar connected False subs <- readTVarIO subscriptions - mapM_ cancelSub subs + liftIO $ mapM_ cancelSub subs atomically $ writeTVar subscriptions M.empty cs <- asks $ subscribers . server atomically . mapM_ (\rId -> TM.update deleteCurrentClient rId cs) $ M.keys subs @@ -227,13 +229,13 @@ clientDisconnected c@Client {subscriptions, connected} = do sameClientSession :: Client -> Client -> Bool sameClientSession Client {sessionId} Client {sessionId = s'} = sessionId == s' -cancelSub :: MonadUnliftIO m => TVar Sub -> m () +cancelSub :: TVar Sub -> IO () cancelSub sub = readTVarIO sub >>= \case Sub {subThread = SubThread t} -> liftIO $ deRefWeak t >>= mapM_ killThread _ -> return () -receive :: forall c m. (Transport c, MonadUnliftIO m, MonadReader Env m) => THandle c -> Client -> m () +receive :: Transport c => THandle c -> Client -> M () receive th Client {rcvQ, sndQ, activeAt} = forever $ do ts <- L.toList <$> tGet th atomically . writeTVar activeAt =<< liftIO getSystemTime @@ -241,7 +243,7 @@ receive th Client {rcvQ, sndQ, activeAt} = forever $ do write sndQ $ fst as write rcvQ $ snd as where - cmdAction :: SignedTransmission Cmd -> m (Either (Transmission BrokerMsg) (Maybe QueueRec, Transmission Cmd)) + cmdAction :: SignedTransmission Cmd -> M (Either (Transmission BrokerMsg) (Maybe QueueRec, Transmission Cmd)) cmdAction (sig, signed, (corrId, queueId, cmdOrError)) = case cmdOrError of Left e -> pure $ Left (corrId, queueId, ERR e) @@ -252,7 +254,7 @@ receive th Client {rcvQ, sndQ, activeAt} = forever $ do VRFailed -> Left (corrId, queueId, ERR AUTH) write q = mapM_ (atomically . writeTBQueue q) . L.nonEmpty -send :: (Transport c, MonadUnliftIO m) => THandle c -> Client -> m () +send :: Transport c => THandle c -> Client -> IO () send h@THandle {thVersion = v} Client {sndQ, sessionId, activeAt} = forever $ do ts <- atomically $ L.sortWith tOrder <$> readTBQueue sndQ -- TODO the line below can return Lefts, but we ignore it and do not disconnect the client @@ -264,7 +266,7 @@ send h@THandle {thVersion = v} Client {sndQ, sessionId, activeAt} = forever $ do MSG {} -> 0 _ -> 1 -disconnectTransport :: (Transport c, MonadUnliftIO m) => THandle c -> client -> (client -> TVar SystemTime) -> ExpirationConfig -> m () +disconnectTransport :: Transport c => THandle c -> client -> (client -> TVar SystemTime) -> ExpirationConfig -> IO () disconnectTransport THandle {connection} c activeAt expCfg = do let interval = checkInterval expCfg * 1000000 forever . liftIO $ do @@ -275,8 +277,7 @@ disconnectTransport THandle {connection} c activeAt expCfg = do data VerificationResult = VRVerified (Maybe QueueRec) | VRFailed -verifyTransmission :: - forall m. (MonadUnliftIO m, MonadReader Env m) => Maybe C.ASignature -> ByteString -> QueueId -> Cmd -> m VerificationResult +verifyTransmission :: Maybe C.ASignature -> ByteString -> QueueId -> Cmd -> M VerificationResult verifyTransmission sig_ signed queueId cmd = do case cmd of Cmd SRecipient (NEW k _) -> pure $ Nothing `verified` verifyCmdSignature sig_ signed k @@ -285,7 +286,7 @@ verifyTransmission sig_ signed queueId cmd = do Cmd SSender PING -> pure $ VRVerified Nothing Cmd SNotifier NSUB -> verifyCmd SNotifier $ verifyMaybe . fmap notifierKey . notifier where - verifyCmd :: SParty p -> (QueueRec -> Bool) -> m VerificationResult + verifyCmd :: SParty p -> (QueueRec -> Bool) -> M VerificationResult verifyCmd party f = do st <- asks queueStore q_ <- atomically (getQueue st party queueId) diff --git a/tests/NtfClient.hs b/tests/NtfClient.hs index c0f715362a..42a31db54c 100644 --- a/tests/NtfClient.hs +++ b/tests/NtfClient.hs @@ -18,7 +18,6 @@ module NtfClient where import Control.Monad import Control.Monad.Except (runExceptT) import Control.Monad.IO.Unlift -import Crypto.Random import Data.Aeson (FromJSON (..), ToJSON (..), (.:)) import qualified Data.Aeson as J import qualified Data.Aeson.Types as JT @@ -105,13 +104,13 @@ ntfServerCfg = serverStatsBackupFile = Nothing } -withNtfServerStoreLog :: (MonadUnliftIO m, MonadRandom m) => ATransport -> (ThreadId -> m a) -> m a +withNtfServerStoreLog :: ATransport -> (ThreadId -> IO a) -> IO a withNtfServerStoreLog t = withNtfServerCfg t ntfServerCfg {storeLogFile = Just ntfTestStoreLogFile} -withNtfServerThreadOn :: (MonadUnliftIO m, MonadRandom m) => ATransport -> ServiceName -> (ThreadId -> m a) -> m a +withNtfServerThreadOn :: ATransport -> ServiceName -> (ThreadId -> IO a) -> IO a withNtfServerThreadOn t port' = withNtfServerCfg t ntfServerCfg {transports = [(port', t)]} -withNtfServerCfg :: (MonadUnliftIO m, MonadRandom m) => ATransport -> NtfServerConfig -> (ThreadId -> m a) -> m a +withNtfServerCfg :: ATransport -> NtfServerConfig -> (ThreadId -> IO a) -> IO a withNtfServerCfg t cfg = serverBracket (\started -> runNtfServerBlocking started cfg {transports = [(ntfTestPort, t)]}) @@ -130,13 +129,13 @@ serverBracket process afterProcess f = do Nothing -> error $ "server did not " <> s _ -> pure () -withNtfServerOn :: (MonadUnliftIO m, MonadRandom m) => ATransport -> ServiceName -> m a -> m a +withNtfServerOn :: ATransport -> ServiceName -> IO a -> IO a withNtfServerOn t port' = withNtfServerThreadOn t port' . const -withNtfServer :: (MonadUnliftIO m, MonadRandom m) => ATransport -> m a -> m a +withNtfServer :: ATransport -> IO a -> IO a withNtfServer t = withNtfServerOn t ntfTestPort -runNtfTest :: forall c m a. (Transport c, MonadUnliftIO m, MonadRandom m, MonadFail m) => (THandle c -> m a) -> m a +runNtfTest :: forall c a. Transport c => (THandle c -> IO a) -> IO a runNtfTest test = withNtfServer (transport @c) $ testNtfClient test ntfServerTest :: diff --git a/tests/SMPAgentClient.hs b/tests/SMPAgentClient.hs index 322c669212..88525182a4 100644 --- a/tests/SMPAgentClient.hs +++ b/tests/SMPAgentClient.hs @@ -67,12 +67,12 @@ smpAgentTest _ cmd = runSmpAgentTest $ \(h :: c) -> tPutRaw h cmd >> get h Right (ACmd SAgent DISCONNECT {}) -> get h _ -> pure t -runSmpAgentTest :: forall c m a. (Transport c, MonadUnliftIO m, MonadRandom m, MonadFail m) => (c -> m a) -> m a +runSmpAgentTest :: forall c a. Transport c => (c -> IO a) -> IO a runSmpAgentTest test = withSmpServer t . withSmpAgent t $ testSMPAgentClient test where t = transport @c -runSmpAgentServerTest :: forall c m a. (Transport c, MonadUnliftIO m, MonadRandom m, MonadFail m) => ((ThreadId, ThreadId) -> c -> m a) -> m a +runSmpAgentServerTest :: forall c a. Transport c => ((ThreadId, ThreadId) -> c -> IO a) -> IO a runSmpAgentServerTest test = withSmpServerThreadOn t testPort $ \server -> withSmpAgentThreadOn t (agentTestPort, testPort, testDB) $ @@ -83,18 +83,18 @@ runSmpAgentServerTest test = smpAgentServerTest :: Transport c => ((ThreadId, ThreadId) -> c -> IO ()) -> Expectation smpAgentServerTest test' = runSmpAgentServerTest test' `shouldReturn` () -runSmpAgentTestN :: forall c m a. (Transport c, MonadUnliftIO m, MonadRandom m, MonadFail m) => [(ServiceName, ServiceName, AgentDatabase)] -> ([c] -> m a) -> m a +runSmpAgentTestN :: forall c a. Transport c => [(ServiceName, ServiceName, AgentDatabase)] -> ([c] -> IO a) -> IO a runSmpAgentTestN agents test = withSmpServer t $ run agents [] where - run :: [(ServiceName, ServiceName, AgentDatabase)] -> [c] -> m a + run :: [(ServiceName, ServiceName, AgentDatabase)] -> [c] -> IO a run [] hs = test hs run (a@(p, _, _) : as) hs = withSmpAgentOn t a $ testSMPAgentClientOn p $ \h -> run as (h : hs) t = transport @c -runSmpAgentTestN_1 :: forall c m a. (Transport c, MonadUnliftIO m, MonadRandom m, MonadFail m) => Int -> ([c] -> m a) -> m a +runSmpAgentTestN_1 :: forall c a. Transport c => Int -> ([c] -> IO a) -> IO a runSmpAgentTestN_1 nClients test = withSmpServer t . withSmpAgent t $ run nClients [] where - run :: Int -> [c] -> m a + run :: Int -> [c] -> IO a run 0 hs = test hs run n hs = testSMPAgentClient $ \h -> run (n - 1) (h : hs) t = transport @c diff --git a/tests/SMPClient.hs b/tests/SMPClient.hs index 192dda1c84..5e631c6d81 100644 --- a/tests/SMPClient.hs +++ b/tests/SMPClient.hs @@ -12,7 +12,6 @@ module SMPClient where import Control.Monad.Except (runExceptT) import Control.Monad.IO.Unlift -import Crypto.Random import Data.ByteString.Char8 (ByteString) import Data.List.NonEmpty (NonEmpty) import Network.Socket @@ -91,22 +90,22 @@ cfg = smpServerVRange = supportedSMPServerVRange } -withSmpServerStoreMsgLogOnV2 :: (MonadUnliftIO m, MonadRandom m) => ATransport -> ServiceName -> (ThreadId -> m a) -> m a +withSmpServerStoreMsgLogOnV2 :: ATransport -> ServiceName -> (ThreadId -> IO a) -> IO a withSmpServerStoreMsgLogOnV2 t = withSmpServerConfigOn t cfgV2 {storeLogFile = Just testStoreLogFile, storeMsgsFile = Just testStoreMsgsFile} -withSmpServerStoreMsgLogOn :: (MonadUnliftIO m, MonadRandom m) => ATransport -> ServiceName -> (ThreadId -> m a) -> m a +withSmpServerStoreMsgLogOn :: ATransport -> ServiceName -> (ThreadId -> IO a) -> IO a withSmpServerStoreMsgLogOn t = withSmpServerConfigOn t cfg {storeLogFile = Just testStoreLogFile, storeMsgsFile = Just testStoreMsgsFile, serverStatsBackupFile = Just testServerStatsBackupFile} -withSmpServerStoreLogOn :: (MonadUnliftIO m, MonadRandom m) => ATransport -> ServiceName -> (ThreadId -> m a) -> m a +withSmpServerStoreLogOn :: ATransport -> ServiceName -> (ThreadId -> IO a) -> IO a withSmpServerStoreLogOn t = withSmpServerConfigOn t cfg {storeLogFile = Just testStoreLogFile, serverStatsBackupFile = Just testServerStatsBackupFile} -withSmpServerConfigOn :: (MonadUnliftIO m, MonadRandom m) => ATransport -> ServerConfig -> ServiceName -> (ThreadId -> m a) -> m a +withSmpServerConfigOn :: ATransport -> ServerConfig -> ServiceName -> (ThreadId -> IO a) -> IO a withSmpServerConfigOn t cfg' port' = serverBracket (\started -> runSMPServerBlocking started cfg' {transports = [(port', t)]}) (pure ()) -withSmpServerThreadOn :: (MonadUnliftIO m, MonadRandom m) => ATransport -> ServiceName -> (ThreadId -> m a) -> m a +withSmpServerThreadOn :: ATransport -> ServiceName -> (ThreadId -> IO a) -> IO a withSmpServerThreadOn t = withSmpServerConfigOn t cfg serverBracket :: MonadUnliftIO m => (TMVar Bool -> m ()) -> m () -> (ThreadId -> m a) -> m a @@ -122,19 +121,19 @@ serverBracket process afterProcess f = do Nothing -> error $ "server did not " <> s _ -> pure () -withSmpServerOn :: (MonadUnliftIO m, MonadRandom m) => ATransport -> ServiceName -> m a -> m a +withSmpServerOn :: ATransport -> ServiceName -> IO a -> IO a withSmpServerOn t port' = withSmpServerThreadOn t port' . const -withSmpServer :: (MonadUnliftIO m, MonadRandom m) => ATransport -> m a -> m a +withSmpServer :: ATransport -> IO a -> IO a withSmpServer t = withSmpServerOn t testPort -runSmpTest :: forall c m a. (Transport c, MonadUnliftIO m, MonadRandom m, MonadFail m) => (THandle c -> m a) -> m a +runSmpTest :: forall c a. Transport c => (THandle c -> IO a) -> IO a runSmpTest test = withSmpServer (transport @c) $ testSMPClient test -runSmpTestN :: forall c m a. (Transport c, MonadUnliftIO m, MonadRandom m, MonadFail m) => Int -> ([THandle c] -> m a) -> m a +runSmpTestN :: forall c a. Transport c => Int -> ([THandle c] -> IO a) -> IO a runSmpTestN nClients test = withSmpServer (transport @c) $ run nClients [] where - run :: Int -> [THandle c] -> m a + run :: Int -> [THandle c] -> IO a run 0 hs = test hs run n hs = testSMPClient $ \h -> run (n - 1) (h : hs)