diff --git a/slack/rtm/client.py b/slack/rtm/client.py index 8fef13671..900c7a0b0 100644 --- a/slack/rtm/client.py +++ b/slack/rtm/client.py @@ -5,9 +5,9 @@ import logging import random import collections -import concurrent import inspect import signal +from concurrent.futures.thread import ThreadPoolExecutor from typing import Optional, Callable, DefaultDict from ssl import SSLContext from threading import current_thread, main_thread @@ -107,6 +107,8 @@ def __init__( *, token: str, run_async: Optional[bool] = False, + # will be used only when run_async=False + run_sync_thread_pool_size: int = 3, auto_reconnect: Optional[bool] = True, ssl: Optional[SSLContext] = None, proxy: Optional[str] = None, @@ -119,6 +121,9 @@ def __init__( ): self.token = token.strip() self.run_async = run_async + self.thread_pool_executor = ThreadPoolExecutor( + max_workers=run_sync_thread_pool_size + ) self.auto_reconnect = auto_reconnect self.ssl = ssl self.proxy = proxy @@ -135,6 +140,16 @@ def __init__( self._last_message_id = 0 self._connection_attempts = 0 self._stopped = False + self._web_client = WebClient( + token=self.token, + base_url=self.base_url, + ssl=self.ssl, + proxy=self.proxy, + run_async=self.run_async, + loop=self._event_loop, + session=self._session, + headers=self.headers, + ) @staticmethod def run_on(*, event: str): @@ -195,8 +210,8 @@ def start(self) -> asyncio.Future: if self.run_async: return future - - return self._event_loop.run_until_complete(future) + else: + return self._event_loop.run_until_complete(future) def stop(self): """Closes the websocket connection and ensures it won't reconnect.""" @@ -351,7 +366,6 @@ async def _connect_and_read(self): client_err.SlackApiError, # TODO: Catch websocket exceptions thrown by aiohttp. ) as exception: - self._logger.debug(str(exception)) await self._dispatch_event(event="error", data=exception) if self.auto_reconnect and not self._stopped: await self._wait_exponentially(exception) @@ -433,12 +447,14 @@ async def _dispatch_event(self, event, data=None): # close/error callbacks. break - if inspect.iscoroutinefunction(callback): + if self.run_async or inspect.iscoroutinefunction(callback): await callback( rtm_client=self, web_client=self._web_client, data=data ) else: - self._execute_in_thread(callback, data) + await self._execute_in_thread( + callback=callback, web_client=self._web_client, data=data + ) except Exception as err: name = callback.__name__ module = callback.__module__ @@ -446,24 +462,12 @@ async def _dispatch_event(self, event, data=None): self._logger.error(msg) raise - def _execute_in_thread(self, callback, data): + async def _execute_in_thread(self, callback, web_client, data): """Execute the callback in another thread. Wait for and return the results.""" - web_client = WebClient( - token=self.token, - base_url=self.base_url, - ssl=self.ssl, - proxy=self.proxy, - headers=self.headers, + future = self.thread_pool_executor.submit( + callback, rtm_client=self, web_client=web_client, data=data ) - with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor: - future = executor.submit( - callback, rtm_client=self, web_client=web_client, data=data - ) - - while future.running(): - pass - - future.result() + return future.result() async def _retrieve_websocket_info(self): """Retrieves the WebSocket info from Slack. @@ -498,10 +502,18 @@ async def _retrieve_websocket_info(self): headers=self.headers, ) self._logger.debug("Retrieving websocket info.") - if self.connect_method in ["rtm.start", "rtm_start"]: - resp = await self._web_client.rtm_start() + use_rtm_start = self.connect_method in ["rtm.start", "rtm_start"] + if self.run_async: + if use_rtm_start: + resp = await self._web_client.rtm_start() + else: + resp = await self._web_client.rtm_connect() else: - resp = await self._web_client.rtm_connect() + if use_rtm_start: + resp = self._web_client.rtm_start() + else: + resp = self._web_client.rtm_connect() + url = resp.get("url") if url is None: msg = "Unable to retrieve RTM URL from Slack." @@ -513,7 +525,7 @@ async def _wait_exponentially(self, exception, max_wait_time=300): Calculate the number of seconds to wait and then add a random number of milliseconds to avoid coincidental - synchronized client retries. Wait up to the maximium amount + synchronized client retries. Wait up to the maximum amount of wait time specified via 'max_wait_time'. However, if Slack returned how long to wait use that. """ @@ -521,7 +533,16 @@ async def _wait_exponentially(self, exception, max_wait_time=300): (2 ** self._connection_attempts) + random.random(), max_wait_time ) try: - wait_time = exception.response["headers"]["Retry-After"] + headers = ( + exception.response["headers"] + if "headers" in exception.response + else None + ) + if headers and "Retry-After" in headers: + wait_time = headers["Retry-After"] + else: + # an error returned due to other unrecoverable reasons + raise exception except (KeyError, AttributeError): pass self._logger.debug("Waiting %s seconds before reconnecting.", wait_time) diff --git a/slack/web/__init__.py b/slack/web/__init__.py index e69de29bb..02dc7b0ac 100644 --- a/slack/web/__init__.py +++ b/slack/web/__init__.py @@ -0,0 +1,20 @@ +import platform +import sys + +import slack.version as ver + + +def get_user_agent(): + """Construct the user-agent header with the package info, + Python version and OS version. + + Returns: + The user agent string. + e.g. 'Python/3.6.7 slackclient/2.0.0 Darwin/17.7.0' + """ + # __name__ returns all classes, we only want the client + client = "{0}/{1}".format("slackclient", ver.__version__) + python_version = "Python/{v.major}.{v.minor}.{v.micro}".format(v=sys.version_info) + system_info = "{0}/{1}".format(platform.system(), platform.release()) + user_agent_string = " ".join([python_version, client, system_info]) + return user_agent_string diff --git a/slack/web/base_client.py b/slack/web/base_client.py index 748407951..1e7f1e831 100644 --- a/slack/web/base_client.py +++ b/slack/web/base_client.py @@ -1,9 +1,8 @@ """A Python module for interacting with Slack's Web API.""" # Standard Imports +import json from urllib.parse import urljoin -import platform -import sys import logging import asyncio from typing import Optional, Union @@ -15,9 +14,10 @@ from aiohttp import FormData, BasicAuth # Internal Imports +from slack.web import get_user_agent from slack.web.slack_response import SlackResponse -import slack.version as ver import slack.errors as err +from slack.web.urllib_client import UrllibWebClient class BaseClient: @@ -32,6 +32,7 @@ def __init__( ssl=None, proxy=None, run_async=False, + use_sync_aiohttp=False, session=None, headers: Optional[dict] = None, ): @@ -41,11 +42,16 @@ def __init__( self.ssl = ssl self.proxy = proxy self.run_async = run_async + self.use_sync_aiohttp = use_sync_aiohttp self.session = session self.headers = headers or {} self._logger = logging.getLogger(__name__) self._event_loop = loop + self.urllib_client = UrllibWebClient( + token=self.token, default_headers=self.headers, web_client=self, + ) + def _get_event_loop(self): """Retrieves the event loop or creates a new one.""" try: @@ -58,7 +64,7 @@ def _get_event_loop(self): def _get_headers( self, has_json: bool, has_files: bool, request_specific_headers: Optional[dict] ): - """Contructs the headers need for a request. + """Constructs the headers need for a request. Args: has_json (bool): Whether or not the request has json. has_files (bool): Whether or not the request has files. @@ -73,7 +79,7 @@ def _get_headers( } """ final_headers = { - "User-Agent": self._get_user_agent(), + "User-Agent": get_user_agent(), "Content-Type": "application/x-www-form-urlencoded;charset=utf-8", } @@ -115,7 +121,7 @@ def api_call( e.g. 'chat.postMessage' http_verb (str): HTTP Verb. e.g. 'POST' files (dict): Files to multipart upload. - e.g. {imageORfile: file_objectORfile_path} + e.g. {image OR file: file_object OR file_path} data: The body to attach to the request. If a dictionary is provided, form-encoding will take place. e.g. {'key1': 'value1', 'key2': 'value2'} @@ -160,18 +166,21 @@ def api_call( "auth": auth, } - if self._event_loop is None: - self._event_loop = self._get_event_loop() - - future = asyncio.ensure_future( - self._send(http_verb=http_verb, api_url=api_url, req_args=req_args), - loop=self._event_loop, - ) - - if self.run_async: - return future + if self.run_async or self.use_sync_aiohttp: + if self._event_loop is None: + self._event_loop = self._get_event_loop() - return self._event_loop.run_until_complete(future) + future = asyncio.ensure_future( + self._send(http_verb=http_verb, api_url=api_url, req_args=req_args), + loop=self._event_loop, + ) + if self.run_async: + return future + elif self.use_sync_aiohttp: + # Using this is no longer recommended - just keep this for backward-compatibility + return self._event_loop.run_until_complete(future) + else: + return self._sync_send(api_url=api_url, req_args=req_args) def _get_url(self, api_method): """Joins the base Slack URL and an API method to form an absolute URL. @@ -225,6 +234,7 @@ async def _send(self, http_verb, api_url, req_args): "http_verb": http_verb, "api_url": api_url, "req_args": req_args, + "use_sync_aiohttp": self.use_sync_aiohttp, } return SlackResponse(**{**data, **res}).validate() @@ -258,23 +268,38 @@ async def _request(self, *, http_verb, api_url, req_args): await session.close() return response - @staticmethod - def _get_user_agent(): - """Construct the user-agent header with the package info, - Python version and OS version. + def _sync_send(self, api_url, req_args): + params = req_args["params"] if "params" in req_args else None + data = req_args["data"] if "data" in req_args else None + files = req_args["files"] if "files" in req_args else None + json = req_args["json"] if "files" in req_args else None + headers = req_args["headers"] if "headers" in req_args else None + token = params.get("token") if params and "token" in params else None + body_params = {} + if params: + body_params.update(params) + if data: + body_params.update(data) + + return self.urllib_client.api_call( + token=token, + url=api_url, + query_params={}, + body_params=body_params, + files=files, + json_body=json, + additional_headers=headers, + ) - Returns: - The user agent string. - e.g. 'Python/3.6.7 slackclient/2.0.0 Darwin/17.7.0' - """ - # __name__ returns all classes, we only want the client - client = "{0}/{1}".format("slackclient", ver.__version__) - python_version = "Python/{v.major}.{v.minor}.{v.micro}".format( - v=sys.version_info + def _sync_request(self, api_url, req_args): + response, response_body = self.urllib_client._perform_http_request( + url=api_url, args=req_args, ) - system_info = "{0}/{1}".format(platform.system(), platform.release()) - user_agent_string = " ".join([python_version, client, system_info]) - return user_agent_string + return { + "status_code": int(response.status), + "headers": dict(response.headers), + "data": json.loads(response_body), + } @staticmethod def validate_slack_signature( diff --git a/slack/web/slack_response.py b/slack/web/slack_response.py index 8a04b693d..1fcefb78b 100644 --- a/slack/web/slack_response.py +++ b/slack/web/slack_response.py @@ -1,8 +1,9 @@ """A Python module for interacting and consuming responses from Slack.""" +import asyncio + # Standard Imports import logging -import asyncio # Internal Imports import slack.errors as e @@ -63,6 +64,7 @@ def __init__( data: dict, headers: dict, status_code: int, + use_sync_aiohttp: bool = True, # True for backward-compatibility ): self.http_verb = http_verb self.api_url = api_url @@ -72,6 +74,7 @@ def __init__( self.status_code = status_code self._initial_data = data self._client = client + self._use_sync_aiohttp = use_sync_aiohttp self._logger = logging.getLogger(__name__) def __str__(self): @@ -132,13 +135,27 @@ def __next__(self): {"cursor": self.data["response_metadata"]["next_cursor"]} ) - response = asyncio.get_event_loop().run_until_complete( - self._client._request( - http_verb=self.http_verb, - api_url=self.api_url, - req_args=self.req_args, + if self._use_sync_aiohttp: + # We no longer recommend going with this way + response = asyncio.get_event_loop().run_until_complete( + self._client._request( + http_verb=self.http_verb, + api_url=self.api_url, + req_args=self.req_args, + ) ) - ) + else: + if self._client is None: + # NOTE: It's not possible to support this due to circular import issues. + msg = ( + "Directly using UrllibWebClient doesn't support response pagination as iteration. " + "Use WebClient with run_async=False and use_sync_aiohttp=False." + ) + raise e.SlackRequestError(msg) + response = self._client._sync_request( + api_url=self.api_url, req_args=self.req_args + ) + self.data = response["data"] self.headers = response["headers"] self.status_code = response["status_code"] @@ -169,7 +186,7 @@ def validate(self): Raises: SlackApiError: The request to the Slack API failed. """ - if self.status_code == 200 and self.data.get("ok", False): + if self.status_code == 200 and self.data and self.data.get("ok", False): self._logger.debug("Received the following response: %s", self.data) return self msg = "The request to the Slack API failed." diff --git a/slack/web/urllib_client.py b/slack/web/urllib_client.py new file mode 100644 index 000000000..13146d5e3 --- /dev/null +++ b/slack/web/urllib_client.py @@ -0,0 +1,245 @@ +import copy +import io +import json +import logging +import mimetypes +import uuid +from http.client import HTTPResponse +from typing import BinaryIO, Dict, List, Union +from urllib.error import HTTPError +from urllib.parse import urlencode +from urllib.request import Request, urlopen + +from slack.web import get_user_agent +from slack.web.slack_response import SlackResponse + + +class HttpErrorResponse(object): + pass + + +class UrllibWebClient: + logger = logging.getLogger(__name__) + + def __init__( + self, + *, + token: str = None, + default_headers: Dict[str, str] = dict(), + # Not type here to avoid ImportError: cannot import name 'WebClient' from partially initialized module + # 'slack.web.client' (most likely due to a circular import + web_client=None, + ): + """urllib-based API client. + + :param token: Slack API Token (either bot token or user token) + :param default_headers: request headers to add to all requests + :param web_client: WebClient instance for pagination + """ + self.token = token + self.default_headers = default_headers + self.web_client = web_client + + def api_call( + self, + *, + token: str = None, + url: str, + query_params: Dict[str, str] = dict(), + json_body: Dict = dict(), + body_params: Dict[str, str] = dict(), + files: Dict[str, io.BytesIO] = dict(), + additional_headers: Dict[str, str] = dict(), + ) -> SlackResponse: + """Performs a Slack API request and returns the result. + + :param token: Slack API Token (either bot token or user token) + :param url: a complete URL (e.g., https://www.slack.com/api/chat.postMessage) + :param query_params: query string + :param json_body: json data structure (it's still a dict at this point), + if you give this argument, body_params and files will be skipped + :param body_params: form params + :param files: files to upload + :param additional_headers: request headers to append + :return: API response + """ + files_to_close: List[BinaryIO] = [] + try: + if self.logger.level <= logging.DEBUG: + self.logger.debug( + f"Slack API Request - url: {url}, " + f"query_params: {query_params}, " + f"json_body: {json_body}, " + f"body_params: {body_params}, " + f"files: {files}, " + f"additional_headers: {additional_headers}" + ) + + request_data = {} + if files: + if body_params: + for k, v in body_params.items(): + request_data.update({k: v}) + + for k, v in files.items(): + if isinstance(v, str): + f: BinaryIO = open(v.encode("ascii", "ignore"), "rb") + files_to_close.append(f) + request_data.update({k: f}) + else: + request_data.update({k: v}) + + request_headers = self._build_request_headers( + token=token or self.token, + has_json=json is not None, + has_files=files is not None, + additional_headers=additional_headers, + ) + request_args = { + "headers": request_headers, + "data": request_data, + "params": body_params, + "files": files, + "json": json_body, + } + if query_params: + q = urlencode(query_params) + url = f"{url}&{q}" if "?" in url else f"{url}?{q}" + + response, response_body = self._perform_http_request( + url=url, args=request_args + ) + if response_body: + response_body_data: dict = json.loads(response_body) + else: + response_body_data: dict = None + + if query_params: + all_params = copy.copy(body_params) + all_params.update(query_params) + else: + all_params = body_params + request_args["params"] = all_params # for backward-compatibility + return SlackResponse( + client=self.web_client, + http_verb="POST", # you can use POST method for all the Web APIs + api_url=url, + req_args=request_args, + data=response_body_data, + headers=dict(response.headers), + status_code=response.status, + use_sync_aiohttp=False, + ).validate() + finally: + for f in files_to_close: + if not f.closed: + f.close() + + def _perform_http_request( + self, *, url: str, args: Dict[str, Dict[str, any]] + ) -> (Union[HTTPResponse, HttpErrorResponse], str): + """Performs an HTTP request and parses the response. + + :param url: a complete URL (e.g., https://www.slack.com/api/chat.postMessage) + :param args: args has "headers", "data", "params", and "json" + "headers": Dict[str, str] + "data": Dict[str, any] + "params": Dict[str, str], + "json": Dict[str, any], + :return: a tuple (HTTP response and its body) + """ + headers = args["headers"] + if args["json"]: + body = json.dumps(args["json"]) + headers["Content-Type"] = "application/json;charset=utf-8" + elif args["data"]: + boundary = f"--------------{uuid.uuid4()}" + sep_boundary = b"\r\n--" + boundary.encode("ascii") + end_boundary = sep_boundary + b"--\r\n" + body = io.BytesIO() + data = args["data"] + for key, value in data.items(): + readable = getattr(value, "readable", None) + if readable and value.readable(): + filename = "Uploaded file" + name_attr = getattr(value, "name", None) + if name_attr: + filename = ( + name_attr.decode("utf-8") + if isinstance(name_attr, bytes) + else name_attr + ) + if "filename" in data: + filename = data["filename"] + mimetype = ( + mimetypes.guess_type(filename)[0] or "application/octet-stream" + ) + title = ( + f'\r\nContent-Disposition: form-data; name="{key}"; filename="{filename}"\r\n' + + f"Content-Type: {mimetype}\r\n" + ) + value = value.read() + else: + title = f'\r\nContent-Disposition: form-data; name="{key}"\r\n' + value = str(value).encode("utf-8") + body.write(sep_boundary) + body.write(title.encode("utf-8")) + body.write(b"\r\n") + body.write(value) + + body.write(end_boundary) + body = body.getvalue() + headers["Content-Type"] = f"multipart/form-data; boundary={boundary}" + headers["Content-Length"] = len(body) + elif args["params"]: + body = urlencode(args["params"]) + headers["Content-Type"] = "application/x-www-form-urlencoded;charset=utf-8" + else: + body = None + + if isinstance(body, str): + body = body.encode("utf-8") + + try: + # NOTE: Intentionally ignore the `http_verb` here + # Slack APIs accepts any API method requests with POST methods + req = Request(method="POST", url=url, data=body, headers=headers) + resp: HTTPResponse = urlopen(req) + charset = resp.headers.get_content_charset() + body: str = resp.read().decode(charset) # read the response body here + return resp, body + except HTTPError as e: + resp: HttpErrorResponse = HttpErrorResponse() + resp.status = e.code + resp.reason = e.reason + resp.headers = e.headers + charset = resp.headers.get_content_charset() + body: str = e.read().decode(charset) # read the response body here + if e.code == 429: + # for compatibility with aiohttp + resp.headers["Retry-After"] = resp.headers["retry-after"] + + return resp, body + + except Exception as err: + self.logger.error(f"Failed to send a request to Slack API server: {err}") + raise err + + def _build_request_headers( + self, token: str, has_json: bool, has_files: bool, additional_headers: dict, + ): + headers = { + "User-Agent": get_user_agent(), + "Content-Type": "application/x-www-form-urlencoded;charset=utf-8", + } + headers.update(self.default_headers) + if token: + headers.update({"Authorization": "Bearer {}".format(token)}) + if additional_headers: + headers.update(additional_headers) + if has_json: + headers.update({"Content-Type": "application/json;charset=utf-8"}) + if has_files: + # will be set afterwards + headers.pop("Content-Type", None) + return headers