Skip to content
This repository has been archived by the owner on Jan 19, 2022. It is now read-only.

Commit

Permalink
Rabbit Connection namespace synced
Browse files Browse the repository at this point in the history
  • Loading branch information
Joe Fitzgerald committed Oct 3, 2012
1 parent e5f2183 commit c5493fa
Show file tree
Hide file tree
Showing 34 changed files with 1,000 additions and 161 deletions.
Expand Up @@ -15,9 +15,11 @@

#region Using Directives
using System;
using System.Collections;
using System.Collections.Generic;
using Spring.Messaging.Amqp.Core;
using Spring.Objects.Factory;
using Queue = Spring.Messaging.Amqp.Core.Queue;
#endregion

namespace Spring.Messaging.Amqp.Rabbit.Config
Expand All @@ -27,7 +29,7 @@ namespace Spring.Messaging.Amqp.Rabbit.Config
/// </summary>
public class BindingFactoryObject : IFactoryObject
{
private IDictionary<string, object> arguments;
private IDictionary arguments;
private string routingKey = string.Empty;
private string exchange;
private Queue destinationQueue;
Expand All @@ -39,7 +41,7 @@ public class BindingFactoryObject : IFactoryObject
/// <value>The arguments.</value>
/// <author>Dave Syer</author>
/// <author>Joe Fitzgerald</author>
public IDictionary<string, object> Arguments { set { this.arguments = value; } }
public IDictionary Arguments { set { this.arguments = value; } }

/// <summary>
/// Sets the routing key.
Expand Down
Expand Up @@ -114,7 +114,7 @@ private void ParseListener(XmlElement listenerEle, XmlElement containerEle, Pars
}
}

var containerDef = RabbitNamespaceUtils.ParseContainer(listenerEle, containerEle, parserContext);
var containerDef = RabbitNamespaceUtils.ParseContainer(containerEle, parserContext);

if (listenerEle.HasAttribute(RESPONSE_EXCHANGE_ATTRIBUTE))
{
Expand Down
Expand Up @@ -55,6 +55,10 @@ public abstract class AbstractConnectionFactory : IConnectionFactory, IDisposabl
/// </summary>
private readonly CompositeChannelListener channelListener = new CompositeChannelListener();

// private volatile IExecutorService executorService;

private volatile AmqpTcpEndpoint[] addresses;

/// <summary>Initializes a new instance of the <see cref="AbstractConnectionFactory"/> class.</summary>
/// <param name="rabbitConnectionFactory">The rabbit connection factory.</param>
public AbstractConnectionFactory(ConnectionFactory rabbitConnectionFactory)
Expand Down Expand Up @@ -90,6 +94,18 @@ public AbstractConnectionFactory(ConnectionFactory rabbitConnectionFactory)
/// </summary>
public int Port { get { return this.rabbitConnectionFactory.Port; } set { this.rabbitConnectionFactory.Port = value; } }

public string Addresses
{
set
{
var addressArray = AmqpTcpEndpoint.ParseMultiple(RabbitMQ.Client.Protocols.DefaultProtocol, value);
if (addressArray != null && addressArray.Length > 0)
{
this.addresses = addressArray;
}
}
}

/// <summary>
/// Gets the channel listener.
/// </summary>
Expand Down Expand Up @@ -138,7 +154,16 @@ public virtual IConnection CreateBareConnection()
{
try
{
return new SimpleConnection(this.rabbitConnectionFactory.CreateConnection());
if (this.addresses != null)
{
// TODO: Waiting on RabbitMQ.Client to catch up to the Java equivalent here.
// return new SimpleConnection(this.rabbitConnectionFactory.CreateConnection(this.addresses));
return new SimpleConnection(this.rabbitConnectionFactory.CreateConnection());
}
else
{
return new SimpleConnection(this.rabbitConnectionFactory.CreateConnection());
}
}
catch (Exception ex)
{
Expand Down
Expand Up @@ -16,37 +16,19 @@
#region Using Directives
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
using AopAlliance.Intercept;
using Common.Logging;
using RabbitMQ.Client;
using RabbitMQ.Client.Impl;
using Spring.Aop.Framework;
using Spring.Messaging.Amqp.Rabbit.Support;
using Spring.Util;
#endregion

namespace Spring.Messaging.Amqp.Rabbit.Connection
{
/**
* A {@link IConnectionFactory} implementation that returns the same Connections from all {@link #createConnection()}
* calls, and ignores calls to {@link com.rabbitmq.client.Connection#close()} and caches
* {@link com.rabbitmq.client.Channel}.
*
* <p>
* By default, only one Channel will be cached, with further requested Channels being created and disposed on demand.
* Consider raising the {@link #setChannelCacheSize(int) "channelCacheSize" value} in case of a high-concurrency
* environment.
*
* <p>
* <b>NOTE: This ConnectionFactory requires explicit closing of all Channels obtained form its shared Connection.</b>
* This is the usual recommendation for native Rabbit access code anyway. However, with this ConnectionFactory, its use
* is mandatory in order to actually allow for Channel reuse.
*
* @author Mark Pollack
* @author Mark Fisher
* @author Dave Syer
*/

/// <summary>
/// A caching connection factory implementation. The default channel cache size is 1, please modify to
/// meet your scaling needs.
Expand Down Expand Up @@ -97,21 +79,23 @@ public class CachingConnectionFactory : AbstractConnectionFactory
/// </summary>
private ChannelCachingConnectionProxy connection;

private volatile bool publisherConfirms;

private volatile bool publisherReturns;

/// <summary>
/// Synchronization monitor for the shared Connection.
/// </summary>
private readonly object connectionMonitor = new object();

/// <summary>
/// Initializes a new instance of the <see cref="CachingConnectionFactory"/> class
/// initializing the hostname to be the value returned from Dns.GetHostName() or "localhost"
/// if Dns.GetHostName() throws an exception.
/// </summary>
/// <summary>Initializes a new instance of the <see cref="CachingConnectionFactory"/> class.
/// Create a new <see cref="CachingConnectionFactory"/> initializing the hostname to be the value returned from Dns.GetHostName() or "localhost"
/// if Dns.GetHostName() throws an exception.</summary>
public CachingConnectionFactory() : this(string.Empty) { }

/// <summary>Initializes a new instance of the <see cref="CachingConnectionFactory"/> class given a host name and port</summary>
/// <param name="hostname">The hostname.</param>
/// <param name="port">The port.</param>
/// <summary>Initializes a new instance of the <see cref="CachingConnectionFactory"/> class. Create a new <see cref="CachingConnectionFactory"/> given a host name and port</summary>
/// <param name="hostname">The hostname to connect to.</param>
/// <param name="port">The port number.</param>
public CachingConnectionFactory(string hostname, int port) : base(new ConnectionFactory())
{
if (string.IsNullOrWhiteSpace(hostname))
Expand All @@ -123,15 +107,15 @@ public CachingConnectionFactory(string hostname, int port) : base(new Connection
this.Port = port;
}

/// <summary>Initializes a new instance of the <see cref="CachingConnectionFactory"/> class given a port</summary>
/// <param name="port">The port.</param>
/// <summary>Initializes a new instance of the <see cref="CachingConnectionFactory"/> class. Create a new <see cref="CachingConnectionFactory"/> given a port</summary>
/// <param name="port">The port number.</param>
public CachingConnectionFactory(int port) : this(string.Empty, port) { }

/// <summary>Initializes a new instance of the <see cref="CachingConnectionFactory"/> class given a host name.</summary>
/// <param name="hostname">The hostname.</param>
/// <summary>Initializes a new instance of the <see cref="CachingConnectionFactory"/> class. Create a new <see cref="CachingConnectionFactory"/> given a host name.</summary>
/// <param name="hostname">The hostname to connect to.</param>
public CachingConnectionFactory(string hostname) : this(hostname, Protocols.DefaultProtocol.DefaultPort) { }

/// <summary>Initializes a new instance of the <see cref="CachingConnectionFactory"/> class.</summary>
/// <summary>Initializes a new instance of the <see cref="CachingConnectionFactory"/> class. Create a new <see cref="CachingConnectionFactory"/> for the given ConnectionFactory.</summary>
/// <param name="rabbitConnectionFactory">The rabbit connection factory.</param>
public CachingConnectionFactory(ConnectionFactory rabbitConnectionFactory) : base(rabbitConnectionFactory) { }

Expand All @@ -149,6 +133,12 @@ public int ChannelCacheSize
}
}

/// <summary>Gets or sets a value indicating whether is publisher confirms.</summary>
public bool IsPublisherConfirms { get { return this.publisherConfirms; } set { this.publisherConfirms = value; } }

/// <summary>Gets or sets a value indicating whether is publisher returns.</summary>
public bool IsPublisherReturns { get { return this.publisherReturns; } set { this.publisherReturns = value; } }

/// <summary>
/// Gets or sets a value indicating whether this <see cref="CachingConnectionFactory"/> is active.
/// </summary>
Expand All @@ -159,7 +149,7 @@ public int ChannelCacheSize
/// Sets the connection listeners.
/// </summary>
/// <value>The connection listeners.</value>
public new IList<IConnectionListener> ConnectionListeners
public override IList<IConnectionListener> ConnectionListeners
{
set
{
Expand All @@ -174,7 +164,7 @@ public new IList<IConnectionListener> ConnectionListeners

/// <summary>Add a connection listener.</summary>
/// <param name="listener">The listener.</param>
public new void AddConnectionListener(IConnectionListener listener)
public override void AddConnectionListener(IConnectionListener listener)
{
base.AddConnectionListener(listener);

Expand Down Expand Up @@ -202,18 +192,12 @@ internal IModel GetChannel(bool transactional)

if (channel != null)
{
if (this.Logger.IsTraceEnabled)
{
this.Logger.Trace("Found cached Rabbit Channel");
}
this.Logger.Trace(m => m("Found cached Rabbit Channel"));
}
else
{
if (this.Logger.IsDebugEnabled)
{
this.Logger.Debug("Creating cached Rabbit Channel");
}

this.Logger.Debug(m => m("Creating cached Rabbit Channel"));

channel = this.GetCachedChannelProxy(channelList, transactional);
}

Expand All @@ -229,8 +213,24 @@ internal IModel GetChannel(bool transactional)
protected virtual IChannelProxy GetCachedChannelProxy(LinkedList<IChannelProxy> channelList, bool transactional)
{
var targetChannel = this.CreateBareChannel(transactional);
this.Logger.Debug(m => m("Creating cached Rabbit Channel from {0}", targetChannel));

this.ChannelListener.OnCreate(targetChannel, transactional);
/*
* TODO: Pending Completion of PublisherCallbackChannelImpl
*
IList<Type> interfaces;
if(this.publisherConfirms || this.publisherReturns)
{
interfaces = new List<Type>() { typeof(IChannelProxy), typeof(IPublisherCallbackChannel) };
}
else
{
interfaces = new List<Type>() { typeof(IChannelProxy) };
}
*/
var factory = new ProxyFactory(typeof(IChannelProxy), new CachedChannelInvocationHandler(targetChannel, channelList, transactional, this));
// factory.Interfaces = interfaces.ToArray();
var channelProxy = (IChannelProxy)factory.GetProxy();
return channelProxy;
}
Expand All @@ -248,7 +248,29 @@ internal IModel CreateBareChannel(bool transactional)
this.CreateConnection();
}

return this.connection.CreateBareChannel(transactional);
var channel = this.connection.CreateBareChannel(transactional);
/*
* TODO: Pending Completion of PublisherCallbackChannelImpl
if (this.publisherConfirms)
{
try
{
channel.ConfirmSelect();
}
catch (Exception ex)
{
this.Logger.Error(m => m("Could not configure the channel to receive publisher confirms"), ex);
}
}
if (this.publisherConfirms || this.publisherReturns)
{
if (!(channel is PublisherCallbackChannelImpl))
{
channel = new PublisherCallbackChannelImpl(channel);
}
}
*/
return channel;
}

/// <summary>
Expand Down Expand Up @@ -345,7 +367,7 @@ public override string ToString()
}
}

#region CachedChannelInvocationHandler - For a Rainy Day
#region CachedChannelInvocationHandler

/// <summary>
/// A cached channel invocation handler.
Expand Down Expand Up @@ -468,7 +490,7 @@ public object Invoke(IMethodInvocation invocation)
{
// Basic re-connection logic...
this.target = null;
this.Logger.Debug("Detected closed channel on exception. Re-initializing: " + this.target);
this.Logger.Debug(m => m("Detected closed channel on exception. Re-initializing: {0}", this.target));
lock (this.targetMonitor)
{
if (this.target == null)
Expand Down Expand Up @@ -499,17 +521,11 @@ private void LogicalClose(IChannelProxy proxy)
}

// Allow for multiple close calls...
// TODO: Figure out why the proxied channels don't work with this.channelList.Contains()
// if (!this.channelList.Contains(proxy))
// {
if (this.Logger.IsTraceEnabled)
if (!this.channelList.Contains(proxy))
{
this.Logger.Trace("Returning cached Channel: " + this.target);
this.Logger.Trace(m => m("Returning cached Channel: {0}", this.target));
this.channelList.AddLast(proxy);
}

this.channelList.AddLast(proxy);

// }
}

/// <summary>
Expand All @@ -519,7 +535,7 @@ private void PhysicalClose()
{
if (this.Logger.IsDebugEnabled)
{
this.Logger.Debug("Closing cached Channel: " + this.target);
this.Logger.Debug(m => m("Closing cached Channel: " + this.target));
}

if (this.target == null)
Expand Down Expand Up @@ -589,13 +605,13 @@ public IModel CreateChannel(bool transactional)
/// <summary>The dispose.</summary>
public void Dispose()
{
this.outer.Reset();
if (this.target != null)
{
this.outer.ConnectionListener.OnClose(this.target);
RabbitUtils.CloseConnection(this.target);
}

this.outer.Reset();
this.target = null;
}

Expand Down
Expand Up @@ -23,6 +23,8 @@ namespace Spring.Messaging.Amqp.Rabbit.Connection
/// <summary>
/// A composite channel listener.
/// </summary>
/// <author>Dave Syer</author>
/// <author>Joe Fitzgerald</author>
public class CompositeChannelListener : IChannelListener
{
/// <summary>
Expand Down

0 comments on commit c5493fa

Please sign in to comment.