Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 24 additions & 19 deletions analytics/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ class Client(object):
def __init__(self, write_key=None, host=None, debug=False,
max_queue_size=10000, send=True, on_error=None, flush_at=100,
flush_interval=0.5, gzip=False, max_retries=3,
sync_mode=False, timeout=15):
sync_mode=False, timeout=15, thread=1):
require('write_key', write_key, string_types)

self.queue = queue.Queue(max_queue_size)
Expand All @@ -45,21 +45,25 @@ def __init__(self, write_key=None, host=None, debug=False,
self.log.setLevel(logging.DEBUG)

if sync_mode:
self.consumer = None
self.consumers = None
else:
self.consumer = Consumer(self.queue, write_key, host=host, on_error=on_error,
flush_at=flush_at, flush_interval=flush_interval,
gzip=gzip, retries=max_retries, timeout=timeout)

# if we've disabled sending, just don't start the consumer
# On program exit, allow the consumer thread to exit cleanly.
# This prevents exceptions and a messy shutdown when the interpreter is
# destroyed before the daemon thread finishes execution. However, it
# is *not* the same as flushing the queue! To guarantee all messages
# have been delivered, you'll still need to call flush().
if send:
# On program exit, allow the consumer thread to exit cleanly.
# This prevents exceptions and a messy shutdown when the interpreter is
# destroyed before the daemon thread finishes execution. However, it
# is *not* the same as flushing the queue! To guarantee all messages
# have been delivered, you'll still need to call flush().
atexit.register(self.join)
self.consumer.start()
for n in range(thread):
self.consumers = []
consumer = Consumer(self.queue, write_key, host=host, on_error=on_error,
flush_at=flush_at, flush_interval=flush_interval,
gzip=gzip, retries=max_retries, timeout=timeout)
self.consumers.append(consumer)

# if we've disabled sending, just don't start the consumer
if send:
consumer.start()

def identify(self, user_id=None, traits=None, context=None, timestamp=None,
anonymous_id=None, integrations=None, message_id=None):
Expand Down Expand Up @@ -263,12 +267,13 @@ def flush(self):

def join(self):
"""Ends the consumer thread once the queue is empty. Blocks execution until finished"""
self.consumer.pause()
try:
self.consumer.join()
except RuntimeError:
# consumer thread has not started
pass
for consumer in self.consumers:
consumer.pause()
try:
consumer.join()
except RuntimeError:
# consumer thread has not started
pass

def shutdown(self):
"""Flush all messages and cleanly shutdown the client"""
Expand Down
11 changes: 7 additions & 4 deletions analytics/test/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -256,13 +256,14 @@ def test_shutdown(self):
# 1. client queue is empty
# 2. consumer thread has stopped
self.assertTrue(client.queue.empty())
self.assertFalse(client.consumer.is_alive())
for consumer in client.consumers:
self.assertFalse(consumer.is_alive())

def test_synchronous(self):
client = Client('testsecret', sync_mode=True)

success, message = client.identify('userId')
self.assertIsNone(client.consumer)
self.assertFalse(client.consumers)
self.assertTrue(client.queue.empty())
self.assertTrue(success)

Expand Down Expand Up @@ -333,8 +334,10 @@ def mock_post_fn(*args, **kwargs):

def test_user_defined_timeout(self):
client = Client('testsecret', timeout=10)
self.assertEquals(client.consumer.timeout, 10)
for consumer in client.consumers:
self.assertEquals(consumer.timeout, 10)

def test_default_timeout_15(self):
client = Client('testsecret')
self.assertEquals(client.consumer.timeout, 15)
for consumer in client.consumers:
self.assertEquals(consumer.timeout, 15)