Skip to content

Commit

Permalink
Fixed some concurrency/race conditions during node failure (WIP?)
Browse files Browse the repository at this point in the history
  • Loading branch information
mnunberg committed Sep 25, 2012
1 parent 96fbece commit f2b161a
Showing 1 changed file with 46 additions and 21 deletions.
67 changes: 46 additions & 21 deletions Enyim.Caching/Memcached/MemcachedNode.cs
Expand Up @@ -139,6 +139,33 @@ public IPooledSocketResult Acquire()
return result; return result;
} }
} }
/// <summary>
/// Tries to perform any actions on failure. Returns true if the policy
/// indicates that we should fail this node
/// </summary>
/// <returns></returns>
private bool TryToFail()
{

bool shouldFail;
if (failurePolicy != null)
{
shouldFail = failurePolicy.ShouldFail();
}
else
{
shouldFail = true;
}

if (!shouldFail)
{
return false;
}
if (Failed != null) {
Failed(this);
}
return true;
}


~MemcachedNode() ~MemcachedNode()
{ {
Expand Down Expand Up @@ -375,23 +402,15 @@ public IPooledSocketResult Acquire()
private void MarkAsDead() private void MarkAsDead()
{ {
if (log.IsDebugEnabled) log.DebugFormat("Mark as dead was requested for {0}", this.endPoint); if (log.IsDebugEnabled) log.DebugFormat("Mark as dead was requested for {0}", this.endPoint);

if (ownerNode == null || ownerNode.TryToFail() == false)
var shouldFail = ownerNode.FailurePolicy.ShouldFail();

if (log.IsDebugEnabled) log.Debug("FailurePolicy.ShouldFail(): " + shouldFail);

if (shouldFail)
{ {
if (log.IsWarnEnabled) log.WarnFormat("Marking node {0} as dead", this.endPoint); return;

}
this.isAlive = false;
this.markedAsDeadUtc = DateTime.UtcNow;


var f = this.ownerNode.Failed; if (log.IsWarnEnabled) log.WarnFormat("Marking node {0} as dead", this.endPoint);


if (f != null) this.isAlive = false;
f(this.ownerNode); this.markedAsDeadUtc = DateTime.UtcNow;
}
} }


/// <summary> /// <summary>
Expand All @@ -413,9 +432,6 @@ private void ReleaseSocket(PooledSocket socket)
{ {
// mark the item as free // mark the item as free
this.freeItems.Push(socket); this.freeItems.Push(socket);

// signal the event so if someone is waiting for it can reuse this item
this.semaphore.Release();
} }
else else
{ {
Expand All @@ -424,10 +440,6 @@ private void ReleaseSocket(PooledSocket socket)


// mark ourselves as not working for a while // mark ourselves as not working for a while
this.MarkAsDead(); this.MarkAsDead();

// make sure to signal the Acquire so it can create a new conenction
// if the failure policy keeps the pool alive
this.semaphore.Release();
} }
} }
else else
Expand All @@ -436,6 +448,19 @@ private void ReleaseSocket(PooledSocket socket)
// are dead. so, kill the socket (this will eventually clear the pool as well) // are dead. so, kill the socket (this will eventually clear the pool as well)
socket.Destroy(); socket.Destroy();
} }

// In any event, we want to let any waiters know that we can create a new
// socket:
if (semaphore != null)
{
try
{
semaphore.Release();
}
catch (ObjectDisposedException)
{
}
}
} }




Expand Down

0 comments on commit f2b161a

Please sign in to comment.