Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce ConnectionFactory#CreateConnection with a list of hostnames #92

Merged
merged 5 commits into from May 29, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 3 additions & 0 deletions projects/client/RabbitMQ.Client/RabbitMQ.Client.csproj
Expand Up @@ -139,6 +139,7 @@
<Compile Include="src\client\api\IConnection.cs" />
<Compile Include="src\client\api\IConnectionFactory.cs" />
<Compile Include="src\client\api\IContentHeader.cs" />
<Compile Include="src\client\api\IHostnameSelector.cs" />
<Compile Include="src\client\api\IMethod.cs" />
<Compile Include="src\client\api\IModel.cs" />
<Compile Include="src\client\api\IProtocol.cs" />
Expand Down Expand Up @@ -216,6 +217,7 @@
<Compile Include="src\client\impl\ContentHeaderBase.cs" />
<Compile Include="src\client\impl\ContentHeaderPropertyReader.cs" />
<Compile Include="src\client\impl\ContentHeaderPropertyWriter.cs" />
<Compile Include="src\client\impl\ExtensionMethods.cs" />
<Compile Include="src\client\impl\Frame.cs" />
<Compile Include="src\client\impl\HardProtocolException.cs" />
<Compile Include="src\client\impl\IConsumerDispatcher.cs" />
Expand All @@ -232,6 +234,7 @@
<Compile Include="src\client\impl\ProtocolBase.cs" />
<Compile Include="src\client\impl\ProtocolException.cs" />
<Compile Include="src\client\impl\QuiescingSession.cs" />
<Compile Include="src\client\impl\RandomHostnameSelector.cs" />
<Compile Include="src\client\impl\RecordedBinding.cs" />
<Compile Include="src\client\impl\RecordedConsumer.cs" />
<Compile Include="src\client\impl\RecordedEntity.cs" />
Expand Down
Expand Up @@ -55,7 +55,7 @@ namespace RabbitMQ.Client
/// the Uri "Scheme" property is ignored: only the "Host" and
/// "Port" properties are extracted.
/// </para>
public class AmqpTcpEndpoint
public class AmqpTcpEndpoint : ICloneable
{
/// <summary>
/// Default Amqp ssl port.
Expand Down Expand Up @@ -119,6 +119,25 @@ public AmqpTcpEndpoint(Uri uri) : this(uri.Host, uri.Port)
{
}

/// <summary>
/// Clones the endpoint.
/// </summary>
/// <returns>A copy with the same hostname, port, and TLS settings</returns>
public object Clone()
{
return new AmqpTcpEndpoint(HostName, _port, Ssl);
}

/// <summary>
/// Clones the endpoint using the provided hostname.
/// </summary>
/// <param name="hostname">Hostname to use</param>
/// <returns>A copy with the provided hostname and port/TLS settings of this endpoint</returns>
public AmqpTcpEndpoint CloneWithHostname(string hostname)
{
return new AmqpTcpEndpoint(hostname, _port, Ssl);
}

/// <summary>
/// Retrieve or set the hostname of this <see cref="AmqpTcpEndpoint"/>.
/// </summary>
Expand Down
59 changes: 46 additions & 13 deletions projects/client/RabbitMQ.Client/src/client/api/ConnectionFactory.cs
Expand Up @@ -44,6 +44,7 @@
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Linq;

#if !NETFX_CORE

Expand Down Expand Up @@ -143,18 +144,26 @@ public class ConnectionFactory : ConnectionFactoryBase, IConnectionFactory
/// <summary>
/// Default SASL auth mechanisms to use.
/// </summary>
public static readonly AuthMechanismFactory[] DefaultAuthMechanisms = { new PlainMechanismFactory() };
public static readonly IList<AuthMechanismFactory> DefaultAuthMechanisms =
new List<AuthMechanismFactory>(){ new PlainMechanismFactory() };

/// <summary>
/// SASL auth mechanisms to use.
/// </summary>
public AuthMechanismFactory[] AuthMechanisms = DefaultAuthMechanisms;
public IList<AuthMechanismFactory> AuthMechanisms = DefaultAuthMechanisms;

/// <summary>
/// Set to true to enable automatic connection recovery.
/// </summary>
public bool AutomaticRecoveryEnabled;

/// <summary>
/// Used to select next hostname to try when performing
/// connection recovery (re-connecting). Is not used for
/// non-recovering connections.
/// </summary>
public IHostnameSelector HostnameSelector = new RandomHostnameSelector();

/// <summary>The host to connect to.</summary>
public String HostName = "localhost";

Expand Down Expand Up @@ -212,7 +221,7 @@ public ConnectionFactory()
}

/// <summary>
/// The AMQP connection target.
/// Connection endpoint.
/// </summary>
public AmqpTcpEndpoint Endpoint
{
Expand Down Expand Up @@ -285,12 +294,12 @@ public Uri uri
/// Given a list of mechanism names supported by the server, select a preferred mechanism,
/// or null if we have none in common.
/// </summary>
public AuthMechanismFactory AuthMechanismFactory(string[] mechanismNames)
public AuthMechanismFactory AuthMechanismFactory(IList<string> mechanismNames)
{
// Our list is in order of preference, the server one is not.
foreach (AuthMechanismFactory factory in AuthMechanisms)
{
if (Array.Exists(mechanismNames, x => string.Equals(x, factory.Name, StringComparison.OrdinalIgnoreCase)))
if (mechanismNames.Any<string>(x => string.Equals(x, factory.Name, StringComparison.OrdinalIgnoreCase)))
{
return factory;
}
Expand All @@ -301,35 +310,59 @@ public AuthMechanismFactory AuthMechanismFactory(string[] mechanismNames)
/// <summary>
/// Create a connection to the specified endpoint.
/// </summary>
/// <exception cref="BrokerUnreachableException">
/// When the configured hostname was not reachable.
/// </exception>
public virtual IConnection CreateConnection()
{
IConnection connection;
return CreateConnection(new List<string>() { HostName });
}

/// <summary>
/// Create a connection using a list of hostnames. The first reachable
/// hostname will be used initially. Subsequent hostname picks are determined
/// by the <see cref="IHostnameSelector" /> configured.
/// </summary>
/// <param name="hostnames">
/// List of hostnames to use for the initial
/// connection and recovery.
/// </param>
/// <returns>Open connection</returns>
/// <exception cref="BrokerUnreachableException">
/// When no hostname was reachable.
/// </exception>
public IConnection CreateConnection(IList<string> hostnames)
{
IConnection conn;
try
{
if (AutomaticRecoveryEnabled)
{
var autorecoveringConnection = new AutorecoveringConnection(this);
autorecoveringConnection.init();
connection = autorecoveringConnection;
autorecoveringConnection.Init(hostnames);
conn = autorecoveringConnection;
}
else
{
IProtocol protocol = Protocols.DefaultProtocol;
connection = protocol.CreateConnection(this, false, CreateFrameHandler());
conn = protocol.CreateConnection(this, false, CreateFrameHandler());
}
}
catch (Exception e)
{
throw new BrokerUnreachableException(e);
}

return connection;
return conn;
}

public IFrameHandler CreateFrameHandler()
{
IProtocol protocol = Protocols.DefaultProtocol;
return protocol.CreateFrameHandler(Endpoint, SocketFactory, RequestedConnectionTimeout);
return Protocols.DefaultProtocol.CreateFrameHandler(Endpoint, SocketFactory, RequestedConnectionTimeout);
}

public IFrameHandler CreateFrameHandler(AmqpTcpEndpoint endpoint)
{
return Protocols.DefaultProtocol.CreateFrameHandler(endpoint, SocketFactory, RequestedConnectionTimeout);
}

private void SetUri(Uri uri)
Expand Down
Expand Up @@ -90,13 +90,20 @@ public interface IConnectionFactory
/// Given a list of mechanism names supported by the server, select a preferred mechanism,
/// or null if we have none in common.
/// </summary>
AuthMechanismFactory AuthMechanismFactory(string[] mechanismNames);
AuthMechanismFactory AuthMechanismFactory(IList<string> mechanismNames);

/// <summary>
/// Create a connection to the specified endpoint.
/// </summary>
IConnection CreateConnection();

/// <summary>
/// Connects to the first reachable hostname from the list.
/// </summary>
/// <param name="hostnames">List of host names to use</param>
/// <returns></returns>
IConnection CreateConnection(IList<string> hostnames);

/// <summary>
/// Advanced option.
///
Expand Down
@@ -0,0 +1,56 @@
// This source code is dual-licensed under the Apache License, version
// 2.0, and the Mozilla Public License, version 1.1.
//
// The APL v2.0:
//
//---------------------------------------------------------------------------
// Copyright (C) 2007-2014 GoPivotal, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//---------------------------------------------------------------------------
//
// The MPL v1.1:
//
//---------------------------------------------------------------------------
// The contents of this file are subject to the Mozilla Public License
// Version 1.1 (the "License"); you may not use this file except in
// compliance with the License. You may obtain a copy of the License
// at http://www.mozilla.org/MPL/
//
// Software distributed under the License is distributed on an "AS IS"
// basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
// the License for the specific language governing rights and
// limitations under the License.
//
// The Original Code is RabbitMQ.
//
// The Initial Developer of the Original Code is GoPivotal, Inc.
// Copyright (c) 2007-2014 GoPivotal, Inc. All rights reserved.
//---------------------------------------------------------------------------

using System;
using System.Collections.Generic;

namespace RabbitMQ.Client
{
public interface IHostnameSelector
{
/// <summary>
/// Picks a hostname from a list of options that should be used
/// by <see cref="IConnectionFactory"/>.
/// </summary>
/// <param name="options"></param>
/// <returns></returns>
string NextFrom(IList<string> options);
}
}
Expand Up @@ -47,25 +47,11 @@ namespace RabbitMQ.Client
/// </summary>
public interface NetworkConnection
{
#if !NETFX_CORE
/// <summary>
/// Identifies local network address.
/// </summary>
EndPoint LocalEndPoint { get; }
#endif

/// <summary>
/// Local port.
/// </summary>
int LocalPort { get; }

#if !NETFX_CORE
/// <summary>
/// Identifies remote network address.
/// </summary>
EndPoint RemoteEndPoint { get; }
#endif

/// <summary>
/// Remote port.
/// </summary>
Expand Down