Resque fixes #3

Merged
merged 4 commits into from Jul 27, 2012
View
@@ -1,13 +1,9 @@
-using System.Collections.Concurrent;
-using System.Collections.Generic;
-using System.Linq;
-using System.Text;
+using System.Collections.Generic;
namespace Resque
{
public class Resque
{
- private readonly ConcurrentDictionary<string, Queue> _queues = new ConcurrentDictionary<string, Queue>();
public IJobCreator JobCreator { get; set; }
public IFailureService FailureService { get; set; }
public IRedis Client { get; set; }
@@ -29,14 +25,6 @@ public void StopAll()
worker.Shutdown = true;
}
}
- public void Push(string queue, string job, params string[] args)
- {
- GetQueue(queue).Push(new QueuedItem()
- {
- @class = job,
- args = args
- });
- }
public void Work(params string[] queues)
{
var worker = new Worker(JobCreator, FailureService, Client, queues);
@@ -49,9 +37,5 @@ public System.Threading.Tasks.Task WorkAsync(params string[] queues)
Workers.Add(worker);
return System.Threading.Tasks.Task.Factory.StartNew(() => worker.Work());
}
- private Queue GetQueue(string name)
- {
- return _queues.GetOrAdd(name, n => new Queue(Client, n));
- }
}
}
@@ -60,6 +60,7 @@
<Reference Include="System.Xml" />
</ItemGroup>
<ItemGroup>
+ <Compile Include="ResqueClient.cs" />
<Compile Include="FailureBackend\IBackendFactory.cs" />
<Compile Include="FailureBackend\RedisBackend.cs" />
<Compile Include="FailureService.cs" />
@@ -0,0 +1,32 @@
+using System.Collections.Concurrent;
+
+namespace Resque
+{
+ public interface IResqueClient
+ {
+ void Push(string queue, string job, params string[] args);
+ }
+
+ public class ResqueClient : IResqueClient
+ {
+ private readonly ConcurrentDictionary<string, Queue> _queues = new ConcurrentDictionary<string, Queue>();
+ public IRedis Client { get; set; }
+
+ public ResqueClient(IRedis client)
+ {
+ Client = client;
+ }
+ public void Push(string queue, string job, params string[] args)
+ {
+ GetQueue(queue).Push(new QueuedItem()
+ {
+ @class = job,
+ args = args
+ });
+ }
+ private Queue GetQueue(string name)
+ {
+ return _queues.GetOrAdd(name, n => new Queue(Client, n));
+ }
+ }
+}
View
@@ -44,12 +44,17 @@ protected string[] RedisQueues
public DateTime Started { get { return DateTime.Parse(Client.Get(string.Format("worker:{0}:started", WorkerId))); } }
+ private string _workerId;
+
public string WorkerId
{
get
{
- return String.Format("{0}:{1}:{2}", _dnsName, _threadId, string.Join(",", Queues));
+ if(_workerId == null)
+ _workerId = String.Format("{0}:{1}:{2}", _dnsName, _threadId, string.Join(",", Queues));
+ return _workerId;
}
+ set { _workerId = value; }
}
public Worker(IJobCreator locator, IFailureService failureService, IRedis client, params string[] queues)