diff --git a/lib/SampleService/core/notification.py b/lib/SampleService/core/notification.py index b5bd9462..6dbeab80 100644 --- a/lib/SampleService/core/notification.py +++ b/lib/SampleService/core/notification.py @@ -98,6 +98,12 @@ def __init__(self, bootstrap_servers: str, topic: str): self._closed = False def notify_new_sample_version(self, sample_id: UUID, sample_ver: int): + """ + Send a notification that a new sample version has been created. + + :param sample_id: The sample ID. + :param sample_ver: The version of the sample. + """ if sample_ver < 1: raise ValueError('sample_ver must be > 0') msg = {self._EVENT_TYPE: self._NEW_SAMPLE, @@ -106,6 +112,11 @@ def notify_new_sample_version(self, sample_id: UUID, sample_ver: int): self._send_message(msg) def notify_sample_acl_change(self, sample_id: UUID): + """ + Send a notification for a sample ACL change. + + :param sample_id: The sample ID. + """ msg = {self._EVENT_TYPE: self._ACL_CHANGE, self._SAMPLE_ID: str(_not_falsy(sample_id, 'sample_id'))} self._send_message(msg) diff --git a/lib/SampleService/core/samples.py b/lib/SampleService/core/samples.py index 5bb85649..f8e5d8fa 100644 --- a/lib/SampleService/core/samples.py +++ b/lib/SampleService/core/samples.py @@ -242,6 +242,8 @@ def replace_sample_acls( count = -1 except _OwnerChangedError: count += 1 + if self._kafka: + self._kafka.notify_sample_acl_change(id_) # TODO change owner. Probably needs a request/accept flow. diff --git a/test/SampleService_test.py b/test/SampleService_test.py index b1f99a21..9f755169 100644 --- a/test/SampleService_test.py +++ b/test/SampleService_test.py @@ -355,6 +355,7 @@ def service(auth, arango, workspace, kafka): def sample_port(service, arango, workspace, kafka): clear_db_and_recreate(arango) workspace.clear_db() + # _clear_kafka_messages(kafka) # too expensive to run after every test # kafka.clear_all_topics() # too expensive to run after every test yield service @@ -455,7 +456,22 @@ def _check_kafka_messages(kafka, expected_msgs, topic=KAFKA_TOPIC): kc.close() +def _clear_kafka_messages(kafka, topic=KAFKA_TOPIC): + kc = KafkaConsumer( + topic, + bootstrap_servers=f'localhost:{kafka.port}', + auto_offset_reset='earliest', + group_id='foo') # quiets warnings + + try: + kc.poll(timeout_ms=2000) # 1s not enough? Seems like a lot + # Need to commit here? doesn't seem like it + finally: + kc.close() + + def test_create_and_get_sample_with_version(sample_port, kafka): + _clear_kafka_messages(kafka) url = f'http://localhost:{sample_port}' # version 1 @@ -1031,7 +1047,8 @@ def test_get_sample_fail_admin_permissions(sample_port): 'necessary administration privileges to run method get_sample') -def test_get_and_replace_acls(sample_port): +def test_get_and_replace_acls(sample_port, kafka): + _clear_kafka_messages(kafka) url = f'http://localhost:{sample_port}' ret = requests.post(url, headers=get_authorized_headers(TOKEN1), json={ @@ -1156,6 +1173,16 @@ def test_get_and_replace_acls(sample_port): 'read': [USER2] }) + _check_kafka_messages( + kafka, + [ + {'event_type': 'NEW_SAMPLE', 'sample_id': id_, 'sample_ver': 1}, + {'event_type': 'ACL_CHANGE', 'sample_id': id_}, + {'event_type': 'NEW_SAMPLE', 'sample_id': id_, 'sample_ver': 2}, + {'event_type': 'NEW_SAMPLE', 'sample_id': id_, 'sample_ver': 3}, + {'event_type': 'ACL_CHANGE', 'sample_id': id_}, + ]) + def test_get_acls_as_admin(sample_port): url = f'http://localhost:{sample_port}' diff --git a/test/core/samples_test.py b/test/core/samples_test.py index 896ef0c1..69ac6f61 100644 --- a/test/core/samples_test.py +++ b/test/core/samples_test.py @@ -497,8 +497,9 @@ def _replace_sample_acls(user: UserID, as_admin): lu = create_autospec(KBaseUserLookup, spec_set=True, instance=True) meta = create_autospec(MetadataValidatorSet, spec_set=True, instance=True) ws = create_autospec(WS, spec_set=True, instance=True) - samples = Samples( - storage, lu, meta, ws, now=nw, uuid_gen=lambda: UUID('1234567890abcdef1234567890abcdef')) + kafka = create_autospec(KafkaNotifier, spec_set=True, instance=True) + samples = Samples(storage, lu, meta, ws, kafka, now=nw, + uuid_gen=lambda: UUID('1234567890abcdef1234567890abcdef')) id_ = UUID('1234567890abcdef1234567890abcde0') lu.invalid_users.return_value = [] @@ -523,6 +524,39 @@ def _replace_sample_acls(user: UserID, as_admin): ((UUID('1234567890abcdef1234567890abcde0'), SampleACL(u('someuser'), [u('x'), u('y')], [u('z'), u('a')], [u('b'), u('c')])), {})] + kafka.notify_sample_acl_change.assert_called_once_with(id_) + + +def test_replace_sample_acls_without_notifier(): + storage = create_autospec(ArangoSampleStorage, spec_set=True, instance=True) + lu = create_autospec(KBaseUserLookup, spec_set=True, instance=True) + meta = create_autospec(MetadataValidatorSet, spec_set=True, instance=True) + ws = create_autospec(WS, spec_set=True, instance=True) + samples = Samples(storage, lu, meta, ws, now=nw, + uuid_gen=lambda: UUID('1234567890abcdef1234567890abcdef')) + id_ = UUID('1234567890abcdef1234567890abcde0') + + lu.invalid_users.return_value = [] + + storage.get_sample_acls.return_value = SampleACL( + u('someuser'), + [u('otheruser'), u('y')], + [u('anotheruser'), u('ur mum')], + [u('Fungus J. Pustule Jr.'), u('x')]) + + samples.replace_sample_acls(id_, u('y'), SampleACL( + u('someuser'), [u('x'), u('y')], [u('z'), u('a')], [u('b'), u('c')])) + + assert lu.invalid_users.call_args_list == [ + (([u(x) for x in ['x', 'y', 'z', 'a', 'b', 'c']],), {})] + + assert storage.get_sample_acls.call_args_list == [ + ((UUID('1234567890abcdef1234567890abcde0'),), {})] + + assert storage.replace_sample_acls.call_args_list == [ + ((UUID('1234567890abcdef1234567890abcde0'), + SampleACL(u('someuser'), [u('x'), u('y')], [u('z'), u('a')], [u('b'), u('c')])), {})] + def test_replace_sample_acls_with_owner_change(): storage = create_autospec(ArangoSampleStorage, spec_set=True, instance=True)