Skip to content

Commit

Permalink
only spawn the number of threads we want and send data from main thread
Browse files Browse the repository at this point in the history
  • Loading branch information
tjfontaine committed Nov 25, 2008
1 parent 6a9dc01 commit 810b76a
Showing 1 changed file with 70 additions and 47 deletions.
117 changes: 70 additions & 47 deletions zz.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,12 @@
import getopt, os, sys, string, signal, struct, time

try:
from multiprocessing import Process as Thread, Queue
from multiprocessing import Process as Thread, Queue, Pipe
from Queue import Empty
MULTIPROCESSING = 'multiprocessing'
except Exception, ex:
try:
from processing import Process as Thread, Queue
from processing import Process as Thread, Queue, Pipe
from Queue import Empty
MULTIPROCESSING = 'processing'
except Exception, ex:
Expand All @@ -46,28 +46,37 @@

class BaseWorker(Thread):
ext = '.dummy'
def __init__(self, place, src, start, size, compression, queue):
def __init__(self, threadid, compression, queue, pipe):
Thread.__init__(self)
self.place = place
self.status = True
self.src = src
self.offset = start
self.fsize = size
self.threadid = threadid
self.comp = compression
self.queue = queue
self.pipe = pipe

def run(self):
src = open(self.src, 'rb')
src.seek(self.offset)
count = 0
def get_item(self):
try:
item = self.pipe.recv()
if item == 'STOP':
self.running = False
return None
else:
return item
except EOFError:
return None

self.raw_data = src.read(self.fsize)
data = self.compobj.compress(self.raw_data)
data += self.compobj.flush()
src.close()
self.queue.put((self.place, self.header(), self.suffix(), data))
data = None
self.raw_data = None
def run(self):
self.running = True
while self.running:
item = self.get_item()
if item:
compobj = self.get_compobj()
(self.raw_data, place) = item
self.fsize = len(self.raw_data)
data = compobj.compress(self.raw_data)
data += compobj.flush()
self.queue.put((self.threadid, place, self.header(), self.suffix(), data))
data = None
self.raw_data = None

class GzipWorker(BaseWorker):
try:
Expand All @@ -77,9 +86,9 @@ class GzipWorker(BaseWorker):
enabled = False

ext = '.gz'
def __init__(self, place, src, start, size, compression, queue):
BaseWorker.__init__(self, place, src, start, size, compression, queue)
self.compobj = self.zlib.compressobj(self.comp, self.zlib.DEFLATED, -self.zlib.MAX_WBITS, self.zlib.DEF_MEM_LEVEL, 0)

def get_compobj(self):
return self.zlib.compressobj(self.comp, self.zlib.DEFLATED, -self.zlib.MAX_WBITS, self.zlib.DEF_MEM_LEVEL, 0)

def header(self):
return GZIP_HEADER
Expand All @@ -96,9 +105,9 @@ class Bzip2Worker(BaseWorker):
enabled = False

ext = '.bz2'
def __init__(self, place, src, start, size, compression, queue):
BaseWorker.__init__(self, place, src, start, size, compression, queue)
self.compobj = self.bz2.BZ2Compressor(self.comp)

def get_compobj(self):
return self.bz2.BZ2Compressor(self.comp)

def header(self):
return ''
Expand Down Expand Up @@ -228,6 +237,8 @@ def __init__(self, opts):

self.source_size = os.stat(opts.source).st_size

self.source = open(opts.source, 'rb')

self.prepare_threads()

b = datetime.now()
Expand All @@ -243,16 +254,25 @@ def __init__(self, opts):

self.log(self.opts.timing, 'Total Time: '+str(e - b))

def cleanup(self):
for t in self.thread_started:
try:
t.join()
except:
self.log(self.opts.verbose, "Thread %d never started" % t.place)
self.source.close()

if os.path.exists(self.opts.destination):
self.cleanup()

def cleanup(self, err=False):
self.log(self.opts.verbose, 'Joining all threads')
for t,p in self.threads:
p.send('STOP')
p.close()
t.join()

if self.result_file:
self.result_file.close()

if err and os.path.exists(self.opts.destination):
os.remove(self.opts.destination)

self.source.close()

def prepare_threads(self):
self.thread_queue = []
self.completed = []
Expand All @@ -276,8 +296,7 @@ def prepare_threads(self):
else:
size = bsize

t = self.opts.worker(len(self.thread_queue), self.opts.source, count, size, self.opts.compression, self.event_queue)
self.thread_queue.append(t)
self.thread_queue.append((len(self.thread_queue), count, size))
self.completed.append(None)
self.log(self.opts.verbose, 'Added Worker %d (offset:%d, size:%d)' % (len(self.thread_queue), count, size))
count += size
Expand All @@ -292,35 +311,37 @@ def get_item(self):
def run_queue(self):
item = self.get_item()
while item:
(place, header, suffix, data) = item
self.log(self.opts.verbose, 'Thread Completed Piece %d' % (place+1))
(threadid, place, header, suffix, data) = item
self.log(self.opts.verbose, 'Completed Piece %d' % (place+1))
self.completed[place] = (header, suffix, data)
if len(self.thread_queue) > 0:
t = self.thread_queue.pop()
self.thread_started.append(t)
self.log(self.opts.verbose, 'Thread Started Piece %d' % (t.place+1))
t.start()
self.send_next_block(threadid)
self.combine()
item = self.get_item()

def send_next_block(self, threadid):
(place, offset, size) = self.thread_queue.pop()
self.log(self.opts.verbose, 'Started Piece %d' % (place+1))
self.source.seek(offset)
self.threads[threadid][1].send((self.source.read(size), place))

def start(self):
self.thread_queue.reverse()
self.last_started = 0
self.thread_started = []
self.threads = []

for i in range(self.opts.threads):
t = self.thread_queue.pop()
self.thread_started.append(t)
threadid = len(self.threads)
(parent, client) = Pipe()
t = self.opts.worker(threadid, self.opts.compression, self.event_queue, client)
self.threads.append((t, parent))
t.start()
self.send_next_block(threadid)

while self.last_completed < len(self.completed):
self.run_queue()
self.log(self.opts.verbose, 'Progress %d/%d' % (self.last_completed, len(self.completed)))

self.log(self.opts.verbose, 'Joining all threads')
for t in self.thread_started:
t.join()

def log(self, display, message):
if display: sys.stderr.write('[%s] %s%s' % (datetime.now(), message, os.linesep))

Expand All @@ -335,7 +356,9 @@ def combine(self):

while(next_block < len(self.completed) and self.completed[next_block]):
t = self.completed[next_block]
self.completed[next_block] = None
(header, suffix, data) = t
t = None
self.log(self.opts.verbose, "Combined %s" % (next_block+1))
src = self.result_file
src.write(header)
Expand Down

0 comments on commit 810b76a

Please sign in to comment.