Permalink
Browse files

Single threaded AMQP RPC server with simple start-stop test

  • Loading branch information...
bmjames committed Jan 28, 2011
1 parent 30dab6d commit b1a069ac22e0417d02ef8017787a7db2866fba8b
View
@@ -0,0 +1,4 @@
+*.suo
+*ReSharper*
+DiffaParticipant/*/bin
+DiffaParticipant/*/obj
@@ -0,0 +1,32 @@
+
+Microsoft Visual Studio Solution File, Format Version 11.00
+# Visual Studio 2010
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Net.LShift.Diffa.Messaging.AMQP", "Net.LShift.Diffa.Messaging.AMQP\Net.LShift.Diffa.Messaging.AMQP.csproj", "{38735842-7EAE-451E-BB34-C1D88ED1E2EF}"
+EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Net.LShift.Diffa.Participants", "Net.LShift.Diffa.Participants\Net.LShift.Diffa.Participants.csproj", "{D3E42187-4C30-4519-AABC-44AE924F94E6}"
+EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Net.LShift.Diffa.Messaging.AMQP.Test", "Net.LShift.Diffa.Messaging.AMQP.Test\Net.LShift.Diffa.Messaging.AMQP.Test.csproj", "{D8E2EDD6-F29E-474D-A5A7-A09C619DED65}"
+EndProject
+Global
+ GlobalSection(SolutionConfigurationPlatforms) = preSolution
+ Debug|Any CPU = Debug|Any CPU
+ Release|Any CPU = Release|Any CPU
+ EndGlobalSection
+ GlobalSection(ProjectConfigurationPlatforms) = postSolution
+ {38735842-7EAE-451E-BB34-C1D88ED1E2EF}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {38735842-7EAE-451E-BB34-C1D88ED1E2EF}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {38735842-7EAE-451E-BB34-C1D88ED1E2EF}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {38735842-7EAE-451E-BB34-C1D88ED1E2EF}.Release|Any CPU.Build.0 = Release|Any CPU
+ {D3E42187-4C30-4519-AABC-44AE924F94E6}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {D3E42187-4C30-4519-AABC-44AE924F94E6}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {D3E42187-4C30-4519-AABC-44AE924F94E6}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {D3E42187-4C30-4519-AABC-44AE924F94E6}.Release|Any CPU.Build.0 = Release|Any CPU
+ {D8E2EDD6-F29E-474D-A5A7-A09C619DED65}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {D8E2EDD6-F29E-474D-A5A7-A09C619DED65}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {D8E2EDD6-F29E-474D-A5A7-A09C619DED65}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {D8E2EDD6-F29E-474D-A5A7-A09C619DED65}.Release|Any CPU.Build.0 = Release|Any CPU
+ EndGlobalSection
+ GlobalSection(SolutionProperties) = preSolution
+ HideSolutionNode = FALSE
+ EndGlobalSection
+EndGlobal
@@ -0,0 +1,56 @@
+//
+// Copyright (C) 2011 LShift Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+using System;
+
+using NUnit.Framework;
+
+using RabbitMQ.Client;
+using RabbitMQ.Client.MessagePatterns.Configuration;
+using RabbitMQ.Client.MessagePatterns.Unicast;
+
+using Net.LShift.Diffa.Messaging.Amqp;
+
+namespace Net.LShift.Diffa.Messaging.AMQP.Test
+{
+ [TestFixture]
+ public class AmqpEndToEndTest
+ {
+
+ [Test]
+ public void ServerCanStartAndDispose()
+ {
+ // Simple smoke test for the worker thread disposal; just starts and then disposes the server.
+ // TODO this assumes Rabbit is running on the named endpoint; could boot up a Rabbit instance on localhost instead
+ var connectionBuilder = new ConnectionBuilder(new ConnectionFactory(), new AmqpTcpEndpoint("mrnoisy.lshift.net"));
+ var connector = Factory.CreateConnector(connectionBuilder);
+ var handler = new DummyHandler();
+ using (var server = new AmqpRpcServer(connector, "DUMMY_QUEUE_NAME", handler))
+ {
+ server.Start();
+ }
+ }
+
+ internal class DummyHandler : IJsonRpcHandler
+ {
+ public JsonTransportResponse HandleRequest(JsonTransportRequest request)
+ {
+ throw new NotImplementedException();
+ }
+ }
+
+ }
+}
@@ -0,0 +1,74 @@
+<?xml version="1.0" encoding="utf-8"?>
+<Project ToolsVersion="4.0" DefaultTargets="Build" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
+ <PropertyGroup>
+ <Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration>
+ <Platform Condition=" '$(Platform)' == '' ">AnyCPU</Platform>
+ <ProductVersion>8.0.30703</ProductVersion>
+ <SchemaVersion>2.0</SchemaVersion>
+ <ProjectGuid>{D8E2EDD6-F29E-474D-A5A7-A09C619DED65}</ProjectGuid>
+ <OutputType>Library</OutputType>
+ <AppDesignerFolder>Properties</AppDesignerFolder>
+ <RootNamespace>Net.LShift.Diffa.Messaging.AMQP.Test</RootNamespace>
+ <AssemblyName>Net.LShift.Diffa.Messaging.AMQP.Test</AssemblyName>
+ <TargetFrameworkVersion>v4.0</TargetFrameworkVersion>
+ <FileAlignment>512</FileAlignment>
+ </PropertyGroup>
+ <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' ">
+ <DebugSymbols>true</DebugSymbols>
+ <DebugType>full</DebugType>
+ <Optimize>false</Optimize>
+ <OutputPath>bin\Debug\</OutputPath>
+ <DefineConstants>DEBUG;TRACE</DefineConstants>
+ <ErrorReport>prompt</ErrorReport>
+ <WarningLevel>4</WarningLevel>
+ </PropertyGroup>
+ <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Release|AnyCPU' ">
+ <DebugType>pdbonly</DebugType>
+ <Optimize>true</Optimize>
+ <OutputPath>bin\Release\</OutputPath>
+ <DefineConstants>TRACE</DefineConstants>
+ <ErrorReport>prompt</ErrorReport>
+ <WarningLevel>4</WarningLevel>
+ </PropertyGroup>
+ <ItemGroup>
+ <Reference Include="nunit.framework, Version=2.5.4.10098, Culture=neutral, PublicKeyToken=96d09a1eb7f44a77, processorArchitecture=MSIL">
+ <HintPath>..\..\..\..\AppData\Local\WebGAC\Repository\nunit.framework\2.5.4.10098_Release\nunit.framework.dll</HintPath>
+ </Reference>
+ <Reference Include="RabbitMQ.Client, Version=1.7.0.0, Culture=neutral, PublicKeyToken=89e7d7c5feba84ce, processorArchitecture=MSIL">
+ <SpecificVersion>False</SpecificVersion>
+ <HintPath>..\..\..\..\AppData\Local\WebGAC\Repository\RabbitMQ.Client\1.7.0.0_Release\RabbitMQ.Client.dll</HintPath>
+ </Reference>
+ <Reference Include="RabbitMQ.Client.MessagePatterns, Version=1.1.0.0, Culture=neutral, PublicKeyToken=9a7c71af6e5c398b, processorArchitecture=MSIL">
+ <HintPath>..\..\..\..\AppData\Local\WebGAC\Repository\RabbitMQ.Client.MessagePatterns\1.1.0.0_Release\RabbitMQ.Client.MessagePatterns.dll</HintPath>
+ </Reference>
+ <Reference Include="System" />
+ <Reference Include="System.Core" />
+ <Reference Include="System.Xml.Linq" />
+ <Reference Include="System.Data.DataSetExtensions" />
+ <Reference Include="Microsoft.CSharp" />
+ <Reference Include="System.Data" />
+ <Reference Include="System.Xml" />
+ </ItemGroup>
+ <ItemGroup>
+ <Compile Include="AmqpEndToEndTest.cs" />
+ <Compile Include="Properties\AssemblyInfo.cs" />
+ </ItemGroup>
+ <ItemGroup>
+ <ProjectReference Include="..\Net.LShift.Diffa.Messaging.AMQP\Net.LShift.Diffa.Messaging.AMQP.csproj">
+ <Project>{38735842-7EAE-451E-BB34-C1D88ED1E2EF}</Project>
+ <Name>Net.LShift.Diffa.Messaging.AMQP</Name>
+ </ProjectReference>
+ <ProjectReference Include="..\Net.LShift.Diffa.Participants\Net.LShift.Diffa.Participants.csproj">
+ <Project>{D3E42187-4C30-4519-AABC-44AE924F94E6}</Project>
+ <Name>Net.LShift.Diffa.Participants</Name>
+ </ProjectReference>
+ </ItemGroup>
+ <Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" />
+ <!-- To modify your build process, add your task inside one of the targets below and uncomment it.
+ Other similar extension points exist, see Microsoft.Common.targets.
+ <Target Name="BeforeBuild">
+ </Target>
+ <Target Name="AfterBuild">
+ </Target>
+ -->
+</Project>
@@ -0,0 +1,36 @@
+using System.Reflection;
+using System.Runtime.CompilerServices;
+using System.Runtime.InteropServices;
+
+// General Information about an assembly is controlled through the following
+// set of attributes. Change these attribute values to modify the information
+// associated with an assembly.
+[assembly: AssemblyTitle("Net.LShift.Diffa.Messaging.AMQP.Test")]
+[assembly: AssemblyDescription("")]
+[assembly: AssemblyConfiguration("")]
+[assembly: AssemblyCompany("Microsoft")]
+[assembly: AssemblyProduct("Net.LShift.Diffa.Messaging.AMQP.Test")]
+[assembly: AssemblyCopyright("Copyright © Microsoft 2011")]
+[assembly: AssemblyTrademark("")]
+[assembly: AssemblyCulture("")]
+
+// Setting ComVisible to false makes the types in this assembly not visible
+// to COM components. If you need to access a type in this assembly from
+// COM, set the ComVisible attribute to true on that type.
+[assembly: ComVisible(false)]
+
+// The following GUID is for the ID of the typelib if this project is exposed to COM
+[assembly: Guid("81d1c6f3-1fb8-4606-8796-34c9a5e9e663")]
+
+// Version information for an assembly consists of the following four values:
+//
+// Major Version
+// Minor Version
+// Build Number
+// Revision
+//
+// You can specify all the values or you can default the Build and Revision Numbers
+// by using the '*' as shown below:
+// [assembly: AssemblyVersion("1.0.*")]
+[assembly: AssemblyVersion("1.0.0.0")]
+[assembly: AssemblyFileVersion("1.0.0.0")]
@@ -0,0 +1,164 @@
+//
+// Copyright (C) 2011 LShift Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+using System;
+using System.Collections.Generic;
+using System.IO;
+using System.Text;
+using System.Threading;
+
+using Newtonsoft.Json;
+using Newtonsoft.Json.Linq;
+
+using RabbitMQ.Client.MessagePatterns.Unicast;
+
+using Net.LShift.Diffa.Messaging.AMQP;
+
+namespace Net.LShift.Diffa.Messaging.Amqp
+{
+ public class AmqpRpcServer : IDisposable
+ {
+ private readonly IMessaging _messaging;
+ private readonly IJsonRpcHandler _handler;
+ private Thread _worker;
+ private bool _disposing;
+
+ public AmqpRpcServer(IConnector connector, string queueName, IJsonRpcHandler handler)
+ {
+ _messaging = CreateMessaging(connector, queueName);
+ _handler = handler;
+ _worker = new Thread(WorkerLoop);
+ }
+
+ /// <summary>
+ /// Initialize the server's Messaging and start its worker thread
+ /// </summary>
+ public void Start()
+ {
+ _messaging.Init();
+ _worker.Start();
+ }
+
+ /// <summary>
+ /// Attempt to stop the worker thread gracefully, giving it time to finish handling and acking a message.
+ /// After a certain timeout, give up and dispose of the worker thread forcefully.
+ /// </summary>
+ public void Dispose()
+ {
+ lock (this)
+ {
+ if (_disposing)
+ {
+ return;
+ }
+ _disposing = true;
+ }
+ _messaging.Cancel();
+ if (_worker != null)
+ {
+ _worker.Join(2000);
+ _messaging.Dispose();
+ _worker.Join(2000);
+ _worker = null;
+ }
+ }
+
+ private IReceivedMessage Receive()
+ {
+ return _messaging.Receive(100);
+ }
+
+ private void Ack(IReceivedMessage message)
+ {
+ _messaging.Ack(message);
+ }
+
+ private void Reply(IReceivedMessage message, JsonTransportRequest request, JsonTransportResponse response)
+ {
+ var reply = message.CreateReply();
+ var headers = new Dictionary<string, object>
+ {
+ {AmqpRpc.EndpointHeader, request.Endpoint},
+ {AmqpRpc.StatusCodeHeader, response.Status}
+ };
+ reply.Properties.Headers = headers;
+ reply.Body = Serialize(response.Content);
+ _messaging.Send(reply);
+ }
+
+ private void ReceiveHandleAckReply()
+ {
+ var message = Receive();
+ var request = new JsonTransportRequest(EndpointFor(message), Deserialize(message.Body));
+ var response = _handler.HandleRequest(request);
+ Ack(message);
+ Reply(message, request, response);
+ }
+
+ private static IMessaging CreateMessaging(IConnector connector, string queueName)
+ {
+ var messaging = Factory.CreateMessaging();
+ messaging.Connector = connector;
+ messaging.QueueName = queueName;
+ messaging.SetupReceiver += channel => channel.QueueDeclare(queueName);
+ return messaging;
+ }
+
+ private void WorkerLoop()
+ {
+ while (true)
+ {
+ lock (this)
+ {
+ if (_disposing)
+ {
+ return;
+ }
+ }
+ ReceiveHandleAckReply();
+ }
+ }
+
+ private static byte[] Serialize(JObject obj)
+ {
+ var sw = new StringWriter();
+ var writer = new JsonTextWriter(sw);
+ obj.WriteTo(writer);
+ return Encoding.UTF8.GetBytes(sw.ToString());
+ }
+
+ private static JObject Deserialize(byte[] data)
+ {
+ return JObject.Parse(Encoding.UTF8.GetString(data));
+ }
+
+ private static String EndpointFor(IReceivedMessage message)
+ {
+ var headers = message.Properties.Headers;
+ var endpointHeader = headers[AmqpRpc.EndpointHeader];
+ return endpointHeader.ToString();
+ }
+
+ }
+
+ internal class AmqpRpc
+ {
+ public const String Encoding = "UTF-8";
+ public const String EndpointHeader = "rpc-endpoint";
+ public const String StatusCodeHeader = "rpc-status-code";
+ public const int DefaultStatusCode = 200;
+ }
+}
@@ -0,0 +1,23 @@
+//
+// Copyright (C) 2011 LShift Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+namespace Net.LShift.Diffa.Messaging.AMQP
+{
+ public interface IJsonRpcHandler
+ {
+ JsonTransportResponse HandleRequest(JsonTransportRequest request);
+ }
+}
Oops, something went wrong.

0 comments on commit b1a069a

Please sign in to comment.