# Рекомендательная система в Docker: Postgres+Mongo+Flask

В рамках данного воркшопа будет продемонстрирована архитектура хранения и обработки данных:

- Использование реляционныой БД для хранения данных
- Выгрузка и преварительная обработка данных с помощью SQL
- Взаимодействие между БД и Python
- Нереляционные хранилища: Mongo, Redis

## Решение по хранению и обработке данных

Запускаем Docker-контейнер с данными в PostgreSQL

<pre>
docker-compose --project-name data-cli -f docker-compose.yml up --build flask-app
</pre>

Подробнее про Docker тут: https://hackernoon.com/docker-tutorial-getting-started-with-python-redis-and-nginx-81a9d740d091

Сборка контейнера включает в себя создание реляционной БД PostgreSQL с двумя таблицами (данные берём из CSV). В базу данных будут загружены файлы из [этого](https://www.kaggle.com/rounakbanik/the-movies-dataset/data) соревнования- их нужно скачать заранее, прягодятся для домашних работ.

Если всё прошло успешно, то по url http://0.0.0.0:5001 можно будет увидеть приветственную страницу приложения

![Главная страница приложения](https://habrastorage.org/webt/oc/rb/op/ocrbophkojh8ll_qsqp5naf3i-g.jpeg)

Управлять выдачей на странице ["SVD рекомендации"](http://0.0.0.0:5001/recs?user_id=10&top=15) можно с помощью параметров user_id и top.

![Страница приложения с рекомендациями](https://habrastorage.org/webt/bp/d9/q-/bpd9q-c1v1_k2kbrgxwvssgq0da.jpeg)

## Получаем данные

Данные хранятся в PostgreSQL - реляционной БД с открытым исходным кодом. Первая часть курса посвящена реляционным базам данных и языку SQL.

Мы научимся писать сложные SQL запросы для фильтрации данных и предварительной обработки: удалению шумов, расчёта фичей и т.д.

Язык запросов SQL позволяет строить гибкие пайплайны по обработке данных. Средства более высокого уровня для обработки "сырых" данных - например, Apache Spark - используют для построения процессов обработки примитивы, аналогичные SQL.

Приведённый ниже сниппет демонтрирует ещё одно достоинство Postgres: в Python существует коннектор к этой БД, билиотека psycopg2. Таким образом можно отладить запрос в любой удобной для себя среде, а потом перенести его в Python-приложение

In [2]:
import psycopg2
import pandas as pd
from scipy.sparse import coo_matrix
import numpy as np

# параметры подключения к БД
params = {"host": "localhost", "port": 5433, "user": 'postgres'}
conn = psycopg2.connect(**params)

cursor = conn.cursor()

# параметры SQL-запроса
USER_ITEM_QUERY_CONFIG = {
       "MIN_USERS_FOR_ITEM": 10,
       "MIN_ITEMS_FOR_USER": 3,
       "MAX_ITEMS_FOR_USER": 50,
       "MAX_ROW_NUMBER": 100000
}
cursor.execute("""
    SELECT 
        ratings.userId, ratings.movieId, AVG(ratings.rating) as rating
    FROM ratings
    -- фильтруем фильмы, которые редко оценивают
    INNER JOIN (
        SELECT 
            movieId, count(*) as users_per_item
        FROM ratings 
        GROUP BY movieId 
        HAVING COUNT(*) > %(MIN_USERS_FOR_ITEM)d
    ) as movie_agg
        ON movie_agg.movieId = ratings.movieId
    -- фильтруем пользователей, у которых мало рейтингов
    INNER JOIN (
        SELECT 
            userId, count(*) as items_per_user
        FROM ratings 
        GROUP BY userId 
        HAVING COUNT(*) BETWEEN %(MIN_ITEMS_FOR_USER)d AND %(MAX_ITEMS_FOR_USER)d 
    ) as user_agg
        ON user_agg.userId = ratings.userId
    GROUP BY 1,2
    LIMIT %(MAX_ROW_NUMBER)d
""" % USER_ITEM_QUERY_CONFIG
)

Закрываем соединение к БД и выгружаем данные в объект Pandas DataFrame

In [3]:
# вытаскиваем результаты SQL в память Python
ui_data = [a for a in cursor.fetchall()]

df = pd.DataFrame(ui_data, columns=[a.name for a in cursor.description])

conn.close()
print(df.shape)
df.head()

(88736, 3)


Unnamed: 0,userid,movieid,rating
0,1,110,1.0
1,1,147,4.5
2,1,858,5.0
3,1,1221,5.0
4,1,1246,5.0


## Препроцессинг данных: переиндексация

Для построения рекомендательной системы мы будем использовать алгоритм матричного разложения SVD.

Для этого нужно сформировать из триплетов *[userid,	movieid, rating]* матрицу user-item ( подробнее в статье в блоге ivi https://habr.com/company/ivi/blog/232843/ ) . В матрице user-item число строк совпадает с числом уникальных пользователей, а число столбцов - с количеством единиц контента (размером каталога).

Сформируем такую матрицу, используем в качестве индекса строк и столбцов непосредственно значения userid,	movieid:

In [4]:
ui_matrix = coo_matrix((
    [row[2] for row in ui_data],
    ([row[0] for row in ui_data], [row[1] for row in ui_data])
)).astype(np.float16)
ui_matrix.shape

(7955, 171764)

В нашей матрице 16912 строк и 176212 столбцов. Однако там много полностью нулевых строк (id юзеров, которые не попали в выборку) и большое количество нулевых столбцов (id контента, которого не слуществует). Проверим, сколько у нас пустых id контента и и пользователей):

In [5]:
print("""% пользователей без активности {}
% контента без просмотров {}""".format(
        1 - np.unique(ui_matrix.nonzero()[0]).size/ui_matrix.shape[0],
        1 - np.unique(ui_matrix.nonzero()[1]).size/ui_matrix.shape[1]
    )
)

% пользователей без активности 0.4253928346951603
% контента без просмотров 0.9723748864721362


Вывод: данные хранятся неоптимально, нужно переиндексировать пользователей и контент чтобы избавиться от пустых строк и столбцов

In [6]:
# индекс пользователей
user_index = {
    i[1]: i[0][0] 
    for i in np.ndenumerate(np.unique([triplet[0] for triplet in ui_data]))
}
# обратный индекс - нужен для фронтэнда
inverse_user_index = {j: i for i, j in user_index.items()}

# аналогично индекс контента
item_index = {
    i[1]: i[0][0] 
    for i in np.ndenumerate(np.unique([triplet[1] for triplet in ui_data]))
}
inverse_item_index = {j: i for i, j in item_index.items()}

Применяем к выгрузке из SQL преобразование индексов и формируем новую user-item матрицу

In [7]:
raiting_list = [row[2] for row in ui_data]
user_index_plain = [user_index[row[0]] for row in ui_data]
item_index_plain = [item_index[row[1]] for row in ui_data]

ui_matrix = coo_matrix((raiting_list, (user_index_plain, item_index_plain))).astype(np.float16)
# del df, ui_data # если не хватит памяти
ui_matrix.shape

(4571, 4745)

Как видно, размерноть матрицы сильно уменьшилось.

Предварительная обработка данных из хранища - обязательный шаг в машинном обучении. Правильная предварительная обработка позволяет экономить вычислительные ресурсы.


## Обучаем модель

Данные подготовлены для обучения - можем построить модель.

В качестве рекомендательной модели мы будем использовать SVD-разложение матрицы user-item. В результате разложения матрица user-item $S$ размерности $m \times n$ будет представлена в виде произведения двух матриц меньшей размерности - матрицей факторов контента и матрицей факторов пользователей.

$$
S = U \times I^T
$$

При этом матрицы размерности $U\sim m \times k$ и $I \sim m \times k$, где $m$ и $n$ порядка нескольких тысяч, а $k$ - размерность пространства скрытых факторов, обычно не превышает 100.

Подробнее про SVD можно почитать тут https://habr.com/company/surfingbird/blog/139863/

In [8]:
from scipy.sparse.linalg import svds

num_users, num_items = ui_matrix.shape
user_factors ,scale, item_factors = svds(ui_matrix.asfptype(), k=50, return_singular_vectors=True)
#create square matrix
scale = np.diag(np.sqrt(scale))
user_factors = np.dot(user_factors, scale).astype(np.float16)
item_factors = np.dot(scale, item_factors).astype(np.float16)

print(user_factors.shape, item_factors.shape)

(4571, 50) (50, 4745)


# Сохраняем модель в MongoDB, Redis

Итак, мы:
- выгрузили данные из Postgres с помощью запроса SQL
- провели небольшую работу по агрегации и очистке данных на стороне SQL
- выполнили небольшой процессинг(переиндексацию) в Python.
- обучили модель

На этапе эксплуатации нам нужно рекомендовать контент для пользователя. Чтобы посчитать персональные рекомендации для каждого пользователя нужно переменожить факторы пользователя на факторы контента, которые мы получили на этапе обучения модели - для этого нужно сохранить эти два массива.

В нашем мини-проекте мы применяем Postgres для хранения "сырых" данных. Для хранения модели больше подходят нереляционные хранилища данных - им посвящена вторая часть курса. Нереляционные (NoSQL) хранилища обладают двумя главными преимуществами - их просто настраивать и лего масштабировать.

Мы будем использовать Mongo для хранения факторов пользователей и Redis как общий кэш для хранения факторов контента

Оба хранилища подняты в докере, порты пробрасываются с локальной машины - за общением между сервисами бэкенда можно наблюдать прямо в консоли.

In [10]:
from pymongo import MongoClient
from redis import Redis

from lz4.block import compress, decompress
from msgpack import packb, unpackb
from msgpack_numpy import decode, encode

mongo_conf = {'host': "localhost", 'port': 27018}
mongo_storage = MongoClient(**mongo_conf)
mongo_recsys_storage = mongo_storage.get_database("recsys")

# инициализируем хранилище Mongo
user_factors_storage = mongo_recsys_storage.get_collection("user")

# инициализируем хранилище Redis
REDIS_CONF = {"host": "localhost", "port": 6380, "db": 0}
redis_storage = Redis(**REDIS_CONF)

Проверим установку - версия должна быть такой же, что и в файле requirements.

In [11]:
import pymongo
pymongo.__version__

'3.7.1'

Мы сохраняем факторы пользователей в Mongo: по одному документу на каждого пользователя
Факторы контента сохраняем в Redis как один массив

In [12]:
# сохраняем факторы пользователей
selector = {'id': {'$in': [user_id for user_id in range(num_users)]}}
user_factors_storage.delete_many(selector)
user_factors_storage.insert_many(
    [
        {
            'id': user_id,
            'value': compress(packb(user_factors[user_id,:], default=encode))
        } 
        for user_id in range(num_users)
    ]
)

# сохраняем факторы контента
redis_storage.set("item_factors" , compress(packb(item_factors, default=encode)))

True

Для выдачи рекомендаций мы получаем факторы пользователя из Mongo, матрицу факторов контента из Redis и и перемножаем их. Получаем вектор размерности $1\times n$ - то есть каждому кконтенту из каталога соответствует некое число - т.н. "релевантность". Осталосль отсортировать каталог по значению релевантности - контент с самым высоким значением должен понравится пользователю

_Примечание_: если хватит памяти на машине, можно предрасчитать рекомендации для всех пользователей сразу как

<pre>
recs = np.dot(user_factors, item_factors)
</pre>

_Домашнее задание_: сохранить в Mongo по каждому пользователю предрастчитанное ранжирование (дополнительно к факторам контента). Подсказка: используйте функцию *update* из PyMongo.

In [13]:
# матрица факторов пользователя
mongo_doc = user_factors_storage.find_one({'id': 100})
if mongo_doc is None:
    print("Пользователя с id %(id)d не существует" % user_doc)
else:
    latent_user_factors = unpackb(decompress(mongo_doc['value']), object_hook=decode)

    # матрица факторов контента
    redis_data = redis_storage.get("item_factors")
    latent_item_factors = unpackb(decompress(redis_data), object_hook=decode)

    # вычисляем персональную релевантность контента
    personal_recs = latent_user_factors.reshape(1,-1).dot(latent_item_factors)


    print("user_factors {}\nitem_factors {}\nrecommendations {}".format(
            latent_user_factors.shape, latent_item_factors.shape, personal_recs.shape)
    )

user_factors (50,)
item_factors (50, 4745)
recommendations (1, 4745)


Мы получили рекомендации в виде массива. На финальном этапе нажно отфильтровать top-100 самых релевантных пользователю единиц контента и выполнить преобразование из нашего плотного индекса обратно к *movieId*

_Домашнее задание_: залить в Mongo другую полезную информацию из репозитория и сделать форму вывода более богатой.

In [14]:
user_recommendations = [inverse_item_index[i] for i in np.argsort(-personal_recs[0])[:50]]
print(user_recommendations)

[524, 1186, 2881, 3363, 3510, 362, 1321, 914, 531, 2145, 1721, 277, 2501, 902, 135, 2366, 2144, 3424, 2707, 3107, 2605, 2572, 719, 2160, 849, 1235, 1663, 830, 1172, 2908, 1680, 3072, 1513, 4002, 3100, 170, 1019, 1378, 2140, 1968, 107, 2427, 3148, 2746, 2371, 2944, 898, 1416, 1957, 1962]


# Выводы

Мы построили веб-приложение на Flask, которое демонстрирует процесс работы с данными для построения рекомендательной системы

- Сырые данные хранятся в баз Postgres
- Первичная обработка происходит внутри SQL
- Пост-обработка данных и обучения модели происходит на стороне Python
- Эксплуатация модели производится с помощью хранилища на Mongo+Redis
- Связь между хранилищами данных и фронт-этом осуществляется с помощью Flask

В заключение: разрабатывать архитектуру хранения данных для приложения нужно с учётом сильных и слабых сторон каждой технологии