Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions lib/SampleService/core/notification.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,12 @@ def __init__(self, bootstrap_servers: str, topic: str):
self._closed = False

def notify_new_sample_version(self, sample_id: UUID, sample_ver: int):
"""
Send a notification that a new sample version has been created.

:param sample_id: The sample ID.
:param sample_ver: The version of the sample.
"""
if sample_ver < 1:
raise ValueError('sample_ver must be > 0')
msg = {self._EVENT_TYPE: self._NEW_SAMPLE,
Expand All @@ -106,6 +112,11 @@ def notify_new_sample_version(self, sample_id: UUID, sample_ver: int):
self._send_message(msg)

def notify_sample_acl_change(self, sample_id: UUID):
"""
Send a notification for a sample ACL change.

:param sample_id: The sample ID.
"""
msg = {self._EVENT_TYPE: self._ACL_CHANGE,
self._SAMPLE_ID: str(_not_falsy(sample_id, 'sample_id'))}
self._send_message(msg)
Expand Down
2 changes: 2 additions & 0 deletions lib/SampleService/core/samples.py
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,8 @@ def replace_sample_acls(
count = -1
except _OwnerChangedError:
count += 1
if self._kafka:
self._kafka.notify_sample_acl_change(id_)

# TODO change owner. Probably needs a request/accept flow.

Expand Down
29 changes: 28 additions & 1 deletion test/SampleService_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,7 @@ def service(auth, arango, workspace, kafka):
def sample_port(service, arango, workspace, kafka):
clear_db_and_recreate(arango)
workspace.clear_db()
# _clear_kafka_messages(kafka) # too expensive to run after every test
# kafka.clear_all_topics() # too expensive to run after every test
yield service

Expand Down Expand Up @@ -455,7 +456,22 @@ def _check_kafka_messages(kafka, expected_msgs, topic=KAFKA_TOPIC):
kc.close()


def _clear_kafka_messages(kafka, topic=KAFKA_TOPIC):
kc = KafkaConsumer(
topic,
bootstrap_servers=f'localhost:{kafka.port}',
auto_offset_reset='earliest',
group_id='foo') # quiets warnings

try:
kc.poll(timeout_ms=2000) # 1s not enough? Seems like a lot
# Need to commit here? doesn't seem like it
finally:
kc.close()


def test_create_and_get_sample_with_version(sample_port, kafka):
_clear_kafka_messages(kafka)
url = f'http://localhost:{sample_port}'

# version 1
Expand Down Expand Up @@ -1031,7 +1047,8 @@ def test_get_sample_fail_admin_permissions(sample_port):
'necessary administration privileges to run method get_sample')


def test_get_and_replace_acls(sample_port):
def test_get_and_replace_acls(sample_port, kafka):
_clear_kafka_messages(kafka)
url = f'http://localhost:{sample_port}'

ret = requests.post(url, headers=get_authorized_headers(TOKEN1), json={
Expand Down Expand Up @@ -1156,6 +1173,16 @@ def test_get_and_replace_acls(sample_port):
'read': [USER2]
})

_check_kafka_messages(
kafka,
[
{'event_type': 'NEW_SAMPLE', 'sample_id': id_, 'sample_ver': 1},
{'event_type': 'ACL_CHANGE', 'sample_id': id_},
{'event_type': 'NEW_SAMPLE', 'sample_id': id_, 'sample_ver': 2},
{'event_type': 'NEW_SAMPLE', 'sample_id': id_, 'sample_ver': 3},
{'event_type': 'ACL_CHANGE', 'sample_id': id_},
])


def test_get_acls_as_admin(sample_port):
url = f'http://localhost:{sample_port}'
Expand Down
38 changes: 36 additions & 2 deletions test/core/samples_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -497,8 +497,9 @@ def _replace_sample_acls(user: UserID, as_admin):
lu = create_autospec(KBaseUserLookup, spec_set=True, instance=True)
meta = create_autospec(MetadataValidatorSet, spec_set=True, instance=True)
ws = create_autospec(WS, spec_set=True, instance=True)
samples = Samples(
storage, lu, meta, ws, now=nw, uuid_gen=lambda: UUID('1234567890abcdef1234567890abcdef'))
kafka = create_autospec(KafkaNotifier, spec_set=True, instance=True)
samples = Samples(storage, lu, meta, ws, kafka, now=nw,
uuid_gen=lambda: UUID('1234567890abcdef1234567890abcdef'))
id_ = UUID('1234567890abcdef1234567890abcde0')

lu.invalid_users.return_value = []
Expand All @@ -523,6 +524,39 @@ def _replace_sample_acls(user: UserID, as_admin):
((UUID('1234567890abcdef1234567890abcde0'),
SampleACL(u('someuser'), [u('x'), u('y')], [u('z'), u('a')], [u('b'), u('c')])), {})]

kafka.notify_sample_acl_change.assert_called_once_with(id_)


def test_replace_sample_acls_without_notifier():
storage = create_autospec(ArangoSampleStorage, spec_set=True, instance=True)
lu = create_autospec(KBaseUserLookup, spec_set=True, instance=True)
meta = create_autospec(MetadataValidatorSet, spec_set=True, instance=True)
ws = create_autospec(WS, spec_set=True, instance=True)
samples = Samples(storage, lu, meta, ws, now=nw,
uuid_gen=lambda: UUID('1234567890abcdef1234567890abcdef'))
id_ = UUID('1234567890abcdef1234567890abcde0')

lu.invalid_users.return_value = []

storage.get_sample_acls.return_value = SampleACL(
u('someuser'),
[u('otheruser'), u('y')],
[u('anotheruser'), u('ur mum')],
[u('Fungus J. Pustule Jr.'), u('x')])

samples.replace_sample_acls(id_, u('y'), SampleACL(
u('someuser'), [u('x'), u('y')], [u('z'), u('a')], [u('b'), u('c')]))

assert lu.invalid_users.call_args_list == [
(([u(x) for x in ['x', 'y', 'z', 'a', 'b', 'c']],), {})]

assert storage.get_sample_acls.call_args_list == [
((UUID('1234567890abcdef1234567890abcde0'),), {})]

assert storage.replace_sample_acls.call_args_list == [
((UUID('1234567890abcdef1234567890abcde0'),
SampleACL(u('someuser'), [u('x'), u('y')], [u('z'), u('a')], [u('b'), u('c')])), {})]


def test_replace_sample_acls_with_owner_change():
storage = create_autospec(ArangoSampleStorage, spec_set=True, instance=True)
Expand Down