# Подбор эксклюзивных предложений

## Описание проекта

Агрегатор для доставки еды  вводит в своём приложении новую опцию — подписку. Одной из возможностей нововведения является добавление ресторанов в избранное. Только в этом случае пользователю будут поступать уведомления о специальных акциях с ограниченным сроком действия. Акции длятся всего несколько часов и часто бывают ситуативными, что накладывает условия на потоковую обработку данных и быстрой доставки их кругу пользователей, у которых ресторан добавлен в избранное. Обновление поможет ресторанам привлечь новую аудиторию, получить фидбэк на новые блюда и акции, реализовать профицит товара и увеличить продажи в непиковые часы.

Требуется разработать систему, которая поможет реализовать это обновление.

## Механизм работы сервиса

* Ресторан через мобильное приложение агрегатора для доставки еды отправляет акцию с ограниченным предложением.

* Разрабатываемый сервис в потоковом режиме забирает из брокера сообщений Kafka отправленное рестораном сообщение и подгружает данные о пользователях приложения из таблицы PostgreSQL.

* Происходит объединение загруженных из разных источников данных, и проходит проверка, кто из пользователей добавил этот ресторан в избранный список.

* Формируется заготовка для push-уведомлений этим пользователям о временных акциях и отправляется в нужный топик Kafka.

* Параллельно предыдущему пункту полученный результат сохраняется в отдельной таблице PostgreSQL для аналитики обратной связи от пользователей.

## Этап 1. Загрузка данных об акциях и пользователях

### Загрузка необходимых библиотек

Малые сроки проведения акций накладывают условия обработки поступающей информации в реальном времени. Для решения этой задачи в данном проекте будет использоваться библиотека Spark Streaming.

* Загрузка необходимых библиотек:

In [None]:
from datetime import datetime
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql import functions as f
from pyspark.sql.types import StructType, StructField, DoubleType, StringType, TimestampType, IntegerType

* Заготовка для загрузки библиотек, необходимых для интеграции Spark с Kafka и PostgreSQL:

In [None]:
spark_jars_packages = ",".join(
        [
            "org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0",
            "org.postgresql:postgresql:42.4.0",
        ]
    )

### Данные об акциях

В рамках проекта данные об акциях хранятся в отдельном топике **restaurant_actions**. 

1. Создание Spark-сессии(в код сразу будет включена заготовка *spark_jars_packages*):

In [None]:
def spark_init() -> SparkSession:
    spark_jars_packages = ",".join(
        [
            "org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0",
            "org.postgresql:postgresql:42.4.0",
        ]
    )
    return (SparkSession.builder
            .master("local")
            .appName('RestaurantSubscribeStreamingService')
            .config("spark.jars.packages", spark_jars_packages)
            .getOrCreate()
            )

2.  Создание консьюмера для чтения в потоковом режиме сообщений об акциях из топика **restaurant_actions**:

In [None]:
def load_df(spark: SparkSession) -> DataFrame:
    return (spark.readStream
            .format('kafka')
            .option("subscribe", "restaurant_actions")   
            .load())

3. Тело сообщений,приходящих из Kafka, имеет следующую структуру:

*restaurant_id* - универсальный уникальный идентификатор(**UUID**) ресторана

*adv_campaign_id* - **UUID** рекламной кампании

*adv_campaign_content* - текст кампании

*adv_campaign_datetime_start* - время начала рекламной кампании в формате *timestamp*

*adv_campaign_datetime_end* -  время окончания рекламной кампании в формате *timestamp*

*datetime_created* - время создания кампании в формате *timestamp*

### Данные о пользователях

1. Данные о пользователях хранятся в таблице PostgreSQL **subscribers_restaurants**, содержащую следующие атрибуты:

*id* - первичный ключ, уникальный идентификатор записи в таблице

*client_id* - **UUID** пользователя

*restaurant_id* - **UUID** ресторана

2. Код функции, загружайщей данные из таблицы **subscribers_restaurants**:

In [None]:
def read_subscribers(spark: SparkSession) -> DataFrame:
    marketing_df = (spark.read
                    .format("jdbc")   
                    .option("dbtable", "subscribers_restaurants")
                    .option("driver", "org.postgresql.Driver")
                    .options(**postgresql_settings)
                    .option("maxOffsetsPerTrigger", 20)
                    .load())
    return marketing_df

## Этап 2. Подбор акций клиентам

### Подготовка данных из Kafka

Стоит учитывать ,что консьюмер Kafka принимает сообщение в json-виде `value:event`, который не подходит  для объединения с данными из PostgreSQL. Необходимо создать функция, которая проведёт десериализацию сообщения и создаст необходимый для работы в PySpark датафрейм.

* Основываясь на структуре сообщения, описанного в 1-м этапе, зададим схему датафрейма следующим образом:

In [None]:
schema = StructType([
    StructField("restaurant_id", StringType()),
	StructField("adv_campaign_id", StringType()),
	StructField("adv_campaign_content", StringType()),
    StructField("adv_campaign_datetime_start", TimestampType()),
	StructField("adv_campaign_datetime_end", TimestampType()),
	StructField("datetime_created", TimestampType())])

* Ещё одной особенностью данных является то, что все временные атрибуты заданы в формате *Unix time*. Для трансформации в удобный вид будет использоваться функция **from_unixtime**. 

* Чтобы пользователи не получали закончившиеся и ещё не начавшиеся акции, необходимо ввести проверку *adv_campaign_datetime_start* и *adv_campaign_datetime_end* с текущим временем обработки. Дополнительно будет отниматься 10-ти минутный интервал от времени конца акции, чтобы клиенты не получали акции, которые вот-вот могут стать неактуальны:

In [None]:
.filter((f.current_timestamp() >= f.col('adv_campaign_datetime_start')) & (f.current_timestamp() <= f.col('adv_campaign_datetime_end') - f.expr(f"INTERVAL 10 MINUTES")))


* Так же необходимо избавиться от ситуаций, когда одному и тому же пользователю могут повторно приходить предложения. Для этого вводится временной промежуто в 10 минут, в течение которого Spark будет искать дубли:

In [None]:
.dropDuplicates(['restaurant_id', 'adv_campaign_id', 'datetime_created']).withWatermark('datetime_created', '10 minutes')

* Обобщая всё описанное, функция создания датафрейма будет иметь следующий вид:

In [None]:
def transform(df: DataFrame) -> DataFrame:
    schema = StructType([
    StructField("restaurant_id", StringType()),
	StructField("adv_campaign_id", StringType()),
	StructField("adv_campaign_content", StringType()),
    StructField("adv_campaign_datetime_start", TimestampType()),
	StructField("adv_campaign_datetime_end", TimestampType()),
	StructField("datetime_created", TimestampType())        
    ])
 
    return (df
            .withColumn('value', f.col('value').cast(StringType()))
            .withColumn('event', f.from_json(f.col('value'), schema))
            .selectExpr('event.*')
            .withColumn('adv_campaign_datetime_start',
                        f.from_unixtime(f.col('adv_campaign_datetime_start'), "yyyy-MM-dd' 'HH:mm:ss.SSS").cast(TimestampType()))
	        .withColumn('adv_campaign_datetime_end',
                        f.from_unixtime(f.col('adv_campaign_datetime_end'), "yyyy-MM-dd' 'HH:mm:ss.SSS").cast(TimestampType()))
            .withColumn('datetime_created',
                        f.from_unixtime(f.col('datetime_created'), "yyyy-MM-dd' 'HH:mm:ss.SSS").cast(TimestampType()))
            .filter((f.current_timestamp() >= f.col('adv_campaign_datetime_start')) & (f.current_timestamp() <= f.col('adv_campaign_datetime_end') - f.expr(f"INTERVAL 10 MINUTES")))
            .dropDuplicates(['restaurant_id', 'adv_campaign_id', 'datetime_created'])
            .withWatermark('datetime_created', '10 minutes')                       
          )

### Поиск подписок пользователей на рестораны

Проверка у каких пользователей ресторан находится в избранном будет проходить методом объединения таблиц *inner join*. Так же для удобства отслеживания работы программы будет введён атрибут времени объединения данных *trigger_datetime_created*. Итоговый вид функции:

In [None]:
def join_df(restaurant_stream_transformed, subscribers_df) -> DataFrame:
    return (restaurant_stream_transformed
            .join(subscribers_df, ['restaurant_id'], 'inner')
            .withColumn('res_id', subscribers_df.restaurant_id)
            .withColumn('trigger_datetime_created', f.lit(datetime.now()))
            .drop('res_id' , 'id')
            .dropDuplicates(['restaurant_id', 'client_id', 'adv_campaign_id'])
            .withWatermark('datetime_created', '5 minutes'))

## Этап 3. Отправка результатов отбора

В рамках создания сервиса необходимо отправлять результаты обработки сразу в два стока — Kafka для push-уведомлений и Postgres для аналитики фидбэка. Для это будет использован метод **foreachBatch(…)**

### Отправка результата для аналитики фидбэка

Для аналитики обратной связи в PostgreSQL создана таблица **subscribers_feedback**. Для загрузки данных в эту таблицу необходимо в полученный после объединения датасет добавить атрибут *feedback*. Код записи в базу данных:

In [None]:
Postgres_df = PersistDF.withColumn('feedback', f.lit(None))
Postgres_df.write \
    .format("jdbc") \
    .mode("append") \
    .option("driver", "org.postgresql.Driver") \
    .save()

### Отправка результата в Kafka для подготовки push-уведомления

Для отправки сообщений в топик Kafka необходимо сериализовать данные в сообщение формата «ключ-значение(value)». Функция, которая сериализует данные из датафрейма в JSON и кладёт их в колонку value, будет выглядеть следующим образом:

In [None]:
def serialize(df) -> DataFrame:        
    return(df
        .select(f.to_json(f.struct('restaurant_id', 'adv_campaign_id', 'adv_campaign_content', 'adv_campaign_owner', 'adv_campaign_owner_contact', 'adv_campaign_datetime_start', 'adv_campaign_datetime_end' , 'datetime_created' , 'client_id', 'trigger_datetime_created'))).alias('value'))

Запись сообщения в топик *push_message*:

In [None]:
df_kafka.write \
        .format("kafka") \
        .mode("append") \
        .option("topic", 'push_message')

Чтобы не создавать занаво объединённый датасет для отправки в два стока, перед добавлением в него атрибута *feedback* нужно сохранить его в память методом **persist()**

Итоговый код метода **foreachBatch(…)**:

In [None]:
def foreach_batch_function(joined_df, epoch_id):
    
    # сохраняем df в памяти, чтобы не создавать df заново перед отправкой в Kafka
    PersistDF = joined_df.persist()
    # создаём df для отправки в Kafka. Сериализация в json.
    df_kafka = serialize(PersistDF)

    # записываем df в PostgreSQL с полем feedback    
    Postgres_df = PersistDF.withColumn('feedback', f.lit(None))
    Postgres_df.write \
        .format("jdbc") \
        .mode("append") \
        .option("driver", "org.postgresql.Driver") \
        .save()

    # отправляем сообщения в результирующий топик Kafka без поля feedback
    df_kafka.write \
        .format("kafka") \
        .mode("append") \
        .option("topic", 'push_message') \
        
               
    # очищаем память от df
    PersistDF.unpersist()

Вызов данного метода происходит следующим образом:

In [None]:
query = joined_df.writeStream \
.foreachBatch(foreach_batch_function) \
.trigger(processingTime='60 seconds') \
.start() \
    
try:
    query.awaitTermination()
finally:
    query.stop()

## Выводы

1. Реализована возможность читать данные из Kafka с помощью Spark Structured Streaming и Python в режиме реального времени.
2. Написана фунция получения списка подписчиков из базы данных Postgres. 
3. Разработан процесс, осуществляющий в реальном времени поиска актуальных предложений для пользователей. 
4. Настроно автоматическое пополнение таблицы, необходимой для оценки обратной информации о блюдах от пользователей.