Skip to content

Commit

Permalink
feat: absolutely flood with logging calls
Browse files Browse the repository at this point in the history
  • Loading branch information
ooliver1 committed Sep 17, 2022
1 parent f50f8a3 commit 68621aa
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 11 deletions.
74 changes: 63 additions & 11 deletions mafic/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ def __init__(
self._ws_task: Task[None] | None = None

self._available = False
self._backoff = ExponentialBackoff()

self.players: dict[int, Player] = {}

Expand All @@ -80,6 +79,7 @@ def secure(self) -> bool:
return self._secure

async def connect(self) -> None:
_log.info("Waiting for client to be ready...", extra={"label": self._label})
await self._client.wait_until_ready()
assert self._client.user is not None

Expand All @@ -95,52 +95,92 @@ async def connect(self) -> None:
"Resume-Key": self._resume_key,
}

_log.info(
"Connecting to lavalink at %s...",
self._rest_uri,
extra={"label": self._label},
)
self._ws = await session.ws_connect( # pyright: ignore[reportUnknownMemberType]
self._ws_uri,
timeout=self._timeout,
heartbeat=self._heartbeat,
headers=headers,
)
_log.info("Connected to lavalink.", extra={"label": self._label})
_log.debug(
"Creating task to send configuration to resume with key %s",
self._resume_key,
extra={"label": self._label},
)

create_task(self.send_resume_configuration())

_log.info(
"Creating task for websocket listener...", extra={"label": self._label}
)
self._ws_task = create_task(
self._ws_listener(), name=f"mafic node {self._label}"
)

_log.info(
"Node %s is now available.", self._label, extra={"label": self._label}
)
self._available = True

await sleep(1)
if self._available:
self._backoff = ExponentialBackoff()

async def _ws_listener(self) -> None:
backoff = ExponentialBackoff()

if self._ws is None:
_log.error(
"No websocket was found, aborting listener.",
extra={"label": self._label},
)
raise RuntimeError(
"Websocket is not connected but attempted to listen, report this."
)

async for message in self._ws:
# To catch closing messages, we cannot use async for.
while True:
msg = await self._ws.receive()

_log.debug("Received message from websocket.", extra={"label": self._label})

# Please aiohttp, fix your typehints.
_type: WSMsgType = message.type # pyright: ignore[reportUnknownMemberType]
_type: WSMsgType = msg.type # pyright: ignore[reportUnknownMemberType]

if _type is WSMsgType.CLOSED:
self._available = False
close_code = self._ws.close_code
self._ws = None

wait_time = self._backoff.delay()
_log.warning("Websocket closed, reconnecting in %.2f...", wait_time)
wait_time = backoff.delay()
_log.warn(
"Websocket was closed from host %s port %s with RFC 6455 code %s. "
"Reconnecting in %.2fs",
self._host,
self._port,
close_code,
wait_time,
extra={"label": self._label},
)

await sleep(wait_time)
create_task(self.connect())
return
else:
create_task(self._handle_msg(message.json(loads=loads)))
_log.debug(
"Creating task to handle websocket message.",
extra={"label": self._label},
)
create_task(self._handle_msg(msg.json(loads=loads)))

async def __send(self, data: OutgoingMessage) -> None:
if self._ws is None:
raise RuntimeError(
"Websocket is not connected but attempted to send, report this."
)

_log.debug("Sending message to websocket.", extra={"label": self._label})
await self._ws.send_json(data, dumps=dumps)

async def _handle_msg(self, data: IncomingMessage) -> None:
Expand All @@ -164,7 +204,7 @@ async def _handle_msg(self, data: IncomingMessage) -> None:
else:
# Of course pyright considers this to be `Never`, so this is to keep types.
op = cast(str, data["op"])
_log.warn("Unknown incoming op code %s", op)
_log.warn("Unknown incoming message op code %s", op)

async def _handle_event(self, data: EventPayload) -> None:
if data["type"] == "WebSocketClosedEvent":
Expand Down Expand Up @@ -192,6 +232,12 @@ async def _handle_event(self, data: EventPayload) -> None:
def send_voice_server_update(
self, guild_id: int, session_id: str, data: VoiceServerUpdatePayload
) -> Coro[None]:
_log.debug(
"Sending voice server update to lavalink with data %s.",
data,
extra={"label": self._label, "guild": guild_id},
)

return self.__send(
{
"op": "voiceUpdate",
Expand All @@ -202,6 +248,12 @@ def send_voice_server_update(
)

def send_resume_configuration(self) -> Coro[None]:
_log.info(
"Sending resume configuration to lavalink with resume key %s.",
self._resume_key,
extra={"label": self._label},
)

return self.__send(
{
"op": "configureResuming",
Expand Down
8 changes: 8 additions & 0 deletions mafic/player.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from __future__ import annotations

from logging import getLogger
from typing import TYPE_CHECKING

from .__libraries import VoiceProtocol
Expand All @@ -18,6 +19,9 @@
from .node import Node


_log = getLogger(__name__)


class Player(VoiceProtocol):
def __init__(
self,
Expand All @@ -39,13 +43,15 @@ async def on_voice_state_update(self, data: GuildVoiceStatePayload) -> None:

async def on_voice_server_update(self, data: VoiceServerUpdatePayload) -> None:
if self._node is None:
_log.debug("Getting best node for player", extra={"guild": self._guild_id})
self._node = NodePool.get_node(
guild_id=data["guild_id"], endpoint=data["endpoint"]
)

self._server_state = data

if self._guild_id is None or self._session_id is None:
_log.warn("Ignoring voice server update as guild and session are unknown.")
return

await self._node.send_voice_server_update(
Expand All @@ -63,6 +69,8 @@ async def connect(
if not isinstance(self.channel, GuildChannel):
raise TypeError("Voice channel must be a GuildChannel.")

_log.debug("Connecting to voice channel %s", self.channel.id)

await self.channel.guild.change_voice_state(
channel=self.channel, self_mute=self_mute, self_deaf=self_deaf
)
Expand Down
5 changes: 5 additions & 0 deletions mafic/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from __future__ import annotations

from logging import getLogger
from random import choice
from typing import TYPE_CHECKING

Expand All @@ -15,6 +16,9 @@
from .__libraries import Client


_log = getLogger(__name__)


class NodePool:
_nodes: ClassVar[dict[str, Node]] = {}

Expand Down Expand Up @@ -53,6 +57,7 @@ async def create_node(
# TODO: assign dicts for regions and such
cls._nodes[label] = node

_log.info("Created node, connecting it...", extra={"label": label})
await node.connect()

return node
Expand Down

0 comments on commit 68621aa

Please sign in to comment.