Skip to content

Commit

Permalink
xftp: wait for agent in foreground in XFTP workers without delaying s…
Browse files Browse the repository at this point in the history
…uspension (#721)

Co-authored-by: spaced4ndy <8711996+spaced4ndy@users.noreply.github.com>
  • Loading branch information
epoberezkin and spaced4ndy committed Apr 13, 2023
1 parent 29eb9e4 commit ed0f8c7
Show file tree
Hide file tree
Showing 5 changed files with 32 additions and 41 deletions.
40 changes: 13 additions & 27 deletions src/Simplex/FileTransfer/Agent.hs
Original file line number Diff line number Diff line change
Expand Up @@ -171,8 +171,8 @@ runXFTPRcvWorker :: forall m. AgentMonad m => AgentClient -> XFTPServer -> TMVar
runXFTPRcvWorker c srv doWork = do
forever $ do
void . atomically $ readTMVar doWork
-- TODO waitUntilNotSuspended
agentOperationBracket c AORcvNetwork waitUntilActive runXFTPOperation
atomically $ checkAgentForeground c
runXFTPOperation
where
noWorkToDo = void . atomically $ tryTakeTMVar doWork
runXFTPOperation :: m ()
Expand All @@ -195,10 +195,7 @@ runXFTPRcvWorker c srv doWork = do
when notifyOnRetry $ notify c rcvFileEntityId $ RFERR e
closeXFTPServerClient c userId server $ bshow rcvChunkId
withStore' c $ \db -> updateRcvChunkReplicaDelay db rcvChunkReplicaId replicaDelay
-- TODO waitUntilNotSuspended
atomically $ endAgentOperation c AORcvNetwork
atomically $ throwWhenInactive c
atomically $ beginAgentOperation c AORcvNetwork
atomically $ checkAgentForeground c
loop
retryDone e = rcvWorkerInternalError c rcvFileId rcvFileEntityId (Just fileTmpPath) (show e)
downloadFileChunk :: RcvFileChunk -> RcvFileChunkReplica -> m ()
Expand Down Expand Up @@ -242,7 +239,7 @@ runXFTPRcvLocalWorker :: forall m. AgentMonad m => AgentClient -> TMVar () -> m
runXFTPRcvLocalWorker c doWork = do
forever $ do
void . atomically $ readTMVar doWork
-- TODO waitUntilNotSuspended
atomically $ checkAgentForeground c
runXFTPOperation
where
runXFTPOperation :: m ()
Expand Down Expand Up @@ -363,7 +360,7 @@ runXFTPSndPrepareWorker :: forall m. AgentMonad m => AgentClient -> TMVar () ->
runXFTPSndPrepareWorker c doWork = do
forever $ do
void . atomically $ readTMVar doWork
-- TODO waitUntilNotSuspended
atomically $ checkAgentForeground c
runXFTPOperation
where
runXFTPOperation :: m ()
Expand Down Expand Up @@ -417,7 +414,7 @@ runXFTPSndPrepareWorker c doWork = do
any (\SndFileChunkReplica {replicaStatus} -> replicaStatus == SFRSCreated) replicas
createChunk :: Int -> SndFileChunk -> m ()
createChunk numRecipients' ch = do
-- TODO waitUntilNotSuspended
atomically $ checkAgentForeground c
(replica, ProtoServerWithAuth srv _) <- agentOperationBracket c AOSndNetwork throwWhenInactive tryCreate
withStore' c $ \db -> createSndFileReplica db ch replica
addXFTPSndWorker c $ Just srv
Expand All @@ -429,12 +426,7 @@ runXFTPSndPrepareWorker c doWork = do
createWithNextSrv usedSrvs
`catchError` \e -> retryOnError "XFTP prepare worker" (retryLoop loop) (throwError e) e
where
retryLoop loop = do
-- TODO waitUntilNotSuspended
atomically $ endAgentOperation c AOSndNetwork
atomically $ throwWhenInactive c
atomically $ beginAgentOperation c AOSndNetwork
loop
retryLoop loop = atomically (checkAgentForeground c) >> loop
createWithNextSrv usedSrvs = do
deleted <- withStore' c $ \db -> getSndFileDeleted db sndFileId
when deleted $ throwError $ INTERNAL "file deleted, aborting chunk creation"
Expand All @@ -452,7 +444,7 @@ runXFTPSndWorker :: forall m. AgentMonad m => AgentClient -> XFTPServer -> TMVar
runXFTPSndWorker c srv doWork = do
forever $ do
void . atomically $ readTMVar doWork
-- TODO waitUntilNotSuspended
atomically $ checkAgentForeground c
agentOperationBracket c AOSndNetwork throwWhenInactive runXFTPOperation
where
noWorkToDo = void . atomically $ tryTakeTMVar doWork
Expand All @@ -476,18 +468,15 @@ runXFTPSndWorker c srv doWork = do
when notifyOnRetry $ notify c sndFileEntityId $ SFERR e
closeXFTPServerClient c userId server $ bshow sndChunkId
withStore' c $ \db -> updateSndChunkReplicaDelay db sndChunkReplicaId replicaDelay
-- TODO waitUntilNotSuspended
atomically $ endAgentOperation c AOSndNetwork
atomically $ throwWhenInactive c
atomically $ beginAgentOperation c AOSndNetwork
atomically $ checkAgentForeground c
loop
retryDone e = sndWorkerInternalError c sndFileId sndFileEntityId (Just filePrefixPath) (show e)
uploadFileChunk :: SndFileChunk -> SndFileChunkReplica -> m ()
uploadFileChunk sndFileChunk@SndFileChunk {sndFileId, sndChunkId, userId, chunkSpec = chunkSpec@XFTPChunkSpec {filePath}} replica = do
replica'@SndFileChunkReplica {sndChunkReplicaId} <- addRecipients sndFileChunk replica
fsFilePath <- toFSFilePath filePath
let chunkSpec' = chunkSpec {filePath = fsFilePath} :: XFTPChunkSpec
-- TODO waitUntilNotSuspended
atomically $ checkAgentForeground c
agentXFTPUploadChunk c userId sndChunkId replica' chunkSpec'
sf@SndFile {sndFileEntityId, prefixPath, chunks} <- withStore c $ \db -> do
updateSndChunkReplicaStatus db sndChunkReplicaId SFRSUploaded
Expand Down Expand Up @@ -613,8 +602,8 @@ runXFTPDelWorker :: forall m. AgentMonad m => AgentClient -> XFTPServer -> TMVar
runXFTPDelWorker c srv doWork = do
forever $ do
void . atomically $ readTMVar doWork
-- TODO waitUntilNotSuspended
agentOperationBracket c AOSndNetwork waitUntilActive runXFTPOperation
atomically $ checkAgentForeground c
runXFTPOperation
where
noWorkToDo = void . atomically $ tryTakeTMVar doWork
runXFTPOperation :: m ()
Expand All @@ -637,10 +626,7 @@ runXFTPDelWorker c srv doWork = do
when notifyOnRetry $ notify c "" $ SFERR e
closeXFTPServerClient c userId server replId
withStore' c $ \db -> updateDeletedSndChunkReplicaDelay db deletedSndChunkReplicaId replicaDelay
-- TODO waitUntilNotSuspended
atomically $ endAgentOperation c AOSndNetwork
atomically $ throwWhenInactive c
atomically $ beginAgentOperation c AOSndNetwork
atomically $ checkAgentForeground c
loop
retryDone e = delWorkerInternalError c deletedSndChunkReplicaId e
deleteChunkReplica :: DeletedSndChunkReplica -> m ()
Expand Down
2 changes: 0 additions & 2 deletions src/Simplex/FileTransfer/Client.hs
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,5 @@ noFile HTTP2Body {bodyPart} a = case bodyPart of
Just _ -> pure a -- throwError $ PCEResponseError HAS_FILE
_ -> pure a

-- FADD :: NonEmpty RcvPublicVerifyKey -> FileCommand Sender
-- FDEL :: FileCommand Sender
-- FACK :: FileCommand Recipient
-- PING :: FileCommand Recipient
15 changes: 6 additions & 9 deletions src/Simplex/Messaging/Agent.hs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ module Simplex.Messaging.Agent
xftpSendFile,
xftpDeleteSndFileInternal,
xftpDeleteSndFileRemote,
activateAgent,
foregroundAgent,
suspendAgent,
execAgentStoreSQL,
getAgentMigrations,
Expand All @@ -99,7 +99,6 @@ where

import Control.Concurrent.STM (stateTVar)

Check warning on line 100 in src/Simplex/Messaging/Agent.hs

View workflow job for this annotation

GitHub Actions / build-ubuntu-20.04

The import of ‘Control.Concurrent.STM’ is redundant

Check warning on line 100 in src/Simplex/Messaging/Agent.hs

View workflow job for this annotation

GitHub Actions / build-ubuntu-22.04

The import of ‘Control.Concurrent.STM’ is redundant
import Control.Logger.Simple (logError, logInfo, showText)
import Control.Monad ((<=<))
import Control.Monad.Except
import Control.Monad.IO.Unlift (MonadUnliftIO)
import Control.Monad.Reader
Expand Down Expand Up @@ -361,11 +360,9 @@ xftpDeleteSndFileInternal c = withAgentEnv c .: deleteSndFileInternal c
xftpDeleteSndFileRemote :: AgentErrorMonad m => AgentClient -> UserId -> SndFileId -> ValidFileDescription 'FSender -> m ()
xftpDeleteSndFileRemote c = withAgentEnv c .:. deleteSndFileRemote c

-- TODO rename setAgentForeground

-- | Activate operations
activateAgent :: MonadUnliftIO m => AgentClient -> m ()
activateAgent c = withAgentEnv c $ activateAgent' c
foregroundAgent :: MonadUnliftIO m => AgentClient -> m ()
foregroundAgent c = withAgentEnv c $ foregroundAgent' c

-- | Suspend operations with max delay to deliver pending messages
suspendAgent :: MonadUnliftIO m => AgentClient -> Int -> m ()
Expand Down Expand Up @@ -1541,9 +1538,9 @@ sendNtfConnCommands c cmd = do
setNtfServers' :: AgentMonad' m => AgentClient -> [NtfServer] -> m ()
setNtfServers' c = atomically . writeTVar (ntfServers c)

activateAgent' :: AgentMonad' m => AgentClient -> m ()
activateAgent' c = do
atomically $ writeTVar (agentState c) ASActive
foregroundAgent' :: AgentMonad' m => AgentClient -> m ()
foregroundAgent' c = do
atomically $ writeTVar (agentState c) ASForeground
mapM_ activate $ reverse agentOperations
where
activate opSel = atomically $ modifyTVar' (opSel c) $ \s -> s {opSuspended = False}
Expand Down
14 changes: 12 additions & 2 deletions src/Simplex/Messaging/Agent/Client.hs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ module Simplex.Messaging.Agent.Client
throwWhenNoDelivery,
beginAgentOperation,
endAgentOperation,
waitUntilForeground,
checkAgentForeground,
suspendSendingAndDatabase,
suspendOperation,
notifySuspended,
Expand Down Expand Up @@ -266,7 +268,7 @@ agentOperations = [ntfNetworkOp, rcvNetworkOp, msgDeliveryOp, sndNetworkOp, data

data AgentOpState = AgentOpState {opSuspended :: Bool, opsInProgress :: Int}

data AgentState = ASActive | ASSuspending | ASSuspended -- TODO rename ASActive -> ASForeground
data AgentState = ASForeground | ASSuspending | ASSuspended
deriving (Eq, Show)

data AgentLocks = AgentLocks {connLocks :: Map String String, srvLocks :: Map String String, delLock :: Maybe String}
Expand Down Expand Up @@ -311,7 +313,7 @@ newAgentClient InitialAgentServers {smp, ntf, xftp, netCfg} agentEnv = do
msgDeliveryOp <- newTVar $ AgentOpState False 0
sndNetworkOp <- newTVar $ AgentOpState False 0
databaseOp <- newTVar $ AgentOpState False 0
agentState <- newTVar ASActive
agentState <- newTVar ASForeground
getMsgLocks <- TM.empty
connLocks <- TM.empty
deleteLock <- createLock
Expand Down Expand Up @@ -1212,6 +1214,14 @@ agentOperationBracket c op check action =
(\_ -> atomically $ endAgentOperation c op)
(const action)

waitUntilForeground :: AgentClient -> STM ()
waitUntilForeground c = unlessM ((ASForeground ==) <$> readTVar (agentState c)) retry

checkAgentForeground :: AgentClient -> STM ()
checkAgentForeground c = do
throwWhenInactive c
waitUntilForeground c

withStore' :: AgentMonad m => AgentClient -> (DB.Connection -> IO a) -> m a
withStore' c action = withStore c $ fmap Right . action

Expand Down
2 changes: 1 addition & 1 deletion tests/AgentTests/FunctionalAPITests.hs
Original file line number Diff line number Diff line change
Expand Up @@ -556,7 +556,7 @@ testSuspendingAgent = do
5 <- sendMessage a bId SMP.noMsgFlags "hello 2"
get a ##> ("", bId, SENT 5)
Nothing <- 100000 `timeout` get b
activateAgent b
foregroundAgent b
get b =##> \case ("", c, Msg "hello 2") -> c == aId; _ -> False

testSuspendingAgentCompleteSending :: ATransport -> IO ()
Expand Down

0 comments on commit ed0f8c7

Please sign in to comment.