Skip to content

Commit

Permalink
Merge pull request #37 from rabbitmq/bug26329
Browse files Browse the repository at this point in the history
Make consumer operation dispatch concurrent
  • Loading branch information
michaelklishin committed Mar 5, 2015
2 parents db90353 + 64ce259 commit 3f9f463
Show file tree
Hide file tree
Showing 26 changed files with 1,489 additions and 126 deletions.
1 change: 1 addition & 0 deletions projects/client/Apigen/src/apigen/Apigen.cs
Expand Up @@ -775,6 +775,7 @@ public bool IsAmqpClass(Type t)
public void EmitModelImplementation() {
EmitLine(" public class Model: RabbitMQ.Client.Impl.ModelBase {");
EmitLine(" public Model(RabbitMQ.Client.Impl.ISession session): base(session) {}");
EmitLine(" public Model(RabbitMQ.Client.Impl.ISession session, RabbitMQ.Client.ConsumerWorkService workService): base(session, workService) {}");
IList<MethodInfo> asynchronousHandlers = new List<MethodInfo>();
foreach (Type t in m_modelTypes) {
foreach (MethodInfo method in t.GetMethods()) {
Expand Down
140 changes: 137 additions & 3 deletions projects/client/RabbitMQ.Client/RabbitMQ.Client.csproj
@@ -1,4 +1,4 @@
<?xml version="1.0" encoding="utf-8"?>
<?xml version="1.0" encoding="utf-8"?>
<Project ToolsVersion="4.0" DefaultTargets="Build" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
<!-- Warning! This file contains important customizations. Using Visual Studio to edit project's properties might break things. -->
<!-- Props file -->
Expand Down Expand Up @@ -120,7 +120,141 @@
<AssemblyOriginatorKeyFile>$(PropKeyfile)</AssemblyOriginatorKeyFile>
</PropertyGroup>
<ItemGroup>
<Compile Include="src\client\**\*.cs" />
<Compile Include="src\client\api\AmqpTcpEndpoint.cs" />
<Compile Include="src\client\api\AmqpTimestamp.cs" />
<Compile Include="src\client\api\AmqpVersion.cs" />
<Compile Include="src\client\api\AuthMechanism.cs" />
<Compile Include="src\client\api\AuthMechanismFactory.cs" />
<Compile Include="src\client\api\BasicGetResult.cs" />
<Compile Include="src\client\api\BinaryTableValue.cs" />
<Compile Include="src\client\api\ConnectionFactory.cs" />
<Compile Include="src\client\api\ConnectionFactoryBase.cs" />
<Compile Include="src\client\api\DefaultBasicConsumer.cs" />
<Compile Include="src\client\api\ExchangeType.cs" />
<Compile Include="src\client\api\ExternalMechanism.cs" />
<Compile Include="src\client\api\ExternalMechanismFactory.cs" />
<Compile Include="src\client\api\IBasicConsumer.cs" />
<Compile Include="src\client\api\IBasicProperties.cs" />
<Compile Include="src\client\api\IConnection.cs" />
<Compile Include="src\client\api\IConnectionFactory.cs" />
<Compile Include="src\client\api\IContentHeader.cs" />
<Compile Include="src\client\api\IMethod.cs" />
<Compile Include="src\client\api\IModel.cs" />
<Compile Include="src\client\api\IProtocol.cs" />
<Compile Include="src\client\api\IQueueingBasicConsumer.cs" />
<Compile Include="src\client\api\IRecoverable.cs" />
<Compile Include="src\client\api\IStreamProperties.cs" />
<Compile Include="src\client\api\NetworkConnection.cs" />
<Compile Include="src\client\api\PlainMechanism.cs" />
<Compile Include="src\client\api\PlainMechanismFactory.cs" />
<Compile Include="src\client\api\Protocols.cs" />
<Compile Include="src\client\api\PublicationAddress.cs" />
<Compile Include="src\client\api\QueueDeclareOk.cs" />
<Compile Include="src\client\api\QueueingBasicConsumer.cs" />
<Compile Include="src\client\api\ShutdownEventArgs.cs" />
<Compile Include="src\client\api\ShutdownInitiator.cs" />
<Compile Include="src\client\api\ShutdownReportEntry.cs" />
<Compile Include="src\client\api\SslHelper.cs" />
<Compile Include="src\client\api\SslOption.cs" />
<Compile Include="src\client\api\TopologyRecoveryException.cs" />
<Compile Include="src\client\content\BasicMessageBuilder.cs" />
<Compile Include="src\client\content\BasicMessageReader.cs" />
<Compile Include="src\client\content\BytesMessageBuilder.cs" />
<Compile Include="src\client\content\BytesMessageReader.cs" />
<Compile Include="src\client\content\BytesWireFormatting.cs" />
<Compile Include="src\client\content\IBytesMessageBuilder.cs" />
<Compile Include="src\client\content\IBytesMessageReader.cs" />
<Compile Include="src\client\content\IMapMessageBuilder.cs" />
<Compile Include="src\client\content\IMapMessageReader.cs" />
<Compile Include="src\client\content\IMessageBuilder.cs" />
<Compile Include="src\client\content\IMessageReader.cs" />
<Compile Include="src\client\content\IStreamMessageBuilder.cs" />
<Compile Include="src\client\content\IStreamMessageReader.cs" />
<Compile Include="src\client\content\MapMessageBuilder.cs" />
<Compile Include="src\client\content\MapMessageReader.cs" />
<Compile Include="src\client\content\MapWireFormatting.cs" />
<Compile Include="src\client\content\PrimitiveParser.cs" />
<Compile Include="src\client\content\StreamMessageBuilder.cs" />
<Compile Include="src\client\content\StreamMessageReader.cs" />
<Compile Include="src\client\content\StreamWireFormatting.cs" />
<Compile Include="src\client\events\BasicAckEventArgs.cs" />
<Compile Include="src\client\events\BasicDeliverEventArgs.cs" />
<Compile Include="src\client\events\BasicNackEventArgs.cs" />
<Compile Include="src\client\events\BasicReturnEventArgs.cs" />
<Compile Include="src\client\events\CallbackExceptionEventArgs.cs" />
<Compile Include="src\client\events\ConnectionBlockedEventArgs.cs" />
<Compile Include="src\client\events\ConsumerEventArgs.cs" />
<Compile Include="src\client\events\ConsumerTagChangedAfterRecoveryEventArgs.cs" />
<Compile Include="src\client\events\EventingBasicConsumer.cs" />
<Compile Include="src\client\events\FlowControlEventArgs.cs" />
<Compile Include="src\client\events\QueueNameChangedAfterRecoveryEventArgs.cs" />
<Compile Include="src\client\events\RecoveryExceptionEventArgs.cs" />
<Compile Include="src\client\exceptions\AlreadyClosedException.cs" />
<Compile Include="src\client\exceptions\AuthenticationFailureException.cs" />
<Compile Include="src\client\exceptions\BrokerUnreachableException.cs" />
<Compile Include="src\client\exceptions\ChannelAllocationException.cs" />
<Compile Include="src\client\exceptions\ConnectFailureException.cs" />
<Compile Include="src\client\exceptions\OperationInterruptedException.cs" />
<Compile Include="src\client\exceptions\PacketNotRecognizedException.cs" />
<Compile Include="src\client\exceptions\PossibleAuthenticationFailureException.cs" />
<Compile Include="src\client\exceptions\ProtocolVersionMismatchException.cs" />
<Compile Include="src\client\exceptions\UnexpectedMethodException.cs" />
<Compile Include="src\client\exceptions\UnsupportedMethodException.cs" />
<Compile Include="src\client\exceptions\UnsupportedMethodFieldException.cs" />
<Compile Include="src\client\exceptions\WireFormattingException.cs" />
<Compile Include="src\client\impl\AutorecoveringConnection.cs" />
<Compile Include="src\client\impl\AutorecoveringModel.cs" />
<Compile Include="src\client\impl\BasicProperties.cs" />
<Compile Include="src\client\impl\ChannelErrorException.cs" />
<Compile Include="src\client\impl\Command.cs" />
<Compile Include="src\client\impl\CommandAssembler.cs" />
<Compile Include="src\client\impl\ConcurrentConsumerDispatcher.cs" />
<Compile Include="src\client\impl\Connection.cs" />
<Compile Include="src\client\impl\ConnectionStartDetails.cs" />
<Compile Include="src\client\impl\ConsumerWorkService.cs" />
<Compile Include="src\client\impl\ContentHeaderBase.cs" />
<Compile Include="src\client\impl\ContentHeaderPropertyReader.cs" />
<Compile Include="src\client\impl\ContentHeaderPropertyWriter.cs" />
<Compile Include="src\client\impl\Frame.cs" />
<Compile Include="src\client\impl\HardProtocolException.cs" />
<Compile Include="src\client\impl\IConsumerDispatcher.cs" />
<Compile Include="src\client\impl\IFrameHandler.cs" />
<Compile Include="src\client\impl\IFullModel.cs" />
<Compile Include="src\client\impl\IRpcContinuation.cs" />
<Compile Include="src\client\impl\ISession.cs" />
<Compile Include="src\client\impl\MainSession.cs" />
<Compile Include="src\client\impl\MalformedFrameException.cs" />
<Compile Include="src\client\impl\MethodArgumentReader.cs" />
<Compile Include="src\client\impl\MethodArgumentWriter.cs" />
<Compile Include="src\client\impl\MethodBase.cs" />
<Compile Include="src\client\impl\ModelBase.cs" />
<Compile Include="src\client\impl\ProtocolBase.cs" />
<Compile Include="src\client\impl\ProtocolException.cs" />
<Compile Include="src\client\impl\QuiescingSession.cs" />
<Compile Include="src\client\impl\RecordedBinding.cs" />
<Compile Include="src\client\impl\RecordedConsumer.cs" />
<Compile Include="src\client\impl\RecordedEntity.cs" />
<Compile Include="src\client\impl\RecordedExchange.cs" />
<Compile Include="src\client\impl\RecordedNamedEntity.cs" />
<Compile Include="src\client\impl\RecordedQueue.cs" />
<Compile Include="src\client\impl\RecoveryAwareModel.cs" />
<Compile Include="src\client\impl\RpcContinuationQueue.cs" />
<Compile Include="src\client\impl\Session.cs" />
<Compile Include="src\client\impl\SessionBase.cs" />
<Compile Include="src\client\impl\SessionManager.cs" />
<Compile Include="src\client\impl\ShutdownContinuation.cs" />
<Compile Include="src\client\impl\SimpleBlockingRpcContinuation.cs" />
<Compile Include="src\client\impl\SocketFrameHandler.cs" />
<Compile Include="src\client\impl\SoftProtocolException.cs" />
<Compile Include="src\client\impl\StreamProperties.cs" />
<Compile Include="src\client\impl\SyntaxError.cs" />
<Compile Include="src\client\impl\UnexpectedFrameException.cs" />
<Compile Include="src\client\impl\UnknownClassOrMethodException.cs" />
<Compile Include="src\client\impl\WireFormatting.cs" />
<Compile Include="src\client\messagepatterns\ISubscription.cs" />
<Compile Include="src\client\messagepatterns\SimpleRpcClient.cs" />
<Compile Include="src\client\messagepatterns\SimpleRpcServer.cs" />
<Compile Include="src\client\messagepatterns\Subscription.cs" />
<Compile Include="src\util\**\*.cs" />
</ItemGroup>
<ItemGroup>
Expand All @@ -142,4 +276,4 @@
<Target Name="BeforeBuild" DependsOnTargets="GenerateApi; Detokenize" />
<!-- Custom BeforeClean -->
<Target Name="BeforeClean" DependsOnTargets="CleanGenerateApi; CleanDetokenize" />
</Project>
</Project>
Expand Up @@ -41,6 +41,7 @@
using System;
using System.Collections.Generic;
using System.Net.Security;
using System.Threading.Tasks;
using RabbitMQ.Client.Exceptions;
using RabbitMQ.Client.Framing.Impl;
using RabbitMQ.Client.Impl;
Expand Down Expand Up @@ -183,11 +184,18 @@ public class ConnectionFactory : ConnectionFactoryBase, IConnectionFactory
/// </summary>
public bool TopologyRecoveryEnabled = true;

/// <summary>
/// Task scheduler connections created by this factory will use when
/// dispatching consumer operations, such as message deliveries.
/// </summary>
public TaskScheduler TaskScheduler { get; set; }

/// <summary>
/// Construct a fresh instance, with all fields set to their respective defaults.
/// </summary>
public ConnectionFactory()
{
this.TaskScheduler = TaskScheduler.Default;
VirtualHost = DefaultVHost;
UserName = DefaultUser;
RequestedHeartbeat = DefaultHeartbeat;
Expand Down Expand Up @@ -254,7 +262,7 @@ public Uri uri
public ushort RequestedHeartbeat { get; set; }

/// <summary>
/// When set to true, background threads will be used for I/O and heartbeats.
/// When set to true, background thread will be used for the I/O loop.
/// </summary>
public bool UseBackgroundThreadsForIO { get; set; }

Expand Down
2 changes: 2 additions & 0 deletions projects/client/RabbitMQ.Client/src/client/api/IConnection.cs
Expand Up @@ -309,5 +309,7 @@ public interface IConnection : NetworkConnection, IDisposable
/// Handle incoming Connection.Unblocked methods.
/// </summary>
void HandleConnectionUnblocked();

ConsumerWorkService ConsumerWorkService { get; }
}
}
Expand Up @@ -40,6 +40,7 @@

using System;
using System.Collections.Generic;
using System.Threading.Tasks;

namespace RabbitMQ.Client
{
Expand Down Expand Up @@ -95,5 +96,12 @@ public interface IConnectionFactory
/// Create a connection to the specified endpoint.
/// </summary>
IConnection CreateConnection();

/// <summary>
/// Advanced option.
///
/// What task scheduler should consumer dispatcher use.
/// </summary>
TaskScheduler TaskScheduler { get; set; }
}
}
13 changes: 10 additions & 3 deletions projects/client/RabbitMQ.Client/src/client/api/IModel.cs
Expand Up @@ -38,11 +38,13 @@
// Copyright (c) 2007-2014 GoPivotal, Inc. All rights reserved.
//---------------------------------------------------------------------------

using RabbitMQ.Client;
using RabbitMQ.Client.Apigen.Attributes;
using RabbitMQ.Client.Events;
using RabbitMQ.Client.Impl;
using System;
using System.Collections.Generic;
using System.IO;
using RabbitMQ.Client.Apigen.Attributes;
using RabbitMQ.Client.Events;

namespace RabbitMQ.Client
{
Expand All @@ -56,6 +58,11 @@ namespace RabbitMQ.Client
/// </remarks>
public interface IModel : IDisposable
{
/// <summary>
/// Channel number, unique per connections.
/// </summary>
int ChannelNumber { get; }

/// <summary>
/// Returns null if the session is still in a state where it can be used,
/// or the cause of its closure otherwise.
Expand Down Expand Up @@ -445,7 +452,7 @@ public interface IModel : IDisposable
void QueueBind(string queue, string exchange, string routingKey);

/// <summary>Same as QueueBind but sets nowait parameter to true.</summary>
void QueueBindNoWait(string queue, string exchange, string routingKey, IDictionary<string, object> arguments);
void QueueBindNoWait(string queue, string exchange, string routingKey, IDictionary<string, object> arguments);

/// <summary>(Spec method) Declare a queue.</summary>
/// <remarks>
Expand Down
Expand Up @@ -58,6 +58,15 @@ public BaseExceptionEventArgs(Exception exception)

///<summary>Access the wrapped exception.</summary>
public Exception Exception { get; private set; }

public IDictionary<string, object> UpdateDetails(IDictionary<string, object> other)
{
foreach (var pair in other)
{
this.Detail[pair.Key] = pair.Value;
}
return this.Detail;
}
}


Expand Down Expand Up @@ -86,5 +95,23 @@ public class CallbackExceptionEventArgs : BaseExceptionEventArgs
public CallbackExceptionEventArgs(Exception e) : base(e)
{
}

public static CallbackExceptionEventArgs Build(Exception e,
string context)
{
var details = new Dictionary<string, object>
{
{"context", context}
};
return Build(e, details);
}

public static CallbackExceptionEventArgs Build(Exception e,
IDictionary<string, object> details)
{
var exnArgs = new CallbackExceptionEventArgs(e);
exnArgs.UpdateDetails(details);
return exnArgs;
}
}
}
Expand Up @@ -236,6 +236,11 @@ public ushort ChannelMax
get { return m_delegate.ChannelMax; }
}

public ConsumerWorkService ConsumerWorkService
{
get { return m_delegate.ConsumerWorkService; }
}

public IDictionary<string, object> ClientProperties
{
get { return m_delegate.ClientProperties; }
Expand Down
Expand Up @@ -71,6 +71,10 @@ public class AutorecoveringModel : IFullModel, IRecoverable
protected bool usesPublisherConfirms = false;
protected bool usesTransactions = false;
private EventHandler<EventArgs> m_recovery;
public IConsumerDispatcher ConsumerDispatcher
{
get { return m_delegate.ConsumerDispatcher; }
}

public AutorecoveringModel(AutorecoveringConnection conn, RecoveryAwareModel _delegate)
{
Expand Down

0 comments on commit 3f9f463

Please sign in to comment.