Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Added OnJobException to GearmanWorker, to make it easier to handle ex…

…ceptions.

Changed to using JobAssignment in IGearmanJob, instead of having separate properties. This is a breaking change, since it removes the old JobHandle and FunctionName properties. The feature is that it adds the raw byte[] function argument (i.e. not deserialized).
  • Loading branch information...
commit d4b8c964c5c0201ab2bf25af6ff206da56994622 1 parent 913a612
@osks osks authored
View
29 GearmanSharp/Exceptions/GearmanFunctionInternalException.cs
@@ -7,13 +7,32 @@
namespace Twingly.Gearman.Exceptions
{
/// <summary>
- /// Represents unexpected errors/exceptions that occured in a Gearman function when processing a Gearman job.
+ /// Represents an exception that occured in a registered job function when processing a Gearman job.
/// </summary>
public class GearmanFunctionInternalException : GearmanException
{
- public GearmanFunctionInternalException() { }
- public GearmanFunctionInternalException(string message) : base(message) { }
- public GearmanFunctionInternalException(string message, Exception innerException) : base(message, innerException) { }
- protected GearmanFunctionInternalException(SerializationInfo info, StreamingContext context) : base(info, context) { }
+ public JobAssignment JobAssignment { get; set; }
+
+ public GearmanFunctionInternalException(JobAssignment jobAssignment)
+ {
+ JobAssignment = jobAssignment;
+ }
+
+ public GearmanFunctionInternalException(JobAssignment jobAssignment, string message)
+ : base(message)
+ {
+ JobAssignment = jobAssignment;
+ }
+
+ public GearmanFunctionInternalException(JobAssignment jobAssignment, string message, Exception innerException)
+ : base(message, innerException)
+ {
+ JobAssignment = jobAssignment;
+ }
+
+ protected GearmanFunctionInternalException(SerializationInfo info, StreamingContext context)
+ : base(info, context)
+ {
+ }
}
}
View
16 GearmanSharp/GearmanJob.cs
@@ -4,6 +4,10 @@
namespace Twingly.Gearman
{
+ public delegate void GearmanJobFunction<TArg, TResult>(IGearmanJob<TArg, TResult> job)
+ where TArg : class
+ where TResult : class;
+
public enum GearmanJobPriority
{
High = 1,
@@ -19,8 +23,7 @@ public class GearmanJob<TArg, TResult> : IGearmanJob<TArg, TResult>
private readonly DataDeserializer<TArg> _deserializer;
private readonly GearmanWorkerProtocol _protocol;
- public string JobHandle { get; protected set; }
- public string FunctionName { get; protected set; }
+ public JobAssignment Info { get; protected set; }
public TArg FunctionArgument { get; protected set; }
public GearmanJob(GearmanWorkerProtocol protocol, JobAssignment jobAssignment,
@@ -29,24 +32,23 @@ public class GearmanJob<TArg, TResult> : IGearmanJob<TArg, TResult>
_serializer = resultSerializer;
_deserializer = argumentDeserializer;
_protocol = protocol;
- JobHandle = jobAssignment.JobHandle;
- FunctionName = jobAssignment.FunctionName;
+ Info = jobAssignment;
FunctionArgument = _deserializer(jobAssignment.FunctionArgument);
}
public void Complete()
{
- _protocol.WorkComplete(JobHandle);
+ _protocol.WorkComplete(Info.JobHandle);
}
public void Complete(TResult result)
{
- _protocol.WorkComplete(JobHandle, _serializer(result));
+ _protocol.WorkComplete(Info.JobHandle, _serializer(result));
}
public void Fail()
{
- _protocol.WorkFail(JobHandle);
+ _protocol.WorkFail(Info.JobHandle);
}
}
}
View
63 GearmanSharp/GearmanThreadedWorker.cs
@@ -9,10 +9,10 @@ namespace Twingly.Gearman
public class GearmanThreadedWorker : GearmanWorker
{
private const int _NO_JOB_COUNT_BEFORE_SLEEP = 10;
- private const int _NO_JOB_SLEEP_TIME = 1000;
- private const int _NO_SERVERS_SLEEP_TIME = 1000;
+ private const int _NO_JOB_SLEEP_TIME_MS = 1000;
+ private const int _NO_SERVERS_SLEEP_TIME_MS = 1000;
- private volatile bool _shouldQuit = false;
+ protected volatile bool ContinueWorking = false;
private readonly ManualResetEvent _resetEvent = new ManualResetEvent(false);
private readonly Thread _workLoopThread;
@@ -33,10 +33,39 @@ public GearmanThreadedWorker(ClusterConfigurationElement clusterConfiguration)
_workLoopThread = new Thread(WorkLoopThreadProc);
}
+ public void StartWorkLoop()
+ {
+ ContinueWorking = true;
+ _resetEvent.Reset();
+ _workLoopThread.Start();
+ }
+
+ public void StopWorkLoop()
+ {
+ ContinueWorking = false;
+ _resetEvent.Set();
+ if (_workLoopThread.IsAlive)
+ {
+ _workLoopThread.Join();
+ }
+ }
+
+ /// <summary>
+ /// Called when a job function throws an exception. Does nothing and returns false, to not abort the work loop.
+ /// </summary>
+ /// <param name="exception">The exception thrown by the job function.</param>
+ /// <param name="jobAssignment">The job assignment that the job function got.</param>
+ /// <returns>Return true if it should throw, or false if it should not throw after the return.</returns>
+ protected override bool OnJobException(Exception exception, JobAssignment jobAssignment)
+ {
+ // Don't throw the exception, as that would abort the work loop.
+ return false;
+ }
+
private void WorkLoopThreadProc()
{
var noJobCount = 0;
- while (!_shouldQuit)
+ while (ContinueWorking)
{
try
{
@@ -45,7 +74,7 @@ private void WorkLoopThreadProc()
if (aliveConnections.Count() < 1)
{
// No servers available, sleep for a while and try again later
- _resetEvent.WaitOne(_NO_SERVERS_SLEEP_TIME, false);
+ _resetEvent.WaitOne(_NO_SERVERS_SLEEP_TIME_MS, false);
_resetEvent.Reset();
noJobCount = 0;
}
@@ -53,7 +82,7 @@ private void WorkLoopThreadProc()
{
foreach (var connection in aliveConnections)
{
- if (_shouldQuit)
+ if (!ContinueWorking)
{
break;
}
@@ -64,7 +93,7 @@ private void WorkLoopThreadProc()
if (noJobCount >= _NO_JOB_COUNT_BEFORE_SLEEP)
{
- _resetEvent.WaitOne(_NO_JOB_SLEEP_TIME, false);
+ _resetEvent.WaitOne(_NO_JOB_SLEEP_TIME_MS, false);
_resetEvent.Reset();
noJobCount = 0;
}
@@ -72,27 +101,9 @@ private void WorkLoopThreadProc()
}
catch (Exception)
{
- _shouldQuit = true;
+ ContinueWorking = false;
throw;
}
-
- }
- }
-
- public void StartWorkLoop()
- {
- _shouldQuit = false;
- _resetEvent.Reset();
- _workLoopThread.Start();
- }
-
- public void StopWorkLoop()
- {
- _shouldQuit = true;
- _resetEvent.Set();
- if (_workLoopThread.IsAlive)
- {
- _workLoopThread.Join();
}
}
}
View
114 GearmanSharp/GearmanWorker.cs
@@ -8,8 +8,6 @@
namespace Twingly.Gearman
{
- public delegate void GearmanJobFunction<TArg, TResult>(IGearmanJob<TArg, TResult> job) where TArg : class where TResult : class;
-
public class GearmanWorker : GearmanConnectionManager
{
protected struct FunctionInformation
@@ -96,12 +94,70 @@ public bool Work()
throw new NoServerAvailableException("No job servers");
}
+ protected bool Work(IGearmanConnection connection)
+ {
+ try
+ {
+ var protocol = new GearmanWorkerProtocol(connection);
+ var jobAssignment = protocol.GrabJob();
+
+ if (jobAssignment == null)
+ return false;
+
+ if (!_functionInformation.ContainsKey(jobAssignment.FunctionName))
+ throw new GearmanApiException(String.Format("Received work for unknown function {0}", jobAssignment.FunctionName));
+
+ CallFunction(protocol, jobAssignment);
+ return true;
+ }
+ catch (GearmanConnectionException)
+ {
+ connection.MarkAsDead();
+ return false;
+ }
+ catch (GearmanFunctionInternalException functionException)
+ {
+ // The job function threw an exception. Just as with other exceptions, we disconnect
+ // from the server because we don't want the job to be removed. See general exception
+ // catch for more information.
+ connection.Disconnect();
+ var shouldThrow = OnJobException(functionException.InnerException, functionException.JobAssignment);
+ if (shouldThrow)
+ {
+ throw;
+ }
+ return false;
+ }
+ catch (Exception)
+ {
+ // We failed to call the function and there isn't any good response to send the server.
+ // According to this response on the mailing list, the best action is probably to close the connection:
+ // "A worker disconnect with no response message is currently how the server's retry behavior is triggered."
+ // http://groups.google.com/group/gearman/browse_thread/thread/5c91acc31bd10688/529e586405ed37fe
+ //
+ // We can't send Complete or Fail for the job, because that would cause the job to be "done" and the server wouldn't retry.
+ connection.Disconnect();
+ throw;
+ }
+ }
+
protected override void OnConnectionConnected(IGearmanConnection connection)
{
RegisterAllFunctions(connection);
SetClientId(connection);
}
+ /// <summary>
+ /// Called when a job function throws an exception. The default implementation returns true.
+ /// </summary>
+ /// <param name="exception">The exception thrown by the job function.</param>
+ /// <param name="jobAssignment">The job assignment that the job function got.</param>
+ /// <returns>Return true if it should throw, or false if it should not throw after the return.</returns>
+ protected virtual bool OnJobException(Exception exception, JobAssignment jobAssignment)
+ {
+ return true;
+ }
+
private void SetClientId(IGearmanConnection connection)
{
try
@@ -163,40 +219,6 @@ private static void RegisterFunction(IGearmanConnection connection, string funct
});
}
- protected bool Work(IGearmanConnection connection)
- {
- try
- {
- var protocol = new GearmanWorkerProtocol(connection);
- var jobAssignment = protocol.GrabJob(); // This can throw GearmanConnectionException, how do we handle that?
-
- if (jobAssignment == null)
- return false;
-
- if (!_functionInformation.ContainsKey(jobAssignment.FunctionName))
- throw new GearmanApiException(String.Format("Received work for unknown function {0}", jobAssignment.FunctionName));
-
- CallFunction(protocol, jobAssignment);
- return true;
- }
- catch (GearmanConnectionException)
- {
- connection.MarkAsDead();
- return false;
- }
- catch (Exception)
- {
- // We failed to call the function and there isn't any good response to send the server.
- // According to this response on the mailing list, the best action is probably to close the connection:
- // "A worker disconnect with no response message is currently how the server's retry behavior is triggered."
- // http://groups.google.com/group/gearman/browse_thread/thread/5c91acc31bd10688/529e586405ed37fe
- //
- // We can't send Complete or Fail for the job, because that would cause the job to be "done" and the server wouldn't retry.
- connection.Disconnect();
- throw;
- }
- }
-
private void CallFunction(GearmanWorkerProtocol protocol, JobAssignment jobAssignment)
{
var functionInformation = _functionInformation[jobAssignment.FunctionName];
@@ -223,21 +245,23 @@ private void CallFunction(GearmanWorkerProtocol protocol, JobAssignment jobAssig
{
functionInformation.Function.DynamicInvoke(job);
}
- catch (MemberAccessException ex)
- {
- throw new GearmanException("Failed to invoke the function dynamically", ex);
- }
- catch (TargetException ex)
- {
- throw new GearmanException("Failed to invoke the function dynamically", ex);
- }
catch (TargetInvocationException ex)
{
+ if (ex.InnerException != null)
+ {
+ // Remove the TargetInvocationException wrapper that DynamicInvoke added,
+ // so we can give the user the exception from the job function.
+ throw new GearmanFunctionInternalException(
+ jobAssignment,
+ String.Format("Function '{0}' threw exception", jobAssignment.FunctionName), ex.InnerException);
+ }
+
+ // If there is no inner exception, something strange is up, so then we want to throw this exception.
throw new GearmanException("Failed to invoke the function dynamically", ex);
}
catch (Exception ex)
{
- throw new GearmanFunctionInternalException(String.Format("Function '{0}' threw exception", jobAssignment.FunctionName), ex);
+ throw new GearmanException("Failed to invoke the function dynamically", ex);
}
}
}
View
7 GearmanSharp/IGearmanJob.cs
@@ -5,8 +5,11 @@ namespace Twingly.Gearman
{
public interface IGearmanJob<TArg, TResult>
{
- string JobHandle { get; }
- string FunctionName { get; }
+ JobAssignment Info { get; }
+
+ /// <summary>
+ /// The deserialized function argument.
+ /// </summary>
TArg FunctionArgument { get; }
void Complete();
Please sign in to comment.
Something went wrong with that request. Please try again.