Skip to content

Commit

Permalink
core: expire chat items for all users (#1737)
Browse files Browse the repository at this point in the history
  • Loading branch information
spaced4ndy committed Jan 13, 2023
1 parent cccdcef commit 0c3d643
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 50 deletions.
111 changes: 63 additions & 48 deletions src/Simplex/Chat.hs
Expand Up @@ -155,12 +155,12 @@ newChatController ChatDatabase {chatStore, agentStore} user cfg@ChatConfig {agen
filesFolder <- newTVarIO optFilesFolder
incognitoMode <- newTVarIO False
chatStoreChanged <- newTVarIO False
expireCIsAsync <- newTVarIO Nothing
expireCIs <- newTVarIO False
expireCIThreads <- newTVarIO M.empty
expireCIFlags <- newTVarIO M.empty
cleanupManagerAsync <- newTVarIO Nothing
timedItemThreads <- atomically TM.empty
showLiveItems <- newTVarIO False
pure ChatController {activeTo, firstTime, currentUser, smpAgent, agentAsync, chatStore, chatStoreChanged, idsDrg, inputQ, outputQ, notifyQ, chatLock, sndFiles, rcvFiles, currentCalls, config, sendNotification, incognitoMode, filesFolder, expireCIsAsync, expireCIs, cleanupManagerAsync, timedItemThreads, showLiveItems}
pure ChatController {activeTo, firstTime, currentUser, smpAgent, agentAsync, chatStore, chatStoreChanged, idsDrg, inputQ, outputQ, notifyQ, chatLock, sndFiles, rcvFiles, currentCalls, config, sendNotification, incognitoMode, filesFolder, expireCIThreads, expireCIFlags, cleanupManagerAsync, timedItemThreads, showLiveItems}
where
configServers :: DefaultAgentServers
configServers =
Expand All @@ -187,45 +187,47 @@ activeAgentServers ChatConfig {defaultServers = DefaultAgentServers {smp}} =
. filter (\ServerCfg {enabled} -> enabled)

startChatController :: (MonadUnliftIO m, MonadReader ChatController m) => User -> Bool -> Bool -> m (Async ())
startChatController user subConns enableExpireCIs = do
startChatController currentUser subConns enableExpireCIs = do
asks smpAgent >>= resumeAgentClient
restoreCalls user
users <- fromRight [] <$> runExceptT (withStore' getUsers)
restoreCalls currentUser
s <- asks agentAsync
readTVarIO s >>= maybe (start s) (pure . fst)
readTVarIO s >>= maybe (start s users) (pure . fst)
where
start s = do
start s users = do
a1 <- async $ race_ notificationSubscriber agentSubscriber
a2 <-
if subConns
then Just <$> async (void . runExceptT $ subscribeUserConnections Agent.subscribeConnections user)
then Just <$> async (void . runExceptT $ subscribeUserConnections Agent.subscribeConnections currentUser)
else pure Nothing
atomically . writeTVar s $ Just (a1, a2)
startCleanupManager
when enableExpireCIs startExpireCIs
when enableExpireCIs $ startExpireCIs users
pure a1
startCleanupManager = do
cleanupAsync <- asks cleanupManagerAsync
readTVarIO cleanupAsync >>= \case
Nothing -> do
a <- Just <$> async (void . runExceptT $ cleanupManager user)
a <- Just <$> async (void . runExceptT $ cleanupManager currentUser)
atomically $ writeTVar cleanupAsync a
_ -> pure ()
startExpireCIs = do
expireAsync <- asks expireCIsAsync
readTVarIO expireAsync >>= \case
Nothing -> do
a <- Just <$> async (void $ runExceptT runExpireCIs)
atomically $ writeTVar expireAsync a
setExpireCIs True
_ -> setExpireCIs True
runExpireCIs = forever $ do
-- TODO per user
flip catchError (toView . CRChatError (Just user)) $ do
expire <- asks expireCIs
atomically $ readTVar expire >>= \b -> unless b retry
ttl <- withStore' (`getChatItemTTL` user)
forM_ ttl $ \t -> expireChatItems user t False
threadDelay $ 1800 * 1000000 -- 30 minutes
startExpireCIs users = do
expireThreads <- asks expireCIThreads
forM_ users $ \u@User {userId} ->
atomically (TM.lookup userId expireThreads) >>= \case
Nothing -> do
a <- Just <$> async (void . runExceptT $ runExpireCIs u)
atomically $ TM.insert userId a expireThreads
setExpireCIFlag u True
_ -> setExpireCIFlag u True
where
runExpireCIs u@User {userId} = forever $ do
flip catchError (toView . CRChatError (Just u)) $ do
expireFlags <- asks expireCIFlags
atomically $ TM.lookup userId expireFlags >>= \b -> unless (b == Just True) retry
ttl <- withStore' (`getChatItemTTL` u)
forM_ ttl $ \t -> expireChatItems u t False
threadDelay $ 1800 * 1000000 -- 30 minutes

restoreCalls :: (MonadUnliftIO m, MonadReader ChatController m) => User -> m ()
restoreCalls user = do
Expand All @@ -235,13 +237,14 @@ restoreCalls user = do
atomically $ writeTVar calls callsMap

stopChatController :: forall m. MonadUnliftIO m => ChatController -> m ()
stopChatController ChatController {smpAgent, agentAsync = s, sndFiles, rcvFiles, expireCIs} = do
stopChatController ChatController {smpAgent, agentAsync = s, sndFiles, rcvFiles, expireCIFlags} = do
disconnectAgentClient smpAgent
readTVarIO s >>= mapM_ (\(a1, a2) -> uninterruptibleCancel a1 >> mapM_ uninterruptibleCancel a2)
closeFiles sndFiles
closeFiles rcvFiles
atomically $ do
writeTVar expireCIs False
keys <- M.keys <$> readTVar expireCIFlags
forM_ keys $ \k -> TM.insert k False expireCIFlags
writeTVar s Nothing
where
closeFiles :: TVar (Map Int64 Handle) -> m ()
Expand Down Expand Up @@ -304,10 +307,10 @@ processChatCommand = \case
APIActivateChat -> withUser $ \user -> do
restoreCalls user
withAgent activateAgent
setExpireCIs True
setAllExpireCIFlags True
pure $ CRCmdOk Nothing
APISuspendChat t -> do
setExpireCIs False
setAllExpireCIFlags False
withAgent (`suspendAgent` t)
pure $ CRCmdOk Nothing
ResubscribeAllConnections -> withUser $ \user -> do
Expand Down Expand Up @@ -797,22 +800,22 @@ processChatCommand = \case
SetUserSMPServers smpServersConfig -> withUser $ \User {userId} ->
processChatCommand $ APISetUserSMPServers userId smpServersConfig
TestSMPServer userId smpServer -> withUserId userId $ \user ->
CRSmpTestResult <$> (withAgent $ \a -> testSMPServerConnection a (aUserId user) smpServer)
CRSmpTestResult <$> withAgent (\a -> testSMPServerConnection a (aUserId user) smpServer)
APISetChatItemTTL userId newTTL_ -> withUser' $ \user -> do
checkSameUser userId user
checkStoreNotChanged $
withChatLock "setChatItemTTL" $ do
case newTTL_ of
Nothing -> do
withStore' $ \db -> setChatItemTTL db user newTTL_
setExpireCIs False
setExpireCIFlag user False
Just newTTL -> do
oldTTL <- withStore' (`getChatItemTTL` user)
when (maybe True (newTTL <) oldTTL) $ do
setExpireCIs False
setExpireCIFlag user False
expireChatItems user newTTL True
withStore' $ \db -> setChatItemTTL db user newTTL_
whenM chatStarted $ setExpireCIs True
whenM chatStarted $ setExpireCIFlag user True
pure $ CRCmdOk (Just user)
SetChatItemTTL newTTL_ -> withUser' $ \User {userId} -> do
processChatCommand $ APISetChatItemTTL userId newTTL_
Expand Down Expand Up @@ -1535,10 +1538,17 @@ assertDirectAllowed user dir ct event =
XCallInv_ -> False
_ -> True

setExpireCIs :: (MonadUnliftIO m, MonadReader ChatController m) => Bool -> m ()
setExpireCIs b = do
expire <- asks expireCIs
atomically $ writeTVar expire b
setExpireCIFlag :: (MonadUnliftIO m, MonadReader ChatController m) => User -> Bool -> m ()
setExpireCIFlag User {userId} b = do
expireFlags <- asks expireCIFlags
atomically $ TM.insert userId b expireFlags

setAllExpireCIFlags :: (MonadUnliftIO m, MonadReader ChatController m) => Bool -> m ()
setAllExpireCIFlags b = do
expireFlags <- asks expireCIFlags
atomically $ do
keys <- M.keys <$> readTVar expireFlags
forM_ keys $ \k -> TM.insert k b expireFlags

deleteFile :: forall m. ChatMonad m => User -> CIFileInfo -> m ()
deleteFile user CIFileInfo {filePath, fileId, fileStatus} =
Expand Down Expand Up @@ -1909,24 +1919,29 @@ startUpdatedTimedItemThread user chatRef ci ci' =
_ -> pure ()

expireChatItems :: forall m. ChatMonad m => User -> Int64 -> Bool -> m ()
expireChatItems user ttl sync = do
expireChatItems user@User {userId} ttl sync = do
currentTs <- liftIO getCurrentTime
let expirationDate = addUTCTime (-1 * fromIntegral ttl) currentTs
-- this is to keep group messages created during last 12 hours even if they're expired according to item_ts
createdAtCutoff = addUTCTime (-43200 :: NominalDiffTime) currentTs
expire <- asks expireCIs
contacts <- withStore' (`getUserContacts` user)
loop expire contacts $ processContact expirationDate
loop contacts $ processContact expirationDate
groups <- withStore' (`getUserGroupDetails` user)
loop expire groups $ processGroup expirationDate createdAtCutoff
loop groups $ processGroup expirationDate createdAtCutoff
where
loop :: TVar Bool -> [a] -> (a -> m ()) -> m ()
loop _ [] _ = pure ()
loop expire (a : as) process = continue expire $ do
loop :: [a] -> (a -> m ()) -> m ()
loop [] _ = pure ()
loop (a : as) process = continue $ do
process a `catchError` (toView . CRChatError (Just user))
loop expire as process
continue :: TVar Bool -> m () -> m ()
continue expire = if sync then id else \a -> whenM (readTVarIO expire) $ threadDelay 100000 >> a
loop as process
continue :: m () -> m ()
continue a =
if sync
then a
else do
expireFlags <- asks expireCIFlags
expire <- atomically $ TM.lookup userId expireFlags
when (expire == Just True) $ threadDelay 100000 >> a
processContact :: UTCTime -> Contact -> m ()
processContact expirationDate ct = do
filesInfo <- withStore' $ \db -> getContactExpiredFileInfo db user ct expirationDate
Expand Down
4 changes: 2 additions & 2 deletions src/Simplex/Chat/Controller.hs
Expand Up @@ -129,8 +129,8 @@ data ChatController = ChatController
config :: ChatConfig,
filesFolder :: TVar (Maybe FilePath), -- path to files folder for mobile apps,
incognitoMode :: TVar Bool,
expireCIsAsync :: TVar (Maybe (Async ())),
expireCIs :: TVar Bool,
expireCIThreads :: TMap UserId (Maybe (Async ())),
expireCIFlags :: TMap UserId Bool,
cleanupManagerAsync :: TVar (Maybe (Async ())),
timedItemThreads :: TMap (ChatRef, ChatItemId) (TVar (Maybe (Weak ThreadId))),
showLiveItems :: TVar Bool
Expand Down

0 comments on commit 0c3d643

Please sign in to comment.