Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

[fix] API object seems not to be thread-safe so each worker should ha…

…ve own api object.
  • Loading branch information...
commit 08e3c747253709af981102e68756aa17917734a4 1 parent 8c59d7c
@yssk22 authored
View
2  setup.py
@@ -1,7 +1,7 @@
from setuptools import setup, find_packages
setup(name='tweethandler',
- version='0.2.0',
+ version='0.2.1',
description='logging handler for twitter',
license='MIT',
author='Yohei Sasaki',
View
2  test/test_tweethandler.py
@@ -61,7 +61,7 @@ def testEmitAsync(self):
self.setUpHandler(async = True)
log = self.mklog('foo')
self.logger.debug(log)
- logging.shutdown()
+ self.handler.close()
self.assertEqual(log, self.getLastLog()[0])
def testEmit(self):
View
44 tweethandler/tweethandler.py
@@ -39,7 +39,8 @@ def __init__(self,
self._ignore_twerror = ignore_twerror
self._pool = None
if async:
- self._pool = _ExecPool()
+ self._pool = _APIPool(ckey, csecret, akey, asecret,
+ max_retries = max_retries)
self._pool.start()
def close(self):
@@ -55,7 +56,7 @@ def emit(self, record):
self._compact_record(record)
if record.levelno >= self._dm_threshold:
self._send_dm(record)
- self._api_call(self._api.update_status,
+ self._api_call('update_status',
self.format(record))
except TweepError, e:
if not self._ignore_twerror:
@@ -73,22 +74,32 @@ def _send_dm(self, record):
msg = self.format(record)
if type(self._dm_to) is list:
for d in self._dm_to:
- self._api_call(self._api.send_direct_message,
+ self._api_call('send_direct_message',
d, text = msg)
else:
- self._api_call(self._api.send_direct_message,
+ self._api_call('send_direct_message',
self._dm_to, text = msg)
- def _api_call(self, func, *args, **kwargs):
+ def _api_call(self, fname, *args, **kwargs):
if self._pool:
- self._pool.dispatch(func, *args, **kwargs)
+ self._pool.dispatch(fname, *args, **kwargs)
else:
+ func = getattr(self._api, fname)
func(*args, **kwargs)
_StopWorker = object()
-class _ExecPool:
- def __init__(self, numthreads = 10):
+class _APIPool:
+ def __init__(self,
+ ckey, csecret, akey, asecret,
+ max_retries = 5,
+ numthreads = 10):
+ self._ckey = ckey
+ self._csecret = csecret
+ self._akey = akey
+ self._asecret = asecret
+ self._max_retries = max_retries
+
self._numthreads = numthreads
self._threads = []
self._queue = Queue.Queue()
@@ -100,14 +111,15 @@ def start(self):
self._started = True
for i in range(self._numthreads):
+ # prepare api object
name = "_ExecPool-%s-%s" % (id(self), i)
newth = threading.Thread(target = self._worker,
name = name)
self._threads.append(newth)
newth.start()
- def dispatch(self, func, *args, **kwargs):
- o = (func, args, kwargs)
+ def dispatch(self, fname, *args, **kwargs):
+ o = (fname, args, kwargs)
self._queue.put(o)
if not self._started:
self.start()
@@ -125,10 +137,16 @@ def stop(self):
self._started = False
def _worker(self):
- current = threading.currentThread()
+ auth = OAuthHandler(self._ckey, self._csecret)
+ auth.set_access_token(self._akey, self._asecret)
+ api = API(auth, retry_count = self._max_retries)
o = self._queue.get()
while o is not _StopWorker:
- func, args, kwargs = o
- func(*args, **kwargs)
+ fname, args, kwargs = o
+ try:
+ func = getattr(api, fname)
+ func(*args, **kwargs)
+ except:
+ pass
o = self._queue.get()
# ends of threads
Please sign in to comment.
Something went wrong with that request. Please try again.