Skip to content

Commit

Permalink
- Only open the RabbitMQ channel when the Subscriber is subscribed to.
Browse files Browse the repository at this point in the history
- Ensure open RabbitMQ channels are closed in the finalize method.
  • Loading branch information
Colin Breck authored and mkiedys committed Feb 13, 2017
1 parent 331ea03 commit 3658602
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 12 deletions.
34 changes: 27 additions & 7 deletions src/main/scala/io/scalac/amqp/impl/ExchangeSubscriber.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package io.scalac.amqp.impl
import java.util.Objects.requireNonNull
import java.util.concurrent.atomic.AtomicReference

import com.rabbitmq.client.Channel
import com.rabbitmq.client.{Channel, Connection}
import io.scalac.amqp.Routed
import org.reactivestreams.{Subscriber, Subscription}

Expand All @@ -12,20 +12,36 @@ import scala.collection.immutable.Queue
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
import scala.concurrent.stm.{Ref, atomic}
import scala.util.{Failure, Success, Try}
import scala.util.control.NonFatal

private[amqp] class ExchangeSubscriber(channel: Channel, exchange: String)
private[amqp] class ExchangeSubscriber(connection: Connection, exchange: String)
extends Subscriber[Routed] {
require(exchange.length <= 255, "exchange.length > 255")

val active = new AtomicReference[Subscription]()
val publishingThreadRunning = Ref(false)
val buffer = Ref(Queue[Routed]())
val closeRequested = Ref(false)
val channel = new AtomicReference[Channel]()

override def finalize(): Unit = {
try
closeChannel()
finally
super.finalize()
}

override def onSubscribe(subscription: Subscription): Unit =
active.compareAndSet(null, subscription) match {
case true subscription.request(1)
case true
Try(connection.createChannel()) match {
case Success(newChannel)
channel.set(newChannel)
subscription.request(1)
case Failure(cause)
subscription.cancel()
}
case false subscription.cancel() // 2.5: cancel
}

Expand Down Expand Up @@ -54,7 +70,7 @@ private[amqp] class ExchangeSubscriber(channel: Channel, exchange: String)

private def publish(routed: Routed): Unit = {
try {
channel.basicPublish(
channel.get().basicPublish(
exchange,
routed.routingKey,
Conversions.toBasicProperties(routed.message),
Expand All @@ -69,8 +85,12 @@ private[amqp] class ExchangeSubscriber(channel: Channel, exchange: String)

/** Double check before calling `close`. Second `close` on channel kills connection.*/
private def closeChannel(): Unit = {
if (closeRequested.single.compareAndSet(false, true) && channel.isOpen()) {
channel.close()
if (closeRequested.single.compareAndSet(false, true)) {
try {
channel.get().close()
} catch {
case NonFatal(_) =>
}
}
}

Expand All @@ -90,5 +110,5 @@ private[amqp] class ExchangeSubscriber(channel: Channel, exchange: String)
}
}

override def toString = s"ExchangeSubscriber(channel=$channel, exchange=$exchange)"
override def toString = s"ExchangeSubscriber(channel=${channel.get()}, exchange=$exchange)"
}
13 changes: 12 additions & 1 deletion src/main/scala/io/scalac/amqp/impl/QueueSubscription.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,13 @@ private[amqp] class QueueSubscription(channel: Channel, queue: String, subscribe
val running = Ref(false)
var closeRequested = Ref(false)

override def finalize(): Unit = {
try
closeChannel()
finally
super.finalize()
}

override def handleCancel(consumerTag: String) = try {
subscriber.onComplete()
} catch {
Expand Down Expand Up @@ -87,7 +94,11 @@ private[amqp] class QueueSubscription(channel: Channel, queue: String, subscribe

def closeChannel(): Unit = synchronized {
if (closeRequested.single.compareAndSet(false, true) && channel.isOpen) {
channel.close()
try {
channel.close()
} catch {
case NonFatal(_) =>
}
}
}

Expand Down
7 changes: 3 additions & 4 deletions src/main/scala/io/scalac/amqp/impl/RabbitConnection.scala
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,7 @@ private[amqp] class RabbitConnection(settings: ConnectionSettings) extends Conne

override def publish(exchange: String, routingKey: String) =
new Subscriber[Message] {
val channel = underlying.createChannel()
val delegate = new ExchangeSubscriber(channel, exchange)
val delegate = new ExchangeSubscriber(underlying, exchange)

override def onError(t: Throwable) = delegate.onError(t)
override def onSubscribe(s: Subscription) = delegate.onSubscribe(s)
Expand All @@ -135,12 +134,12 @@ private[amqp] class RabbitConnection(settings: ConnectionSettings) extends Conne
routingKey = routingKey,
message = message))

override def toString = s"ExchangeSubscriber(channel=$channel, exchange=$exchange, routingKey=$routingKey)"
override def toString = s"ExchangeSubscriber(exchange=$exchange, routingKey=$routingKey)"
}

override def publish(exchange: String) =
new ExchangeSubscriber(
channel = underlying.createChannel(),
connection = underlying,
exchange = exchange)

override def publishDirectly(queue: String) =
Expand Down

0 comments on commit 3658602

Please sign in to comment.