diff --git a/lib/SampleService/core/notification.py b/lib/SampleService/core/notification.py index 4911c24a..60764fa1 100644 --- a/lib/SampleService/core/notification.py +++ b/lib/SampleService/core/notification.py @@ -62,6 +62,9 @@ class KafkaNotifier: _SAMPLE_VERSION = 'sample_ver' _NEW_SAMPLE = 'NEW_SAMPLE' _ACL_CHANGE = 'ACL_CHANGE' + _LINK_ID = 'link_id' + _NEW_LINK = 'NEW_LINK' + _EXPIRED_LINK = 'EXPIRED_LINK' def __init__(self, bootstrap_servers: str, topic: str): """ @@ -101,25 +104,49 @@ def notify_new_sample_version(self, sample_id: UUID, sample_ver: int): """ Send a notification that a new sample version has been created. - :param sample_id: The sample ID. - :param sample_ver: The version of the sample. + :param sample_id: the sample ID. + :param sample_ver: the version of the sample. """ if sample_ver < 1: raise ValueError('sample_ver must be > 0') - msg = {self._EVENT_TYPE: self._NEW_SAMPLE, - self._SAMPLE_ID: str(_not_falsy(sample_id, 'sample_id')), - self._SAMPLE_VERSION: sample_ver} - self._send_message(msg) + self._send_message({ + self._EVENT_TYPE: self._NEW_SAMPLE, + self._SAMPLE_ID: str(_not_falsy(sample_id, 'sample_id')), + self._SAMPLE_VERSION: sample_ver + }) def notify_sample_acl_change(self, sample_id: UUID): """ Send a notification for a sample ACL change. - :param sample_id: The sample ID. + :param sample_id: the sample ID. """ - msg = {self._EVENT_TYPE: self._ACL_CHANGE, - self._SAMPLE_ID: str(_not_falsy(sample_id, 'sample_id'))} - self._send_message(msg) + self._send_message({ + self._EVENT_TYPE: self._ACL_CHANGE, + self._SAMPLE_ID: str(_not_falsy(sample_id, 'sample_id')) + }) + + def notify_new_link(self, link_id: UUID): + """ + Send a notification that a link has been created. + + :param link_id: the link ID. + """ + self._send_message({ + self._EVENT_TYPE: self._NEW_LINK, + self._LINK_ID: str(_not_falsy(link_id, 'link_id')) + }) + + def notify_expired_link(self, link_id: UUID): + """ + Send a notification that a link has been expired. + + :param link_id: the link ID. + """ + self._send_message({ + self._EVENT_TYPE: self._EXPIRED_LINK, + self._LINK_ID: str(_not_falsy(link_id, 'link_id')) + }) def _send_message(self, message): if self._closed: diff --git a/test/SampleService_test.py b/test/SampleService_test.py index 9f755169..2d9f4038 100644 --- a/test/SampleService_test.py +++ b/test/SampleService_test.py @@ -3522,3 +3522,67 @@ def _kafka_notifier_notify_acl_change_fail(notifier, sample, expected): with raises(Exception) as got: notifier.notify_sample_acl_change(sample) assert_exception_correct(got.value, expected) + + +def test_kafka_notifier_new_link(sample_port, kafka): + kn = KafkaNotifier(f'localhost:{kafka.port}', 'topictopic') + try: + id_ = uuid.uuid4() + + kn.notify_new_link(id_) + + _check_kafka_messages( + kafka, + [{'event_type': 'NEW_LINK', 'link_id': str(id_)}], + 'topictopic') + finally: + kn.close() + + +def test_kafka_notifier_new_link_fail(sample_port, kafka): + kn = KafkaNotifier(f'localhost:{kafka.port}', 'mytopic') + + _kafka_notifier_new_link_fail(kn, None, ValueError( + 'link_id cannot be a value that evaluates to false')) + + kn.close() + _kafka_notifier_new_link_fail(kn, uuid.uuid4(), ValueError( + 'client is closed')) + + +def _kafka_notifier_new_link_fail(notifier, sample, expected): + with raises(Exception) as got: + notifier.notify_new_link(sample) + assert_exception_correct(got.value, expected) + + +def test_kafka_notifier_expired_link(sample_port, kafka): + kn = KafkaNotifier(f'localhost:{kafka.port}', 'topictopic') + try: + id_ = uuid.uuid4() + + kn.notify_expired_link(id_) + + _check_kafka_messages( + kafka, + [{'event_type': 'EXPIRED_LINK', 'link_id': str(id_)}], + 'topictopic') + finally: + kn.close() + + +def test_kafka_notifier_expired_link_fail(sample_port, kafka): + kn = KafkaNotifier(f'localhost:{kafka.port}', 'mytopic') + + _kafka_notifier_expired_link_fail(kn, None, ValueError( + 'link_id cannot be a value that evaluates to false')) + + kn.close() + _kafka_notifier_expired_link_fail(kn, uuid.uuid4(), ValueError( + 'client is closed')) + + +def _kafka_notifier_expired_link_fail(notifier, sample, expected): + with raises(Exception) as got: + notifier.notify_expired_link(sample) + assert_exception_correct(got.value, expected)