Skip to content

Commit

Permalink
moved to a single pool of fixed redis objects each with their own ded…
Browse files Browse the repository at this point in the history
…icated connection
  • Loading branch information
Karl Seguin committed Nov 4, 2011
1 parent 8b6269c commit 0c97ba9
Show file tree
Hide file tree
Showing 6 changed files with 58 additions and 174 deletions.
8 changes: 1 addition & 7 deletions Metsys.Redis/Connections/Connection.cs
Expand Up @@ -8,7 +8,6 @@ public interface IConnection : IDisposable
{
void Send(byte[] data, int length);
Stream GetStream();
DateTime Created { get; }
bool IsAlive();
}

Expand All @@ -20,11 +19,6 @@ public class Connection : IConnection
private readonly NetworkStream _stream;
private bool _isValid;

public DateTime Created
{
get { return _created; }
}

public Connection(ConnectionInfo connectionInfo)
{
_client = new TcpClient
Expand Down Expand Up @@ -59,7 +53,7 @@ public Stream GetStream()

public bool IsAlive()
{
return _client.Connected && _isValid;
return DateTime.Now.Subtract(_created).TotalMinutes < 10 && _client.Connected && _isValid;
}

public void Dispose()
Expand Down
94 changes: 0 additions & 94 deletions Metsys.Redis/Connections/ConnectionPool.cs

This file was deleted.

2 changes: 0 additions & 2 deletions Metsys.Redis/Metsys.Redis.csproj
Expand Up @@ -40,8 +40,6 @@
<Compile Include="Configuration.cs" />
<Compile Include="Connections\Connection.cs" />
<Compile Include="Connections\ConnectionInfo.cs" />
<Compile Include="Connections\ConnectionPool.cs" />
<Compile Include="Pool.cs" />
<Compile Include="IRedis.cs" />
<Compile Include="Properties\AssemblyInfo.cs" />
<Compile Include="Protocol\Encoding.cs" />
Expand Down
35 changes: 0 additions & 35 deletions Metsys.Redis/Pool.cs

This file was deleted.

51 changes: 29 additions & 22 deletions Metsys.Redis/Redis.cs
Expand Up @@ -10,12 +10,6 @@ public class Redis : IRedis
private DynamicBuffer _dynamicBuffer;
private IConnection _connection;
private bool _selectedADatabase;
private bool _error;

public IConnection Connection
{
get { return _connection; }
}

internal Redis(RedisManager manager)
{
Expand Down Expand Up @@ -70,27 +64,42 @@ private void Select(int database, bool flagAsDifferent)

private T Send<T>(DynamicBuffer context, Func<Stream, DynamicBuffer, T> callback)
{
EnsureConnection();
try
{
_connection.Send(context.Buffer, context.Length);
return callback(_connection.GetStream(), _dynamicBuffer);
}
catch
{
KillConnection();
throw;
}
}

private void EnsureConnection()
{
if (_connection != null && !_connection.IsAlive())
{
KillConnection();
}
if (_connection == null)
{
_error = false;
if (_manager.GetConnection(out _connection) && _configuration.Database != 0)
_connection = _manager.GetConnection();
if (_configuration.Database != 0)
{
var realBuffer = _dynamicBuffer;
_dynamicBuffer = new DynamicBuffer();
Select(_configuration.Database, false);
_dynamicBuffer = realBuffer;
}
}
try
{
Connection.Send(context.Buffer, context.Length);
return callback(Connection.GetStream(), _dynamicBuffer);
}
catch
{
_error = true;
throw;
}
}

private void KillConnection()
{
_connection.Dispose();
_connection = null;
}

public void Dispose()
Expand All @@ -102,13 +111,11 @@ protected virtual void Dispose(bool disposing)
{
if (disposing)
{
if (!_error && _selectedADatabase)
if (_connection != null && _selectedADatabase)
{
Select(_configuration.Database, false);
}
_manager.CheckIn(this, _error);
_error = false;
_connection = null;
_manager.CheckIn(this);
}
}

Expand Down
42 changes: 28 additions & 14 deletions Metsys.Redis/RedisManager.cs
@@ -1,4 +1,6 @@
using System;
using System.Collections.Concurrent;
using System.Threading;

namespace Metsys.Redis
{
Expand All @@ -9,9 +11,9 @@ public interface IRedisManager

public class RedisManager : IRedisManager
{
private ConnectionPool _connectionPool;
private Pool<Redis> _redisPool;
private readonly ConcurrentQueue<IRedis> _redisPool;
private readonly Configuration _configuration = new Configuration();
private readonly AutoResetEvent _notifier = new AutoResetEvent(false);

public Configuration Configuration
{
Expand All @@ -22,29 +24,41 @@ public static IRedisManager Configure(Action<IConfiguration> action)
{
var manager = new RedisManager();
action(manager._configuration);
manager._connectionPool = new ConnectionPool(manager._configuration.Server);
manager._redisPool = new Pool<Redis>(25, p => new Redis(manager));
return manager;
}

private RedisManager()
{
_redisPool = new ConcurrentQueue<IRedis>();
for(var i = 0; i < 25; ++i)
{
_redisPool.Enqueue(new Redis(this));
}
}

public IRedis Redis()
{
return _redisPool.CheckOut();
IRedis redis;
if (_redisPool.TryDequeue(out redis))
{
return redis;
}
if (!_notifier.WaitOne(10000))
{
throw new RedisException("Timed out waiting for a connection from the pool");
}
return Redis();
}

public bool GetConnection(out IConnection connection)
public Connection GetConnection()
{
return _connectionPool.CheckOut(out connection);
return new Connection(_configuration.Server);
}

public void CheckIn(Redis redis, bool error)
public void CheckIn(Redis redis)
{
var connection = redis.Connection;
if (connection != null)
{
_connectionPool.CheckIn(connection, error);
}
_redisPool.CheckIn(redis);
_redisPool.Enqueue(redis);
_notifier.Set();
}
}
}

0 comments on commit 0c97ba9

Please sign in to comment.