From 00e679cf3701a3da46f0a3592720226c858721b8 Mon Sep 17 00:00:00 2001
From: "Diego M. Rodriguez"
Date: Thu, 20 Apr 2017 17:14:46 +0200
Subject: [PATCH 1/3] [feat] add v3 Test/TestRun functionality

Add methods for interacting with Tests and TestRuns through the v3 API,
including the related Resources and the following client methods:

* create_test_run()
* get_test_run()
* get_test()
* list_tests()
* list_test_run_results()
* list_test_run_result_ids()
---
 loadimpact3/clients.py   |  23 +++-
 loadimpact3/resources.py | 244 ++++++++++++++++++++++++++++++++++++++-
 2 files changed, 263 insertions(+), 4 deletions(-)

diff --git a/loadimpact3/clients.py b/loadimpact3/clients.py
index ee5fcdd..825d0ce 100644
--- a/loadimpact3/clients.py
+++ b/loadimpact3/clients.py
@@ -36,8 +36,9 @@
     MissingApiTokenError, NotFoundError, RateLimitError, ServerError,
     TimeoutError, UnauthorizedError)
 from .resources import (
-    DataStore, UserScenario, UserScenarioValidation,
-    UserScenarioValidationResult, Organization, OrganizationProject)
+    DataStore, Test, TestRun, TestRunResultsIds, UserScenario,
+    UserScenarioValidation, UserScenarioValidationResult,
+    Organization, OrganizationProject, TestRunResults)

 try:
     from urlparse import urljoin
@@ -123,6 +124,24 @@ def list_organizations(self):
     def list_organization_projects(self, org_id):
         return OrganizationProject.list(self, org_id)

+    def list_tests(self, project_id):
+        return Test.list(self, project_id=project_id)
+
+    def get_test(self, resource_id):
+        return Test.get(self, resource_id)
+
+    def get_test_run(self, resource_id):
+        return TestRun.get(self, resource_id)
+
+    def create_test_run(self, data):
+        return TestRun.create(self, data)
+
+    def list_test_run_result_ids(self, resource_id, data):
+        return TestRunResultsIds.list(self, resource_id, data)
+
+    def list_test_run_results(self, resource_id, data):
+        return TestRunResults.list(self, resource_id, data)
+
     @requests_exceptions_handling
     def delete(self, path, headers=None, params=None):
         """Make a DELETE request to the API.
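For context, a minimal sketch of how the client methods added above compose; the token and the ids are hypothetical placeholders:

```python
import loadimpact3

# Placeholder token and project id, for illustration only.
client = loadimpact3.ApiTokenClient(api_token='YOUR_API_TOKEN_GOES_HERE')

# List the tests of a project, fetch one by id, and queue a new run of it.
tests = client.list_tests(project_id=123)
test = client.get_test(tests[0].id)
test_run = client.create_test_run({'test_id': test.id})
print(test_run.status_text)
```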
diff --git a/loadimpact3/resources.py b/loadimpact3/resources.py
index 877cf97..5392b8f 100644
--- a/loadimpact3/resources.py
+++ b/loadimpact3/resources.py
@@ -18,14 +18,18 @@

 from __future__ import absolute_import

-__all__ = ['DataStore', 'LoadZone',
+import hashlib
+
+import sys
+
+__all__ = ['DataStore', 'LoadZone', 'Test',
            'UserScenario', 'UserScenarioValidation']

 import json

 from .exceptions import CoercionError, ResponseParseError
 from .fields import (
-    DataStoreListField, DateTimeField, Field, IntegerField,
+    DataStoreListField, DateTimeField, DictField, Field, IntegerField,
     UnicodeField, BooleanField)

 from pprint import pformat
@@ -173,6 +177,39 @@ def list(cls, client, resource_id=None, project_id=None):
             raise ResponseParseError(e)


+class ListWithParamsMixin(object):
+    list_content_type = 'application/json'
+
+    fields = {
+        'type': IntegerField,
+        'offset': IntegerField,
+        'ids': DictField,
+    }
+
+    @classmethod
+    def _path(cls, resource_id):
+        return '{0}/{1}/result_ids'.format(cls.resource_name, resource_id)
+
+    @classmethod
+    def list(cls, client, resource_id=None, data=None):
+        path = cls._path(resource_id=resource_id)
+        response = client.post(path,
+                               headers={'Content-Type': cls.list_content_type},
+                               data=json.dumps(data))
+        objects = []
+        try:
+            name = cls.resource_response_objects_name
+            r = response.json()
+            r_obj = r.get(name)
+            for obj in r_obj:
+                instance = cls(client)
+                instance._set_fields(obj)
+                objects.append(instance)
+            return objects
+        except CoercionError as e:
+            raise ResponseParseError(e)
+
+
 class DataStore(Resource, ListMixin, GetMixin, CreateMixin, UpdateMixin,
                 DeleteMixin):
     resource_name = 'data-stores'
     resource_response_object_name = 'data_store'
@@ -287,6 +324,209 @@ def name_to_id(cls, name):
         raise ValueError("There's no load zone with name '%s'" % name)


+class Test(Resource, ListMixin, GetMixin, CreateMixin, DeleteMixin):
+    resource_name = 'tests'
+    resource_response_object_name = 'test'
+    resource_response_objects_name = 'tests'
+    project_id = None
+
+    fields = {
+        'id': IntegerField,
+        'name': (UnicodeField, Field.SERIALIZE),
+        'config': (DictField, Field.SERIALIZE),
+        'last_test_run_id': IntegerField,
+    }
+
+    @classmethod
+    def _path(cls, resource_id=None):
+        if cls.project_id:
+            return '{0}?project_id={1}'.format(cls.resource_name, cls.project_id)
+        return super(Test, cls)._path(resource_id)
+
+    def start_test_run(self):
+        """Start test run based on this test config.
+
+        Returns:
+            TestRun with 'Created' status.
+        Raises:
+            ClientError from the client.
+        """
+        return self.client.create_test_run({'test_id': self.id})
+
+
+class TestRun(Resource, ListMixin, GetMixin, CreateMixin, DeleteMixin):
+    resource_name = 'test-runs'
+    resource_response_object_name = 'test_run'
+    resource_response_objects_name = 'test_runs'
+
+    STATUS_CREATED = -1
+    STATUS_QUEUED = 0
+    STATUS_INITIALIZING = 1
+    STATUS_RUNNING = 2
+    STATUS_FINISHED = 3
+    STATUS_TIMED_OUT = 4
+    STATUS_ABORTING_USER = 5
+    STATUS_ABORTED_USER = 6
+    STATUS_ABORTING_SYSTEM = 7
+    STATUS_ABORTED_SYSTEM = 8
+    STATUS_ABORTED_SCRIPT_ERROR = 9
+    STATUS_ABORTING_THRESHOLD = 10
+    STATUS_ABORTED_THRESHOLD = 11
+    STATUS_FAILED_THRESHOLD = 12
+
+    fields = {
+        'id': IntegerField,
+        'queued': DateTimeField,
+        'started': DateTimeField,
+        'ended': DateTimeField,
+        'status': IntegerField,
+        'status_text': UnicodeField,
+    }
+
+    def list_test_run_result_ids(self, data):
+        return self.client.list_test_run_result_ids(self.id, data)
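As a sketch of how these POST-based listings are meant to be called: the `data` payload mirrors the `{'types': ...}` filter used by the unit tests added later in this series (the run id is hypothetical):

```python
import loadimpact3
from loadimpact3.resources import TestRunResultsIds

client = loadimpact3.ApiTokenClient(api_token='YOUR_API_TOKEN_GOES_HERE')

# Ask which result ids a (hypothetical) run 1001 exposes, filtered by type.
result_ids = client.list_test_run_result_ids(
    1001, data={'types': str(TestRunResultsIds.TYPE_COMMON)})
for entry in result_ids:
    print(entry.type, entry.offset, entry.ids)
```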
+
+    def is_done(self):
+        """Check whether test is done or not.
+
+        Returns:
+            True if test has completed, otherwise False.
+        Raises:
+            ResponseParseError: Unable to parse response (sync call) from API.
+        """
+        self.sync()
+        if self.status in [self.STATUS_FINISHED, self.STATUS_TIMED_OUT, self.STATUS_ABORTED_USER,
+                           self.STATUS_ABORTED_SYSTEM, self.STATUS_ABORTED_SCRIPT_ERROR,
+                           self.STATUS_ABORTED_THRESHOLD, self.STATUS_FAILED_THRESHOLD]:
+            return True
+        return False
+
+
+class TestRunResultsIds(Resource, ListWithParamsMixin):
+    resource_name = 'test-runs'
+    resource_response_object_name = 'test_run_result_ids'
+    resource_response_objects_name = 'test_run_result_ids'
+
+    fields = {
+        'type': IntegerField,
+        'offset': IntegerField,
+        'ids': DictField,
+    }
+
+    # TestRunResults type codes.
+    TYPE_COMMON = 1
+    TYPE_URL = 2
+    TYPE_LIVE_FEEDBACK = 3
+    TYPE_LOG = 4
+    TYPE_CUSTOM_METRIC = 5
+    TYPE_PAGE = 6
+    TYPE_DTP = 7
+    TYPE_SYSTEM = 8
+    TYPE_SERVER_METRIC = 9
+    TYPE_INTEGRATION = 10
+
+    TYPE_CODE_TO_TEXT_MAP = {
+        TYPE_COMMON: 'common',
+        TYPE_URL: 'url',
+        TYPE_LIVE_FEEDBACK: 'live_feedback',
+        TYPE_LOG: 'log',
+        TYPE_CUSTOM_METRIC: 'custom_metric',
+        TYPE_PAGE: 'page',
+        TYPE_DTP: 'dtp',
+        TYPE_SYSTEM: 'system',
+        TYPE_SERVER_METRIC: 'server_metric',
+        TYPE_INTEGRATION: 'integration'
+    }
+
+    @classmethod
+    def _path(cls, resource_id):
+        return '{0}/{1}/result_ids'.format(cls.resource_name, resource_id)
+
+    @classmethod
+    def results_type_code_to_text(cls, results_type_code):
+        return cls.TYPE_CODE_TO_TEXT_MAP.get(results_type_code, 'unknown')
+
+
+class TestRunResults(Resource, ListWithParamsMixin):
+    resource_name = 'test-runs'
+    resource_response_object_name = 'test_run_results'
+    resource_response_objects_name = 'test_run_results'
+
+    fields = {
+        'id': UnicodeField,
+        'sid': UnicodeField,
+        'type': IntegerField,
+        'offset': IntegerField,
+        'data': DictField,
+    }
+
+    @classmethod
+    def _path(cls, resource_id):
+        return '{0}/{1}/results'.format(cls.resource_name, resource_id)
+
+
+class TestRunMetric(object):
+    ACCUMULATED_LOAD_TIME = '__li_accumulated_load_time'
+    ACTIVE_USERS = '__li_clients_active'
+    ACTIVE_CONNECTIONS = '__li_connections_active'
+    BANDWIDTH = '__li_bandwidth'
+    CONTENT_TYPES = '__li_content_type'
+    CONTENT_TYPES_LOAD_TIME = '__li_content_type_load_time'
+    FAILURE_RATE = '__li_failure_rate'
+    LIVE_FEEDBACK = '__li_live_feedback'
+    LOAD_GENERATOR_CPU_UTILIZATION = '__li_loadgen_cpu_utilization'
+    LOAD_GENERATOR_MEMORY_UTILIZATION = '__li_loadgen_memory_utilization'
+    LOG = '__li_log'
+    PROGRESS_PERCENT = '__li_progress_percent_total'
+    REQUESTS_PER_SECOND = '__li_requests_per_second'
+    TOTAL_BYTES_RECEIVED = '__li_total_rx_bytes'
+    TOTAL_REQUESTS = '__li_total_requests'
+    USER_LOAD_TIME = '__li_user_load_time'
+    USER_SCENARIO_REPETITION_SUCCESS_RATE = '__li_reps_succeeded_percent'
+    USER_SCENARIO_REPETITION_FAILURE_RATE = '__li_reps_failed_percent'
+
+    @classmethod
+    def result_id_from_name(cls, name, load_zone_id=None, user_scenario_id=None):
+        if not load_zone_id:
+            return name
+        if not user_scenario_id:
+            return '%s:%s' % (name, str(load_zone_id))
+        return '%s:%s:%s' % (name, str(load_zone_id), str(user_scenario_id))
+
+    @classmethod
+    def result_id_from_custom_metric_name(cls, custom_name, load_zone_id,
+                                          user_scenario_id):
+        if sys.version_info >= (3, 0):
+            custom_name = custom_name.encode('utf-8')
+        else:
+            if isinstance(custom_name, unicode):
+                custom_name = custom_name.encode('utf-8')
+        return '__custom_%s:%s:%s' % (hashlib.md5(custom_name).hexdigest(),
+                                      str(load_zone_id), str(user_scenario_id))
+
+    @classmethod
+    def result_id_for_page(cls, page_name, load_zone_id, user_scenario_id):
+        if sys.version_info >= (3, 0):
+            page_name = page_name.encode('utf-8')
+        else:
+            if isinstance(page_name, unicode):
+                page_name = page_name.encode('utf-8')
+        return '__li_page_%s:%s:%s' % (hashlib.md5(page_name).hexdigest(),
+                                       str(load_zone_id), str(user_scenario_id))
+
+    @classmethod
+    def result_id_for_url(cls, url, load_zone_id, user_scenario_id,
+                          method='GET', status_code=200):
+        if sys.version_info >= (3, 0):
+            url = url.encode('utf-8')
+        else:
+            if isinstance(url, unicode):
+                url = url.encode('utf-8')
+        return '__li_url_%s:%s:%s:%s:%s' % (hashlib.md5(url).hexdigest(),
+                                            str(load_zone_id),
+                                            str(user_scenario_id),
+                                            str(status_code), method)
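The result id helpers above are plain string composition; a few examples of what they produce, following the formats in the code:

```python
from loadimpact3.resources import TestRunMetric, TestRunResultsIds

# A metric with no load zone, the same metric scoped to load zone 1, and
# scoped to load zone 1 / user scenario 2.
print(TestRunMetric.result_id_from_name(TestRunMetric.ACTIVE_USERS))
# __li_clients_active
print(TestRunMetric.result_id_from_name(TestRunMetric.ACTIVE_USERS, 1))
# __li_clients_active:1
print(TestRunMetric.result_id_from_name(TestRunMetric.ACTIVE_USERS, 1, 2))
# __li_clients_active:1:2

# Type codes translate to readable names ('unknown' for unmapped codes).
print(TestRunResultsIds.results_type_code_to_text(TestRunResultsIds.TYPE_URL))
# url
```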
+
+
 class UserScenario(Resource, ListMixin, GetMixin, CreateMixin, DeleteMixin,
                    UpdateMixin):
     resource_name = 'user-scenarios'

From 7603df77e19ad65855243e1e4df5bbdf522b7758 Mon Sep 17 00:00:00 2001
From: "Diego M. Rodriguez"
Date: Wed, 26 Apr 2017 11:18:41 +0200
Subject: [PATCH 2/3] [feat] add streaming of metrics from a TestRun

Add `TestRun.result_stream()` to allow streaming of metrics during a
TestRun, inspired by the existing apiv2 mechanism.
---
 loadimpact3/fields.py    |  14 +++++
 loadimpact3/resources.py | 104 +++++++++++++++++++++++++++++++++++-
 test/test_resources.py   | 110 ++++++++++++++++++++++++++++++++++++---
 3 files changed, 218 insertions(+), 10 deletions(-)

diff --git a/loadimpact3/fields.py b/loadimpact3/fields.py
index 760636a..d699413 100644
--- a/loadimpact3/fields.py
+++ b/loadimpact3/fields.py
@@ -91,6 +91,20 @@ def default(cls):
         return datetime.utcnow().replace(tzinfo=UTC())


+class TimeStampField(DateTimeField):
+    field_type = datetime
+
+    @classmethod
+    def coerce(cls, value):
+        if not isinstance(value, cls.field_type):
+            try:
+                return datetime.fromtimestamp(value/10**6).replace(tzinfo=UTC())
+            except ValueError as e:
+                raise CoercionError(e)
+        return value
+
+
 class DictField(Field):
     field_type = dict
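The division by 10**6 above reflects that the API reports timestamps as microseconds since the Unix epoch. Roughly, the coercion behaves like this sketch (the sample value is made up):

```python
from datetime import datetime

# A hypothetical API value: microseconds since the Unix epoch.
value = 1493128800 * 10**6

# TimeStampField.coerce divides by 10**6 to get seconds before
# building a datetime from the timestamp.
print(datetime.utcfromtimestamp(value / 10**6))  # 2017-04-25 14:00:00
```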
diff --git a/loadimpact3/resources.py b/loadimpact3/resources.py
index 5392b8f..8733532 100644
--- a/loadimpact3/resources.py
+++ b/loadimpact3/resources.py
@@ -19,6 +19,7 @@
 from __future__ import absolute_import

 import hashlib
+from time import sleep

 import sys
@@ -30,7 +31,8 @@
 from .exceptions import CoercionError, ResponseParseError
 from .fields import (
     DataStoreListField, DateTimeField, DictField, Field, IntegerField,
-    UnicodeField, BooleanField)
+    UnicodeField, BooleanField, ListField, TimeStampField, FloatField)
+from .utils import is_dict_different

 from pprint import pformat
@@ -354,11 +356,72 @@ def start_test_run(self):
         return self.client.create_test_run({'test_id': self.id})


+class _TestRunResultStream(object):
+    def __init__(self, test_run, result_ids):
+        self.test_run = test_run
+        self.result_ids = result_ids
+        self._last = dict([(rid, {'offset': -1, 'data': {}}) for rid in result_ids])
+        self._last_two = []
+        self._series = {}
+
+    @property
+    def series(self):
+        return self._series
+
+    def __call__(self, poll_rate=3, post_polls=5):
+        def is_done(self):
+            if not self.test_run.is_done() or not self.is_done():
+                return False
+            return True
+
+        done = False
+        while not done or 0 < post_polls:
+            done = is_done(self)
+            if done:
+                post_polls -= 1
+            q = ['%s|%d' % (rid, self._last.get(rid, {}).get('offset', -1))
+                 for rid in self.result_ids]
+
+            results = TestRunResults.list(self.test_run.client, self.test_run.id, {'ids': ','.join(q)})
+            change = {}
+            for result in results:
+                try:
+                    if result.offset > self._last[result.sid]['offset']:
+                        change[result.sid] = result.data[-1]
+                        self._last[result.sid]['data'] = result.data[-1]
+                        self._last[result.sid]['offset'] = result.offset
+                except (IndexError, KeyError):
+                    continue
+                self._series.setdefault(result.id, []).extend(result.data)
+
+            if 2 == len(self._last_two):
+                self._last_two.pop(0)
+            self._last_two.append(self._last)
+            if change:
+                yield {k: TestRunMetricPoint(None, **v) for k,v in change.iteritems()}
+            sleep(poll_rate)
+
+    def __iter__(self):
+        return self.__call__()
+
+    def is_done(self):
+        if 2 != len(self._last_two):
+            return False
+        if is_dict_different(self._last_two[0], self._last_two[1]):
+            return False
+        return True
+
+    def _get(self, path, params):
+        return self.test_run.client.get(path, params=params)
+
+
 class TestRun(Resource, ListMixin, GetMixin, CreateMixin, DeleteMixin):
     resource_name = 'test-runs'
     resource_response_object_name = 'test_run'
     resource_response_objects_name = 'test_runs'

+    stream_class = _TestRunResultStream
+
     STATUS_CREATED = -1
     STATUS_QUEUED = 0
     STATUS_INITIALIZING = 1
@@ -386,6 +449,24 @@ class TestRun(Resource, ListMixin, GetMixin, CreateMixin, DeleteMixin):
     def list_test_run_result_ids(self, data):
         return self.client.list_test_run_result_ids(self.id, data)

+    def result_stream(self, result_ids=None):
+        """Get access to result stream.
+
+        Args:
+            result_ids: List of result IDs to include in this stream.
+        Returns:
+            Test result stream object.
+        """
+        if not result_ids:
+            load_zone_id = LoadZone.name_to_id(LoadZone.AGGREGATE_WORLD)
+            result_ids = [
+                TestRunMetric.result_id_from_name(TestRunMetric.ACTIVE_USERS, load_zone_id),
+                TestRunMetric.result_id_from_name(TestRunMetric.REQUESTS_PER_SECOND, load_zone_id),
+                TestRunMetric.result_id_from_name(TestRunMetric.BANDWIDTH, load_zone_id),
+                TestRunMetric.result_id_from_name(TestRunMetric.USER_LOAD_TIME, load_zone_id),
+                TestRunMetric.result_id_from_name(TestRunMetric.FAILURE_RATE, load_zone_id)
+            ]
+        return self.__class__.stream_class(self, result_ids)
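In practice the stream is consumed by calling the object returned by `result_stream()`, mirroring the usage in the tests later in this series; a sketch with a hypothetical run id:

```python
import loadimpact3

client = loadimpact3.ApiTokenClient(api_token='YOUR_API_TOKEN_GOES_HERE')

# Poll the default metrics of a (hypothetical) run every 3 seconds and
# print each yielded batch of changed points.
test_run = client.get_test_run(1001)
stream = test_run.result_stream()
for points in stream(poll_rate=3):
    for result_id, point in points.items():
        print(result_id, point.timestamp, point.value)
```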
+
     def is_done(self):
         """Check whether test is done or not.

         Returns:
@@ -456,7 +537,7 @@ class TestRunResults(Resource, ListWithParamsMixin):
         'sid': UnicodeField,
         'type': IntegerField,
         'offset': IntegerField,
-        'data': DictField,
+        'data': ListField,
     }

     @classmethod
@@ -527,6 +608,25 @@ def result_id_for_url(cls, url, load_zone_id, user_scenario_id,
                           str(status_code), method)


+class TestRunMetricPoint(Resource):
+    fields = {
+        'timestamp': TimeStampField,
+        'data': DictField
+    }
+
+    @classmethod
+    def _path(cls, resource_id=None, action=None):
+        raise NotImplementedError
+
+    @property
+    def aggregate_function(self):
+        return self.data.keys()[0]
+
+    @property
+    def value(self):
+        return self.data.values()[0]
+
+
 class UserScenario(Resource, ListMixin, GetMixin, CreateMixin, DeleteMixin,
                    UpdateMixin):
     resource_name = 'user-scenarios'

diff --git a/test/test_resources.py b/test/test_resources.py
index ff82722..59b35ef 100644
--- a/test/test_resources.py
+++ b/test/test_resources.py
@@ -27,7 +27,7 @@
 from loadimpact3.clients import Client
 from loadimpact3.fields import IntegerField
 from loadimpact3.resources import (
-    DataStore, LoadZone, Resource, UserScenario, UserScenarioValidation)
+    DataStore, LoadZone, Resource, TestRun, UserScenario, UserScenarioValidation, TestRunMetric)


 class MockRequestsResponse(object):
@@ -52,20 +52,26 @@ def __init__(self, response_status_code=200, **kwargs):
         self.last_request_args = None
         self.last_request_kwargs = None

+    def _get_nkwargs(self):
+        nkwargs = {}
+        response_body = self.kwargs.get('response_body')
+        if response_body:
+            if isinstance(response_body, dict):
+                nkwargs = response_body
+            elif isinstance(response_body, str):
+                nkwargs = json.loads(response_body)
+
+        return nkwargs
+
     def _requests_request(self, method, *args, **kwargs):
         self.last_request_method = method
         self.last_request_args = args
         self.last_request_kwargs = kwargs
         if isinstance(kwargs.get('data'), str):
             self.last_request_kwargs['data'] = json.loads(kwargs['data'])
-        nkwargs = {}
-        if self.kwargs.get('response_body'):
-            if isinstance(self.kwargs['response_body'], dict):
-                nkwargs = self.kwargs['response_body']
-            elif isinstance(self.kwargs['response_body'], str):
-                nkwargs = json.loads(self.kwargs['response_body'])
+
         return MockRequestsResponse(status_code=self.response_status_code,
-                                    **nkwargs)
+                                    **self._get_nkwargs())


 class MockResource(Resource):
@@ -247,3 +253,91 @@ def test_list(self):
         self.assertEqual(client.last_request_method, 'get')
         self.assertEqual(projects[0].id, 1)
         self.assertEqual(projects[0].name, 'My project')
+
+
+class TestResourcesTest(unittest.TestCase):
+
+    def test_get(self):
+        client = MockClient(response_body={'test': {
+            'id': 1,
+            'name': 'test1',
+            'config': {'config_key_1': 'config_val_1'},
+            'last_test_run_id': 1001,
+        }})
+        test_ = client.get_test(1)
+        self.assertEqual(client.last_request_method, 'get')
+        self.assertEqual(test_.id, 1)
+        self.assertEqual(test_.name, 'test1')
+        self.assertEqual(test_.last_test_run_id, 1001)
+
+    def test_list(self):
+        client = MockClient(response_body={'tests': [{
+            'id': 1,
+            'name': 'test1',
+            'config': {'config_key_1': 'config_val_1'},
+            'last_test_run_id': 1001,
+        }]})
+        tests = client.list_tests([1])
+        self.assertEqual(client.last_request_method, 'get')
+        self.assertEqual(tests[0].id, 1)
+        self.assertEqual(tests[0].name, 'test1')
+        self.assertEqual(tests[0].last_test_run_id, 1001)
+
+
+class TestResourcesTestRun(unittest.TestCase):
+
+    def test_get(self):
+        client = MockClient(response_body={'test_run': {
+            'id': 1,
+            'status': TestRun.STATUS_CREATED,
+            'status_text': 'created',
+        }})
+        test_run = client.get_test_run(1)
+        self.assertEqual(client.last_request_method, 'get')
+        self.assertEqual(test_run.id, 1)
+        self.assertEqual(test_run.status, TestRun.STATUS_CREATED)
+        self.assertEqual(test_run.status_text, 'created')
+
+    def test_create(self):
+        client = MockClient(response_body={'test_run': {
+            'id': 1,
+            'status': TestRun.STATUS_CREATED,
+            'status_text': 'created',
+        }})
+        test_run = client.create_test_run(1)
+        self.assertEqual(client.last_request_method, 'post')
+        self.assertEqual(test_run.id, 1)
+        self.assertEqual(test_run.status, TestRun.STATUS_CREATED)
+        self.assertEqual(test_run.status_text, 'created')
+
+
+class TestStreaming(unittest.TestCase):
+    @staticmethod
+    def mocked_response_body(id_='__li_user_load_time:1', offset=1, data_len=1):
+        return {
+            "test_run_results": [
+                {
+                    "load_zone_id": 1,
+                    "offset": offset,
+                    "data": [{"timestamp": 1000 + i, "data": {"value": 2000 + i}}
+                             for i in range(offset, offset + data_len)],
+                    "id": id_.split(':')[0],
+                    "sid": id_
+                }]
+        }
+
+    def test_streaming_basic(self):
+        metric_id = TestRunMetric.result_id_from_name(TestRunMetric.ACTIVE_USERS, load_zone_id=1)
+        client = MockClient(response_body=True)
+        client._get_nkwargs = MagicMock(side_effect=[
+            self.mocked_response_body(id_=metric_id, offset=1),
+            self.mocked_response_body(id_=metric_id, offset=2),
+        ])
+        test_run = TestRun(client, id=1, status=TestRun.STATUS_FINISHED)
+        test_run.sync = MagicMock()
+
+        stream = test_run.result_stream([metric_id])
+        for i, data in enumerate(stream(poll_rate=1), 1):
+            self.assertEqual(client.last_request_method, 'post')
+            self.assertEqual(data[metric_id].value, 2000 + i)
+        self.assertEqual(i, 2)

From d09ff6bcdaca3e98fa20ca11327f6fc6752f89c6 Mon Sep 17 00:00:00 2001
From: "Diego M. Rodriguez"
Date: Tue, 2 May 2017 16:03:31 +0200
Subject: [PATCH 3/3] [chore] update travis, documentation, python3

Update the Travis configuration, re-enabling builds. Revise the
documentation and comments to reflect the latest changes, and make minor
tweaks for Python 3 compatibility.
---
 .travis.yml              |  2 +-
 README.md                | 11 ++++++-----
 loadimpact3/fields.py    |  3 +++
 loadimpact3/resources.py | 22 +++++++++++++++++++---
 test/test_resources.py   | 31 ++++++++++++++++++++++++++++++-
 5 files changed, 59 insertions(+), 10 deletions(-)

diff --git a/.travis.yml b/.travis.yml
index 96b81ca..118ad1e 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -6,7 +6,7 @@ python:
   - "3.5"
 install:
   - python setup.py -q install
-  - pip install -r requirements.txt --use-mirrors
+  - pip install -r requirements.txt
   - pip install coverage==3.7.1
   - pip install coveralls
 script:
diff --git a/README.md b/README.md
index b460d97..163769f 100644
--- a/README.md
+++ b/README.md
@@ -15,7 +15,7 @@ It has one dependency, the [requests](http://www.python-requests.org/) library.
 Install using `pip`:

 ```sh
-pip install loadimpact
+pip install loadimpact-v3
 ```

 [![PyPI](https://img.shields.io/pypi/v/loadimpact.svg)]() [![PyPI](https://img.shields.io/pypi/dm/loadimpact.svg)]()
@@ -27,8 +27,8 @@
 your [loadimpact.com account page](https://loadimpact.com/account/).
 You either enter your API token as an argument when creating the client:

 ```python
-import loadimpact
-client = loadimpact.ApiTokenClient(api_token='YOUR_API_TOKEN_GOES_HERE')
+import loadimpact3
+client = loadimpact3.ApiTokenClient(api_token='YOUR_API_TOKEN_GOES_HERE')
 ```

 or using environment variables:
@@ -37,8 +37,8 @@
 export LOADIMPACT_API_TOKEN=YOUR_API_TOKEN_GOES_HERE
 ```
 ```python
-import loadimpact
-client = loadimpact.ApiTokenClient()
+import loadimpact3
+client = loadimpact3.ApiTokenClient()
 ```

 ## Using an API client
@@ -78,6 +78,7 @@
 for result in validation_results:
     print("[{0}] {1}".format(result.timestamp, result.message))
 print("Validation completed with status: {0}".format(validation.status_text))
+```

 ### Uploading a data store (CSV file with parameterization data)
 For more information regarding parameterized data have a look at [this
diff --git a/loadimpact3/fields.py b/loadimpact3/fields.py
index d699413..042cb35 100644
--- a/loadimpact3/fields.py
+++ b/loadimpact3/fields.py
@@ -92,6 +92,9 @@ def default(cls):


 class TimeStampField(DateTimeField):
+    """
+    Field created from a JSON Unix epoch timestamp (in microseconds).
+    """
     field_type = datetime

     @classmethod
diff --git a/loadimpact3/resources.py b/loadimpact3/resources.py
index 8733532..850d5a3 100644
--- a/loadimpact3/resources.py
+++ b/loadimpact3/resources.py
@@ -369,6 +369,14 @@ def series(self):
         return self._series

     def __call__(self, poll_rate=3, post_polls=5):
+        """
+        Poll the API for results of a TestRun and yield the differences
+        as `TestRunMetricPoint`s until there are no new results.
+
+        :param poll_rate: seconds between API polls.
+        :param post_polls: number of polls with the same results to make
+            before marking the polling as completed.
+        """
         def is_done(self):
             if not self.test_run.is_done() or not self.is_done():
                 return False
@@ -379,6 +387,7 @@ def is_done(self):
             done = is_done(self)
             if done:
                 post_polls -= 1
+            # Build the query, requesting results only from each offset onward.
             q = ['%s|%d' % (rid, self._last.get(rid, {}).get('offset', -1))
                  for rid in self.result_ids]

@@ -387,6 +396,7 @@ def is_done(self):
             for result in results:
                 try:
                     if result.offset > self._last[result.sid]['offset']:
+                        # Store the changes for the metric and the offset.
                         change[result.sid] = result.data[-1]
                         self._last[result.sid]['data'] = result.data[-1]
                         self._last[result.sid]['offset'] = result.offset
@@ -398,7 +408,8 @@ def is_done(self):
                 self._last_two.pop(0)
             self._last_two.append(self._last)
             if change:
-                yield {k: TestRunMetricPoint(None, **v) for k,v in change.iteritems()}
+                # Coerce the changes to TestRunMetricPoint before returning.
+                yield {k: TestRunMetricPoint(None, **v) for k, v in change.items()}
             sleep(poll_rate)

     def __iter__(self):
@@ -609,6 +620,11 @@ def result_id_for_url(cls, url, load_zone_id, user_scenario_id,


 class TestRunMetricPoint(Resource):
+    """
+    Individual point of a Test Run Metric. This class is provided as a
+    convenience: the API does not expose get/list methods for it; instead,
+    points are included as `data` in the TestRunResults output.
+ """ fields = { 'timestamp': TimeStampField, 'data': DictField @@ -620,11 +636,11 @@ def _path(cls, resource_id=None, action=None): @property def aggregate_function(self): - return self.data.keys()[0] + return next(iter(self.data.keys())) @property def value(self): - return self.data.values()[0] + return next(iter(self.data.values())) class UserScenario(Resource, ListMixin, GetMixin, CreateMixin, DeleteMixin, diff --git a/test/test_resources.py b/test/test_resources.py index 59b35ef..d7099e0 100644 --- a/test/test_resources.py +++ b/test/test_resources.py @@ -311,9 +311,32 @@ def test_create(self): self.assertEqual(test_run.status_text, 'created') +class TestRunResultsIds(unittest.TestCase): + + def test_list(self): + client = MockClient(response_body={'test_run_result_ids': [{ + 'type': 1, + 'offset': 2, + 'ids': {'_li_foo': '', '_li_bar': ''}, + }]}) + test_run = client.list_test_run_result_ids(1, data={'types': '1'}) + self.assertEqual(client.last_request_method, 'post') + self.assertEqual(test_run[0].type, 1) + self.assertEqual(test_run[0].offset, 2) + self.assertEqual(len(test_run[0].ids), 2) + + class TestStreaming(unittest.TestCase): @staticmethod def mocked_response_body(id_='__li_user_load_time:1', offset=1, data_len=1): + """ + Generate the response body of a "test_run_results" API request. + + :param id_: metric name (including loadzone specifier, if needed). + :param offset: offset of the returned results. + :param data_len: number of data points to include. + :return: dict with the response body. + """ return { "test_run_results": [ { @@ -327,17 +350,23 @@ def mocked_response_body(id_='__li_user_load_time:1', offset=1, data_len=1): } def test_streaming_basic(self): + """ + Test the streaming of results for 1 metric with 2 data points. + """ metric_id = TestRunMetric.result_id_from_name(TestRunMetric.ACTIVE_USERS, load_zone_id=1) client = MockClient(response_body=True) client._get_nkwargs = MagicMock(side_effect=[ self.mocked_response_body(id_=metric_id, offset=1), self.mocked_response_body(id_=metric_id, offset=2), + # Use two responses with the same information, combined with stream(post_polls=1) + # so stream() is exhausted in a graceful way. + self.mocked_response_body(id_=metric_id, offset=2), ]) test_run = TestRun(client, id=1, status=TestRun.STATUS_FINISHED) test_run.sync = MagicMock() stream = test_run.result_stream([metric_id]) - for i, data in enumerate(stream(poll_rate=1), 1): + for i, data in enumerate(stream(poll_rate=1, post_polls=1), 1): self.assertEqual(client.last_request_method, 'post') self.assertEqual(data[metric_id].value, 2000 + i) self.assertEqual(i, 2)