Skip to content

Commit

Permalink
chat: subscribe to all connections on startup (#70)
Browse files Browse the repository at this point in the history
  • Loading branch information
Efim Poberezkin committed Mar 6, 2021
1 parent 11c8bee commit b9943c3
Show file tree
Hide file tree
Showing 6 changed files with 44 additions and 9 deletions.
6 changes: 3 additions & 3 deletions apps/dog-food/Main.hs
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,6 @@ chatHelpInfo =
\/accept <name> <invitation> - accept <invitation>\n\
\ (a string that starts from \"smp::\")\n\
\ from your contact <name>\n\
\/chat <name> - resume chat with <name>\n\
\/name <name> - set <name> to use in invitations\n\
\@<name> <message> - send <message> (any string) to contact <name>\n\
\ @<name> can be omitted to send to previous"
Expand Down Expand Up @@ -173,7 +172,8 @@ sendToChatTerm ChatClient {outQ, username} ChatTerminal {outputQ} = forever $ do
atomically . writeTBQueue outputQ $ serializeChatResponse name resp

sendToAgent :: ChatClient -> ChatTerminal -> AgentClient -> IO ()
sendToAgent ChatClient {inQ, smpServer} ct AgentClient {rcvQ} =
sendToAgent ChatClient {inQ, smpServer} ct AgentClient {rcvQ} = do
atomically $ writeTBQueue rcvQ ("1", "", SUBALL) -- hack for subscribing to all
forever . atomically $ do
cmd <- readTBQueue inQ
writeTBQueue rcvQ `mapM_` agentTransmission cmd
Expand Down Expand Up @@ -209,7 +209,7 @@ receiveFromAgent t ct c = forever . atomically $ do
END -> Disconnected $ Contact a
MSG {m_body} -> ReceivedMessage (Contact a) m_body
SENT _ -> NoChatResponse
OK -> YesYes
OK -> Connected $ Contact a -- hack for subscribing to all
ERR e -> ChatError e
setActiveContact :: ChatResponse -> STM ()
setActiveContact = \case
Expand Down
20 changes: 14 additions & 6 deletions src/Simplex/Messaging/Agent.hs
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,8 @@ processCommand c@AgentClient {sndQ} st (corrId, connAlias, cmd) =
case cmd of
NEW smpServer -> createNewConnection smpServer
JOIN smpQueueInfo replyMode -> joinConnection smpQueueInfo replyMode
SUB -> subscribeConnection
SUB -> subscribeConnection connAlias
SUBALL -> subscribeAll
SEND msgBody -> sendMessage msgBody
OFF -> suspendConnection
DEL -> deleteConnection
Expand All @@ -146,16 +147,20 @@ processCommand c@AgentClient {sndQ} st (corrId, connAlias, cmd) =
ReplyOff -> return ()
respond CON

subscribeConnection :: m ()
subscribeConnection =
withStore (getConn st connAlias) >>= \case
subscribeConnection :: ConnAlias -> m ()
subscribeConnection cAlias =
withStore (getConn st cAlias) >>= \case
SomeConn _ (DuplexConnection _ rq _) -> subscribe rq
SomeConn _ (RcvConnection _ rq) -> subscribe rq
-- TODO possibly there should be a separate error type trying
-- TODO to send the message to the connection without RcvQueue
_ -> throwError PROHIBITED
where
subscribe rq = subscribeQueue c rq connAlias >> respond OK
subscribe rq = subscribeQueue c rq cAlias >> respond' cAlias OK

-- TODO remove - hack for subscribing to all; respond' and parameterization of subscribeConnection are byproduct
subscribeAll :: m ()
subscribeAll = withStore (getAllConnAliases st) >>= mapM_ subscribeConnection

sendMessage :: MsgBody -> m ()
sendMessage msgBody =
Expand Down Expand Up @@ -202,7 +207,10 @@ processCommand c@AgentClient {sndQ} st (corrId, connAlias, cmd) =
sendAgentMessage c sq senderTs $ REPLY qInfo

respond :: ACommand 'Agent -> m ()
respond resp = atomically $ writeTBQueue sndQ (corrId, connAlias, resp)
respond = respond' connAlias

respond' :: ConnAlias -> ACommand 'Agent -> m ()
respond' cAlias resp = atomically $ writeTBQueue sndQ (corrId, cAlias, resp)

subscriber :: (MonadUnliftIO m, MonadReader Env m) => AgentClient -> SQLiteStore -> m ()
subscriber c@AgentClient {msgQ} st = forever $ do
Expand Down
1 change: 1 addition & 0 deletions src/Simplex/Messaging/Agent/Store.hs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ class Monad m => MonadAgentStore s m where
createRcvConn :: s -> RcvQueue -> m ()
createSndConn :: s -> SndQueue -> m ()
getConn :: s -> ConnAlias -> m SomeConn
getAllConnAliases :: s -> m [ConnAlias] -- TODO remove - hack for subscribing to all
getRcvQueue :: s -> SMPServer -> SMP.RecipientId -> m RcvQueue
deleteConn :: s -> ConnAlias -> m ()
upgradeRcvConnToDuplex :: s -> ConnAlias -> SndQueue -> m ()
Expand Down
12 changes: 12 additions & 0 deletions src/Simplex/Messaging/Agent/Store/SQLite.hs
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,11 @@ instance (MonadUnliftIO m, MonadError StoreError m) => MonadAgentStore SQLiteSto
(Nothing, Just sndQ) -> return $ SomeConn SCSnd (SndConnection connAlias sndQ)
_ -> throwError SEBadConn

getAllConnAliases :: SQLiteStore -> m [ConnAlias]
getAllConnAliases SQLiteStore {dbConn} =
liftIO $
retrieveAllConnAliases dbConn

getRcvQueue :: SQLiteStore -> SMPServer -> SMP.RecipientId -> m RcvQueue
getRcvQueue SQLiteStore {dbConn} SMPServer {host, port} rcvId = do
rcvQueue <-
Expand Down Expand Up @@ -353,6 +358,13 @@ retrieveSndQueueByConnAlias_ dbConn connAlias = do
return . Just $ SndQueue srv sndId cAlias sndPrivateKey encryptKey signKey status
_ -> return Nothing

-- * getAllConnAliases helper

retrieveAllConnAliases :: DB.Connection -> IO [ConnAlias]
retrieveAllConnAliases dbConn = do
r <- DB.query_ dbConn "SELECT conn_alias FROM connections;" :: IO [[ConnAlias]]
return (concat r)

-- * getRcvQueue helper

retrieveRcvQueue :: DB.Connection -> HostName -> Maybe ServiceName -> SMP.RecipientId -> IO (Maybe RcvQueue)
Expand Down
3 changes: 3 additions & 0 deletions src/Simplex/Messaging/Agent/Transmission.hs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ data ACommand (p :: AParty) where
-- CONF :: OtherPartyId -> ACommand Agent
-- LET :: OtherPartyId -> ACommand Client
SUB :: ACommand Client
SUBALL :: ACommand Client -- TODO should be moved to chat protocol - hack for subscribing to all
END :: ACommand Agent
-- QST :: QueueDirection -> ACommand Client
-- STAT :: QueueDirection -> Maybe QueueStatus -> Maybe SubMode -> ACommand Agent
Expand Down Expand Up @@ -291,6 +292,7 @@ commandP =
<|> "INV " *> invResp
<|> "JOIN " *> joinCmd
<|> "SUB" $> ACmd SClient SUB
<|> "SUBALL" $> ACmd SClient SUBALL -- TODO remove - hack for subscribing to all
<|> "END" $> ACmd SAgent END
<|> "SEND " *> sendCmd
<|> "SENT " *> sentResp
Expand Down Expand Up @@ -336,6 +338,7 @@ serializeCommand = \case
INV qInfo -> "INV " <> serializeSmpQueueInfo qInfo
JOIN qInfo rMode -> "JOIN " <> serializeSmpQueueInfo qInfo <> replyMode rMode
SUB -> "SUB"
SUBALL -> "SUBALL" -- TODO remove - hack for subscribing to all
END -> "END"
SEND msgBody -> "SEND " <> serializeMsg msgBody
SENT mId -> "SENT " <> bshow mId
Expand Down
11 changes: 11 additions & 0 deletions tests/AgentTests/SQLiteTests.hs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ storeTests = withStore do
describe "store methods" do
describe "createRcvConn" testCreateRcvConn
describe "createSndConn" testCreateSndConn
describe "getAllConnAliases" testGetAllConnAliases
describe "getRcvQueue" testGetRcvQueue
describe "deleteConn" do
describe "RcvConnection" testDeleteRcvConn
Expand Down Expand Up @@ -142,6 +143,16 @@ testCreateSndConn = do
getConn store "conn1"
`returnsResult` SomeConn SCDuplex (DuplexConnection "conn1" rcvQueue1 sndQueue1)

testGetAllConnAliases :: SpecWith SQLiteStore
testGetAllConnAliases = do
it "should get all conn aliases" $ \store -> do
createRcvConn store rcvQueue1
`returnsResult` ()
createSndConn store sndQueue1 {connAlias = "conn2"}
`returnsResult` ()
getAllConnAliases store
`returnsResult` ["conn1" :: ConnAlias, "conn2" :: ConnAlias]

testGetRcvQueue :: SpecWith SQLiteStore
testGetRcvQueue = do
it "should get RcvQueue" $ \store -> do
Expand Down

0 comments on commit b9943c3

Please sign in to comment.