Skip to content

Commit

Permalink
fix: use SqliteQueueDatabase
Browse files Browse the repository at this point in the history
  • Loading branch information
whtsky committed Aug 10, 2021
1 parent 85ca9ad commit 8942d2b
Showing 1 changed file with 6 additions and 37 deletions.
43 changes: 6 additions & 37 deletions efb_telegram_master/db.py
Expand Up @@ -6,12 +6,10 @@
import time
from contextlib import suppress
from functools import partial
from queue import Queue
from threading import Thread
from typing import List, Optional, Tuple, Callable, Sequence, Any, Dict, Collection, TYPE_CHECKING
from typing import List, Optional, Tuple, Dict, Collection, TYPE_CHECKING

from peewee import Model, TextField, DateTimeField, CharField, SqliteDatabase, DoesNotExist, fn, BlobField, \
OperationalError
from peewee import Model, TextField, DateTimeField, CharField, DoesNotExist, fn, BlobField
from playhouse.sqliteq import SqliteQueueDatabase
from playhouse.migrate import SqliteMigrator, migrate
from telegram import Message
from typing_extensions import TypedDict
Expand All @@ -30,7 +28,7 @@
from . import TelegramChannel
from .chat import ETMChatMember, ETMChatType

database = SqliteDatabase(None)
database = SqliteQueueDatabase(None, autostart=False)

PickledDict = TypedDict('PickledDict', {
"target": EFBChannelChatIDStr,
Expand Down Expand Up @@ -181,13 +179,10 @@ def __init__(self, channel: 'TelegramChannel'):

self.logger.debug("Loading database...")
database.init(str(base_path / 'tgdata.db'))
database.start()
database.connect()
self.logger.debug("Database loaded.")

self.task_queue: 'Queue[Optional[Tuple[Callable, Sequence[Any], Dict[str, Any]]]]' = Queue()
self.worker_thread = Thread(target=self.task_worker, name="ETM database worker thread")
self.worker_thread.start()

self.logger.debug("Checking database migration...")
if not ChatAssoc.table_exists():
self._create()
Expand All @@ -204,29 +199,10 @@ def __init__(self, channel: 'TelegramChannel'):
self._migrate(3)
self.logger.debug("Database migration finished...")

def task_worker(self):
while True:
task = self.task_queue.get()
if task is None:
self.task_queue.task_done()
break
method, args, kwargs = task
try:
method(*args, **kwargs)
except OperationalError as e:
self.logger.exception("Operational error occurred when running %s(%s, %s): %r", method.__name__, args,
kwargs, e)
self.task_queue.task_done()

def stop_worker(self):
self.task_queue.put(None)
self.worker_thread.join()

def add_task(self, method: Callable, args: Sequence[Any], kwargs: Dict[str, Any]):
self.task_queue.put((method, args, kwargs))
database.stop()

@staticmethod
@database.atomic()
def _create():
"""
Initializing tables.
Expand All @@ -235,7 +211,6 @@ def _create():
database.create_tables([ChatAssoc, MsgLog, SlaveChatInfo])

@staticmethod
@database.atomic()
def _migrate(i: int):
"""
Run migrations.
Expand Down Expand Up @@ -274,7 +249,6 @@ def _migrate(i: int):
migrator.add_column("msglog", "file_unique_id", MsgLog.file_unique_id)
)

@database.atomic()
def add_chat_assoc(self, master_uid: EFBChannelChatIDStr,
slave_uid: EFBChannelChatIDStr,
multiple_slave: bool = False):
Expand All @@ -293,7 +267,6 @@ def add_chat_assoc(self, master_uid: EFBChannelChatIDStr,
return ChatAssoc.create(master_uid=master_uid, slave_uid=slave_uid)

@staticmethod
@database.atomic()
def remove_chat_assoc(master_uid: Optional[EFBChannelChatIDStr] = None,
slave_uid: Optional[EFBChannelChatIDStr] = None):
"""
Expand Down Expand Up @@ -402,7 +375,6 @@ def get_chat_assoc(master_uid: Optional[EFBChannelChatIDStr] = None,
except DoesNotExist:
return []

@database.atomic()
def add_or_update_message_log(self,
msg: ETMMsg,
master_message: Message,
Expand Down Expand Up @@ -479,7 +451,6 @@ def get_msg_log(master_msg_id: Optional[TgChatMsgIDStr] = None,
return None

@staticmethod
@database.atomic()
def delete_msg_log(master_msg_id: Optional[TgChatMsgIDStr] = None,
slave_msg_id: Optional[EFBChannelChatIDStr] = None,
slave_origin_uid: Optional[EFBChannelChatIDStr] = None):
Expand Down Expand Up @@ -526,7 +497,6 @@ def get_slave_chat_info(slave_channel_id: Optional[ModuleID] = None,
except DoesNotExist:
return None

@database.atomic()
def set_slave_chat_info(self, chat_object: 'ETMChatType') -> SlaveChatInfo:
"""
Insert or update slave chat info entry
Expand Down Expand Up @@ -575,7 +545,6 @@ def set_slave_chat_info(self, chat_object: 'ETMChatType') -> SlaveChatInfo:
pickle=chat_object.pickle)

@staticmethod
@database.atomic()
def delete_slave_chat_info(slave_channel_id: ModuleID, slave_chat_uid: ChatID, slave_chat_group_id: ChatID = None):
return SlaveChatInfo.delete() \
.where((SlaveChatInfo.slave_channel_id == slave_channel_id) &
Expand Down

0 comments on commit 8942d2b

Please sign in to comment.