## Профильное задание
### ML-разработчик
#### Выполнил: Пантелеев Игорь

# Задача
<p> На вход сервису поступают обновления документов
```
message TDocument {<br>
string Url = 1;<br>
uint64 PubDate = 2;<br>
uint64 FetchTime = 3;<br>
string Text = 4;<br>
uint64 FirstFetchTime = 5<br>
```<br>
Необходимо на выходе формировать такие же сообщения, но с исправленными отдельными полями со следующими правилами (всё нижеуказанное - для группы документов с совпадающим полем Url):
</p>

* Поля Text и Fetch должны быть такими, какими были в документе с наибольшим FetchTime, полученным на данный момент<br>
* Поле PubDate должно быть таким, каким было у сообщения с наименьшим FetchTime<br>
* Поле FirstFetchTime должно быть равно минимальному значению FetchTime<br>


# Подготовка 
<p>
Как я вижу эту задачу. Есть некая очередь (Kafka и т.п.), в которую попадают новые и обновления старых документов. Мы должны взять из этой очереди документ, передать его в наш модуль и на выходе отправить этот документ в другую очередь. Я не силён в UML, но попробую нарисовать схему процесса
</p>

<img src="UML.png" alt="Мое изображение" width="500" height="100">

<p>С учётом того, что из вводных нам дано, что время хранится в uint64, вероятнее всего речь идёт о секундах (милисекунды вряд ли пригодятся для отслеживания обновлений документов и больше применимы к микротранзакциям</p>

# 1-й этап: иммитируем Kafka

<p>
    Чтобы сразу подготовить наше решение к "боевой версии", напишем класс, который будет иммитировать очередь поступающих документов по 3 разным сценариям:
    
* Новый документ
* Обновление старого документа
* Ошибка
</p>

In [1]:
import pandas as pd
import random
import string
import requests
import time
from datetime import datetime, timedelta
import sqlite3

class Generator():
    def __init__(self):
        #api fish-text.ru
        self.__url = 'https://fish-text.ru/get'
        # Параметры запроса
        self.__params = {
            'format': 'html',
            'type': 'paragraph',
        }
        self.__result = {}
        self.__connection = sqlite3.connect('vk.db')
        self.__cursor = self.__connection.cursor()
        self.__cursor.execute('SELECT tod.Url, tod.Text FROM T_Documents tod')
        self.__current_bd = pd.DataFrame(self.__cursor.fetchall(), columns=[description[0] for description in self.__cursor.description])


    #Генератор случайных URL
    def __generate_random_string(self, length: int) -> str:
        rand = random.randint(0, 1)
        #выдаёт случайное новое значение или случайное значение из имеющихся
        try:
            if rand == 1:
                letters = string.ascii_letters + string.digits
                return ''.join(random.choice(letters) for i in range(length))
            else:
                url = self.__current_bd['Url'][random.randint(0, len(self.__current_bd)-1)]
                return url
        except:
            return ''
        
    #Генератор постов
    def __generate_new_text(self) -> str:
        #выдаёт случаное новое значение из api или берёт случайный текст из уже имеющихся
        rand = random.choices([0, 1], [0.4, 0.6], k=1)[0]
        try:
            if rand == 1:
                try:
                    response = requests.get(self.__url, params=self.__params)
                    fish_text = response.text.replace('<p>', '').split('</p>')[0]

                    # чтобы не получить блокировку за превышение количества запросов
                    time.sleep(0.1)
                except:
                    fish_text = ''

                return fish_text
            else:
                fish_text = self.__current_bd['Text'][random.randint(0, len(self.__current_bd)-1)]
                return fish_text
        except:
            return ''

    #функция для генерации случайной даты в заданном диапазоне
    def __generate_random_date(self, start_date: datetime, end_date: datetime):
        rand = random.choices([0, 1], [0.2, 0.8], k=1)[0]
        if rand == 1:
            delta = end_date - start_date
            random_days = random.randrange(abs(delta.days))
            dt = start_date + timedelta(days=random_days)
            try:
                return int(dt.timestamp())
            except:
                return ''
        else:
            return 'Error date'

    def __update_bd(self):
        self.__cursor.execute('SELECT tod.Url, tod.Text FROM T_Documents tod')
        self.__current_bd = pd.DataFrame(self.__cursor.fetchall(),
                                         columns=[description[0] for description in self.__cursor.description])

    #функция генерация документов
    def generate_document(self):
        #примерно раз в 10 секунд, будет выгружать то, что уже находится в очереди и подставлять в значения
        #тем самым иммитируя то, что документ обновился
        if datetime.now().second % 10 == 0:
            self.__update_bd()


        start_date = [datetime(2001, 1, 1, hour=random.randint(0, 23), minute=random.randint(0, 59)), \
                      datetime(1001, 1, 1, hour=random.randint(0, 23), minute=random.randint(0, 59)), \
                      datetime(2011, 1, 1, hour=random.randint(0, 23), minute=random.randint(0, 59)), \
                      datetime(21, 1, 1, hour=random.randint(0, 23), minute=random.randint(0, 59)), \
                      datetime(2031, 1, 1, hour=random.randint(0, 23), minute=random.randint(0, 59))]
        end_date = [datetime(2000, 1, 1, hour=random.randint(0, 23), minute=random.randint(0, 59)), \
                    datetime(1000, 1, 1, hour=random.randint(0, 23), minute=random.randint(0, 59)), \
                    datetime(2030, 1, 1, hour=random.randint(0, 23), minute=random.randint(0, 59)), \
                    datetime(20, 1, 1, hour=random.randint(0, 23), minute=random.randint(0, 59)), \
                    datetime(2030, 1, 1, hour=random.randint(0, 23), minute=random.randint(0, 59))]
        prob = [0.3, 0.2, 0.1, 0.25, 0.15]
        #получаем случайные url. Только вариант из 12 символов для нас будет корректным
        self.__result['Url'] = self.__generate_random_string(random.choices([12, 103, 0, 3, 17], prob, k=1)[0])
        self.__result['PubDate'] = ''
        self.__result['Text'] = self.__generate_new_text()
        self.__result['FetchTime'] = self.__generate_random_date(random.choices(start_date, prob, k=1)[0], random.choices(end_date, prob, k=1)[0])
        self.__result['FirstFetchTime'] = ''

        return self.__result

    def __del__(self):
        self.__connection.close()


<p>
У нашего класса, который иммитирует очередь есть несколько методов, разберём их: 
    
* __init__(self): конструктор класса. Его задача для объекта генератора подключиться к API и настроить параметры запросов. Также он подключается к выходной очереди и проверяет Url уже модерированных документов, также я добавил выгрузку текстов (не является обязательным и в данном случае даже плохо, так как чуть замедляет работу кода), чтобы иметь возможность подтягивать старые значения
  
* __generate_random_string(self, length: int) -> str: метод, который выдаёт случайную строку по запросу. Его задача генерировать случайные Url наших документов. Поскольку никаких вводных относительно Url не было, я поставил условие проверки - Url должен состоять из 12 символов. Это условие легко можно заменить на любое другое (к примеру включение подстроки https://vk.com)

* __generate_new_text(self) -> str: выдаёт случайные текст. Здесь будет использоваться API fish-text.ru, чтобы получать какой-то текст, который будет увеличивать нагрузку на запрос, тем самым приближая решение к "боевому". Также выдаёт либо текст, либо пустой текст, либо один из имеющихся. Чтобы не получить блокировку за превышение количества запросов, ставлю ограничение в 0.1 секунду на запрос
    
* __generate_random_date(self, start_date: datetime, end_date: datetime): метод, который выдаёт случайную дату в заданном диапазоне. На вход будут подаваться как адекватные даты, так и выходящие за рамки нового времени. Также, этот метод будет возвращать ошибки даты, такое встречается когда запись не была успешной.

*  __update_bd(self): генератор ~ раз в 10 секунд будет обновлять свою переменную, хранящую записи бд, чтобы иногда генерировать обновления старых документов

* generate_document(self): единственный public метод класса. Его задача собрать результат всех функций и вернуть в виде словаря, который в дальнейшем будет отправлен на интерфейс listener

* __del__(self): деструктор закрывает соединение с бд

Все методы, участвующие в генерации документа, выдают либо адекватное значение, либо неадекватное с некоторой вероятностью, чтобы иммитировать битые записи
</p>

# 2-й этап Создаём базу данных и таблицу T_Documents

<p> В качестве БД, которая будет хранить выходную очередь, я выбрал sqllite. Тут я ориентировался на то, чтобы скорость запроса к БД был как можно быстрее. Плюс простота использования.

    import sqlite3

    # Подключение к базе данных (если базы данных не существует, она будет создана)
    conn = sqlite3.connect('vk.db')
    
    # Создание курсора
    cursor = conn.cursor()
    
    # Создание таблицы T_Documents
    cursor.execute('''
    CREATE TABLE T_Documents (
        Url TEXT,
        FetchTime INTEGER,
        PubDate INTEGER,
        FirstFetchTime INTEGER,
        Text TEXT
    )
    ''')
    
    # Сохранение изменений
    conn.commit()
    
    # Закрытие соединения
    conn.close()

</p>

# 3-й этап создание интерфейса

<p> Теперь, нам нужен интерфейс которому на вход будут подаваться документы, на выходе либо такой же документ с изменениями, либо строка 'nil', которая означает, что документ не прошёл документацию. Я сделал выбор в сторону Flask.
    
    from flask import Flask, request, jsonify
    import pandas as pd
    from datetime import datetime
    import sqlite3
    
    #настройки Flask
    app = Flask(__name__)
    #список, который будет хранить Url прошедших "модерацию" постов
    @app.route('/listener', methods=['POST'])
    def listener():
        #отлавливаем ошибку, когда пришёл не json
        try:
            response_data = request.get_json()
            return jsonify(checker(response_data))
        except:
            return 'nil'
</p>

# 4-й этап создание функции проверки документов

<p> У нас есть генератор документов, выходная очередь, интерфейс взаимодействия, теперь осталось сделать функцию (можно класс, но для тестового задания выглядит избыточно), которая будет проверять валидность всех полей и принимать решение - добавить новую запись или обновить старый документ. <br>
    Какие ошибки я могу предвидеть, не имея доступа к реальным данным:

    * очевидная ошибка, которую я отсекаю сразу в интерфейсе - вместо json пришло что-то другое
    * отсутствуют необходимые ключи

    try:
        Url = new_document['Url']
        FetchTime = new_document['FetchTime']
        Text = new_document['Text']
    except:
        return 'nil'

    * невалидная дата (поскольку дата записана в виде числа, если входное значение преобразовать к числу не выйдет или оно окажется меньше 0 - значит дата невалидна

    try:
        if int(FetchTime) >= 0:
            pass
        else:
            return 'nil'
    except:
        return 'nil'

    * неадекватная дата. Здесь я беру диапазон от 1990 года до текущего момента

    if FetchTime > datetime(1990, 1, 1).timestamp() and FetchTime < datetime.now().timestamp():
        pass
    else:
        return 'nil'

    * Url не соответствует требованиям. Выполняю проверку на длину строки Url == 12
</p>

<p> Дальше подключаемся к нашей выходной бд с модерированными документами и вытаскиваем Url документов (дешёвая операция, проверял на удалённой бд с 17 млн записей, занимает максимум секунд 10. На локальной бд будет ещё меньше), чтобы иметь возможность обновлять старые документы, если на них пришло обновление.

    connection = sqlite3.connect('vk.db')
    cursor = connection.cursor()
    cursor.execute("SELECT tod.Url FROM T_Documents tod")
    bd = pd.DataFrame(cursor.fetchall(), columns=[description[0] for description in cursor.description])
    url_list = bd['Url'].to_list()
</p>

<p>Дальше у нас есть 2 сценария:
    
* запись с таким Url уже есть
* такой записи нет
</p>

<p>Рассмотрим второй, более простой сценарий. В данном случае, поскольку мы уже проверили входной словарь, мы проверяем валидность Url (12 символов) и делаем запись в выходную очередь, а также делаем return с теми же значениями. Если Url не выполняет требования или произошла ошибка на этапе записи, возвращаем nil
    
        if Url not in url_list:
            if len(Url) == 12:
                connection = sqlite3.connect('vk.db')
                cursor = connection.cursor()
                cursor.execute(
                    'INSERT INTO T_documents (Url, FetchTime, PubDate, Text, FirstFetchTime) VALUES (?, ?, ?, ?, ?)', \
                    (Url, FetchTime, FetchTime, Text, FetchTime))
                connection.commit()
                connection.close()
                return {'Url': Url, 'FetchTime': FetchTime, 'PubDate': FetchTime, 'Text': Text,
                        'FirstFetchTime': FetchTime}
            else:
                return 'nil'

</p>

<p>По первому сценарию, мы нашли запись с таким Url и теперь нужно проверить даты. Посколько FirstFetchTime == PubDate, можем сравнивать с любым полем
    
            connection = sqlite3.connect('vk.db')
            cursor = connection.cursor()
            cursor.execute(f"SELECT tod.Url, tod.FetchTime, tod.PubDate, tod.Text, tod.FirstFetchTime \
                        FROM T_Documents tod WHERE tod.Url = '{Url}'")
            bd = pd.DataFrame(cursor.fetchall(), columns=[description[0] for description in cursor.description])
            current_fetch = bd['FirstFetchTime'][0]
</p>

<p>Дальше сравниваем даты. Если текущая дата > пришедшей даты, значит нам нужно обновить все поля, касающиеся даты. Если нет, то только поле FetchTime

                if current_fetch > FetchTime:
                    connection = sqlite3.connect('vk.db')
                    cursor = connection.cursor()
                    z = f"UPDATE T_Documents SET FetchTime='{FetchTime}', PubDate='{FetchTime}', Text='{Text}', \
                            FirstFetchTime='{FetchTime}' WHERE Url='{Url}'"
                    cursor.execute(z)
                    connection.commit()
                    return {'Url': Url, 'FetchTime': FetchTime, 'PubDate': FetchTime, 'Text': Text,
                            'FirstFetchTime': FetchTime}
                else:
                    connection = sqlite3.connect('vk.db')
                    cursor = connection.cursor()
                    z = f"UPDATE T_Documents SET FetchTime='{FetchTime}', PubDate='{bd['PubDate'][0]}', Text='{Text}', \
                            FirstFetchTime='{bd['FirstFetchTime'][0]}' WHERE Url='{Url}'"
                    cursor.execute(z)
                    connection.commit()
                    return {'Url': Url, 'FetchTime': FetchTime, 'PubDate': FetchTime, 'Text': Text,
                            'FirstFetchTime': FetchTime}
</p>

# 5-й этап Тестируем решение

<p>
    У нас всё готово для проверки работоспособности решения. Импортируем необходимые библиотеки, создаём функцию, которая будет подключаться к интерфейсу и отправлять в него новый документ от генератора
    
    import document_generator as dg
    import pandas as pd
    import json
    import requests
    import random
    import time

    #функция для отправки новых документов в интерфейс
    def new_document():
        url = 'http://127.0.0.1:5000/listener'
        document = dg.Generator().generate_document()
        return requests.post(url, json=document).text
</p>

<p>Дальше я хочу собрать какую-то статистику по времени выполнения (с оглядкой на задержку в 0.1 секунду по обращению к API fish-text.ru). Поэтому создаём список для хранения времени выполнения</p>
    time_list = []

<p>Начинаем тестирование
    
    while True:
        start_time = time.time()
        new_document()
        end_time = time.time()
        execution_time = end_time - start_time
        time_list.append(execution_time)
        if (len(time_list)) > 50000:
            break
</p>

<img src="Результат.png" alt="Мое изображение" width="500" height="300">

<p>Как видно, медиана одного запроса составляет 0.35 секунды (-0.1 запрос API)
Выбросы вероятнее всего связаны именно с использованием API
</p>
<p>По результатам 50042 запросов, было добавлено и обновлено 1311 записей (сохраняю в бд для проверки)</p>