Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We鈥檒l occasionally send you account related emails.

Already on GitHub? Sign in to your account

core: expire chat items for all users #1737

Merged
merged 2 commits into from Jan 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
111 changes: 63 additions & 48 deletions src/Simplex/Chat.hs
Expand Up @@ -155,12 +155,12 @@ newChatController ChatDatabase {chatStore, agentStore} user cfg@ChatConfig {agen
filesFolder <- newTVarIO optFilesFolder
incognitoMode <- newTVarIO False
chatStoreChanged <- newTVarIO False
expireCIsAsync <- newTVarIO Nothing
expireCIs <- newTVarIO False
expireCIThreads <- newTVarIO M.empty
expireCIFlags <- newTVarIO M.empty
cleanupManagerAsync <- newTVarIO Nothing
timedItemThreads <- atomically TM.empty
showLiveItems <- newTVarIO False
pure ChatController {activeTo, firstTime, currentUser, smpAgent, agentAsync, chatStore, chatStoreChanged, idsDrg, inputQ, outputQ, notifyQ, chatLock, sndFiles, rcvFiles, currentCalls, config, sendNotification, incognitoMode, filesFolder, expireCIsAsync, expireCIs, cleanupManagerAsync, timedItemThreads, showLiveItems}
pure ChatController {activeTo, firstTime, currentUser, smpAgent, agentAsync, chatStore, chatStoreChanged, idsDrg, inputQ, outputQ, notifyQ, chatLock, sndFiles, rcvFiles, currentCalls, config, sendNotification, incognitoMode, filesFolder, expireCIThreads, expireCIFlags, cleanupManagerAsync, timedItemThreads, showLiveItems}
where
configServers :: DefaultAgentServers
configServers =
Expand All @@ -187,45 +187,47 @@ activeAgentServers ChatConfig {defaultServers = DefaultAgentServers {smp}} =
. filter (\ServerCfg {enabled} -> enabled)

startChatController :: (MonadUnliftIO m, MonadReader ChatController m) => User -> Bool -> Bool -> m (Async ())
startChatController user subConns enableExpireCIs = do
startChatController currentUser subConns enableExpireCIs = do
asks smpAgent >>= resumeAgentClient
restoreCalls user
users <- fromRight [] <$> runExceptT (withStore' getUsers)
restoreCalls currentUser
s <- asks agentAsync
readTVarIO s >>= maybe (start s) (pure . fst)
readTVarIO s >>= maybe (start s users) (pure . fst)
where
start s = do
start s users = do
a1 <- async $ race_ notificationSubscriber agentSubscriber
a2 <-
if subConns
then Just <$> async (void . runExceptT $ subscribeUserConnections Agent.subscribeConnections user)
then Just <$> async (void . runExceptT $ subscribeUserConnections Agent.subscribeConnections currentUser)
else pure Nothing
atomically . writeTVar s $ Just (a1, a2)
startCleanupManager
when enableExpireCIs startExpireCIs
when enableExpireCIs $ startExpireCIs users
pure a1
startCleanupManager = do
cleanupAsync <- asks cleanupManagerAsync
readTVarIO cleanupAsync >>= \case
Nothing -> do
a <- Just <$> async (void . runExceptT $ cleanupManager user)
a <- Just <$> async (void . runExceptT $ cleanupManager currentUser)
atomically $ writeTVar cleanupAsync a
_ -> pure ()
startExpireCIs = do
expireAsync <- asks expireCIsAsync
readTVarIO expireAsync >>= \case
Nothing -> do
a <- Just <$> async (void $ runExceptT runExpireCIs)
atomically $ writeTVar expireAsync a
setExpireCIs True
_ -> setExpireCIs True
runExpireCIs = forever $ do
-- TODO per user
flip catchError (toView . CRChatError (Just user)) $ do
expire <- asks expireCIs
atomically $ readTVar expire >>= \b -> unless b retry
ttl <- withStore' (`getChatItemTTL` user)
forM_ ttl $ \t -> expireChatItems user t False
threadDelay $ 1800 * 1000000 -- 30 minutes
startExpireCIs users = do
expireThreads <- asks expireCIThreads
forM_ users $ \u@User {userId} ->
atomically (TM.lookup userId expireThreads) >>= \case
Nothing -> do
a <- Just <$> async (void . runExceptT $ runExpireCIs u)
atomically $ TM.insert userId a expireThreads
setExpireCIFlag u True
_ -> setExpireCIFlag u True
where
runExpireCIs u@User {userId} = forever $ do
flip catchError (toView . CRChatError (Just u)) $ do
expireFlags <- asks expireCIFlags
atomically $ TM.lookup userId expireFlags >>= \b -> unless (b == Just True) retry
ttl <- withStore' (`getChatItemTTL` u)
forM_ ttl $ \t -> expireChatItems u t False
threadDelay $ 1800 * 1000000 -- 30 minutes

restoreCalls :: (MonadUnliftIO m, MonadReader ChatController m) => User -> m ()
restoreCalls user = do
Expand All @@ -235,13 +237,14 @@ restoreCalls user = do
atomically $ writeTVar calls callsMap

stopChatController :: forall m. MonadUnliftIO m => ChatController -> m ()
stopChatController ChatController {smpAgent, agentAsync = s, sndFiles, rcvFiles, expireCIs} = do
stopChatController ChatController {smpAgent, agentAsync = s, sndFiles, rcvFiles, expireCIFlags} = do
disconnectAgentClient smpAgent
readTVarIO s >>= mapM_ (\(a1, a2) -> uninterruptibleCancel a1 >> mapM_ uninterruptibleCancel a2)
closeFiles sndFiles
closeFiles rcvFiles
atomically $ do
writeTVar expireCIs False
keys <- M.keys <$> readTVar expireCIFlags
forM_ keys $ \k -> TM.insert k False expireCIFlags
writeTVar s Nothing
where
closeFiles :: TVar (Map Int64 Handle) -> m ()
Expand Down Expand Up @@ -304,10 +307,10 @@ processChatCommand = \case
APIActivateChat -> withUser $ \user -> do
restoreCalls user
withAgent activateAgent
setExpireCIs True
setAllExpireCIFlags True
pure $ CRCmdOk Nothing
APISuspendChat t -> do
setExpireCIs False
setAllExpireCIFlags False
withAgent (`suspendAgent` t)
pure $ CRCmdOk Nothing
ResubscribeAllConnections -> withUser $ \user -> do
Expand Down Expand Up @@ -797,22 +800,22 @@ processChatCommand = \case
SetUserSMPServers smpServersConfig -> withUser $ \User {userId} ->
processChatCommand $ APISetUserSMPServers userId smpServersConfig
TestSMPServer userId smpServer -> withUserId userId $ \user ->
CRSmpTestResult <$> (withAgent $ \a -> testSMPServerConnection a (aUserId user) smpServer)
CRSmpTestResult <$> withAgent (\a -> testSMPServerConnection a (aUserId user) smpServer)
APISetChatItemTTL userId newTTL_ -> withUser' $ \user -> do
checkSameUser userId user
checkStoreNotChanged $
withChatLock "setChatItemTTL" $ do
case newTTL_ of
Nothing -> do
withStore' $ \db -> setChatItemTTL db user newTTL_
setExpireCIs False
setExpireCIFlag user False
Just newTTL -> do
oldTTL <- withStore' (`getChatItemTTL` user)
when (maybe True (newTTL <) oldTTL) $ do
setExpireCIs False
setExpireCIFlag user False
expireChatItems user newTTL True
withStore' $ \db -> setChatItemTTL db user newTTL_
whenM chatStarted $ setExpireCIs True
whenM chatStarted $ setExpireCIFlag user True
pure $ CRCmdOk (Just user)
SetChatItemTTL newTTL_ -> withUser' $ \User {userId} -> do
processChatCommand $ APISetChatItemTTL userId newTTL_
Expand Down Expand Up @@ -1535,10 +1538,17 @@ assertDirectAllowed user dir ct event =
XCallInv_ -> False
_ -> True

setExpireCIs :: (MonadUnliftIO m, MonadReader ChatController m) => Bool -> m ()
setExpireCIs b = do
expire <- asks expireCIs
atomically $ writeTVar expire b
setExpireCIFlag :: (MonadUnliftIO m, MonadReader ChatController m) => User -> Bool -> m ()
setExpireCIFlag User {userId} b = do
expireFlags <- asks expireCIFlags
atomically $ TM.insert userId b expireFlags

setAllExpireCIFlags :: (MonadUnliftIO m, MonadReader ChatController m) => Bool -> m ()
setAllExpireCIFlags b = do
expireFlags <- asks expireCIFlags
atomically $ do
keys <- M.keys <$> readTVar expireFlags
forM_ keys $ \k -> TM.insert k b expireFlags

deleteFile :: forall m. ChatMonad m => User -> CIFileInfo -> m ()
deleteFile user CIFileInfo {filePath, fileId, fileStatus} =
Expand Down Expand Up @@ -1909,24 +1919,29 @@ startUpdatedTimedItemThread user chatRef ci ci' =
_ -> pure ()

expireChatItems :: forall m. ChatMonad m => User -> Int64 -> Bool -> m ()
expireChatItems user ttl sync = do
expireChatItems user@User {userId} ttl sync = do
currentTs <- liftIO getCurrentTime
let expirationDate = addUTCTime (-1 * fromIntegral ttl) currentTs
-- this is to keep group messages created during last 12 hours even if they're expired according to item_ts
createdAtCutoff = addUTCTime (-43200 :: NominalDiffTime) currentTs
expire <- asks expireCIs
contacts <- withStore' (`getUserContacts` user)
loop expire contacts $ processContact expirationDate
loop contacts $ processContact expirationDate
groups <- withStore' (`getUserGroupDetails` user)
loop expire groups $ processGroup expirationDate createdAtCutoff
loop groups $ processGroup expirationDate createdAtCutoff
where
loop :: TVar Bool -> [a] -> (a -> m ()) -> m ()
loop _ [] _ = pure ()
loop expire (a : as) process = continue expire $ do
loop :: [a] -> (a -> m ()) -> m ()
loop [] _ = pure ()
loop (a : as) process = continue $ do
process a `catchError` (toView . CRChatError (Just user))
loop expire as process
continue :: TVar Bool -> m () -> m ()
continue expire = if sync then id else \a -> whenM (readTVarIO expire) $ threadDelay 100000 >> a
loop as process
continue :: m () -> m ()
continue a =
if sync
then a
else do
expireFlags <- asks expireCIFlags
expire <- atomically $ TM.lookup userId expireFlags
when (expire == Just True) $ threadDelay 100000 >> a
processContact :: UTCTime -> Contact -> m ()
processContact expirationDate ct = do
filesInfo <- withStore' $ \db -> getContactExpiredFileInfo db user ct expirationDate
Expand Down
4 changes: 2 additions & 2 deletions src/Simplex/Chat/Controller.hs
Expand Up @@ -129,8 +129,8 @@ data ChatController = ChatController
config :: ChatConfig,
filesFolder :: TVar (Maybe FilePath), -- path to files folder for mobile apps,
incognitoMode :: TVar Bool,
expireCIsAsync :: TVar (Maybe (Async ())),
expireCIs :: TVar Bool,
expireCIThreads :: TMap UserId (Maybe (Async ())),
expireCIFlags :: TMap UserId Bool,
cleanupManagerAsync :: TVar (Maybe (Async ())),
timedItemThreads :: TMap (ChatRef, ChatItemId) (TVar (Maybe (Weak ThreadId))),
showLiveItems :: TVar Bool
Expand Down