From 0c910964951eeafaa1367496fd37a76b741c4202 Mon Sep 17 00:00:00 2001 From: Gavin Date: Fri, 29 May 2020 17:20:30 -0700 Subject: [PATCH] Integrate kafka notifier into service That was a major pain. * deleting a topic & contents takes ~2s locally, tripled test time when run after every test * Kafka consumer polling is pretty slow, and can't be done mid test if assert _close_to_now is used. * It seems as though not closing clients can cause weird errors/hangs in other clients? Not 100% sure about this, was making too many changes too rapidly trying to figure out what was going on. Closing doesn't seem to hurt though. also flake8 mysteriously started whining about new stuff even though it wasn't updated. --- Pipfile | 1 + Pipfile.lock | 88 ++++++++++++- deploy.cfg | 9 +- lib/SampleService/core/config.py | 21 ++- lib/SampleService/core/notification.py | 12 +- test/SampleService_test.py | 175 ++++++++++++++++--------- test/kafka_controller.py | 8 +- test/mongo_controller.py | 6 +- 8 files changed, 242 insertions(+), 78 deletions(-) diff --git a/Pipfile b/Pipfile index 2293da8e..9e9b6b00 100644 --- a/Pipfile +++ b/Pipfile @@ -11,6 +11,7 @@ coveralls = "*" flake8 = "*" pymongo = "*" semver = "*" +ipython = "*" [packages] python-arango = "==5.0.0" diff --git a/Pipfile.lock b/Pipfile.lock index b778b6f0..19ce50e2 100644 --- a/Pipfile.lock +++ b/Pipfile.lock @@ -1,7 +1,7 @@ { "_meta": { "hash": { - "sha256": "95de0d8b679b366d403b7f535051b612c4b6e8ff469ffc07fb31dfeed3a0fa67" + "sha256": "78a2d3911fee178c81c2107f5d91f41bbe05004e61b6a97711c9559d342f16be" }, "pipfile-spec": 6, "requires": { @@ -313,6 +313,13 @@ ], "version": "==19.3.0" }, + "backcall": { + "hashes": [ + "sha256:38ecd85be2c1e78f77fd91700c76e14667dc21e2713b63876c0eb901196e01e4", + "sha256:bbbf4b1e5cd2bdb08f915895b51081c041bac22394fdfcfdfbe9f14b77c08bf2" + ], + "version": "==0.1.0" + }, "certifi": { "hashes": [ "sha256:1d987a998c75633c40847cc966fcf5904906c920a7f17ef374f5aa4282abd304", @@ -371,6 +378,13 @@ "index": "pypi", "version": "==2.0.0" }, + "decorator": { + "hashes": [ + "sha256:41fa54c2a0cc4ba648be4fd43cff00aedf5b9465c9bf18d64325bc225f08f760", + "sha256:e3a62f0520172440ca0dcc823749319382e377f37f140a0b99ef45fecb84bfe7" + ], + "version": "==4.4.2" + }, "docopt": { "hashes": [ "sha256:49b3a825280bd66b3aa83585ef59c4a8c82f2c8a522dbe754a8bc8d08c85c491" @@ -400,6 +414,28 @@ "markers": "python_version < '3.8'", "version": "==1.6.0" }, + "ipython": { + "hashes": [ + "sha256:5b241b84bbf0eb085d43ae9d46adf38a13b45929ca7774a740990c2c242534bb", + "sha256:f0126781d0f959da852fb3089e170ed807388e986a8dd4e6ac44855845b0fb1c" + ], + "index": "pypi", + "version": "==7.14.0" + }, + "ipython-genutils": { + "hashes": [ + "sha256:72dd37233799e619666c9f639a9da83c34013a73e8bbc79a7a6348d93c61fab8", + "sha256:eb2e116e75ecef9d4d228fdc66af54269afa26ab4463042e33785b887c628ba8" + ], + "version": "==0.2.0" + }, + "jedi": { + "hashes": [ + "sha256:cd60c93b71944d628ccac47df9a60fec53150de53d42dc10a7fc4b5ba6aae798", + "sha256:df40c97641cb943661d2db4c33c2e1ff75d491189423249e989bcea4464f3030" + ], + "version": "==0.17.0" + }, "mccabe": { "hashes": [ "sha256:ab8a6258860da4b6677da4bd2fe5dc2c659cff31b3ee4f7f5d64e79735b80d42", @@ -448,6 +484,28 @@ ], "version": "==20.4" }, + "parso": { + "hashes": [ + "sha256:158c140fc04112dc45bca311633ae5033c2c2a7b732fa33d0955bad8152a8dd0", + "sha256:908e9fae2144a076d72ae4e25539143d40b8e3eafbaeae03c1bfe226f4cdf12c" + ], + "version": "==0.7.0" + }, + "pexpect": { + "hashes": [ + 
"sha256:0b48a55dcb3c05f3329815901ea4fc1537514d6ba867a152b581d69ae3710937", + "sha256:fc65a43959d153d0114afe13997d439c22823a27cefceb5ff35c2178c6784c0c" + ], + "markers": "sys_platform != 'win32'", + "version": "==4.8.0" + }, + "pickleshare": { + "hashes": [ + "sha256:87683d47965c1da65cdacaf31c8441d12b8044cdec9aca500cd78fc2c683afca", + "sha256:9649af414d74d4df115d5d718f82acb59c9d418196b7b4290ed47a12ce62df56" + ], + "version": "==0.7.5" + }, "pluggy": { "hashes": [ "sha256:15b2acde666561e1298d71b523007ed7364de07029219b604cf808bfa1c765b0", @@ -455,6 +513,20 @@ ], "version": "==0.13.1" }, + "prompt-toolkit": { + "hashes": [ + "sha256:563d1a4140b63ff9dd587bda9557cffb2fe73650205ab6f4383092fb882e7dc8", + "sha256:df7e9e63aea609b1da3a65641ceaf5bc7d05e0a04de5bd45d05dbeffbabf9e04" + ], + "version": "==3.0.5" + }, + "ptyprocess": { + "hashes": [ + "sha256:923f299cc5ad920c68f2bc0bc98b75b9f838b93b599941a6b63ddbc2476394c0", + "sha256:d7cc528d76e76342423ca640335bd3633420dc1366f258cb31d05e865ef5ca1f" + ], + "version": "==0.6.0" + }, "py": { "hashes": [ "sha256:5e27081401262157467ad6e7f851b7aa402c5852dbcb3dae06768434de5752aa", @@ -476,6 +548,13 @@ ], "version": "==2.2.0" }, + "pygments": { + "hashes": [ + "sha256:647344a061c249a3b74e230c739f434d7ea4d8b1d5f3721bc0f3558049b38f44", + "sha256:ff7a40b4860b727ab48fad6360eb351cc1b33cbf9b15a0f689ca5353e9463324" + ], + "version": "==2.6.1" + }, "pymongo": { "hashes": [ "sha256:01b4e10027aef5bb9ecefbc26f5df3368ce34aef81df43850f701e716e3fe16d", @@ -581,6 +660,13 @@ ], "version": "==1.15.0" }, + "traitlets": { + "hashes": [ + "sha256:70b4c6a1d9019d7b4f6846832288f86998aa3b9207c6821f3578a6a6a467fe44", + "sha256:d023ee369ddd2763310e4c3eae1ff649689440d4ae59d7485eb4cfbbe3e359f7" + ], + "version": "==4.3.3" + }, "typed-ast": { "hashes": [ "sha256:0666aa36131496aed8f7be0410ff974562ab7eeac11ef351def9ea6fa28f6355", diff --git a/deploy.cfg b/deploy.cfg index 7fb950dc..180b48bd 100644 --- a/deploy.cfg +++ b/deploy.cfg @@ -70,4 +70,11 @@ schema-collection = {{ schema_collection }} # A URL pointing to a configuration file for any metadata validators to be installed on startup. # See the readme file for a description of the file contents. -metadata-validator-config-url = {{ metadata_validator_config_url }} \ No newline at end of file +metadata-validator-config-url = {{ metadata_validator_config_url }} + +# bootstrap.servers parameter for Kafka notifications. Leave blank to disable the Kafka +# notifier. +kafka-bootstrap-servers = {{ kafka_bootstrap_servers }} +# The topic to which the Kafka notifier should write. Required if kafka-bootstrap-servers is +# provided. 
+kafka-topic = {{ kafka_topic }} \ No newline at end of file diff --git a/lib/SampleService/core/config.py b/lib/SampleService/core/config.py index dd8a1e74..b19ac18a 100644 --- a/lib/SampleService/core/config.py +++ b/lib/SampleService/core/config.py @@ -21,6 +21,7 @@ from SampleService.core.storage.arango_sample_storage import ArangoSampleStorage \ as _ArangoSampleStorage from SampleService.core.arg_checkers import check_string as _check_string +from SampleService.core.notification import KafkaNotifier as _KafkaNotifer from SampleService.core.user_lookup import KBaseUserLookup from SampleService.core.workspace import WS as _WS @@ -67,13 +68,17 @@ def build_samples(config: Dict[str, str]) -> Tuple[Samples, KBaseUserLookup]: ws_token = _check_string_req(config.get('workspace-read-admin-token'), 'config param workspace-read-admin-token') + kafka_servers = _check_string(config.get('kafka-bootstrap-servers'), + 'config param kafka-bootstrap-servers', + optional=True) + kafka_topic = None + if kafka_servers: # have to start the server twice to test no kafka scenario + kafka_topic = _check_string(config.get('kafka-topic'), 'config param kafka-topic') + metaval_url = _check_string(config.get('metadata-validator-config-url'), 'config param metadata-validator-config-url', optional=True) - # build the validators before trying to connect to arango - metaval = get_validators(metaval_url) if metaval_url else MetadataValidatorSet() - # meta params may have info that shouldn't be logged so don't log any for now. # Add code to deal with this later if needed print(f''' @@ -95,10 +100,15 @@ def build_samples(config: Dict[str, str]) -> Tuple[Samples, KBaseUserLookup]: auth-full-admin-roles: {', '.join(full_roles)} auth-read-admin-roles: {', '.join(read_roles)} workspace-url: {ws_url} - workspace-read-admin-token: [REDACTED FOR YOUR PLEASURE] + workspace-read-admin-token: [REDACTED FOR YOUR ULTIMATE PLEASURE] + kafka-bootstrap-servers: {kafka_servers} + kafka-topic: {kafka_topic} metadata-validators-config-url: {metaval_url} ''') + # build the validators before trying to connect to arango + metaval = get_validators(metaval_url) if metaval_url else MetadataValidatorSet() + arangoclient = _arango.ArangoClient(hosts=arango_url) arango_db = arangoclient.db( arango_db, username=arango_user, password=arango_pwd, verify=True) @@ -114,9 +124,10 @@ def build_samples(config: Dict[str, str]) -> Tuple[Samples, KBaseUserLookup]: col_schema, ) storage.start_consistency_checker() + kafka = _KafkaNotifer(kafka_servers, _cast(str, kafka_topic)) if kafka_servers else None user_lookup = KBaseUserLookup(auth_root_url, auth_token, full_roles, read_roles) ws = _WS(_Workspace(ws_url, token=ws_token)) - return Samples(storage, user_lookup, metaval, ws), user_lookup + return Samples(storage, user_lookup, metaval, ws, kafka), user_lookup def split_value(d: Dict[str, str], key: str): diff --git a/lib/SampleService/core/notification.py b/lib/SampleService/core/notification.py index 67ccb417..a565150b 100644 --- a/lib/SampleService/core/notification.py +++ b/lib/SampleService/core/notification.py @@ -94,6 +94,7 @@ def __init__(self, bootstrap_servers: str, topic: str): # presumably this can be removed once idempotence is supported max_in_flight_requests_per_connection=1, ) + self._closed = False def notify_new_sample_version(self, sample_id: UUID, sample_ver: int): if sample_ver < 1: @@ -104,11 +105,20 @@ def notify_new_sample_version(self, sample_id: UUID, sample_ver: int): self._send_message(msg) def _send_message(self, message): + if 
self._closed: + raise ValueError('client is closed') future = self._prod.send(self._topic, _json.dumps(message).encode('utf-8')) # ensure the message was send correctly, or if not throw an exeption in the correct thread future.get(timeout=35) # this is very difficult to test - # TODO KAFKA notify on sample save + def close(self): + """ + Close the notifier. + """ + # handle with context at some point + self._prod.close() + self._closed = True + # TODO KAFKA notify on ACL change # TODO KAFKA notify on new link # TODO KAFKA notify on expired link via update diff --git a/test/SampleService_test.py b/test/SampleService_test.py index 1cb8064b..b57431b3 100644 --- a/test/SampleService_test.py +++ b/test/SampleService_test.py @@ -32,7 +32,11 @@ from installed_clients.WorkspaceClient import Workspace as Workspace from core import test_utils -from core.test_utils import assert_ms_epoch_close_to_now, assert_exception_correct, find_free_port +from core.test_utils import ( + assert_ms_epoch_close_to_now, + assert_exception_correct, + find_free_port +) from arango_controller import ArangoController from mongo_controller import MongoController from workspace_controller import WorkspaceController @@ -83,8 +87,10 @@ USER_NO_TOKEN2 = 'usernt2' USER_NO_TOKEN3 = 'usernt3' +KAFKA_TOPIC = 'sampleservice' -def create_deploy_cfg(auth_port, arango_port, workspace_port): + +def create_deploy_cfg(auth_port, arango_port, workspace_port, kafka_port): cfg = ConfigParser() ss = 'SampleService' cfg.add_section(ss) @@ -106,6 +112,9 @@ def create_deploy_cfg(auth_port, arango_port, workspace_port): cfg[ss]['workspace-url'] = f'http://localhost:{workspace_port}' cfg[ss]['workspace-read-admin-token'] = TOKEN_WS_READ_ADMIN + cfg[ss]['kafka-bootstrap-servers'] = f'localhost:{kafka_port}' + cfg[ss]['kafka-topic'] = KAFKA_TOPIC + cfg[ss]['sample-collection'] = TEST_COL_SAMPLE cfg[ss]['version-collection'] = TEST_COL_VERSION cfg[ss]['version-edge-collection'] = TEST_COL_VER_EDGE @@ -319,7 +328,7 @@ def kafka(): del_temp = test_utils.get_delete_temp_files() print('shutting down kafka, delete_temp_files={}'.format(del_temp)) - kc.destroy(del_temp, dump_logs_to_stdout=True) + kc.destroy(del_temp, dump_logs_to_stdout=False) @fixture(scope='module') @@ -328,7 +337,7 @@ def service(auth, arango, workspace, kafka): clear_db_and_recreate(arango) # this is completely stupid. The state is calculated on import so there's no way to # test the state creation normally. 
- cfgpath = create_deploy_cfg(auth.port, arango.port, workspace.port) + cfgpath = create_deploy_cfg(auth.port, arango.port, workspace.port, kafka.port) os.environ['KB_DEPLOYMENT_CONFIG'] = cfgpath from SampleService import SampleServiceServer Thread(target=SampleServiceServer.start_server, kwargs={'port': portint}, daemon=True).start() @@ -346,7 +355,7 @@ def service(auth, arango, workspace, kafka): def sample_port(service, arango, workspace, kafka): clear_db_and_recreate(arango) workspace.clear_db() - kafka.clear_all_topics() + # kafka.clear_all_topics() # too expensive to run after every test yield service @@ -389,6 +398,9 @@ def test_init_fail(): cfg['workspace-url'] = 'crap' init_fail(cfg, MissingParameterError('config param workspace-read-admin-token')) cfg['workspace-read-admin-token'] = 'crap' + cfg['kafka-bootstrap-servers'] = 'crap' + init_fail(cfg, MissingParameterError('config param kafka-topic')) + cfg['kafka-topic'] = 'crap' # get_validators is tested elsewhere, just make sure it'll error out cfg['metadata-validator-config-url'] = 'https://kbase.us/services' init_fail(cfg, ValueError( @@ -423,10 +435,30 @@ def get_authorized_headers(token): return {'authorization': token, 'accept': 'application/json'} -def test_create_and_get_sample_with_version(sample_port): +def _check_kafka_messages(kafka, expected_msgs): + kc = KafkaConsumer( + KAFKA_TOPIC, + bootstrap_servers=f'localhost:{kafka.port}', + auto_offset_reset='earliest', + group_id='foo') # quiets warnings + + try: + res = kc.poll(timeout_ms=2000) # 1s not enough? Seems like a lot + assert len(res) == 1 + assert next(iter(res.keys())).topic == KAFKA_TOPIC + records = next(iter(res.values())) + assert len(records) == len(expected_msgs) + for i, r in enumerate(records): + assert json.loads(r.value) == expected_msgs[i] + # TODO KAFKA this needs a reset before other tests, somehow. commit? 
+ finally: + kc.close() + + +def test_create_and_get_sample_with_version(sample_port, kafka): url = f'http://localhost:{sample_port}' - # verison 1 + # version 1 ret = requests.post(url, headers=get_authorized_headers(TOKEN1), json={ 'method': 'SampleService.create_sample', 'version': '1.1', @@ -525,6 +557,12 @@ def test_create_and_get_sample_with_version(sample_port): 'meta_user': {'a': {'b': 'd'}}}] } + _check_kafka_messages( + kafka, [ + {'event_type': 'NEW_SAMPLE', 'sample_id': id_, 'sample_ver': 1}, + {'event_type': 'NEW_SAMPLE', 'sample_id': id_, 'sample_ver': 2} + ]) + def test_create_sample_as_admin(sample_port): _create_sample_as_admin(sample_port, None, TOKEN2, USER2) @@ -738,7 +776,7 @@ def test_create_sample_fail_no_nodes(sample_port): # print(ret.text) assert ret.status_code == 500 assert ret.json()['error']['message'] == ( - f'Sample service error code 30001 Illegal input parameter: sample node tree ' + + 'Sample service error code 30001 Illegal input parameter: sample node tree ' + 'must be present and a list') @@ -826,14 +864,14 @@ def test_create_sample_fail_permissions(sample_port): def test_create_sample_fail_admin_bad_user_name(sample_port): _create_sample_fail_admin_as_user( sample_port, 'bad\tuser', - f'Sample service error code 30001 Illegal input parameter: userid contains ' + + 'Sample service error code 30001 Illegal input parameter: userid contains ' + 'control characters') def test_create_sample_fail_admin_no_such_user(sample_port): _create_sample_fail_admin_as_user( sample_port, USER4 + 'impostor', - f'Sample service error code 50000 No such user: user4impostor') + 'Sample service error code 50000 No such user: user4impostor') def _create_sample_fail_admin_as_user(sample_port, user, expected): @@ -883,7 +921,7 @@ def test_create_sample_fail_admin_permissions(sample_port): # print(ret.text) assert ret.status_code == 500 assert ret.json()['error']['message'] == ( - f'Sample service error code 20000 Unauthorized: User user3 does not have the ' + + 'Sample service error code 20000 Unauthorized: User user3 does not have the ' + 'necessary administration privileges to run method create_sample') @@ -988,7 +1026,7 @@ def test_get_sample_fail_admin_permissions(sample_port): # print(ret.text) assert ret.status_code == 500 assert ret.json()['error']['message'] == ( - f'Sample service error code 20000 Unauthorized: User user4 does not have the ' + + 'Sample service error code 20000 Unauthorized: User user4 does not have the ' + 'necessary administration privileges to run method get_sample') @@ -1266,7 +1304,7 @@ def test_get_acls_fail_admin_permissions(sample_port): }) assert ret.status_code == 500 assert ret.json()['error']['message'] == ( - f'Sample service error code 20000 Unauthorized: User user4 does not have the ' + + 'Sample service error code 20000 Unauthorized: User user4 does not have the ' + 'necessary administration privileges to run method get_sample_acls') @@ -1391,7 +1429,7 @@ def test_replace_acls_fail_bad_user(sample_port): }) assert ret.status_code == 500 assert ret.json()['error']['message'] == ( - f'Sample service error code 50000 No such user: a, philbin_j_montgomery_iii') + 'Sample service error code 50000 No such user: a, philbin_j_montgomery_iii') def test_replace_acls_fail_user_in_2_acls(sample_port): @@ -1619,12 +1657,12 @@ def test_create_links_and_get_links_from_sample_basic(sample_port, workspace): ] assert len(res) == len(expected_links) - for l in res: - assert_ms_epoch_close_to_now(l['created']) - del l['created'] + for link in res: + 
assert_ms_epoch_close_to_now(link['created']) + del link['created'] - for l in expected_links: - assert l in res + for link in expected_links: + assert link in res # get links from sample 2 ret = requests.post(url, headers=get_authorized_headers(TOKEN4), json={ @@ -1875,12 +1913,12 @@ def test_create_data_link_as_admin(sample_port, workspace): ] assert len(res) == len(expected_links) - for l in res: - assert_ms_epoch_close_to_now(l['created']) - del l['created'] + for link in res: + assert_ms_epoch_close_to_now(link['created']) + del link['created'] - for l in expected_links: - assert l in res + for link in expected_links: + assert link in res def test_get_links_from_sample_exclude_workspaces(sample_port, workspace): @@ -1974,12 +2012,12 @@ def test_get_links_from_sample_exclude_workspaces(sample_port, workspace): ] assert len(res) == len(expected_links) - for l in res: - assert_ms_epoch_close_to_now(l['created']) - del l['created'] + for link in res: + assert_ms_epoch_close_to_now(link['created']) + del link['created'] - for l in expected_links: - assert l in res + for link in expected_links: + assert link in res def test_get_links_from_sample_as_admin(sample_port, workspace): @@ -2057,14 +2095,14 @@ def test_create_link_fail(sample_port, workspace): _replace_acls(url, id_, TOKEN3, {'write': [USER4]}) _create_link_fail( # fails if permission granted is admin sample_port, TOKEN4, {'id': id_, 'version': 1, 'node': 'foo', 'upa': '1/1/1'}, - f'Sample service error code 20000 Unauthorized: User user4 cannot ' + + 'Sample service error code 20000 Unauthorized: User user4 cannot ' + f'administrate sample {id_}') _replace_acls(url, id_, TOKEN3, {'admin': [USER4]}) wscli.set_permissions({'id': 1, 'new_permission': 'r', 'users': [USER4]}) _create_link_fail( # fails if permission granted is write sample_port, TOKEN4, {'id': id_, 'version': 1, 'node': 'foo', 'upa': '1/1/1'}, - f'Sample service error code 20000 Unauthorized: User user4 cannot write to upa 1/1/1') + 'Sample service error code 20000 Unauthorized: User user4 cannot write to upa 1/1/1') wscli.save_objects({'id': 1, 'objects': [ {'name': 'bar', 'data': {}, 'type': 'Trivial.Object-1.0'}, @@ -2082,12 +2120,12 @@ def test_create_link_fail(sample_port, workspace): 'upa': '1/1/1', 'as_admin': 1, 'as_user': 'foo\bbar'}, - f'Sample service error code 30001 Illegal input parameter: ' + + 'Sample service error code 30001 Illegal input parameter: ' + 'userid contains control characters') _create_link_fail( sample_port, TOKEN3, {'id': id_, 'version': 1, 'node': 'foo', 'upa': '1/1/1', 'as_user': USER4, 'as_admin': 'f'}, - f'Sample service error code 20000 Unauthorized: User user3 does not have ' + + 'Sample service error code 20000 Unauthorized: User user3 does not have ' + 'the necessary administration privileges to run method create_data_link') _create_link_fail( sample_port, @@ -2098,7 +2136,7 @@ def test_create_link_fail(sample_port, workspace): 'upa': '1/1/1', 'as_user': 'fake', 'as_admin': 'f'}, - f'Sample service error code 50000 No such user: fake') + 'Sample service error code 50000 No such user: fake') def test_create_link_fail_link_exists(sample_port, workspace): @@ -2255,11 +2293,11 @@ def _expire_data_link(sample_port, workspace, dataid): assert_ms_epoch_close_to_now(ret.json()['result'][0]['effective_time']) links = ret.json()['result'][0]['links'] assert len(links) == 2 - for l in links: - if l['dataid'] == 'fake': - current_link = l + for link in links: + if link['dataid'] == 'fake': + current_link = link else: - expired_link = l + 
expired_link = link assert_ms_epoch_close_to_now(expired_link['expired']) assert_ms_epoch_close_to_now(expired_link['created'] + 1000) del expired_link['created'] @@ -2428,7 +2466,7 @@ def test_expire_data_link_fail(sample_port, workspace): wscli.set_permissions({'id': 1, 'new_permission': 'w', 'users': [USER4]}) _expire_data_link_fail( sample_port, TOKEN4, {'upa': '1/1/1', 'dataid': 'yay'}, - f'Sample service error code 20000 Unauthorized: User user4 cannot ' + + 'Sample service error code 20000 Unauthorized: User user4 cannot ' + f'administrate sample {id1}') # admin tests @@ -2561,12 +2599,12 @@ def test_get_links_from_data(sample_port, workspace): ] assert len(res) == len(expected_links) - for l in res: - assert_ms_epoch_close_to_now(l['created']) - del l['created'] + for link in res: + assert_ms_epoch_close_to_now(link['created']) + del link['created'] - for l in expected_links: - assert l in res + for link in expected_links: + assert link in res # get links from object 1/1/1 ret = requests.post(url, headers=get_authorized_headers(TOKEN3), json={ @@ -2793,10 +2831,10 @@ def test_get_links_from_data_fail(sample_port, workspace): "value of 'foo' is not a valid epoch millisecond timestamp") _get_link_from_data_fail( sample_port, TOKEN4, {'upa': '1/1/1'}, - f'Sample service error code 20000 Unauthorized: User user4 cannot read upa 1/1/1') + 'Sample service error code 20000 Unauthorized: User user4 cannot read upa 1/1/1') _get_link_from_data_fail( sample_port, TOKEN3, {'upa': '1/2/1'}, - f'Sample service error code 50040 No such workspace data: Object 1/2/1 does not exist') + 'Sample service error code 50040 No such workspace data: Object 1/2/1 does not exist') # admin tests (also tests missing / deleted objects) _get_link_from_data_fail( @@ -3110,14 +3148,14 @@ def test_get_sample_via_data_fail(sample_port, workspace): 'Sample service error code 30000 Missing input parameter: version') _get_sample_via_data_fail( sample_port, TOKEN4, {'upa': '1/1/1', 'id': id1, 'version': 1}, - f'Sample service error code 20000 Unauthorized: User user4 cannot read upa 1/1/1') + 'Sample service error code 20000 Unauthorized: User user4 cannot read upa 1/1/1') _get_sample_via_data_fail( sample_port, TOKEN3, {'upa': '1/2/1', 'id': id1, 'version': 1}, - f'Sample service error code 50040 No such workspace data: Object 1/2/1 does not exist') + 'Sample service error code 50040 No such workspace data: Object 1/2/1 does not exist') badid = uuid.uuid4() _get_sample_via_data_fail( sample_port, TOKEN3, {'upa': '1/1/1', 'id': str(badid), 'version': 1}, - f'Sample service error code 50050 No such data link: There is no link from UPA 1/1/1 ' + + 'Sample service error code 50050 No such data link: There is no link from UPA 1/1/1 ' + f'to sample {badid}') _get_sample_via_data_fail( sample_port, TOKEN3, {'upa': '1/1/1', 'id': str(id1), 'version': 2}, @@ -3167,7 +3205,7 @@ def test_user_lookup_build_fail_bad_auth_url(sample_port, auth): def test_user_lookup_build_fail_not_auth_url(): _user_lookup_build_fail( - f'https://ci.kbase.us/services', + 'https://ci.kbase.us/services', TOKEN1, IOError('Non-JSON response from KBase auth server, status code: 404')) @@ -3380,21 +3418,24 @@ def test_kafka_notifier_new_sample(sample_port, kafka): bootstrap_servers=f'localhost:{kafka.port}', auto_offset_reset='earliest', group_id='foo') # quiets warnings - - id_ = uuid.uuid4() - - kn.notify_new_sample_version(id_, 6) - - res = kc.poll(timeout_ms=2000) # 1s not enough? 
Seems like a lot - assert len(res) == 1 - assert next(iter(res.keys())).topic == 'mytopic' + 242 * 'a' - records = next(iter(res.values())) - assert len(records) == 1 - assert json.loads(records[0].value) == { - 'event_type': 'NEW_SAMPLE', - 'sample_id': str(id_), - 'sample_ver': 6 - } + try: + id_ = uuid.uuid4() + + kn.notify_new_sample_version(id_, 6) + + res = kc.poll(timeout_ms=2000) # 1s not enough? Seems like a lot + assert len(res) == 1 + assert next(iter(res.keys())).topic == 'mytopic' + 242 * 'a' + records = next(iter(res.values())) + assert len(records) == 1 + assert json.loads(records[0].value) == { + 'event_type': 'NEW_SAMPLE', + 'sample_id': str(id_), + 'sample_ver': 6 + } + finally: + kc.close() + kn.close() def test_kafka_notifier_init_fail(): @@ -3424,6 +3465,10 @@ def test_kafka_notifier_notify_new_sample_version_fail(sample_port, kafka): _kafka_notifier_notify_new_sample_version_fail(kn, uuid.uuid4(), -3, ValueError( 'sample_ver must be > 0')) + kn.close() + _kafka_notifier_notify_new_sample_version_fail(kn, uuid.uuid4(), 1, ValueError( + 'client is closed')) + def _kafka_notifier_notify_new_sample_version_fail(notifier, sample, version, expected): with raises(Exception) as got: diff --git a/test/kafka_controller.py b/test/kafka_controller.py index b0104c23..1b767e66 100644 --- a/test/kafka_controller.py +++ b/test/kafka_controller.py @@ -148,6 +148,8 @@ def clear_topic(self, topic: str): """ Remove all records from a topic. + Note this takes about 2 seconds. + :param topic: the topic to clear. """ exe = self._bin_dir.joinpath(self._KAFKA_TOPIC_EXE) @@ -159,6 +161,8 @@ def clear_topic(self, topic: str): def clear_all_topics(self): """ Remove all records from all topics. + + Note this takes about 2 seconds per topic. """ cons = KafkaConsumer(bootstrap_servers=[f'localhost:{self.port}'], group_id='foo') for topic in cons.topics(): @@ -194,8 +198,8 @@ def _print_logs(self, file_, name, dump_logs_to_stdout): if dump_logs_to_stdout: print(f'\n{name} logs:') with open(file_.name) as f: - for l in f: - print(l) + for line in f: + print(line) def main(): diff --git a/test/mongo_controller.py b/test/mongo_controller.py index 47805ce7..13d9a64b 100644 --- a/test/mongo_controller.py +++ b/test/mongo_controller.py @@ -76,9 +76,9 @@ def __init__(self, mongoexe: Path, root_temp_dir: Path, use_wired_tiger: bool = # get some info about the db self.db_version = self.client.server_info()['version'] - self.index_version = 2 if (semver.compare(self.db_version, '3.4.0') >= 0) else 1 - self.includes_system_indexes = (semver.compare(self.db_version, '3.2.0') < 0 - and not use_wired_tiger) + s = semver.VersionInfo.parse + self.index_version = 2 if (s(self.db_version) >= s('3.4.0')) else 1 + self.includes_system_indexes = (s(self.db_version) < s('3.2.0') and not use_wired_tiger) def destroy(self, delete_temp_files: bool) -> None: """
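
For reference, a minimal end-to-end sketch of the notifier this patch wires in, assuming a local broker at localhost:9092 (assumption; any bootstrap server works) and the 'sampleservice' topic used in the test config, with lib/ on the Python path. The producer half uses KafkaNotifier from lib/SampleService/core/notification.py as added here; the consumer half mirrors the _check_kafka_messages helper added to test/SampleService_test.py and uses the same kafka-python client the tests rely on. This is a sketch, not part of the patch.

    import json
    import uuid

    from kafka import KafkaConsumer

    from SampleService.core.notification import KafkaNotifier

    BOOTSTRAP = 'localhost:9092'  # assumption: local test broker
    TOPIC = 'sampleservice'       # KAFKA_TOPIC from the test config

    # Produce a NEW_SAMPLE event, then close the notifier - per the commit
    # notes, leaving clients open can cause hangs in other clients.
    kn = KafkaNotifier(BOOTSTRAP, TOPIC)
    try:
        sample_id = uuid.uuid4()
        kn.notify_new_sample_version(sample_id, 1)
    finally:
        kn.close()

    # Consume and verify the message, mirroring _check_kafka_messages.
    kc = KafkaConsumer(
        TOPIC,
        bootstrap_servers=BOOTSTRAP,
        auto_offset_reset='earliest',
        group_id='foo')  # quiets warnings
    try:
        res = kc.poll(timeout_ms=2000)  # consumer polling is slow; 2s is deliberate
        records = next(iter(res.values()))
        assert json.loads(records[0].value) == {
            'event_type': 'NEW_SAMPLE',
            'sample_id': str(sample_id),
            'sample_ver': 1,
        }
    finally:
        kc.close()

As in the integration tests, both clients are closed in finally blocks, and a new consumer group id is the cheap way to get a clean read without clearing the topic (clear_topic takes ~2s per topic, which is why the per-test cleanup was dropped).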