Skip to content

Commit

Permalink
Add queue for Slack events (#1892)
Browse files Browse the repository at this point in the history
  • Loading branch information
martinohmann committed Mar 9, 2022
1 parent 1bdafbd commit 5bfe4c8
Showing 1 changed file with 29 additions and 3 deletions.
32 changes: 29 additions & 3 deletions opsdroid/connector/slack/connector.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
"""A connector for Slack."""
import asyncio
import json
import logging
import os
Expand Down Expand Up @@ -71,6 +72,16 @@ def __init__(self, config, opsdroid=None):
self.known_users = {}

self._event_creator = SlackEventCreator(self)
self._event_queue = asyncio.Queue()
self._event_queue_task = None

async def _queue_worker(self):
while True:
payload = await self._event_queue.get()
try:
await self.event_handler(payload)
finally:
self._event_queue.task_done()

async def connect(self):
"""Connect to the chat service."""
Expand Down Expand Up @@ -108,6 +119,10 @@ async def connect(self):
await self.socket_mode_client.connect()
_LOGGER.info(_("Connected successfully with socket mode"))
else:
# Create a task for background processing events received by
# the web event handler.
self._event_queue_task = asyncio.create_task(self._queue_worker())

self.opsdroid.web_server.web_app.router.add_post(
f"/connector/{self.name}",
self.web_event_handler,
Expand All @@ -119,8 +134,13 @@ async def connect(self):
_LOGGER.debug(_("Default room is %s."), self.default_target)

async def disconnect(self):
"""Disconnect from Slack. Only needed when socket_mode_client is used
as the Events API uses the aiohttp server"""
"""Disconnect from Slack.
Cancels the event queue worker task and disconnects the
socket_mode_client if socket mode was enabled."""
if self._event_queue_task:
self._event_queue_task.cancel()
await asyncio.gather(self._event_queue_task, return_exceptions=True)

if self.socket_mode_client:
await self.socket_mode_client.disconnect()
Expand Down Expand Up @@ -195,7 +215,13 @@ async def web_event_handler(self, request):
if payload.get("type") == "url_verification":
return aiohttp.web.json_response({"challenge": payload["challenge"]})

await self.event_handler(payload)
# Put the event in the queue to process it in the background and
# immediately acknowledge the reception by returning status code 200.
# Slack will resend events that have not been acknowledged within 3
# seconds and we want to avoid that.
#
# https://api.slack.com/apis/connections/events-api#the-events-api__responding-to-events
self._event_queue.put_nowait(payload)

return aiohttp.web.Response(text=json.dumps("Received"), status=200)

Expand Down

0 comments on commit 5bfe4c8

Please sign in to comment.