diff --git a/Pipfile b/Pipfile index 2293da8e..9e9b6b00 100644 --- a/Pipfile +++ b/Pipfile @@ -11,6 +11,7 @@ coveralls = "*" flake8 = "*" pymongo = "*" semver = "*" +ipython = "*" [packages] python-arango = "==5.0.0" diff --git a/Pipfile.lock b/Pipfile.lock index b778b6f0..19ce50e2 100644 --- a/Pipfile.lock +++ b/Pipfile.lock @@ -1,7 +1,7 @@ { "_meta": { "hash": { - "sha256": "95de0d8b679b366d403b7f535051b612c4b6e8ff469ffc07fb31dfeed3a0fa67" + "sha256": "78a2d3911fee178c81c2107f5d91f41bbe05004e61b6a97711c9559d342f16be" }, "pipfile-spec": 6, "requires": { @@ -313,6 +313,13 @@ ], "version": "==19.3.0" }, + "backcall": { + "hashes": [ + "sha256:38ecd85be2c1e78f77fd91700c76e14667dc21e2713b63876c0eb901196e01e4", + "sha256:bbbf4b1e5cd2bdb08f915895b51081c041bac22394fdfcfdfbe9f14b77c08bf2" + ], + "version": "==0.1.0" + }, "certifi": { "hashes": [ "sha256:1d987a998c75633c40847cc966fcf5904906c920a7f17ef374f5aa4282abd304", @@ -371,6 +378,13 @@ "index": "pypi", "version": "==2.0.0" }, + "decorator": { + "hashes": [ + "sha256:41fa54c2a0cc4ba648be4fd43cff00aedf5b9465c9bf18d64325bc225f08f760", + "sha256:e3a62f0520172440ca0dcc823749319382e377f37f140a0b99ef45fecb84bfe7" + ], + "version": "==4.4.2" + }, "docopt": { "hashes": [ "sha256:49b3a825280bd66b3aa83585ef59c4a8c82f2c8a522dbe754a8bc8d08c85c491" @@ -400,6 +414,28 @@ "markers": "python_version < '3.8'", "version": "==1.6.0" }, + "ipython": { + "hashes": [ + "sha256:5b241b84bbf0eb085d43ae9d46adf38a13b45929ca7774a740990c2c242534bb", + "sha256:f0126781d0f959da852fb3089e170ed807388e986a8dd4e6ac44855845b0fb1c" + ], + "index": "pypi", + "version": "==7.14.0" + }, + "ipython-genutils": { + "hashes": [ + "sha256:72dd37233799e619666c9f639a9da83c34013a73e8bbc79a7a6348d93c61fab8", + "sha256:eb2e116e75ecef9d4d228fdc66af54269afa26ab4463042e33785b887c628ba8" + ], + "version": "==0.2.0" + }, + "jedi": { + "hashes": [ + "sha256:cd60c93b71944d628ccac47df9a60fec53150de53d42dc10a7fc4b5ba6aae798", + "sha256:df40c97641cb943661d2db4c33c2e1ff75d491189423249e989bcea4464f3030" + ], + "version": "==0.17.0" + }, "mccabe": { "hashes": [ "sha256:ab8a6258860da4b6677da4bd2fe5dc2c659cff31b3ee4f7f5d64e79735b80d42", @@ -448,6 +484,28 @@ ], "version": "==20.4" }, + "parso": { + "hashes": [ + "sha256:158c140fc04112dc45bca311633ae5033c2c2a7b732fa33d0955bad8152a8dd0", + "sha256:908e9fae2144a076d72ae4e25539143d40b8e3eafbaeae03c1bfe226f4cdf12c" + ], + "version": "==0.7.0" + }, + "pexpect": { + "hashes": [ + "sha256:0b48a55dcb3c05f3329815901ea4fc1537514d6ba867a152b581d69ae3710937", + "sha256:fc65a43959d153d0114afe13997d439c22823a27cefceb5ff35c2178c6784c0c" + ], + "markers": "sys_platform != 'win32'", + "version": "==4.8.0" + }, + "pickleshare": { + "hashes": [ + "sha256:87683d47965c1da65cdacaf31c8441d12b8044cdec9aca500cd78fc2c683afca", + "sha256:9649af414d74d4df115d5d718f82acb59c9d418196b7b4290ed47a12ce62df56" + ], + "version": "==0.7.5" + }, "pluggy": { "hashes": [ "sha256:15b2acde666561e1298d71b523007ed7364de07029219b604cf808bfa1c765b0", @@ -455,6 +513,20 @@ ], "version": "==0.13.1" }, + "prompt-toolkit": { + "hashes": [ + "sha256:563d1a4140b63ff9dd587bda9557cffb2fe73650205ab6f4383092fb882e7dc8", + "sha256:df7e9e63aea609b1da3a65641ceaf5bc7d05e0a04de5bd45d05dbeffbabf9e04" + ], + "version": "==3.0.5" + }, + "ptyprocess": { + "hashes": [ + "sha256:923f299cc5ad920c68f2bc0bc98b75b9f838b93b599941a6b63ddbc2476394c0", + "sha256:d7cc528d76e76342423ca640335bd3633420dc1366f258cb31d05e865ef5ca1f" + ], + "version": "==0.6.0" + }, "py": { "hashes": [ "sha256:5e27081401262157467ad6e7f851b7aa402c5852dbcb3dae06768434de5752aa", @@ -476,6 +548,13 @@ ], "version": "==2.2.0" }, + "pygments": { + "hashes": [ + "sha256:647344a061c249a3b74e230c739f434d7ea4d8b1d5f3721bc0f3558049b38f44", + "sha256:ff7a40b4860b727ab48fad6360eb351cc1b33cbf9b15a0f689ca5353e9463324" + ], + "version": "==2.6.1" + }, "pymongo": { "hashes": [ "sha256:01b4e10027aef5bb9ecefbc26f5df3368ce34aef81df43850f701e716e3fe16d", @@ -581,6 +660,13 @@ ], "version": "==1.15.0" }, + "traitlets": { + "hashes": [ + "sha256:70b4c6a1d9019d7b4f6846832288f86998aa3b9207c6821f3578a6a6a467fe44", + "sha256:d023ee369ddd2763310e4c3eae1ff649689440d4ae59d7485eb4cfbbe3e359f7" + ], + "version": "==4.3.3" + }, "typed-ast": { "hashes": [ "sha256:0666aa36131496aed8f7be0410ff974562ab7eeac11ef351def9ea6fa28f6355", diff --git a/deploy.cfg b/deploy.cfg index 7fb950dc..180b48bd 100644 --- a/deploy.cfg +++ b/deploy.cfg @@ -70,4 +70,11 @@ schema-collection = {{ schema_collection }} # A URL pointing to a configuration file for any metadata validators to be installed on startup. # See the readme file for a description of the file contents. -metadata-validator-config-url = {{ metadata_validator_config_url }} \ No newline at end of file +metadata-validator-config-url = {{ metadata_validator_config_url }} + +# bootstrap.servers parameter for Kafka notifications. Leave blank to disable the Kafka +# notifier. +kafka-bootstrap-servers = {{ kafka_bootstrap_servers }} +# The topic to which the Kafka notifier should write. Required if kafka-bootstrap-servers is +# provided. +kafka-topic = {{ kafka_topic }} \ No newline at end of file diff --git a/lib/SampleService/core/config.py b/lib/SampleService/core/config.py index dd8a1e74..b19ac18a 100644 --- a/lib/SampleService/core/config.py +++ b/lib/SampleService/core/config.py @@ -21,6 +21,7 @@ from SampleService.core.storage.arango_sample_storage import ArangoSampleStorage \ as _ArangoSampleStorage from SampleService.core.arg_checkers import check_string as _check_string +from SampleService.core.notification import KafkaNotifier as _KafkaNotifer from SampleService.core.user_lookup import KBaseUserLookup from SampleService.core.workspace import WS as _WS @@ -67,13 +68,17 @@ def build_samples(config: Dict[str, str]) -> Tuple[Samples, KBaseUserLookup]: ws_token = _check_string_req(config.get('workspace-read-admin-token'), 'config param workspace-read-admin-token') + kafka_servers = _check_string(config.get('kafka-bootstrap-servers'), + 'config param kafka-bootstrap-servers', + optional=True) + kafka_topic = None + if kafka_servers: # have to start the server twice to test no kafka scenario + kafka_topic = _check_string(config.get('kafka-topic'), 'config param kafka-topic') + metaval_url = _check_string(config.get('metadata-validator-config-url'), 'config param metadata-validator-config-url', optional=True) - # build the validators before trying to connect to arango - metaval = get_validators(metaval_url) if metaval_url else MetadataValidatorSet() - # meta params may have info that shouldn't be logged so don't log any for now. # Add code to deal with this later if needed print(f''' @@ -95,10 +100,15 @@ def build_samples(config: Dict[str, str]) -> Tuple[Samples, KBaseUserLookup]: auth-full-admin-roles: {', '.join(full_roles)} auth-read-admin-roles: {', '.join(read_roles)} workspace-url: {ws_url} - workspace-read-admin-token: [REDACTED FOR YOUR PLEASURE] + workspace-read-admin-token: [REDACTED FOR YOUR ULTIMATE PLEASURE] + kafka-bootstrap-servers: {kafka_servers} + kafka-topic: {kafka_topic} metadata-validators-config-url: {metaval_url} ''') + # build the validators before trying to connect to arango + metaval = get_validators(metaval_url) if metaval_url else MetadataValidatorSet() + arangoclient = _arango.ArangoClient(hosts=arango_url) arango_db = arangoclient.db( arango_db, username=arango_user, password=arango_pwd, verify=True) @@ -114,9 +124,10 @@ def build_samples(config: Dict[str, str]) -> Tuple[Samples, KBaseUserLookup]: col_schema, ) storage.start_consistency_checker() + kafka = _KafkaNotifer(kafka_servers, _cast(str, kafka_topic)) if kafka_servers else None user_lookup = KBaseUserLookup(auth_root_url, auth_token, full_roles, read_roles) ws = _WS(_Workspace(ws_url, token=ws_token)) - return Samples(storage, user_lookup, metaval, ws), user_lookup + return Samples(storage, user_lookup, metaval, ws, kafka), user_lookup def split_value(d: Dict[str, str], key: str): diff --git a/lib/SampleService/core/notification.py b/lib/SampleService/core/notification.py index 67ccb417..a565150b 100644 --- a/lib/SampleService/core/notification.py +++ b/lib/SampleService/core/notification.py @@ -94,6 +94,7 @@ def __init__(self, bootstrap_servers: str, topic: str): # presumably this can be removed once idempotence is supported max_in_flight_requests_per_connection=1, ) + self._closed = False def notify_new_sample_version(self, sample_id: UUID, sample_ver: int): if sample_ver < 1: @@ -104,11 +105,20 @@ def notify_new_sample_version(self, sample_id: UUID, sample_ver: int): self._send_message(msg) def _send_message(self, message): + if self._closed: + raise ValueError('client is closed') future = self._prod.send(self._topic, _json.dumps(message).encode('utf-8')) # ensure the message was send correctly, or if not throw an exeption in the correct thread future.get(timeout=35) # this is very difficult to test - # TODO KAFKA notify on sample save + def close(self): + """ + Close the notifier. + """ + # handle with context at some point + self._prod.close() + self._closed = True + # TODO KAFKA notify on ACL change # TODO KAFKA notify on new link # TODO KAFKA notify on expired link via update diff --git a/test/SampleService_test.py b/test/SampleService_test.py index 1cb8064b..b57431b3 100644 --- a/test/SampleService_test.py +++ b/test/SampleService_test.py @@ -32,7 +32,11 @@ from installed_clients.WorkspaceClient import Workspace as Workspace from core import test_utils -from core.test_utils import assert_ms_epoch_close_to_now, assert_exception_correct, find_free_port +from core.test_utils import ( + assert_ms_epoch_close_to_now, + assert_exception_correct, + find_free_port +) from arango_controller import ArangoController from mongo_controller import MongoController from workspace_controller import WorkspaceController @@ -83,8 +87,10 @@ USER_NO_TOKEN2 = 'usernt2' USER_NO_TOKEN3 = 'usernt3' +KAFKA_TOPIC = 'sampleservice' -def create_deploy_cfg(auth_port, arango_port, workspace_port): + +def create_deploy_cfg(auth_port, arango_port, workspace_port, kafka_port): cfg = ConfigParser() ss = 'SampleService' cfg.add_section(ss) @@ -106,6 +112,9 @@ def create_deploy_cfg(auth_port, arango_port, workspace_port): cfg[ss]['workspace-url'] = f'http://localhost:{workspace_port}' cfg[ss]['workspace-read-admin-token'] = TOKEN_WS_READ_ADMIN + cfg[ss]['kafka-bootstrap-servers'] = f'localhost:{kafka_port}' + cfg[ss]['kafka-topic'] = KAFKA_TOPIC + cfg[ss]['sample-collection'] = TEST_COL_SAMPLE cfg[ss]['version-collection'] = TEST_COL_VERSION cfg[ss]['version-edge-collection'] = TEST_COL_VER_EDGE @@ -319,7 +328,7 @@ def kafka(): del_temp = test_utils.get_delete_temp_files() print('shutting down kafka, delete_temp_files={}'.format(del_temp)) - kc.destroy(del_temp, dump_logs_to_stdout=True) + kc.destroy(del_temp, dump_logs_to_stdout=False) @fixture(scope='module') @@ -328,7 +337,7 @@ def service(auth, arango, workspace, kafka): clear_db_and_recreate(arango) # this is completely stupid. The state is calculated on import so there's no way to # test the state creation normally. - cfgpath = create_deploy_cfg(auth.port, arango.port, workspace.port) + cfgpath = create_deploy_cfg(auth.port, arango.port, workspace.port, kafka.port) os.environ['KB_DEPLOYMENT_CONFIG'] = cfgpath from SampleService import SampleServiceServer Thread(target=SampleServiceServer.start_server, kwargs={'port': portint}, daemon=True).start() @@ -346,7 +355,7 @@ def service(auth, arango, workspace, kafka): def sample_port(service, arango, workspace, kafka): clear_db_and_recreate(arango) workspace.clear_db() - kafka.clear_all_topics() + # kafka.clear_all_topics() # too expensive to run after every test yield service @@ -389,6 +398,9 @@ def test_init_fail(): cfg['workspace-url'] = 'crap' init_fail(cfg, MissingParameterError('config param workspace-read-admin-token')) cfg['workspace-read-admin-token'] = 'crap' + cfg['kafka-bootstrap-servers'] = 'crap' + init_fail(cfg, MissingParameterError('config param kafka-topic')) + cfg['kafka-topic'] = 'crap' # get_validators is tested elsewhere, just make sure it'll error out cfg['metadata-validator-config-url'] = 'https://kbase.us/services' init_fail(cfg, ValueError( @@ -423,10 +435,30 @@ def get_authorized_headers(token): return {'authorization': token, 'accept': 'application/json'} -def test_create_and_get_sample_with_version(sample_port): +def _check_kafka_messages(kafka, expected_msgs): + kc = KafkaConsumer( + KAFKA_TOPIC, + bootstrap_servers=f'localhost:{kafka.port}', + auto_offset_reset='earliest', + group_id='foo') # quiets warnings + + try: + res = kc.poll(timeout_ms=2000) # 1s not enough? Seems like a lot + assert len(res) == 1 + assert next(iter(res.keys())).topic == KAFKA_TOPIC + records = next(iter(res.values())) + assert len(records) == len(expected_msgs) + for i, r in enumerate(records): + assert json.loads(r.value) == expected_msgs[i] + # TODO KAFKA this needs a reset before other tests, somehow. commit? + finally: + kc.close() + + +def test_create_and_get_sample_with_version(sample_port, kafka): url = f'http://localhost:{sample_port}' - # verison 1 + # version 1 ret = requests.post(url, headers=get_authorized_headers(TOKEN1), json={ 'method': 'SampleService.create_sample', 'version': '1.1', @@ -525,6 +557,12 @@ def test_create_and_get_sample_with_version(sample_port): 'meta_user': {'a': {'b': 'd'}}}] } + _check_kafka_messages( + kafka, [ + {'event_type': 'NEW_SAMPLE', 'sample_id': id_, 'sample_ver': 1}, + {'event_type': 'NEW_SAMPLE', 'sample_id': id_, 'sample_ver': 2} + ]) + def test_create_sample_as_admin(sample_port): _create_sample_as_admin(sample_port, None, TOKEN2, USER2) @@ -738,7 +776,7 @@ def test_create_sample_fail_no_nodes(sample_port): # print(ret.text) assert ret.status_code == 500 assert ret.json()['error']['message'] == ( - f'Sample service error code 30001 Illegal input parameter: sample node tree ' + + 'Sample service error code 30001 Illegal input parameter: sample node tree ' + 'must be present and a list') @@ -826,14 +864,14 @@ def test_create_sample_fail_permissions(sample_port): def test_create_sample_fail_admin_bad_user_name(sample_port): _create_sample_fail_admin_as_user( sample_port, 'bad\tuser', - f'Sample service error code 30001 Illegal input parameter: userid contains ' + + 'Sample service error code 30001 Illegal input parameter: userid contains ' + 'control characters') def test_create_sample_fail_admin_no_such_user(sample_port): _create_sample_fail_admin_as_user( sample_port, USER4 + 'impostor', - f'Sample service error code 50000 No such user: user4impostor') + 'Sample service error code 50000 No such user: user4impostor') def _create_sample_fail_admin_as_user(sample_port, user, expected): @@ -883,7 +921,7 @@ def test_create_sample_fail_admin_permissions(sample_port): # print(ret.text) assert ret.status_code == 500 assert ret.json()['error']['message'] == ( - f'Sample service error code 20000 Unauthorized: User user3 does not have the ' + + 'Sample service error code 20000 Unauthorized: User user3 does not have the ' + 'necessary administration privileges to run method create_sample') @@ -988,7 +1026,7 @@ def test_get_sample_fail_admin_permissions(sample_port): # print(ret.text) assert ret.status_code == 500 assert ret.json()['error']['message'] == ( - f'Sample service error code 20000 Unauthorized: User user4 does not have the ' + + 'Sample service error code 20000 Unauthorized: User user4 does not have the ' + 'necessary administration privileges to run method get_sample') @@ -1266,7 +1304,7 @@ def test_get_acls_fail_admin_permissions(sample_port): }) assert ret.status_code == 500 assert ret.json()['error']['message'] == ( - f'Sample service error code 20000 Unauthorized: User user4 does not have the ' + + 'Sample service error code 20000 Unauthorized: User user4 does not have the ' + 'necessary administration privileges to run method get_sample_acls') @@ -1391,7 +1429,7 @@ def test_replace_acls_fail_bad_user(sample_port): }) assert ret.status_code == 500 assert ret.json()['error']['message'] == ( - f'Sample service error code 50000 No such user: a, philbin_j_montgomery_iii') + 'Sample service error code 50000 No such user: a, philbin_j_montgomery_iii') def test_replace_acls_fail_user_in_2_acls(sample_port): @@ -1619,12 +1657,12 @@ def test_create_links_and_get_links_from_sample_basic(sample_port, workspace): ] assert len(res) == len(expected_links) - for l in res: - assert_ms_epoch_close_to_now(l['created']) - del l['created'] + for link in res: + assert_ms_epoch_close_to_now(link['created']) + del link['created'] - for l in expected_links: - assert l in res + for link in expected_links: + assert link in res # get links from sample 2 ret = requests.post(url, headers=get_authorized_headers(TOKEN4), json={ @@ -1875,12 +1913,12 @@ def test_create_data_link_as_admin(sample_port, workspace): ] assert len(res) == len(expected_links) - for l in res: - assert_ms_epoch_close_to_now(l['created']) - del l['created'] + for link in res: + assert_ms_epoch_close_to_now(link['created']) + del link['created'] - for l in expected_links: - assert l in res + for link in expected_links: + assert link in res def test_get_links_from_sample_exclude_workspaces(sample_port, workspace): @@ -1974,12 +2012,12 @@ def test_get_links_from_sample_exclude_workspaces(sample_port, workspace): ] assert len(res) == len(expected_links) - for l in res: - assert_ms_epoch_close_to_now(l['created']) - del l['created'] + for link in res: + assert_ms_epoch_close_to_now(link['created']) + del link['created'] - for l in expected_links: - assert l in res + for link in expected_links: + assert link in res def test_get_links_from_sample_as_admin(sample_port, workspace): @@ -2057,14 +2095,14 @@ def test_create_link_fail(sample_port, workspace): _replace_acls(url, id_, TOKEN3, {'write': [USER4]}) _create_link_fail( # fails if permission granted is admin sample_port, TOKEN4, {'id': id_, 'version': 1, 'node': 'foo', 'upa': '1/1/1'}, - f'Sample service error code 20000 Unauthorized: User user4 cannot ' + + 'Sample service error code 20000 Unauthorized: User user4 cannot ' + f'administrate sample {id_}') _replace_acls(url, id_, TOKEN3, {'admin': [USER4]}) wscli.set_permissions({'id': 1, 'new_permission': 'r', 'users': [USER4]}) _create_link_fail( # fails if permission granted is write sample_port, TOKEN4, {'id': id_, 'version': 1, 'node': 'foo', 'upa': '1/1/1'}, - f'Sample service error code 20000 Unauthorized: User user4 cannot write to upa 1/1/1') + 'Sample service error code 20000 Unauthorized: User user4 cannot write to upa 1/1/1') wscli.save_objects({'id': 1, 'objects': [ {'name': 'bar', 'data': {}, 'type': 'Trivial.Object-1.0'}, @@ -2082,12 +2120,12 @@ def test_create_link_fail(sample_port, workspace): 'upa': '1/1/1', 'as_admin': 1, 'as_user': 'foo\bbar'}, - f'Sample service error code 30001 Illegal input parameter: ' + + 'Sample service error code 30001 Illegal input parameter: ' + 'userid contains control characters') _create_link_fail( sample_port, TOKEN3, {'id': id_, 'version': 1, 'node': 'foo', 'upa': '1/1/1', 'as_user': USER4, 'as_admin': 'f'}, - f'Sample service error code 20000 Unauthorized: User user3 does not have ' + + 'Sample service error code 20000 Unauthorized: User user3 does not have ' + 'the necessary administration privileges to run method create_data_link') _create_link_fail( sample_port, @@ -2098,7 +2136,7 @@ def test_create_link_fail(sample_port, workspace): 'upa': '1/1/1', 'as_user': 'fake', 'as_admin': 'f'}, - f'Sample service error code 50000 No such user: fake') + 'Sample service error code 50000 No such user: fake') def test_create_link_fail_link_exists(sample_port, workspace): @@ -2255,11 +2293,11 @@ def _expire_data_link(sample_port, workspace, dataid): assert_ms_epoch_close_to_now(ret.json()['result'][0]['effective_time']) links = ret.json()['result'][0]['links'] assert len(links) == 2 - for l in links: - if l['dataid'] == 'fake': - current_link = l + for link in links: + if link['dataid'] == 'fake': + current_link = link else: - expired_link = l + expired_link = link assert_ms_epoch_close_to_now(expired_link['expired']) assert_ms_epoch_close_to_now(expired_link['created'] + 1000) del expired_link['created'] @@ -2428,7 +2466,7 @@ def test_expire_data_link_fail(sample_port, workspace): wscli.set_permissions({'id': 1, 'new_permission': 'w', 'users': [USER4]}) _expire_data_link_fail( sample_port, TOKEN4, {'upa': '1/1/1', 'dataid': 'yay'}, - f'Sample service error code 20000 Unauthorized: User user4 cannot ' + + 'Sample service error code 20000 Unauthorized: User user4 cannot ' + f'administrate sample {id1}') # admin tests @@ -2561,12 +2599,12 @@ def test_get_links_from_data(sample_port, workspace): ] assert len(res) == len(expected_links) - for l in res: - assert_ms_epoch_close_to_now(l['created']) - del l['created'] + for link in res: + assert_ms_epoch_close_to_now(link['created']) + del link['created'] - for l in expected_links: - assert l in res + for link in expected_links: + assert link in res # get links from object 1/1/1 ret = requests.post(url, headers=get_authorized_headers(TOKEN3), json={ @@ -2793,10 +2831,10 @@ def test_get_links_from_data_fail(sample_port, workspace): "value of 'foo' is not a valid epoch millisecond timestamp") _get_link_from_data_fail( sample_port, TOKEN4, {'upa': '1/1/1'}, - f'Sample service error code 20000 Unauthorized: User user4 cannot read upa 1/1/1') + 'Sample service error code 20000 Unauthorized: User user4 cannot read upa 1/1/1') _get_link_from_data_fail( sample_port, TOKEN3, {'upa': '1/2/1'}, - f'Sample service error code 50040 No such workspace data: Object 1/2/1 does not exist') + 'Sample service error code 50040 No such workspace data: Object 1/2/1 does not exist') # admin tests (also tests missing / deleted objects) _get_link_from_data_fail( @@ -3110,14 +3148,14 @@ def test_get_sample_via_data_fail(sample_port, workspace): 'Sample service error code 30000 Missing input parameter: version') _get_sample_via_data_fail( sample_port, TOKEN4, {'upa': '1/1/1', 'id': id1, 'version': 1}, - f'Sample service error code 20000 Unauthorized: User user4 cannot read upa 1/1/1') + 'Sample service error code 20000 Unauthorized: User user4 cannot read upa 1/1/1') _get_sample_via_data_fail( sample_port, TOKEN3, {'upa': '1/2/1', 'id': id1, 'version': 1}, - f'Sample service error code 50040 No such workspace data: Object 1/2/1 does not exist') + 'Sample service error code 50040 No such workspace data: Object 1/2/1 does not exist') badid = uuid.uuid4() _get_sample_via_data_fail( sample_port, TOKEN3, {'upa': '1/1/1', 'id': str(badid), 'version': 1}, - f'Sample service error code 50050 No such data link: There is no link from UPA 1/1/1 ' + + 'Sample service error code 50050 No such data link: There is no link from UPA 1/1/1 ' + f'to sample {badid}') _get_sample_via_data_fail( sample_port, TOKEN3, {'upa': '1/1/1', 'id': str(id1), 'version': 2}, @@ -3167,7 +3205,7 @@ def test_user_lookup_build_fail_bad_auth_url(sample_port, auth): def test_user_lookup_build_fail_not_auth_url(): _user_lookup_build_fail( - f'https://ci.kbase.us/services', + 'https://ci.kbase.us/services', TOKEN1, IOError('Non-JSON response from KBase auth server, status code: 404')) @@ -3380,21 +3418,24 @@ def test_kafka_notifier_new_sample(sample_port, kafka): bootstrap_servers=f'localhost:{kafka.port}', auto_offset_reset='earliest', group_id='foo') # quiets warnings - - id_ = uuid.uuid4() - - kn.notify_new_sample_version(id_, 6) - - res = kc.poll(timeout_ms=2000) # 1s not enough? Seems like a lot - assert len(res) == 1 - assert next(iter(res.keys())).topic == 'mytopic' + 242 * 'a' - records = next(iter(res.values())) - assert len(records) == 1 - assert json.loads(records[0].value) == { - 'event_type': 'NEW_SAMPLE', - 'sample_id': str(id_), - 'sample_ver': 6 - } + try: + id_ = uuid.uuid4() + + kn.notify_new_sample_version(id_, 6) + + res = kc.poll(timeout_ms=2000) # 1s not enough? Seems like a lot + assert len(res) == 1 + assert next(iter(res.keys())).topic == 'mytopic' + 242 * 'a' + records = next(iter(res.values())) + assert len(records) == 1 + assert json.loads(records[0].value) == { + 'event_type': 'NEW_SAMPLE', + 'sample_id': str(id_), + 'sample_ver': 6 + } + finally: + kc.close() + kn.close() def test_kafka_notifier_init_fail(): @@ -3424,6 +3465,10 @@ def test_kafka_notifier_notify_new_sample_version_fail(sample_port, kafka): _kafka_notifier_notify_new_sample_version_fail(kn, uuid.uuid4(), -3, ValueError( 'sample_ver must be > 0')) + kn.close() + _kafka_notifier_notify_new_sample_version_fail(kn, uuid.uuid4(), 1, ValueError( + 'client is closed')) + def _kafka_notifier_notify_new_sample_version_fail(notifier, sample, version, expected): with raises(Exception) as got: diff --git a/test/kafka_controller.py b/test/kafka_controller.py index b0104c23..1b767e66 100644 --- a/test/kafka_controller.py +++ b/test/kafka_controller.py @@ -148,6 +148,8 @@ def clear_topic(self, topic: str): """ Remove all records from a topic. + Note this takes about 2 seconds. + :param topic: the topic to clear. """ exe = self._bin_dir.joinpath(self._KAFKA_TOPIC_EXE) @@ -159,6 +161,8 @@ def clear_topic(self, topic: str): def clear_all_topics(self): """ Remove all records from all topics. + + Note this takes about 2 seconds per topic. """ cons = KafkaConsumer(bootstrap_servers=[f'localhost:{self.port}'], group_id='foo') for topic in cons.topics(): @@ -194,8 +198,8 @@ def _print_logs(self, file_, name, dump_logs_to_stdout): if dump_logs_to_stdout: print(f'\n{name} logs:') with open(file_.name) as f: - for l in f: - print(l) + for line in f: + print(line) def main(): diff --git a/test/mongo_controller.py b/test/mongo_controller.py index 47805ce7..13d9a64b 100644 --- a/test/mongo_controller.py +++ b/test/mongo_controller.py @@ -76,9 +76,9 @@ def __init__(self, mongoexe: Path, root_temp_dir: Path, use_wired_tiger: bool = # get some info about the db self.db_version = self.client.server_info()['version'] - self.index_version = 2 if (semver.compare(self.db_version, '3.4.0') >= 0) else 1 - self.includes_system_indexes = (semver.compare(self.db_version, '3.2.0') < 0 - and not use_wired_tiger) + s = semver.VersionInfo.parse + self.index_version = 2 if (s(self.db_version) >= s('3.4.0')) else 1 + self.includes_system_indexes = (s(self.db_version) < s('3.2.0') and not use_wired_tiger) def destroy(self, delete_temp_files: bool) -> None: """