From a9983140adb4dd4d508b4868999db6d1e8405ec1 Mon Sep 17 00:00:00 2001 From: Arne Claassen Date: Tue, 31 Aug 2010 22:49:52 -0700 Subject: [PATCH] working on some more connection pool tests.. ThreadPool.QueueWorkItem is useless --- .../PerformanceTests.cs | 66 +++++++++++++++++++ Beanstalk.Client/BeanstalkClient.cs | 3 + Beanstalk.Client/Net/ConnectionPool.cs | 16 +++++ Beanstalk.Client/Net/IConnectionPool.cs | 4 +- 4 files changed, 88 insertions(+), 1 deletion(-) diff --git a/Beanstalk.Client.IntegrationTest/PerformanceTests.cs b/Beanstalk.Client.IntegrationTest/PerformanceTests.cs index ec12e1f..5571e7e 100644 --- a/Beanstalk.Client.IntegrationTest/PerformanceTests.cs +++ b/Beanstalk.Client.IntegrationTest/PerformanceTests.cs @@ -22,6 +22,8 @@ using System.Collections.Generic; using System.Diagnostics; using System.IO; +using System.Threading; +using Droog.Beanstalk.Client.Net; using Droog.Beanstalk.Client.Test; using NUnit.Framework; @@ -71,6 +73,70 @@ public class PerformanceTests { Console.WriteLine("delete: {0:0} items/sec", n / stopwatch.Elapsed.TotalSeconds); } } + + [Test] + public void Bulk_put_reserve_delete_with_multiple_connections() { + var pool = ConnectionPool.GetPool(TestConfig.Host, TestConfig.Port); + var goSignal = new ManualResetEvent(false); + var n = 10000; + var enqueue = 10; + var dequeue = 6; + var data = new List(); + for(var i = 0; i < n; i++) { + data.Add(("data-" + i).AsStream()); + } + var idx = -1; + var r = new Random(); + var enqueued = 0; + WaitCallback enqueueWorker = id => { + goSignal.WaitOne(); + Thread.Sleep((int)id * 100); + Console.WriteLine("enqueue worker {0:00} started", id); + var client = new BeanstalkClient(pool); + while(true) { + var i = Interlocked.Increment(ref idx); + if(i >= n) { + break; + } + var item = data[i]; + Interlocked.Increment(ref enqueued); + client.Put(100, TimeSpan.Zero, TimeSpan.FromMinutes(2), item, item.Length); + } + client.Dispose(); + Console.WriteLine("enqueue worker {0:00} finished", id); + }; + var dequeued = 0; + WaitCallback dequeueWorker = id => { + goSignal.WaitOne(); + Thread.Sleep(200 + (int)id * 100); + Console.WriteLine("dequeue worker {0:00} started", id); + var client = new BeanstalkClient(pool); + while(true) { + try { + var job = client.Reserve(TimeSpan.Zero); + client.Delete(job.Id); + Interlocked.Increment(ref dequeued); + } catch(TimedoutException) { + break; + } + } + client.Dispose(); + Console.WriteLine("dequeue worker {0:00} finished", id); + }; + for(var i = 0; i < dequeue; i++) { + ThreadPool.QueueUserWorkItem(dequeueWorker, i); + } + for(var i = 0; i < enqueue; i++) { + ThreadPool.QueueUserWorkItem(enqueueWorker, i); + } + Thread.Sleep(1000); + goSignal.Set(); + while(dequeued < n) { + Thread.Sleep(500); + Console.WriteLine("{0}>{1} - busy: {2}, idle: {3}", dequeued, enqueued, pool.ActiveConnections, pool.IdleConnections); + } + } + private BeanstalkClient CreateClient() { return new BeanstalkClient(TestConfig.Host, TestConfig.Port); } diff --git a/Beanstalk.Client/BeanstalkClient.cs b/Beanstalk.Client/BeanstalkClient.cs index e173a3e..6769f6e 100644 --- a/Beanstalk.Client/BeanstalkClient.cs +++ b/Beanstalk.Client/BeanstalkClient.cs @@ -48,6 +48,9 @@ public BeanstalkClient(IConnectionPool pool) } public BeanstalkClient(ISocket socket) { + if (socket == null) { + throw new ArgumentNullException("socket"); + } _socket = socket; InitSocket(); _currentTube = "default"; diff --git a/Beanstalk.Client/Net/ConnectionPool.cs b/Beanstalk.Client/Net/ConnectionPool.cs index 3fec42d..c7f4865 100644 --- a/Beanstalk.Client/Net/ConnectionPool.cs +++ b/Beanstalk.Client/Net/ConnectionPool.cs @@ -107,6 +107,22 @@ private class Available { } } + public int ActiveConnections { + get { + lock(_availableSockets) { + return _busySockets.Count; + } + } + } + + public int IdleConnections { + get { + lock(_availableSockets) { + return _availableSockets.Count; + } + } + } + public ISocket GetSocket() { lock(_availableSockets) { ISocket socket = null; diff --git a/Beanstalk.Client/Net/IConnectionPool.cs b/Beanstalk.Client/Net/IConnectionPool.cs index 133809d..f416426 100644 --- a/Beanstalk.Client/Net/IConnectionPool.cs +++ b/Beanstalk.Client/Net/IConnectionPool.cs @@ -19,6 +19,8 @@ namespace Droog.Beanstalk.Client.Net { public interface IConnectionPool { - ISocket GetSocket(); + int ActiveConnections { get; } + int IdleConnections { get; } + ISocket GetSocket(); } } \ No newline at end of file