From 6838e5122e4e5197ced8feebd6e3f8806929b2c0 Mon Sep 17 00:00:00 2001 From: innightwolfsleep Date: Wed, 13 Sep 2023 17:18:31 +0600 Subject: [PATCH 1/4] token from env --- .env.example | 1 + .gitignore | 1 + run.cmd | 1 + run.py | 8 +++++++- run.sh | 1 + script.py | 7 ++++++- 6 files changed, 17 insertions(+), 2 deletions(-) create mode 100644 .env.example create mode 100644 run.cmd create mode 100644 run.sh diff --git a/.env.example b/.env.example new file mode 100644 index 0000000..84f49cc --- /dev/null +++ b/.env.example @@ -0,0 +1 @@ +BOT_TOKEN=XXXXXXXXXX:YYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYY \ No newline at end of file diff --git a/.gitignore b/.gitignore index 7fc1116..4fe7a2b 100644 --- a/.gitignore +++ b/.gitignore @@ -8,3 +8,4 @@ characters/* !characters/Coder.json history/* history/telegram_history_here.txt +.env \ No newline at end of file diff --git a/run.cmd b/run.cmd new file mode 100644 index 0000000..6cc737f --- /dev/null +++ b/run.cmd @@ -0,0 +1 @@ +call python run.py \ No newline at end of file diff --git a/run.py b/run.py index 201672b..1f4a097 100644 --- a/run.py +++ b/run.py @@ -1,12 +1,18 @@ import sys +import os from threading import Thread from telegram_bot_wrapper import TelegramBotWrapper +from dotenv import load_dotenv +config_file_path="configs/telegram_config.json" def run_server(token): + if not token: + load_dotenv() + token = os.environ.get("BOT_TOKEN", "") # create TelegramBotWrapper instance # by default, read parameters in telegram_config.cfg - tg_server = TelegramBotWrapper(config_file_path="configs/telegram_config.json") + tg_server = TelegramBotWrapper(config_file_path=config_file_path) # by default - read token from telegram_token.txt tg_server.run_telegram_bot(bot_token=str(token)) diff --git a/run.sh b/run.sh new file mode 100644 index 0000000..64d2ef0 --- /dev/null +++ b/run.sh @@ -0,0 +1 @@ +python3 run.py \ No newline at end of file diff --git a/script.py b/script.py index 4536e78..bdac9bf 100644 --- a/script.py +++ b/script.py @@ -1,12 +1,17 @@ from threading import Thread from extensions.telegram_bot.TelegramBotWrapper import TelegramBotWrapper +from dotenv import load_dotenv +config_file_path="extensions/telegram_bot/configs/telegram_config.json" def run_server(): + if not token: + load_dotenv() + token = os.environ.get("BOT_TOKEN", "") # create TelegramBotWrapper instance # by default, read parameters in telegram_config.cfg tg_server = TelegramBotWrapper( - config_file_path="extensions/telegram_bot/configs/telegram_config.json" + config_file_path=config_file_path ) # by default - read token from extensions/telegram_bot/telegram_token.txt tg_server.run_telegram_bot() From d23ba7cc829f0b3b947a03ddc1217e66f1bf2a92 Mon Sep 17 00:00:00 2001 From: innightwolfsleep Date: Wed, 13 Sep 2023 17:52:15 +0600 Subject: [PATCH 2/4] running on-click scripts --- readme.md | 11 ++++++----- requirements.txt | 3 ++- run.sh | 1 + script.py | 14 ++++++++------ 4 files changed, 17 insertions(+), 12 deletions(-) diff --git a/readme.md b/readme.md index d003bcb..f9a125e 100644 --- a/readme.md +++ b/readme.md @@ -21,16 +21,17 @@ pip install -r llm_telegram_bot\requirements.txt ``` HOW TO USE: -1) add your bot token to configs/telegram_token.txt (ask https://t.me/BotFather how to get token) -2) add your model bin file to models/ -3) write path to your bin model file in configs/telegram_config.json - model_path -2) run "python llm_telegram_bot/main.py" +1) get bot token from https://t.me/BotFather +2) add bot token to environment (look `.env.example`) OR file `configs/telegram_token.txt` +3) move your model file to `models/` +4) set **model_path** to your model in `configs/telegram_config.json` +5) start `run.cmd` or `run.sh` or `python3 run.py` FEATURES: - chat and notebook modes - session for all users are separative (by chat_id) - local session history - conversation won't be lost if server restarts. Separated history between users and chars. -- nice "X typing" during generating (users will not think that bot stucking) +- nice "X typing" during generating (users will not think that bot stuck) - buttons: continue previous message, regenerate last message, remove last messages from history, reset history button, new char loading menu - you can load new characters from text-generation-webui\characters with button - you can load new model during conversation with button diff --git a/requirements.txt b/requirements.txt index f0cb3dd..8a70646 100644 --- a/requirements.txt +++ b/requirements.txt @@ -10,4 +10,5 @@ torch>=2.0.1 backoff>=2.2.1 langchain>=0.0.286 requests>=2.31.0 -urllib3>=2.0.4 \ No newline at end of file +urllib3>=2.0.4 +python-dotenv==1.0.0 \ No newline at end of file diff --git a/run.sh b/run.sh index 64d2ef0..621ceca 100644 --- a/run.sh +++ b/run.sh @@ -1 +1,2 @@ +#!/usr/bin/bash python3 run.py \ No newline at end of file diff --git a/script.py b/script.py index bdac9bf..d3bab6a 100644 --- a/script.py +++ b/script.py @@ -1,18 +1,20 @@ +import os from threading import Thread -from extensions.telegram_bot.TelegramBotWrapper import TelegramBotWrapper +from extensions.telegram_bot.telegram_bot_wrapper import TelegramBotWrapper from dotenv import load_dotenv -config_file_path="extensions/telegram_bot/configs/telegram_config.json" +# This module added to get compatibility with text-generation-webui-telegram_bot -def run_server(): +config_file_path = "extensions/telegram_bot/configs/telegram_config.json" + + +def run_server(token=""): if not token: load_dotenv() token = os.environ.get("BOT_TOKEN", "") # create TelegramBotWrapper instance # by default, read parameters in telegram_config.cfg - tg_server = TelegramBotWrapper( - config_file_path=config_file_path - ) + tg_server = TelegramBotWrapper(config_file_path=config_file_path) # by default - read token from extensions/telegram_bot/telegram_token.txt tg_server.run_telegram_bot() From cab25c79d26b705c1d1af5d682b268aa81374d76 Mon Sep 17 00:00:00 2001 From: innightwolfsleep Date: Wed, 13 Sep 2023 18:07:55 +0600 Subject: [PATCH 3/4] running on-click scripts --- run.py | 3 +- telegram_bot_wrapper.py | 346 ++++++++++++++++++++++++++++++---------- 2 files changed, 266 insertions(+), 83 deletions(-) diff --git a/run.py b/run.py index 1f4a097..a2a3e57 100644 --- a/run.py +++ b/run.py @@ -4,7 +4,8 @@ from telegram_bot_wrapper import TelegramBotWrapper from dotenv import load_dotenv -config_file_path="configs/telegram_config.json" +config_file_path = "configs/telegram_config.json" + def run_server(token): if not token: diff --git a/telegram_bot_wrapper.py b/telegram_bot_wrapper.py index 623fdf5..e243649 100644 --- a/telegram_bot_wrapper.py +++ b/telegram_bot_wrapper.py @@ -100,7 +100,9 @@ class TelegramBotWrapper: "📽", ] sd_api_prompt_of = "Detailed description of OBJECT:" - sd_api_prompt_self = "Detailed description of appearance, surroundings and what doing right now: " + sd_api_prompt_self = ( + "Detailed description of appearance, surroundings and what doing right now: " + ) # Language list language_dict = { "en": "🇬🇧", @@ -184,7 +186,9 @@ def __init__( with open(self.user_rules_file_path, "r") as user_rules_file: self.user_rules = json.loads(user_rules_file.read()) else: - logging.error("Cant find user_rules_file_path: " + self.user_rules_file_path) + logging.error( + "Cant find user_rules_file_path: " + self.user_rules_file_path + ) self.user_rules = {} # Silero initiate self.silero = Silero() @@ -204,28 +208,48 @@ def load_config_file(self, config_file_path: str): with open(config_file_path, "r") as config_file_path: config = json.loads(config_file_path.read()) self.bot_mode = config.get("bot_mode", self.bot_mode) - self.generator_script = config.get("generator_script", self.generator_script) + self.generator_script = config.get( + "generator_script", self.generator_script + ) self.model_path = config.get("model_path", self.model_path) self.default_preset = config.get("default_preset", self.default_preset) self.default_char = config.get("default_char", self.default_char) self.model_lang = config.get("model_lang", self.model_lang) self.user_lang = config.get("user_lang", self.user_lang) - self.characters_dir_path = config.get("characters_dir_path", self.characters_dir_path) - self.presets_dir_path = config.get("presets_dir_path", self.presets_dir_path) - self.history_dir_path = config.get("history_dir_path", self.history_dir_path) - self.token_file_path = config.get("token_file_path", self.token_file_path) - self.admins_file_path = config.get("admins_file_path", self.admins_file_path) - self.users_file_path = config.get("users_file_path", self.users_file_path) + self.characters_dir_path = config.get( + "characters_dir_path", self.characters_dir_path + ) + self.presets_dir_path = config.get( + "presets_dir_path", self.presets_dir_path + ) + self.history_dir_path = config.get( + "history_dir_path", self.history_dir_path + ) + self.token_file_path = config.get( + "token_file_path", self.token_file_path + ) + self.admins_file_path = config.get( + "admins_file_path", self.admins_file_path + ) + self.users_file_path = config.get( + "users_file_path", self.users_file_path + ) self.generator_params_file_path = config.get( "generator_params_file_path", self.generator_params_file_path ) - self.user_rules_file_path = config.get("user_rules_file_path", self.user_rules_file_path) + self.user_rules_file_path = config.get( + "user_rules_file_path", self.user_rules_file_path + ) self.sd_api_url = config.get("sd_api_url", self.sd_api_url) - self.sd_config_file_path = config.get("sd_config_file_path", self.sd_config_file_path) + self.sd_config_file_path = config.get( + "sd_config_file_path", self.sd_config_file_path + ) self.translation_as_hidden_text = config.get( "translation_as_hidden_text", self.translation_as_hidden_text ) - self.stopping_strings = config.get("stopping_strings", self.stopping_strings) + self.stopping_strings = config.get( + "stopping_strings", self.stopping_strings + ) self.eos_token = config.get("eos_token", self.eos_token) else: logging.error("Cant find config_file " + config_file_path) @@ -244,8 +268,12 @@ def run_telegram_bot(self, bot_token="", token_file_name=""): with open(token_file_name, "r", encoding="utf-8") as f: bot_token = f.read().strip() self.updater = Updater(token=bot_token, use_context=True) - self.updater.dispatcher.add_handler(CommandHandler("start", self.cb_start_command)), - self.updater.dispatcher.add_handler(MessageHandler(Filters.text, self.cb_get_message)) + self.updater.dispatcher.add_handler( + CommandHandler("start", self.cb_start_command) + ), + self.updater.dispatcher.add_handler( + MessageHandler(Filters.text, self.cb_get_message) + ) self.updater.dispatcher.add_handler( MessageHandler( Filters.document.mime_type("application/json"), @@ -260,7 +288,9 @@ def run_telegram_bot(self, bot_token="", token_file_name=""): def no_sleep_callback(self): while True: try: - self.updater.bot.send_message(chat_id=99999999999, text="One message every minute") + self.updater.bot.send_message( + chat_id=99999999999, text="One message every minute" + ) except BadRequest: pass except Exception as error: @@ -296,7 +326,9 @@ def thread_welcome_message(self, upd: Update, context: CallbackContext): parse_mode="HTML", ) - def make_template_message(self, request: str, chat_id: int, custom_string="") -> str: + def make_template_message( + self, request: str, chat_id: int, custom_string="" + ) -> str: # create a message using default_messages_template or return # UNKNOWN_TEMPLATE if chat_id in self.users: @@ -347,15 +379,21 @@ def init_check_user(self, chat_id): characters_dir_path=self.characters_dir_path, char_file=self.default_char, ) - self.users[chat_id].load_user_history(f"{self.history_dir_path}/{str(chat_id)}.json") - self.users[chat_id].find_and_load_user_char_history(chat_id, self.history_dir_path) + self.users[chat_id].load_user_history( + f"{self.history_dir_path}/{str(chat_id)}.json" + ) + self.users[chat_id].find_and_load_user_char_history( + chat_id, self.history_dir_path + ) def thread_get_json_document(self, upd: Update, context: CallbackContext): chat_id = upd.message.chat.id if not self.check_user_permission(chat_id): return False self.init_check_user(chat_id) - default_user_file_path = str(Path(f"{self.history_dir_path}/{str(chat_id)}.json")) + default_user_file_path = str( + Path(f"{self.history_dir_path}/{str(chat_id)}.json") + ) with open(default_user_file_path, "wb") as f: context.bot.get_file(upd.message.document.file_id).download(out=f) self.users[chat_id].load_user_history(default_user_file_path) @@ -374,10 +412,14 @@ def thread_get_json_document(self, upd: Update, context: CallbackContext): def typing_status_start(self, context: CallbackContext, chat_id: int) -> Event: typing_active = Event() typing_active.set() - Thread(target=self.thread_typing_status, args=(context, chat_id, typing_active)).start() + Thread( + target=self.thread_typing_status, args=(context, chat_id, typing_active) + ).start() return typing_active - def thread_typing_status(self, context: CallbackContext, chat_id: int, typing_active: Event): + def thread_typing_status( + self, context: CallbackContext, chat_id: int, typing_active: Event + ): limit_counter = int(self.generation_timeout / 6) while typing_active.is_set() and limit_counter > 0: context.bot.send_chat_action(chat_id=chat_id, action=CHATACTION_TYPING) @@ -488,7 +530,9 @@ def generate_answer(self, user_in, chat_id) -> Tuple[str, str]: else: user.user_in.append(user_in) user.history.append("") - user.history.append(self.sd_api_prompt_of.replace("OBJECT", user_in[1:].strip())) + user.history.append( + self.sd_api_prompt_of.replace("OBJECT", user_in[1:].strip()) + ) return_msg_action = self.MSG_SD_API elif user_in[0] in self.impersonate_prefixes: # If user_in starts with prefix - impersonate-like (if you try to get "impersonate view") @@ -542,7 +586,9 @@ def generate_answer(self, user_in, chat_id) -> Tuple[str, str]: available_len -= context_len if available_len < 0: available_len = 0 - logging.info("telegram_bot: " + str(chat_id) + " - CONTEXT IS TOO LONG!!!") + logging.info( + "telegram_bot: " + str(chat_id) + " - CONTEXT IS TOO LONG!!!" + ) conversation = [example, greeting] + user.history @@ -591,17 +637,31 @@ def prepare_text(self, original_text, user_language="en", direction="to_user"): if self.model_lang != user_language: try: if direction == "to_model": - text = Translator(source=user_language, target=self.model_lang).translate(text) + text = Translator( + source=user_language, target=self.model_lang + ).translate(text) elif direction == "to_user": - text = Translator(source=self.model_lang, target=user_language).translate(text) + text = Translator( + source=self.model_lang, target=user_language + ).translate(text) except Exception as e: text = "can't translate text:" + str(text) logging.error("translator_error" + str(e)) # Add HTML tags and other... if direction not in ["to_model", "no_html"]: - text = text.replace("#", "#").replace("<", "<").replace(">", ">") - original_text = original_text.replace("#", "#").replace("<", "<").replace(">", ">") - if self.model_lang != user_language and direction == "to_user" and self.translation_as_hidden_text == "on": + text = ( + text.replace("#", "#").replace("<", "<").replace(">", ">") + ) + original_text = ( + original_text.replace("#", "#") + .replace("<", "<") + .replace(">", ">") + ) + if ( + self.model_lang != user_language + and direction == "to_user" + and self.translation_as_hidden_text == "on" + ): text = ( self.html_tag[0] + original_text @@ -616,34 +676,46 @@ def prepare_text(self, original_text, user_language="en", direction="to_user"): return text @backoff.on_exception( - backoff.expo, (urllib3.exceptions.HTTPError, urllib3.exceptions.ConnectTimeoutError), max_time=60 + backoff.expo, + (urllib3.exceptions.HTTPError, urllib3.exceptions.ConnectTimeoutError), + max_time=60, ) def send_sd_image(self, upd: Update, context: CallbackContext, answer, user_text): chat_id = upd.message.chat.id file_list = self.SdApi.txt_to_image(answer) - answer = answer.replace(self.sd_api_prompt_of.replace("OBJECT", user_text[1:].strip()), "") + answer = answer.replace( + self.sd_api_prompt_of.replace("OBJECT", user_text[1:].strip()), "" + ) for char in ["[", "]", "{", "}", "(", ")", "*", '"', "'"]: answer = answer.replace(char, "") if len(file_list) > 0: for image_path in file_list: if os.path.exists(image_path): with open(image_path, "rb") as image_file: - context.bot.send_photo(caption=answer, chat_id=chat_id, photo=image_file) + context.bot.send_photo( + caption=answer, chat_id=chat_id, photo=image_file + ) os.remove(image_path) @backoff.on_exception( - backoff.expo, (urllib3.exceptions.HTTPError, urllib3.exceptions.ConnectTimeoutError), max_time=60 + backoff.expo, + (urllib3.exceptions.HTTPError, urllib3.exceptions.ConnectTimeoutError), + max_time=60, ) def clean_last_message_markup(self, context: CallbackContext, chat_id: int): if chat_id in self.users and len(self.users[chat_id].msg_id) > 0: last_msg = self.users[chat_id].msg_id[-1] try: - context.bot.editMessageReplyMarkup(chat_id=chat_id, message_id=last_msg, reply_markup=None) + context.bot.editMessageReplyMarkup( + chat_id=chat_id, message_id=last_msg, reply_markup=None + ) except Exception as exception: logging.error("last_message_markup_clean: " + str(exception)) @backoff.on_exception( - backoff.expo, (urllib3.exceptions.HTTPError, urllib3.exceptions.ConnectTimeoutError), max_time=60 + backoff.expo, + (urllib3.exceptions.HTTPError, urllib3.exceptions.ConnectTimeoutError), + max_time=60, ) def send(self, context: CallbackContext, chat_id: int, text: str): user = self.users[chat_id] @@ -661,7 +733,9 @@ def send(self, context: CallbackContext, chat_id: int, text: str): audio_text = ":".join(text.split(":")[1:]) else: audio_text = text - audio_path = self.silero.get_audio(text=audio_text, user_id=chat_id, user=user) + audio_path = self.silero.get_audio( + text=audio_text, user_id=chat_id, user=user + ) if audio_path is not None: with open(audio_path, "rb") as audio: message = context.bot.send_audio( @@ -683,7 +757,9 @@ def send(self, context: CallbackContext, chat_id: int, text: str): return message @backoff.on_exception( - backoff.expo, (urllib3.exceptions.HTTPError, urllib3.exceptions.ConnectTimeoutError), max_time=60 + backoff.expo, + (urllib3.exceptions.HTTPError, urllib3.exceptions.ConnectTimeoutError), + max_time=60, ) def edit( self, @@ -712,10 +788,14 @@ def edit( audio_text = ":".join(text.split(":")[1:]) else: audio_text = text - audio_path = self.silero.get_audio(text=audio_text, user_id=chat_id, user=user) + audio_path = self.silero.get_audio( + text=audio_text, user_id=chat_id, user=user + ) if audio_path is not None: with open(audio_path, "rb") as audio: - media = InputMediaAudio(media=audio, filename=f"{user.name2}_to_{user.name1}.wav") + media = InputMediaAudio( + media=audio, filename=f"{user.name2}_to_{user.name1}.wav" + ) context.bot.edit_message_media( chat_id=chat_id, media=media, @@ -744,14 +824,21 @@ def thread_get_message(self, upd: Update, context: CallbackContext): # Send "typing" message typing = self.typing_status_start(context, chat_id) try: - if self.check_user_rule(chat_id=chat_id, option=self.GET_MESSAGE) is not True: + if ( + self.check_user_rule(chat_id=chat_id, option=self.GET_MESSAGE) + is not True + ): return False self.init_check_user(chat_id) user = self.users[chat_id] # Generate answer and replace "typing" message with it if user_text not in self.sd_api_prefixes: - user_text = self.prepare_text(user_text, self.users[chat_id].language, "to_model") - answer, system_message = self.generate_answer(user_in=user_text, chat_id=chat_id) + user_text = self.prepare_text( + user_text, self.users[chat_id].language, "to_model" + ) + answer, system_message = self.generate_answer( + user_in=user_text, chat_id=chat_id + ) if system_message == self.MSG_SYSTEM: context.bot.send_message(text=answer, chat_id=chat_id) elif system_message == self.MSG_SD_API: @@ -759,7 +846,9 @@ def thread_get_message(self, upd: Update, context: CallbackContext): self.send_sd_image(upd, context, answer, user_text) else: if system_message == self.MSG_DEL_LAST: - context.bot.deleteMessage(chat_id=chat_id, message_id=user.msg_id[-1]) + context.bot.deleteMessage( + chat_id=chat_id, message_id=user.msg_id[-1] + ) message = self.send(text=answer, chat_id=chat_id, context=context) # Clear buttons on last message (if they exist in current # thread) @@ -831,25 +920,45 @@ def handle_button_option(self, option, chat_id, upd, context): self.show_options_button(upd=upd, context=context) elif option == self.BTN_DELETE and self.check_user_rule(chat_id, option): self.delete_pressed_button(upd=upd, context=context) - elif option.startswith(self.BTN_CHAR_LIST) and self.check_user_rule(chat_id, option): + elif option.startswith(self.BTN_CHAR_LIST) and self.check_user_rule( + chat_id, option + ): self.keyboard_characters_button(upd=upd, context=context, option=option) - elif option.startswith(self.BTN_CHAR_LOAD) and self.check_user_rule(chat_id, option): + elif option.startswith(self.BTN_CHAR_LOAD) and self.check_user_rule( + chat_id, option + ): self.load_character_button(upd=upd, context=context, option=option) - elif option.startswith(self.BTN_PRESET_LIST) and self.check_user_rule(chat_id, option): + elif option.startswith(self.BTN_PRESET_LIST) and self.check_user_rule( + chat_id, option + ): self.keyboard_presets_button(upd=upd, context=context, option=option) - elif option.startswith(self.BTN_PRESET_LOAD) and self.check_user_rule(chat_id, option): + elif option.startswith(self.BTN_PRESET_LOAD) and self.check_user_rule( + chat_id, option + ): self.load_presets_button(upd=upd, context=context, option=option) - elif option.startswith(self.BTN_MODEL_LIST) and self.check_user_rule(chat_id, option): + elif option.startswith(self.BTN_MODEL_LIST) and self.check_user_rule( + chat_id, option + ): self.keyboard_models_button(upd=upd, context=context, option=option) - elif option.startswith(self.BTN_MODEL_LOAD) and self.check_user_rule(chat_id, option): + elif option.startswith(self.BTN_MODEL_LOAD) and self.check_user_rule( + chat_id, option + ): self.load_model_button(upd=upd, context=context, option=option) - elif option.startswith(self.BTN_LANG_LIST) and self.check_user_rule(chat_id, option): + elif option.startswith(self.BTN_LANG_LIST) and self.check_user_rule( + chat_id, option + ): self.keyboard_language_button(upd=upd, context=context, option=option) - elif option.startswith(self.BTN_LANG_LOAD) and self.check_user_rule(chat_id, option): + elif option.startswith(self.BTN_LANG_LOAD) and self.check_user_rule( + chat_id, option + ): self.load_language_button(upd=upd, context=context, option=option) - elif option.startswith(self.BTN_VOICE_LIST) and self.check_user_rule(chat_id, option): + elif option.startswith(self.BTN_VOICE_LIST) and self.check_user_rule( + chat_id, option + ): self.keyboard_voice_button(upd=upd, context=context, option=option) - elif option.startswith(self.BTN_VOICE_LOAD) and self.check_user_rule(chat_id, option): + elif option.startswith(self.BTN_VOICE_LOAD) and self.check_user_rule( + chat_id, option + ): self.load_voice_button(upd=upd, context=context, option=option) def show_options_button(self, upd: Update, context: CallbackContext): @@ -888,7 +997,9 @@ def next_message_button(self, upd: Update, context: CallbackContext): user = self.users[chat_id] # send "typing" self.clean_last_message_markup(context, chat_id) - answer, _ = self.generate_answer(user_in=self.GENERATOR_MODE_NEXT, chat_id=chat_id) + answer, _ = self.generate_answer( + user_in=self.GENERATOR_MODE_NEXT, chat_id=chat_id + ) message = self.send(text=answer, chat_id=chat_id, context=context) self.users[chat_id].msg_id.append(message.message_id) user.save_user_history(chat_id, self.history_dir_path) @@ -898,7 +1009,9 @@ def continue_message_button(self, upd: Update, context: CallbackContext): message = upd.callback_query.message user = self.users[chat_id] # get answer and replace message text! - answer, _ = self.generate_answer(user_in=self.GENERATOR_MODE_CONTINUE, chat_id=chat_id) + answer, _ = self.generate_answer( + user_in=self.GENERATOR_MODE_CONTINUE, chat_id=chat_id + ) self.edit( text=answer, chat_id=chat_id, @@ -1041,7 +1154,9 @@ def load_model_button(self, upd: Update, context: CallbackContext, option: str): ) raise e - def keyboard_models_button(self, upd: Update, context: CallbackContext, option: str): + def keyboard_models_button( + self, upd: Update, context: CallbackContext, option: str + ): if generator_script.get_model_list() is not None: chat_id = upd.callback_query.message.chat.id msg = upd.callback_query.message @@ -1091,7 +1206,12 @@ def load_preset(self, preset): if os.path.exists(preset_path): with open(preset_path, "r") as preset_file: for line in preset_file.readlines(): - name, value = line.replace("\n", "").replace("\r", "").replace(": ", "=").split("=") + name, value = ( + line.replace("\n", "") + .replace("\r", "") + .replace(": ", "=") + .split("=") + ) if name in self.generation_params: if type(self.generation_params[name]) == int: self.generation_params[name] = int(float(value)) @@ -1104,7 +1224,9 @@ def load_preset(self, preset): elif type(self.generation_params[name]) == list: self.generation_params[name] = list(value.split(",")) - def keyboard_presets_button(self, upd: Update, context: CallbackContext, option: str): + def keyboard_presets_button( + self, upd: Update, context: CallbackContext, option: str + ): chat_id = upd.callback_query.message.chat.id msg = upd.callback_query.message # if "return char markup" button - clear markup @@ -1125,7 +1247,9 @@ def keyboard_presets_button(self, upd: Update, context: CallbackContext, option: data_load=self.BTN_PRESET_LOAD, keyboard_colum=3, ) - context.bot.editMessageReplyMarkup(chat_id=chat_id, message_id=msg.message_id, reply_markup=characters_buttons) + context.bot.editMessageReplyMarkup( + chat_id=chat_id, message_id=msg.message_id, reply_markup=characters_buttons + ) def load_character_button(self, upd: Update, context: CallbackContext, option: str): chat_id = upd.callback_query.message.chat.id @@ -1134,11 +1258,17 @@ def load_character_button(self, upd: Update, context: CallbackContext, option: s self.clean_last_message_markup(context, chat_id) self.init_check_user(chat_id) char_file = char_list[char_num] - self.users[chat_id].load_character_file(characters_dir_path=self.characters_dir_path, char_file=char_file) + self.users[chat_id].load_character_file( + characters_dir_path=self.characters_dir_path, char_file=char_file + ) # If there was conversation with this char - load history - self.users[chat_id].find_and_load_user_char_history(chat_id, self.history_dir_path) + self.users[chat_id].find_and_load_user_char_history( + chat_id, self.history_dir_path + ) if len(self.users[chat_id].history) > 0: - send_text = self.make_template_message("hist_loaded", chat_id, self.users[chat_id].history[-1]) + send_text = self.make_template_message( + "hist_loaded", chat_id, self.users[chat_id].history[-1] + ) else: send_text = self.make_template_message("char_loaded", chat_id) context.bot.send_message( @@ -1148,7 +1278,9 @@ def load_character_button(self, upd: Update, context: CallbackContext, option: s reply_markup=self.get_options_keyboard(chat_id), ) - def keyboard_characters_button(self, upd: Update, context: CallbackContext, option: str): + def keyboard_characters_button( + self, upd: Update, context: CallbackContext, option: str + ): chat_id = upd.callback_query.message.chat.id msg = upd.callback_query.message # if "return char markup" button - clear markup @@ -1171,7 +1303,9 @@ def keyboard_characters_button(self, upd: Update, context: CallbackContext, opti data_list=self.BTN_CHAR_LIST, data_load=self.BTN_CHAR_LOAD, ) - context.bot.editMessageReplyMarkup(chat_id=chat_id, message_id=msg.message_id, reply_markup=characters_buttons) + context.bot.editMessageReplyMarkup( + chat_id=chat_id, message_id=msg.message_id, reply_markup=characters_buttons + ) def load_language_button(self, upd: Update, context: CallbackContext, option: str): chat_id = upd.callback_query.message.chat.id @@ -1192,7 +1326,9 @@ def load_language_button(self, upd: Update, context: CallbackContext, option: st reply_markup=self.get_options_keyboard(chat_id), ) - def keyboard_language_button(self, upd: Update, context: CallbackContext, option: str): + def keyboard_language_button( + self, upd: Update, context: CallbackContext, option: str + ): chat_id = upd.callback_query.message.chat.id msg = upd.callback_query.message # if "return char markup" button - clear markup @@ -1213,7 +1349,9 @@ def keyboard_language_button(self, upd: Update, context: CallbackContext, option data_load=self.BTN_LANG_LOAD, keyboard_colum=4, ) - context.bot.editMessageReplyMarkup(chat_id=chat_id, message_id=msg.message_id, reply_markup=lang_buttons) + context.bot.editMessageReplyMarkup( + chat_id=chat_id, message_id=msg.message_id, reply_markup=lang_buttons + ) def load_voice_button(self, upd: Update, context: CallbackContext, option: str): chat_id = upd.callback_query.message.chat.id @@ -1262,7 +1400,9 @@ def keyboard_voice_button(self, upd: Update, context: CallbackContext, option: s data_load=self.BTN_VOICE_LOAD, keyboard_colum=4, ) - context.bot.editMessageReplyMarkup(chat_id=chat_id, message_id=msg.message_id, reply_markup=voice_buttons) + context.bot.editMessageReplyMarkup( + chat_id=chat_id, message_id=msg.message_id, reply_markup=voice_buttons + ) # ============================================================================= # load characters char_file from ./characters @@ -1286,14 +1426,22 @@ def get_options_keyboard(self, chat_id=0): voice = "🔇" if self.check_user_rule(chat_id, self.BTN_DOWNLOAD): - keyboard_raw.append(InlineKeyboardButton(text="💾Save", callback_data=self.BTN_DOWNLOAD)) + keyboard_raw.append( + InlineKeyboardButton(text="💾Save", callback_data=self.BTN_DOWNLOAD) + ) # if self.check_user_rule(chat_id, self.BTN_LORE): # keyboard_raw.append(InlineKeyboardButton( # text="📜Lore", callback_data=self.BTN_LORE)) if self.check_user_rule(chat_id, self.BTN_CHAR_LIST): - keyboard_raw.append(InlineKeyboardButton(text="🎭Chars", callback_data=self.BTN_CHAR_LIST + "-9999")) + keyboard_raw.append( + InlineKeyboardButton( + text="🎭Chars", callback_data=self.BTN_CHAR_LIST + "-9999" + ) + ) if self.check_user_rule(chat_id, self.BTN_RESET): - keyboard_raw.append(InlineKeyboardButton(text="⚠Reset", callback_data=self.BTN_RESET)) + keyboard_raw.append( + InlineKeyboardButton(text="⚠Reset", callback_data=self.BTN_RESET) + ) if self.check_user_rule(chat_id, self.BTN_LANG_LIST): keyboard_raw.append( InlineKeyboardButton( @@ -1302,29 +1450,61 @@ def get_options_keyboard(self, chat_id=0): ) ) if self.check_user_rule(chat_id, self.BTN_VOICE_LIST): - keyboard_raw.append(InlineKeyboardButton(text=voice + "Voice", callback_data=self.BTN_VOICE_LIST + "0")) - if self.check_user_rule(chat_id, self.BTN_PRESET_LIST) and generator_script.generator.preset_change_allowed: - keyboard_raw.append(InlineKeyboardButton(text="🔧Presets", callback_data=self.BTN_PRESET_LIST + "0")) - if self.check_user_rule(chat_id, self.BTN_MODEL_LIST) and generator_script.generator.model_change_allowed: - keyboard_raw.append(InlineKeyboardButton(text="🔨Model", callback_data=self.BTN_MODEL_LIST + "0")) + keyboard_raw.append( + InlineKeyboardButton( + text=voice + "Voice", callback_data=self.BTN_VOICE_LIST + "0" + ) + ) + if ( + self.check_user_rule(chat_id, self.BTN_PRESET_LIST) + and generator_script.generator.preset_change_allowed + ): + keyboard_raw.append( + InlineKeyboardButton( + text="🔧Presets", callback_data=self.BTN_PRESET_LIST + "0" + ) + ) + if ( + self.check_user_rule(chat_id, self.BTN_MODEL_LIST) + and generator_script.generator.model_change_allowed + ): + keyboard_raw.append( + InlineKeyboardButton( + text="🔨Model", callback_data=self.BTN_MODEL_LIST + "0" + ) + ) if self.check_user_rule(chat_id, self.BTN_DELETE): - keyboard_raw.append(InlineKeyboardButton(text="❌Close", callback_data=self.BTN_DELETE)) + keyboard_raw.append( + InlineKeyboardButton(text="❌Close", callback_data=self.BTN_DELETE) + ) return InlineKeyboardMarkup([keyboard_raw]) def get_chat_keyboard(self, chat_id=0): keyboard_raw = [] if self.check_user_rule(chat_id, self.BTN_NEXT): - keyboard_raw.append(InlineKeyboardButton(text="▶Next", callback_data=self.BTN_NEXT)) + keyboard_raw.append( + InlineKeyboardButton(text="▶Next", callback_data=self.BTN_NEXT) + ) if self.check_user_rule(chat_id, self.BTN_CONTINUE): - keyboard_raw.append(InlineKeyboardButton(text="➡Continue", callback_data=self.BTN_CONTINUE)) + keyboard_raw.append( + InlineKeyboardButton(text="➡Continue", callback_data=self.BTN_CONTINUE) + ) if self.check_user_rule(chat_id, self.BTN_DEL_WORD): - keyboard_raw.append(InlineKeyboardButton(text="⬅Del word", callback_data=self.BTN_DEL_WORD)) + keyboard_raw.append( + InlineKeyboardButton(text="⬅Del word", callback_data=self.BTN_DEL_WORD) + ) if self.check_user_rule(chat_id, self.BTN_REGEN): - keyboard_raw.append(InlineKeyboardButton(text="♻Regenerate", callback_data=self.BTN_REGEN)) + keyboard_raw.append( + InlineKeyboardButton(text="♻Regenerate", callback_data=self.BTN_REGEN) + ) if self.check_user_rule(chat_id, self.BTN_CUTOFF): - keyboard_raw.append(InlineKeyboardButton(text="✖Cutoff", callback_data=self.BTN_CUTOFF)) + keyboard_raw.append( + InlineKeyboardButton(text="✖Cutoff", callback_data=self.BTN_CUTOFF) + ) if self.check_user_rule(chat_id, self.BTN_OPTION): - keyboard_raw.append(InlineKeyboardButton(text="⚙Options", callback_data=self.BTN_OPTION)) + keyboard_raw.append( + InlineKeyboardButton(text="⚙Options", callback_data=self.BTN_OPTION) + ) return InlineKeyboardMarkup([keyboard_raw]) def get_switch_keyboard( @@ -1355,7 +1535,9 @@ def get_switch_keyboard( if column >= keyboard_colum: column = 0 characters_buttons[-1].append( - InlineKeyboardButton(text=f"{opt_list[i]}", callback_data=f"{data_load}{str(i)}") + InlineKeyboardButton( + text=f"{opt_list[i]}", callback_data=f"{data_load}{str(i)}" + ) ) i += 1 # add switch buttons From 6842e53f0ae8b0aa314816b4cb1c643698242f7f Mon Sep 17 00:00:00 2001 From: innightwolfsleep Date: Wed, 13 Sep 2023 18:15:00 +0600 Subject: [PATCH 4/4] running on-click scripts --- generators/generator_langchain_llama_cpp.py | 13 +- generators/generator_llama_cpp.py | 17 +- generators/generator_text_generator_webui.py | 44 +-- telegram_bot_generator.py | 12 +- telegram_bot_sd_api.py | 4 +- telegram_bot_silero.py | 44 +-- telegram_bot_user.py | 32 +- telegram_bot_wrapper.py | 330 +++++-------------- 8 files changed, 112 insertions(+), 384 deletions(-) diff --git a/generators/generator_langchain_llama_cpp.py b/generators/generator_langchain_llama_cpp.py index 4c2ba8a..23f5230 100644 --- a/generators/generator_langchain_llama_cpp.py +++ b/generators/generator_langchain_llama_cpp.py @@ -27,14 +27,7 @@ def __init__(self, model_path, n_ctx=2048, seed=0, n_gpu_layers=0): ) def get_answer( - self, - prompt, - generation_params, - eos_token, - stopping_strings, - default_answer, - turn_template="", - **kwargs + self, prompt, generation_params, eos_token, stopping_strings, default_answer, turn_template="", **kwargs ): if "max_tokens" in generation_params: llm.max_tokens = generation_params["max_tokens"] @@ -44,9 +37,7 @@ def get_answer( llm.top_p = generation_params["top_p"] if "top_k" in generation_params: llm.top_k = generation_params["top_k"] - prompt_template = PromptTemplate( - template="{prompt}", input_variables=["prompt"] - ) + prompt_template = PromptTemplate(template="{prompt}", input_variables=["prompt"]) llm.stop = stopping_strings llm_chain = LLMChain(prompt=prompt_template, llm=self.llm) answer = llm_chain.run(prompt) diff --git a/generators/generator_llama_cpp.py b/generators/generator_llama_cpp.py index 33fb479..fbbd97e 100644 --- a/generators/generator_llama_cpp.py +++ b/generators/generator_llama_cpp.py @@ -13,19 +13,10 @@ def __init__(self, model_path: str, n_ctx=4096, seed=0, n_gpu_layers=0): self.seed = seed self.n_gpu_layers = n_gpu_layers print(n_gpu_layers) - self.llm = Llama( - model_path=model_path, n_ctx=n_ctx, seed=seed, n_gpu_layers=n_gpu_layers - ) + self.llm = Llama(model_path=model_path, n_ctx=n_ctx, seed=seed, n_gpu_layers=n_gpu_layers) def get_answer( - self, - prompt, - generation_params, - eos_token, - stopping_strings, - default_answer: str, - turn_template="", - **kwargs + self, prompt, generation_params, eos_token, stopping_strings, default_answer: str, turn_template="", **kwargs ): # Preparing, add stopping_strings answer = default_answer @@ -58,6 +49,4 @@ def get_model_list(self): def load_model(self, model_file: str): with open("models\\" + model_file, "r") as model: - self.llm: Llama = Llama( - model_path=model.read(), n_ctx=self.n_ctx, seed=self.seed - ) + self.llm: Llama = Llama(model_path=model.read(), n_ctx=self.n_ctx, seed=self.seed) diff --git a/generators/generator_text_generator_webui.py b/generators/generator_text_generator_webui.py index a49b115..889c2fc 100644 --- a/generators/generator_text_generator_webui.py +++ b/generators/generator_text_generator_webui.py @@ -15,52 +15,30 @@ def __init__(self, model_path="", n_ctx=2048, n_gpu_layers=0): pass @staticmethod - def get_answer( - prompt, - generation_params, - eos_token, - stopping_strings, - default_answer, - turn_template="", - **kwargs - ): + def get_answer(prompt, generation_params, eos_token, stopping_strings, default_answer, turn_template="", **kwargs): generation_params.update({"turn_template": turn_template}) generation_params.update( { "stream": False, "max_new_tokens": int( - generation_params.get( - "max_new_tokens", generation_params.get("max_length", 200) - ) + generation_params.get("max_new_tokens", generation_params.get("max_length", 200)) ), "do_sample": bool(generation_params.get("do_sample", True)), "temperature": float(generation_params.get("temperature", 0.5)), "top_p": float(generation_params.get("top_p", 1)), - "typical_p": float( - generation_params.get( - "typical_p", generation_params.get("typical", 1) - ) - ), + "typical_p": float(generation_params.get("typical_p", generation_params.get("typical", 1))), "epsilon_cutoff": float(generation_params.get("epsilon_cutoff", 0)), "eta_cutoff": float(generation_params.get("eta_cutoff", 0)), "tfs": float(generation_params.get("tfs", 1)), "top_a": float(generation_params.get("top_a", 0)), "repetition_penalty": float( - generation_params.get( - "repetition_penalty", generation_params.get("rep_pen", 1.1) - ) - ), - "repetition_penalty_range": int( - generation_params.get("repetition_penalty_range", 0) - ), - "encoder_repetition_penalty": float( - generation_params.get("encoder_repetition_penalty", 1.0) + generation_params.get("repetition_penalty", generation_params.get("rep_pen", 1.1)) ), + "repetition_penalty_range": int(generation_params.get("repetition_penalty_range", 0)), + "encoder_repetition_penalty": float(generation_params.get("encoder_repetition_penalty", 1.0)), "top_k": int(generation_params.get("top_k", 0)), "min_length": int(generation_params.get("min_length", 0)), - "no_repeat_ngram_size": int( - generation_params.get("no_repeat_ngram_size", 0) - ), + "no_repeat_ngram_size": int(generation_params.get("no_repeat_ngram_size", 0)), "num_beams": int(generation_params.get("num_beams", 1)), "penalty_alpha": float(generation_params.get("penalty_alpha", 0)), "length_penalty": float(generation_params.get("length_penalty", 1)), @@ -77,16 +55,12 @@ def get_answer( ) ), "ban_eos_token": bool(generation_params.get("ban_eos_token", False)), - "skip_special_tokens": bool( - generation_params.get("skip_special_tokens", True) - ), + "skip_special_tokens": bool(generation_params.get("skip_special_tokens", True)), "custom_stopping_strings": "", # leave this blank "stopping_strings": generation_params.get("stopping_strings", []), } ) - generator = generate_reply( - question=prompt, state=generation_params, stopping_strings=stopping_strings - ) + generator = generate_reply(question=prompt, state=generation_params, stopping_strings=stopping_strings) # This is "bad" implementation of getting answer, should be reworked answer = default_answer for a in generator: diff --git a/telegram_bot_generator.py b/telegram_bot_generator.py index bc7f86b..ce43e29 100644 --- a/telegram_bot_generator.py +++ b/telegram_bot_generator.py @@ -24,9 +24,7 @@ def init(script="GeneratorLlamaCpp", model_path="", n_ctx=4096, n_gpu_layers=0): n_gpu_layers: n_gpu_layers for llama """ try: - generator_class = getattr( - importlib.import_module("generators." + script), "Generator" - ) + generator_class = getattr(importlib.import_module("generators." + script), "Generator") except ImportError: generator_class = getattr( importlib.import_module("extensions.telegram_bot.generators." + script), @@ -37,13 +35,7 @@ def init(script="GeneratorLlamaCpp", model_path="", n_ctx=4096, n_gpu_layers=0): def get_answer( - prompt, - generation_params, - eos_token, - stopping_strings, - default_answer: str, - turn_template="", - **kwargs + prompt, generation_params, eos_token, stopping_strings, default_answer: str, turn_template="", **kwargs ) -> str: """Generate and return answer string. diff --git a/telegram_bot_sd_api.py b/telegram_bot_sd_api.py index dc6f046..f7fabaa 100644 --- a/telegram_bot_sd_api.py +++ b/telegram_bot_sd_api.py @@ -31,9 +31,7 @@ def txt_to_image(self, prompt: str): image = Image.open(io.BytesIO(base64.b64decode(i.split(",", 1)[0]))) png_payload = {"image": "data:image/png;base64," + i} - response2 = requests.post( - url=f"{self.url}/sdapi/v1/png-info", json=png_payload - ) + response2 = requests.post(url=f"{self.url}/sdapi/v1/png-info", json=png_payload) output_file = str(random.random()) + ".png" png_info = PngImagePlugin.PngInfo() png_info.add_text("parameters", response2.json().get("info")) diff --git a/telegram_bot_silero.py b/telegram_bot_silero.py index c584805..4735c3c 100644 --- a/telegram_bot_silero.py +++ b/telegram_bot_silero.py @@ -92,21 +92,11 @@ def get_audio(self, text: str, user_id: int, user: User): if user.silero_speaker == "None" or user.silero_model_id == "None": return None if user.silero_speaker == "None" or user.silero_model_id == "None": - user.silero_model_id, user.silero_speaker = self.get_default_audio_settings( - user.language - ) - if ( - user.silero_speaker - not in self.voices[user.language]["male"] - + self.voices[user.language]["female"] - ): - user.silero_model_id, user.silero_speaker = self.get_default_audio_settings( - user.language - ) + user.silero_model_id, user.silero_speaker = self.get_default_audio_settings(user.language) + if user.silero_speaker not in self.voices[user.language]["male"] + self.voices[user.language]["female"]: + user.silero_model_id, user.silero_speaker = self.get_default_audio_settings(user.language) if user.silero_model_id not in self.voices[user.language]["model"]: - user.silero_model_id, user.silero_speaker = self.get_default_audio_settings( - user.language - ) + user.silero_model_id, user.silero_speaker = self.get_default_audio_settings(user.language) try: model, _ = torch.hub.load( @@ -142,9 +132,7 @@ def preprocess(self, string): # For example, you need to remove the commas in numbers before expanding them string = self.remove_surrounded_chars(string) string = string.replace('"', "") - string = string.replace("\u201D", "").replace( - "\u201C", "" - ) # right and left quote + string = string.replace("\u201D", "").replace("\u201C", "") # right and left quote string = string.replace("\u201F", "") # italic looking quote string = string.replace("\n", " ") string = string.replace("*", " ! ") @@ -192,11 +180,7 @@ def convert_num_locale(text): start = match.start() end = match.end() - result = ( - result[0:start] - + result[start:end].replace(".", "").replace(",", ".") - + result[end : len(result)] - ) + result = result[0:start] + result[start:end].replace(".", "").replace(",", ".") + result[end : len(result)] # removes comma separators from existing American numbers pattern = re.compile(r"(\d),(\d)") @@ -262,19 +246,13 @@ def replace_abbreviations(self, string): start = match.start() end = match.end() - result = ( - result[0:start] - + self.replace_abbreviation(result[start:end]) - + result[end : len(result)] - ) + result = result[0:start] + self.replace_abbreviation(result[start:end]) + result[end : len(result)] return result def replace_lowercase_abbreviations(self, string): # abbreviations 1 to 4 characters long, separated by dots i.e. e.g. - pattern = re.compile( - rf"(^|[\s(.\'\[<])(([a-z]\.){{1,4}})({self.punctuation}|$)" - ) + pattern = re.compile(rf"(^|[\s(.\'\[<])(([a-z]\.){{1,4}})({self.punctuation}|$)") result = string while True: match = pattern.search(result) @@ -283,11 +261,7 @@ def replace_lowercase_abbreviations(self, string): start = match.start() end = match.end() - result = ( - result[0:start] - + self.replace_abbreviation(result[start:end].upper()) - + result[end : len(result)] - ) + result = result[0:start] + self.replace_abbreviation(result[start:end].upper()) + result[end : len(result)] return result diff --git a/telegram_bot_user.py b/telegram_bot_user.py index 66e877f..b750034 100644 --- a/telegram_bot_user.py +++ b/telegram_bot_user.py @@ -52,12 +52,8 @@ def __init__( self.silero_speaker: str = silero_speaker self.silero_model_id: str = silero_model_id self.turn_template: str = turn_template - self.user_in: list = ( - [] - ) # "user input history": [["Hi!","Who are you?"]], need for regenerate option - self.history: list = ( - [] - ) # "history": [["Hi!", "Hi there!","Who are you?", "I am you assistant."]], + self.user_in: list = [] # "user input history": [["Hi!","Who are you?"]], need for regenerate option + self.history: list = [] # "history": [["Hi!", "Hi there!","Who are you?", "I am you assistant."]], self.msg_id: list = [] # "msg_id": [143, 144, 145, 146], self.greeting: str = greeting @@ -136,15 +132,9 @@ def from_json(self, json_data: str): self.context = data["context"] if "context" in data else "" self.example = data["example"] if "example" in data else "" self.language = data["language"] if "language" in data else "en" - self.silero_speaker = ( - data["silero_speaker"] if "silero_speaker" in data else "None" - ) - self.silero_model_id = ( - data["silero_model_id"] if "silero_model_id" in data else "None" - ) - self.turn_template = ( - data["turn_template"] if "turn_template" in data else "" - ) + self.silero_speaker = data["silero_speaker"] if "silero_speaker" in data else "None" + self.silero_model_id = data["silero_model_id"] if "silero_model_id" in data else "None" + self.turn_template = data["turn_template"] if "turn_template" in data else "" self.user_in = data["user_in"] self.history = data["history"] self.msg_id = data["msg_id"] @@ -192,9 +182,7 @@ def load_character_file(self, characters_dir_path: str, char_file: str): self.turn_template = data["turn_template"] self.context = "" if "char_persona" in data: - self.context += ( - f"{self.name2}'s Persona: {data['char_persona'].strip()}\n" - ) + self.context += f"{self.name2}'s Persona: {data['char_persona'].strip()}\n" if "context" in data: self.context += f"{data['context'].strip()}\n" if "world_scenario" in data: @@ -242,12 +230,8 @@ def find_and_load_user_char_history(self, chat_id, history_dir_path: str): True user history found and loaded, otherwise False """ chat_id = str(chat_id) - user_char_history_path = ( - f"{history_dir_path}/{str(chat_id)}{self.char_file}.json" - ) - user_char_history_old_path = ( - f"{history_dir_path}/{str(chat_id)}{self.name2}.json" - ) + user_char_history_path = f"{history_dir_path}/{str(chat_id)}{self.char_file}.json" + user_char_history_old_path = f"{history_dir_path}/{str(chat_id)}{self.name2}.json" if exists(user_char_history_path): return self.load_user_history(user_char_history_path) elif exists(user_char_history_old_path): diff --git a/telegram_bot_wrapper.py b/telegram_bot_wrapper.py index e243649..f970e97 100644 --- a/telegram_bot_wrapper.py +++ b/telegram_bot_wrapper.py @@ -100,9 +100,7 @@ class TelegramBotWrapper: "📽", ] sd_api_prompt_of = "Detailed description of OBJECT:" - sd_api_prompt_self = ( - "Detailed description of appearance, surroundings and what doing right now: " - ) + sd_api_prompt_self = "Detailed description of appearance, surroundings and what doing right now: " # Language list language_dict = { "en": "🇬🇧", @@ -186,9 +184,7 @@ def __init__( with open(self.user_rules_file_path, "r") as user_rules_file: self.user_rules = json.loads(user_rules_file.read()) else: - logging.error( - "Cant find user_rules_file_path: " + self.user_rules_file_path - ) + logging.error("Cant find user_rules_file_path: " + self.user_rules_file_path) self.user_rules = {} # Silero initiate self.silero = Silero() @@ -208,48 +204,28 @@ def load_config_file(self, config_file_path: str): with open(config_file_path, "r") as config_file_path: config = json.loads(config_file_path.read()) self.bot_mode = config.get("bot_mode", self.bot_mode) - self.generator_script = config.get( - "generator_script", self.generator_script - ) + self.generator_script = config.get("generator_script", self.generator_script) self.model_path = config.get("model_path", self.model_path) self.default_preset = config.get("default_preset", self.default_preset) self.default_char = config.get("default_char", self.default_char) self.model_lang = config.get("model_lang", self.model_lang) self.user_lang = config.get("user_lang", self.user_lang) - self.characters_dir_path = config.get( - "characters_dir_path", self.characters_dir_path - ) - self.presets_dir_path = config.get( - "presets_dir_path", self.presets_dir_path - ) - self.history_dir_path = config.get( - "history_dir_path", self.history_dir_path - ) - self.token_file_path = config.get( - "token_file_path", self.token_file_path - ) - self.admins_file_path = config.get( - "admins_file_path", self.admins_file_path - ) - self.users_file_path = config.get( - "users_file_path", self.users_file_path - ) + self.characters_dir_path = config.get("characters_dir_path", self.characters_dir_path) + self.presets_dir_path = config.get("presets_dir_path", self.presets_dir_path) + self.history_dir_path = config.get("history_dir_path", self.history_dir_path) + self.token_file_path = config.get("token_file_path", self.token_file_path) + self.admins_file_path = config.get("admins_file_path", self.admins_file_path) + self.users_file_path = config.get("users_file_path", self.users_file_path) self.generator_params_file_path = config.get( "generator_params_file_path", self.generator_params_file_path ) - self.user_rules_file_path = config.get( - "user_rules_file_path", self.user_rules_file_path - ) + self.user_rules_file_path = config.get("user_rules_file_path", self.user_rules_file_path) self.sd_api_url = config.get("sd_api_url", self.sd_api_url) - self.sd_config_file_path = config.get( - "sd_config_file_path", self.sd_config_file_path - ) + self.sd_config_file_path = config.get("sd_config_file_path", self.sd_config_file_path) self.translation_as_hidden_text = config.get( "translation_as_hidden_text", self.translation_as_hidden_text ) - self.stopping_strings = config.get( - "stopping_strings", self.stopping_strings - ) + self.stopping_strings = config.get("stopping_strings", self.stopping_strings) self.eos_token = config.get("eos_token", self.eos_token) else: logging.error("Cant find config_file " + config_file_path) @@ -268,12 +244,8 @@ def run_telegram_bot(self, bot_token="", token_file_name=""): with open(token_file_name, "r", encoding="utf-8") as f: bot_token = f.read().strip() self.updater = Updater(token=bot_token, use_context=True) - self.updater.dispatcher.add_handler( - CommandHandler("start", self.cb_start_command) - ), - self.updater.dispatcher.add_handler( - MessageHandler(Filters.text, self.cb_get_message) - ) + self.updater.dispatcher.add_handler(CommandHandler("start", self.cb_start_command)), + self.updater.dispatcher.add_handler(MessageHandler(Filters.text, self.cb_get_message)) self.updater.dispatcher.add_handler( MessageHandler( Filters.document.mime_type("application/json"), @@ -288,9 +260,7 @@ def run_telegram_bot(self, bot_token="", token_file_name=""): def no_sleep_callback(self): while True: try: - self.updater.bot.send_message( - chat_id=99999999999, text="One message every minute" - ) + self.updater.bot.send_message(chat_id=99999999999, text="One message every minute") except BadRequest: pass except Exception as error: @@ -326,9 +296,7 @@ def thread_welcome_message(self, upd: Update, context: CallbackContext): parse_mode="HTML", ) - def make_template_message( - self, request: str, chat_id: int, custom_string="" - ) -> str: + def make_template_message(self, request: str, chat_id: int, custom_string="") -> str: # create a message using default_messages_template or return # UNKNOWN_TEMPLATE if chat_id in self.users: @@ -379,21 +347,15 @@ def init_check_user(self, chat_id): characters_dir_path=self.characters_dir_path, char_file=self.default_char, ) - self.users[chat_id].load_user_history( - f"{self.history_dir_path}/{str(chat_id)}.json" - ) - self.users[chat_id].find_and_load_user_char_history( - chat_id, self.history_dir_path - ) + self.users[chat_id].load_user_history(f"{self.history_dir_path}/{str(chat_id)}.json") + self.users[chat_id].find_and_load_user_char_history(chat_id, self.history_dir_path) def thread_get_json_document(self, upd: Update, context: CallbackContext): chat_id = upd.message.chat.id if not self.check_user_permission(chat_id): return False self.init_check_user(chat_id) - default_user_file_path = str( - Path(f"{self.history_dir_path}/{str(chat_id)}.json") - ) + default_user_file_path = str(Path(f"{self.history_dir_path}/{str(chat_id)}.json")) with open(default_user_file_path, "wb") as f: context.bot.get_file(upd.message.document.file_id).download(out=f) self.users[chat_id].load_user_history(default_user_file_path) @@ -412,14 +374,10 @@ def thread_get_json_document(self, upd: Update, context: CallbackContext): def typing_status_start(self, context: CallbackContext, chat_id: int) -> Event: typing_active = Event() typing_active.set() - Thread( - target=self.thread_typing_status, args=(context, chat_id, typing_active) - ).start() + Thread(target=self.thread_typing_status, args=(context, chat_id, typing_active)).start() return typing_active - def thread_typing_status( - self, context: CallbackContext, chat_id: int, typing_active: Event - ): + def thread_typing_status(self, context: CallbackContext, chat_id: int, typing_active: Event): limit_counter = int(self.generation_timeout / 6) while typing_active.is_set() and limit_counter > 0: context.bot.send_chat_action(chat_id=chat_id, action=CHATACTION_TYPING) @@ -530,9 +488,7 @@ def generate_answer(self, user_in, chat_id) -> Tuple[str, str]: else: user.user_in.append(user_in) user.history.append("") - user.history.append( - self.sd_api_prompt_of.replace("OBJECT", user_in[1:].strip()) - ) + user.history.append(self.sd_api_prompt_of.replace("OBJECT", user_in[1:].strip())) return_msg_action = self.MSG_SD_API elif user_in[0] in self.impersonate_prefixes: # If user_in starts with prefix - impersonate-like (if you try to get "impersonate view") @@ -586,9 +542,7 @@ def generate_answer(self, user_in, chat_id) -> Tuple[str, str]: available_len -= context_len if available_len < 0: available_len = 0 - logging.info( - "telegram_bot: " + str(chat_id) + " - CONTEXT IS TOO LONG!!!" - ) + logging.info("telegram_bot: " + str(chat_id) + " - CONTEXT IS TOO LONG!!!") conversation = [example, greeting] + user.history @@ -637,31 +591,17 @@ def prepare_text(self, original_text, user_language="en", direction="to_user"): if self.model_lang != user_language: try: if direction == "to_model": - text = Translator( - source=user_language, target=self.model_lang - ).translate(text) + text = Translator(source=user_language, target=self.model_lang).translate(text) elif direction == "to_user": - text = Translator( - source=self.model_lang, target=user_language - ).translate(text) + text = Translator(source=self.model_lang, target=user_language).translate(text) except Exception as e: text = "can't translate text:" + str(text) logging.error("translator_error" + str(e)) # Add HTML tags and other... if direction not in ["to_model", "no_html"]: - text = ( - text.replace("#", "#").replace("<", "<").replace(">", ">") - ) - original_text = ( - original_text.replace("#", "#") - .replace("<", "<") - .replace(">", ">") - ) - if ( - self.model_lang != user_language - and direction == "to_user" - and self.translation_as_hidden_text == "on" - ): + text = text.replace("#", "#").replace("<", "<").replace(">", ">") + original_text = original_text.replace("#", "#").replace("<", "<").replace(">", ">") + if self.model_lang != user_language and direction == "to_user" and self.translation_as_hidden_text == "on": text = ( self.html_tag[0] + original_text @@ -683,18 +623,14 @@ def prepare_text(self, original_text, user_language="en", direction="to_user"): def send_sd_image(self, upd: Update, context: CallbackContext, answer, user_text): chat_id = upd.message.chat.id file_list = self.SdApi.txt_to_image(answer) - answer = answer.replace( - self.sd_api_prompt_of.replace("OBJECT", user_text[1:].strip()), "" - ) + answer = answer.replace(self.sd_api_prompt_of.replace("OBJECT", user_text[1:].strip()), "") for char in ["[", "]", "{", "}", "(", ")", "*", '"', "'"]: answer = answer.replace(char, "") if len(file_list) > 0: for image_path in file_list: if os.path.exists(image_path): with open(image_path, "rb") as image_file: - context.bot.send_photo( - caption=answer, chat_id=chat_id, photo=image_file - ) + context.bot.send_photo(caption=answer, chat_id=chat_id, photo=image_file) os.remove(image_path) @backoff.on_exception( @@ -706,9 +642,7 @@ def clean_last_message_markup(self, context: CallbackContext, chat_id: int): if chat_id in self.users and len(self.users[chat_id].msg_id) > 0: last_msg = self.users[chat_id].msg_id[-1] try: - context.bot.editMessageReplyMarkup( - chat_id=chat_id, message_id=last_msg, reply_markup=None - ) + context.bot.editMessageReplyMarkup(chat_id=chat_id, message_id=last_msg, reply_markup=None) except Exception as exception: logging.error("last_message_markup_clean: " + str(exception)) @@ -733,9 +667,7 @@ def send(self, context: CallbackContext, chat_id: int, text: str): audio_text = ":".join(text.split(":")[1:]) else: audio_text = text - audio_path = self.silero.get_audio( - text=audio_text, user_id=chat_id, user=user - ) + audio_path = self.silero.get_audio(text=audio_text, user_id=chat_id, user=user) if audio_path is not None: with open(audio_path, "rb") as audio: message = context.bot.send_audio( @@ -788,14 +720,10 @@ def edit( audio_text = ":".join(text.split(":")[1:]) else: audio_text = text - audio_path = self.silero.get_audio( - text=audio_text, user_id=chat_id, user=user - ) + audio_path = self.silero.get_audio(text=audio_text, user_id=chat_id, user=user) if audio_path is not None: with open(audio_path, "rb") as audio: - media = InputMediaAudio( - media=audio, filename=f"{user.name2}_to_{user.name1}.wav" - ) + media = InputMediaAudio(media=audio, filename=f"{user.name2}_to_{user.name1}.wav") context.bot.edit_message_media( chat_id=chat_id, media=media, @@ -824,21 +752,14 @@ def thread_get_message(self, upd: Update, context: CallbackContext): # Send "typing" message typing = self.typing_status_start(context, chat_id) try: - if ( - self.check_user_rule(chat_id=chat_id, option=self.GET_MESSAGE) - is not True - ): + if self.check_user_rule(chat_id=chat_id, option=self.GET_MESSAGE) is not True: return False self.init_check_user(chat_id) user = self.users[chat_id] # Generate answer and replace "typing" message with it if user_text not in self.sd_api_prefixes: - user_text = self.prepare_text( - user_text, self.users[chat_id].language, "to_model" - ) - answer, system_message = self.generate_answer( - user_in=user_text, chat_id=chat_id - ) + user_text = self.prepare_text(user_text, self.users[chat_id].language, "to_model") + answer, system_message = self.generate_answer(user_in=user_text, chat_id=chat_id) if system_message == self.MSG_SYSTEM: context.bot.send_message(text=answer, chat_id=chat_id) elif system_message == self.MSG_SD_API: @@ -846,9 +767,7 @@ def thread_get_message(self, upd: Update, context: CallbackContext): self.send_sd_image(upd, context, answer, user_text) else: if system_message == self.MSG_DEL_LAST: - context.bot.deleteMessage( - chat_id=chat_id, message_id=user.msg_id[-1] - ) + context.bot.deleteMessage(chat_id=chat_id, message_id=user.msg_id[-1]) message = self.send(text=answer, chat_id=chat_id, context=context) # Clear buttons on last message (if they exist in current # thread) @@ -920,45 +839,25 @@ def handle_button_option(self, option, chat_id, upd, context): self.show_options_button(upd=upd, context=context) elif option == self.BTN_DELETE and self.check_user_rule(chat_id, option): self.delete_pressed_button(upd=upd, context=context) - elif option.startswith(self.BTN_CHAR_LIST) and self.check_user_rule( - chat_id, option - ): + elif option.startswith(self.BTN_CHAR_LIST) and self.check_user_rule(chat_id, option): self.keyboard_characters_button(upd=upd, context=context, option=option) - elif option.startswith(self.BTN_CHAR_LOAD) and self.check_user_rule( - chat_id, option - ): + elif option.startswith(self.BTN_CHAR_LOAD) and self.check_user_rule(chat_id, option): self.load_character_button(upd=upd, context=context, option=option) - elif option.startswith(self.BTN_PRESET_LIST) and self.check_user_rule( - chat_id, option - ): + elif option.startswith(self.BTN_PRESET_LIST) and self.check_user_rule(chat_id, option): self.keyboard_presets_button(upd=upd, context=context, option=option) - elif option.startswith(self.BTN_PRESET_LOAD) and self.check_user_rule( - chat_id, option - ): + elif option.startswith(self.BTN_PRESET_LOAD) and self.check_user_rule(chat_id, option): self.load_presets_button(upd=upd, context=context, option=option) - elif option.startswith(self.BTN_MODEL_LIST) and self.check_user_rule( - chat_id, option - ): + elif option.startswith(self.BTN_MODEL_LIST) and self.check_user_rule(chat_id, option): self.keyboard_models_button(upd=upd, context=context, option=option) - elif option.startswith(self.BTN_MODEL_LOAD) and self.check_user_rule( - chat_id, option - ): + elif option.startswith(self.BTN_MODEL_LOAD) and self.check_user_rule(chat_id, option): self.load_model_button(upd=upd, context=context, option=option) - elif option.startswith(self.BTN_LANG_LIST) and self.check_user_rule( - chat_id, option - ): + elif option.startswith(self.BTN_LANG_LIST) and self.check_user_rule(chat_id, option): self.keyboard_language_button(upd=upd, context=context, option=option) - elif option.startswith(self.BTN_LANG_LOAD) and self.check_user_rule( - chat_id, option - ): + elif option.startswith(self.BTN_LANG_LOAD) and self.check_user_rule(chat_id, option): self.load_language_button(upd=upd, context=context, option=option) - elif option.startswith(self.BTN_VOICE_LIST) and self.check_user_rule( - chat_id, option - ): + elif option.startswith(self.BTN_VOICE_LIST) and self.check_user_rule(chat_id, option): self.keyboard_voice_button(upd=upd, context=context, option=option) - elif option.startswith(self.BTN_VOICE_LOAD) and self.check_user_rule( - chat_id, option - ): + elif option.startswith(self.BTN_VOICE_LOAD) and self.check_user_rule(chat_id, option): self.load_voice_button(upd=upd, context=context, option=option) def show_options_button(self, upd: Update, context: CallbackContext): @@ -997,9 +896,7 @@ def next_message_button(self, upd: Update, context: CallbackContext): user = self.users[chat_id] # send "typing" self.clean_last_message_markup(context, chat_id) - answer, _ = self.generate_answer( - user_in=self.GENERATOR_MODE_NEXT, chat_id=chat_id - ) + answer, _ = self.generate_answer(user_in=self.GENERATOR_MODE_NEXT, chat_id=chat_id) message = self.send(text=answer, chat_id=chat_id, context=context) self.users[chat_id].msg_id.append(message.message_id) user.save_user_history(chat_id, self.history_dir_path) @@ -1009,9 +906,7 @@ def continue_message_button(self, upd: Update, context: CallbackContext): message = upd.callback_query.message user = self.users[chat_id] # get answer and replace message text! - answer, _ = self.generate_answer( - user_in=self.GENERATOR_MODE_CONTINUE, chat_id=chat_id - ) + answer, _ = self.generate_answer(user_in=self.GENERATOR_MODE_CONTINUE, chat_id=chat_id) self.edit( text=answer, chat_id=chat_id, @@ -1154,9 +1049,7 @@ def load_model_button(self, upd: Update, context: CallbackContext, option: str): ) raise e - def keyboard_models_button( - self, upd: Update, context: CallbackContext, option: str - ): + def keyboard_models_button(self, upd: Update, context: CallbackContext, option: str): if generator_script.get_model_list() is not None: chat_id = upd.callback_query.message.chat.id msg = upd.callback_query.message @@ -1206,12 +1099,7 @@ def load_preset(self, preset): if os.path.exists(preset_path): with open(preset_path, "r") as preset_file: for line in preset_file.readlines(): - name, value = ( - line.replace("\n", "") - .replace("\r", "") - .replace(": ", "=") - .split("=") - ) + name, value = line.replace("\n", "").replace("\r", "").replace(": ", "=").split("=") if name in self.generation_params: if type(self.generation_params[name]) == int: self.generation_params[name] = int(float(value)) @@ -1224,9 +1112,7 @@ def load_preset(self, preset): elif type(self.generation_params[name]) == list: self.generation_params[name] = list(value.split(",")) - def keyboard_presets_button( - self, upd: Update, context: CallbackContext, option: str - ): + def keyboard_presets_button(self, upd: Update, context: CallbackContext, option: str): chat_id = upd.callback_query.message.chat.id msg = upd.callback_query.message # if "return char markup" button - clear markup @@ -1247,9 +1133,7 @@ def keyboard_presets_button( data_load=self.BTN_PRESET_LOAD, keyboard_colum=3, ) - context.bot.editMessageReplyMarkup( - chat_id=chat_id, message_id=msg.message_id, reply_markup=characters_buttons - ) + context.bot.editMessageReplyMarkup(chat_id=chat_id, message_id=msg.message_id, reply_markup=characters_buttons) def load_character_button(self, upd: Update, context: CallbackContext, option: str): chat_id = upd.callback_query.message.chat.id @@ -1258,17 +1142,11 @@ def load_character_button(self, upd: Update, context: CallbackContext, option: s self.clean_last_message_markup(context, chat_id) self.init_check_user(chat_id) char_file = char_list[char_num] - self.users[chat_id].load_character_file( - characters_dir_path=self.characters_dir_path, char_file=char_file - ) + self.users[chat_id].load_character_file(characters_dir_path=self.characters_dir_path, char_file=char_file) # If there was conversation with this char - load history - self.users[chat_id].find_and_load_user_char_history( - chat_id, self.history_dir_path - ) + self.users[chat_id].find_and_load_user_char_history(chat_id, self.history_dir_path) if len(self.users[chat_id].history) > 0: - send_text = self.make_template_message( - "hist_loaded", chat_id, self.users[chat_id].history[-1] - ) + send_text = self.make_template_message("hist_loaded", chat_id, self.users[chat_id].history[-1]) else: send_text = self.make_template_message("char_loaded", chat_id) context.bot.send_message( @@ -1278,9 +1156,7 @@ def load_character_button(self, upd: Update, context: CallbackContext, option: s reply_markup=self.get_options_keyboard(chat_id), ) - def keyboard_characters_button( - self, upd: Update, context: CallbackContext, option: str - ): + def keyboard_characters_button(self, upd: Update, context: CallbackContext, option: str): chat_id = upd.callback_query.message.chat.id msg = upd.callback_query.message # if "return char markup" button - clear markup @@ -1303,9 +1179,7 @@ def keyboard_characters_button( data_list=self.BTN_CHAR_LIST, data_load=self.BTN_CHAR_LOAD, ) - context.bot.editMessageReplyMarkup( - chat_id=chat_id, message_id=msg.message_id, reply_markup=characters_buttons - ) + context.bot.editMessageReplyMarkup(chat_id=chat_id, message_id=msg.message_id, reply_markup=characters_buttons) def load_language_button(self, upd: Update, context: CallbackContext, option: str): chat_id = upd.callback_query.message.chat.id @@ -1326,9 +1200,7 @@ def load_language_button(self, upd: Update, context: CallbackContext, option: st reply_markup=self.get_options_keyboard(chat_id), ) - def keyboard_language_button( - self, upd: Update, context: CallbackContext, option: str - ): + def keyboard_language_button(self, upd: Update, context: CallbackContext, option: str): chat_id = upd.callback_query.message.chat.id msg = upd.callback_query.message # if "return char markup" button - clear markup @@ -1349,9 +1221,7 @@ def keyboard_language_button( data_load=self.BTN_LANG_LOAD, keyboard_colum=4, ) - context.bot.editMessageReplyMarkup( - chat_id=chat_id, message_id=msg.message_id, reply_markup=lang_buttons - ) + context.bot.editMessageReplyMarkup(chat_id=chat_id, message_id=msg.message_id, reply_markup=lang_buttons) def load_voice_button(self, upd: Update, context: CallbackContext, option: str): chat_id = upd.callback_query.message.chat.id @@ -1400,9 +1270,7 @@ def keyboard_voice_button(self, upd: Update, context: CallbackContext, option: s data_load=self.BTN_VOICE_LOAD, keyboard_colum=4, ) - context.bot.editMessageReplyMarkup( - chat_id=chat_id, message_id=msg.message_id, reply_markup=voice_buttons - ) + context.bot.editMessageReplyMarkup(chat_id=chat_id, message_id=msg.message_id, reply_markup=voice_buttons) # ============================================================================= # load characters char_file from ./characters @@ -1426,22 +1294,14 @@ def get_options_keyboard(self, chat_id=0): voice = "🔇" if self.check_user_rule(chat_id, self.BTN_DOWNLOAD): - keyboard_raw.append( - InlineKeyboardButton(text="💾Save", callback_data=self.BTN_DOWNLOAD) - ) + keyboard_raw.append(InlineKeyboardButton(text="💾Save", callback_data=self.BTN_DOWNLOAD)) # if self.check_user_rule(chat_id, self.BTN_LORE): # keyboard_raw.append(InlineKeyboardButton( # text="📜Lore", callback_data=self.BTN_LORE)) if self.check_user_rule(chat_id, self.BTN_CHAR_LIST): - keyboard_raw.append( - InlineKeyboardButton( - text="🎭Chars", callback_data=self.BTN_CHAR_LIST + "-9999" - ) - ) + keyboard_raw.append(InlineKeyboardButton(text="🎭Chars", callback_data=self.BTN_CHAR_LIST + "-9999")) if self.check_user_rule(chat_id, self.BTN_RESET): - keyboard_raw.append( - InlineKeyboardButton(text="⚠Reset", callback_data=self.BTN_RESET) - ) + keyboard_raw.append(InlineKeyboardButton(text="⚠Reset", callback_data=self.BTN_RESET)) if self.check_user_rule(chat_id, self.BTN_LANG_LIST): keyboard_raw.append( InlineKeyboardButton( @@ -1450,61 +1310,29 @@ def get_options_keyboard(self, chat_id=0): ) ) if self.check_user_rule(chat_id, self.BTN_VOICE_LIST): - keyboard_raw.append( - InlineKeyboardButton( - text=voice + "Voice", callback_data=self.BTN_VOICE_LIST + "0" - ) - ) - if ( - self.check_user_rule(chat_id, self.BTN_PRESET_LIST) - and generator_script.generator.preset_change_allowed - ): - keyboard_raw.append( - InlineKeyboardButton( - text="🔧Presets", callback_data=self.BTN_PRESET_LIST + "0" - ) - ) - if ( - self.check_user_rule(chat_id, self.BTN_MODEL_LIST) - and generator_script.generator.model_change_allowed - ): - keyboard_raw.append( - InlineKeyboardButton( - text="🔨Model", callback_data=self.BTN_MODEL_LIST + "0" - ) - ) + keyboard_raw.append(InlineKeyboardButton(text=voice + "Voice", callback_data=self.BTN_VOICE_LIST + "0")) + if self.check_user_rule(chat_id, self.BTN_PRESET_LIST) and generator_script.generator.preset_change_allowed: + keyboard_raw.append(InlineKeyboardButton(text="🔧Presets", callback_data=self.BTN_PRESET_LIST + "0")) + if self.check_user_rule(chat_id, self.BTN_MODEL_LIST) and generator_script.generator.model_change_allowed: + keyboard_raw.append(InlineKeyboardButton(text="🔨Model", callback_data=self.BTN_MODEL_LIST + "0")) if self.check_user_rule(chat_id, self.BTN_DELETE): - keyboard_raw.append( - InlineKeyboardButton(text="❌Close", callback_data=self.BTN_DELETE) - ) + keyboard_raw.append(InlineKeyboardButton(text="❌Close", callback_data=self.BTN_DELETE)) return InlineKeyboardMarkup([keyboard_raw]) def get_chat_keyboard(self, chat_id=0): keyboard_raw = [] if self.check_user_rule(chat_id, self.BTN_NEXT): - keyboard_raw.append( - InlineKeyboardButton(text="▶Next", callback_data=self.BTN_NEXT) - ) + keyboard_raw.append(InlineKeyboardButton(text="▶Next", callback_data=self.BTN_NEXT)) if self.check_user_rule(chat_id, self.BTN_CONTINUE): - keyboard_raw.append( - InlineKeyboardButton(text="➡Continue", callback_data=self.BTN_CONTINUE) - ) + keyboard_raw.append(InlineKeyboardButton(text="➡Continue", callback_data=self.BTN_CONTINUE)) if self.check_user_rule(chat_id, self.BTN_DEL_WORD): - keyboard_raw.append( - InlineKeyboardButton(text="⬅Del word", callback_data=self.BTN_DEL_WORD) - ) + keyboard_raw.append(InlineKeyboardButton(text="⬅Del word", callback_data=self.BTN_DEL_WORD)) if self.check_user_rule(chat_id, self.BTN_REGEN): - keyboard_raw.append( - InlineKeyboardButton(text="♻Regenerate", callback_data=self.BTN_REGEN) - ) + keyboard_raw.append(InlineKeyboardButton(text="♻Regenerate", callback_data=self.BTN_REGEN)) if self.check_user_rule(chat_id, self.BTN_CUTOFF): - keyboard_raw.append( - InlineKeyboardButton(text="✖Cutoff", callback_data=self.BTN_CUTOFF) - ) + keyboard_raw.append(InlineKeyboardButton(text="✖Cutoff", callback_data=self.BTN_CUTOFF)) if self.check_user_rule(chat_id, self.BTN_OPTION): - keyboard_raw.append( - InlineKeyboardButton(text="⚙Options", callback_data=self.BTN_OPTION) - ) + keyboard_raw.append(InlineKeyboardButton(text="⚙Options", callback_data=self.BTN_OPTION)) return InlineKeyboardMarkup([keyboard_raw]) def get_switch_keyboard( @@ -1535,9 +1363,7 @@ def get_switch_keyboard( if column >= keyboard_colum: column = 0 characters_buttons[-1].append( - InlineKeyboardButton( - text=f"{opt_list[i]}", callback_data=f"{data_load}{str(i)}" - ) + InlineKeyboardButton(text=f"{opt_list[i]}", callback_data=f"{data_load}{str(i)}") ) i += 1 # add switch buttons