Permalink
Browse files

ServiceProxy.Redis now uses the new StackExchange.Redis API instead o…

…f Booksleeve.
  • Loading branch information...
mfelicio committed Apr 6, 2014
1 parent cd33f70 commit bbb2366a8c3de1cd7835f387f546280ab870088d
@@ -2,19 +2,19 @@
<package >
<metadata>
<id>ServiceProxy.Redis</id>
- <version>1.0.1</version>
+ <version>1.1.0</version>
<title>ServiceProxy.Redis</title>
<authors>Manuel Felicio</authors>
<owners>Manuel Felicio</owners>
<licenseUrl>http://opensource.org/licenses/mit-license.php</licenseUrl>
<projectUrl>http://github.com/mfelicio/ServiceProxy</projectUrl>
<requireLicenseAcceptance>false</requireLicenseAcceptance>
- <description>ServiceProxy.Redis is a simple request/reply messaging framework built on Redis queues that support service contracts using ServiceProxy.</description>
+ <description>ServiceProxy.Redis is a simple request/reply messaging framework built on Redis queues that support service contracts using ServiceProxy and the StackExchange.Redis library.</description>
<copyright>Copyright 2014 Manuel Felício</copyright>
<tags>redis messaging services request reply proxy async asynchronous</tags>
<dependencies>
<dependency id="ServiceProxy" version="1.0.1" />
- <dependency id="BookSleeve" version="1.3.41" />
+ <dependency id="StackExchange.Redis" version="1.0.247" />
</dependencies>
</metadata>
</package>
@@ -50,11 +50,11 @@ public void StopRedis()
{
var resolver = new DependencyResolver();
- using (var server = new RedisServer(new RedisDuplexConnection(RedisHost, RedisPort, RedisPassword), ServerQueue, new ServiceFactory(resolver)))
+ using (var server = new RedisServer(new RedisConnection(RedisHost, RedisPort, RedisPassword), ServerQueue, new ServiceFactory(resolver)))
{
server.Listen();
- using (var client = new RedisClient(new RedisDuplexConnection(RedisHost, RedisPort, RedisPassword), ClientQueue, ServerQueue))
+ using (var client = new RedisClient(new RedisConnection(RedisHost, RedisPort, RedisPassword), ClientQueue, ServerQueue))
{
var clientFactory = new ServiceProxy.ServiceClientFactory(client);
@@ -80,11 +80,11 @@ public void StopRedis()
{
var resolver = new DependencyResolver();
- using (var server = new RedisServer(new RedisDuplexConnection(RedisHost, RedisPort, RedisPassword), ServerQueue, new ServiceFactory(resolver)))
+ using (var server = new RedisServer(new RedisConnection(RedisHost, RedisPort, RedisPassword), ServerQueue, new ServiceFactory(resolver)))
{
server.Listen();
- using (var client = new RedisClient(new RedisDuplexConnection(RedisHost, RedisPort, RedisPassword), ClientQueue, ServerQueue))
+ using (var client = new RedisClient(new RedisConnection(RedisHost, RedisPort, RedisPassword), ClientQueue, ServerQueue))
{
var clientFactory = new ServiceClientFactory(client);
@@ -117,21 +117,21 @@ public void StopRedis()
[Test]
[TestCase(1, 1000)]
[TestCase(2, 10000)]
- //[TestCase(4, 100000)]
+ [TestCase(2, 100000)]
//[TestCase(5, 1000000)]
public async void TestLoadBalancing(int nServers, int nMsgs)
{
var resolver = new DependencyResolver();
var servers = Enumerable.Range(0, nServers)
- .Select(i => new RedisServer(new RedisDuplexConnection(RedisHost, RedisPort, RedisPassword), ServerQueue, new ServiceFactory(resolver)))
+ .Select(i => new RedisServer(new RedisConnection(RedisHost, RedisPort, RedisPassword), ServerQueue, new ServiceFactory(resolver)))
.ToArray();
try
{
foreach (var server in servers) server.Listen();
- using (var client = new RedisClient(new RedisDuplexConnection(RedisHost, RedisPort, RedisPassword), ClientQueue, ServerQueue))
+ using (var client = new RedisClient(new RedisConnection(RedisHost, RedisPort, RedisPassword), ClientQueue, ServerQueue))
{
var clientFactory = new ServiceClientFactory(client);
@@ -39,4 +39,24 @@ public static T ToObject<T>(byte[] objBytes)
}
}
}
+
+ static class TaskExtensions
+ {
+ public static async Task<T> IgnoreException<T>(this Task<T> task, params Type[] exceptionTypes)
+ {
+ try
+ {
+ return await task;
+ }
+ catch (Exception ex)
+ {
+ if (exceptionTypes.Any(type => type.IsAssignableFrom(ex.GetType())))
+ {
+ return default(T);
+ }
+
+ throw;
+ }
+ }
+ }
}
@@ -14,11 +14,11 @@ public async void TestSendAndReceive()
{
var resolver = new DependencyResolver();
- using (var server = new RedisServer(new RedisDuplexConnection(RedisHost, RedisPort, RedisPassword), ServerQueue, new ServiceFactory(resolver)))
+ using (var server = new RedisServer(new RedisConnection(RedisHost, RedisPort, RedisPassword), ServerQueue, new ServiceFactory(resolver)))
{
server.Listen();
- using (var client = new RedisClient(new RedisDuplexConnection(RedisHost, RedisPort, RedisPassword), ClientQueue, ServerQueue))
+ using (var client = new RedisClient(new RedisConnection(RedisHost, RedisPort, RedisPassword), ClientQueue, ServerQueue))
{
var clientFactory = new ServiceProxy.ServiceClientFactory(client);
@@ -40,7 +40,8 @@ It supports load balancing by having multiple servers listening on the same Redi
## Dependencies
-ServiceProxy.Redis uses the [Booksleeve][booksleeve-home] Redis client library, which has a Task based asynchronous API for all redis commands, integrating nicely with ServiceProxy.
+ServiceProxy.Redis now uses the [StackExchange.Redis][stackexchange.redis-github] client library, the successor of the [Booksleeve][booksleeve-home] Redis client library.
[serviceproxy.redis-nuget]: http://www.nuget.org/packages/ServiceProxy.Redis
-[booksleeve-home]: https://code.google.com/p/booksleeve/
+[booksleeve-home]: https://code.google.com/p/booksleeve/
+[stackexchange.redis-github]: https://github.com/StackExchange/StackExchange.Redis
@@ -1,4 +1,5 @@
using ServiceProxy;
+using StackExchange.Redis;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
@@ -11,9 +12,9 @@ namespace ServiceProxy.Redis
{
public class RedisClient : IClient, IDisposable
{
- private readonly RedisDuplexConnection connection;
+ private readonly RedisConnection connection;
- private readonly string[] receiveQueues;
+ private readonly string receiveQueue;
private readonly string sendQueue;
private readonly ConcurrentDictionary<string, TaskCompletionSource<ResponseData>> requestCallbacks;
@@ -22,10 +23,10 @@ public class RedisClient : IClient, IDisposable
private long receiveState;
private volatile Task receiveTask;
- public RedisClient(RedisDuplexConnection connection, string receiveQueue, string sendQueue)
+ public RedisClient(RedisConnection connection, string receiveQueue, string sendQueue)
{
this.connection = connection;
- this.receiveQueues = new string[] { receiveQueue };
+ this.receiveQueue = receiveQueue;
this.sendQueue = sendQueue;
this.nextId = 0;
this.receiveState = 0;
@@ -43,10 +44,11 @@ public Task<ResponseData> Request(RequestData request, CancellationToken token)
var requestId = this.NextId();
- var redisRequest = new RedisRequest(this.receiveQueues[0], requestId, request);
+ var redisRequest = new RedisRequest(this.receiveQueue, requestId, request);
var redisRequestBytes = redisRequest.ToBinary();
- this.connection.Sender.Lists.AddFirst(0, this.sendQueue, redisRequestBytes);
+ var redis = this.connection.GetClient();
+ var lpushTask = redis.ListLeftPushAsync(sendQueue, redisRequestBytes);
var callback = new TaskCompletionSource<ResponseData>();
this.requestCallbacks[requestId] = callback;
@@ -55,11 +57,18 @@ public Task<ResponseData> Request(RequestData request, CancellationToken token)
{
token.Register(() =>
{
- TaskCompletionSource<ResponseData> _;
- this.requestCallbacks.TryRemove(requestId, out _);
+ this.OnRequestCancelled(requestId);
});
}
+ lpushTask.ContinueWith(t =>
+ {
+ if (lpushTask.Exception != null)
+ {
+ this.OnRequestError(requestId, lpushTask.Exception.InnerException);
+ }
+ });
+
return callback.Task;
}
@@ -81,28 +90,54 @@ private void EnsureIsNotReceiving()
private async Task Receive()
{
+ int delayTimeout = 1;
+ byte[] rawResponse;
+
+ var redis = this.connection.GetClient();
+
while (Interlocked.Read(ref this.receiveState) == 1)
{
- var rawResponse = await this.connection.Receiver.Lists.BlockingRemoveLast(0, this.receiveQueues, 1);
+ rawResponse = await redis.ListRightPopAsync(receiveQueue).IgnoreException(typeof(RedisException));
if (rawResponse == null)
{
+ await Task.Delay(delayTimeout);
+ //maybe increase delayTimeout a little bit?
continue;
}
+ var redisResponseBytes = rawResponse;
+
Task.Run(() =>
{
- var redisResponseBytes = rawResponse.Item2;
var redisResponse = RedisResponse.FromBinary(redisResponseBytes);
+ this.OnResponse(redisResponse);
+ });
+ }
+
+ }
- TaskCompletionSource<ResponseData> callback;
- if (this.requestCallbacks.TryRemove(redisResponse.RequestId, out callback))
- {
- callback.SetResult(redisResponse.Response);
- }
+ private void OnResponse(RedisResponse redisResponse)
+ {
+ TaskCompletionSource<ResponseData> callback;
+ if (this.requestCallbacks.TryRemove(redisResponse.RequestId, out callback))
+ {
+ callback.SetResult(redisResponse.Response);
+ }
+ }
- });
+ private void OnRequestError(string requestId, Exception exception)
+ {
+ TaskCompletionSource<ResponseData> callback;
+ if (this.requestCallbacks.TryRemove(requestId, out callback))
+ {
+ callback.TrySetException(exception);
}
+ }
+ private void OnRequestCancelled(string requestId)
+ {
+ TaskCompletionSource<ResponseData> _;
+ this.requestCallbacks.TryRemove(requestId, out _);
}
public void Dispose()
@@ -0,0 +1,40 @@
+using StackExchange.Redis;
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace ServiceProxy.Redis
+{
+ public class RedisConnection
+ {
+ private readonly ConnectionMultiplexer connectionManager;
+
+ public RedisConnection(string host, int port = 6379, string password = null)
+ {
+ var options = new ConfigurationOptions();
+ options.Password = password;
+ options.EndPoints.Add(host, port);
+ options.AbortOnConnectFail = false;
+
+ this.connectionManager = ConnectionMultiplexer.Connect(options);
+ }
+
+ public RedisConnection(ConnectionMultiplexer connectionMultiplexer)
+ {
+ this.connectionManager = connectionMultiplexer;
+ }
+
+ public IDatabase GetClient()
+ {
+ return this.connectionManager.GetDatabase();
+ }
+
+ public ISubscriber GetSubscriber()
+ {
+ return this.connectionManager.GetSubscriber();
+ }
+
+ }
+}
@@ -1,38 +0,0 @@
-using BookSleeve;
-using System;
-using System.Collections.Generic;
-using System.Linq;
-using System.Text;
-using System.Threading.Tasks;
-
-namespace ServiceProxy.Redis
-{
- public class RedisDuplexConnection
- {
- private readonly RedisConnection sender;
- private readonly RedisConnection receiver;
-
- public RedisDuplexConnection(string host, int port = 6379, string password = null)
- {
- this.sender = new RedisConnection(host, port: port, password: password);
- this.receiver = new RedisConnection(host, port: port, password: password);
-
- this.Open();
- }
-
- public RedisConnection Sender { get { return this.sender; } }
- public RedisConnection Receiver { get { return this.receiver; } }
-
- public void Open()
- {
- this.sender.Open();
- this.receiver.Open();
- }
-
- public void Close()
- {
- this.sender.Dispose();
- this.receiver.Dispose();
- }
- }
-}
Oops, something went wrong.

0 comments on commit bbb2366

Please sign in to comment.