Skip to content

Commit

Permalink
Make use of pickle in ipc.pool work on py3 as well
Browse files Browse the repository at this point in the history
  • Loading branch information
kovidgoyal committed Mar 15, 2019
1 parent 20b9cc3 commit d3ccd43
Showing 1 changed file with 16 additions and 12 deletions.
28 changes: 16 additions & 12 deletions src/calibre/utils/ipc/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
__license__ = 'GPL v3'
__copyright__ = '2014, Kovid Goyal <kovid at kovidgoyal.net>'

import os, cPickle, sys
import os, sys
from threading import Thread
from collections import namedtuple
from Queue import Queue
Expand All @@ -16,6 +16,7 @@
from calibre.ptempfile import PersistentTemporaryFile
from calibre.utils import join_with_timeout
from calibre.utils.ipc import eintr_retry_call
from calibre.utils.serialize import msgpack_dumps, msgpack_loads, pickle_dumps, pickle_loads

Job = namedtuple('Job', 'id module func args kwargs')
Result = namedtuple('Result', 'value err traceback')
Expand Down Expand Up @@ -95,7 +96,7 @@ def __init__(self, p, conn, events, name):
self.name = name or ''

def __call__(self, job):
eintr_retry_call(self.conn.send_bytes, cPickle.dumps(job, -1))
eintr_retry_call(self.conn.send_bytes, pickle_dumps(job))
if job is not None:
self.job_id = job.id
t = Thread(target=self.recv, name='PoolWorker-'+self.name)
Expand All @@ -104,7 +105,7 @@ def __call__(self, job):

def recv(self):
try:
result = cPickle.loads(eintr_retry_call(self.conn.recv_bytes))
result = pickle_loads(eintr_retry_call(self.conn.recv_bytes))
wr = WorkerResult(self.job_id, result, False, self)
except Exception as err:
import traceback
Expand All @@ -130,7 +131,7 @@ def __init__(self, max_workers=None, name=None):
self.results = Queue()
self.tracker = Queue()
self.terminal_failure = None
self.common_data = cPickle.dumps(None, -1)
self.common_data = pickle_dumps(None)
self.worker_data = None
self.shutting_down = False

Expand Down Expand Up @@ -192,7 +193,7 @@ def create_worker(self):
p.stdin.flush(), p.stdin.close()
conn = eintr_retry_call(self.listener.accept)
w = Worker(p, conn, self.events, self.name)
if self.common_data != cPickle.dumps(None, -1):
if self.common_data != pickle_dumps(None):
w.set_common_data(self.common_data)
return w

Expand All @@ -211,7 +212,7 @@ def run(self):
from calibre.utils.ipc.server import create_listener
self.auth_key = os.urandom(32)
self.address, self.listener = create_listener(self.auth_key)
self.worker_data = cPickle.dumps((self.address, self.auth_key), -1)
self.worker_data = msgpack_dumps((self.address, self.auth_key))
if self.start_worker() is False:
return

Expand Down Expand Up @@ -243,12 +244,12 @@ def handle_event(self, event):
return False
self.results.put(worker_result)
else:
self.common_data = cPickle.dumps(event, -1)
self.common_data = pickle_dumps(event)
if len(self.common_data) > MAX_SIZE:
self.cd_file = PersistentTemporaryFile('pool_common_data')
with self.cd_file as f:
f.write(self.common_data)
self.common_data = cPickle.dumps(File(f.name), -1)
self.common_data = pickle_dumps(File(f.name))
for worker in self.available_workers:
try:
worker.set_common_data(self.common_data)
Expand Down Expand Up @@ -340,7 +341,7 @@ def worker_main(conn):
common_data = None
while True:
try:
job = cPickle.loads(eintr_retry_call(conn.recv_bytes))
job = pickle_loads(eintr_retry_call(conn.recv_bytes))
except EOFError:
break
except KeyboardInterrupt:
Expand All @@ -354,7 +355,9 @@ def worker_main(conn):
break
if not isinstance(job, Job):
if isinstance(job, File):
common_data = cPickle.load(open(job.name, 'rb'))
with lopen(job.name, 'rb') as f:
common_data = f.read()
common_data = pickle_loads(common_data)
else:
common_data = job
continue
Expand All @@ -374,7 +377,7 @@ def worker_main(conn):
import traceback
result = Result(None, as_unicode(err), traceback.format_exc())
try:
eintr_retry_call(conn.send_bytes, cPickle.dumps(result, -1))
eintr_retry_call(conn.send_bytes, pickle_dumps(result))
except EOFError:
break
except Exception:
Expand All @@ -388,7 +391,8 @@ def worker_main(conn):
def run_main(func):
from multiprocessing.connection import Client
from contextlib import closing
address, key = cPickle.loads(eintr_retry_call(sys.stdin.read))
stdin = getattr(sys.stdin, 'buffer', sys.stdin)
address, key = msgpack_loads(eintr_retry_call(stdin.read))
with closing(Client(address, authkey=key)) as conn:
raise SystemExit(func(conn))

Expand Down

0 comments on commit d3ccd43

Please sign in to comment.