Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 37 additions & 10 deletions lib/SampleService/core/notification.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@ class KafkaNotifier:
_SAMPLE_VERSION = 'sample_ver'
_NEW_SAMPLE = 'NEW_SAMPLE'
_ACL_CHANGE = 'ACL_CHANGE'
_LINK_ID = 'link_id'
_NEW_LINK = 'NEW_LINK'
_EXPIRED_LINK = 'EXPIRED_LINK'

def __init__(self, bootstrap_servers: str, topic: str):
"""
Expand Down Expand Up @@ -101,25 +104,49 @@ def notify_new_sample_version(self, sample_id: UUID, sample_ver: int):
"""
Send a notification that a new sample version has been created.

:param sample_id: The sample ID.
:param sample_ver: The version of the sample.
:param sample_id: the sample ID.
:param sample_ver: the version of the sample.
"""
if sample_ver < 1:
raise ValueError('sample_ver must be > 0')
msg = {self._EVENT_TYPE: self._NEW_SAMPLE,
self._SAMPLE_ID: str(_not_falsy(sample_id, 'sample_id')),
self._SAMPLE_VERSION: sample_ver}
self._send_message(msg)
self._send_message({
self._EVENT_TYPE: self._NEW_SAMPLE,
self._SAMPLE_ID: str(_not_falsy(sample_id, 'sample_id')),
self._SAMPLE_VERSION: sample_ver
})

def notify_sample_acl_change(self, sample_id: UUID):
"""
Send a notification for a sample ACL change.

:param sample_id: The sample ID.
:param sample_id: the sample ID.
"""
msg = {self._EVENT_TYPE: self._ACL_CHANGE,
self._SAMPLE_ID: str(_not_falsy(sample_id, 'sample_id'))}
self._send_message(msg)
self._send_message({
self._EVENT_TYPE: self._ACL_CHANGE,
self._SAMPLE_ID: str(_not_falsy(sample_id, 'sample_id'))
})

def notify_new_link(self, link_id: UUID):
"""
Send a notification that a link has been created.

:param link_id: the link ID.
"""
self._send_message({
self._EVENT_TYPE: self._NEW_LINK,
self._LINK_ID: str(_not_falsy(link_id, 'link_id'))
})

def notify_expired_link(self, link_id: UUID):
"""
Send a notification that a link has been expired.

:param link_id: the link ID.
"""
self._send_message({
self._EVENT_TYPE: self._EXPIRED_LINK,
self._LINK_ID: str(_not_falsy(link_id, 'link_id'))
})

def _send_message(self, message):
if self._closed:
Expand Down
64 changes: 64 additions & 0 deletions test/SampleService_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -3522,3 +3522,67 @@ def _kafka_notifier_notify_acl_change_fail(notifier, sample, expected):
with raises(Exception) as got:
notifier.notify_sample_acl_change(sample)
assert_exception_correct(got.value, expected)


def test_kafka_notifier_new_link(sample_port, kafka):
kn = KafkaNotifier(f'localhost:{kafka.port}', 'topictopic')
try:
id_ = uuid.uuid4()

kn.notify_new_link(id_)

_check_kafka_messages(
kafka,
[{'event_type': 'NEW_LINK', 'link_id': str(id_)}],
'topictopic')
finally:
kn.close()


def test_kafka_notifier_new_link_fail(sample_port, kafka):
kn = KafkaNotifier(f'localhost:{kafka.port}', 'mytopic')

_kafka_notifier_new_link_fail(kn, None, ValueError(
'link_id cannot be a value that evaluates to false'))

kn.close()
_kafka_notifier_new_link_fail(kn, uuid.uuid4(), ValueError(
'client is closed'))


def _kafka_notifier_new_link_fail(notifier, sample, expected):
with raises(Exception) as got:
notifier.notify_new_link(sample)
assert_exception_correct(got.value, expected)


def test_kafka_notifier_expired_link(sample_port, kafka):
kn = KafkaNotifier(f'localhost:{kafka.port}', 'topictopic')
try:
id_ = uuid.uuid4()

kn.notify_expired_link(id_)

_check_kafka_messages(
kafka,
[{'event_type': 'EXPIRED_LINK', 'link_id': str(id_)}],
'topictopic')
finally:
kn.close()


def test_kafka_notifier_expired_link_fail(sample_port, kafka):
kn = KafkaNotifier(f'localhost:{kafka.port}', 'mytopic')

_kafka_notifier_expired_link_fail(kn, None, ValueError(
'link_id cannot be a value that evaluates to false'))

kn.close()
_kafka_notifier_expired_link_fail(kn, uuid.uuid4(), ValueError(
'client is closed'))


def _kafka_notifier_expired_link_fail(notifier, sample, expected):
with raises(Exception) as got:
notifier.notify_expired_link(sample)
assert_exception_correct(got.value, expected)