diff --git a/azure-pipelines-templates/type_checking.yml b/azure-pipelines-templates/type_checking.yml new file mode 100644 index 00000000..dbabd657 --- /dev/null +++ b/azure-pipelines-templates/type_checking.yml @@ -0,0 +1,48 @@ +# Copyright (c) 2025, Shotgun Software Inc. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are met: +# +# - Redistributions of source code must retain the above copyright notice, this +# list of conditions and the following disclaimer. +# +# - Redistributions in binary form must reproduce the above copyright notice, +# this list of conditions and the following disclaimer in the documentation +# and/or other materials provided with the distribution. +# +# - Neither the name of the Shotgun Software Inc nor the names of its +# contributors may be used to endorse or promote products derived from this +# software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +jobs: +- job: type_checking + displayName: Type Checking (beta) + pool: + vmImage: 'ubuntu-latest' + + steps: + - task: UsePythonVersion@0 + inputs: + versionSpec: 3.9 + addToPath: True + architecture: 'x64' + + - script: | + pip install --upgrade pip setuptools wheel + pip install --upgrade mypy + displayName: Install dependencies + + # Placeholder to future static type checking. For now we just run mypy and skip all known errors. + - bash: mypy shotgun_api3/shotgun.py --follow-imports skip --pretty --no-strict-optional --disable-error-code arg-type --disable-error-code assignment --disable-error-code return --disable-error-code return-value --disable-error-code attr-defined + displayName: Run type checking diff --git a/azure-pipelines.yml b/azure-pipelines.yml index 52e6cfa9..0e465bf2 100644 --- a/azure-pipelines.yml +++ b/azure-pipelines.yml @@ -52,6 +52,7 @@ pr: # Jobs run in parallel. jobs: - template: azure-pipelines-templates/code_style_validation.yml +- template: azure-pipelines-templates/type_checking.yml # These are jobs templates, they allow to reduce the redundancy between # variations of the same build. We pass in the image name diff --git a/shotgun_api3/shotgun.py b/shotgun_api3/shotgun.py index 2a1108aa..39167971 100644 --- a/shotgun_api3/shotgun.py +++ b/shotgun_api3/shotgun.py @@ -29,6 +29,8 @@ OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. """ +from __future__ import annotations # Requried for 3.7 + import base64 import copy import datetime @@ -53,10 +55,11 @@ from typing import ( Any, BinaryIO, + Dict, Iterable, - Literal, + List, Optional, - TypedDict, + Tuple, TypeVar, Union, ) @@ -100,21 +103,25 @@ T = TypeVar("T") +if sys.version_info < (3, 9): + OrderItem = Dict + GroupingItem = Dict + BaseEntity = Dict +else: + from typing import TypedDict -class OrderItem(TypedDict): - field_name: str - direction: str - - -class GroupingItem(TypedDict): - field: str - type: str - direction: str + class OrderItem(TypedDict): + field_name: str + direction: str + class GroupingItem(TypedDict): + field: str + type: str + direction: str -class BaseEntity(TypedDict, total=False): - id: int - type: str + class BaseEntity(TypedDict, total=False): + id: int + type: str # ---------------------------------------------------------------------------- @@ -202,7 +209,7 @@ class ServerCapabilities(object): the future. Therefore, usage of this class is discouraged. """ - def __init__(self, host: str, meta: dict[str, Any]) -> None: + def __init__(self, host: str, meta: Dict[str, Any]) -> None: """ ServerCapabilities.__init__ @@ -249,7 +256,7 @@ def _ensure_python_version_supported(self) -> None: if sys.version_info < (3, 7): raise ShotgunError("This module requires Python version 3.7 or higher.") - def _ensure_support(self, feature: dict[str, Any], raise_hell: bool = True) -> bool: + def _ensure_support(self, feature: Dict[str, Any], raise_hell: bool = True) -> bool: """ Checks the server version supports a given feature, raises an exception if it does not. @@ -421,7 +428,7 @@ def __init__(self, sg: "Shotgun"): self.auth_token: Optional[str] = None self.sudo_as_login: Optional[str] = None # Authentication parameters to be folded into final auth_params dict - self.extra_auth_params: Optional[dict[str, Any]] = None + self.extra_auth_params: Optional[Dict[str, Any]] = None # uuid as a string self.session_uuid: Optional[str] = None self.scheme: Optional[str] = None @@ -738,7 +745,7 @@ def __init__( self.config.user_password = None self.config.auth_token = None - def _split_url(self, base_url: str) -> tuple[Optional[str], Optional[str]]: + def _split_url(self, base_url: str) -> Tuple[Optional[str], Optional[str]]: """ Extract the hostname:port and username/password/token from base_url sent when connect to the API. @@ -770,7 +777,7 @@ def _split_url(self, base_url: str) -> tuple[Optional[str], Optional[str]]: # API Functions @property - def server_info(self) -> dict[str, Any]: + def server_info(self) -> Dict[str, Any]: """ Property containing server information. @@ -823,7 +830,7 @@ def close(self) -> None: self._close_connection() return - def info(self) -> dict[str, Any]: + def info(self) -> Dict[str, Any]: """ Get API-related metadata from the Shotgun server. @@ -857,13 +864,13 @@ def info(self) -> dict[str, Any]: def find_one( self, entity_type: str, - filters: Union[list, tuple, dict[str, Any]], - fields: Optional[list[str]] = None, - order: Optional[list[OrderItem]] = None, - filter_operator: Optional[Literal["all", "any"]] = None, + filters: Union[List, Tuple, Dict[str, Any]], + fields: Optional[List[str]] = None, + order: Optional[List[OrderItem]] = None, + filter_operator: Optional[str] = None, retired_only: bool = False, include_archived_projects: bool = True, - additional_filter_presets: Optional[list[dict[str, Any]]] = None, + additional_filter_presets: Optional[List[Dict[str, Any]]] = None, ) -> Optional[BaseEntity]: """ Shortcut for :meth:`~shotgun_api3.Shotgun.find` with ``limit=1`` so it returns a single @@ -937,16 +944,16 @@ def find_one( def find( self, entity_type: str, - filters: Union[list, tuple, dict[str, Any]], - fields: Optional[list[str]] = None, - order: Optional[list[OrderItem]] = None, - filter_operator: Optional[Literal["all", "any"]] = None, + filters: Union[List, Tuple, Dict[str, Any]], + fields: Optional[List[str]] = None, + order: Optional[List[OrderItem]] = None, + filter_operator: Optional[str] = None, limit: int = 0, retired_only: bool = False, page: int = 0, include_archived_projects: bool = True, - additional_filter_presets: Optional[list[dict[str, Any]]] = None, - ) -> list[BaseEntity]: + additional_filter_presets: Optional[List[Dict[str, Any]]] = None, + ) -> List[BaseEntity]: """ Find entities matching the given filters. @@ -1136,14 +1143,14 @@ def find( def _construct_read_parameters( self, entity_type: str, - fields: Optional[list[str]], - filters: dict[str, Any], + fields: Optional[List[str]], + filters: Dict[str, Any], retired_only: bool, - order: Optional[list[dict[str, Any]]], + order: Optional[List[Dict[str, Any]]], include_archived_projects: bool, - additional_filter_presets: Optional[list[dict[str, Any]]], - ) -> dict[str, Any]: - params: dict[str, Any] = {} + additional_filter_presets: Optional[List[Dict[str, Any]]], + ) -> Dict[str, Any]: + params: Dict[str, Any] = {} params["type"] = entity_type params["return_fields"] = fields or ["id"] params["filters"] = filters @@ -1174,8 +1181,8 @@ def _construct_read_parameters( return params def _add_project_param( - self, params: dict[str, Any], project_entity - ) -> dict[str, Any]: + self, params: Dict[str, Any], project_entity + ) -> Dict[str, Any]: if project_entity and self.server_caps.ensure_per_project_customization(): params["project"] = project_entity @@ -1186,9 +1193,9 @@ def _translate_update_params( self, entity_type: str, entity_id: int, - data: dict, - multi_entity_update_modes: Optional[dict], - ) -> dict[str, Any]: + data: Dict, + multi_entity_update_modes: Optional[Dict], + ) -> Dict[str, Any]: global SHOTGUN_API_DISABLE_ENTITY_OPTIMIZATION def optimize_field(field_dict): @@ -1211,12 +1218,12 @@ def optimize_field(field_dict): def summarize( self, entity_type: str, - filters: Union[list, dict[str, Any]], - summary_fields: list[dict[str, str]], + filters: Union[List, Dict[str, Any]], + summary_fields: List[Dict[str, str]], filter_operator: Optional[str] = None, - grouping: Optional[list[GroupingItem]] = None, + grouping: Optional[List[GroupingItem]] = None, include_archived_projects: bool = True, - ) -> dict[str, Any]: + ) -> Dict[str, Any]: """ Summarize field data returned by a query. @@ -1419,9 +1426,9 @@ def summarize( def create( self, entity_type: str, - data: dict[str, Any], - return_fields: Optional[list] = None, - ) -> dict[str, Any]: + data: Dict[str, Any], + return_fields: Optional[List[str]] = None, + ) -> Dict[str, Any]: """ Create a new entity of the specified ``entity_type``. @@ -1508,8 +1515,8 @@ def update( self, entity_type: str, entity_id: int, - data: dict[str, Any], - multi_entity_update_modes: Optional[dict[str, Any]] = None, + data: Dict[str, Any], + multi_entity_update_modes: Optional[Dict[str, Any]] = None, ) -> BaseEntity: """ Update the specified entity with the supplied data. @@ -1631,7 +1638,7 @@ def revive(self, entity_type: str, entity_id: int) -> bool: return self._call_rpc("revive", params) - def batch(self, requests: list[dict[str, Any]]) -> list[dict[str, Any]]: + def batch(self, requests: List[Dict[str, Any]]) -> List[Dict[str, Any]]: """ Make a batch request of several :meth:`~shotgun_api3.Shotgun.create`, :meth:`~shotgun_api3.Shotgun.update`, and :meth:`~shotgun_api3.Shotgun.delete` calls. @@ -1750,9 +1757,9 @@ def work_schedule_read( self, start_date: str, end_date: str, - project: Optional[dict[str, Any]] = None, - user: Optional[dict[str, Any]] = None, - ) -> dict[str, Any]: + project: Optional[Dict[str, Any]] = None, + user: Optional[Dict[str, Any]] = None, + ) -> Dict[str, Any]: """ Return the work day rules for a given date range. @@ -1826,10 +1833,10 @@ def work_schedule_update( date: str, working: bool, description: Optional[str] = None, - project: Optional[dict[str, Any]] = None, - user: Optional[dict[str, Any]] = None, + project: Optional[Dict[str, Any]] = None, + user: Optional[Dict[str, Any]] = None, recalculate_field: Optional[str] = None, - ) -> dict[str, Any]: + ) -> Dict[str, Any]: """ Update the work schedule for a given date. @@ -1883,7 +1890,7 @@ def work_schedule_update( return self._call_rpc("work_schedule_update", params) - def follow(self, user: dict[str, Any], entity: dict[str, Any]) -> dict[str, Any]: + def follow(self, user: Dict[str, Any], entity: Dict[str, Any]) -> Dict[str, Any]: """ Add the entity to the user's followed entities. @@ -1911,7 +1918,7 @@ def follow(self, user: dict[str, Any], entity: dict[str, Any]) -> dict[str, Any] return self._call_rpc("follow", params) - def unfollow(self, user: dict[str, Any], entity: dict[str, Any]) -> dict[str, Any]: + def unfollow(self, user: Dict[str, Any], entity: Dict[str, Any]) -> Dict[str, Any]: """ Remove entity from the user's followed entities. @@ -1938,7 +1945,7 @@ def unfollow(self, user: dict[str, Any], entity: dict[str, Any]) -> dict[str, An return self._call_rpc("unfollow", params) - def followers(self, entity: dict[str, Any]) -> list[dict[str, Any]]: + def followers(self, entity: Dict[str, Any]) -> List[Dict[str, Any]]: """ Return all followers for an entity. @@ -1968,10 +1975,10 @@ def followers(self, entity: dict[str, Any]) -> list[dict[str, Any]]: def following( self, - user: dict[str, Any], - project: Optional[dict[str, Any]] = None, + user: Dict[str, Any], + project: Optional[Dict[str, Any]] = None, entity_type: Optional[str] = None, - ) -> list[BaseEntity]: + ) -> List[BaseEntity]: """ Return all entity instances a user is following. @@ -2004,7 +2011,7 @@ def following( def schema_entity_read( self, project_entity: Optional[BaseEntity] = None - ) -> dict[str, dict[str, Any]]: + ) -> Dict[str, Dict[str, Any]]: """ Return all active entity types, their display names, and their visibility. @@ -2039,7 +2046,7 @@ def schema_entity_read( The returned display names for this method will be localized when the ``localize`` Shotgun config property is set to ``True``. See :ref:`localization` for more information. """ - params: dict[str, Any] = {} + params: Dict[str, Any] = {} params = self._add_project_param(params, project_entity) @@ -2050,7 +2057,7 @@ def schema_entity_read( def schema_read( self, project_entity: Optional[BaseEntity] = None - ) -> dict[str, dict[str, Any]]: + ) -> Dict[str, Dict[str, Any]]: """ Get the schema for all fields on all entities. @@ -2113,7 +2120,7 @@ def schema_read( The returned display names for this method will be localized when the ``localize`` Shotgun config property is set to ``True``. See :ref:`localization` for more information. """ - params: dict[str, Any] = {} + params: Dict[str, Any] = {} params = self._add_project_param(params, project_entity) @@ -2127,7 +2134,7 @@ def schema_field_read( entity_type: str, field_name: Optional[str] = None, project_entity: Optional[BaseEntity] = None, - ) -> dict[str, dict[str, Any]]: + ) -> Dict[str, Dict[str, Any]]: """ Get schema for all fields on the specified entity type or just the field name specified if provided. @@ -2196,7 +2203,7 @@ def schema_field_create( entity_type: str, data_type: str, display_name: str, - properties: Optional[dict[str, Any]] = None, + properties: Optional[Dict[str, Any]] = None, ) -> str: """ Create a field for the specified entity type. @@ -2238,7 +2245,7 @@ def schema_field_update( self, entity_type: str, field_name: str, - properties: dict[str, Any], + properties: Dict[str, Any], project_entity: Optional[BaseEntity] = None, ) -> bool: """ @@ -2348,7 +2355,7 @@ def set_session_uuid(self, session_uuid: str) -> None: def share_thumbnail( self, - entities: list[dict[str, Any]], + entities: List[Dict[str, Any]], thumbnail_path: Optional[str] = None, source_entity: Optional[BaseEntity] = None, filmstrip_thumbnail: bool = False, @@ -2838,7 +2845,7 @@ def _upload_to_sg( def _get_attachment_upload_info( self, is_thumbnail: bool, filename: str, is_multipart_upload: bool - ) -> dict[str, Any]: + ) -> Dict[str, Any]: """ Internal function to get the information needed to upload a file to Cloud storage. @@ -2887,7 +2894,7 @@ def _get_attachment_upload_info( def download_attachment( self, - attachment: Union[dict[str, Any], Literal[False]] = False, + attachment: Union[Dict[str, Any], bool] = False, file_path: Optional[str] = None, attachment_id: Optional[int] = None, ) -> Union[str, bytes, None]: @@ -3100,7 +3107,7 @@ def get_attachment_download_url( def authenticate_human_user( self, user_login: str, user_password: str, auth_token: Optional[str] = None - ) -> Union[dict[str, Any], None]: + ) -> Union[Dict[str, Any], None]: """ Authenticate Shotgun HumanUser. @@ -3160,7 +3167,7 @@ def authenticate_human_user( raise def update_project_last_accessed( - self, project: dict[str, Any], user: Optional[dict[str, Any]] = None + self, project: Dict[str, Any], user: Optional[Dict[str, Any]] = None ) -> None: """ Update a Project's ``last_accessed_by_current_user`` field to the current timestamp. @@ -3208,8 +3215,8 @@ def update_project_last_accessed( self._parse_records(record)[0] def note_thread_read( - self, note_id: int, entity_fields: Optional[dict[str, Any]] = None - ) -> list[dict[str, Any]]: + self, note_id: int, entity_fields: Optional[Dict[str, Any]] = None + ) -> List[Dict[str, Any]]: """ Return the full conversation for a given note, including Replies and Attachments. @@ -3287,10 +3294,10 @@ def note_thread_read( def text_search( self, text: str, - entity_types: dict[str, Any], - project_ids: Optional[list] = None, + entity_types: Dict[str, Any], + project_ids: Optional[List] = None, limit: Optional[int] = None, - ) -> dict[str, Any]: + ) -> Dict[str, Any]: """ Search across the specified entity types for the given text. @@ -3386,11 +3393,11 @@ def activity_stream_read( self, entity_type: str, entity_id: int, - entity_fields: Optional[dict[str, Any]] = None, + entity_fields: Optional[Dict[str, Any]] = None, min_id: Optional[int] = None, max_id: Optional[int] = None, limit: Optional[int] = None, - ) -> dict[str, Any]: + ) -> Dict[str, Any]: """ Retrieve activity stream data from Shotgun. @@ -3524,8 +3531,8 @@ def nav_search_string( def nav_search_entity( self, root_path: str, - entity: dict[str, Any], - seed_entity_field: Optional[dict[str, Any]] = None, + entity: Dict[str, Any], + seed_entity_field: Optional[Dict[str, Any]] = None, ): """ Search function adapted to work with the navigation hierarchy. @@ -3570,7 +3577,7 @@ def get_session_token(self) -> str: return session_token - def preferences_read(self, prefs: Optional[list] = None) -> dict[str, Any]: + def preferences_read(self, prefs: Optional[List] = None) -> Dict[str, Any]: """ Get a subset of the site preferences. @@ -3593,7 +3600,7 @@ def preferences_read(self, prefs: Optional[list] = None) -> dict[str, Any]: return self._call_rpc("preferences_read", {"prefs": prefs}) - def user_subscriptions_read(self) -> list: + def user_subscriptions_read(self) -> List: """ Get the list of user subscriptions. @@ -3606,7 +3613,7 @@ def user_subscriptions_read(self) -> list: return self._call_rpc("user_subscriptions_read", None) def user_subscriptions_create( - self, users: list[dict[str, Union[str, list[str], None]]] + self, users: List[Dict[str, Union[str, List[str], None]]] ) -> bool: """ Assign subscriptions to users. @@ -3799,7 +3806,7 @@ def _call_rpc( return results[0] return results - def _auth_params(self) -> dict[str, Any]: + def _auth_params(self) -> Dict[str, Any]: """ Return a dictionary of the authentication parameters being used. """ @@ -3854,7 +3861,7 @@ def _auth_params(self) -> dict[str, Any]: return auth_params - def _sanitize_auth_params(self, params: dict[str, Any]) -> dict[str, Any]: + def _sanitize_auth_params(self, params: Dict[str, Any]) -> Dict[str, Any]: """ Given an authentication parameter dictionary, sanitize any sensitive information and return the sanitized dict copy. @@ -3867,7 +3874,7 @@ def _sanitize_auth_params(self, params: dict[str, Any]) -> dict[str, Any]: def _build_payload( self, method: str, params, include_auth_params: bool = True - ) -> dict[str, Any]: + ) -> Dict[str, Any]: """ Build the payload to be send to the rpc endpoint. """ @@ -3897,8 +3904,8 @@ def _encode_payload(self, payload) -> bytes: return json.dumps(payload, ensure_ascii=False).encode("utf-8") def _make_call( - self, verb: str, path: str, body, headers: Optional[dict[str, Any]] - ) -> tuple[tuple[int, str], dict[str, Any], str]: + self, verb: str, path: str, body, headers: Optional[Dict[str, Any]] + ) -> Tuple[Tuple[int, str], Dict[str, Any], str]: """ Make an HTTP call to the server. @@ -3949,8 +3956,8 @@ def _make_call( time.sleep(rpc_attempt_interval) def _http_request( - self, verb: str, path: str, body, headers: dict[str, Any] - ) -> tuple[tuple[int, str], dict[str, Any], str]: + self, verb: str, path: str, body, headers: Dict[str, Any] + ) -> Tuple[Tuple[int, str], Dict[str, Any], str]: """ Make the actual HTTP request. """ @@ -3988,7 +3995,7 @@ def _make_upload_request( raise return result - def _parse_http_status(self, status: tuple) -> None: + def _parse_http_status(self, status: Tuple) -> None: """ Parse the status returned from the http request. @@ -4007,8 +4014,8 @@ def _parse_http_status(self, status: tuple) -> None: return def _decode_response( - self, headers: dict[str, Any], body: str - ) -> Union[str, dict[str, Any]]: + self, headers: Dict[str, Any], body: str + ) -> Union[str, Dict[str, Any]]: """ Decode the response from the server from the wire format to a python data structure. @@ -4229,7 +4236,7 @@ def _close_connection(self) -> None: # ======================================================================== # Utility - def _parse_records(self, records: list) -> list: + def _parse_records(self, records: List) -> List: """ Parse 'records' returned from the api to do local modifications: @@ -4334,11 +4341,11 @@ def _build_thumb_url(self, entity_type: str, entity_id: int) -> str: def _dict_to_list( self, - d: Optional[dict[str, Any]], + d: Optional[Dict[str, Any]], key_name: str = "field_name", value_name: str = "value", extra_data=None, - ) -> list[dict[str, Any]]: + ) -> List[Dict[str, Any]]: """ Utility function to convert a dict into a list dicts using the key_name and value_name keys. @@ -4355,7 +4362,9 @@ def _dict_to_list( ret.append(d) return ret - def _dict_to_extra_data(self, d: Optional[dict], key_name="value") -> dict: + def _dict_to_extra_data( + self, d: Optional[Dict[str, Any]], key_name="value" + ) -> Dict[str, Any]: """ Utility function to convert a dict into a dict compatible with the extra_data arg of _dict_to_list. @@ -4385,7 +4394,7 @@ def _upload_file_to_storage(self, path: str, storage_url: str) -> None: LOG.debug("File uploaded to Cloud storage: %s", filename) def _multipart_upload_file_to_storage( - self, path: str, upload_info: dict[str, Any] + self, path: str, upload_info: Dict[str, Any] ) -> None: """ Internal function to upload a file to the Cloud storage in multiple parts. @@ -4429,7 +4438,7 @@ def _multipart_upload_file_to_storage( LOG.debug("File uploaded in multiple parts to Cloud storage: %s", path) def _get_upload_part_link( - self, upload_info: dict[str, Any], filename: str, part_number: int + self, upload_info: Dict[str, Any], filename: str, part_number: int ) -> str: """ Internal function to get the url to upload the next part of a file to the @@ -4528,7 +4537,7 @@ def _upload_data_to_storage( return etag def _complete_multipart_upload( - self, upload_info: dict[str, Any], filename: str, etags: Iterable[str] + self, upload_info: Dict[str, Any], filename: str, etags: Iterable[str] ) -> None: """ Internal function to complete a multi-part upload to the Cloud storage. @@ -4605,7 +4614,7 @@ def _requires_direct_s3_upload( else: return False - def _send_form(self, url: str, params: dict[str, Any]) -> str: + def _send_form(self, url: str, params: Dict[str, Any]) -> str: """ Utility function to send a Form to Shotgun and process any HTTP errors that could occur. @@ -4737,7 +4746,7 @@ def https_request(self, request): return self.http_request(request) -def _translate_filters(filters: Union[list, tuple], filter_operator) -> dict[str, Any]: +def _translate_filters(filters: Union[List, Tuple], filter_operator) -> Dict[str, Any]: """ Translate filters params into data structure expected by rpc call. """ @@ -4746,7 +4755,7 @@ def _translate_filters(filters: Union[list, tuple], filter_operator) -> dict[str return _translate_filters_dict(wrapped_filters) -def _translate_filters_dict(sg_filter: dict[str, Any]) -> dict[str, Any]: +def _translate_filters_dict(sg_filter: Dict[str, Any]) -> Dict[str, Any]: new_filters = {} filter_operator = sg_filter.get("filter_operator") @@ -4814,8 +4823,8 @@ def _version_str(version) -> str: def _optimize_filter_field( - field_value: Union[dict, list], recursive: bool = True -) -> Union[dict, list]: + field_value: Union[Dict[str, Any], List], recursive: bool = True +) -> Union[Dict, List]: """ For an FPT entity, returns a new dictionary with only the type, id, and other allowed keys.