diff --git a/shotgun_api3/shotgun.py b/shotgun_api3/shotgun.py index e3d1b8a5..c18807f0 100755 --- a/shotgun_api3/shotgun.py +++ b/shotgun_api3/shotgun.py @@ -186,6 +186,12 @@ def ensure_per_project_customization(self): 'label': 'project parameter' }, True) + def ensure_support_for_additional_filter_presets(self): + """Wrapper for ensure_support""" + return self._ensure_support({ + 'version': (7, 0, 0), + 'label': 'additional_filter_presets parameter' + }, True) def __str__(self): return "ServerCapabilities: host %s, version %s, is_dev %s"\ @@ -528,7 +534,8 @@ def info(self): return self._call_rpc("info", None, include_auth_params=False) def find_one(self, entity_type, filters, fields=None, order=None, - filter_operator=None, retired_only=False, include_archived_projects=True): + filter_operator=None, retired_only=False, include_archived_projects=True, + additional_filter_presets=None): """Calls the find() method and returns the first result, or None. :param entity_type: Required, entity type (string) to find. @@ -553,12 +560,24 @@ def find_one(self, entity_type, filters, fields=None, order=None, :param retired_only: Optional, flag to return only entities that have been retried. Defaults to False which returns only entities which have not been retired. - + + :param additional_filter_presets: Optional list of presets to + further filter the result set, list has the form: + [{"preset_name": , : , ... }] + + Note that these filters are ANDed together and ANDed with the 'filter' + argument. + + For details on supported presets and the format of this parameter, + please consult the API documentation: + https://github.com/shotgunsoftware/python-api/wiki/Reference%3A-Filter-Syntax + :returns: Dictionary of requested Shotgun fields and values. """ results = self.find(entity_type, filters, fields, order, - filter_operator, 1, retired_only, include_archived_projects=include_archived_projects) + filter_operator, 1, retired_only, include_archived_projects=include_archived_projects, + additional_filter_presets=additional_filter_presets) if results: return results[0] @@ -566,7 +585,7 @@ def find_one(self, entity_type, filters, fields=None, order=None, def find(self, entity_type, filters, fields=None, order=None, filter_operator=None, limit=0, retired_only=False, page=0, - include_archived_projects=True): + include_archived_projects=True, additional_filter_presets=None): """Find entities matching the given filters. :param entity_type: Required, entity type (string) to find. @@ -593,7 +612,18 @@ def find(self, entity_type, filters, fields=None, order=None, have not been retired. :param include_archived_projects: Optional, flag to include entities - whose projects have been archived + whose projects have been archived. + + :param additional_filter_presets: Optional list of presets to + further filter the result set, list has the form: + [{"preset_name": , : , ... }] + + Note that these filters are ANDed together and ANDed with the 'filter' + argument. + + For details on supported presets and the format of this parameter, + please consult the API documentation: + https://github.com/shotgunsoftware/python-api/wiki/Reference%3A-Filter-Syntax :returns: list of the dicts for each entity with the requested fields, and their id and type. @@ -617,13 +647,16 @@ def find(self, entity_type, filters, fields=None, order=None, # So we only need to check the server version if it is False self.server_caps.ensure_include_archived_projects() + if additional_filter_presets: + self.server_caps.ensure_support_for_additional_filter_presets() params = self._construct_read_parameters(entity_type, fields, filters, retired_only, order, - include_archived_projects) + include_archived_projects, + additional_filter_presets) if limit and limit <= self.config.records_per_page: params["paging"]["entities_per_page"] = limit @@ -667,7 +700,8 @@ def _construct_read_parameters(self, filters, retired_only, order, - include_archived_projects): + include_archived_projects, + additional_filter_presets): params = {} params["type"] = entity_type params["return_fields"] = fields or ["id"] @@ -677,6 +711,9 @@ def _construct_read_parameters(self, params["paging"] = { "entities_per_page": self.config.records_per_page, "current_page": 1 } + if additional_filter_presets: + params["additional_filter_presets"] = additional_filter_presets; + if include_archived_projects is False: # Defaults to True on the server, so only pass it if it's False params["include_archived_projects"] = False @@ -695,7 +732,6 @@ def _construct_read_parameters(self, params['sorts'] = sort_list return params - def _add_project_param(self, params, project_entity): if project_entity and self.server_caps.ensure_per_project_customization(): @@ -1889,8 +1925,7 @@ def text_search(self, text, entity_types, project_ids=None, limit=None): api_entity_types = {} for (entity_type, filter_list) in entity_types.iteritems(): - - + if isinstance(filter_list, (list, tuple)): resolved_filters = _translate_filters(filter_list, filter_operator=None) api_entity_types[entity_type] = resolved_filters diff --git a/tests/test_api.py b/tests/test_api.py index 748d3849..d59014ad 100644 --- a/tests/test_api.py +++ b/tests/test_api.py @@ -14,6 +14,7 @@ import unittest import urlparse import urllib2 +import warnings import shotgun_api3 from shotgun_api3.lib.httplib2 import Http, SSLHandshakeError @@ -196,11 +197,11 @@ def test_upload_download(self): # test invalid requests INVALID_S3_URL = "https://sg-media-usor-01.s3.amazonaws.com/ada3de3ee3873875e1dd44f2eb0882c75ae36a4a/cd31346421dbeef781e0e480f259a3d36652d7f2/IMG_0465.MOV?AWSAccessKeyId=AKIAIQGOBSVN3FSQ5QFA&Expires=1371789959&Signature=SLbzv7DuVlZ8XAoOSQQAiGpF3u8%3D" - self.assertRaises(shotgun_api3.ShotgunFileDownloadError, + self.assertRaises(shotgun_api3.ShotgunFileDownloadError, self.sg.download_attachment, {"url": INVALID_S3_URL}) INVALID_ATTACHMENT_ID = 99999999 - self.assertRaises(shotgun_api3.ShotgunFileDownloadError, + self.assertRaises(shotgun_api3.ShotgunFileDownloadError, self.sg.download_attachment, INVALID_ATTACHMENT_ID) self.assertRaises(TypeError, self.sg.download_attachment, @@ -318,7 +319,7 @@ def test_upload_thumbnail_for_task(self): self.version['id'], {'image': None}) expected_clear_thumbnail = {'id': self.version['id'], 'image': None, 'type': 'Version'} self.assertEqual(expected_clear_thumbnail, response_clear_thumbnail) - + def test_upload_thumbnail_with_upload_function(self): """Upload thumbnail via upload function test""" path = os.path.abspath(os.path.expanduser(os.path.join(os.path.dirname(__file__), "sg_logo.jpg"))) @@ -326,11 +327,11 @@ def test_upload_thumbnail_with_upload_function(self): # upload thumbnail thumb_id = self.sg.upload("Task", self.task['id'], path, 'image') self.assertTrue(isinstance(thumb_id, int)) - + #upload filmstrip thumbnail f_thumb_id = self.sg.upload("Task", self.task['id'], path, 'filmstrip_image') self.assertTrue(isinstance(f_thumb_id, int)) - + def test_linked_thumbnail_url(self): this_dir, _ = os.path.split(__file__) path = os.path.abspath(os.path.expanduser( @@ -475,7 +476,7 @@ def test_summary_values(self): """Test summarize return data""" # create three unique shots - shot_prefix = uuid.uuid4().hex + shot_prefix = uuid.uuid4().hex shots = [] @@ -492,7 +493,7 @@ def test_summary_values(self): "sg_cut_duration": 100, "project": self.project } - + shot_data_3 = { "code": "%s Shot 3" % shot_prefix, "sg_status_list": "fin", @@ -511,14 +512,14 @@ def test_summary_values(self): shots.append(self.sg.create("Shot", shot_data_2)) shots.append(self.sg.create("Shot", shot_data_3)) shots.append(self.sg.create("Shot", shot_data_4)) - + summaries = [{'field': 'id', 'type': 'count'}, {'field': 'sg_cut_duration', 'type': 'sum'}] - grouping = [{'direction': 'asc', - 'field': 'sg_status_list', + grouping = [{'direction': 'asc', + 'field': 'sg_status_list', 'type': 'exact'}] - filters = [['project', 'is', self.project], + filters = [['project', 'is', self.project], ['code', 'starts_with', shot_prefix]] result = self.sg.summarize('Shot', filters=filters, @@ -1389,7 +1390,7 @@ def test_include_archived_projects(self): result = self.sg.find_one('Shot', [['id','is',self.shot['id']]]) self.assertEquals(self.shot['id'], result['id']) - # archive project + # archive project self.sg.update('Project', self.project['id'], {'archived':True}) # setting defaults to True, so we should get result @@ -1409,7 +1410,7 @@ def setUp(self): def test_follow(self): '''Test follow method''' - + if not self.sg.server_caps.version or self.sg.server_caps.version < (5, 1, 22): return @@ -1418,22 +1419,22 @@ def test_follow(self): def test_unfollow(self): '''Test unfollow method''' - + if not self.sg.server_caps.version or self.sg.server_caps.version < (5, 1, 22): return - + result = self.sg.unfollow(self.human_user, self.shot) assert(result['unfollowed']) - + def test_followers(self): '''Test followers method''' - + if not self.sg.server_caps.version or self.sg.server_caps.version < (5, 1, 22): return - + result = self.sg.follow(self.human_user, self.shot) assert(result['followed']) - + result = self.sg.followers(self.shot) self.assertEqual( 1, len(result) ) self.assertEqual( self.human_user['id'], result[0]['id'] ) @@ -1531,7 +1532,7 @@ def test_sha2_error_with_strict(self, mock_request): # save the original state original_env_val = os.environ.pop("SHOTGUN_FORCE_CERTIFICATE_VALIDATION", None) os.environ["SHOTGUN_FORCE_CERTIFICATE_VALIDATION"] = "1" - + # ensure we're starting with the right values self.sg.config.no_ssl_validation = False shotgun_api3.shotgun.NO_SSL_VALIDATION = False @@ -1577,12 +1578,12 @@ def test_sanitized_auth_params(self, mock_open): class TestScriptUserSudoAuth(base.LiveTestBase): def setUp(self): super(TestScriptUserSudoAuth, self).setUp('ApiUser') - + def test_user_is_creator(self): """ Test 'sudo_as_login' option: on create, ensure appropriate user is set in created-by """ - + if not self.sg.server_caps.version or self.sg.server_caps.version < (5, 3, 12): return @@ -1591,7 +1592,7 @@ def test_user_is_creator(self): self.config.api_key, http_proxy=self.config.http_proxy, sudo_as_login=self.config.human_login ) - + data = { 'project': self.project, 'code':'JohnnyApple_Design01_FaceFinal', @@ -1610,7 +1611,7 @@ def test_user_is_creator(self): class TestHumanUserSudoAuth(base.TestBase): def setUp(self): super(TestHumanUserSudoAuth, self).setUp('HumanUser') - + def test_human_user_sudo_auth_fails(self): """ Test 'sudo_as_login' option for HumanUser. @@ -1642,7 +1643,7 @@ class TestHumanUserAuth(base.HumanUserAuthLiveTestBase): """ Testing the username/password authentication method """ - + def test_humanuser_find(self): """Called find, find_one for known entities as human user""" filters = [] @@ -1768,7 +1769,7 @@ def test_logged_in_user(self): login=self.config.human_login, password=self.config.human_password, http_proxy=self.config.http_proxy) - + initial = sg.find_one('Project', [['id','is',self.project['id']]], ['last_accessed_by_current_user']) sg.update_project_last_accessed(self.project) @@ -1788,7 +1789,7 @@ def test_pass_in_user(self): login=self.config.human_login, password=self.config.human_password, http_proxy=self.config.http_proxy ) - + initial = sg.find_one('Project', [['id','is',self.project['id']]], ['last_accessed_by_current_user']) time.sleep(1) @@ -1827,23 +1828,23 @@ class TestActivityStream(base.LiveTestBase): """ Unit tests for the activity_stream_read() method """ - + def setUp(self): super(TestActivityStream, self).setUp() - self._prefix = uuid.uuid4().hex + self._prefix = uuid.uuid4().hex - self._shot = self.sg.create("Shot", {"code": "%s activity stream test" % self._prefix, + self._shot = self.sg.create("Shot", {"code": "%s activity stream test" % self._prefix, "project": self.project}) - - self._note = self.sg.create("Note", {"content": "Test!", - "project": self.project, + + self._note = self.sg.create("Note", {"content": "Test!", + "project": self.project, "note_links": [self._shot]}) # check that if the created_by is a script user, we want to ensure # that event log generation is enabled for this user. If it has been - # disabled, these tests will fail because the activity stream is + # disabled, these tests will fail because the activity stream is # connected to events. In this case, print a warning to the user - d = self.sg.find_one("Shot", + d = self.sg.find_one("Shot", [["id", "is", self._shot["id"] ]], ["created_by.ApiUser.generate_event_log_entries"]) @@ -1853,34 +1854,34 @@ def setUp(self): "tests has got the generate event log entries setting set to " "off. This will cause the activity stream tests to fail. " "Please enable event log generation for the script user.") - + def tearDown(self): batch_data = [] batch_data.append({"request_type": "delete", "entity_type": self._note["type"], "entity_id": self._note["id"] - }) + }) batch_data.append({"request_type": "delete", "entity_type": self._shot["type"], "entity_id": self._shot["id"] }) self.sg.batch(batch_data) - + super(TestActivityStream, self).tearDown() def test_simple(self): """ Test activity stream """ - + if not self.sg.server_caps.version or self.sg.server_caps.version < (6, 2, 0): return - - result = self.sg.activity_stream_read(self._shot["type"], + + result = self.sg.activity_stream_read(self._shot["type"], self._shot["id"]) - - expected_keys = ["earliest_update_id", + + expected_keys = ["earliest_update_id", "entity_id", "entity_type", "latest_update_id", @@ -1890,20 +1891,20 @@ def test_simple(self): self.assertEqual(len(result["updates"]), 2) self.assertEqual(result["entity_type"], "Shot") self.assertEqual(result["entity_id"], self._shot["id"]) - + def test_limit(self): """ Test limited activity stream """ - + if not self.sg.server_caps.version or self.sg.server_caps.version < (6, 2, 0): return - - result = self.sg.activity_stream_read(self._shot["type"], - self._shot["id"], + + result = self.sg.activity_stream_read(self._shot["type"], + self._shot["id"], limit=1) - + self.assertEqual(len(result["updates"]), 1) self.assertEqual(result["updates"][0]["update_type"], "create") self.assertEqual(result["updates"][0]["meta"]["entity_type"], "Note") @@ -1913,24 +1914,24 @@ def test_extra_fields(self): """ Test additional fields for activity stream """ - + if not self.sg.server_caps.version or self.sg.server_caps.version < (6, 2, 0): return - - result = self.sg.activity_stream_read(self._shot["type"], - self._shot["id"], - entity_fields={"Shot": ["created_by.HumanUser.image"], + + result = self.sg.activity_stream_read(self._shot["type"], + self._shot["id"], + entity_fields={"Shot": ["created_by.HumanUser.image"], "Note": ["content"]}) - + self.assertEqual(len(result["updates"]), 2) - self.assertEqual(set(result["updates"][0]["primary_entity"].keys()), + self.assertEqual(set(result["updates"][0]["primary_entity"].keys()), set(["content", "id", "name", "status", "type"])) - - self.assertEqual(set(result["updates"][1]["primary_entity"].keys()), + + self.assertEqual(set(result["updates"][1]["primary_entity"].keys()), set(["created_by.HumanUser.image", "id", "name", @@ -1941,103 +1942,103 @@ class TestNoteThreadRead(base.LiveTestBase): """ Unit tests for the note_thread_read method """ - + def setUp(self): super(TestNoteThreadRead, self).setUp() - + # get path to our std attahcment this_dir, _ = os.path.split(__file__) self._thumbnail_path = os.path.abspath(os.path.join(this_dir, "sg_logo.jpg")) - - + + def _check_note(self, data, note_id, additional_fields): - + # check the expected fields expected_fields = set(["content", "created_at", "created_by", "id", "type"] + additional_fields) - + self.assertEqual(expected_fields, set(data.keys())) - + # check that the data matches the data we get from a find call - note_data = self.sg.find_one("Note", - [["id", "is", note_id]], + note_data = self.sg.find_one("Note", + [["id", "is", note_id]], list(expected_fields)) self.assertEqual(note_data, data) - + def _check_reply(self, data, reply_id, additional_fields): - + # check the expected fields expected_fields = set(["content", "created_at", "user", "id", "type"] + additional_fields) self.assertEqual(expected_fields, set(data.keys())) - + # check that the data matches the data we get from a find call - reply_data = self.sg.find_one("Reply", - [["id", "is", reply_id]], + reply_data = self.sg.find_one("Reply", + [["id", "is", reply_id]], list(expected_fields)) - + # the reply stream adds an image to the user fields in order # to include thumbnails for users, so remove this before we compare # against the shotgun find data. The image is tested elsewhere. del data["user"]["image"] - + self.assertEqual(reply_data, data) - + def _check_attachment(self, data, attachment_id, additional_fields): - - - - + + + + # check the expected fields expected_fields = set(["created_at", "created_by", "id", "type"] + additional_fields) self.assertEqual(expected_fields, set(data.keys())) - + # check that the data matches the data we get from a find call - attachment_data = self.sg.find_one("Attachment", - [["id", "is", attachment_id]], + attachment_data = self.sg.find_one("Attachment", + [["id", "is", attachment_id]], list(expected_fields)) - + self.assertEqual(attachment_data, data) - + def test_simple(self): """ Test note reply thread API call """ if not self.sg.server_caps.version or self.sg.server_caps.version < (6, 2, 0): return - + # create note note = self.sg.create( "Note", {"content": "Test!", "project": self.project}) - + # for this test, we check that the replies returned also # contain the thumbnail associated with the user doing the # reply. For this, make sure that there is a thumbnail # associated with the current user - - d = self.sg.find_one("Note", - [["id", "is", note["id"]]], + + d = self.sg.find_one("Note", + [["id", "is", note["id"]]], ["created_by", "created_by.ApiUser.image"]) - + current_thumbnail = d["created_by.ApiUser.image"] - + if current_thumbnail is None: # upload thumbnail - self.sg.upload_thumbnail("ApiUser", - d["created_by"]["id"], + self.sg.upload_thumbnail("ApiUser", + d["created_by"]["id"], self._thumbnail_path) - d = self.sg.find_one("Note", - [["id", "is", note["id"]]], + d = self.sg.find_one("Note", + [["id", "is", note["id"]]], ["created_by", "created_by.ApiUser.image"]) - + current_thumbnail = d["created_by.ApiUser.image"] - + # get thread result = self.sg.note_thread_read(note["id"]) self.assertEqual(len(result), 1) self._check_note(result[0], note["id"], additional_fields=[]) - + # now add a reply reply = self.sg.create( "Reply", {"content": "Reply Content", "entity": note}) - + # get thread result = self.sg.note_thread_read(note["id"]) self.assertEqual(len(result), 2) @@ -2050,198 +2051,334 @@ def test_simple(self): url_obj_b = urlparse.urlparse(reply_thumb) self.assertEqual("%s/%s" % (url_obj_a.netloc, url_obj_a.path), "%s/%s" % (url_obj_b.netloc, url_obj_b.path),) - + # and check ther rest of the data self._check_note(result[0], note["id"], additional_fields=[]) self._check_reply(result[1], reply["id"], additional_fields=[]) - + # now upload an attachment - attachment_id = self.sg.upload(note["type"], note["id"], self._thumbnail_path) - + attachment_id = self.sg.upload(note["type"], note["id"], self._thumbnail_path) + # get thread result = self.sg.note_thread_read(note["id"]) self.assertEqual(len(result), 3) self._check_note(result[0], note["id"], additional_fields=[]) self._check_reply(result[1], reply["id"], additional_fields=[]) self._check_attachment(result[2], attachment_id, additional_fields=[]) - + def test_complex(self): """ Test note reply thread API call with additional params """ - + if not self.sg.server_caps.version or self.sg.server_caps.version < (6, 2, 0): return - - additional_fields = { - "Note": ["created_by.HumanUser.image", - "addressings_to", - "playlist", + + additional_fields = { + "Note": ["created_by.HumanUser.image", + "addressings_to", + "playlist", "user" ], - "Reply": ["content"], + "Reply": ["content"], "Attachment": ["this_file"] } - + # create note - note = self.sg.create( "Note", {"content": "Test!", + note = self.sg.create( "Note", {"content": "Test!", "project": self.project, "addressings_to": [self.human_user]}) - + # get thread result = self.sg.note_thread_read(note["id"], additional_fields) - + self.assertEqual(len(result), 1) self._check_note(result[0], note["id"], additional_fields["Note"]) - + # now add a reply reply = self.sg.create( "Reply", {"content": "Reply Content", "entity": note}) - + # get thread - result = self.sg.note_thread_read(note["id"], additional_fields) + result = self.sg.note_thread_read(note["id"], additional_fields) self.assertEqual(len(result), 2) self._check_note(result[0], note["id"], additional_fields["Note"]) self._check_reply(result[1], reply["id"], additional_fields["Reply"]) - + # now upload an attachment attachment_id = self.sg.upload(note["type"], note["id"], self._thumbnail_path) # get thread - result = self.sg.note_thread_read(note["id"], additional_fields) + result = self.sg.note_thread_read(note["id"], additional_fields) self.assertEqual(len(result), 3) self._check_note(result[0], note["id"], additional_fields["Note"]) self._check_reply(result[1], reply["id"], additional_fields["Reply"]) - + self._check_attachment(result[2], attachment_id, additional_fields["Attachment"]) - + class TestTextSearch(base.LiveTestBase): """ Unit tests for the text_search() method """ - + def setUp(self): super(TestTextSearch, self).setUp() - + # create 5 shots and 5 assets to search for - self._prefix = uuid.uuid4().hex - + self._prefix = uuid.uuid4().hex + batch_data = [] for i in range(5): data = { "code":"%s Text Search %s" % (self._prefix, i), "project": self.project } batch_data.append( {"request_type": "create", "entity_type": "Shot", - "data": data} ) + "data": data} ) batch_data.append( {"request_type": "create", "entity_type": "Asset", - "data": data} ) + "data": data} ) data = self.sg.batch(batch_data) - + self._shot_ids = [x["id"] for x in data if x["type"] == "Shot"] self._asset_ids = [x["id"] for x in data if x["type"] == "Asset"] - + def tearDown(self): - + # clean up batch_data = [] for shot_id in self._shot_ids: batch_data.append({"request_type": "delete", "entity_type": "Shot", "entity_id": shot_id - }) + }) for asset_id in self._asset_ids: batch_data.append({"request_type": "delete", "entity_type": "Asset", "entity_id": asset_id }) - self.sg.batch(batch_data) - - super(TestTextSearch, self).tearDown() - + self.sg.batch(batch_data) + + super(TestTextSearch, self).tearDown() + def test_simple(self): """ Test basic global search """ if not self.sg.server_caps.version or self.sg.server_caps.version < (6, 2, 0): return - + result = self.sg.text_search("%s Text Search" % self._prefix, {"Shot" : [] } ) - + self.assertEqual(set(["matches", "terms"]), set(result.keys())) self.assertEqual(result["terms"], [self._prefix, "text", "search"]) matches = result["matches"] self.assertEqual(len(matches), 5) - + for match in matches: self.assertTrue(match["id"] in self._shot_ids) self.assertEqual(match["type"], "Shot") self.assertEqual(match["project_id"], self.project["id"]) self.assertEqual(match["image"], None) - + def test_limit(self): """ Test limited global search """ if not self.sg.server_caps.version or self.sg.server_caps.version < (6, 2, 0): return - - result = self.sg.text_search("%s Text Search" % self._prefix, {"Shot" : [] }, limit=3 ) + + result = self.sg.text_search("%s Text Search" % self._prefix, {"Shot" : [] }, limit=3 ) matches = result["matches"] self.assertEqual(len(matches), 3) - + def test_entity_filter(self): """ Test basic multi-type global search """ if not self.sg.server_caps.version or self.sg.server_caps.version < (6, 2, 0): return - - result = self.sg.text_search("%s Text Search" % self._prefix, + + result = self.sg.text_search("%s Text Search" % self._prefix, {"Shot": [], "Asset": [] } ) - + matches = result["matches"] - + self.assertEqual(set(["matches", "terms"]), set(result.keys())) self.assertEqual(len(matches), 10) - - + + def test_complex_entity_filter(self): """ Test complex multi-type global search """ if not self.sg.server_caps.version or self.sg.server_caps.version < (6, 2, 0): return - - result = self.sg.text_search("%s Text Search" % self._prefix, - {"Shot": [["code", "ends_with", "3"]], + + result = self.sg.text_search("%s Text Search" % self._prefix, + {"Shot": [["code", "ends_with", "3"]], "Asset": [ - {"filter_operator": "any", + {"filter_operator": "any", "filters": [["code", "ends_with", "4"]] } ] }) - + matches = result["matches"] - + self.assertEqual(set(["matches", "terms"]), set(result.keys())) self.assertEqual(len(matches), 2) - + self.assertEqual(matches[0]["type"], "Shot") self.assertEqual(matches[0]["name"], "%s Text Search 3" % self._prefix) self.assertEqual(matches[1]["type"], "Asset") self.assertEqual(matches[1]["name"], "%s Text Search 4" % self._prefix) - - +class TestReadAdditionalFilterPresets(base.LiveTestBase): + """ + Unit tests for the additional_filter_presets read parameter + """ + + def test_simple_case(self): + if self.sg_version < (7, 0, 0): + warnings.warn("Test bypassed because SG server used does not support this feature.", FutureWarning) + return + + filters = [ + ["project", "is", self.project], + ["id", "is", self.version["id"]] + ] + + fields = ["id"] + + additional_filters = [{"preset_name": "LATEST", "latest_by": "ENTITIES_CREATED_AT"}] + + versions = self.sg.find("Version", filters, fields=fields, additional_filter_presets=additional_filters) + version = versions[0] + self.assertEqual("Version", version["type"]) + self.assertEqual(self.version["id"], version["id"]) + + + def test_find_one(self): + if self.sg_version < (7, 0, 0): + warnings.warn("Test bypassed because SG server used does not support this feature.", FutureWarning) + return + + filters = [ + ["project", "is", self.project], + ["id", "is", self.version["id"]] + ] + + fields = ["id"] + + additional_filters = [{"preset_name": "LATEST", "latest_by": "ENTITIES_CREATED_AT"}] + + version = self.sg.find_one("Version", filters, fields=fields, additional_filter_presets=additional_filters) + self.assertEqual("Version", version["type"]) + self.assertEqual(self.version["id"], version["id"]) + + def test_filter_with_no_name(self): + if self.sg_version < (7, 0, 0): + warnings.warn("Test bypassed because SG server used does not support this feature.", FutureWarning) + return + + filters = [ + ["project", "is", self.project], + ["id", "is", self.version["id"]] + ] + + fields = ["id"] + + additional_filters = [{}] + + self.assertRaises(shotgun_api3.Fault, + self.sg.find, + "Version", filters, fields=fields, additional_filter_presets=additional_filters) + + def test_invalid_filter(self): + if self.sg_version < (7, 0, 0): + warnings.warn("Test bypassed because SG server used does not support this feature.", FutureWarning) + return + + + filters = [ + ["project", "is", self.project], + ["id", "is", self.version["id"]] + ] + + fields = ["id"] + + additional_filters = [{"preset_name" : "BAD_FILTER"}] + + self.assertRaises(shotgun_api3.Fault, + self.sg.find, + "Version", filters, fields=fields, additional_filter_presets=additional_filters) + + def test_filter_not_iterable(self): + if self.sg_version < (7, 0, 0): + warnings.warn("Test bypassed because SG server used does not support this feature.", FutureWarning) + return + + filters = [ + ["project", "is", self.project], + ["id", "is", self.version["id"]] + ] + + fields = ["id"] + + additional_filters = 3 + + self.assertRaises(shotgun_api3.Fault, + self.sg.find, + "Version", filters, fields=fields, additional_filter_presets=additional_filters) + + + def test_filter_not_list_of_iterable(self): + if self.sg_version < (7, 0, 0): + warnings.warn("Test bypassed because SG server used does not support this feature.", FutureWarning) + return + + filters = [ + ["project", "is", self.project], + ["id", "is", self.version["id"]] + ] + + fields = ["id"] + + additional_filters = [3] -def _has_unicode(data): + self.assertRaises(shotgun_api3.Fault, + self.sg.find, + "Version", filters, fields=fields, additional_filter_presets=additional_filters) + + + def test_multiple_latest_filters(self): + if self.sg_version < (7, 0, 0): + warnings.warn("Test bypassed because SG server used does not support this feature.", FutureWarning) + return + + filters = [ + ["project", "is", self.project], + ["id", "is", self.version["id"]] + ] + + fields = ["id"] + + additional_filters = ({"preset_name": "LATEST", "latest_by": "ENTITY_CREATED_AT"}, + {"preset_name": "LATEST", "latest_by": "PIPELINE_STEP_NUMBER_AND_ENTITIES_CREATED_AT"}) + + self.assertRaises(shotgun_api3.Fault, + self.sg.find, + "Version", filters, fields=fields, additional_filter_presets=additional_filters) + + +def _has_unicode(data): for k, v in data.items(): - if (isinstance(k, unicode)): + if isinstance(k, unicode): return True - if (isinstance(v, unicode)): + if isinstance(v, unicode): return True return False + def _get_path(url): """Returns path component of a url without the sheme, host, query, anchor, or any other additional elements.