Permalink
Browse files

The worker processes should listen for work through the application s…

…ervice bus
  • Loading branch information...
jrusbatch committed Oct 20, 2012
1 parent 9ccec91 commit a5402960fc7e121af0476d804460bf1d0df6bad4
View
Binary file not shown.
@@ -11,7 +11,7 @@ namespace Compilify.Web
{
public static class MessagingConfig
{
public static void ConfigureServiceBus(IServiceBus bus)
public static void ConfigureServiceBus()
{
Bus.Initialize(sbc =>
{
@@ -1,11 +1,7 @@
using System;
using System.Collections.Generic;
using System.Configuration;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Web.Mvc;
using Compilify.Extensions;
using Compilify.LanguageServices;
using Compilify.Web.Models;
using MassTransit;
@@ -22,8 +18,6 @@ public class ExecuteEndPoint : PersistentConnection
private static readonly Task EmptyTask = Task.FromResult<object>(null);
private readonly IServiceBus bus;
static ExecuteEndPoint()
{
int timeout;
@@ -35,11 +29,6 @@ static ExecuteEndPoint()
ExecutionTimeout = TimeSpan.FromSeconds(timeout);
}
public ExecuteEndPoint(IServiceBus serviceBus)
{
bus = serviceBus;
}
protected override Task OnReceivedAsync(IRequest request, string connectionId, string data)
{
var viewModel = JsonConvert.DeserializeObject<PostViewModel>(data);
@@ -55,7 +44,7 @@ protected override Task OnReceivedAsync(IRequest request, string connectionId, s
TimeoutPeriod = ExecutionTimeout
};
bus.Publish(command);
Bus.Instance.Publish(command);
return EmptyTask;
}
View
@@ -1,7 +1,6 @@
using System.Web.Mvc;
using System.Web.Optimization;
using System.Web.Routing;
using MassTransit;
namespace Compilify.Web
{
@@ -19,7 +18,7 @@ protected void Application_Start()
FilterConfig.RegisterGlobalFilters(GlobalFilters.Filters);
RouteConfig.RegisterRoutes(RouteTable.Routes);
BundleConfig.RegisterBundles(BundleTable.Bundles);
MessagingConfig.ConfigureServiceBus(Bus.Instance);
MessagingConfig.ConfigureServiceBus();
}
}
}
View
@@ -2,27 +2,18 @@
using System.Configuration;
using System.Diagnostics;
using System.Threading.Tasks;
using Compilify.DataAccess.Redis;
using Compilify.Infrastructure;
using Compilify.LanguageServices;
using Compilify.Messaging;
using Compilify.Models;
using Compilify.Serialization;
using MassTransit;
using Newtonsoft.Json;
using NLog;
namespace Compilify.Worker
{
public sealed class Program
{
private const int DefaultTimeout = 5000;
private static readonly Logger Logger = LogManager.GetCurrentClassLogger();
private static readonly ICodeCompiler Compiler = new CSharpCompiler();
private static readonly ISerializationProvider Serializer = new ProtobufSerializationProvider();
private static IQueue<EvaluateCodeCommand> queue;
private static IMessenger messenger;
public static int Main(string[] args)
{
@@ -35,105 +26,99 @@ public static int Main(string[] args)
TaskScheduler.UnobservedTaskException +=
(sender, e) => Logger.ErrorException("An unobserved task exception occurred", e.Exception);
var gateway = new RedisConnectionGateway(ConfigurationManager.AppSettings["REDISTOGO_URL"]);
queue = new RedisExecutionQueue(Serializer, gateway, 0, "queue:execute");
messenger = new RedisMessenger(gateway);
Bus.Initialize(sbc =>
{
sbc.UseRabbitMq();
sbc.UseRabbitMqRouting();
sbc.ReceiveFrom(ConfigurationManager.AppSettings["CLOUDAMQP_URL"]);
sbc.Subscribe(subs => subs.Handler<EvaluateCodeCommand>(ProcessCommand));
});
ProcessQueue();
Console.ReadLine();
return -1; // Return a non-zero code so AppHarbor restarts the worker
}
public static void OnUnhandledApplicationException(object sender, UnhandledExceptionEventArgs e)
private static void ProcessCommand(EvaluateCodeCommand cmd)
{
var exception = e.ExceptionObject as Exception;
if (e.IsTerminating)
if (cmd == null)
{
Logger.FatalException("An unhandled exception is causing the worker to terminate.", exception);
return;
}
else
var timeInQueue = DateTime.UtcNow - cmd.Submitted;
Logger.Info("Job received after {0:N3} seconds in queue.", timeInQueue.TotalSeconds);
if (timeInQueue > cmd.TimeoutPeriod)
{
Logger.ErrorException("An unhandled exception occurred in the worker process.", exception);
Logger.Warn("Job was in queue for longer than {0} seconds, skipping!", cmd.TimeoutPeriod.Seconds);
return;
}
}
private static void ProcessQueue()
{
var startedOn = DateTime.UtcNow;
var stopWatch = new Stopwatch();
stopWatch.Start();
Logger.Info("ProcessQueue started.");
var assembly = Compiler.Compile(cmd);
while (true)
ExecutionResult result;
if (assembly == null)
{
var cmd = queue.Dequeue();
if (cmd == null)
result = new ExecutionResult
{
continue;
}
var timeInQueue = DateTime.UtcNow - cmd.Submitted;
Logger.Info("Job received after {0:N3} seconds in queue.", timeInQueue.TotalSeconds);
if (timeInQueue > cmd.TimeoutPeriod)
Result = "[compiling of code failed]"
};
}
else
{
using (var executor = new Sandbox())
{
Logger.Warn("Job was in queue for longer than {0} seconds, skipping!", cmd.TimeoutPeriod.Seconds);
continue;
result = executor.Execute(assembly, cmd.TimeoutPeriod);
}
}
var startedOn = DateTime.UtcNow;
stopWatch.Start();
stopWatch.Stop();
var stoppedOn = DateTime.UtcNow;
var assembly = Compiler.Compile(cmd);
Logger.Info("Work completed in {0} milliseconds.", stopWatch.ElapsedMilliseconds);
ExecutionResult result;
if (assembly == null)
{
result = new ExecutionResult
{
Result = "[compiling of code failed]"
};
}
else
try
{
var response = new WorkerResult
{
using (var executor = new Sandbox())
{
result = executor.Execute(assembly, cmd.TimeoutPeriod);
}
}
stopWatch.Stop();
var stoppedOn = DateTime.UtcNow;
ExecutionId = cmd.ExecutionId,
ClientId = cmd.ClientId,
StartTime = startedOn,
StopTime = stoppedOn,
RunDuration = stopWatch.Elapsed,
ProcessorTime = result.ProcessorTime,
TotalMemoryAllocated = result.TotalMemoryAllocated,
ConsoleOutput = result.ConsoleOutput,
Result = result.Result
};
Bus.Instance.Publish(response);
}
catch (JsonSerializationException ex)
{
Logger.ErrorException("An error occurred while attempting to serialize the JSON result.", ex);
}
Logger.Info("Work completed in {0} milliseconds.", stopWatch.ElapsedMilliseconds);
stopWatch.Reset();
}
try
{
var response = new WorkerResult
{
ExecutionId = cmd.ExecutionId,
ClientId = cmd.ClientId,
StartTime = startedOn,
StopTime = stoppedOn,
RunDuration = stopWatch.Elapsed,
ProcessorTime = result.ProcessorTime,
TotalMemoryAllocated = result.TotalMemoryAllocated,
ConsoleOutput = result.ConsoleOutput,
Result = result.Result
};
var message = Serializer.Serialize(response);
messenger.Publish("workers:job-done", message);
}
catch (JsonSerializationException ex)
{
Logger.ErrorException("An error occurred while attempting to serialize the JSON result.", ex);
}
public static void OnUnhandledApplicationException(object sender, UnhandledExceptionEventArgs e)
{
var exception = e.ExceptionObject as Exception;
stopWatch.Reset();
if (e.IsTerminating)
{
Logger.FatalException("An unhandled exception is causing the worker to terminate.", exception);
}
else
{
Logger.ErrorException("An unhandled exception occurred in the worker process.", exception);
}
}
}

0 comments on commit a540296

Please sign in to comment.