Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add option to make the telemetry queue persistent #154

Merged
merged 2 commits into from Mar 18, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions .travis.yml
Expand Up @@ -3,6 +3,7 @@ python:
- '2.7'
- '3.6'
script:
- pip install persist-queue
- python setup.py sdist
- python setup.py test
- django_tests/all_tests.sh
Expand Down
1 change: 1 addition & 0 deletions CHANGELOG.md
Expand Up @@ -3,6 +3,7 @@
## Unreleased (will be 0.11.8)

- Allow to specify and endpoint to upload telemetry to.
- Add optional queue persistence to prevent telemetry loss in case of application crash.
- Add support for using `NullSender` with `AsynchronousQueue`.

## 0.11.7
Expand Down
7 changes: 2 additions & 5 deletions applicationinsights/channel/AsynchronousQueue.py
Expand Up @@ -6,14 +6,11 @@ class AsynchronousQueue(QueueBase):
will notify the sender that it needs to pick up items when it reaches :func:`max_queue_length`, or
when the consumer calls :func:`flush` via the :func:`flush_notification` event.
"""
def __init__(self, sender):
def __init__(self, *args, **kwargs):
"""Initializes a new instance of the class.

Args:
sender (:class:`SenderBase`) the sender object that will be used in conjunction with this queue.
"""
self._flush_notification = Event()
QueueBase.__init__(self, sender)
QueueBase.__init__(self, *args, **kwargs)

@property
def flush_notification(self):
Expand Down
29 changes: 25 additions & 4 deletions applicationinsights/channel/QueueBase.py
Expand Up @@ -5,19 +5,35 @@
# Python 3.x
from queue import Queue, Empty

try:
from persistqueue import Empty as PersistEmpty
from persistqueue import Queue as PersistQueue
except ImportError:
PersistEmpty = Empty
PersistQueue = None


class QueueBase(object):
"""The base class for all types of queues for use in conjunction with an implementation of :class:`SenderBase`.

The queue will notify the sender that it needs to pick up items when it reaches :func:`max_queue_length`,
or when the consumer calls :func:`flush`.
"""
def __init__(self, sender):
def __init__(self, sender, persistence_path=''):
"""Initializes a new instance of the class.

Args:
sender (:class:`SenderBase`) the sender object that will be used in conjunction with this queue.
persistence_path (str) if set, persist the queue on disk into the provided directory.
"""
self._queue = Queue()
if persistence_path and PersistQueue is None:
raise ValueError('persistence_path argument requires persist-queue dependency to be installed')
elif persistence_path:
self._queue = PersistQueue(persistence_path)
else:
self._queue = Queue()

self._persistence_path = persistence_path
self._max_queue_length = 500
self._sender = sender
if sender:
Expand Down Expand Up @@ -80,10 +96,15 @@ def get(self):
:class:`contracts.Envelope`. a telemetry envelope object or None if the queue is empty.
"""
try:
return self._queue.get_nowait()
except Empty:
item = self._queue.get_nowait()
except (Empty, PersistEmpty):
return None

if self._persistence_path:
self._queue.task_done()

return item

def flush(self):
"""Flushes the current queue by notifying the {#sender}. This method needs to be overridden by a concrete
implementations of the queue class.
Expand Down
8 changes: 0 additions & 8 deletions applicationinsights/channel/SynchronousQueue.py
Expand Up @@ -12,14 +12,6 @@ class SynchronousQueue(QueueBase):
queue.max_queue_length = 1
queue.put(1)
"""
def __init__(self, sender):
"""Initializes a new instance of the class.

Args:
sender (:class:`SenderBase`) the sender object that will be used in conjunction with this queue.
"""
QueueBase.__init__(self, sender)

def flush(self):
"""Flushes the current queue by by calling :func:`sender`'s :func:`send` method.
"""
Expand Down
30 changes: 30 additions & 0 deletions tests/applicationinsights_tests/channel_tests/TestQueueBase.py
@@ -1,11 +1,15 @@
import unittest

import sys, os, os.path
from shutil import rmtree
from tempfile import mkdtemp

rootDirectory = os.path.join(os.path.dirname(os.path.realpath(__file__)), '..', '..')
if rootDirectory not in sys.path:
sys.path.append(rootDirectory)

from applicationinsights import channel
from applicationinsights.channel.QueueBase import PersistQueue

class TestQueueBase(unittest.TestCase):
def test_construct(self):
Expand Down Expand Up @@ -47,6 +51,32 @@ def test_get_works_as_expected(self):
def test_flush_works_as_expected(self):
self.test_put_works_as_expected()

@unittest.skipIf(PersistQueue is None, reason='persist-queue missing')
def test_queue_persistence(self):
queue = channel.QueueBase(None, self.persistence_directory)
queue.put(1)
queue.put(2)
self.assertEqual(1, queue.get())
queue.put(3)
del queue
queue = channel.QueueBase(None, self.persistence_directory)
queue.put(4)
self.assertEqual(2, queue.get())
self.assertEqual(3, queue.get())
self.assertEqual(4, queue.get())
self.assertEqual(None, queue.get())

@unittest.skipIf(PersistQueue is not None, reason='persist-queue exists')
def test_queue_persistence_without_dependency(self):
with self.assertRaises(ValueError):
channel.QueueBase(None, self.persistence_directory)

def setUp(self):
self.persistence_directory = mkdtemp()

def tearDown(self):
rmtree(self.persistence_directory)

class InterceptableQueueBase(channel.QueueBase):
def __init__(self, sender):
channel.QueueBase.__init__(self, sender)
Expand Down