Skip to content

Commit

Permalink
adding worker implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
chad committed Feb 9, 2010
1 parent e972054 commit 8e86b6a
Show file tree
Hide file tree
Showing 7 changed files with 238 additions and 9 deletions.
32 changes: 27 additions & 5 deletions resque-sharp/Job.cs
Expand Up @@ -11,9 +11,9 @@ namespace resque
{
public class Job
{
Dictionary<string, object> payload;
string queue;

public Dictionary<string, object> payload { get; set; }
public string queue { get; set; }
public Worker worker{get; set;}
public Job(string queue, Dictionary<string, object> payload)
{
this.queue = queue;
Expand Down Expand Up @@ -46,8 +46,12 @@ internal static Job Reserve(string queue)

internal void perform()
{
//Type type = Type.GetType("resque.DummyJob", true);
//return (Job)Activator.CreateInstance(type);

System.Reflection.MethodInfo methodInfo = PayloadClass().GetMethod("perform", System.Reflection.BindingFlags.Static | System.Reflection.BindingFlags.Public | System.Reflection.BindingFlags.FlattenHierarchy);
if (methodInfo == null)
throw new NotImplementedException();
methodInfo.Invoke(null, args().ToArray());

}

public ArrayList args()
Expand Down Expand Up @@ -96,6 +100,11 @@ private bool arrayListElementsAreEqual(ArrayList list, ArrayList otherList)
}
return true;
}

internal void fail(Exception e)
{
throw new NotImplementedException();
}
}


Expand All @@ -120,6 +129,19 @@ public static string queue()
}
}

public class BadJob
{
public static string queue()
{
return "tester";
}
public static string perform()
{
throw new Exception("Bad Job!!");
}
}


public class UninferrableInvalidJob
{
}
Expand Down
6 changes: 3 additions & 3 deletions resque-sharp/Resque.cs
Expand Up @@ -141,16 +141,16 @@ public static bool enqueue(string className, params object[] args)


#region encoding
private static string encode(object item)
public static string encode(object item)
{
return JsonConvert.SerializeObject(item);
}

private static object decode(string json)
public static object decode(string json)
{
return JsonConvert.DeserializeObject<Dictionary<string, object>>(json);
}
private static object decode(byte[] json)
public static object decode(byte[] json)
{
return decode(Encoding.UTF8.GetString(json));
}
Expand Down
150 changes: 150 additions & 0 deletions resque-sharp/Worker.cs
@@ -0,0 +1,150 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;

namespace resque
{
public class Worker
{
string[] queues;
public Worker(params string[] queues)
{
this.queues = queues;
}

public void work(int interval)
{
work(interval, null);
}

public void work(int interval, Func<Job,bool> block)
{
try
{
startup();
while (true)
{
Job job = reserve();
if (job != null)
{
process(job, block);
}
else
{
if (interval == 0)
break;
System.Threading.Thread.Sleep(interval * 1000);
}


}
}
finally
{
unregisterWorker();
}
}

private void unregisterWorker()
{
throw new NotImplementedException();
}

private void process(Job job, Func<Job, bool> block)
{
try
{
setWorkingOn(job);
job.perform();
}
catch (Exception e)
{
job.fail(e);
setFailed();
}
finally
{
if (block != null)
{
block(job);
}
setDoneWorking();
}
}

private void setFailed()
{
throw new NotImplementedException();
}

private void setWorkingOn(Job job)
{
job.worker = this;
string data = Resque.encode(new Dictionary<string, object>() { { "queue", job.queue }, { "run_at", currentTimeFormatted() }, { "payload", job.payload } });
Resque.redis().Set("resque:worker:" + workerId(), data);
}

private void setDoneWorking()
{
throw new NotImplementedException();
}

private void startup()
{
//pruneDeadWorkers();
registerWorker();
}

private void registerWorker()
{
Resque.redis().AddToSet("resque:workers", workerId());
setStarted();
}

private Job reserve()
{
foreach(string queue in queues) {
Job job = Job.Reserve(queue);
if (job != null)
{
return job;
}
}
return null;
}

private void setStarted()
{
currentTimeFormatted();
Resque.redis().Set(new Dictionary<string, byte[]>() { { startedKey(), Encoding.UTF8.GetBytes(currentTimeFormatted()) } });
}

private static string currentTimeFormatted()
{
DateTime currentTime = DateTime.Now;
string currentTimeFormatted = currentTime.ToString("ddd MMM dd hh:mm:ss zzzz yyyy");
return currentTimeFormatted;
}

private string startedKey()
{
return "resque:worker:" + workerId() + ":started";
}

public bool IsWorking()
{
return state() == "working";
}

public string state()
{
return Resque.redis().ContainsKey("resque:worker:" + workerId()) ? "working" : "idle";
}

internal string workerId()
{
return "FIXME";
}
}
}
1 change: 1 addition & 0 deletions resque-sharp/resque-sharp.csproj
Expand Up @@ -53,6 +53,7 @@
<Compile Include="Properties\AssemblyInfo.cs" />
<Compile Include="redis-sharp.cs" />
<Compile Include="Resque.cs" />
<Compile Include="Worker.cs" />
</ItemGroup>
<Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" />
<!-- To modify your build process, add your task inside one of the targets below and uncomment it.
Expand Down
8 changes: 7 additions & 1 deletion tests/ResqueTest.cs
Expand Up @@ -182,7 +182,6 @@ public void CanDeleteAQueue()
}

[Test]

public void BadlyWantsAClassName()
{
Assert.That(
Expand All @@ -191,6 +190,13 @@ public void BadlyWantsAClassName()
);
}

[Test]
public void KeepsStats()
{
Job.create("jobs", "resque.DummyJob", 20, "/tmp");

}

internal void EnqueueUninferrableJob()
{
Resque.enqueue("resque.UninferrableInvalidJob", 123);
Expand Down
49 changes: 49 additions & 0 deletions tests/WorkerTest.cs
@@ -0,0 +1,49 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using NUnit.Framework;

namespace resque
{
[TestFixture]
public class WorkerTest
{
private Worker worker;
[SetUp]
public void Init()
{
Resque.setRedis(new Redis("192.168.1.119", 6379));
Resque.redis().FlushAll();
worker = new Worker("jobs");
Job.create("jobs", "resque.DummyJob", 20, "/tmp");
}

//[Test]
//public void CanFailJobs()
//{
// Job.create("jobs", "resque.BadJob");
// worker.work(0);
//}

[Test]
public void KnowsWhenItsWorking()
{
worker.work(0, (Job Job) => {Assert.That(worker.IsWorking(), Is.True); return true;});
}

[Test]
public void CanDoLambdaDambdaDingDong()
{
Assert.That(
foo(x => x.ToUpper()),
Is.EqualTo("LOL")
);
foo((string x) => { Assert.That(x, Is.EqualTo("lol")); return "blah"; });
}

public string foo(Func<string, string> rocker) {
return rocker("lol");
}
}
}
1 change: 1 addition & 0 deletions tests/tests.csproj
Expand Up @@ -59,6 +59,7 @@
<ItemGroup>
<Compile Include="ResqueTest.cs" />
<Compile Include="Properties\AssemblyInfo.cs" />
<Compile Include="WorkerTest.cs" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\resque-sharp\resque-sharp.csproj">
Expand Down

0 comments on commit 8e86b6a

Please sign in to comment.