Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions bot/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -466,6 +466,7 @@ class Roles(metaclass=YAMLGetter):
unverified: int
verified: int # This is the Developers role on PyDis, here named verified for readability reasons.
voice_verified: int
video: int


class Guild(metaclass=YAMLGetter):
Expand Down Expand Up @@ -701,3 +702,30 @@ class Event(Enum):
"Noooooo!!",
"I can't believe you've done this",
]

# TIME_FORMATS defines aliases and multipliers for time formats
# key is a standard time unit name like second ,year, decade etc.
# mul is a multiplier where duration of said time unit * multiplier = time in seconds
# eg. 1 day = 1 * multiplier seconds, so mul = 86400
TIME_FORMATS = {
"second": {
"aliases": ("s", "sec", "seconds", "secs"),
"mul": 1
},
"minute": {
"aliases": ("m", "min", "mins", "minutes"),
"mul": 60
},
"hour": {
"aliases": ("h", "hr", "hrs", "hours"),
"mul": 3600
},
"day": {
"aliases": ("d", "days"),
"mul": 86400
},
"year": {
"aliases": ("yr", "yrs", "years", "y"),
"mul": 31536000
}
}
138 changes: 138 additions & 0 deletions bot/exts/moderation/stream.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
from discord.ext import commands, tasks
import discord

from bot.constants import Roles, STAFF_ROLES, Guild, TIME_FORMATS
from bot.bot import Bot
import time
from async_rediscache import RedisCache

# Constant error messages
NO_USER_SPECIFIED = "Please specify a user"
TIME_FORMAT_NOT_VALID = "Please specify a valid time format ex. 10h or 1day"
TIME_LESS_EQ_0 = "Duration can not be a 0 or lower"
USER_ALREADY_ALLOWED_TO_STREAM = "This user can already stream"
USER_ALREADY_NOT_ALLOWED_TO_STREAM = "This user already can't stream"


# FORMATS holds a combined list of all allowed time units
# made from TIME_FORMATS constant
FORMATS = []
for key, entry in TIME_FORMATS.items():
FORMATS.extend(entry["aliases"])
FORMATS.append(key)


class Stream(commands.Cog):
"""Stream class handles giving screen sharing permission with commands"""

# Data cache storing userid to unix_time relation
# user id is used to get member who's streaming permission need to be revoked after some time
# unix_time is a time when user's streaming permission needs tp be revoked in unix time notation
user_cache = RedisCache()

def __init__(self, bot: Bot):
self.bot = bot
self.remove_permissions.start()
self.guild_static = None

@staticmethod
def _link_from_alias(time_format) -> (dict, str):
"""Get TIME_FORMATS key and entry by time format or any of its aliases"""
for format_key, val in TIME_FORMATS.items():
if format_key == time_format or time_format in val["aliases"]:
return TIME_FORMATS[format_key], format_key

def _parse_time_to_seconds(self, duration, time_format) -> int:
"""Get time in seconds from duration and time format"""
return duration * self._link_from_alias(time_format)[0]["mul"]

@commands.command(aliases=("streaming", "share"))
@commands.has_any_role(*STAFF_ROLES)
async def stream(
self,
ctx: commands.Context,
user: discord.Member = None,
duration: int = 1,
time_format: str = "h",
*_
):
"""
stream handles <prefix>stream command
argument user - required user mention, any errors should be handled by upper level handler
duration - int must be higher than 0 - defaults to 1
time_format - str defining what time unit you want to use, must be any of FORMATS - defaults to h

Command give user permission to stream and takes it away after provided duration
"""
# Check for required user argument
# if not provided send NO_USER_SPECIFIED message
if not user:
await ctx.send(NO_USER_SPECIFIED)
return

# Time can't be negative lol
if duration <= 0:
await ctx.send(TIME_LESS_EQ_0)
return

# Check if time_format argument is a valid time format
# eg. d, day etc are aliases for day time format
if time_format not in FORMATS:
await ctx.send(TIME_FORMAT_NOT_VALID)
return

# Check if user already has streaming permission
already_allowed = any(Roles.video == role.id for role in user.roles)
if already_allowed:
await ctx.send(USER_ALREADY_ALLOWED_TO_STREAM)
return

# Set user id - time in redis cache and add streaming permission role
await self.user_cache.set(user.id, time.time() + self._parse_time_to_seconds(duration, time_format))
await user.add_roles(discord.Object(Roles.video), reason="Temporary streaming access granted")
await ctx.send(f"{user.mention} can now stream for {duration} {self._link_from_alias(time_format)[1]}/s")

@tasks.loop(seconds=30)
async def remove_permissions(self):
"""
background loop for removing streaming permission
"""
all_entries = await self.user_cache.items()
for user_id, delete_time in all_entries:
if time.time() > delete_time:
member = self.guild_static.fetch_memebr(user_id)
if member:
await member.remove_roles(discord.Object(Roles.video), reason="Temporary streaming access revoked")
await self.user_cache.pop(user_id)

@remove_permissions.before_loop
async def await_ready(self):
"""Wait for bot to be ready before starting remove_permissions loop
and get guild by id
"""
await self.bot.wait_until_ready()
self.guild_static = self.bot.get_guild(Guild.id)

@commands.command(aliases=("unstream", ))
@commands.has_any_role(*STAFF_ROLES)
async def revokestream(
self,
ctx: commands.Context,
user: discord.Member = None
):
"""
stream handles <prefix>revokestream command
argument user - required user mention, any errors should be handled by upper level handler

command removes streaming permission from a user
"""
not_allowed = not any(Roles.video == role.id for role in user.roles)
if not_allowed:
await user.remove_roles(discord.Object(Roles.video))
else:
await ctx.send(USER_ALREADY_NOT_ALLOWED_TO_STREAM)


def setup(bot: Bot) -> None:
"""Loads the Stream cog."""
bot.add_cog(Stream(bot))
3 changes: 3 additions & 0 deletions config-default.yml
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,9 @@ guild:
jammers: 737249140966162473
team_leaders: 737250302834638889

# Streaming
video: 764245844798079016

moderation_roles:
- *OWNERS_ROLE
- *ADMINS_ROLE
Expand Down
85 changes: 85 additions & 0 deletions tests/bot/exts/moderation/test_stream.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
import asyncio
import unittest

from async_rediscache import RedisSession

from bot.constants import TIME_FORMATS, Roles
from bot.exts.moderation.stream import Stream
from tests.helpers import MockBot, MockRole, MockMember

redis_session = None
redis_loop = asyncio.get_event_loop()


def setUpModule(): # noqa: N802
"""Create and connect to the fakeredis session."""
global redis_session
redis_session = RedisSession(use_fakeredis=True)
redis_loop.run_until_complete(redis_session.connect())


def tearDownModule(): # noqa: N802
"""Close the fakeredis session."""
if redis_session:
redis_loop.run_until_complete(redis_session.close())


class StreamCommandTest(unittest.IsolatedAsyncioTestCase):

def setUp(self) -> None:
self.bot = MockBot()
self.cog = Stream(self.bot)

def test_linking_time_format_from_alias_or_key(self):
"""
User provided time format needs to be lined to a proper entry in TIME_FORMATS
This Test checks _link_from_alias method
Checking for whether alias or key exists in TIME_FORMATS is done before calling this function
"""
FORMATS = []
for key, entry in TIME_FORMATS.items():
FORMATS.extend(entry["aliases"])
FORMATS.append(key)

test_cases = (("sec", "second"),
("s", "second"),
("seconds", "second"),
("second", "second"),
("secs", "second"),
("min", "minute"),
("m", "minute"),
("minutes", "minute"),
("hr", "hour"),
("hrs", "hour"),
("hours", "hour"),
("d", "day"),
("days", "day"),
("yr", "year"),
("yrs", "year"),
("y", "year"))

for case in test_cases:
linked = self.cog._link_from_alias(case[0])[1]
self.assertEqual(linked, case[1])

def test_parsing_duration_and_time_format_to_seconds(self):
"""
Test calculating time in seconds from duration and time unit
This test is technically dependent on _link_from_alias function, not the best practice but necessary
"""
test_cases = ((1, "minute", 60), (5, "second", 5), (2, "day", 172800))
for case in test_cases:
time_in_seconds = self.cog._parse_time_to_seconds(case[0], case[1])
self.assertEqual(time_in_seconds, case[2])

def test_checking_if_user_has_streaming_permission(self):
"""
Test searching for video role in Member.roles
"""
user1 = MockMember(roles=[MockRole(id=Roles.video)])
user2 = MockMember()
already_allowed_user1 = any(Roles.video == role.id for role in user1.roles)
self.assertEqual(already_allowed_user1, True)

already_allowed_user2 = any(Roles.video == role.id for role in user2.roles)
self.assertEqual(already_allowed_user2, False)