
Channel is not thread safe #119

Closed
pnuz3n opened this issue Oct 7, 2014 · 17 comments

Comments

@pnuz3n

pnuz3n commented Oct 7, 2014

The Channel.shutdown(*error) function sets the me.send function while holding a lock. However, the Ack, Nack, and Reject functions do not take the same lock.

Is Channel supposed to be thread safe? The simple fix would be to add locking to the methods that are missing it, but I am not sure whether there is something else behind this.
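For illustration, here is a minimal, self-contained sketch of the pattern being described (the type and field names are made up for this sketch; they are not the library's actual internals). Running it under `go run -race` flags the unsynchronized read of the function-valued field:

```go
package main

import (
	"fmt"
	"sync"
)

// minimalChannel models the reported bug: shutdown swaps the send
// function under a lock, while Ack reads it without taking the lock.
type minimalChannel struct {
	m    sync.Mutex
	send func(tag uint64)
}

func (c *minimalChannel) shutdown() {
	c.m.Lock()
	defer c.m.Unlock()
	c.send = func(uint64) {} // mutation happens under the lock
}

func (c *minimalChannel) Ack(tag uint64) {
	c.send(tag) // read WITHOUT the lock: this is the data race
}

func main() {
	c := &minimalChannel{send: func(uint64) {}}
	var wg sync.WaitGroup
	wg.Add(2)
	go func() { defer wg.Done(); c.shutdown() }()
	go func() { defer wg.Done(); c.Ack(1) }()
	wg.Wait()
	fmt.Println("done")
}
```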

@streadway
Owner

Channel and Connection are intended to be thread safe. Thanks for finding this. Indeed the simple fix would be to hold the lock.

@pnuz3n
Author

pnuz3n commented Oct 7, 2014

This might not be such a simple fix. It seems that the call function also uses send without taking the "m" mutex, while the Confirm method holds the "m" lock when it uses the call function. So one cannot simply add locking inside call.

The only place where the send method changes is when the channel is closed. So errors caused by this problem seem unlikely as long as the channel is only used from one goroutine.

Would it make sense to just declare Channel not thread safe and remove the locking? That would make the internals of the library faster in general and less error prone (in this rare case).

@CrackerJackMack

I personally saw massive performance improvements by using a channel per goroutine. I allocated a channel, then passed it into the function I spawned as a goroutine. If the connection or channel hit an error, I'd just clean up all the goroutines and respawn them.

Using this method it made my code much cleaner to deal with than having to juggle whether the connection and/or channel were still valid.

Removing the mutex and saying that channels are not thread safe would surely increase performance for my use case.
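A runnable sketch of that channel-per-goroutine pattern, with the AMQP channel modelled as a plain Go chan (in real code each worker would get a fresh channel from the connection, and `acked <- tag` would be an ack on that worker's own channel — those details are assumptions of this sketch):

```go
package main

import (
	"fmt"
	"sync"
)

// worker owns its deliveries channel exclusively; nothing else acks on it.
func worker(deliveries <-chan uint64, acked chan<- uint64, wg *sync.WaitGroup) {
	defer wg.Done()
	for tag := range deliveries {
		acked <- tag // stand-in for acking on the worker's own AMQP channel
	}
}

// spawn creates n workers, each with its own never-shared channel.
// On a connection error, the caller can close everything and call spawn again.
func spawn(n int) ([]chan uint64, chan uint64, *sync.WaitGroup) {
	acked := make(chan uint64, 64)
	var wg sync.WaitGroup
	chans := make([]chan uint64, n)
	for i := range chans {
		chans[i] = make(chan uint64) // fresh channel per goroutine
		wg.Add(1)
		go worker(chans[i], acked, &wg)
	}
	return chans, acked, &wg
}

func main() {
	chans, acked, wg := spawn(2)
	chans[0] <- 1
	chans[1] <- 2
	close(chans[0])
	close(chans[1])
	wg.Wait()
	close(acked)
	n := 0
	for range acked {
		n++
	}
	fmt.Println(n) // 2
}
```

Because no channel is ever shared between goroutines, no locking is needed at all in the worker code.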

@cenkalti

So, does ACKing messages (received via the same delivery channel) from different goroutines cause any problems?

@bmorton

bmorton commented Dec 1, 2014

@cenkalti yeah, i'm running into that problem.

@CrackerJackMack would you mind sharing some code around what you're doing to get around this? I was hoping that I could spawn a goroutine for each amqp.Delivery received through the channel, but if I tried to Ack them, everything blows up.

@pnuz3n
Author

pnuz3n commented Dec 1, 2014

@bmorton just create a Go chan for the ack messages, and in each spawned goroutine "push" the ack message onto that chan, which is read by the single goroutine responsible for handling the AMQP channel.

Something like this (I pulled it out of my sleeve, so it probably won't compile as-is):

ackChan := make(chan uint64) // delivery tags to be acked
for {
    select {
    case m := <-fromAmqp:
        go handle(m, ackChan)
    case tag := <-ackChan:
        amqpChannel.Ack(tag, false) // Ack takes (tag uint64, multiple bool)
    }
}
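For reference, here is a self-contained, runnable version of that funnel pattern. Workers never touch the AMQP channel; they send delivery tags to ackChan, and a single owner goroutine performs all the acks. The `ackOne` callback stands in for amqpChannel.Ack, and the termination bookkeeping is an addition for the demo:

```go
package main

import "fmt"

// funnel drains fromAmqp, hands each delivery tag to a worker goroutine,
// and is the only place ackOne (the stand-in for Channel.Ack) is called.
func funnel(fromAmqp <-chan uint64, ackOne func(uint64)) {
	ackChan := make(chan uint64)
	pending := 0
	for fromAmqp != nil || pending > 0 {
		select {
		case tag, ok := <-fromAmqp:
			if !ok {
				fromAmqp = nil // a nil channel never fires in select
				continue
			}
			pending++
			// stand-in for: go handle(m, ackChan)
			go func(t uint64) { ackChan <- t }(tag)
		case tag := <-ackChan:
			pending--
			ackOne(tag) // only this goroutine ever acks
		}
	}
}

func main() {
	src := make(chan uint64, 3)
	src <- 1
	src <- 2
	src <- 3
	close(src)
	acked := 0
	funnel(src, func(uint64) { acked++ })
	fmt.Println(acked) // 3
}
```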

@pnuz3n
Author

pnuz3n commented Dec 1, 2014

So, should this be solved by improving the synchronization and removing the race conditions, or by removing the partial synchronization altogether?

@streadway
Owner

I'll give this some time today and tomorrow to fix.

@bmorton

bmorton commented Dec 1, 2014

Actually, I think part of my problem was misunderstanding the parameter that Ack takes. I was calling delivery.Ack(true), thinking that was what I needed to do, but it turns out that was acking messages still being processed in other goroutines. I changed it to delivery.Ack(false) and I think that fixed my issue.
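That matches AMQP's basic.ack semantics: with multiple=true, the broker settles every outstanding delivery with a tag up to and including the given one, which is why it swallowed deliveries other goroutines were still processing. A small simulation of the tag accounting (the map-based bookkeeping is illustrative, not the broker's implementation):

```go
package main

import "fmt"

// ack simulates basic.ack: with multiple=true, every outstanding delivery
// whose tag is <= the given tag is settled; otherwise only that one tag.
func ack(outstanding map[uint64]bool, tag uint64, multiple bool) {
	if !multiple {
		delete(outstanding, tag)
		return
	}
	for t := range outstanding {
		if t <= tag {
			delete(outstanding, t)
		}
	}
}

func main() {
	outstanding := map[uint64]bool{1: true, 2: true, 3: true}
	ack(outstanding, 2, true)          // like delivery.Ack(true) on tag 2
	fmt.Println(len(outstanding))      // 1: only tag 3 is still unacked
}
```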

@bmorton

bmorton commented Dec 1, 2014

Some sample code:

func main() {

  // ...

  log.Printf("Waiting for messages...")
  for {
    d, ok := <-c.Deliveries
    if !ok {
      break
    }
    go processMessage(d)
  }

  // ...

}

func processMessage(d amqp.Delivery) {
  log.Printf("[%v] %q", d.DeliveryTag, d.Body)
  d.Ack(false)
}

@michaelklishin
Collaborator

@bmorton you can't ack deliveries on a different channel. In your code example all goroutines use the same channel: were you running into "double acknowledgements"?

@bmorton
Copy link

bmorton commented Dec 1, 2014

Yeah, I was acking things before the goroutine started processing some messages, which gave me some empty messages that eventually became panics.

@michaelklishin
Collaborator

Double acking should result in a [RabbitMQ] channel error. I'm guessing this leads the deliveries [Go] channel to produce nils? We have some conflicting terminology here :)

@streadway
Owner

@pnuz3n update: I've put together an Ack race in https://github.com/streadway/amqp/compare/ack-race-119 but am failing to detect the race with go test -cpu 8 -race. I'll try getting a shutdown() in there somehow.

@streadway
Owner

The race is now in the ack-race-119 branch. I don't see a simple fix, due to Connection's reentrant call from Connection.send back to Channel.shutdown, so I can't hold a lock around Channel.send's mutation.

I will need to sleep on this. Suggestions welcome for any approaches.

cenkalti added a commit to cenkalti/amqp that referenced this issue Dec 2, 2014
@cenkalti

cenkalti commented Dec 2, 2014

  • Combined the sendOpen and sendClosed methods into one (send).
  • Added a new state, shutdowned bool (sorry for the weird name; I couldn't come up with a better one).

Assigning functions to variables is cool, but personally I don't like this feature. Making send a regular method also fixed the weird calling form me.send(me, ...) as a side effect.

What do you think? Is it a good solution?
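A minimal sketch of the shape this proposes (the method and field names come from the comment; the surrounding details are guessed for illustration): send becomes an ordinary method that checks a boolean under the lock, rather than a function-valued field that shutdown swaps out, so there is no unsynchronized mutation left to race on:

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

var errClosed = errors.New("channel/connection is not open")

// channel sketches the proposed fix: one regular send method guarded by
// a boolean state flag instead of swapping between sendOpen/sendClosed.
type channel struct {
	m          sync.Mutex
	shutdowned bool     // the state flag proposed in the comment
	sent       []string // stand-in for frames written to the wire
}

func (c *channel) send(msg string) error {
	c.m.Lock()
	defer c.m.Unlock()
	if c.shutdowned {
		return errClosed // formerly the behavior of the swapped-in sendClosed
	}
	c.sent = append(c.sent, msg) // formerly sendOpen: write the frame
	return nil
}

func (c *channel) shutdown() {
	c.m.Lock()
	defer c.m.Unlock()
	c.shutdowned = true // flip state; no function value is mutated
}

func main() {
	c := &channel{}
	fmt.Println(c.send("basic.ack")) // succeeds while open
	c.shutdown()
	fmt.Println(c.send("basic.ack")) // fails once shut down
}
```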

cenkalti added a commit to cenkalti/amqp that referenced this issue Dec 22, 2014
@michaelklishin
Collaborator

This issue seems to be very broad. Discussing concurrency hazard safety without specific scenarios is going to be very time consuming at best. Most clients do not support concurrent publishing on a shared channel: this is not unreasonable.

If there are specific examples that are still not covered over 2 years later, please file new, more focused issues. Thank you.
