diff --git a/.github/labeler.yml b/.github/labeler.yml index 87ac1f962..344ca2191 100644 --- a/.github/labeler.yml +++ b/.github/labeler.yml @@ -14,6 +14,8 @@ connector/slack: - opsdroid/connector/slack/**/* connector/telegram: - opsdroid/connector/telegram/**/* +connector/twitch: + - opsdroid/connector/twitch/**/* connector/webexteams: - opsdroid/connector/webexteams/**/* connector/websocket: diff --git a/docs/connectors/index.md b/docs/connectors/index.md index efd05b9ce..6dcc8bee4 100644 --- a/docs/connectors/index.md +++ b/docs/connectors/index.md @@ -15,6 +15,7 @@ Connectors are modules which connect opsdroid to an external event source. This shell slack telegram + twitch webexteams websocket custom diff --git a/docs/connectors/twitch.md b/docs/connectors/twitch.md new file mode 100644 index 000000000..4d58b1f25 --- /dev/null +++ b/docs/connectors/twitch.md @@ -0,0 +1,270 @@ +# Twitch + +A connector for [Twitch](https://twitch.tv/). + +## Requirements + +- A Twitch Account +- A Twitch App obtained from [Twitch developers page](https://dev.twitch.tv/console/apps) +- The `code` obtained from the first step of OAuth + +## Configuration + +```yaml +connectors: + twitch: + # required + code: "hfu923hfks02nd2821jfislf" # Code obtained from the first OAuth step + client-id: "e0asdj48jfkspod0284" + client-secret: "kdksd0458j93847j" + channel: theflyingdev # Broadcaster channel + redirect: http://localhost # Url to be passed to get oath token - defaults to localhost + foward-url: 'http://94jfsd9ff.ngrok.io' # Either an URL provided by a forwarding service or an exposed ip address + # optional + always-listening: false # Turn on to connect to the chat server even if stream is offline. +``` + +## Setup Twitch App + +You need to create a [Twitch App](https://dev.twitch.tv/console/apps) to use the Twitch Connector. Click the `+ Register Your Application` button, give this app a name and a redirect url - using `http://localhost` is fine. Once created, you can click the `Manage` button and get your `client-id`, you can then get a `client-secret` by pressing the `New Secret` button (keep this secret safe as it won't be shown to you again). + +## Getting OAuth code + +Twitch OAuth has two steps, first you need to make a `GET` request to a specific URL to obtain a `code`. After you've received the code, you need to make a `POST` request to the same URL and Twitch will send you an `access_token` and `refresh_token`. + +After a certain period, the `access_token` will expire and you have to make a new request with your `refresh_token` to re-authenticate yourself. + +_NOTE: The Twitch Connector will handle token expiration and re-authentication for you._ + +### Step 1 - Getting Code + +To get your code, you need to make a request to `https://id.twitch.tv/oauth2/authorize` with the following parameters: + +- client_id +- redirect_uri +- response_type +- scope + +Both the `client_id` and `redirect_uri` can be obtained when you click the `Manage` button on your app. The `response_type` that we want will be `code` and we will ask for a lot of scopes. You can check the [API Scopes](https://dev.twitch.tv/docs/v5/guides/migration#scopes) and the [IRC Scopes](https://dev.twitch.tv/docs/irc/guide#scopes-for-irc-commands) to read more about what we are asking and why. + +The Twitch Connector interacts with a wide range of services - IRC server, New API, V5 API - so we need to pass a big number of scopes to make sure everything works as expected. + +#### Recommended scopes + +``` +channel:read:subscriptions+channel_subscriptions+analytics:read:games+chat:read+chat:edit+viewing_activity_read+channel_feed_read+channel_editor+channel:read:subscriptions+user:read:broadcast+user:edit:broadcast+user:edit:follows+channel:moderate+channel_read +``` + +#### Example: OAuth URL + +You can use this example url to make your request - make sure you add your `client_id` before making the request. After adding your client id, you can open the url on a browser window, accept the request and Twitch will send you back to your `redirect_url`. Look at the address bar and you will see that it contains a `code=jfsd98hjh8d7da983j` this is what you need to add to your opsdroid config. + +``` +https://id.twitch.tv/oauth2/authorize?client_id=&redirect_uri=http://localhost&response_type=code&scope=channel:read:subscriptions+channel_subscriptions+analytics:read:games+chat:read+chat:edit+viewing_activity_read+channel_feed_read+channel_editor+channel:read:subscriptions+user:read:broadcast+user:edit:broadcast+user:edit:follows+channel:moderate+channel_read +``` + +## Usage + +The connector will subscribe to followers alerts, stream status (live/offline) and subscriber alerts, it will also connect to the chat service whenever the stream status notification is triggered and the `StreamStarted` event is triggered by opsdroid. If you wish you can set the optional config parameter `always-listening: True` to connect to the chat whenever opsdroid is started. + +### Events Available + +The Twitch Connector contains 10 events that you can use on your custom made skill. Some of these events are triggered automatically whenever an action happens on twitch - for example when a user follows your channel. Others you will have to trigger on a skill - for example, to delete a specific message. + +#### Automatic Events + +These events are triggered by opsdroid whenever something happens on twitch. + +```eval_rst +.. autoclass:: opsdroid.events.JoinRoom + :members: +``` + +```eval_rst +.. autoclass:: opsdroid.events.LeaveRoom + :members: +``` + +```eval_rst +.. autoclass:: opsdroid.connector.twitch.events.UserFollowed + :members: +``` + +```eval_rst +.. autoclass:: opsdroid.connector.twitch.events.UserSubscribed + :members: +``` + +```eval_rst +.. autoclass:: opsdroid.connector.twitch.events.UserGiftedSubscription + :members: +``` + +```eval_rst +.. autoclass:: opsdroid.connector.twitch.events.StreamStarted + :members: +``` + +```eval_rst +.. autoclass:: opsdroid.connector.twitch.events.StreamEnded + :members: +``` + +#### Manual Events + +These events will have to be triggered by you with an opsdroid skill. + +```eval_rst +.. autoclass:: opsdroid.connector.twitch.events.UpdateTitle + :members: +``` + +```eval_rst +.. autoclass:: opsdroid.connector.twitch.events.CreateClip + :members: +``` + +```eval_rst +.. autoclass:: opsdroid.events.DeleteMessage + :members: +``` + +```eval_rst +.. autoclass:: opsdroid.events.BanUser + :members: +``` + +## Examples + +You can write your custom skills to interact with the Twitch connector, here are a few examples of what you can do. You can also use the [Twitch Skill](https://github.com/FabioRosado/skill-twitch) to interact with the connector. + +### StreamStarted event + +Let's say that you want to send a message to another connector whenever you go live, you can achieve this by writing that will be triggered when the **StreamStarted** event is triggered. + +```python +from opsdroid.skill import Skill +from opsdroid.matchers import match_event +from opsdroid.connector.twitch.events import StreamStarted + + +class TwitchSkill(Skill): + """opsdroid skill for Twitch.""" + def __init__(self, opsdroid, config, *args, **kwargs): + super().__init__(opsdroid, config, *args, **kwargs) + self.rocketchat_connector = self.opsdroid.get_connector('rocketchat') + + @match_event(StreamStarted) + async def stream_started_skill(event): + """Send message to rocketchat channel.""" + await self.rocketchat_connector.send(Message(f"I'm live on twitch, come see me work on {event.title}")) + +``` + + +### UserFollowed event + +Some bots will send a thank you message to the chat whenever a user follows your channel. You can do the same with opsdroid by using the **UserFollowed** event. + +```python +from opsdroid.skill import Skill +from opsdroid.matchers import match_event +from opsdroid.connector.twitch.events import UserFollowed + + +class TwitchSkill(Skill): + """opsdroid skill for Twitch.""" + def __init__(self, opsdroid, config, *args, **kwargs): + super().__init__(opsdroid, config, *args, **kwargs) + self.connector = self.opsdroid.get_connector('twitch') + + @match_event(UserFollowed) + async def say_thank_you(event): + """Send message to rocketchat channel.""" + await self.connector.send(Message(f"Thank you so much for the follow {event.follower}, you are awesome!")) + +``` + +### BanUser event + +We have seen how to send messages to the chat, how about we remove a spam message and ban bots from trying to sell you followers, subs and viewers? + +```python +from opsdroid.skill import Skill +from opsdroid.matchers import match_regex +from opsdroid.connector.twitch.events import BanUser, DeleteMessage + + +class TwitchSkill(Skill): + """opsdroid skill for Twitch.""" + def __init__(self, opsdroid, config, *args, **kwargs): + super().__init__(opsdroid, config, *args, **kwargs) + self.connector = self.opsdroid.get_connector('twitch') + + @match_regex(r'famous\? Buy followers', case_sensitive=False) + async def goodbye_spam_bot(self, message): + await self.connector.send(BanUser(user=message.user)) + deletion = DeleteMessage(id=message.event_id) + await self.connector.send(deletion) +``` + +### UpdateTitle event + +You need to be careful on how you set this skill, you should have a list of users that are allowed to change your broadcast title otherwise it can be abused while you are streaming. + +```python +from opsdroid.skill import Skill +from opsdroid.constraints import constrain_users +from opsdroid.matchers import match_regex +from opsdroid.connector.twitch.events import UpdateTitle + + +class TwitchSkill(Skill): + """opsdroid skill for Twitch.""" + def __init__(self, opsdroid, config, *args, **kwargs): + super().__init__(opsdroid, config, *args, **kwargs) + self.connector = self.opsdroid.get_connector('twitch') + + @match_regex(r'\!title (.*)') + @constrain_users(your_awesome_twitch_username) + async def change_title(self, message): + _LOGGER.info("Attempt to change title") + await self.connector.send(UpdateTitle(status=message.regex.group(1))) +``` + +You could also add a `whitelisted` config param to your skill and then read the configuration to check if the user that tried to change the title is in that list. + +```yaml +skills: + - twitch: + whitelisted: + - your_username_on_twitch + - your_username_on_another_connector +``` + +```python +from opsdroid.skill import Skill +from opsdroid.constraints import constrain_users +from opsdroid.matchers import match_regex +from opsdroid.connector.twitch.events import UpdateTitle + + +class TwitchSkill(Skill): + """opsdroid skill for Twitch.""" + def __init__(self, opsdroid, config, *args, **kwargs): + super().__init__(opsdroid, config, *args, **kwargs) + self.connector = self.opsdroid.get_connector('twitch') + + @match_regex(r'\!title (.*)') + async def change_title(self, message): + if message.user in self.config.get('whitelisted', []): + await self.connector.send(UpdateTitle(status=message.regex.group(1))) +``` + + +## Reference + +```eval_rst +.. autoclass:: opsdroid.connector.twitch.ConnectorTwitch + :members: +``` diff --git a/opsdroid/connector/twitch/__init__.py b/opsdroid/connector/twitch/__init__.py new file mode 100644 index 000000000..80ea2fd90 --- /dev/null +++ b/opsdroid/connector/twitch/__init__.py @@ -0,0 +1,769 @@ +"""A connector for Twitch.""" +import asyncio +import os +import re +import logging +import aiohttp +import json +import secrets +import hashlib +import hmac + +from voluptuous import Required + +from opsdroid.connector import Connector, register_event +from opsdroid.events import Message, JoinRoom, DeleteMessage, LeaveRoom, BanUser +from opsdroid.const import ( + TWITCH_API_ENDPOINT, + TWITCH_OAUTH_ENDPOINT, + TWITCH_WEBHOOK_ENDPOINT, + TWITCH_IRC_MESSAGE_REGEX, + TWITCH_JSON, +) + +from . import events as twitch_event + + +CONFIG_SCHEMA = { + Required("code"): str, + Required("client-id"): str, + Required("client-secret"): str, + Required("channel"): str, + "redirect": str, + "forward-url": str, + "always-listening": bool, +} + + +_LOGGER = logging.getLogger(__name__) + + +class ConnectorTwitch(Connector): + """A connector for Twitch.""" + + def __init__(self, config, opsdroid=None): + """Set up all the needed things for the connector.""" + super().__init__(config, opsdroid=opsdroid) + _LOGGER.debug(_("Starting Twitch connector.")) + self.name = "twitch" + self.opsdroid = opsdroid + self.is_live = config.get("always-listening", False) + self.default_target = config["channel"] + self.token = None + self.code = config["code"] + self.client_id = config["client-id"] + self.client_secret = config["client-secret"] + self.redirect = config.get("redirect", "http://localhost") + self.bot_name = config.get("bot-name", "opsdroid") + self.websocket = None + self.user_id = None + self.webhook_secret = secrets.token_urlsafe(18) + # TODO: Allow usage of SSL connection + self.server = "ws://irc-ws.chat.twitch.tv" + self.port = "80" + self.base_url = config.get("base-url") + self.loop = asyncio.get_event_loop() + self.reconnections = 0 + self.auth_file = TWITCH_JSON + + async def validate_request(self, request, secret): + """Compute sha256 hash of request and secret. + + Twitch suggests that we should always validate the requests made to our webhook callback url, + that way we protect ourselves from received an event that wasn't sent by Twitch. After sending + ``hub.secret`` on our webhook subscribe, Twitch will use that secret to send the ``x-hub-signature`` + header, that is the hash that we should compare with our own computed one, if they don't match + then the request is not valid and shouldn't be parsed. + + """ + signature = request.headers.get("x-hub-signature") + + if signature: + signature = signature.replace("sha256=", "") + + payload = await request.read() + + computed_hash = hmac.new( + secret.encode(), msg=payload, digestmod=hashlib.sha256 + ).hexdigest() + + return signature == computed_hash + + async def get_user_id(self, channel, token, client_id): + """Call twitch api to get broadcaster user id. + + A lot of webhooks expect you to pass your user id in order to get the + notification when a user subscribes or folllows the broadcaster + channel. + + Since we are calling the Twitch API to get our ``self.user_id`` on connect, + we will use this method to handle when a token has expired, so if we get a + 401 status back from Twitch we will raise a ClientResponseError and send back + the status and the message Unauthorized, that way we can refresh the oauth token + on connect if the exception is raised. + + Args: + channel (string): Channel that we wish to get the broadcaster id from. + token (string): OAuth token obtained from previous authentication. + client_id (string): Client ID obtained from creating a Twitch App to iteract with opsdroid. + + Return: + string: Broadcaster/user id received from Twitch + + Raises: + ConnectionError: Raised exception if we got an unauthorized code from twitch. Our + oauth token probably expired. + + """ + async with aiohttp.ClientSession() as session: + response = await session.get( + f"{TWITCH_API_ENDPOINT}/users", + headers={"Authorization": f"Bearer {token}", "Client-ID": client_id}, + params={"login": channel}, + ) + + if response.status == 401: + raise ConnectionError("Unauthorized") + + if response.status >= 400: + _LOGGER.warning( + _("Unable to receive broadcaster id - Error: %s, %s."), + response.status, + response.text, + ) + + response = await response.json() + + return response["data"][0]["id"] + + async def send_message(self, message): + """Send message throught websocket. + + To send a message to the Twitch IRC server through websocket we need to use the + same style, we will always send the command `PRIVMSG` and the channel we want to + send the message to. The message also comes after :. + + Args: + message(string): Text message that should be sent to Twitch chat. + + """ + await self.websocket.send_str(f"PRIVMSG #{self.default_target} :{message}") + + def save_authentication_data(self, data): + """Save data obtained from requesting authentication token.""" + with open(self.auth_file, "w") as file: + json.dump(data, file) + + def get_authorization_data(self): + """Open file containing authentication data.""" + with open(self.auth_file, "r") as file: + data = json.load(file) + return data + + async def request_oauth_token(self): + """Call Twitch and requests new oauth token. + + This method assumes that the user already has the code obtained from + following the first oauth step which is making a get request to the + twitch api endpoint: ``https://id.twitch.tv/oauth2/authorize`` and passing + the needed client id, redirect uri and needed scopes to work with the bot. + + This method is the second - and final step - when trying to get the oauth token. + We use the code that the user obtained on step one - check documentation - and + make a post request to Twitch to get the ``access_token`` and ``refresh_token`` so + we can refresh the access_token when needed. Note that the refresh_token doesn't + change with each refresh. + + """ + async with aiohttp.ClientSession() as session: + + params = { + "client_id": self.client_id, + "client_secret": self.client_secret, + "grant_type": "authorization_code", + "redirect_uri": self.redirect, + "code": self.code, + } + + resp = await session.post(TWITCH_OAUTH_ENDPOINT, params=params) + data = await resp.json() + + self.token = data["access_token"] + self.save_authentication_data(data) + + async def refresh_token(self): + """Attempt to refresh the oauth token. + + Twitch oauth tokens expire after a day, so we need to do a post request to twitch + to get a new token when ours expires. The refresh token is already saved on the ``twitch.json`` + file so we can just open that file, get the appropriate token and then update the file with the + new received data. + + """ + _LOGGER.warning(_("Oauth token expired, attempting to refresh token.")) + refresh_token = self.get_authorization_data() + + async with aiohttp.ClientSession() as session: + + params = { + "client_id": self.client_id, + "client_secret": self.client_secret, + "grant_type": "refresh_token", + "redirect_uri": self.redirect, + "refresh_token": refresh_token["refresh_token"], + } + + resp = await session.post(TWITCH_OAUTH_ENDPOINT, params=params) + data = await resp.json() + + self.token = data["access_token"] + self.save_authentication_data(data) + + async def send_handshake(self): + """Send needed data to the websockets to be able to make a connection. + + If we try to connect to Twitch with an expired oauth token, we need to + request a new token. The problem is that Twitch doesn't close the websocket + and will only notify the user that the login authentication failed after + we sent the ``PASS`` , ``NICK`` and ``JOIN`` command to the websocket. + + So we need to send the initial commands to Twitch, await for a status with + ``await self.websockets.recv()`` and there will be our notification that the + authentication failed in the form of ``:tmi.twitch.tv NOTICE * :Login authentication failed`` + + This method was created to prevent us from having to copy the same commands + and send them to the websocket. If there is an authentication issue, then we + will have to send the same commands again - just with a new token. + + """ + await self.websocket.send_str(f"PASS oauth:{self.token}") + await self.websocket.send_str(f"NICK {self.bot_name}") + await self.websocket.send_str(f"JOIN #{self.default_target}") + + await self.websocket.send_str("CAP REQ :twitch.tv/commands") + await self.websocket.send_str("CAP REQ :twitch.tv/tags") + await self.websocket.send_str("CAP REQ :twitch.tv/membership") + + async def connect_websocket(self): + """Connect to the irc chat through websockets. + + Our connect method will attempt to make a connection to Twitch through the + websockets server. If the connection is made, any sort of failure received + from the websocket will be in the form of a ``NOTICE``, unless Twitch closes + the websocket connection. + + In this method we attempt to connect to the websocket and use the previously + saved oauth token to join a twitch channel. + + Once we are logged in and on a Twitch channel, we will request access to special + features from Twitch. + + The ``commands`` request is used to allow us to send special commands to the Twitch + IRC server. + + The ``tags`` request is used to receive more information with each message received + from twitch. Tags enable us to get metadata such as message ids. + + The ``membership`` request is used to get notifications when an user enters the + chat server (it doesn't mean that the user is watching the streamer) and also when + a user leaves the chat channel. + + """ + _LOGGER.info(_("Connecting to Twitch IRC Server.")) + + async with aiohttp.ClientSession() as session: + async with session.ws_connect( + f"{self.server}:{self.port}", heartbeat=600 + ) as websocket: + self.websocket = websocket + await self.send_handshake() + await self.get_messages_loop() + + async def webhook(self, topic, mode): + """Subscribe to a specific webhook. + + Twitch has different webhooks that you can subscribe to, when you subscribe to a + particular webhook, a ``post`` request needs to be made containing a ``JSON`` payload, + that tells Twitch what subscription you are attempting to do. + + When you submit the ``post`` request to ``TWITCH_WEBHOOK_ENDPOINT`` , twitch will send back + a ``get`` request to your ``callback`` url (``hub.callback`` ) with a challenge. Twitch will + then await for a response containing only the challenge in plain text. + + With this in mind, that is the reason why we open two routes (``get`` and ``post`` ) that link + to ``/connector/``. + + The ``hub.topic`` represents the webhook that we want to suscribe from twitch. + The ``hub.lease_seconds`` defines the number of seconds until the subscription expires, maximum + is 864000 seconds (10 days), but we will set up a day as our expiration since our app oauth + tokens seem to expire after a day. + + Args: + topic (string): Twitch webhook url to subscribe/unsubscribe to. + mode (string): subscribe or unsuscribe to the webhook. + + """ + _LOGGER.info(_("Attempting to connect to webhook %s."), topic) + + if topic == "follows": + topic = f"{TWITCH_API_ENDPOINT}/users/follows?to_id={self.user_id}&first=1" + + if topic == "stream changed": + topic = f"{TWITCH_API_ENDPOINT}/streams?user_id={self.user_id}" + + if topic == "subscribers": + topic = f"{TWITCH_API_ENDPOINT}/subscriptions/events?broadcaster_id={self.user_id}&first=1" + + headers = {"Client-ID": self.client_id, "Authorization": f"Bearer {self.token}"} + + async with aiohttp.ClientSession() as session: + + payload = { + "hub.callback": f"{self.base_url}/connector/{self.name}", + "hub.mode": mode, + "hub.topic": topic, + "hub.lease_seconds": 60 * 60 * 24 * 9, # Expire after 9 days + "hub.secret": self.webhook_secret, + } + + response = await session.post( + TWITCH_WEBHOOK_ENDPOINT, headers=headers, json=payload + ) + + if response.status >= 400: + _LOGGER.debug(_("Error: %s - %s"), response.status, response.text) + + async def handle_challenge(self, request): + """Challenge handler for get request made by Twitch. + + Upon subscription to a Twitch webhook, Twitch will do a get request to the + ``callback`` url provided to check if the url exists. Twitch will do a get request + with a challenge and expects the ``callback`` url to return that challenge in plain-text + back to Twitch. + + This is what we are doing here, we are getting ``hub.challenge`` from the request and return + it in plain-text, if we can't find that challenge we will return a status code 500. + + Args: + request (aiohttp.web.Request): Request made to the get route created for webhook subscription. + + Returns: + aiohttp.web.Response: if request contains ``hub.challenge`` we return it, otherwise return status 500. + + """ + challenge = request.rel_url.query.get("hub.challenge") + + if challenge: + return aiohttp.web.Response(text=challenge) + + _LOGGER.debug(_("Failed to get challenge from GET Request made by Twitch.")) + return aiohttp.web.Response(status=500) + + async def twitch_webhook_handler(self, request): + """Handle event from Twitch webhooks. + + This method will handle events when they are pushed to the webhook post route. Each webhook will + send a different kind of payload so we can handle each event and trigger the right opsdroid event + for the received payload. + + For follow events the payload will contain ``from_id`` (broadcaster id), ``from_username`` (broadcaster username) + ``to_id`` (follower id), ``to_name`` (follower name) and ``followed_at`` (timestamp). + + For stream changes a lot more things are returned but we only really care about ``type`` (if live/offline) + ``title`` (stream title). + + For subscriptions events we will want to know ``event_type`` , ``timestamp`` , ``event_data.plan_name`` , ``event_data.is_gift`` , + ``event_data.tier`` , ``event_data.username`` and ``event_data.gifter_name``. + + Args: + request (aiohttp.web.Request): Request made to the post route created for webhook subscription. + + Return: + aiohttp.web.Response: Send a ``received`` message and status 200 - Twitch will keep sending the event if it doesn't get the 200 status code. + + """ + + valid = await self.validate_request(request, self.webhook_secret) + payload = await request.json() + + if valid: + try: + [data] = payload.get("data") + _LOGGER.debug(_("Got event from Twitch - %s") % data) + + if data.get("followed_at"): + _LOGGER.debug(_("Follower event received by Twitch.")) + user_followed = twitch_event.UserFollowed( + follower=data["from_name"], + followed_at=data["followed_at"], + connector=self, + ) + await self.opsdroid.parse(user_followed) + + if data.get("started_at"): + _LOGGER.debug(_("Broadcaster went live event received by Twitch.")) + self.is_live = True + await self.listen() + + stream_started = twitch_event.StreamStarted( + title=data["title"], + viewers=data["viewer_count"], + started_at=data["started_at"], + connector=self, + ) + + await self.opsdroid.parse(stream_started) + + if data.get("event_type") == "subscriptions.notification": + _LOGGER.debug(_("Subscriber event received by Twitch.")) + user_subscription = twitch_event.UserSubscribed( + user=data["event_data"]["user_name"], + message=data["event_data"]["message"], + ) + + await self.opsdroid.parse(user_subscription) + + if data.get("event_type") == "subscriptions.subscribe": + _LOGGER.debug(_("Subscriber event received by Twitch.")) + user_subscription = twitch_event.UserSubscribed( + user=data["event_data"]["user_name"], message=None + ) + + await self.opsdroid.parse(user_subscription) + + if data.get("event_type") == "subscriptions.subscribe" and data[ + "event_data" + ].get("is_gift"): + _LOGGER.debug(_("Gifted subscriber event received by Twitch.")) + gifted_subscription = twitch_event.UserGiftedSubscription( + gifter_name=data["event_data"]["gifter_name"], + gifted_named=data["event_data"]["user_name"], + ) + + await self.opsdroid.parse(gifted_subscription) + + except ValueError: + # When the stream goes offline, Twitch will return ```data: []``` + # that will raise ValueError since it can't unpack empty list + stream_ended = twitch_event.StreamEnded(connector=self) + await self.opsdroid.parse(stream_ended) + + if not self.config.get("always-listening"): + self.is_live = False + self.disconnect_websockets() + + return aiohttp.web.Response(text=json.dumps("Received"), status=200) + return aiohttp.web.Response(text=json.dumps("Unauthorized"), status=401) + + async def connect(self): + """Connect to Twitch services. + + Within our connect method we do a quick check to see if the file ``twitch.json`` exists in + the application folder, if this file doesn't exist we assume that it's the first time the + user is running opsdroid and we do the first request for the oauth token. + + If this file exists then we just need to read from the file, get the token in the file and + attempt to connect to the websockets and subscribe to the Twitch events webhook. + + """ + if not os.path.isfile(self.auth_file): + _LOGGER.info( + _("No previous authorization data found, requesting new oauth token.") + ) + await self.request_oauth_token() + else: + _LOGGER.info( + _( + "Found previous authorization data, getting oauth token and attempting to connect." + ) + ) + self.token = self.get_authorization_data()["access_token"] + + try: + self.user_id = await self.get_user_id( + self.default_target, self.token, self.client_id + ) + except ConnectionError: + await self.refresh_token() + + self.user_id = await self.get_user_id( + self.default_target, self.token, self.client_id + ) + + # Setup routes for webhooks subscription + self.opsdroid.web_server.web_app.router.add_get( + f"/connector/{self.name}", self.handle_challenge + ) + self.opsdroid.web_server.web_app.router.add_post( + f"/connector/{self.name}", self.twitch_webhook_handler + ) + + await self.webhook("follows", "subscribe") + await self.webhook("stream changed", "subscribe") + await self.webhook("subscribers", "subscribe") + + async def listen(self): + """Listen method of the connector. + + Every connector has to implement the listen method. When an + infinite loop is running, it becomes hard to cancel this task. + So we are creating a task and set it on a variable so we can + cancel the task. + + If we need to reconnect to Twitch, Twitch will allow us to reconnect + immediatly on the first reconnect and then expects us to wait exponentially + to reconnect to the websocket. + + """ + while self.is_live: + try: + await self.connect_websocket() + except ConnectionError as e: + _LOGGER.debug(e) + await asyncio.sleep(2 ** self.reconnections) + self.reconnections += 1 + await self.connect_websocket() + + async def get_messages_loop(self): + """Listen for and parse messages. + + Since we are using aiohttp websockets support we need to manually send a + pong response every time Twitch asks for it. We also need to handle if + the connection was closed and if it was closed but we are still live, then + a ConnectionError exception is raised so we can attempt to reconnect to the + chat server again. + + """ + async for msg in self.websocket: + if msg.type == aiohttp.WSMsgType.TEXT: + if "PING" in msg.data: + await self.websocket.send_str("PONG :tmi.twitch.tv") + await self._handle_message(msg.data) + + if msg.type == aiohttp.WSMsgType.CLOSED: + await self.websocket.close() + if self.is_live: + raise ConnectionError( + "Connection to Twitch Chat Server dropped, reconnecting..." + ) + + async def _handle_message(self, message): + """Handle message from websocket connection. + + The message that we get from Twitch contains a lot of metadata, so we are using + regex named groups to get only the data that we need in order to parse a message + received. + + We also need to check if whatever we received from the websocket is indeed a text + message or an event that we need to parse. We do a few checks to decide what should + be done with the message. + + If opsdroid is running for a long time, the OAuth token will expire and the connection + to the websockets will send us back a ``:tmi.twitch.tv NOTICE * :Login authentication failed`` + so if we receive that NOTICE we will attempt to refresh the token. + + Twitch websockets send all the messages as strings, this includes PINGs, that means we will + keep getting PINGs as long as our connection is active, these messages tell us nothing important + so we made the decision to just hide them from the logs. + + Args: + message (string): Message received from websocket. + + """ + _LOGGER.debug(_("Got message from Twitch Connector chat - %s"), message) + + chat_message = re.match(TWITCH_IRC_MESSAGE_REGEX, message) + join_event = re.match(r":(?P.*)!.*JOIN", message) + left_event = re.match(r":(?P.*)!.*PART ", message) + + authentication_failed = re.match( + r":tmi.twitch.tv NOTICE \* :Login authentication failed", message + ) + + if authentication_failed: + self.refresh_token() + raise ConnectionError( + "OAuth token expire, need to reconnect to the chat service." + ) + + if chat_message: + + text_message = Message( + text=chat_message.group("message").rstrip(), + user=chat_message.group("user"), + user_id=chat_message.group("user_id"), + raw_event=message, + target=f"#{self.default_target}", + event_id=chat_message.group("message_id"), + connector=self, + ) + + await self.opsdroid.parse(text_message) + + if join_event: + joined_chat = JoinRoom( + user=join_event.group("user"), + raw_event=message, + target=f"#{self.default_target}", + connector=self, + ) + + await self.opsdroid.parse(joined_chat) + + if left_event: + left_chat = LeaveRoom( + user=left_event.group("user"), + raw_event=message, + target=f"#{self.default_target}", + connector=self, + ) + + await self.opsdroid.parse(left_chat) + + @register_event(Message) + async def _send_message(self, message): + """Send message to twitch. + + This method sends a text message to the chat service. We can't use the + default ``send`` method because we are also using different kinds of events + within this connector. + + """ + _LOGGER.debug(_("Attempting to send %s to websocket!"), message.text) + await self.send_message(message.text) + + @register_event(DeleteMessage) + async def remove_message(self, event): + """Remove message from the chat. + + This event is used when we need to remove a specific message from the chat + service. We need to pass the message id to remove a specific message. So this + method is calling the ``/delete`` method together with the message id to remove + that message. + + """ + _LOGGER.debug( + _("DeleteMessage event fired - message with the id %s removed from chat"), + event.linked_event.event_id, + ) + await self.send_message(f"/delete {event.linked_event.event_id}") + + @register_event(BanUser) + async def ban_user(self, event): + """Ban user from the channel. + + This event will be used when we need to ban a specific user from the chat channel. + Banning a user will also remove all the messages sent by that user, so we don't need + to worry about removing a lot of mensages. + + """ + _LOGGER.debug( + _("Ban event fired - user %s was banned from channel"), event.user + ) + await self.send_message(f"/ban {event.user}") + + @register_event(twitch_event.CreateClip) + async def create_clip(self): + """Create clip from broadcast. + + We send a post request to twitch to create a clip from the broadcast, Twitch will + return a response containing a clip ``id`` and ``edit_url`` . TWitch mentions that the + way to check if the clip was created successfully is by making a ``get`` request + to the ``clips`` API enpoint and query by the ``id`` obtained from the previous + request. + + """ + async with aiohttp.ClientSession() as session: + headers = { + "Client-ID": self.client_id, + "Authorization": f"Bearer {self.token}", + } + resp = await session.post( + f"{TWITCH_API_ENDPOINT}/clips?broadcaster_id={self.user_id}", + headers=headers, + ) + response = await resp.json() + + clip_data = await session.get( + f"{TWITCH_API_ENDPOINT}/clips?id={response['data'][0]['id']}", + headers=headers, + ) + + if clip_data.status == 200: + resp = await clip_data.json() + [data] = resp.get("data") + + _LOGGER.debug(_("Twitch clip created successfully.")) + + await self.send_message(data["embed_url"]) + + return + _LOGGER.debug(_("Failed to create Twitch clip %s"), response) + + @register_event(twitch_event.UpdateTitle) + async def update_stream_title(self, event): + """Update Twitch title. + + To update your channel details you need to use Twitch API V5(kraken). The so called "New Twitch API" + doesn't have an enpoint to update the channel. To update your channel details you need to do a put + request and pass your title into the url. + + Args: + event (twitch.events.UpdateTitle): opsdroid event containing ``status`` (your title). + + """ + async with aiohttp.ClientSession() as session: + headers = { + "client-id": self.client_id, + "Authorization": f"Bearer {self.token}", + "Content-Type": "application/json", + } + + param = {"title": event.status, "broadcaster_id": self.user_id} + resp = await session.patch( + f"{TWITCH_API_ENDPOINT}/channels", headers=headers, params=param, + ) + + if resp.status == 204: + _LOGGER.debug(_("Twitch channel title updated to %s"), event.status) + return + + _LOGGER.debug( + _("Failed to update Twitch channel title. Error %s - %s"), + resp.status, + resp.message, + ) + + async def disconnect_websockets(self): + """Disconnect from the websocket.""" + self.is_live = False + + close_method = getattr(self.websocket, "close", None) + + if callable(close_method): + asyncio.ensure_future(close_method(), loop=self.loop) + + self.websocket = None + + async def disconnect(self): + """Disconnect from twitch. + + Before opsdroid exists we will want to disconnect the Twitch connector, we need to + do some clean up. We first set the while loop flag to False to stop the loop and then + try to unsubscribe from all the webhooks that we subscribed to on connect - we want to + do that because when we start opsdroid and the ``connect`` method is called we will send + another subscribe request to Twitch. After we will send a ``PART`` command to leave the + channel that we joined on connect. + + Finally we try to close the websocket connection. + + """ + + if self.is_live: + await self.disconnect_websockets() + + await self.webhook("follows", "unsubscribe") + await self.webhook("stream changed", "unsubscribe") + await self.webhook("subscribers", "unsubscribe") + + return diff --git a/opsdroid/connector/twitch/events.py b/opsdroid/connector/twitch/events.py new file mode 100644 index 000000000..af831e0c3 --- /dev/null +++ b/opsdroid/connector/twitch/events.py @@ -0,0 +1,78 @@ +"""Events for Twitch Connector.""" +import logging + +from opsdroid import events + +_LOGGER = logging.getLogger(__name__) + + +class UserFollowed(events.Event): + """Event class to trigger when a user follows the streamer.""" + + def __init__(self, follower, followed_at, *args, **kwargs): + """Follower username of the follower, followed_at is the timestamp of the following.""" + super().__init__(*args, **kwargs) + self.follower = follower + self.followed_at = followed_at + + +class StreamStarted(events.Event): + """Event class that triggers when streamer started broadcasting.""" + + def __init__(self, title, viewers, started_at, *args, **kwargs): + """Event that is triggered when a streamer starts broadcasting. + + This event is triggered after 2 minutes of starting the broascast and + contains a few attributes that you can access. + + ``title`` your broadcast title + ``viewers`` total number of viewers that your channel has + ``started_at`` timestamp when you went live + + """ + super().__init__(*args, **kwargs) + self.title = title + self.viewers = viewers + self.started_at = started_at + + +class StreamEnded(events.Event): + """Event class that triggers when streamer stoppped broadcasting.""" + + +class CreateClip(events.Event): + """Event class that creates a clip once triggered.""" + + def __init__(self, id, *args, **kwargs): + """Id is your streamer id.""" + super().__init__(*args, **kwargs) + self.id = id + + +class UpdateTitle(events.Event): + """Event class that updates channel title.""" + + def __init__(self, status, *args, **kwargs): + """Status is the new title for your channel.""" + super().__init__(*args, **kwargs) + self.status = status + + +class UserSubscribed(events.Event): + """Event class that triggers whenever a user subscribes to the channel.""" + + def __init__(self, user, message, *args, **kwargs): + """User is the username of the subscriber, message is the sub message, can be None.""" + super().__init__(*args, **kwargs) + self.user = user + self.message = message + + +class UserGiftedSubscription(events.Event): + """Event class that triggers when a user gifts a subscription to someone.""" + + def __init__(self, gifter_name, gifted_named, *args, **kwargs): + """Gifter_name is the sub that gifted a sub, gifted name is the gifted viewer.""" + super().__init__(*args, **kwargs) + self.gifter_name = gifter_name + self.gifted_name = gifted_named diff --git a/opsdroid/connector/twitch/tests/test_connector_twitch.py b/opsdroid/connector/twitch/tests/test_connector_twitch.py new file mode 100644 index 000000000..a362324fd --- /dev/null +++ b/opsdroid/connector/twitch/tests/test_connector_twitch.py @@ -0,0 +1,881 @@ +import os +import logging +import contextlib +import asyncio +import pytest +import asynctest.mock as amock + +from aiohttp import web, WSMessage, WSMsgType +from aiohttp.test_utils import make_mocked_request + +from opsdroid.connector.twitch import ConnectorTwitch +import opsdroid.events as opsdroid_events +import opsdroid.connector.twitch.events as twitch_event + + +AUTH_FILE = os.path.join(os.path.dirname(os.path.abspath(__file__)), "twitch.json") + +connector_config = { + "code": "yourcode", + "channel": "test", + "client-id": "client-id", + "client-secret": "client-secret", +} + + +def test_init(opsdroid): + connector = ConnectorTwitch(connector_config, opsdroid=opsdroid) + assert connector.default_target == "test" + assert connector.name == "twitch" + assert connector.token is None + assert connector.websocket is None + assert connector.user_id is None + assert connector.reconnections == 0 + + +@pytest.mark.asyncio +async def test_validate_request(opsdroid): + connector = ConnectorTwitch(connector_config, opsdroid=opsdroid) + request = amock.CoroutineMock() + request.headers = { + "x-hub-signature": "sha256=fcfa24b327e3467f1586cc1ace043c016cabfe9c15dabc0020aca45440338be9" + } + + request.read = amock.CoroutineMock() + request.read.return_value = b'{"test": "test"}' + + validation = await connector.validate_request(request, "test") + + assert validation + + +@pytest.mark.asyncio +async def test_get_user_id(opsdroid): + connector = ConnectorTwitch(connector_config, opsdroid=opsdroid) + get_response = amock.Mock() + get_response.status = 200 + get_response.json = amock.CoroutineMock() + get_response.json.return_value = {"data": [{"id": "test-bot"}]} + + with amock.patch( + "aiohttp.ClientSession.get", new=amock.CoroutineMock() + ) as patched_request: + patched_request.return_value = asyncio.Future() + patched_request.return_value.set_result(get_response) + + response = await connector.get_user_id("theflyingdev", "token", "client-id") + + assert response == "test-bot" + + +@pytest.mark.asyncio +async def test_get_user_id_failure(opsdroid, caplog): + connector = ConnectorTwitch(connector_config, opsdroid=opsdroid) + get_response = amock.Mock() + get_response.status = 404 + get_response.json = amock.CoroutineMock() + + with amock.patch( + "aiohttp.ClientSession.get", new=amock.CoroutineMock() + ) as patched_request: + patched_request.return_value = asyncio.Future() + patched_request.return_value.set_result(get_response) + + await connector.get_user_id("theflyingdev", "token", "client-id") + + assert "Unable to receive broadcaster id - Error" in caplog.text + + +@pytest.mark.asyncio +async def test_get_user_id_unauthorized(opsdroid): + connector = ConnectorTwitch(connector_config, opsdroid=opsdroid) + get_response = amock.Mock() + get_response.status = 401 + + with amock.patch( + "aiohttp.ClientSession.get", new=amock.CoroutineMock() + ) as patched_request: + patched_request.return_value = asyncio.Future() + patched_request.return_value.set_result(get_response) + + with pytest.raises(ConnectionError) as exception: + await connector.get_user_id("theflyingdev", "token", "client-id") + assert "Unauthorized" in exception.message + + +def test_save_authentication_data(opsdroid, tmpdir): + connector = ConnectorTwitch(connector_config, opsdroid=opsdroid) + connector.auth_file = AUTH_FILE + + connector.save_authentication_data( + {"access_token": "token123", "refresh_token": "refresh_token123"} + ) + + details = connector.get_authorization_data() + + assert details == {"access_token": "token123", "refresh_token": "refresh_token123"} + + +@pytest.mark.asyncio +async def test_request_oauth_token(opsdroid, tmpdir): + connector = ConnectorTwitch(connector_config, opsdroid=opsdroid) + connector.auth_file = AUTH_FILE + + post_response = amock.Mock() + post_response.status = 200 + post_response.json = amock.CoroutineMock() + post_response.json.return_value = { + "access_token": "token", + "refresh_token": "refresh_token", + } + + with amock.patch( + "aiohttp.ClientSession.post", new=amock.CoroutineMock() + ) as patched_request: + patched_request.return_value = asyncio.Future() + patched_request.return_value.set_result(post_response) + + connector.save_authentication_data = amock.CoroutineMock() + + response = await connector.request_oauth_token() + + print(response) + + assert connector.token is not None + assert connector.token == "token" + assert connector.save_authentication_data.called + + +@pytest.mark.asyncio +async def test_refresh_oauth_token(opsdroid): + connector = ConnectorTwitch(connector_config, opsdroid=opsdroid) + connector.auth_file = AUTH_FILE + + post_response = amock.Mock() + post_response.status = 200 + post_response.json = amock.CoroutineMock() + post_response.json.return_value = { + "access_token": "token", + "refresh_token": "refresh_token", + } + + with amock.patch( + "aiohttp.ClientSession.post", new=amock.CoroutineMock() + ) as patched_request: + patched_request.return_value = asyncio.Future() + patched_request.return_value.set_result(post_response) + + connector.save_authentication_data = amock.CoroutineMock() + + response = await connector.refresh_token() + + print(response) + + assert connector.token is not None + assert connector.token == "token" + assert connector.save_authentication_data.called + + +@pytest.mark.asyncio +async def test_connect(opsdroid, caplog, tmpdir): + print(tmpdir) + caplog.set_level(logging.INFO) + + get_response = amock.Mock() + get_response.status = 200 + get_response.json = amock.CoroutineMock() + get_response.json.return_value = {"data": [{"id": "test-bot"}]} + + connector = ConnectorTwitch(connector_config, opsdroid=opsdroid) + connector.auth_file = AUTH_FILE + connector.webhook = amock.CoroutineMock() + + opsdroid.web_server = amock.Mock() + + with amock.patch( + "aiohttp.ClientSession.get", new=amock.CoroutineMock() + ) as patched_request: + patched_request.return_value = asyncio.Future() + patched_request.return_value.set_result(get_response) + + await connector.connect() + + assert connector.webhook.called + assert opsdroid.web_server.web_app.router.add_get.called + assert "Found previous authorization data" in caplog.text + + +@pytest.mark.asyncio +async def test_connect_no_auth_data(opsdroid, caplog, tmpdir): + print(tmpdir) + caplog.set_level(logging.INFO) + get_response = amock.Mock() + get_response.status = 200 + get_response.json = amock.CoroutineMock() + get_response.json.return_value = {"data": [{"id": "test-bot"}]} + + connector = ConnectorTwitch(connector_config, opsdroid=opsdroid) + connector.auth_file = AUTH_FILE + connector.webhook = amock.CoroutineMock() + connector.request_oauth_token = amock.CoroutineMock() + + opsdroid.web_server = amock.Mock() + + with amock.patch( + "aiohttp.ClientSession.get", new=amock.CoroutineMock() + ) as patched_request, amock.patch("os.path") as mocked_file: + patched_request.return_value = asyncio.Future() + patched_request.return_value.set_result(get_response) + + mocked_file.isfile = amock.Mock(return_value=False) + + await connector.connect() + + assert "No previous authorization data found" in caplog.text + assert connector.request_oauth_token.called + assert opsdroid.web_server.web_app.router.add_get.called + assert connector.webhook.called + + +@pytest.mark.asyncio +async def test_connect_refresh_token(opsdroid): + connector = ConnectorTwitch(connector_config, opsdroid=opsdroid) + connector.auth_file = AUTH_FILE + connector.webhook = amock.CoroutineMock() + connector.get_user_id = amock.CoroutineMock(side_effect=ConnectionError) + connector.refresh_token = amock.CoroutineMock() + + opsdroid.web_server = amock.Mock() + + with pytest.raises(ConnectionError): + await connector.connect() + + assert connector.webhook.called + assert opsdroid.web_server.web_app.router.add_get.called + assert connector.refresh_token.called + + +@pytest.mark.asyncio +async def test_send_message(opsdroid): + connector = ConnectorTwitch(connector_config, opsdroid=opsdroid) + + connector.websocket = amock.Mock() + connector.websocket.send_str = amock.CoroutineMock() + + await connector.send_message("Hello") + + assert connector.websocket.send_str.called + + +@pytest.mark.asyncio +async def test_send_handshake(opsdroid): + connector = ConnectorTwitch(connector_config, opsdroid=opsdroid) + + connector.websocket = amock.Mock() + connector.websocket.send_str = amock.CoroutineMock() + + await connector.send_handshake() + + assert connector.websocket.send_str.called + + +@pytest.mark.asyncio +async def test_connect_websocket(opsdroid, caplog): + connector = ConnectorTwitch(connector_config, opsdroid=opsdroid) + caplog.set_level(logging.INFO) + + with amock.patch("aiohttp.ClientSession") as mocked_session: + + mocked_session.ws_connect = amock.CoroutineMock() + + connector.send_handshake = amock.CoroutineMock() + connector.get_messages_loop = amock.CoroutineMock() + + await connector.connect_websocket() + + assert "Connecting to Twitch IRC Server." in caplog.text + assert connector.websocket + assert connector.send_handshake.called + assert connector.get_messages_loop.called + + +@pytest.mark.asyncio +async def test_webhook_follows(opsdroid): + connector = ConnectorTwitch(connector_config, opsdroid=opsdroid) + + post_response = amock.Mock() + post_response.status = 200 + post_response.json = amock.CoroutineMock() + + with amock.patch( + "aiohttp.ClientSession.post", new=amock.CoroutineMock() + ) as mocked_session: + + mocked_session.return_value = asyncio.Future() + mocked_session.return_value.set_result(post_response) + + await connector.webhook("follows", "subscribe") + + assert mocked_session.called + + +@pytest.mark.asyncio +async def test_webhook_stream_changed(opsdroid): + connector = ConnectorTwitch(connector_config, opsdroid=opsdroid) + + post_response = amock.Mock() + post_response.status = 200 + post_response.json = amock.CoroutineMock() + + with amock.patch( + "aiohttp.ClientSession.post", new=amock.CoroutineMock() + ) as mocked_session: + + mocked_session.return_value = asyncio.Future() + mocked_session.return_value.set_result(post_response) + + await connector.webhook("stream changed", "subscribe") + + assert mocked_session.called + + +@pytest.mark.asyncio +async def test_webhook_subscribers(opsdroid): + connector = ConnectorTwitch(connector_config, opsdroid=opsdroid) + + post_response = amock.Mock() + post_response.status = 200 + post_response.json = amock.CoroutineMock() + + with amock.patch( + "aiohttp.ClientSession.post", new=amock.CoroutineMock() + ) as mocked_session: + + mocked_session.return_value = asyncio.Future() + mocked_session.return_value.set_result(post_response) + + await connector.webhook("subscribers", "subscribe") + + assert mocked_session.called + + +@pytest.mark.asyncio +async def test_webhook_failure(opsdroid, caplog): + connector = ConnectorTwitch(connector_config, opsdroid=opsdroid) + + caplog.set_level(logging.DEBUG) + + post_response = amock.Mock() + post_response.status = 500 + post_response.json = amock.CoroutineMock() + + with amock.patch( + "aiohttp.ClientSession.post", new=amock.CoroutineMock() + ) as mocked_session: + + mocked_session.return_value = asyncio.Future() + mocked_session.return_value.set_result(post_response) + + await connector.webhook("subscribers", "subscribe") + + assert "Error:" in caplog.text + + +@pytest.mark.asyncio +async def test_ban_user(opsdroid, caplog): + caplog.set_level(logging.DEBUG) + connector = ConnectorTwitch(connector_config, opsdroid=opsdroid) + ban_event = opsdroid_events.BanUser(user="bot_mc_spam_bot") + + connector.send_message = amock.CoroutineMock() + + await connector.ban_user(ban_event) + + assert connector.send_message.called + assert "bot_mc_spam_bot" in caplog.text + + +@pytest.mark.asyncio +async def test_create_clip(opsdroid, caplog): + caplog.set_level(logging.DEBUG) + connector = ConnectorTwitch(connector_config, opsdroid=opsdroid) + connector.send_message = amock.CoroutineMock() + + post_response = amock.Mock() + post_response.status = 200 + post_response.json = amock.CoroutineMock() + post_response.json.return_value = {"data": [{"id": "clip123"}]} + + get_response = amock.Mock() + get_response.status = 200 + get_response.json = amock.CoroutineMock() + get_response.json.return_value = { + "data": [{"id": "clip123", "embed_url": "localhost"}] + } + + with amock.patch( + "aiohttp.ClientSession.post", new=amock.CoroutineMock() + ) as mocked_post, amock.patch( + "aiohttp.ClientSession.get", new=amock.CoroutineMock() + ) as mocked_get: + + mocked_post.return_value = asyncio.Future() + mocked_post.return_value.set_result(post_response) + + mocked_get.return_value = asyncio.Future() + mocked_get.return_value.set_result(get_response) + + clip_event = twitch_event.CreateClip(id="broadcaster123") + + await connector.create_clip() + + assert "Twitch clip created successfully." in caplog.text + assert "broadcaster123" in clip_event.id + + +@pytest.mark.asyncio +async def test_create_clip_failure(opsdroid, caplog): + caplog.set_level(logging.DEBUG) + connector = ConnectorTwitch(connector_config, opsdroid=opsdroid) + connector.send_message = amock.CoroutineMock() + + post_response = amock.Mock() + post_response.status = 200 + post_response.json = amock.CoroutineMock() + post_response.json.return_value = {"data": [{"id": "clip123"}]} + + get_response = amock.Mock() + get_response.status = 404 + + with amock.patch( + "aiohttp.ClientSession.post", new=amock.CoroutineMock() + ) as mocked_post, amock.patch( + "aiohttp.ClientSession.get", new=amock.CoroutineMock() + ) as mocked_get: + + mocked_post.return_value = asyncio.Future() + mocked_post.return_value.set_result(post_response) + + mocked_get.return_value = asyncio.Future() + mocked_get.return_value.set_result(get_response) + + await connector.create_clip() + + assert "Failed to create Twitch clip" in caplog.text + + +@pytest.mark.asyncio +async def test_remove_message(opsdroid, caplog): + caplog.set_level(logging.DEBUG) + connector = ConnectorTwitch(connector_config, opsdroid=opsdroid) + + message_event = opsdroid_events.Message( + user="spammerMcSpammy", + text="spammy message", + user_id="123", + event_id="messageid123", + ) + + remove_event = opsdroid_events.DeleteMessage(linked_event=message_event) + + connector.send_message = amock.CoroutineMock() + + await connector.remove_message(remove_event) + + assert connector.send_message.called + assert "messageid123" in caplog.text + + +@pytest.mark.asyncio +async def test_send_message_event(opsdroid, caplog): + caplog.set_level(logging.DEBUG) + connector = ConnectorTwitch(connector_config, opsdroid=opsdroid) + message_event = opsdroid_events.Message(text="Hello world!") + + connector.send_message = amock.CoroutineMock() + + await connector._send_message(message_event) + + assert connector.send_message.called + assert "Hello world!" in caplog.text + + +@pytest.mark.asyncio +async def test_update_stream_title(opsdroid, caplog): + connector = ConnectorTwitch(connector_config, opsdroid=opsdroid) + + caplog.set_level(logging.DEBUG) + + post_response = amock.Mock() + post_response.status = 204 + post_response.json = amock.CoroutineMock() + + with amock.patch( + "aiohttp.ClientSession.patch", new=amock.CoroutineMock() + ) as mocked_session: + + mocked_session.return_value = asyncio.Future() + mocked_session.return_value.set_result(post_response) + + status_event = twitch_event.UpdateTitle(status="Test title!") + + await connector.update_stream_title(status_event) + + assert "Test title!" in caplog.text + + +@pytest.mark.asyncio +async def test_update_stream_title_failure(opsdroid, caplog): + connector = ConnectorTwitch(connector_config, opsdroid=opsdroid) + + caplog.set_level(logging.DEBUG) + + post_response = amock.Mock() + post_response.status = 500 + post_response.message = "Internal Server Error" + + with amock.patch( + "aiohttp.ClientSession.patch", new=amock.CoroutineMock() + ) as mocked_session: + + mocked_session.return_value = asyncio.Future() + mocked_session.return_value.set_result(post_response) + + status_event = twitch_event.UpdateTitle(status="Test title!") + + await connector.update_stream_title(status_event) + + assert "Failed to update Twitch channel title" in caplog.text + + +@pytest.mark.asyncio +async def test_handle_challenge(opsdroid, caplog): + caplog.set_level(logging.DEBUG) + + connector = ConnectorTwitch(connector_config, opsdroid=opsdroid) + + req = make_mocked_request("GET", "/connector/twitch?hub.challenge=testchallenge123") + + resp = await connector.handle_challenge(req) + + assert "Failed to get challenge" not in caplog.text + assert "testchallenge123" in resp.text + + +@pytest.mark.asyncio +async def test_handle_challenge_error(opsdroid, caplog): + caplog.set_level(logging.DEBUG) + + connector = ConnectorTwitch(connector_config, opsdroid=opsdroid) + + req = make_mocked_request("GET", "/connector/twitch") + + resp = await connector.handle_challenge(req) + + assert "Failed to get challenge" in caplog.text + assert resp.status == 500 + + +@pytest.mark.asyncio +async def test_invalid_post_request_webhook(opsdroid): + connector = ConnectorTwitch(connector_config, opsdroid=opsdroid) + connector.validate_request = amock.CoroutineMock(return_value=False) + + mock_request = amock.CoroutineMock() + mock_request.json = amock.CoroutineMock() + mock_request.json.return_value = {"data": [{"test": "test"}]} + + resp = await connector.twitch_webhook_handler(mock_request) + + assert "Unauthorized" in resp.text + assert resp.status == 401 + + +@pytest.mark.asyncio +async def test_stream_ended_event(opsdroid): + connector = ConnectorTwitch(connector_config, opsdroid=opsdroid) + connector.validate_request = amock.CoroutineMock(return_value=True) + connector.disconnect_websockets = amock.CoroutineMock() + + mock_request = amock.CoroutineMock() + mock_request.json = amock.CoroutineMock() + mock_request.json.return_value = {"data": []} + + twitch_event.StreamEnded = amock.Mock() + + resp = await connector.twitch_webhook_handler(mock_request) + + assert not connector.is_live + assert resp.status == 200 + assert twitch_event.StreamEnded.called + + +@pytest.mark.asyncio +async def test_followed_event(opsdroid, caplog): + caplog.set_level(logging.DEBUG) + connector = ConnectorTwitch(connector_config, opsdroid=opsdroid) + connector.validate_request = amock.CoroutineMock(return_value=True) + + mock_request = amock.CoroutineMock() + mock_request.json = amock.CoroutineMock() + mock_request.json.return_value = { + "data": [{"from_name": "awesome_follower", "followed_at": "today"}] + } + follow_event = twitch_event.UserFollowed("awesome_follower", "today") + twitch_event.UserFollowed = amock.Mock() + + resp = await connector.twitch_webhook_handler(mock_request) + + assert "Follower event received by Twitch." in caplog.text + assert resp.status == 200 + assert twitch_event.UserFollowed.called + assert "awesome_follower" in follow_event.follower + assert "today" in follow_event.followed_at + + +def test_user_subscribed(): + event = twitch_event.UserSubscribed("user_mc_user", "Hello!") + + assert "user_mc_user" in event.user + assert "Hello!" in event.message + + +@pytest.mark.asyncio +async def test_gift_subscription_event(opsdroid, caplog): + caplog.set_level(logging.DEBUG) + connector = ConnectorTwitch(connector_config, opsdroid=opsdroid) + connector.validate_request = amock.CoroutineMock(return_value=True) + + mock_request = amock.CoroutineMock() + mock_request.json = amock.CoroutineMock() + mock_request.json.return_value = { + "data": [ + { + "event_type": "subscriptions.subscribe", + "event_data": { + "user_name": "lucky_mc_luck", + "gifter_name": "Awesome gifter", + "is_gift": True, + }, + } + ] + } + + twitch_event.UserSubscribed = amock.Mock() + + resp = await connector.twitch_webhook_handler(mock_request) + + assert "Gifted subscriber event received by Twitch." in caplog.text + assert resp.status == 200 + assert twitch_event.UserSubscribed.called + + +@pytest.mark.asyncio +async def test_subscription_event(opsdroid, caplog): + caplog.set_level(logging.DEBUG) + connector = ConnectorTwitch(connector_config, opsdroid=opsdroid) + connector.validate_request = amock.CoroutineMock(return_value=True) + + mock_request = amock.CoroutineMock() + mock_request.json = amock.CoroutineMock() + mock_request.json.return_value = { + "data": [ + { + "event_type": "subscriptions.notification", + "event_data": { + "user_name": "awesome_subscriber!", + "message": "Best channel ever!", + }, + } + ] + } + + twitch_event.UserSubscribed = amock.Mock() + + resp = await connector.twitch_webhook_handler(mock_request) + + assert "Subscriber event received by Twitch." in caplog.text + assert resp.status == 200 + assert twitch_event.UserSubscribed.called + + +@pytest.mark.asyncio +async def test_stream_started_event(opsdroid, caplog): + caplog.set_level(logging.DEBUG) + connector = ConnectorTwitch(connector_config, opsdroid=opsdroid) + connector.validate_request = amock.CoroutineMock(return_value=True) + connector.listen = amock.CoroutineMock() + + mock_request = amock.CoroutineMock() + mock_request.json = amock.CoroutineMock() + mock_request.json.return_value = { + "data": [ + { + "started_at": "just now", + "title": "Testing with pytest!", + "viewer_count": 1, + } + ] + } + + stream_start_event = twitch_event.StreamStarted( + "Testing with pytest", 1, "just now" + ) + + twitch_event.StreamStarted = amock.Mock() + + resp = await connector.twitch_webhook_handler(mock_request) + + assert "Broadcaster went live event received by Twitch." in caplog.text + assert connector.is_live + assert resp.status == 200 + assert twitch_event.StreamStarted.called + assert connector.listen.called + assert "Testing with pytest" in stream_start_event.title + assert 1 == stream_start_event.viewers + assert "just now" in stream_start_event.started_at + + +@pytest.mark.asyncio +async def test_disconnect(opsdroid): + connector_config["always-listening"] = True + connector = ConnectorTwitch(connector_config, opsdroid=opsdroid) + + connector.disconnect_websockets = amock.CoroutineMock() + connector.webhook = amock.CoroutineMock() + + await connector.disconnect() + + assert connector.is_live is True + assert connector.disconnect_websockets.called + assert connector.webhook.called + + +@pytest.mark.asyncio +async def test_get_message_loop(opsdroid): + + connector = ConnectorTwitch(connector_config, opsdroid=opsdroid) + + connector.websocket = amock.MagicMock() + connector.websocket.__aiter__.return_value = [ + WSMessage(WSMsgType.TEXT, "PING", b""), + WSMessage(WSMsgType.TEXT, ":user@user.twitch.tmi! JOIN #channel", b""), + WSMessage(WSMsgType.CLOSED, "CLOSE", ""), + ] + + connector.websocket.send_str = amock.CoroutineMock() + connector.websocket.close = amock.CoroutineMock() + connector._handle_message = amock.CoroutineMock() + + with pytest.raises(ConnectionError): + await connector.get_messages_loop() + + assert connector.is_live + assert connector.send_str.called + assert connector.websocket.close.called + assert connector._handle_message.called + + +@pytest.mark.asyncio +async def test_handle_message_chat_message(opsdroid, caplog): + caplog.set_level(logging.DEBUG) + message = "@badge-info=;badges=;client-nonce=jiwej12;color=;display-name=user;emotes=;flags=0-81:;id=jdias-9212;mod=0;room-id=123;subscriber=0;tmi-sent-ts=1592943868712;turbo=0;user-id=123;user-type= :user!user@user.tmi.twitch.tv PRIVMSG #channel :Hello world!" + + connector = ConnectorTwitch(connector_config, opsdroid=opsdroid) + + opsdroid.parse = amock.CoroutineMock() + + await connector._handle_message(message) + + assert "Got message from Twitch" in caplog.text + assert opsdroid.parse.called + + +@pytest.mark.asyncio +async def test_handle_message_join_event(opsdroid): + message = ":user!user@user.tmi.twitch.tv JOIN #channel" + + join_event = opsdroid_events.JoinRoom(user="username") + + connector = ConnectorTwitch(connector_config, opsdroid=opsdroid) + + opsdroid.parse = amock.CoroutineMock() + + await connector._handle_message(message) + + assert opsdroid.parse.called + assert "username" in join_event.user + + +@pytest.mark.asyncio +async def test_handle_message_left_event(opsdroid): + message = ":user!user@user.tmi.twitch.tv PART #channel" + + left_event = opsdroid_events.LeaveRoom(user="username") + + connector = ConnectorTwitch(connector_config, opsdroid=opsdroid) + + opsdroid.parse = amock.CoroutineMock() + + await connector._handle_message(message) + assert opsdroid.parse.called + assert "username" in left_event.user + + +@pytest.mark.asyncio +async def test_handle_message_authentication_failed(opsdroid): + message = ":tmi.twitch.tv NOTICE * :Login authentication failed" + + connector = ConnectorTwitch(connector_config, opsdroid=opsdroid) + + connector.refresh_token = amock.CoroutineMock() + + with pytest.raises(ConnectionError): + await connector._handle_message(message) + + assert connector.refresh_token.called + + +@pytest.mark.asyncio +async def test_disconnect_websockets(opsdroid): + connector = ConnectorTwitch(connector_config, opsdroid=opsdroid) + + connector.websocket = web.WebSocketResponse() + + resp = await connector.disconnect_websockets() + + assert not connector.websocket + assert not connector.is_live + assert not resp + + +@pytest.mark.asyncio +async def test_listen(opsdroid): + connector = ConnectorTwitch(connector_config, opsdroid=opsdroid) + + with amock.patch( + "aiohttp.ClientSession.ws_connect", new=amock.CoroutineMock() + ) as mocked_websocket: + mocked_websocket.side_effect = Exception() + + with contextlib.suppress(Exception): + await connector.listen() + + +@pytest.mark.asyncio +async def test_listen_reconnect(opsdroid, caplog): + caplog.set_level(logging.DEBUG) + connector = ConnectorTwitch(connector_config, opsdroid=opsdroid) + connector.connect_websocket = amock.CoroutineMock(side_effect=ConnectionError) + + with amock.patch( + "aiohttp.ClientSession.ws_connect", new=amock.CoroutineMock() + ) as mocked_websocket, amock.patch("asyncio.sleep") as mocked_sleep: + mocked_websocket.side_effect = Exception() + + with contextlib.suppress((Exception)): + await connector.listen() + + assert mocked_sleep.called + assert None in caplog.text + assert connector.reconnections == 1 diff --git a/opsdroid/connector/twitch/tests/twitch.json b/opsdroid/connector/twitch/tests/twitch.json new file mode 100644 index 000000000..fd439b741 --- /dev/null +++ b/opsdroid/connector/twitch/tests/twitch.json @@ -0,0 +1 @@ +{"access_token": "token123", "refresh_token": "refresh_token123"} \ No newline at end of file diff --git a/opsdroid/const.py b/opsdroid/const.py index 0b9162134..8a7e71738 100644 --- a/opsdroid/const.py +++ b/opsdroid/const.py @@ -44,3 +44,9 @@ WATSON_API_ENDPOINT = "https://{gateway}.watsonplatform.net/assistant/api" WATSON_API_VERSION = "2019-02-28" ENV_VAR_REGEX = r"^\"?\${?(?=\_?[A-Z])([A-Z-_]+)}?\"?$" + +TWITCH_OAUTH_ENDPOINT = "https://id.twitch.tv/oauth2/token" +TWITCH_WEBHOOK_ENDPOINT = "https://api.twitch.tv/helix/webhooks/hub" +TWITCH_API_ENDPOINT = "https://api.twitch.tv/helix" +TWITCH_IRC_MESSAGE_REGEX = r"@.*;id=(?P.*);m.*user-id=(?P.*);user-type=.*:(?P.*?)!.*PRIVMSG.*:(?P.*)" +TWITCH_JSON = os.path.join(DEFAULT_ROOT_PATH, "twitch.json") diff --git a/opsdroid/events.py b/opsdroid/events.py index 3834e35ac..e62babb81 100644 --- a/opsdroid/events.py +++ b/opsdroid/events.py @@ -452,6 +452,10 @@ class JoinRoom(Event): """Event class to represent a user joining a room.""" +class LeaveRoom(Event): + """Event class to represent a user leaving a room.""" + + class UserInvite(Event): """Event class to represent a user being invited to a room.""" @@ -478,3 +482,11 @@ class PinMessage(Event): class UnpinMessage(Event): """Event to represent unpinning a message or other event.""" + + +class DeleteMessage(Event): + """Event to represent deleting a message or other event.""" + + +class BanUser(Event): + """Event to represent the banning of a user from a room.""" diff --git a/requirements_test.txt b/requirements_test.txt index cf0b84639..8797e2951 100644 --- a/requirements_test.txt +++ b/requirements_test.txt @@ -5,6 +5,7 @@ dialogflow==1.0.0 astroid==2.4.1 pytest==6.0.1 pytest-asyncio==0.12.0 +pytest-aiohttp==0.3.0 pytest-cov==2.7.1 pytest-timeout==1.4.0 pytest-mock==3.2.0