In [1]:
import requests
import pandas as pd
from io import StringIO
import datetime
import json
from urllib.parse import urlencode
import time

# Получение данных через `Logs API` и загрузка в `ClickHouse`
## Logs API

`Logs API` позволяет выгрузить сырые данные со счетчика.

Документация по `Logs API` - https://yandex.ru/dev/metrika/doc/api2/logs/intro.html

Данные для этого кейса также доступны на Яндекс.Диске - https://disk.yandex.ru/d/sUmQmh_MnQWL4g?w=1

### Шаг 1: получаем токен
Для работы с API необходимо получить свой токен - https://yandex.ru/dev/oauth/doc/dg/tasks/get-oauth-token.html

Создаем приложение тут (указываем права для чтения в Яндекс.Метрике) - https://oauth.yandex.ru/client/new

Переходим по ссылке вида - `https://oauth.yandex.ru/authorize?response_type=token&client_id=<идентификатор приложения>`

Полученный токен можно сохранить в домашнюю директорию в файл `.yatoken.txt`

In [2]:
TOKEN = open('../.yatoken.txt').read().strip()

### Шаг 2: проверяем, можно ли создать запрос в Logs API

In [3]:
API_HOST = 'https://api-metrika.yandex.ru'
COUNTER_ID = 73226638
START_DATE = '2020-07-01'
END_DATE = '2020-09-30'
SOURCE = 'hits'
API_FIELDS = ('ym:pv:date', 'ym:pv:dateTime', 'ym:pv:URL', 'ym:pv:deviceCategory', 
         'ym:pv:operatingSystemRoot', 'ym:pv:clientID', 'ym:pv:browser', 'ym:pv:lastTrafficSource')


In [4]:
header_dict = {'Authorization': f'OAuth {TOKEN}',
'Content-Type': 'application/x-yametrika+json'
}

In [5]:
url_params = urlencode(
    [
        ('date1', START_DATE),
        ('date2', END_DATE),
        ('source', SOURCE),
        ('fields', ','.join(API_FIELDS))
    ]
)

url = '{host}/management/v1/counter/{counter_id}/logrequests/evaluate?'\
    .format(host=API_HOST, counter_id=COUNTER_ID) + url_params

r = requests.get(url, headers = header_dict)

In [6]:
r.status_code

200

In [7]:
json.loads(r.text)['log_request_evaluation']

{'possible': True, 'max_possible_day_quantity': 2045}

### Шаг 3: создаем запрос

In [8]:
url_params = urlencode(
    [
        ('date1', START_DATE),
        ('date2', END_DATE),
        ('source', SOURCE),
        ('fields', ','.join(sorted(API_FIELDS, key=lambda s: s.lower())))
    ]
)
url = '{host}/management/v1/counter/{counter_id}/logrequests?'\
    .format(host=API_HOST,
            counter_id=COUNTER_ID) \
      + url_params

r = requests.post(url, headers=header_dict)

In [9]:
r.status_code

200

In [10]:
json.loads(r.text)['log_request']

{'request_id': 15563503,
 'counter_id': 73226638,
 'source': 'hits',
 'date1': '2020-07-01',
 'date2': '2020-09-30',
 'fields': ['ym:pv:browser',
  'ym:pv:clientID',
  'ym:pv:date',
  'ym:pv:dateTime',
  'ym:pv:deviceCategory',
  'ym:pv:lastTrafficSource',
  'ym:pv:operatingSystemRoot',
  'ym:pv:URL'],
 'status': 'created',
 'attribution': 'LASTSIGN'}

In [11]:
request_id = json.loads(r.text)['log_request']['request_id']

In [12]:
request_id

15563503

### Шаг 4: ждем окончания обработки

In [13]:
status = 'created'
while status == 'created':
    time.sleep(60)
    print('trying')
    url = '{host}/management/v1/counter/{counter_id}/logrequest/{request_id}' \
            .format(request_id=request_id,
                    counter_id=COUNTER_ID,
                    host=API_HOST)

    r = requests.get(url, headers=header_dict)
    if r.status_code == 200:
        status = json.loads(r.text)['log_request']['status']
        print(json.dumps(json.loads(r.text)['log_request'], indent = 4))
    else:
        raise(BaseException(r.text))

trying
{
    "request_id": 15563503,
    "counter_id": 73226638,
    "source": "hits",
    "date1": "2020-07-01",
    "date2": "2020-09-30",
    "fields": [
        "ym:pv:browser",
        "ym:pv:clientID",
        "ym:pv:date",
        "ym:pv:dateTime",
        "ym:pv:deviceCategory",
        "ym:pv:lastTrafficSource",
        "ym:pv:operatingSystemRoot",
        "ym:pv:URL"
    ],
    "status": "processed",
    "size": 348454420,
    "parts": [
        {
            "part_number": 0,
            "size": 348454420
        },
        {
            "part_number": 1,
            "size": 0
        },
        {
            "part_number": 2,
            "size": 0
        },
        {
            "part_number": 3,
            "size": 0
        },
        {
            "part_number": 4,
            "size": 0
        },
        {
            "part_number": 5,
            "size": 0
        },
        {
            "part_number": 6,
            "size": 0
        }
    ],
    "attribution": "LAS

In [14]:
json.loads(r.text)['log_request']

{'request_id': 15563503,
 'counter_id': 73226638,
 'source': 'hits',
 'date1': '2020-07-01',
 'date2': '2020-09-30',
 'fields': ['ym:pv:browser',
  'ym:pv:clientID',
  'ym:pv:date',
  'ym:pv:dateTime',
  'ym:pv:deviceCategory',
  'ym:pv:lastTrafficSource',
  'ym:pv:operatingSystemRoot',
  'ym:pv:URL'],
 'status': 'processed',
 'size': 348454420,
 'parts': [{'part_number': 0, 'size': 348454420},
  {'part_number': 1, 'size': 0},
  {'part_number': 2, 'size': 0},
  {'part_number': 3, 'size': 0},
  {'part_number': 4, 'size': 0},
  {'part_number': 5, 'size': 0},
  {'part_number': 6, 'size': 0}],
 'attribution': 'LASTSIGN'}

In [15]:
parts = json.loads(r.text)['log_request']['parts']
parts

[{'part_number': 0, 'size': 348454420},
 {'part_number': 1, 'size': 0},
 {'part_number': 2, 'size': 0},
 {'part_number': 3, 'size': 0},
 {'part_number': 4, 'size': 0},
 {'part_number': 5, 'size': 0},
 {'part_number': 6, 'size': 0}]

### Шаг 5: выгружаем данные

In [16]:
tmp_dfs = []
for part_num in map(lambda x: x['part_number'], parts):
    url = '{host}/management/v1/counter/{counter_id}/logrequest/{request_id}/part/{part}/download' \
            .format(
                host=API_HOST,
                counter_id=COUNTER_ID,
                request_id=request_id,
                part=part_num
            )

    r = requests.get(url, headers=header_dict)
    if r.status_code == 200:
        tmp_df = pd.read_csv(StringIO(r.text), sep = '\t')
        tmp_dfs.append(tmp_df)
    else:
        raise(BaseError(r.text))
        
logs_df = pd.concat(tmp_dfs)

In [17]:
logs_df.shape

(3037737, 8)

In [18]:
logs_df.to_csv('metrika_cloud_case_data_hits.csv', sep = '\t', index = False)

### Шаг 6: то же самое но для визитов

In [19]:
SOURCE = 'visits'
API_FIELDS = ('ym:s:date', 'ym:s:dateTime', 'ym:s:startURL', 'ym:s:deviceCategory', 
         'ym:s:operatingSystemRoot', 'ym:s:clientID', 'ym:s:browser', 'ym:s:lastTrafficSource', 'ym:s:purchaseRevenue', 'ym:s:purchaseID')


In [20]:
url_params = urlencode(
    [
        ('date1', START_DATE),
        ('date2', END_DATE),
        ('source', SOURCE),
        ('fields', ','.join(sorted(API_FIELDS, key=lambda s: s.lower())))
    ]
)
url = '{host}/management/v1/counter/{counter_id}/logrequests?'\
    .format(host=API_HOST,
            counter_id=COUNTER_ID) \
      + url_params

r = requests.post(url, headers=header_dict)

In [21]:
r.status_code

200

In [22]:
json.loads(r.text)['log_request']

{'request_id': 15563533,
 'counter_id': 73226638,
 'source': 'visits',
 'date1': '2020-07-01',
 'date2': '2020-09-30',
 'fields': ['ym:s:browser',
  'ym:s:clientID',
  'ym:s:date',
  'ym:s:dateTime',
  'ym:s:deviceCategory',
  'ym:s:lastTrafficSource',
  'ym:s:operatingSystemRoot',
  'ym:s:purchaseID',
  'ym:s:purchaseRevenue',
  'ym:s:startURL'],
 'status': 'created',
 'attribution': 'LASTSIGN'}

In [23]:
request_id = json.loads(r.text)['log_request']['request_id']

In [24]:
request_id

15563533

In [25]:
status = 'created'
while status == 'created':
    time.sleep(60)
    print('trying')
    url = '{host}/management/v1/counter/{counter_id}/logrequest/{request_id}' \
            .format(request_id=request_id,
                    counter_id=COUNTER_ID,
                    host=API_HOST)

    r = requests.get(url, headers=header_dict)
    if r.status_code == 200:
        status = json.loads(r.text)['log_request']['status']
        print(json.dumps(json.loads(r.text)['log_request'], indent = 4))
    else:
        raise(BaseException(r.text))

trying
{
    "request_id": 15563533,
    "counter_id": 73226638,
    "source": "visits",
    "date1": "2020-07-01",
    "date2": "2020-09-30",
    "fields": [
        "ym:s:browser",
        "ym:s:clientID",
        "ym:s:date",
        "ym:s:dateTime",
        "ym:s:deviceCategory",
        "ym:s:lastTrafficSource",
        "ym:s:operatingSystemRoot",
        "ym:s:purchaseID",
        "ym:s:purchaseRevenue",
        "ym:s:startURL"
    ],
    "status": "processed",
    "size": 54667983,
    "parts": [
        {
            "part_number": 0,
            "size": 54667983
        },
        {
            "part_number": 1,
            "size": 0
        },
        {
            "part_number": 2,
            "size": 0
        }
    ],
    "attribution": "LASTSIGN"
}


In [26]:
json.loads(r.text)['log_request']

{'request_id': 15563533,
 'counter_id': 73226638,
 'source': 'visits',
 'date1': '2020-07-01',
 'date2': '2020-09-30',
 'fields': ['ym:s:browser',
  'ym:s:clientID',
  'ym:s:date',
  'ym:s:dateTime',
  'ym:s:deviceCategory',
  'ym:s:lastTrafficSource',
  'ym:s:operatingSystemRoot',
  'ym:s:purchaseID',
  'ym:s:purchaseRevenue',
  'ym:s:startURL'],
 'status': 'processed',
 'size': 54667983,
 'parts': [{'part_number': 0, 'size': 54667983},
  {'part_number': 1, 'size': 0},
  {'part_number': 2, 'size': 0}],
 'attribution': 'LASTSIGN'}

In [27]:
parts = json.loads(r.text)['log_request']['parts']
parts

[{'part_number': 0, 'size': 54667983},
 {'part_number': 1, 'size': 0},
 {'part_number': 2, 'size': 0}]

In [28]:
tmp_dfs = []
for part_num in map(lambda x: x['part_number'], parts):
    url = '{host}/management/v1/counter/{counter_id}/logrequest/{request_id}/part/{part}/download' \
            .format(
                host=API_HOST,
                counter_id=COUNTER_ID,
                request_id=request_id,
                part=part_num
            )

    r = requests.get(url, headers=header_dict)
    if r.status_code == 200:
        tmp_df = pd.read_csv(StringIO(r.text), sep = '\t')
        tmp_dfs.append(tmp_df)
    else:
        raise(BaseError(r.text))
        
visits_df = pd.concat(tmp_dfs)

In [29]:
visits_df.shape

(451593, 10)

In [30]:
visits_df.to_csv('metrika_cloud_case_data_visits.csv', sep = '\t', index = False)

### Что делать Если не получилось вытянуть данные через `LogsAPI` ?
* Данные можно скачать по ссылке. https://disk.yandex.ru/d/sUmQmh_MnQWL4g?w=1
* Положить их с текущую директорию.
* Потом раскомментировать нижние две строчки
* Проинтерпретировать их и двиунться дальше


In [None]:
# logs_df =  pd.read_csv('metrika_cloud_case_data_hits.csv', sep = '\t')
# visits_df = pd.read_csv('metrika_cloud_case_data_visits.csv', sep = '\t')

## ClickHouse
### Подключение и настройка
https://cloud.yandex.ru/docs/managed-clickhouse/
(см. слайды)

### Данные для доступа
* Из интерфейса облака в разделе хосты копируем имя хост в переменную `CH_HOST` вида `'https://{ИМЯ_ХОСТА}.mdb.yandexcloud.net:8443'`
* Используем заведенного юзера в переменной `CH_USER`
* Сохраним пароль заведенного пользователя в текстовый файл `.chpass.txt`
* В переменную `CH_PASS` считаем содержимое файла `.chpass.txt`
* В переменную `cacert` поместим путь к сертификату для подключения к серверу. Файл `YandexInternalRootCA.crt` должен лежать в репозитории

In [32]:
CH_HOST = 'https://rc1a-rt82aoo61ovjyuxf.mdb.yandexcloud.net:8443'
CH_USER = 'z_sergey'
CH_PASS = open('../.chpass.txt').read().strip()

cacert = 'YandexInternalRootCA.crt'

### Проверяем ClickHouse
Используя заговленные выше переменные проверим доступ до сервера (как в документации https://cloud.yandex.ru/docs/managed-clickhouse/operations/connect#connection-string)

При успешном подключении не произойдет никакой ошибки при выполнении кода ниже, а в `rs.text` будет содержаться версия сервера ClickHouse (например `20.8.12.2`)

In [33]:
url = '{host}/?database={db}&query={query}'.format(
        host=CH_HOST,
        db='default',
        query='SELECT version()')

auth = {
        'X-ClickHouse-User': CH_USER,
        'X-ClickHouse-Key': CH_PASS,
    }

rs = requests.get(url, headers=auth, verify=cacert)
# 
rs.raise_for_status()

print(rs.text)


20.8.12.2



###  Функции для интеграции с ClickHouse

В файле `some_funcs` есть класс `simple_ch_client` c 3 функциями
* get_clickhouse_data
* get_clickhouse_df
* upload

Сначала надо создать экземпляр класса, инициализировав его начальными параметрами - хост, пользователь, пароль и путь к сертификату

In [34]:
import some_funcs
from some_funcs import simple_ch_client

In [35]:
my_client = simple_ch_client(CH_HOST, CH_USER, CH_PASS, cacert)

### Загружаем данные

In [36]:
logs_df.head()

Unnamed: 0,ym:pv:browser,ym:pv:clientID,ym:pv:date,ym:pv:dateTime,ym:pv:deviceCategory,ym:pv:lastTrafficSource,ym:pv:operatingSystemRoot,ym:pv:URL
0,chromemobile,5415548024442348949,2020-07-13,2020-07-13 07:46:49,2,referral,android,https://supermarket.ru/info
1,edge,17620899721795923553,2020-07-13,2020-07-13 12:19:32,1,ad,windows,https://supermarket.ru/product_7400
2,edge,17620899721795923553,2020-07-13,2020-07-13 12:19:49,1,ad,windows,https://supermarket.ru/product_7400
3,chrome,7926790720828563502,2020-07-13,2020-07-13 14:21:03,1,referral,windows,https://supermarket.ru/catalog_category_574
4,chrome,7926790720828563502,2020-07-13,2020-07-13 14:21:18,1,direct,windows,https://supermarket.ru/catalog_category_574


In [37]:
logs_df.rename(columns={'ym:pv:browser':'Browser',
                'ym:pv:clientID':'ClientID',
                'ym:pv:date':'EventDate',
                'ym:pv:dateTime':'EventTime',
                'ym:pv:deviceCategory':'DeviceCategory',
                'ym:pv:lastTrafficSource':'TraficSource',
                'ym:pv:operatingSystemRoot':'OSRoot',
                'ym:pv:URL':'URL'}, inplace = True)

In [38]:
q = 'drop table if exists metrica_data.hits '
my_client.get_clickhouse_data(q)

q = '''
create table metrica_data.hits (
    Browser String,
    ClientID UInt64,
    EventDate Date,
    EventTime DateTime,
    DeviceCategory String,
    TraficSource String,
    OSRoot String,
    URL String
) ENGINE = MergeTree(EventDate, intHash32(ClientID), (EventDate, intHash32(ClientID)), 8192)
'''

my_client.get_clickhouse_data(q)

''

In [39]:
my_client.upload(
    'metrica_data.hits',
    logs_df.to_csv(index = False, sep = '\t'))

''

In [40]:
visits_df.shape

(451593, 10)

In [41]:
q = 'drop table if exists metrica_data.visits '
my_client.get_clickhouse_data(q)

q = '''
create table metrica_data.visits (
    Browser String,
    ClientID UInt64,
    StartDate Date,
    StartTime DateTime,
    DeviceCategory UInt8,
    TraficSource String,
    OSRoot String,
    Purchases Int32,
    Revenue Double,
    StartURL String
) ENGINE = MergeTree(StartDate, intHash32(ClientID), (StartDate, intHash32(ClientID)), 8192)
'''

my_client.get_clickhouse_data(q)

''

In [42]:
visits_df.rename(columns={'ym:s:browser':'Browser',
                'ym:s:clientID':'ClientID',
                'ym:s:date':'StartDate',
                'ym:s:dateTime':'StartTime',
                'ym:s:deviceCategory':'DeviceCategory',
                'ym:s:lastTrafficSource':'TraficSource',
                'ym:s:operatingSystemRoot':'OSRoot',
                'ym:s:purchaseRevenue': 'Purchase.Revenue', 
                'ym:s:purchaseID': 'Purchase.ID',
                'ym:s:startURL':'StartURL'}, inplace = True)

In [43]:
visits_df['Purchases'] = visits_df['Purchase.Revenue'].map(lambda x:x.count(','))
visits_df['Revenue'] = visits_df['Purchase.Revenue'].map(lambda x: sum(map(int,x[1:-1].split(','))) if x != '[]' else 0)


In [44]:
visits_df.drop(columns=['Purchase.ID','Purchase.Revenue'], inplace=True)

In [45]:
my_client.upload(
    'metrica_data.visits',
    visits_df.to_csv(sep='\t', index =False))

''

### Готово, можно переходить к следующему NoteBook'у `fin_with_funnels.ipynb`