Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PubSub can still receive mesages after unsubscribe #28

Closed
ra1u opened this issue Jul 7, 2015 · 6 comments
Closed

PubSub can still receive mesages after unsubscribe #28

ra1u opened this issue Jul 7, 2015 · 6 comments

Comments

@ra1u
Copy link

ra1u commented Jul 7, 2015

I would expect that once punsubscribe is invoked handler won't receive single message. In this example getmessages receives many more messages

{-# LANGUAGE OverloadedStrings #-}

import Control.Monad
import Control.Monad.Trans
import Data.Monoid
import Database.Redis
import Data.ByteString.Char8
import Control.Concurrent.Async

sendmessages n = do
    conn <- connect defaultConnectInfo
    runRedis conn $ do
        forM_ [1..n] $ \x -> publish "hello" (pack ( "num" ++ (show x) ) )

getmessages = do
    conn <- connect defaultConnectInfo
    let chan = ["hello"]
    runRedis conn $
        pubSub (psubscribe chan) $ \msg -> do
            Prelude.putStrLn $ "Message from " 
                              ++ unpack (msgChannel msg) 
                              ++ " " 
                              ++ unpack (msgMessage msg)
            return $ punsubscribe chan

testpubsub = do
    a1 <- async getmessages
    a2 <- async ( sendmessages 100000 )
    wait a1
    wait a2

main = testpubsub
@nmattia
Copy link

nmattia commented Feb 21, 2016

I don't think this is specific to hedis (or redis, for that matter).

When I run it I don't get the first messages (~ 1 to 500) either. What probably happens is that your punsubscribe will take some time to reach Redis, during which messages will still be sent, and some will still be on the wire when it arrives. So if it takes the instruction x to get to Redis, you'll get two extra xs worth of messages after you unsubscribed (after you sent the unsubscribe, really).

You'll need to check the pattern (look at Message's constructors) if you want to be sure you haven't already unsubscribed from it.

@Yuras
Copy link
Contributor

Yuras commented Feb 21, 2016

@Nicowcow While technically you are correct, it is still an major API issue. Probably psubscribe and punsubscribe should not return till redis answer arrives. Unfortunately it is not possible with the existing design. Right now there is no (easy) way to know that subscribing or unsubscribing is done, I have a number of ugly hacks to work around that.

@ra1u
Copy link
Author

ra1u commented Feb 21, 2016

From documenation

-- Example: Receive a single message from the "chat" channel.

pubSub (subscribe ["chat"]) $ \msg -> do
    putStrLn $ "Message from " ++ show (msgChannel msg)
    return $ unsubscribe ["chat"]

Maye there should be just differnet description what code actually does

@qrilka
Copy link
Contributor

qrilka commented Mar 17, 2016

@ra1u my example solution for you problem (as a demonstration of my lines from #60 ):

{-# LANGUAGE OverloadedStrings #-}

import Control.Monad
import Control.Monad.Trans
import Data.Monoid
import Data.IORef
import Database.Redis
import Data.ByteString.Char8 (pack, unpack)
import Control.Concurrent.Async

sendmessages n = do
    conn <- connect defaultConnectInfo
    runRedis conn $ do
        forM_ [1..n] $ \x -> publish "hello" (pack ( "num" ++ (show x) ) )

data DeliveryInfo = DeliveryInfo
  { diUnsubscribed :: Bool
  , diLeftovers    :: [Message]
  }

getmessages = do
    conn <- connect defaultConnectInfo
    let chan = ["hello"]
    ref <- newIORef (DeliveryInfo False [])
    runRedis conn $
        pubSub (psubscribe chan) $ \msg -> do
            info <- readIORef ref
            if diUnsubscribed info
               then do writeIORef ref info{ diLeftovers = msg:diLeftovers info }
                       return mempty
               else do writeIORef ref info{ diUnsubscribed = True }
                       putStrLn $ "Message from " 
                         ++ unpack (msgChannel msg) 
                         ++ " " 
                         ++ unpack (msgMessage msg)
                       return $ punsubscribe chan
    leftovers <- reverse . diLeftovers <$> readIORef ref
    putStrLn $ "Number of received after the 1st caught "
      ++ "but before unsubscription ack:\n" ++ show (length leftovers)

testpubsub = do
    a1 <- async getmessages
    a2 <- async ( sendmessages 100000 )
    wait a1
    wait a2

main = testpubsub

E.g. one of its runs:

$ stack runhaskell test28.hs
Message from hello num1272
Number of received after the 1st caught but before unsubscription ack:
4271

@k-bx
Copy link
Collaborator

k-bx commented Mar 18, 2016

@ra1u after giving it a thought, I think solution that @qrilka provides seems like a good enough pattern which shouldn't be a part of hedis itself. What do you think? Would this resolve the issue or would you see some better ways to fix the issue?

@ra1u
Copy link
Author

ra1u commented Mar 18, 2016

It is fine for me. My main concern was actually as I mentioned documentation. I understand what was going on. Now i think documentation is unambiguous regarding such behaviour and I am closing this issue. Thank you all for your support.

@ra1u ra1u closed this as completed Mar 18, 2016
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

5 participants