Navigation Menu

Skip to content

Commit

Permalink
working on some more connection pool tests.. ThreadPool.QueueWorkItem…
Browse files Browse the repository at this point in the history
… is useless
  • Loading branch information
sdether committed Sep 1, 2010
1 parent 345eac6 commit a998314
Show file tree
Hide file tree
Showing 4 changed files with 88 additions and 1 deletion.
66 changes: 66 additions & 0 deletions Beanstalk.Client.IntegrationTest/PerformanceTests.cs
Expand Up @@ -22,6 +22,8 @@
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Threading;
using Droog.Beanstalk.Client.Net;
using Droog.Beanstalk.Client.Test;
using NUnit.Framework;

Expand Down Expand Up @@ -71,6 +73,70 @@ public class PerformanceTests {
Console.WriteLine("delete: {0:0} items/sec", n / stopwatch.Elapsed.TotalSeconds);
}
}

[Test]
public void Bulk_put_reserve_delete_with_multiple_connections() {
var pool = ConnectionPool.GetPool(TestConfig.Host, TestConfig.Port);
var goSignal = new ManualResetEvent(false);
var n = 10000;
var enqueue = 10;
var dequeue = 6;
var data = new List<MemoryStream>();
for(var i = 0; i < n; i++) {
data.Add(("data-" + i).AsStream());
}
var idx = -1;
var r = new Random();
var enqueued = 0;
WaitCallback enqueueWorker = id => {
goSignal.WaitOne();
Thread.Sleep((int)id * 100);
Console.WriteLine("enqueue worker {0:00} started", id);
var client = new BeanstalkClient(pool);
while(true) {
var i = Interlocked.Increment(ref idx);
if(i >= n) {
break;
}
var item = data[i];
Interlocked.Increment(ref enqueued);
client.Put(100, TimeSpan.Zero, TimeSpan.FromMinutes(2), item, item.Length);
}
client.Dispose();
Console.WriteLine("enqueue worker {0:00} finished", id);
};
var dequeued = 0;
WaitCallback dequeueWorker = id => {
goSignal.WaitOne();
Thread.Sleep(200 + (int)id * 100);
Console.WriteLine("dequeue worker {0:00} started", id);
var client = new BeanstalkClient(pool);
while(true) {
try {
var job = client.Reserve(TimeSpan.Zero);
client.Delete(job.Id);
Interlocked.Increment(ref dequeued);
} catch(TimedoutException) {
break;
}
}
client.Dispose();
Console.WriteLine("dequeue worker {0:00} finished", id);
};
for(var i = 0; i < dequeue; i++) {
ThreadPool.QueueUserWorkItem(dequeueWorker, i);
}
for(var i = 0; i < enqueue; i++) {
ThreadPool.QueueUserWorkItem(enqueueWorker, i);
}
Thread.Sleep(1000);
goSignal.Set();
while(dequeued < n) {
Thread.Sleep(500);
Console.WriteLine("{0}>{1} - busy: {2}, idle: {3}", dequeued, enqueued, pool.ActiveConnections, pool.IdleConnections);
}
}

private BeanstalkClient CreateClient() {
return new BeanstalkClient(TestConfig.Host, TestConfig.Port);
}
Expand Down
3 changes: 3 additions & 0 deletions Beanstalk.Client/BeanstalkClient.cs
Expand Up @@ -48,6 +48,9 @@ public BeanstalkClient(IConnectionPool pool)
}

public BeanstalkClient(ISocket socket) {
if (socket == null) {
throw new ArgumentNullException("socket");
}
_socket = socket;
InitSocket();
_currentTube = "default";
Expand Down
16 changes: 16 additions & 0 deletions Beanstalk.Client/Net/ConnectionPool.cs
Expand Up @@ -107,6 +107,22 @@ private class Available {
}
}

public int ActiveConnections {
get {
lock(_availableSockets) {
return _busySockets.Count;
}
}
}

public int IdleConnections {
get {
lock(_availableSockets) {
return _availableSockets.Count;
}
}
}

public ISocket GetSocket() {
lock(_availableSockets) {
ISocket socket = null;
Expand Down
4 changes: 3 additions & 1 deletion Beanstalk.Client/Net/IConnectionPool.cs
Expand Up @@ -19,6 +19,8 @@

namespace Droog.Beanstalk.Client.Net {
public interface IConnectionPool {
ISocket GetSocket();
int ActiveConnections { get; }
int IdleConnections { get; }
ISocket GetSocket();
}
}

0 comments on commit a998314

Please sign in to comment.