Permalink
Browse files

reconnect cleanup

  • Loading branch information...
1 parent 0d0d199 commit ffa5fc7e95bbf3720192cbf1fa5af2048c8e9d00 @slider committed Dec 7, 2009
Showing with 78 additions and 30 deletions.
  1. +78 −30 src/main/scala/com/turtle/amqprelay/AMQPRelay.scala
@@ -26,7 +26,7 @@ import net.lag.logging._
case class AMQPMessage(tag: String, env: Envelope, props: AMQP.BasicProperties, body: Array[Byte], a: Actor)
case class AMQPAckMessage(deliveryTag: Long)
-case class AMQPReconnect(i: Int)
+case class AMQPReconnect(cause: ShutdownSignalException)
trait Configuration {
var config = {
@@ -45,8 +45,9 @@ trait Logging extends Configuration {
class AMQPConsumer(exchange_name: String) extends Actor with Configuration with Logging {
log.info("Initializing relay for exchange %s", exchange_name)
-
- def connect = {
+ val ref = this
+
+ def connect_server : Connection = {
log.debug("Connecting to %s", config.getString("master.host", "localhost"))
val params = new ConnectionParameters
params.setUsername(config.getString("master.username", "guest"))
@@ -55,36 +56,59 @@ class AMQPConsumer(exchange_name: String) extends Actor with Configuration with
params.setRequestedHeartbeat(1)
val factory = new ConnectionFactory(params)
val connection = factory.newConnection(config.getString("master.host", "localhost"), config.getInt("master.port", 5672))
+ val listener = new ShutdownListener {
+ override def shutdownCompleted(cause: ShutdownSignalException) {
+ log.error("Lost connection to %s", config.getString("master.host", "localhost"))
+ ref ! AMQPReconnect(cause)
+ }
+ }
+ connection.addShutdownListener(listener)
log.info("Successfully connected consumer to %s", config.getString("master.host", "localhost"))
- val producer = new AMQPProducer(exchange_name)
- producer.start
- val channel = connection.createChannel
+ connection
+ }
+
+ val producer = new AMQPProducer(exchange_name)
+ producer.start
+
+ def create_channel(connection: Connection) : Channel = {
+ val channel = connection.createChannel
channel.queueDeclare(exchange_name + "_relay" , true)
channel.queueBind(exchange_name + "_relay", exchange_name, "#")
channel.basicQos(25)
- val ref = this
val consumer = new DefaultConsumer(channel) {
override def handleDelivery(tag: String, env: Envelope, props: AMQP.BasicProperties, body: Array[Byte]) = {
producer ! AMQPMessage(tag, env, props, body, ref)
}
}
channel.basicConsume(exchange_name + "_relay", false, consumer)
- val listener = new ShutdownListener {
- override def shutdownCompleted(cause: ShutdownSignalException) {
- log.error("Lost connection to %s", config.getString("master.host", "localhost"))
- ref ! AMQPReconnect(0)
- }
- }
- connection.addShutdownListener(listener)
- channel
+ channel
}
- var channel = connect
+ var connection: Connection = _
+ var channel: Channel = _
+
+ try {
+ connection = connect_server
+ channel = create_channel(connection)
+ } catch {
+ case e: Exception =>
+ Thread.sleep(1000)
+ self ! AMQPReconnect(null)
+ }
+
def act = {
loop {
react {
- case AMQPReconnect(i: Int) =>
- channel = connect
+ case AMQPReconnect(cause: ShutdownSignalException) =>
+ try {
+ if(cause.isHardError || cause == null) //FIXME: clean that up
+ connection = connect_server
+ channel = create_channel(connection)
+ } catch {
+ case e: Exception =>
+ Thread.sleep(1000)
+ self ! AMQPReconnect(cause)
+ }
case AMQPAckMessage(deliveryTag) =>
channel.basicAck(deliveryTag, false)
case _ => log.error("Unhandled Message.")
@@ -94,29 +118,51 @@ class AMQPConsumer(exchange_name: String) extends Actor with Configuration with
}
class AMQPProducer(exchange_name: String) extends Actor with Configuration with Logging {
- def connect = {
- log.debug("Connecting to %s", config.getString("remote.host", "localhost"))
+
+ val ref = this
+ def connect_server : Connection = {
+ log.debug("Connecting to %s", config.getString("remote.host", "localhost"))
val params = new ConnectionParameters
params.setUsername(config.getString("remote.username", "guest"))
params.setPassword(config.getString("remote.password", "guest"))
params.setVirtualHost(config.getString("remote.vhost", "/"))
params.setRequestedHeartbeat(1)
val factory = new ConnectionFactory(params)
val connection = factory.newConnection(config.getString("remote.host", "localhost"), config.getInt("remote.port", 5672))
+ val listener = new ShutdownListener {
+ override def shutdownCompleted(cause: ShutdownSignalException) {
+ log.error("Lost connection to %s", config.getString("remote.host", "localhost"))
+ ref ! AMQPReconnect(cause)
+ }
+ }
log.info("Successfully connected producer to %s", config.getString("remote.host", "localhost"))
- val channel = connection.createChannel
- val ref = this
+ connection.addShutdownListener(listener)
+ connection
+ }
+
+
+ def create_channel(connection: Connection) : Channel = {
+ val channel = connection.createChannel
val listener = new ShutdownListener {
override def shutdownCompleted(cause: ShutdownSignalException) {
log.error("Lost connection to %s", config.getString("remote.host", "localhost"))
- ref ! AMQPReconnect(0)
+ ref ! AMQPReconnect(cause)
}
}
- connection.addShutdownListener(listener)
channel
}
- var channel = connect
+ var connection: Connection = _
+ var channel: Channel = _
+
+ try {
+ connection = connect_server
+ channel = create_channel(connection)
+ } catch {
+ case e: Exception =>
+ Thread.sleep(1000)
+ self ! AMQPReconnect(null)
+ }
def act = {
loop {
@@ -125,13 +171,15 @@ class AMQPProducer(exchange_name: String) extends Actor with Configuration with
log.debug("Relaying message %s", new String(body))
channel.basicPublish(exchange_name, env.getRoutingKey, props, body)
a ! AMQPAckMessage(env.getDeliveryTag)
- case AMQPReconnect(i: Int) =>
- try {
- channel = connect
- } catch {
+ case AMQPReconnect(cause: ShutdownSignalException) =>
+ try {
+ if(cause.isHardError || cause == null) //FIXME: clean that up
+ connection = connect_server
+ channel = create_channel(connection)
+ } catch {
case e: Exception =>
Thread.sleep(1000)
- self ! AMQPReconnect(i + 1)
+ self ! AMQPReconnect(cause)
}
case _ => log.error("Unhandled Message.")
}

0 comments on commit ffa5fc7

Please sign in to comment.