Skip to content

Commit

Permalink
Revise WebClient/RTMClient internals for run_async=False
Browse files Browse the repository at this point in the history
* slackapi#530 Fixed by changing _execute_in_thread to be a coroutine
* slackapi#569 Resolved by removing a blocking loop (while future.running())
* slackapi#645 WebClient(run_async=False) no longer depends on asyncio by default
* slackapi#633 WebClient(run_async=False) doesn't internally depend on aiohttp
* slackapi#631 When run_async=True, RTM listner can be a normal function and WebClient is free from the event loop
* slackapi#630 WebClient no longer depends on aiohttp when run_async=False
* slackapi#497 Fixed when run_async=False / can be closed as we don't support run_async=True for this use case (in Flask)
  • Loading branch information
seratch committed May 14, 2020
1 parent baef25d commit b86a8fc
Show file tree
Hide file tree
Showing 15 changed files with 448 additions and 95 deletions.
3 changes: 3 additions & 0 deletions integration_tests/rtm/test_issue_569.py
Expand Up @@ -43,6 +43,9 @@ def run_cpu_monitor(self):
TestRTMClient.cpu_monitor.setDaemon(True)
TestRTMClient.cpu_monitor.start()

self.rtm_client = None
self.web_client = None

def tearDown(self):
# Reset the decorators by @RTMClient.run_on
RTMClient._callbacks = collections.defaultdict(list)
Expand Down
1 change: 0 additions & 1 deletion integration_tests/rtm/test_issue_605.py
@@ -1,4 +1,3 @@
import asyncio
import collections
import logging
import os
Expand Down
2 changes: 1 addition & 1 deletion integration_tests/web/test_issue_378.py
Expand Up @@ -29,7 +29,7 @@ def test_issue_378(self):
self.assertIsNotNone(response)

@async_test
async def test_issue_378(self):
async def test_issue_378_async(self):
client = self.async_client
response = await client.users_setPhoto(image="tests/data/slack_logo_new.png")
self.assertIsNotNone(response)
1 change: 0 additions & 1 deletion integration_tests/web/test_issue_480.py
Expand Up @@ -62,4 +62,3 @@ async def test_issue_480_threads_async(self):
self.assertIsNotNone(response)
after = threading.active_count()
self.assertEqual(0, after - before)

5 changes: 2 additions & 3 deletions integration_tests/web/test_issue_560.py
@@ -1,4 +1,3 @@
import asyncio
import logging
import os
import unittest
Expand All @@ -17,7 +16,7 @@ class TestWebClient(unittest.TestCase):
def setUp(self):
self.logger = logging.getLogger(__name__)
self.bot_token = os.environ[SLACK_SDK_TEST_BOT_TOKEN]
self.sync_client: WebClient = WebClient(token=self.bot_token, run_async=False, loop=asyncio.new_event_loop())
self.sync_client: WebClient = WebClient(token=self.bot_token, run_async=False)
self.async_client: WebClient = WebClient(token=self.bot_token, run_async=True)

def tearDown(self):
Expand All @@ -37,7 +36,7 @@ async def test_issue_560_success_async(self):
response = await client.conversations_list(exclude_archived=1)
self.assertIsNotNone(response)

response = client.conversations_list(exclude_archived="true")
response = await client.conversations_list(exclude_archived="true")
self.assertIsNotNone(response)

def test_issue_560_failure(self):
Expand Down
88 changes: 53 additions & 35 deletions slack/rtm/client.py
@@ -1,25 +1,23 @@
"""A Python module for interacting with Slack's RTM API."""

# Standard Imports
import os
import logging
import random
import asyncio
import collections
import concurrent
import inspect
import logging
import os
import random
import signal
from asyncio import Future
from typing import Optional, Callable, DefaultDict, List
from concurrent.futures.thread import ThreadPoolExecutor
from ssl import SSLContext
from threading import current_thread, main_thread
from typing import List
from typing import Optional, Callable, DefaultDict

# ThirdParty Imports
import asyncio
import aiohttp

# Internal Imports
from slack.web.client import WebClient
import slack.errors as client_err
from slack.web.client import WebClient


class RTMClient(object):
Expand Down Expand Up @@ -108,6 +106,8 @@ def __init__(
*,
token: str,
run_async: Optional[bool] = False,
# will be used only when run_async=False
run_sync_thread_pool_size: int = 3,
auto_reconnect: Optional[bool] = True,
ssl: Optional[SSLContext] = None,
proxy: Optional[str] = None,
Expand All @@ -120,6 +120,9 @@ def __init__(
):
self.token = token.strip()
self.run_async = run_async
self.thread_pool_executor = ThreadPoolExecutor(
max_workers=run_sync_thread_pool_size
)
self.auto_reconnect = auto_reconnect
self.ssl = ssl
self.proxy = proxy
Expand All @@ -136,6 +139,16 @@ def __init__(
self._last_message_id = 0
self._connection_attempts = 0
self._stopped = False
self._web_client = WebClient(
token=self.token,
base_url=self.base_url,
ssl=self.ssl,
proxy=self.proxy,
run_async=self.run_async,
loop=self._event_loop,
session=self._session,
headers=self.headers,
)

@staticmethod
def run_on(*, event: str):
Expand Down Expand Up @@ -196,7 +209,6 @@ def start(self) -> asyncio.Future:

if self.run_async:
return future

return self._event_loop.run_until_complete(future)

def stop(self):
Expand Down Expand Up @@ -352,7 +364,6 @@ async def _connect_and_read(self):
client_err.SlackApiError,
# TODO: Catch websocket exceptions thrown by aiohttp.
) as exception:
self._logger.debug(str(exception))
await self._dispatch_event(event="error", data=exception)
if self.auto_reconnect and not self._stopped:
await self._wait_exponentially(exception)
Expand Down Expand Up @@ -462,37 +473,27 @@ async def _dispatch_event(self, event, data=None):
# close/error callbacks.
break

if inspect.iscoroutinefunction(callback):
if self.run_async or inspect.iscoroutinefunction(callback):
await callback(
rtm_client=self, web_client=self._web_client, data=data
)
else:
self._execute_in_thread(callback, data)
await self._execute_in_thread(
callback=callback, web_client=self._web_client, data=data
)
except Exception as err:
name = callback.__name__
module = callback.__module__
msg = f"When calling '#{name}()' in the '{module}' module the following error was raised: {err}"
self._logger.error(msg)
raise

def _execute_in_thread(self, callback, data):
async def _execute_in_thread(self, callback, web_client, data):
"""Execute the callback in another thread. Wait for and return the results."""
web_client = WebClient(
token=self.token,
base_url=self.base_url,
ssl=self.ssl,
proxy=self.proxy,
headers=self.headers,
future = self.thread_pool_executor.submit(
callback, rtm_client=self, web_client=web_client, data=data
)
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
future = executor.submit(
callback, rtm_client=self, web_client=web_client, data=data
)

while future.running():
pass

future.result()
return future.result()

async def _retrieve_websocket_info(self):
"""Retrieves the WebSocket info from Slack.
Expand Down Expand Up @@ -527,10 +528,18 @@ async def _retrieve_websocket_info(self):
headers=self.headers,
)
self._logger.debug("Retrieving websocket info.")
if self.connect_method in ["rtm.start", "rtm_start"]:
resp = await self._web_client.rtm_start()
use_rtm_start = self.connect_method in ["rtm.start", "rtm_start"]
if self.run_async:
if use_rtm_start:
resp = await self._web_client.rtm_start()
else:
resp = await self._web_client.rtm_connect()
else:
resp = await self._web_client.rtm_connect()
if use_rtm_start:
resp = self._web_client.rtm_start()
else:
resp = self._web_client.rtm_connect()

url = resp.get("url")
if url is None:
msg = "Unable to retrieve RTM URL from Slack."
Expand All @@ -542,15 +551,24 @@ async def _wait_exponentially(self, exception, max_wait_time=300):
Calculate the number of seconds to wait and then add
a random number of milliseconds to avoid coincidental
synchronized client retries. Wait up to the maximium amount
synchronized client retries. Wait up to the maximum amount
of wait time specified via 'max_wait_time'. However,
if Slack returned how long to wait use that.
"""
wait_time = min(
(2 ** self._connection_attempts) + random.random(), max_wait_time
)
try:
wait_time = exception.response["headers"]["Retry-After"]
headers = (
exception.response["headers"]
if "headers" in exception.response
else None
)
if headers and "Retry-After" in headers:
wait_time = headers["Retry-After"]
else:
# an error returned due to other unrecoverable reasons
raise exception
except (KeyError, AttributeError):
pass
self._logger.debug("Waiting %s seconds before reconnecting.", wait_time)
Expand Down
26 changes: 25 additions & 1 deletion slack/web/__init__.py
@@ -1,8 +1,30 @@
import os
import platform
import sys
import warnings

from typing import Dict

import slack.version as ver


# ---------------------------------------


def get_user_agent():
"""Construct the user-agent header with the package info,
Python version and OS version.
Returns:
The user agent string.
e.g. 'Python/3.6.7 slackclient/2.0.0 Darwin/17.7.0'
"""
# __name__ returns all classes, we only want the client
client = "{0}/{1}".format("slackclient", ver.__version__)
python_version = "Python/{v.major}.{v.minor}.{v.micro}".format(v=sys.version_info)
system_info = "{0}/{1}".format(platform.system(), platform.release())
user_agent_string = " ".join([python_version, client, system_info])
return user_agent_string


# ---------------------------------------

Expand All @@ -15,9 +37,11 @@ def _to_0_or_1_if_bool(v: any) -> str:

def convert_bool_to_0_or_1(params: Dict[str, any]) -> Dict[str, any]:
"""Converts all bool values in dict to "0" or "1".
Slack APIs safely accept "0"/"1" as boolean values.
Using True/False (bool in Python) doesn't work with aiohttp.
This method converts only the bool values in top-level of a given dict.
:param params: params as a dict
:return: return modified dict
"""
Expand Down

0 comments on commit b86a8fc

Please sign in to comment.