<div style="border:solid orange 3px; padding: 16px">  
    <font size="4">  
        <p style="text-align: center;">
            <b> 11. Автоматизация. Построение дашборда для анализа поведения пользователей Яндекс.Дзен</b>
        </p> 
    </font>
</div>

<a id="contents"></a>
### Оглавление:
0 [Описание проекта](#0)

I. [Построение дата-пайплайна](#1)

II. [Построение дашборда](#2)

III. [Презентация результатов](#3)


 <a id="0"></a>
### 0. Описание проекта
Аналитика в Яндекс.Дзене. Данные - отметки о взаимодействии пользователей с карточками статей (отображение, клик, просмотр статьи). Еженедельные типовые отчеты приводят к решению об автоматизации работы. 

Необходимо настроить еженедельное сохранение копии БД по расписанию и построить дашборд с необходимым фильтрами для отображения информации о поведении пользователей. 

<a id="1"></a>
### I. Построение дата-пайплайна 

Проект выполнялся в ОС Ubuntu 18. 
#### 0. Установим менеджер библиотек (pip) для python3. 
```bash
sudo apt update
sudo apt install python3-pip
```

#### 1. Установим необходимые библиотеки для Python:
```bash
sudo pip3 install pandas
sudo pip3 install numpy
sudo pip3 install sqlalchemy
sudo apt-get install python3-psycopg2
sudo pip3 install dash==1.6.1
```

#### 2. Для работы с БД (базами данных) установим PostgreSQL:
```bash
sudo apt install postgresql postgresql-contrib
```    

#### 3. Запустим сервер PostgreSQL:
```bash
sudo service postgresql start
service postgresql status
```
На экране отобразится сообщение: "10/main (port 5432): online". Это значит, что сервер работает успешно.   

#### 4. Восстановим БД `zen` из файла `zen1.dump`:

4.0. Скопируем файл дампа БД `zen1.dump` из `ПАПКИ_С_ФАЙЛОМ` в папку /tmp с помощью команды cp:	
```bash
cp /ПУТЬ_К_ПАПКЕ_С_ФАЙЛОМ/zen1.dump /tmp 
 ```
 
4.1. Перейдём в директорию /tmp
```bash
cd /tmp
```

4.2. Сменим пользователя на postgres:
```bash
sudo su postgres
```

4.3. Создадим БД zen:
```bash
    createdb zen --encoding='utf-8'
```

4.4. Подключимся к psql - клиенту СУБД Postgres:
```bash
    psql -d zen
```        
4.5. Создадим пользователя и дадим ему нужные права для доступа к базе:
```SQL
        CREATE USER my_user WITH ENCRYPTED PASSWORD 'my_user_password';
        GRANT ALL PRIVILEGES ON DATABASE zen TO my_user;
```

4.6. Выйдем из psql:
```bash
        \q        
```  

4.7. Восстановим БД zen из файла zen1.dump:
```bash
    pg_restore -d zen zen1.dump
```

4.8. Подключимся к psql:
``` bash   
    psql -d zen        
```  

4.9. Зададим права на таблицы, содержащиеся в БД zen: 
```SQL
        GRANT ALL PRIVILEGES ON TABLE log_raw TO my_user;
        GRANT ALL PRIVILEGES ON TABLE dash_engagement TO my_user;
        GRANT ALL PRIVILEGES ON TABLE dash_visits TO my_user;
```	

    И права для работы с первичными ключами этих таблиц:
    
```SQL
        GRANT USAGE, SELECT ON SEQUENCE log_raw_event_id_seq TO my_user;
        GRANT USAGE, SELECT ON SEQUENCE dash_engagement_record_id_seq TO my_user;
        GRANT USAGE, SELECT ON SEQUENCE dash_visits_record_id_seq TO my_user;
```

4.10. Выйдем из psql:
```bash
        \q
```

4.11. Отключимся от пользователя postgres:
```bash
    exit
```

#### 5. Создадим скрипт для автоматического сбора и агрегации данных `zen_pipeline.py`:

```python
#!/usr/bin/python
# -*- coding: utf-8 -*-
import sys
import getopt
from datetime import datetime
import pandas as pd
from sqlalchemy import create_engine

if __name__ == "__main__":

    # зададим входные параметры пайплана: начало и конец временного интервала 
    unixOptions = "s:e"  
    gnuOptions = ["start_dt=", "end_dt="] 

    fullCmdArguments = sys.argv
    argumentList = fullCmdArguments[1:]    

    try:  
        arguments, values = getopt.getopt(argumentList, unixOptions, gnuOptions)
    except getopt.error as err:
        print(str(err))
        sys.exit(2)

    # значения входных параметров будут передаваться при запуске скрипта
    start_dt = ''
    end_dt = ''  

    # считаем значения входных параметров в цикле
    for currentArgument, currentValue in arguments:  
        if currentArgument in ("-s", "--start_dt"):
            start_dt = currentValue                                   
        elif currentArgument in ("-e", "--end_dt"):
            end_dt = currentValue  

    # создадим словарь с параметрами для подключения к БД zen        
    db_config = {'user': 'my_user',
                 'pwd': 'my_user_password',
                 'host': 'localhost',
                 'port': 5432,
                 'db': 'zen'}

    # создадим строку для подключения в нужном формате                
    connection_string = 'postgresql://{}:{}@{}:{}/{}'.format(db_config['user'], 
                                                             db_config['pwd'], 
                                                             db_config['host'], 
                                                             db_config['port'], 
                                                             db_config['db'])
    
    # подключимся к БД
    engine = create_engine(connection_string)
    
    # создадим строку с SQL-запросом, из таблицы log_raw 
    # (cырые данные о событиях взаимодействия пользователей с карточками) 
    # выберем все записи в пределах временного интервала между start_dt и end_dt 
    # переведем время из unix-формата в формат timestamp      
    query = ''' SELECT
                   event_id,
                   age_segment,
                   event,
                   item_id,
                   item_topic,
                   item_type,
                   source_id,
                   source_topic,
                   source_type,
                   TO_TIMESTAMP(ts/1000) AT TIME ZONE 'Etc/UTC' as dt,
                   user_id
                FROM log_raw
                WHERE TO_TIMESTAMP(ts/1000) AT TIME ZONE 'Etc/UTC' BETWEEN '{}'::TIMESTAMP AND '{}'::TIMESTAMP
            '''.format(start_dt, end_dt)

    # выполним SQL-запрос и запишем его результат в датафрейм 
    # названия столбцов будут такие же, как в БД (кроме 'ts', который переименовали в 'dt')
    # индексная колонка - event_id - уникальный идентификатор события        
    data_raw = pd.io.sql.read_sql(query, con = engine, index_col = 'event_id')

    # список столбцов со строковыми значениями
    columns_str = ['age_segment', 'event', 'item_topic', 'source_topic', 'item_type', 'source_type']
    
    # список столбцов с числовыми значениями
    columns_numeric = ['item_id', 'source_id', 'user_id']

    # приведем данные к нужным типам
    for column in columns_str: data_raw[column] = data_raw[column].astype(str)  
    for column in columns_numeric: data_raw[column] = pd.to_numeric(data_raw[column], errors='coerce')

    # округлим значения в столбце 'dt' до минут и приведем к типу datetime
    data_raw['dt'] = pd.to_datetime(data_raw['dt']).dt.round('min')

    # создадим агрегирующую таблицу истории событий
    # вычислим количество событий с группировкой по минутам, темам карточек, 
    # темам источников и возрастной группе
    dash_visits = data_raw.groupby(['item_topic', 'source_topic', 'age_segment', 'dt'])\
                          .agg({'user_id': 'count'})

    # переименуем столбец для соответствия с уже созданной агрегирующей таблицей
    dash_visits = dash_visits.rename(columns = {'user_id':'visits'})

    # заполним пропуски 0, и сбросим индексы
    dash_visits = dash_visits.fillna(0).reset_index()

    # создадим агрегирующую таблицу воронок
    # вычислим количество уникальных пользователей с группировкой по минутам, 
    # темам карточек, типам событий и возрастной группе
    dash_engagement = data_raw.groupby(['dt', 'item_topic', 'event', 'age_segment'])\
                              .agg({'user_id':'nunique'})

    # переименуем столбец для соответствия с уже созданной агрегирующей таблицей                          
    dash_engagement = dash_engagement.rename(columns = {'user_id':'unique_users'})

    # заполним пропуски 0, и сбросим индексы
    dash_engagement = dash_engagement.fillna(0).reset_index()

    # зададим словарь с парами значение 'имя таблицы' - таблица
    tables = {'dash_visits': dash_visits, 
              'dash_engagement': dash_engagement}
 
    # для каждой пары значений из словаря зададим SQL-запрос на удаление из таблицы тех записей
    # время которых находится в заданном интервале (для того, чтобы избежать возможного дублирования 
    # данных на этом интервале при повторном запуске скрипта с теми же временными рамками)
    for table_name, table_data in tables.items():
        
        query = '''
                 DELETE FROM {} WHERE dt BETWEEN '{}'::TIMESTAMP AND '{}'::TIMESTAMP
               '''.format(table_name, start_dt, end_dt)
        
        engine.execute(query)
        
        # запишем данные в агрегирующие таблицы в БД zen
        table_data.to_sql(name = table_name, con = engine, if_exists = 'append', index = False)

    print('All done.')
```

#### 6. Запустим скрипт пайплайна:

6.0. Скопируем файл скрипта `zen_pipeline.py` из `ПАПКИ_С_ФАЙЛОМ` в папку /tmp с помощью команды cp:	
```bash
cp /ПУТЬ_К_ПАПКЕ_С_ФАЙЛОМ/zen_pipeline.py /tmp
```

6.1. Запустим скрипт пайплайна для всего диапазона тестовых данных с `18:00:00` по `19:00:00` `2019-09-24`:
```bash
python3 zen_pipeline.py --start_dt ='2019-09-24 18:00:00' --end_dt='2019-09-24 19:00:00'
```

После успешного выполнения в окне командной строки отобразится запись "All done." 

В БД `zen` в таблицы `dash_visits` и `dash_engagement` будут записаны агрегированные данные. 

#### 7. Настроим автоматический запуск скрипта zen_pipeline, для этого будем использовать встроенный в Ubuntu планировщик задач cron: 

7.0. Создадим директорию для хранения логов (результатов выполнения скрипта):
```bash
mkdir /home/test_user/logs
```

7.1. Выполним команду вызова редактора расписания `cron`:
```bash	
crontab -e
```

В диалоге выбора текстового редактора нажмём 1, затем Enter (nano редактор для `crontab`).

7.2. В конце появившегося текста `crontab` (расписание работы программы `cron`) допишем строчку 
```bash
15 3 * * * python3 -u -W ignore /home/test_user/code/zen_pipeline.py --start_dt=$(date +\%Y-\%m-\%d\ 00:00:00 -d "1 day ago") --end_dt=$(date +\%Y-\%m-\%d\ 00:00:00) >> /home/test_user/logs/zen_$(date +\%Y-\%m-\%d -d "1 day ago").log 2>&1
```

Эта строчка означает следующее:
- каждый день в 03:15 нужно выполнить python-скрипт zen_pipeline.py (при этом нужно игнорировать предупреждения и не буферизовать результаты выполнения)
- скрипт находится по адресу /home/test_user/code/
- входные параметры для выполнения start_dt - день, предшествующий дню запуска скрипта и время '00:00:00'; end_dt - день запуска скрипта и время '00:00:00'
- для сохранения результата выполнения скрипта нужно создать файл, имя которого состоит из "zen_" и даты дня, предшествующего дню запуска скрипта (т.к. данные, согласно ТЗ, собираются за прошлые сутки)
- после успешного выполнения скрипта нужно сохранить все результаты работы (включая ошибки) в созданном файле в директории home/test_user/logs/.

	Допишем ещё одну пустую строчку. 

7.3. Сохраним изменения:
```bash
Ctrl + o
```

7.4. Выйдем из редактора crontab:
```bash
Ctrl + x 
```        

Теперь каждый день автоматически будет запускаться программа, которая агрегирует данные об активности пользователей за прошедшие сутки и дописывает эту информацию в исходную БД.

<a id="2"></a>
### II. Построение дашборда  [*(Оглавление)*](#contents)

Для отображения информации, собираемой и агрегируемой автоматически, создадим дашборд (интерактивный отчет).

#### 8. Создадим скрипт `zen_dash.py` для построения дашборда с помощью библиотеки `dash`: 

```python
#!/usr/bin/python
# -*- coding: utf-8 -*-

import dash
import dash_core_components as dcc
import dash_html_components as html
from dash.dependencies import Input, Output
import plotly.graph_objs as go
from datetime import datetime
import pandas as pd
from sqlalchemy import create_engine

if __name__ == "__main__":

    # создадим словарь с параметрами для подключения к БД zen        
    db_config = {'user': 'my_user',
                 'pwd': 'my_user_password',
                 'host': 'localhost',
                 'port': 5432,
                 'db': 'zen'}
      
    # создадим строку для подключения в нужном формате                
    connection_string = 'postgresql://{}:{}@{}:{}/{}'.format(db_config['user'], 
                                                             db_config['pwd'], 
                                                             db_config['host'], 
                                                             db_config['port'], 
                                                             db_config['db'])
    
    engine = create_engine(connection_string)
    
    # получим сырые данные из таблицы dash_visits
    query = '''
               SELECT * FROM dash_visits
           '''
    
    # запишем все строки из таблицы dash_visits в одноименный датафрейм       
    dash_visits = pd.io.sql.read_sql(query, con = engine)
    
    # приведем данные в столбце 'dt' к типу datetime      
    dash_visits['dt'] = pd.to_datetime(dash_visits['dt'])

    # получим сырые данные из таблицы dash_engagement
    query = '''
               SELECT * FROM dash_engagement
           '''

    # запишем все строки из таблицы dash_engagement в одноименный датафрейм
    dash_engagement = pd.io.sql.read_sql(query, con = engine)

    # приведем данные в столбце 'dt' к типу datetime  
    dash_engagement['dt'] = pd.to_datetime(dash_engagement['dt'])

    # описание дашборда
    note = '''
              Этот дашборд показывает историю взаимодействия пользователей с карточками, 
              разбивку событий по темам источников и среднюю глубину взаимодействия пользователя с системой.
              Можно выбрать временной интервал, темы источника и возрастную категорию пользователей.  
           '''
    
    # зададим лейаут
    external_stylesheets = ['https://codepen.io/chriddyp/pen/bWLwgP.css']

    app = dash.Dash(__name__, external_stylesheets=external_stylesheets, compress=False)

    app.layout = html.Div(children=[

    # заголовок дашборда
    html.H1(children = 'Анализ взаимодействия пользователей с карточками Яндекс.Дзен '),
    
    html.Br(),
    
    # описание дашборда
    html.Label(note),    
    
    html.Br(), 
        
    # структура дашборда с размерами
    html.Div([  
        
        html.Div([ 
            # фильтр даты         
            html.Label('Выбор даты:'),
            dcc.DatePickerRange(
                start_date = dash_visits['dt'].min(), 
                end_date = dash_visits['dt'].max(),
                display_format = 'YYYY-MM-DD',
                id = 'dt_selector',       
            ),

            html.Br(),
            html.Br(),
       
            # фильтр возрастных категорий
            html.Label('Выбор возрастных категорий:'),
            dcc.Dropdown(
                options = [{'label': x, 'value': x} for x in dash_visits['age_segment'].unique()],
                value = dash_visits['age_segment'].unique().tolist(),
                multi = True,
                id = 'age-dropdown'
            ), 

            html.Br(),

            # график истории событий по темам карточек  
            html.Label('График истории событий по темам карточек'), 
            dcc.Graph(
                style = {'height': '50vw'},
                id = 'history-absolute-visits'
            ),
                         
        ], className = 'six columns'),    
        
        html.Div([ 
            # фильтр тем карточек  
            html.Label('Выбор темы карточек:'), 
            dcc.Dropdown(
                options = [{'label': x, 'value': x} for x in dash_visits['item_topic'].unique()],
                value = dash_visits['item_topic'].unique().tolist(),
                multi = True,
                id = 'item-topic-dropdown'
            ),
            
            html.Br(),
        
            # график разбивки событий по темам источников
            html.Label('График разбивки событий по темам источников'), 
            dcc.Graph(
                style = {'height': '25vw'},
                id = 'pie-visits'
            ),
                     
            # график средней глубины взаимодействия
            html.Label('График средней глубины взаимодействия пользователей'), 
            dcc.Graph(
                style = {'height': '25vw'},
                id = 'engagement-graph'
            ),
                         
        ], className = 'six columns'),

    ], className = 'row'),           
     
    ])


    # описываем логику дашборда
    # декоратор для функции update_figures с описанием того,
    # от каких элементов управления будут поступать входные сигналы
    # и какие элементы дашборда нужно обновить после выполнения функции   
    @app.callback(
      [Output('history-absolute-visits', 'figure'),
       Output('pie-visits', 'figure'),
       Output('engagement-graph', 'figure')
      ],
      [Input('item-topic-dropdown', 'value'),
       Input('age-dropdown', 'value'),
       Input('dt_selector', 'start_date'),
       Input('dt_selector', 'end_date')
      ])

    # функция для отрисовки графиков при изменении входных параметров (на фильтрах)    
    # порядок задания входных и выходных параметров соответствует порядку их записи в декораторе
    def update_figures(selected_item_topics, selected_ages, start_date, end_date):

        # для графика истории событий по темам карточек 
        # фильтрация событий по времени 
        filtered = dash_visits.query('dt >= @start_date and dt <= @end_date')

        # фильтрация по выбранным темам карточек
        filtered = filtered.query('item_topic in @selected_item_topics')

        # фильтрация данных по выбранным возрастным категориям пользователей
        filtered = filtered.query('age_segment in @selected_ages')

        # для графика разбивки событий по темам источника
        # фильтрация событий по времени
        filtered_topic = dash_visits.query('dt >= @start_date and dt <= @end_date')

        # фильтрация по выбранным темам карточек 
        filtered_topic = filtered_topic.query('item_topic in @selected_item_topics')
        

        # для графика взаимодействия пользователей с системой
        # фильтрация событий по времени
        filtered_interactions = dash_engagement.query('dt >= @start_date and dt <= @end_date')
        

        # группировка по теме карточки и времени
        history_by_items = filtered.groupby(['item_topic', 'dt'])\
                                   .agg({'visits': 'sum'})\
                                   .reset_index()\
                                   .sort_values(by ='visits', ascending = False) # сортировка по убыванию

        # график истории событий по темам карточек
        data_by_items = []
        
        # для каждой темы карточки из выбранных в фмльтре  
        for item_topic in history_by_items['item_topic'].unique():

            # выберем количество событий с этой темой карточки
            current = history_by_items[history_by_items['item_topic'] == item_topic]

            # построим линейный график с накоплением 'количество событий в минуту - время'
            data_by_items += [go.Scatter(x = current['dt'],
                                         y = current['visits'],
                                         mode = 'lines',
                                         stackgroup = 'one',
                                         name = item_topic)]

        # группировка количества событий по темам источников
        report = filtered_topic.groupby(['source_topic'])\
                               .agg({'visits': 'sum'})\
                               .reset_index()
        
        # график разбивки количества событий по темам источников    
        data_by_topic = [go.Pie(labels = report['source_topic'],
                                values = report['visits'])]
        

        # группировка среднего количества уникальных пользователей по типу события ('show' - 'click' - 'view')
        report_event = filtered_interactions.groupby(['event'])\
                                            .agg({'unique_users': 'mean'})\
                                            .reset_index()\
                                            .sort_values(by ='unique_users', ascending = False) # сортировка по убыванию
        

        # приведение к относительным показателям (относительно общего количества показов) в процентах
        report_event['avg_unique_users'] = report_event['unique_users'] *100 / report_event['unique_users'].max()                                                 
                            
        # график средней глубины взаимодействия
        funnel_interactions = [go.Bar(x = report_event['event'],
                                      y = report_event['avg_unique_users'], 
                                      text = report_event['avg_unique_users'].round(1),
                                      textposition = 'auto')]

        #сформируем результат для отображения
        return (
                  {
                      'data': data_by_items,
                      'layout': go.Layout(xaxis = {'title': 'Дата и время'},
                                          yaxis = {'title': 'Количество событий по темам'})
                   },            
                  {
                      'data': data_by_topic,
                      'layout': go.Layout()
                   },
                  {
                      'data': funnel_interactions,
                      'layout': go.Layout(xaxis = {'title': 'Событие'},
                                          yaxis = {'title': '% событий'})
                   },            
        
        )

if __name__ == '__main__':
    app.run_server(debug = True, host='0.0.0.0', port=3000)
```

#### 9. Запуск дашборда на локальной машине:

9.0. Скопируем файл скрипта дашборда `zen_dash.py` из `ПАПКИ_С_ФАЙЛОМ` в папку /tmp с помощью команды cp:	
```bash
cp /ПУТЬ_К_ПАПКЕ_С_ФАЙЛОМ/zen_dash.py /tmp
```

9.1. Запустим скрипт дашборда:
```bash
python3 zen_dash.py
```

9.2. Локальная версия дашборда станет доступна в браузере по адресу:

http://127.0.0.1:8050/ 

9.3. Остановка программы дашборда:
```bash
Ctrl + c
```

#### 10. Запуск дашборда на виртуальной (удаленной) машине:

10.0. Отправим файл со скриптом дашборда `zen_dash.py` на виртуальную машину, используя безопасное копирование через ssh:
```bash
scp <ПУТЬ_К_ФАЙЛУ>/zen_dash.py <ЛОГИН>@<публичный_IP-адрес_виртуальной машины>:
```

10.1. Подключимся к виртуальной машине с использованием команды ssh:
```bash
ssh <ЛОГИН>@<публичный_IP-адрес_виртуальной машины>
```

10.2. Скопируем файл скрипта дашборда `zen_dash.py` из корневого каталога пользователя виртуальной машины в папку /tmp с помощью команды cp:
```bash
cp zen_dash.py /tmp
```

10.3. Запустим скрипт дашборда:
```bash
python3 /tmp/zen_dash.py
```

10.4. Публичная версия дашборда станет доступна в браузере по адресу:

http://<публичный_IP-адрес_виртуальной машины>:8050/

10.5. Остановка программы дашборда:
```bash
Ctrl + c
```

<a id="3"></a>
### III. Презентация результатов  [*(Оглавление)*](#contents)

Презентация размещена здесь:

https://disk.yandex.ru/i/Paa6b1RPM6KW-w