We read every piece of feedback, and take your input very seriously.
To see all available qualifiers, see our documentation.
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
TODO: add `has` filter
https://api.github.com/yepcord/server/blob/4e0cb962580c7ea8f5a0261f2c5605a0ca44f72f/src/yepcord/core.py#L580
token += f".{sig}" link = f"https://{Config('CLIENT_HOST')}/verify#token={token}" await EmailMsg(user.email, "Confirm your e-mail in YEPCord", f"Thank you for signing up for a YEPCord account!\nFirst you need to make sure that you are you!" f" Click to verify your email address:\n{link}").send() async def verifyEmail(self, user: mUser, token: str) -> None: try: data, sig = token.split(".") data = jloads(b64decode(data).decode("utf8")) sig = b64decode(sig) t = data["time"] assert data["email"] == user.email and data["id"] == user.id and time() - t < 600 key = new(self.key, str(user.id).encode('utf-8'), sha256).digest() vsig = new(key, f"{user.id}:{user.email}:{t}".encode('utf-8'), sha256).digest() assert sig == vsig except: raise InvalidDataErr(400, Errors.make(50035, {"token": {"code": "TOKEN_INVALID", "message": "Invalid token."}})) await user.update(verified=True) async def getUserByEmail(self, email: str) -> Optional[mUser]: return await mUser.objects.get_or_none(email=email) async def changeUserEmail(self, user: mUser, email: str) -> None: email = email.lower() if user.email == email: return if await self.getUserByEmail(email): raise InvalidDataErr(400, Errors.make(50035, {"email": {"code": "EMAIL_ALREADY_REGISTERED", "message": "Email address already registered."}})) await user.update(email=email, verified=False) async def sendMfaChallengeEmail(self, user: mUser, nonce: str) -> None: code = await self.mfaNonceToCode(user, nonce) await EmailMsg(user.email, f"Your one-time verification key is {code}", f"It looks like you're trying to view your account's backup codes.\n" f"This verification key expires in 10 minutes. 
This key is extremely sensitive, treat it like a " f"password and do not share it with anyone.\n" f"Enter it in the app to unlock your backup codes:\n{code}").send() async def mfaNonceToCode(self, user: mUser, nonce: str) -> Optional[str]: if not (payload := JWT.decode(nonce, self.key)): return token = JWT.encode({"code": payload["code"]}, self.key) signature = token.split(".")[2] return signature.replace("-", "").replace("_", "")[:8].upper() async def createDMGroupChannel(self, user: mUser, recipients: list[mUser], name: Optional[str]=None) -> mChannel: if user.id not in recipients: recipients.append(user) channel = await mChannel.objects.create(id=Snowflake.makeId(), type=ChannelType.GROUP_DM, name=name, owner=user) for recipient in recipients: await channel.recipients.add(recipient) return channel async def addUserToGroupDM(self, channel: mChannel, target_user: mUser) -> None: await channel.recipients.add(target_user) async def deleteChannel(self, channel: mChannel) -> None: await channel.delete() async def deleteMessagesAck(self, channel: mChannel, user: mUser) -> None: await mReadState.objects.delete(user=user, channel=channel) async def pinMessage(self, message: mMessage) -> None: if await mMessage.objects.filter(pinned=True, channel=message.channel).count() >= 50: raise InvalidDataErr(400, Errors.make(30003)) message.extra_data["pinned_at"] = int(time()) await message.update(extra_data=message.extra_data, pinned=True) async def getLastPinnedMessage(self, channel: mChannel) -> Optional[mMessage]: # TODO: order by pinned timestamp return await mMessage.objects.filter(pinned=True, channel=channel).order_by("-id").first() async def getLastPinTimestamp(self, channel: mChannel) -> str: last = await self.getLastPinnedMessage(channel) last_ts = last.extra_data["pinned_at"] if last is not None else 0 return datetime.utcfromtimestamp(last_ts).strftime("%Y-%m-%dT%H:%M:%S+00:00") async def getPinnedMessages(self, channel: mChannel) -> list[mMessage]: return await 
mMessage.objects.filter(pinned=True, channel=channel).all() async def unpinMessage(self, message: mMessage) -> None: await message.update(pinned=False) async def addReaction(self, message: mMessage, user: mUser, emoji: mEmoji, emoji_name: str) -> mReaction: return await mReaction.objects.get_or_create(user=user, message=message, emoji=emoji, emoji_name=emoji_name) async def removeReaction(self, message: mMessage, user: mUser, emoji: mEmoji, emoji_name: str) -> None: await mReaction.objects.delete(user=user, message=message, emoji=emoji, emoji_name=emoji_name) async def getMessageReactionsJ(self, message: mMessage, user: mUser) -> list: reactions = [] # TODO: test and maybe rewrite whole method #result = await mChannel.Meta.database.fetch_all( # query=f'SELECT `emoji_name` as ename, `emoji` as eid, COUNT(*) AS ecount, (SELECT COUNT(*) > 0 FROM ' # f'`reactions` WHERE `emoji_name`=ename AND (`emoji`=eid OR (`emoji` IS NULL AND eid IS NULL)) ' # f'AND `user_id`=:user_id) as me FROM `reactions` WHERE `message_id`=:message_id GROUP BY ' # f'CONCAT(`emoji_name`, `emoji`) COLLATE utf8mb4_unicode_520_ci;', # values={"user_id": user.id, "message_id": message.id} #) #for r in result: # reactions.append( # {"emoji": {"id": str(r[1]) if r[1] else None, "name": r[0]}, "count": r[2], "me": bool(r[3])}) return reactions async def getReactedUsersJ(self, message: mMessage, limit: int, emoji: mEmoji, emoji_name: str) -> list[dict]: users = [] reactions = await mReaction.objects.select_related("user").filter( message=message, emoji=emoji, emoji_name=emoji_name ).limit(limit).all() for reaction in reactions: data = await reaction.user.data users.append(data.ds_json) return users async def searchMessages(self, search_filter: dict) -> tuple[list[mMessage], int]: filter_args = {} query = mMessage.objects.order_by("-id").limit(25) if "author_id" in search_filter: filter_args["author__id"] = search_filter["author_id"] if "mentions" in search_filter: filter_args["content__contains"] = 
f"<@{search_filter['mentions']}>" if "has" in search_filter: ... # TODO: add `has` filter if "min_id" in search_filter: filter_args["id__gt"] = search_filter["min_id"] if "max_id" in search_filter: filter_args["id__lt"] = search_filter["max_id"] if "pinned" in search_filter: filter_args["pinned"] = search_filter["pinned"].lower() == "true" if "offset" in search_filter: query = query.offset(search_filter["offset"]) if "content" in search_filter: filter_args["content__icontains"] = search_filter["content"] query = query.filter(**filter_args) messages = await query.all() count = await query.count() return messages, count async def createInvite(self, channel: mChannel, inviter: mUser, max_age: int=86400, max_uses: int=0) -> mInvite: return await mInvite.objects.create( id=Snowflake.makeId(), channel=channel, inviter=inviter, max_age=max_age, max_uses=max_uses ) async def getInvite(self, invite_id: int) -> Optional[mInvite]: return await mInvite.objects.select_related(["channel", "channel__guild", "inviter"]).get_or_none(id=invite_id) async def createGuild(self, guild_id: int, user: mUser, name: str, icon: str=None) -> mGuild: guild = await mGuild.objects.create(id=guild_id, owner=user, name=name, icon=icon) await mRole.objects.create(id=guild.id, guild=guild, name="@everyone") text_category = await mChannel.objects.create( id=Snowflake.makeId(), type=ChannelType.GUILD_CATEGORY, guild=guild, name="Text Channels", position=0, flags=0, rate_limit=0 ) voice_category = await mChannel.objects.create( id=Snowflake.makeId(), type=ChannelType.GUILD_CATEGORY, guild=guild, name="Voice Channels", position=0, flags=0, rate_limit=0 ) system_channel = await mChannel.objects.create( id=Snowflake.makeId(), type=ChannelType.GUILD_TEXT, guild=guild.id, name="general", position=0, flags=0, parent=text_category, rate_limit=0 ) await mChannel.objects.create( id=Snowflake.makeId(), type=ChannelType.GUILD_VOICE, guild=guild.id, name="General", position=0, flags=0, parent=voice_category, 
bitrate=64000, user_limit=0, rate_limit=0 ) await guild.update(system_channel=system_channel.id) await mGuildMember.objects.create(id=user.id, user=user, guild=guild) return guild async def createGuildFromTemplate(self, guild_id: int, user: mUser, template: mGuildTemplate, name: Optional[str], icon: Optional[str]) -> mGuild: guild = await mGuild.objects.create(id=guild_id, owner=user.id) serialized = template.serialized_guild serialized["name"] = name or serialized["name"] serialized["icon"] = icon replaced_ids: dict[Union[int, NoneType], Union[int, NoneType]] = {None: None, 0: guild_id} channels = {} for role in serialized["roles"]: if role["id"] not in replaced_ids: replaced_ids[role["id"]] = Snowflake.makeId() role["id"] = replaced_ids[role["id"]] await mRole.objects.create(guild=guild, **role) for channel in serialized["channels"]: if channel["id"] not in replaced_ids: replaced_ids[channel["id"]] = Snowflake.makeId() channel["id"] = channel_id = replaced_ids[channel["id"]] channel["parent"] = channels.get(replaced_ids.get(channel["parent_id"], None), None) channel["rate_limit"] = channel["rate_limit_per_user"] channel["default_auto_archive"] = channel["default_auto_archive_duration"] del channel["rate_limit_per_user"] del channel["default_auto_archive_duration"]
The text was updated successfully, but these errors were encountered:
No branches or pull requests
https://api.github.com/yepcord/server/blob/4e0cb962580c7ea8f5a0261f2c5605a0ca44f72f/src/yepcord/core.py#L580
The text was updated successfully, but these errors were encountered: