Permalink
Browse files

MB-6867: extend Queue class for python 2.4

Conflicts:

	Makefile.am
	t/pump_test.py

Change-Id: I83c0eb71e532220f7e1ace2f3ee8f68c9ee07768
Reviewed-on: http://review.couchbase.org/21514
Reviewed-by: Pavel Paulau <pavel.paulau@gmail.com>
Tested-by: Bin Cui <bin.cui@gmail.com>
  • Loading branch information...
1 parent bdf2e84 commit e8a267b67e169cde1815ec4514c74ea7660d491c @bcui6611 bcui6611 committed with Farshid Ghods Oct 10, 2012
Showing with 48 additions and 4 deletions.
  1. +1 −0 Makefile.am
  2. +39 −0 cbqueue.py
  3. +4 −2 pump.py
  4. +2 −0 pump_sfd.py
  5. +2 −2 t/pump_test.py
View
@@ -7,6 +7,7 @@ pythonlibdir=$(libdir)/python
nobase_pythonlib_DATA= \
buckets.py \
cbcollections.py \
+ cbqueue.py \
cluster_stats.py \
collector.py \
diskqueue_stats.py \
View
@@ -0,0 +1,39 @@
+""" Extend Queue class for python 2.4"""
+import threading
+import sys
+from Queue import Queue
+
+if sys.version >= "2.5":
+ class PumpQueue(Queue):
+ def __init__(self, maxsize=0):
+ Queue.__init__(self, maxsize)
+else:
+ class PumpQueue(Queue):
+ def __init__(self, maxsize=0):
+ Queue.__init__(self, maxsize)
+ self.all_tasks_done = threading.Condition(self.mutex)
+ self.unfinished_tasks = 0
+
+ def _put(self, item):
+ Queue._put(self, item)
+ self.unfinished_tasks += 1
+
+ def task_done(self):
+ self.all_tasks_done.acquire()
+ try:
+ unfinished = self.unfinished_tasks - 1
+ if unfinished <= 0:
+ if unfinished < 0:
+ raise ValueError('task_done() called too many times')
+ self.all_tasks_done.notifyAll()
+ self.unfinished_tasks = unfinished
+ finally:
+ self.all_tasks_done.release()
+
+ def join(self):
+ self.all_tasks_done.acquire()
+ try:
+ while self.unfinished_tasks:
+ self.all_tasks_done.wait()
+ finally:
+ self.all_tasks_done.release()
View
@@ -4,7 +4,6 @@
import copy
import httplib
import logging
-import Queue
import re
import simplejson as json
import string
@@ -17,6 +16,8 @@
import memcacheConstants
from cbcollections import defaultdict
+from cbqueue import PumpQueue
+
# TODO: (1) optionally log into backup directory
LOGGING_FORMAT = '%(asctime)s: %(threadName)s %(message)s'
@@ -274,7 +275,8 @@ def start_workers(self, queue_size):
if self.queue:
return
- self.queue = Queue.Queue(queue_size)
+ self.queue = PumpQueue(queue_size)
+
threads = [threading.Thread(target=PumpingStation.run_worker,
name="w" + str(i), args=(self, i))
for i in range(self.opts.threads)]
View
@@ -14,6 +14,8 @@
import pump
from cbcollections import defaultdict
+from cbqueue import PumpQueue
+
SFD_SCHEME = "couchstore-files://"
SFD_VBUCKETS = 1024
SFD_REV_META = ">QII" # cas, exp, flg
View
@@ -8,7 +8,6 @@
import glob
import logging
import os
-import Queue
import select
import simplejson as json
import shutil
@@ -33,6 +32,7 @@
from memcacheConstants import *
from cbcollections import defaultdict
+from cbqueue import PumpQueue
# TODO: (1) test multiple buckets.
# TODO: (1) test TAP ttl / time-to-live field.
@@ -167,7 +167,7 @@ def __init__(self, port):
def reset(self, test=None):
self.test = test
self.sessions = {}
- self.queue = Queue.Queue(1000)
+ self.queue = PumpQueue(1000)
def host_port(self):
return self.host + ":" + str(self.port)

0 comments on commit e8a267b

Please sign in to comment.