Skip to content

Commit

Permalink
Fix #701 by reverting the change for simultaneous request handling
Browse files Browse the repository at this point in the history
  • Loading branch information
seratch committed May 25, 2020
1 parent 439c7ae commit 0c942cd
Show file tree
Hide file tree
Showing 3 changed files with 151 additions and 31 deletions.
2 changes: 1 addition & 1 deletion integration_tests/rtm/test_issue_558.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ def tearDown(self):
# Reset the decorators by @RTMClient.run_on
RTMClient._callbacks = collections.defaultdict(list)

@pytest.mark.skipif(condition=is_not_specified(), reason="To avoid rate limited errors")
@pytest.mark.skipif(condition=is_not_specified(), reason="Still unfixed")
@async_test
async def test_issue_558(self):
channel_id = os.environ[SLACK_SDK_TEST_RTM_TEST_CHANNEL_ID]
Expand Down
135 changes: 135 additions & 0 deletions integration_tests/rtm/test_issue_701.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
import asyncio
import collections
import logging
import os
import threading
import time
import unittest

import pytest

from integration_tests.env_variable_names import SLACK_SDK_TEST_CLASSIC_APP_BOT_TOKEN
from integration_tests.helpers import async_test, is_not_specified
from slack import RTMClient, WebClient


class TestRTMClient(unittest.TestCase):
"""Runs integration tests with real Slack API
https://github.com/slackapi/python-slackclient/issues/701
"""

def setUp(self):
self.logger = logging.getLogger(__name__)
self.bot_token = os.environ[SLACK_SDK_TEST_CLASSIC_APP_BOT_TOKEN]

def tearDown(self):
# Reset the decorators by @RTMClient.run_on
RTMClient._callbacks = collections.defaultdict(list)

# @pytest.mark.skipif(condition=is_not_specified(), reason="to avoid rate_limited errors")
@pytest.mark.skip()
def test_receiving_all_messages(self):
self.rtm_client = RTMClient(token=self.bot_token, loop=asyncio.new_event_loop())
self.web_client = WebClient(token=self.bot_token)

self.call_count = 0

@RTMClient.run_on(event="message")
def send_reply(**payload):
self.logger.debug(payload)
web_client, data = payload["web_client"], payload["data"]
web_client.reactions_add(channel=data["channel"], timestamp=data["ts"], name="eyes")
self.call_count += 1

def connect():
self.logger.debug("Starting RTM Client...")
self.rtm_client.start()

rtm = threading.Thread(target=connect)
rtm.setDaemon(True)

rtm.start()
time.sleep(3)

total_num = 10

sender_completion = []

def sent_bulk_message():
for i in range(total_num):
text = f"Sent by <https://slack.dev/python-slackclient/|python-slackclient>! ({i})"
self.web_client.chat_postMessage(channel="#random", text=text)
time.sleep(0.1)
sender_completion.append(True)

num_of_senders = 3
senders = []
for sender_num in range(num_of_senders):
sender = threading.Thread(target=sent_bulk_message)
sender.setDaemon(True)
sender.start()
senders.append(sender)

while len(sender_completion) < num_of_senders:
time.sleep(1)

expected_call_count = total_num * num_of_senders
wait_seconds = 0
max_wait = 20
while self.call_count < expected_call_count and wait_seconds < max_wait:
time.sleep(1)
wait_seconds += 1

self.assertEqual(total_num * num_of_senders, self.call_count, "The RTM handler failed")

@pytest.mark.skipif(condition=is_not_specified(), reason="to avoid rate_limited errors")
@async_test
async def test_receiving_all_messages_async(self):
self.rtm_client = RTMClient(token=self.bot_token, run_async=True)
self.web_client = WebClient(token=self.bot_token, run_async=False)

self.call_count = 0

@RTMClient.run_on(event="message")
async def send_reply(**payload):
self.logger.debug(payload)
web_client, data = payload["web_client"], payload["data"]
await web_client.reactions_add(channel=data["channel"], timestamp=data["ts"], name="eyes")
self.call_count += 1

# intentionally not waiting here
self.rtm_client.start()

await asyncio.sleep(3)

total_num = 10

sender_completion = []

def sent_bulk_message():
for i in range(total_num):
text = f"Sent by <https://slack.dev/python-slackclient/|python-slackclient>! ({i})"
self.web_client.chat_postMessage(channel="#random", text=text)
time.sleep(0.1)
sender_completion.append(True)

num_of_senders = 3
senders = []
for sender_num in range(num_of_senders):
sender = threading.Thread(target=sent_bulk_message)
sender.setDaemon(True)
sender.start()
senders.append(sender)

while len(sender_completion) < num_of_senders:
await asyncio.sleep(1)

expected_call_count = total_num * num_of_senders
wait_seconds = 0
max_wait = 20
while self.call_count < expected_call_count and wait_seconds < max_wait:
await asyncio.sleep(1)
wait_seconds += 1

self.assertEqual(total_num * num_of_senders, self.call_count, "The RTM handler failed")
45 changes: 15 additions & 30 deletions slack/rtm/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -394,12 +394,7 @@ async def _connect_and_read(self):

async def _read_messages(self):
"""Process messages received on the WebSocket connection."""
text_message_callback_executions: List[Future] = []
while not self._stopped and self._websocket is not None:
for future in text_message_callback_executions:
if future.done():
text_message_callback_executions.remove(future)

try:
# Wait for a message to be received, but timeout after a second so that
# we can check if the socket has been closed, or if self._stopped is
Expand All @@ -419,34 +414,22 @@ async def _read_messages(self):
)
self._websocket = None
await self._dispatch_event(event="close")
num_of_running_callbacks = len(text_message_callback_executions)
if num_of_running_callbacks > 0:
self._logger.info(
"WebSocket connection has been closed "
f"though {num_of_running_callbacks} callback executions were still in progress"
)
return

if message.type == aiohttp.WSMsgType.TEXT:
payload = message.json()
event = payload.pop("type", "Unknown")

async def run_dispatch_event():
try:
await self._dispatch_event(event, data=payload)
except Exception as err:
data = message.data if message else message
self._logger.info(
f"Caught a raised exception ({err}) while dispatching a TEXT message ({data})"
)
# Raised exceptions here happen in users' code and were just unhandled.
# As they're not intended for closing current WebSocket connection,
# this exception should not be propagated to higher level (#_connect_and_read()).
return

# Asynchronously run callbacks to handle simultaneous incoming messages from Slack
f = asyncio.ensure_future(run_dispatch_event())
text_message_callback_executions.append(f)
try:
payload = message.json()
event = payload.pop("type", "Unknown")
await self._dispatch_event(event, data=payload)
except Exception as err:
data = message.data if message else message
self._logger.info(
f"Caught a raised exception ({err}) while dispatching a TEXT message ({data})"
)
# Raised exceptions here happen in users' code and were just unhandled.
# As they're not intended for closing current WebSocket connection,
# this exception should not be propagated to higher level (#_connect_and_read()).
continue
elif message.type == aiohttp.WSMsgType.ERROR:
self._logger.error("Received an error on the websocket: %r", message)
await self._dispatch_event(event="error", data=message)
Expand Down Expand Up @@ -479,6 +462,8 @@ async def _dispatch_event(self, event, data=None):
}
}
"""
if self._logger.level <= logging.DEBUG:
self._logger.debug("Received an event: '%s' - %s", event, data)
for callback in self._callbacks[event]:
self._logger.debug(
"Running %s callbacks for event: '%s'",
Expand Down

0 comments on commit 0c942cd

Please sign in to comment.