Skip to content

Commit

Permalink
replace propan with faststream
Browse files Browse the repository at this point in the history
  • Loading branch information
RuslanUC committed Mar 12, 2024
1 parent e72e7b3 commit cebe12f
Show file tree
Hide file tree
Showing 7 changed files with 115 additions and 136 deletions.
3 changes: 0 additions & 3 deletions config.example.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,6 @@
"rabbitmq": {
"url": "",
},
"sqs": {
"url": "",
},
"kafka": {
"bootstrap_servers": [],
},
Expand Down
227 changes: 104 additions & 123 deletions poetry.lock

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ werkzeug = "3.0.1"
aioftp = "0.22.3"
orjson = "3.9.15"
mailers = {version = "3.0.2", extras = ["smtp"]}
propan = {version = "0.1.5.26", extras = ["async-rabbit", "async-redis"]} # TODO: replace with faststream
redis = ">=4.6.0"
click = "8.1.7"
maxminddb = "2.5.2"
Expand All @@ -67,7 +66,8 @@ async-timeout = "^4.0.3"
aerich = "^0.7.2"
yc-protobuf3-to-dict = "^0.3.0"
s3lite = "^0.1.4"
fast-depends = ">=2.1.1,<2.2.0"
fast-depends = ">=2.4.2"
faststream = {extras = ["kafka", "nats", "rabbit", "redis"], version = "^0.4.7"}

[tool.poetry.group.dev.dependencies]
pytest-cov = "4.1.0"
Expand Down
4 changes: 2 additions & 2 deletions yepcord/gateway/gateway.py
Original file line number Diff line number Diff line change
Expand Up @@ -289,8 +289,8 @@ class Gateway:
def __init__(self, core: Core):
self.core = core
self.broker = getBroker()
self.broker.handle("yepcord_events")(self.mcl_yepcordEventsCallback)
self.broker.handle("yepcord_sys_events")(self.mcl_yepcordSysEventsCallback)
self.broker.subscriber("yepcord_events")(self.mcl_yepcordEventsCallback)
self.broker.subscriber("yepcord_sys_events")(self.mcl_yepcordSysEventsCallback)
self.store = WsStore()
self.presences = Presences(self)
self.ev = GatewayEvents(self)
Expand Down
2 changes: 1 addition & 1 deletion yepcord/remote_auth/gateway.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ class Gateway:
def __init__(self):
self.clients_by_fingerprint: dict[str, GatewayClient] = {}
self.broker = getBroker()
self.broker.handle("yepcord_remote_auth")(self.mq_callback)
self.broker.subscriber("yepcord_remote_auth")(self.mq_callback)

async def init(self):
await self.broker.start()
Expand Down
1 change: 0 additions & 1 deletion yepcord/yepcord/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,6 @@ class ConfigMessageBrokers(BaseModel):
type: str = "ws"
redis: ConfigMessageBrokerUrl = Field(default_factory=ConfigMessageBrokerUrl)
rabbitmq: ConfigMessageBrokerUrl = Field(default_factory=ConfigMessageBrokerUrl)
sqs: ConfigMessageBrokerUrl = Field(default_factory=ConfigMessageBrokerUrl)
kafka: ConfigMessageBrokerKafka = Field(default_factory=ConfigMessageBrokerKafka)
nats: ConfigMessageBrokerNats = Field(default_factory=ConfigMessageBrokerNats)
ws: ConfigMessageBrokerUrl = Field(default_factory=lambda: ConfigMessageBrokerUrl(url="ws://127.0.0.1:5055"))
Expand Down
10 changes: 6 additions & 4 deletions yepcord/yepcord/mq_broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,10 @@
from typing import Union, Optional, Callable, Coroutine

from async_timeout import timeout
from propan import RabbitBroker, RedisBroker, SQSBroker, KafkaBroker, NatsBroker
from faststream.rabbit import RabbitBroker
from faststream.redis import RedisBroker
from faststream.kafka import KafkaBroker
from faststream.nats import NatsBroker
from websockets.client import connect
from websockets.legacy.client import WebSocketClientProtocol
from websockets.legacy.server import WebSocketServer
Expand Down Expand Up @@ -150,7 +153,7 @@ async def publish(self, message: dict, channel: str) -> None:
"message": message,
}))

def handle(self, channel: str) -> Callable: # pragma: no cover
def subscriber(self, channel: str) -> Callable: # pragma: no cover
def _handle(func):
if channel not in self._handlers:
self._handlers[channel] = set()
Expand All @@ -163,14 +166,13 @@ def _handle(func):
_brokers = {
"rabbitmq": RabbitBroker,
"redis": RedisBroker,
"sqs": SQSBroker,
"kafka": KafkaBroker,
"nats": NatsBroker,
"ws": WsBroker,
}


def getBroker() -> Union[RabbitBroker, RedisBroker, SQSBroker, KafkaBroker, NatsBroker, WsBroker]:
def getBroker() -> Union[RabbitBroker, RedisBroker, KafkaBroker, NatsBroker, WsBroker]:
broker_type = Config.MESSAGE_BROKER["type"].lower()
assert broker_type in ("rabbitmq", "redis", "sqs", "kafka", "nats", "ws",), \
"MESSAGE_BROKER.type must be one of ('rabbitmq', 'redis', 'sqs', 'kafka', 'nats', 'ws')"
Expand Down

0 comments on commit cebe12f

Please sign in to comment.