Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chat: subscribe to all connections on startup #70

Merged
merged 9 commits into from
Mar 6, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions apps/dog-food/Main.hs
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,6 @@ chatHelpInfo =
\/accept <name> <invitation> - accept <invitation>\n\
\ (a string that starts from \"smp::\")\n\
\ from your contact <name>\n\
\/chat <name> - resume chat with <name>\n\
\/name <name> - set <name> to use in invitations\n\
\@<name> <message> - send <message> (any string) to contact <name>\n\
\ @<name> can be omitted to send to previous"
Expand Down Expand Up @@ -173,7 +172,8 @@ sendToChatTerm ChatClient {outQ, username} ChatTerminal {outputQ} = forever $ do
atomically . writeTBQueue outputQ $ serializeChatResponse name resp

sendToAgent :: ChatClient -> ChatTerminal -> AgentClient -> IO ()
sendToAgent ChatClient {inQ, smpServer} ct AgentClient {rcvQ} =
sendToAgent ChatClient {inQ, smpServer} ct AgentClient {rcvQ} = do
atomically $ writeTBQueue rcvQ ("1", "", SUBALL) -- hack for subscribing to all
forever . atomically $ do
cmd <- readTBQueue inQ
writeTBQueue rcvQ `mapM_` agentTransmission cmd
Expand Down Expand Up @@ -209,7 +209,7 @@ receiveFromAgent t ct c = forever . atomically $ do
END -> Disconnected $ Contact a
MSG {m_body} -> ReceivedMessage (Contact a) m_body
SENT _ -> NoChatResponse
OK -> YesYes
OK -> Connected $ Contact a -- hack for subscribing to all
ERR e -> ChatError e
setActiveContact :: ChatResponse -> STM ()
setActiveContact = \case
Expand Down
20 changes: 14 additions & 6 deletions src/Simplex/Messaging/Agent.hs
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,8 @@ processCommand c@AgentClient {sndQ} st (corrId, connAlias, cmd) =
case cmd of
NEW smpServer -> createNewConnection smpServer
JOIN smpQueueInfo replyMode -> joinConnection smpQueueInfo replyMode
SUB -> subscribeConnection
SUB -> subscribeConnection connAlias
SUBALL -> subscribeAll
SEND msgBody -> sendMessage msgBody
OFF -> suspendConnection
DEL -> deleteConnection
Expand All @@ -146,16 +147,20 @@ processCommand c@AgentClient {sndQ} st (corrId, connAlias, cmd) =
ReplyOff -> return ()
respond CON

subscribeConnection :: m ()
subscribeConnection =
withStore (getConn st connAlias) >>= \case
subscribeConnection :: ConnAlias -> m ()
subscribeConnection cAlias =
withStore (getConn st cAlias) >>= \case
SomeConn _ (DuplexConnection _ rq _) -> subscribe rq
SomeConn _ (RcvConnection _ rq) -> subscribe rq
-- TODO possibly there should be a separate error type trying
-- TODO to send the message to the connection without RcvQueue
_ -> throwError PROHIBITED
where
subscribe rq = subscribeQueue c rq connAlias >> respond OK
subscribe rq = subscribeQueue c rq cAlias >> respond' cAlias OK

-- TODO remove - hack for subscribing to all; respond' and parameterization of subscribeConnection are byproduct
subscribeAll :: m ()
subscribeAll = withStore (getAllConnAliases st) >>= mapM_ subscribeConnection

sendMessage :: MsgBody -> m ()
sendMessage msgBody =
Expand Down Expand Up @@ -202,7 +207,10 @@ processCommand c@AgentClient {sndQ} st (corrId, connAlias, cmd) =
sendAgentMessage c sq senderTs $ REPLY qInfo

respond :: ACommand 'Agent -> m ()
respond resp = atomically $ writeTBQueue sndQ (corrId, connAlias, resp)
respond = respond' connAlias

respond' :: ConnAlias -> ACommand 'Agent -> m ()
respond' cAlias resp = atomically $ writeTBQueue sndQ (corrId, cAlias, resp)

subscriber :: (MonadUnliftIO m, MonadReader Env m) => AgentClient -> SQLiteStore -> m ()
subscriber c@AgentClient {msgQ} st = forever $ do
Expand Down
1 change: 1 addition & 0 deletions src/Simplex/Messaging/Agent/Store.hs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ class Monad m => MonadAgentStore s m where
createRcvConn :: s -> RcvQueue -> m ()
createSndConn :: s -> SndQueue -> m ()
getConn :: s -> ConnAlias -> m SomeConn
getAllConnAliases :: s -> m [ConnAlias] -- TODO remove - hack for subscribing to all
getRcvQueue :: s -> SMPServer -> SMP.RecipientId -> m RcvQueue
deleteConn :: s -> ConnAlias -> m ()
upgradeRcvConnToDuplex :: s -> ConnAlias -> SndQueue -> m ()
Expand Down
12 changes: 12 additions & 0 deletions src/Simplex/Messaging/Agent/Store/SQLite.hs
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,11 @@ instance (MonadUnliftIO m, MonadError StoreError m) => MonadAgentStore SQLiteSto
(Nothing, Just sndQ) -> return $ SomeConn SCSnd (SndConnection connAlias sndQ)
_ -> throwError SEBadConn

getAllConnAliases :: SQLiteStore -> m [ConnAlias]
getAllConnAliases SQLiteStore {dbConn} =
liftIO $
retrieveAllConnAliases dbConn

getRcvQueue :: SQLiteStore -> SMPServer -> SMP.RecipientId -> m RcvQueue
getRcvQueue SQLiteStore {dbConn} SMPServer {host, port} rcvId = do
rcvQueue <-
Expand Down Expand Up @@ -353,6 +358,13 @@ retrieveSndQueueByConnAlias_ dbConn connAlias = do
return . Just $ SndQueue srv sndId cAlias sndPrivateKey encryptKey signKey status
_ -> return Nothing

-- * getAllConnAliases helper

retrieveAllConnAliases :: DB.Connection -> IO [ConnAlias]
retrieveAllConnAliases dbConn = do
r <- DB.query_ dbConn "SELECT conn_alias FROM connections;" :: IO [[ConnAlias]]
return (concat r)

-- * getRcvQueue helper

retrieveRcvQueue :: DB.Connection -> HostName -> Maybe ServiceName -> SMP.RecipientId -> IO (Maybe RcvQueue)
Expand Down
3 changes: 3 additions & 0 deletions src/Simplex/Messaging/Agent/Transmission.hs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ data ACommand (p :: AParty) where
-- CONF :: OtherPartyId -> ACommand Agent
-- LET :: OtherPartyId -> ACommand Client
SUB :: ACommand Client
SUBALL :: ACommand Client -- TODO should be moved to chat protocol - hack for subscribing to all
END :: ACommand Agent
-- QST :: QueueDirection -> ACommand Client
-- STAT :: QueueDirection -> Maybe QueueStatus -> Maybe SubMode -> ACommand Agent
Expand Down Expand Up @@ -291,6 +292,7 @@ commandP =
<|> "INV " *> invResp
<|> "JOIN " *> joinCmd
<|> "SUB" $> ACmd SClient SUB
<|> "SUBALL" $> ACmd SClient SUBALL -- TODO remove - hack for subscribing to all
<|> "END" $> ACmd SAgent END
<|> "SEND " *> sendCmd
<|> "SENT " *> sentResp
Expand Down Expand Up @@ -336,6 +338,7 @@ serializeCommand = \case
INV qInfo -> "INV " <> serializeSmpQueueInfo qInfo
JOIN qInfo rMode -> "JOIN " <> serializeSmpQueueInfo qInfo <> replyMode rMode
SUB -> "SUB"
SUBALL -> "SUBALL" -- TODO remove - hack for subscribing to all
END -> "END"
SEND msgBody -> "SEND " <> serializeMsg msgBody
SENT mId -> "SENT " <> bshow mId
Expand Down
11 changes: 11 additions & 0 deletions tests/AgentTests/SQLiteTests.hs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ storeTests = withStore do
describe "store methods" do
describe "createRcvConn" testCreateRcvConn
describe "createSndConn" testCreateSndConn
describe "getAllConnAliases" testGetAllConnAliases
describe "getRcvQueue" testGetRcvQueue
describe "deleteConn" do
describe "RcvConnection" testDeleteRcvConn
Expand Down Expand Up @@ -142,6 +143,16 @@ testCreateSndConn = do
getConn store "conn1"
`returnsResult` SomeConn SCDuplex (DuplexConnection "conn1" rcvQueue1 sndQueue1)

testGetAllConnAliases :: SpecWith SQLiteStore
testGetAllConnAliases = do
it "should get all conn aliases" $ \store -> do
createRcvConn store rcvQueue1
`returnsResult` ()
createSndConn store sndQueue1 {connAlias = "conn2"}
`returnsResult` ()
getAllConnAliases store
`returnsResult` ["conn1" :: ConnAlias, "conn2" :: ConnAlias]

testGetRcvQueue :: SpecWith SQLiteStore
testGetRcvQueue = do
it "should get RcvQueue" $ \store -> do
Expand Down