Permalink
Browse files

RavenDB-1274 IIS crash in 2.5.2666 due to changes api

  • Loading branch information...
ayende committed Aug 25, 2013
1 parent c9df63c commit 747fa0bc28983368c3cc7ac8ecd7df089445ab22
@@ -8,27 +8,34 @@
namespace Raven.Client.Util
{
public class AsyncManualResetEvent
{
private volatile TaskCompletionSource<bool> completionSource = new TaskCompletionSource<bool>();
public class AsyncManualResetEvent
{
private volatile TaskCompletionSource<bool> tcs = new TaskCompletionSource<bool>();
public Task WaitAsync() { return completionSource.Task; }
public Task WaitAsync() { return tcs.Task; }
public void Set() { completionSource.TrySetResult(true); }
public async Task<bool> WaitAsync(int timeout)
{
var task = tcs.Task;
#if !NET45
return await TaskEx.WhenAny(task, TaskEx.Delay(timeout)) == task;
public void Reset()
{
while (true)
{
var tcs = completionSource;
if (tcs.Task.IsCompleted == false)
return;
#else
return await Task.WhenAny(task, Task.Delay(timeout)) == task;
#endif
}
#pragma warning disable 420
if (Interlocked.CompareExchange(ref completionSource, new TaskCompletionSource<bool>(), tcs) == tcs)
return;
#pragma warning restore 420
}
}
}
public void Set() { tcs.TrySetResult(true); }
public void Reset()
{
while (true)
{
var current = tcs;
if (!current.Task.IsCompleted ||
Interlocked.CompareExchange(ref tcs, new TaskCompletionSource<bool>(), current) == current)
return;
}
}
}
}
@@ -21,8 +21,6 @@ public class ConnectionState
private readonly ConcurrentSet<string> matchingBulkInserts =
new ConcurrentSet<string>(StringComparer.InvariantCultureIgnoreCase);
private readonly ConcurrentQueue<object> pendingMessages = new ConcurrentQueue<object>();
private EventsTransport eventsTransport;
private int watchAllDocuments;
@@ -47,7 +45,6 @@ public object DebugStatus
WatchDocumentPrefixes = matchingDocumentPrefixes.ToArray(),
WatchIndexes = matchingIndexes.ToArray(),
WatchDocuments = matchingDocuments.ToArray(),
PendingMessages = pendingMessages.Count
};
}
}
@@ -149,17 +146,10 @@ private void Enqueue(object msg)
{
if (eventsTransport == null || eventsTransport.Connected == false)
{
pendingMessages.Enqueue(msg);
return;
}
eventsTransport.SendAsync(msg)
.ContinueWith(task =>
{
if (task.IsFaulted == false)
return;
pendingMessages.Enqueue(msg);
});
eventsTransport.SendAsync(msg);
}
public void WatchAllDocuments()
@@ -205,23 +195,6 @@ public void UnwatchAllReplicationConflicts()
public void Reconnect(EventsTransport transport)
{
eventsTransport = transport;
var items = new List<object>();
object result;
while (pendingMessages.TryDequeue(out result))
{
items.Add(result);
}
eventsTransport.SendManyAsync(items)
.ContinueWith(task =>
{
if (task.IsFaulted == false)
return;
foreach (var item in items)
{
pendingMessages.Enqueue(item);
}
});
}
public void Dispose()
@@ -4,21 +4,18 @@
// </copyright>
// -----------------------------------------------------------------------
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Collections.Concurrent;
using System.Threading.Tasks;
using Raven.Abstractions.Data;
using Raven.Abstractions.Logging;
using Raven.Database.Server.Abstractions;
using Raven.Database.Util;
using Raven.Imports.Newtonsoft.Json;
namespace Raven.Database.Server.Connections
{
public class EventsTransport : IDisposable
{
private readonly Timer heartbeat;
private readonly ILog log = LogManager.GetCurrentClassLogger();
private readonly IHttpContext context;
@@ -28,7 +25,8 @@ public class EventsTransport : IDisposable
public event Action Disconnected = delegate { };
private Task initTask;
private readonly ConcurrentQueue<object> msgs = new ConcurrentQueue<object>();
private readonly AsyncManualResetEvent manualResetEvent = new AsyncManualResetEvent();
public EventsTransport(IHttpContext context)
{
@@ -38,125 +36,51 @@ public EventsTransport(IHttpContext context)
if (string.IsNullOrEmpty(Id))
throw new ArgumentException("Id is mandatory");
heartbeat = new Timer(Heartbeat);
}
public Task ProcessAsync()
public async Task ProcessAsync()
{
context.Response.ContentType = "text/event-stream";
initTask = SendAsync(new { Type = "Initialized" });
Thread.MemoryBarrier();
heartbeat.Change(TimeSpan.Zero, TimeSpan.FromSeconds(10));
return initTask;
}
private void Heartbeat(object _)
{
try
{
SendAsync(new { Type = "Heartbeat" });
}
catch (Exception)
{
// we expect and should recover from errors
}
}
public Task SendAsync(object data)
{
try
{
if (initTask != null && // may be the very first time?
initTask.IsCompleted == false) // still pending on this...
return initTask.ContinueWith(_ => SendAsync(data)).Unwrap();
return context.Response.WriteAsync("data: " +
JsonConvert.SerializeObject(data, Formatting.None, new EtagJsonConverter()) +
"\r\n\r\n")
.ContinueWith(DisconnectOnError);
}
catch (Exception e)
{
DisconnectBecauseOfAnError(e);
throw;
}
}
public Task SendManyAsync(IEnumerable<object> data)
{
try
{
if (initTask.IsCompleted == false)
return initTask.ContinueWith(_ => SendManyAsync(data)).Unwrap();
var sb = new StringBuilder();
foreach (var o in data)
{
sb.Append("data: ")
.Append(JsonConvert.SerializeObject(o))
.Append("\r\n\r\n");
}
return context.Response.WriteAsync(sb.ToString())
.ContinueWith(DisconnectOnError);
}
catch (Exception e)
{
DisconnectBecauseOfAnError(e);
throw;
}
}
private void DisconnectOnError(Task prev)
{
prev.ContinueWith(task =>
{
if (task.IsFaulted == false)
return;
DisconnectBecauseOfAnError(task.Exception);
});
}
private void DisconnectBecauseOfAnError(Exception exception)
{
log.DebugException("Error when using events transport", exception);
try
{
Disconnect();
}
catch (ObjectDisposedException)
{
// already closed?
}
catch (Exception e)
{
log.DebugException("Could not close transport", e);
}
}
public void Disconnect()
{
if (heartbeat != null)
heartbeat.Dispose();
Connected = false;
Disconnected();
context.FinalizeResponse();
while (Connected)
{
try
{
var result = await manualResetEvent.WaitAsync(5000);
if (Connected == false)
return;
if (result == false)
{
await context.Response.WriteAsync("data: { 'Type': 'Heartbeat' }\r\n\r\n");
continue;
}
manualResetEvent.Reset();
object msg;
while (msgs.TryDequeue(out msg))
{
var obj = JsonConvert.SerializeObject(msg, Formatting.None, new EtagJsonConverter());
await context.Response.WriteAsync("data: " + obj + "\r\n\r\n");
}
}
catch (Exception e)
{
Connected = false;
log.DebugException("Error when using events transport", e);
Disconnected();
}
}
}
public void Dispose()
{
Connected = false;
Disconnected();
SendAsync(new { Type = "Disconnect" })
.ContinueWith(_ => context.FinalizeResponse());
}
public void Dispose()
{
Connected = false;
manualResetEvent.Set();
}
public void SendAsync(object msg)
{
msgs.Enqueue(msg);
manualResetEvent.Set();
}
}
}

0 comments on commit 747fa0b

Please sign in to comment.