# Проект

Описание бота...

In [None]:
import pandas as pd
from tinydb import TinyDB, Query
import os
import datetime
import warnings
import pandas_datareader as pdr
import requests_cache
from dateutil.relativedelta import relativedelta
from pandas.tseries.offsets import CustomBusinessDay
import telebot
from telebot import types
from telebot.types import ReplyKeyboardMarkup
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.memory import MemoryJobStore
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
import tempfile

In [None]:
telegram_api_key = 'TELEGRAM_API_KEY'
db_path = "./data/tickers.json"

In [None]:
def setup_db(db_path: str):
    """
    Creates the directory for the database file if it doesn't exist.

    Args:
        db_path (str): The path to the database file.
    """
    os.makedirs(os.path.dirname(db_path), exist_ok=True)


def store_tickers(db_path: str, chat_id: int, tickers: pd.Series):
    """
    Stores the tickers associated with a chat ID in the database.

    Args:
        db_path (str): The path to the database file.
        chat_id (int): The ID of the chat.
        tickers (pd.Series): A Pandas Series object containing the tickers.

    """
    db_path = TinyDB(db_path)  # Initialize the TinyDB object with the given database path
    User = Query()  # Create a Query object for performing queries on the database
    db_path.upsert({'chat_id': chat_id, "tickers": list(tickers.values)}, User.chat_id == chat_id)
    # Update or insert a new document in the database with chat_id and tickers


def store_time(db_path: str, chat_id: int, time_choice: tuple):
    """
    Stores the time choice associated with a chat ID in the database.

    Args:
        db_path (str): The path to the database file.
        chat_id (int): The ID of the chat.
        time_choice (tuple): A tuple representing the time choice.

    """
    db_path = TinyDB(db_path)  # Initialize the TinyDB object with the given database path
    User = Query()  # Create a Query object for performing queries on the database
    db_path.upsert({'chat_id': chat_id, "time": time_choice}, User.chat_id == chat_id)
    # Update or insert a new document in the database with chat_id and time_choice


def get_user_tickers(db_path: str, chat_id: int) -> list:
    """
    Retrieves the tickers associated with a chat ID from the database.

    Args:
        db_path (str): The path to the database file.
        chat_id (int): The ID of the chat.

    Returns:
        list: A list of tickers associated with the chat ID.
    """
    db = TinyDB(db_path)  # Initialize the TinyDB object with the given database path
    chat = Query()  # Create a Query object for performing queries on the database
    return db.get(chat.chat_id == chat_id)['tickers']
    # Retrieve the tickers field from the document with the matching chat_id


def set_jobs_dict(db_path: str) -> dict:
    """
    Creates a dictionary of chat IDs and their corresponding time choices from the database.

    Args:
        db_path (str): The path to the database file.

    Returns:
        dict: A dictionary mapping chat IDs to time choices.
    """
    db = TinyDB(db_path)  # Initialize the TinyDB object with the given database path
    jobs = {}  # Dictionary to store the chat IDs and time choices
    for user in db.all():
        try:
            jobs[user["chat_id"]] = tuple(user["time"])
            # Add chat ID and time choice as a key-value pair to the jobs dictionary
        except KeyError:
            continue  # Skip users without a time choice field

    return jobs


def get_user_time(db_path: str, chat_id: int) -> tuple:
    """
    Retrieves the time choice associated with a chat ID from the database.

    Args:
        db_path (str): The path to the database file.
        chat_id (int): The ID of the chat.

    Returns:
        tuple: The time choice associated with the chat ID.
    """
    db = TinyDB(db_path)  # Initialize the TinyDB object with the given database path
    chat = Query()  # Create a Query object for performing queries on the database
    return db.get(chat.chat_id == chat_id)['time']
    # Retrieve the time field from the document with the matching chat_id


def get_users(db_path: str) -> list:
    """
    Retrieves a list of chat IDs from the database.

    Args:
        db_path (str): The path to the database file.

    Returns:
        list: A list of chat IDs.
    """
    db = TinyDB(db_path)  # Initialize the TinyDB object with the given database path
    return [x['chat_id'] for x in db.all()]
    # Extract the chat_id field from all documents in the database and return as a list

In [None]:
moex_holidays = [datetime.date(2023, 6, 12), datetime.date(2022, 6, 12), datetime.date(2022, 6, 13),
                 datetime.date(2022, 11, 4), datetime.date(2023, 11, 4)]
moex_bday = CustomBusinessDay(holidays=moex_holidays)


def get_nearest_work_day(today: datetime.datetime) -> datetime.date:
    """
    Returns the nearest working day given a datetime.

    Args:
        today (datetime.datetime): The current datetime.

    Returns:
        datetime.date: The nearest working day.

    """
    if today.date() in moex_holidays or today.weekday() in [5, 6] or today.hour < 24:
        return today - moex_bday
    return today


def price_change(date1: pd.Series, date2: pd.Series) -> pd.Series:
    """
    Calculates the price change percentage between two sets of prices.

    Args:
        date1 (pd.Series): The first set of prices.
        date2 (pd.Series): The second set of prices.

    Returns:
        pd.Series: The price change percentage.

    """
    return ((date1 / date2 - 1) * 100).round(2)


def get_prices(ticker_list: list, date: datetime.date, session: requests_cache.CachedSession) -> pd.Series:
    """
    Retrieves the closing prices of a list of tickers on a specific date.

    Args:
        ticker_list (list): The list of tickers.
        date (datetime.date): The date for which prices are requested.
        session (requests_cache.CachedSession): The requests cache session.

    Returns:
        pd.Series: The closing prices of the tickers.

    """
    with warnings.catch_warnings():
        warnings.simplefilter(action='ignore', category=FutureWarning)
        if requests_cache is None:
            return pdr.get_data_moex(ticker_list, date, date).set_index('SECID')['CLOSE']
        else:
            return pdr.get_data_moex(ticker_list, date, date, session=session).set_index('SECID')['CLOSE']


def moex_counter(ticker_list: list, session: requests_cache.CachedSession = None) -> pd.DataFrame:
    """
    Retrieves price changes for a list of tickers.

    Args:
        ticker_list (list): The list of tickers.
        session (requests_cache.CachedSession, optional): The requests cache session.

    Returns:
        pd.DataFrame: A DataFrame containing ticker, close price, DoD change, WoW change, MoM change, and YoY change.

    """
    today = get_nearest_work_day(datetime.datetime.now())
    dod_date = today - moex_bday
    wow_date = get_nearest_work_day(today - relativedelta(weeks=1))
    mom_date = get_nearest_work_day(today - relativedelta(months=1))
    yoy_date = get_nearest_work_day(today - relativedelta(months=12))
    today_prices = get_prices(ticker_list, today, session)
    dod_prices = get_prices(ticker_list, dod_date, session)
    wow_prices = get_prices(ticker_list, wow_date, session)
    mom_prices = get_prices(ticker_list, mom_date, session)
    yoy_prices = get_prices(ticker_list, yoy_date, session)
    dod_change = price_change(today_prices, dod_prices)
    wow_change = price_change(today_prices, wow_prices)
    mom_change = price_change(today_prices, mom_prices)
    yoy_change = price_change(today_prices, yoy_prices)
    df = pd.concat({
        'close_price': today_prices,
        'DoD': dod_change,
        'WoW': wow_change,
        'MoM': mom_change,
        'YoY': yoy_change
    }, axis=1)
    return df

In [None]:
def format_ticker(ticker_info: pd.Series) -> str:
    """
    Formats ticker information into a string.

    Args:
        ticker_info (pd.Series): The ticker information as a Series.

    Returns:
        str: The formatted ticker information.
    """
    return f"{ticker_info.name}   {ticker_info['close_price']}   {ticker_info['DoD']}%   {ticker_info['WoW']}%   {ticker_info['MoM']}%   {ticker_info['YoY']}%"


def format_report(exchange_data: pd.DataFrame) -> str:
    """
    Formats the exchange data report into a string.

    Args:
        exchange_data (pd.DataFrame): The exchange data report as a DataFrame.

    Returns:
        str: The formatted exchange data report.
    """
    report_lines = [format_ticker(exchange_data.loc[i]) for i in exchange_data.index]
    return "Тикер  Цена     DoD     WoW     MoM     YoY\n" + "\n".join(report_lines)

In [None]:
def excel_update(file_path: str, db_path: str, chat_id: int):
    """
    Updates the database with tickers from an Excel file.

    Args:
        file_path (str): The path to the Excel file.
        db_path (str): The path to the database file.
        chat_id (int): The chat ID associated with the tickers.

    Returns:
        None
    """
    tickers = pd.read_excel(file_path)['ticker']
    store_tickers(db_path, chat_id, tickers)


In [None]:
bot = telebot.TeleBot(telegram_api_key, parse_mode=None)


jobstores = {
    "default": SQLAlchemyJobStore(url='sqlite:///jobs.sqlite'),
    "cache": MemoryJobStore()
}

scheduler = BackgroundScheduler(jobstores=jobstores)

session = requests_cache.CachedSession(cache_name="moex", backend="sqlite")


@bot.message_handler(commands=['start'])
def send_welcome(message: types.Message):
    """
    Handles the 'start' command and initializes the bot.

    Args:
        message (types.Message): The received message object.

    Returns:
        None
    """
    if message.chat.id not in get_users(db_path):
        bot.send_message(message.chat.id,
                         'Этот бот предоставляет регулярные отчеты о вашем портфеле на Московской Бирже')
        markup: ReplyKeyboardMarkup = types.ReplyKeyboardMarkup(row_width=2, resize_keyboard=True,
                                                                one_time_keyboard=True)
        markup.add("В 09:30", "Другое")
        msg = bot.send_message(message.chat.id,
                               'Торги на MOEX открываются в 09:30 (по МСК). В какое время вы хотите получать отчет?',
                               reply_markup=markup)
        bot.register_next_step_handler(msg, time_select, **{"from_welcome": True})
    else:
        bot.send_message(message.chat.id,
                         "Вы уже добавляли свои данные. Чтобы изменить список тикеров, используйте команду: /update_tickers")


def time_select(message: types.Message, from_welcome=False):
    """
    Handles the time selection for report delivery.

    Args:
        message (types.Message): The received message object.
        from_welcome (bool): Indicates if the function is called from the welcome message.

    Returns:
        None
    """
    if message.text == "В 09:30":
        chosen_time = (9, 30)
        store_time(db_path, message.chat.id, chosen_time)
        create_jobs(set_jobs_dict(db_path))
        bot.send_message(message.chat.id, "Время выбрано", reply_markup=types.ReplyKeyboardRemove())
        if from_welcome:
            update_ticker(message)

    else:
        msg = bot.send_message(message.chat.id, "Введите желаемое время в формате 00:00")
        bot.register_next_step_handler(msg, custom_time, **{"from_welcome": from_welcome})


@bot.message_handler(commands=['help'])
def handle_help(message: types.Message):
    """
    Handles the 'help' command and sends the help message.

    Args:
        message (types.Message): The received message object.

    Returns:
        None
    """
    bot.send_message(message.chat.id, "Этот бот предоставляет регулярные отчеты о вашем портфеле на Московской Бирже")


@bot.message_handler(commands=['update_time'])
def update_time(message: types.Message):
    """
    Handles the 'update_time' command and prompts the user to update the report delivery time.

    Args:
        message (types.Message): The received message object.

    Returns:
        None
    """
    markup: ReplyKeyboardMarkup = types.ReplyKeyboardMarkup(row_width=2, resize_keyboard=True,
                                                            one_time_keyboard=True)
    markup.add("В 09:30", "Другое")
    msg = bot.send_message(message.chat.id,
                           'Торги на MOEX открываются в 09:30, во сколько вы хотите получать отчет?',
                           reply_markup=markup)
    bot.register_next_step_handler(msg, time_select)


def custom_time(message: types.Message, from_welcome=False):
    """
    Handles the custom time input for report delivery.

    Args:
        message (types.Message): The received message object.
        from_welcome (bool): Indicates if the function is called from the welcome message.

    Returns:
        None
    """
    try:
        user_choice = parse_time(message.text.strip())
        store_time(db_path, message.chat.id, user_choice)
        create_jobs(set_jobs_dict(db_path))
        msg = bot.send_message(message.chat.id, "Время выбрано", reply_markup=types.ReplyKeyboardRemove())
        if from_welcome:
            update_ticker(msg)
    except ValueError:
        msg = bot.send_message(message.chat.id, "Неверный формат времени")
        bot.register_next_step_handler(msg, custom_time, **{"from_welcome": from_welcome})


def parse_time(time_input: str) -> tuple:
    """
    Parses the user-inputted time and returns a tuple of (hour, minute).

    Args:
        time_input (str): The user-inputted time in the format HH:MM.

    Returns:
        tuple: A tuple of (hour, minute).

    Raises:
        ValueError: If the time format is incorrect.
    """
    try:
        time = datetime.datetime.strptime(time_input, "%H:%M")
        return time.hour, time.minute
    except:
        raise ValueError


@bot.message_handler(commands=['update_tickers'])
def update_ticker(message: types.Message):
    """
    Handles the 'update_tickers' command and prompts the user to choose the method of updating tickers.

    Args:
        message (types.Message): The received message object.

    Returns:
        None
    """
    markup: ReplyKeyboardMarkup = types.ReplyKeyboardMarkup(row_width=2, resize_keyboard=True, one_time_keyboard=True)
    markup.add("Сообщение", "Excel-таблица")
    msg = bot.send_message(message.chat.id, "Выберете способ загрузки тикеров",
                           reply_markup=markup)
    bot.register_next_step_handler(msg, handle_broker_choice)


def handle_broker_choice(message: types.Message):
    """
    Handles the user's choice of ticker update method.

    Args:
        message (types.Message): The received message object.

    Returns:
        None
    """
    if message.text == "Сообщение":
        msg = bot.send_message(message.chat.id, "Напишите список тикеров через запятую.\nПример: SBER, VTBR",
                               reply_markup=types.ReplyKeyboardRemove())
        bot.register_next_step_handler(msg, message_ticker_handler)
    elif message.text == "Excel-таблица":
        msg = bot.send_message(message.chat.id, "Загрузить Excel-файл с колонками ticker, type. Подробнее: ",
                               reply_markup=types.ReplyKeyboardRemove())
        bot.register_next_step_handler(msg, custom_tickers_handler)


def message_ticker_handler(message: types.Message):
    """
    Handles the user-inputted tickers in the message format.

    Args:
        message (types.Message): The received message object.

    Returns:
        None
    """
    user_tickers = [x.upper() for x in message.text.strip().split(", ")]
    store_tickers(db_path, message.chat.id, pd.Series(user_tickers))
    bot.send_message(message.chat.id, "Тикеры добавлены")


def custom_tickers_handler(message: types.Message):
    """
    Handles the user-inputted tickers in the custom Excel file format.

    Args:
        message (types.Message): The received message object.

    Returns:
        None
    """
    if message.document.mime_type in ["application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
                                      "application/vnd.ms-excel"]:
        file_info = bot.get_file(message.document.file_id)
        downloaded_file = bot.download_file(file_info.file_path)
        temp = tempfile.NamedTemporaryFile()
        with open(temp.name, "wb") as excel:
            excel.write(downloaded_file)
        excel_update(temp.name, db_path, message.chat.id)
        bot.send_message(message.chat.id, "Тикеры добавлены")


@bot.message_handler(commands=["get_report"])
def get_report(message: types.Message):
    """
    Handles the 'get_report' command and sends the portfolio report to the user.

    Args:
        message (types.Message): The received message object.

    Returns:
        None
    """
    send_report(message.chat.id)


def send_report(user_id: int):
    """
    Sends the portfolio report to the specified user.

    Args:
        user_id (int): The ID of the user.

    Returns:
        None
    """
    user_ticker = get_user_tickers(db_path, user_id)
    msg = bot.send_message(user_id, "Пожалуйста, подождите...", disable_notification=True)
    data = moex_counter(user_ticker)
    bot.delete_message(user_id, msg.id)
    bot.send_message(user_id, format_report(data))


def create_jobs(job_dict: dict):
    """
    Creates the scheduler jobs for sending regular portfolio reports.

    Args:
        job_dict (dict): A dictionary containing user IDs as keys and report delivery times as values.

    Returns:
        None
    """
    for user_id, time in job_dict.items():
        scheduler.remove_all_jobs()
        scheduler.add_job(send_report, 'cron', hour=time[0], minute=time[1], kwargs={"user_id": user_id},
                          jobstore="default")


@bot.message_handler(commands=['get_tickers'])
def get_tickers(message: types.Message):
    """
    Handles the 'get_tickers' command and sends the user's tickers.

    Args:
        message (types.Message): The received message object.

    Returns:
        None
    """
    user_tickers = get_user_tickers(db_path, message.chat.id)
    bot.send_message(message.chat.id,
                     "Ваши тикеры:\n" + "\n".join([f"{i + 1}. {ticker}" for i, ticker in enumerate(user_tickers)]))


@bot.message_handler(commands=['get_time'])
def get_time(message: types.Message):
    """
    Handles the 'get_time' command and sends the user's report delivery time.

    Args:
        message (types.Message): The received message object.

    Returns:
        None
    """
    user_time = get_user_time(db_path, message.chat.id)
    bot.send_message(message.chat.id, f"Время доставки отчета: {user_time[0]:02d}:{user_time[1]:02d}")

def clear_cache():
    session.cache.clear()


setup_db(db_path)
scheduler.start()
scheduler.add_job(clear_cache, "cron", hour=0, minute=0, jobstore="cache")
bot.infinity_polling()