Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

xftp: wait for agent in foreground in XFTP workers without delaying suspension #721

Merged
merged 2 commits into from
Apr 13, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
31 changes: 10 additions & 21 deletions src/Simplex/FileTransfer/Agent.hs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
)
where

import Control.Concurrent.STM (stateTVar)

Check warning on line 27 in src/Simplex/FileTransfer/Agent.hs

View workflow job for this annotation

GitHub Actions / build-ubuntu-20.04

The import of ‘Control.Concurrent.STM’ is redundant

Check warning on line 27 in src/Simplex/FileTransfer/Agent.hs

View workflow job for this annotation

GitHub Actions / build-ubuntu-22.04

The import of ‘Control.Concurrent.STM’ is redundant
import Control.Logger.Simple (logError)
import Control.Monad
import Control.Monad.Except
Expand Down Expand Up @@ -165,8 +165,8 @@
runXFTPRcvWorker c srv doWork = do
forever $ do
void . atomically $ readTMVar doWork
-- TODO waitUntilNotSuspended
agentOperationBracket c AORcvNetwork waitUntilActive runXFTPOperation
atomically $ checkAgentForeground c
runXFTPOperation
where
noWorkToDo = void . atomically $ tryTakeTMVar doWork
runXFTPOperation :: m ()
Expand All @@ -189,10 +189,7 @@
when notifyOnRetry $ notify c rcvFileEntityId $ RFERR e
closeXFTPServerClient c userId server replicaId
withStore' c $ \db -> updateRcvChunkReplicaDelay db rcvChunkReplicaId replicaDelay
-- TODO waitUntilNotSuspended
atomically $ endAgentOperation c AORcvNetwork
atomically $ throwWhenInactive c
atomically $ beginAgentOperation c AORcvNetwork
atomically $ checkAgentForeground c
loop
retryDone e = rcvWorkerInternalError c rcvFileId rcvFileEntityId (Just fileTmpPath) (show e)
downloadFileChunk :: RcvFileChunk -> RcvFileChunkReplica -> m ()
Expand Down Expand Up @@ -236,7 +233,7 @@
runXFTPRcvLocalWorker c doWork = do
forever $ do
void . atomically $ readTMVar doWork
-- TODO waitUntilNotSuspended
atomically $ checkAgentForeground c
runXFTPOperation
where
runXFTPOperation :: m ()
Expand Down Expand Up @@ -357,7 +354,7 @@
runXFTPSndPrepareWorker c doWork = do
forever $ do
void . atomically $ readTMVar doWork
-- TODO waitUntilNotSuspended
atomically $ checkAgentForeground c
runXFTPOperation
where
runXFTPOperation :: m ()
Expand Down Expand Up @@ -411,7 +408,7 @@
any (\SndFileChunkReplica {replicaStatus} -> replicaStatus == SFRSCreated) replicas
createChunk :: Int -> SndFileChunk -> m ()
createChunk numRecipients' ch = do
-- TODO waitUntilNotSuspended
atomically $ checkAgentForeground c
(replica, ProtoServerWithAuth srv _) <- agentOperationBracket c AOSndNetwork throwWhenInactive tryCreate
withStore' c $ \db -> createSndFileReplica db ch replica
addXFTPSndWorker c $ Just srv
Expand All @@ -423,12 +420,7 @@
createWithNextSrv usedSrvs
`catchError` \e -> retryOnError "XFTP prepare worker" (retryLoop loop) (throwError e) e
where
retryLoop loop = do
-- TODO waitUntilNotSuspended
atomically $ endAgentOperation c AOSndNetwork
atomically $ throwWhenInactive c
atomically $ beginAgentOperation c AOSndNetwork
loop
retryLoop loop = atomically (checkAgentForeground c) >> loop
createWithNextSrv usedSrvs = do
withNextSrv c userId usedSrvs [] $ \srvAuth -> do
replica <- agentXFTPNewChunk c ch numRecipients' srvAuth
Expand All @@ -444,7 +436,7 @@
runXFTPSndWorker c srv doWork = do
forever $ do
void . atomically $ readTMVar doWork
-- TODO waitUntilNotSuspended
atomically $ checkAgentForeground c
agentOperationBracket c AOSndNetwork throwWhenInactive runXFTPOperation
where
noWorkToDo = void . atomically $ tryTakeTMVar doWork
Expand All @@ -468,18 +460,15 @@
when notifyOnRetry $ notify c sndFileEntityId $ SFERR e
closeXFTPServerClient c userId server replicaId
withStore' c $ \db -> updateSndChunkReplicaDelay db sndChunkReplicaId replicaDelay
-- TODO waitUntilNotSuspended
atomically $ endAgentOperation c AOSndNetwork
atomically $ throwWhenInactive c
atomically $ beginAgentOperation c AOSndNetwork
atomically $ checkAgentForeground c
loop
retryDone e = sndWorkerInternalError c sndFileId sndFileEntityId (Just filePrefixPath) (show e)
uploadFileChunk :: SndFileChunk -> SndFileChunkReplica -> m ()
uploadFileChunk sndFileChunk@SndFileChunk {sndFileId, sndChunkId, userId, chunkSpec = chunkSpec@XFTPChunkSpec {filePath}} replica = do
replica'@SndFileChunkReplica {sndChunkReplicaId} <- addRecipients sndFileChunk replica
fsFilePath <- toFSFilePath filePath
let chunkSpec' = chunkSpec {filePath = fsFilePath} :: XFTPChunkSpec
-- TODO waitUntilNotSuspended
atomically $ checkAgentForeground c
agentXFTPUploadChunk c userId sndChunkId replica' chunkSpec'
sf@SndFile {sndFileEntityId, prefixPath, chunks} <- withStore c $ \db -> do
updateSndChunkReplicaStatus db sndChunkReplicaId SFRSUploaded
Expand Down
2 changes: 0 additions & 2 deletions src/Simplex/FileTransfer/Client.hs
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,5 @@ noFile HTTP2Body {bodyPart} a = case bodyPart of
Just _ -> pure a -- throwError $ PCEResponseError HAS_FILE
_ -> pure a

-- FADD :: NonEmpty RcvPublicVerifyKey -> FileCommand Sender
-- FDEL :: FileCommand Sender
-- FACK :: FileCommand Recipient
-- PING :: FileCommand Recipient
15 changes: 6 additions & 9 deletions src/Simplex/Messaging/Agent.hs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@
xftpDeleteRcvFile,
xftpSendFile,
xftpDeleteSndFileInternal,
activateAgent,
foregroundAgent,
suspendAgent,
execAgentStoreSQL,
getAgentMigrations,
Expand All @@ -96,9 +96,8 @@
)
where

import Control.Concurrent.STM (stateTVar)

Check warning on line 99 in src/Simplex/Messaging/Agent.hs

View workflow job for this annotation

GitHub Actions / build-ubuntu-20.04

The import of ‘Control.Concurrent.STM’ is redundant

Check warning on line 99 in src/Simplex/Messaging/Agent.hs

View workflow job for this annotation

GitHub Actions / build-ubuntu-22.04

The import of ‘Control.Concurrent.STM’ is redundant
import Control.Logger.Simple (logError, logInfo, showText)
import Control.Monad ((<=<))
import Control.Monad.Except
import Control.Monad.IO.Unlift (MonadUnliftIO)
import Control.Monad.Reader
Expand Down Expand Up @@ -356,11 +355,9 @@
xftpDeleteSndFileInternal :: AgentErrorMonad m => AgentClient -> UserId -> SndFileId -> m ()
xftpDeleteSndFileInternal c = withAgentEnv c .: deleteSndFileInternal c

-- TODO rename setAgentForeground

-- | Activate operations
activateAgent :: MonadUnliftIO m => AgentClient -> m ()
activateAgent c = withAgentEnv c $ activateAgent' c
foregroundAgent :: MonadUnliftIO m => AgentClient -> m ()
foregroundAgent c = withAgentEnv c $ foregroundAgent' c

-- | Suspend operations with max delay to deliver pending messages
suspendAgent :: MonadUnliftIO m => AgentClient -> Int -> m ()
Expand Down Expand Up @@ -1536,9 +1533,9 @@
setNtfServers' :: AgentMonad' m => AgentClient -> [NtfServer] -> m ()
setNtfServers' c = atomically . writeTVar (ntfServers c)

activateAgent' :: AgentMonad' m => AgentClient -> m ()
activateAgent' c = do
atomically $ writeTVar (agentState c) ASActive
foregroundAgent' :: AgentMonad' m => AgentClient -> m ()
foregroundAgent' c = do
atomically $ writeTVar (agentState c) ASForeground
mapM_ activate $ reverse agentOperations
where
activate opSel = atomically $ modifyTVar' (opSel c) $ \s -> s {opSuspended = False}
Expand Down
14 changes: 12 additions & 2 deletions src/Simplex/Messaging/Agent/Client.hs
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@
throwWhenNoDelivery,
beginAgentOperation,
endAgentOperation,
waitUntilForeground,
checkAgentForeground,
suspendSendingAndDatabase,
suspendOperation,
notifySuspended,
Expand All @@ -98,7 +100,7 @@
import Control.Applicative ((<|>))
import Control.Concurrent (forkIO, threadDelay)
import Control.Concurrent.Async (Async, uninterruptibleCancel)
import Control.Concurrent.STM (retry, stateTVar, throwSTM)

Check warning on line 103 in src/Simplex/Messaging/Agent/Client.hs

View workflow job for this annotation

GitHub Actions / build-ubuntu-20.04

The import of ‘stateTVar’

Check warning on line 103 in src/Simplex/Messaging/Agent/Client.hs

View workflow job for this annotation

GitHub Actions / build-ubuntu-22.04

The import of ‘stateTVar’
import Control.Exception (AsyncException (..))
import Control.Logger.Simple
import Control.Monad.Except
Expand Down Expand Up @@ -265,7 +267,7 @@

data AgentOpState = AgentOpState {opSuspended :: Bool, opsInProgress :: Int}

data AgentState = ASActive | ASSuspending | ASSuspended -- TODO rename ASActive -> ASForeground
data AgentState = ASForeground | ASSuspending | ASSuspended
deriving (Eq, Show)

data AgentLocks = AgentLocks {connLocks :: Map String String, srvLocks :: Map String String, delLock :: Maybe String}
Expand Down Expand Up @@ -310,7 +312,7 @@
msgDeliveryOp <- newTVar $ AgentOpState False 0
sndNetworkOp <- newTVar $ AgentOpState False 0
databaseOp <- newTVar $ AgentOpState False 0
agentState <- newTVar ASActive
agentState <- newTVar ASForeground
getMsgLocks <- TM.empty
connLocks <- TM.empty
deleteLock <- createLock
Expand Down Expand Up @@ -1207,6 +1209,14 @@
(\_ -> atomically $ endAgentOperation c op)
(const action)

waitUntilForeground :: AgentClient -> STM ()
waitUntilForeground c = unlessM ((ASForeground ==) <$> readTVar (agentState c)) retry

checkAgentForeground :: AgentClient -> STM ()
checkAgentForeground c = do
throwWhenInactive c
epoberezkin marked this conversation as resolved.
Show resolved Hide resolved
waitUntilForeground c

withStore' :: AgentMonad m => AgentClient -> (DB.Connection -> IO a) -> m a
withStore' c action = withStore c $ fmap Right . action

Expand Down Expand Up @@ -1255,7 +1265,7 @@
where
statsKey = AgentStatsKey {userId, host = strEncode $ clientTransportHost pc, clientTs = strEncode $ clientSessionTs pc, cmd, res}

userServers :: forall p. (ProtocolTypeI p, UserProtocol p) => AgentClient -> TMap UserId (NonEmpty (ProtoServerWithAuth p))

Check warning on line 1268 in src/Simplex/Messaging/Agent/Client.hs

View workflow job for this annotation

GitHub Actions / build-ubuntu-20.04

• Redundant constraint: UserProtocol p

Check warning on line 1268 in src/Simplex/Messaging/Agent/Client.hs

View workflow job for this annotation

GitHub Actions / build-ubuntu-22.04

• Redundant constraint: UserProtocol p
userServers c = case protocolTypeI @p of
SPSMP -> smpServers c
SPXFTP -> xftpServers c
Expand Down
2 changes: 1 addition & 1 deletion tests/AgentTests/FunctionalAPITests.hs
Original file line number Diff line number Diff line change
Expand Up @@ -556,7 +556,7 @@ testSuspendingAgent = do
5 <- sendMessage a bId SMP.noMsgFlags "hello 2"
get a ##> ("", bId, SENT 5)
Nothing <- 100000 `timeout` get b
activateAgent b
foregroundAgent b
get b =##> \case ("", c, Msg "hello 2") -> c == aId; _ -> False

testSuspendingAgentCompleteSending :: ATransport -> IO ()
Expand Down