-
Notifications
You must be signed in to change notification settings - Fork 0
/
async_bot.py
53 lines (45 loc) · 1.89 KB
/
async_bot.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
import asyncio
import html
import logging

import aiohttp

from settings import Config
from cbr import Currency
async def fetch_async(
    session: aiohttp.ClientSession,
    read_queue: asyncio.Queue,
    update_id: int,
) -> int:
    """Poll Telegram ``getUpdates`` once and enqueue every update returned.

    Args:
        session: HTTP session used to issue the request.
        read_queue: Queue that receives each raw update dict from the reply.
        update_id: Value sent as the ``offset`` request parameter.

    Returns:
        The offset to use for the next poll: one past the highest
        ``update_id`` seen when the reply contained updates, otherwise
        ``update_id`` unchanged (empty reply, ``ok`` false, or a failed
        request).
    """
    url = f"https://api.telegram.org/bot{Config.bot_token}/getUpdates"
    # NOTE(review): no ``timeout`` parameter is sent, so each call returns
    # immediately (short polling). Consider long polling — confirm against
    # the Bot API documentation before changing.
    try:
        async with session.get(url, params={"offset": update_id}) as response:
            data = await response.json()
    except (aiohttp.ClientError, asyncio.TimeoutError, ValueError) as err:
        # A transport failure or a non-JSON body is handled like an
        # ``ok: false`` reply: keep the offset so the caller simply polls
        # again instead of the whole polling loop dying.
        logging.getLogger(__name__).warning("getUpdates failed: %r", err)
        return update_id

    updates = data.get("result") if data.get("ok") else None
    if not updates:
        return update_id

    newest = update_id
    for item in updates:
        read_queue.put_nowait(item)
        newest = max(newest, item["update_id"])
    # Telegram treats ``offset`` as "first update to return", so confirm
    # everything received by asking for the one after the newest.
    return newest + 1
async def send_worker(from_queue: asyncio.Queue, session: aiohttp.ClientSession) -> None:
    """Consume updates from the queue forever and reply to text messages.

    For each update carrying a message with text, the upper-cased text is
    looked up in the mapping returned by ``Currency(session).get_currency_dict()``;
    on a hit that value is sent back, otherwise the text is echoed. Updates
    without a message or without text are acknowledged and skipped.

    Args:
        from_queue: Queue of raw update dicts; ``task_done()`` is called
            once for every item taken, even when handling it fails.
        session: HTTP session used for the ``sendMessage`` request and
            passed to ``Currency``.
    """
    url = f"https://api.telegram.org/bot{Config.bot_token}/sendMessage"
    while True:
        item: dict = await from_queue.get()
        try:
            # Not every update has a "message" key; indexing it directly
            # would raise KeyError and permanently stop this worker.
            message = item.get('message')
            text = message.get('text', '') if message else ''
            if not text:
                # Nothing to answer: sendMessage cannot succeed with empty text.
                continue

            currency = Currency(session)
            currency_dict = await currency.get_currency_dict()
            code = text.upper()
            if code in currency_dict:
                # presumably already valid HTML for parse_mode below — verify
                # against Currency.get_currency_dict
                answer = currency_dict[code]
            else:
                # The echo is sent with parse_mode=HTML, so raw "<", ">" and
                # "&" from the user must be escaped or the request is rejected.
                answer = html.escape(text, quote=False)

            params = {'chat_id': message['chat']['id'], 'text': answer,
                      "parse_mode": "HTML"}
            async with session.get(url, params=params):
                # Response body is not inspected; just yield to the loop.
                await asyncio.sleep(0)
        finally:
            from_queue.task_done()
async def main(workers: int = 4) -> None:
    """Run the bot: start the sender workers, then poll for updates forever.

    Args:
        workers: Number of ``send_worker`` tasks to start.
    """
    write_queue = asyncio.Queue()
    offset: int = 0
    async with aiohttp.ClientSession() as session:
        # The event loop keeps only weak references to tasks, so hold strong
        # ones here; a task drops out of the set as soon as it finishes.
        worker_tasks = set()
        for _ in range(workers):
            task = asyncio.create_task(send_worker(write_queue, session))
            worker_tasks.add(task)
            task.add_done_callback(worker_tasks.discard)
        try:
            while True:
                offset = await fetch_async(session, write_queue, offset)
                await asyncio.sleep(0.001)
        finally:
            # Stop the workers before the session they share is closed.
            pending = list(worker_tasks)
            for task in pending:
                task.cancel()
            await asyncio.gather(*pending, return_exceptions=True)
# Script entry point: start the bot with 16 sender workers.
if __name__ == "__main__":
    asyncio.run(main(workers=16))