In [1]:
import requests
import psycopg2
import petl as etl
import numpy as np

from datetime import datetime as dt

# Библиотека PETL

### Загрузка данных
`petl` поддерживает различные источники данных, мы рассмотрим следующие:
- Загрузка из xlsx-файла  
- Использование открытых источников через API
- Работа с базой данных

### Данные из xlsx-файла

Рассмотрим работу с `petl` на наборе результатов летних олимпиад по странам. Нам понадобится файл `datasets/summer_olympics.xlsx`, посмотрим на первые строки, пока не сохраняя таблицу в переменную.

In [2]:
etl.fromxlsx('datasets/summer_olympics.xlsx').head(3)

0,1,2,3,4
,? Summer,01 !,02 !,03 !
Afghanistan (AFG),13,0,0,2
Algeria (ALG),12,5,2,8


Видим, что данные загрузились без ошибок, однако в качестве заголовков должна использоваться первая строка данных. Исправим это, используя функцию `skip`

In [3]:
xlsx_df = etl.fromxlsx('datasets/summer_olympics.xlsx').skip(1)

In [4]:
xlsx_df.head(3)

Unnamed: 0,? Summer,01 !,02 !,03 !
Afghanistan (AFG),13,0,0,2
Algeria (ALG),12,5,2,8
Argentina (ARG),23,18,24,28


Теперь заголовки у столбцов корректные, однако не достаточно информативны, исправим это, задав заголовки в ручную.

In [5]:
new_header = [
    'country',
    'games',
    'gold',
    'silver',
    'bronze',
]
xlsx_df = xlsx_df.setheader(new_header)
xlsx_df.head()

country,games,gold,silver,bronze
Afghanistan (AFG),13,0,0,2
Algeria (ALG),12,5,2,8
Argentina (ARG),23,18,24,28
Armenia (ARM),5,1,2,9
Australasia (ANZ) [ANZ],2,3,4,5


In [6]:
xlsx_df.sort('gold', reverse=True)

country,games,gold,silver,bronze
United States (USA) [P] [Q] [R] [Z],26,976,757,666
Estonia (EST),11,9,9,15
India (IND) [F],23,9,6,11
Ireland (IRL),20,9,8,12
Romania (ROU),20,88,94,119


Теперь мы можем посчитать общее количество медалей `addfield`

In [7]:
xlsx_df.addfield('total', lambda x: x['gold'] + x['silver'] + x['bronze']).head(5)

country,games,gold,silver,bronze,total
Afghanistan (AFG),13,0,0,2,2
Algeria (ALG),12,5,2,8,528
Argentina (ARG),23,18,24,28,182428
Armenia (ARM),5,1,2,9,129
Australasia (ANZ) [ANZ],2,3,4,5,345


Вместо того, чтобы получить суммы, мы просто склеили значения. Чтобы такого не происходило, будем переводить значения в целочисленные. Выясним, какая страна смогла набрать наибольшее число золотых медалей, отсортировав столбец gold по убыванию, с помощью функции `sort`

In [8]:
xlsx_df = xlsx_df.\
    addfield('total', lambda x: int(x['gold']) + int(x['silver']) + int(x['bronze']))
xlsx_df.sort('gold', reverse=True).head(5)

country,games,gold,silver,bronze,total
United States (USA) [P] [Q] [R] [Z],26,976,757,666,2399
Estonia (EST),11,9,9,15,33
India (IND) [F],23,9,6,11,26
Ireland (IRL),20,9,8,12,29
Romania (ROU),20,88,94,119,301


Видим, что в таблице есть сумма по всем странам, что нас не интересует в данной задаче. Можем выбрать из таблицы все строки, кроме строки со значением `country == Totals`. Воспользуемся функцией `select`.  

Кроме того, выясним результативность страны, получив среднее число медалей за игру, и отсортируем по этому значению, чтобы выявить топ-5 стран.

In [9]:
result_xlsx_df = xlsx_df.\
    select(lambda x: x.country != 'Totals').\
    addfield('effectiveness', lambda x: round(x['total'] / float(x['games']), 2)).\
    sort('effectiveness', reverse=True)
result_xlsx_df.head(5)

country,games,gold,silver,bronze,total,effectiveness
Soviet Union (URS) [URS],9,395,319,296,1010,112.22
Unified Team (EUN) [EUN],1,45,38,29,112,112.0
United States (USA) [P] [Q] [R] [Z],26,976,757,666,2399,92.27
East Germany (GDR) [GDR],5,153,129,127,409,81.8
Russia (RUS) [RUS],5,132,121,142,395,79.0


Сохраним полученные результаты в новый xlsx-файл.

In [10]:
result_xlsx_df.toxlsx('output/olympics.xlsx')

### Данные из открытого источника рынка акций

In [11]:
js = requests.get('https://www.quandl.com/api/v3/datasets/WIKI/AAPL.json?start_date=2017-05-01&end_date=2017-07-01')

In [12]:
stock_prices_json = js.json()

In [13]:
stock_prices_json['dataset'].keys()

dict_keys(['id', 'dataset_code', 'database_code', 'name', 'description', 'refreshed_at', 'newest_available_date', 'oldest_available_date', 'column_names', 'frequency', 'type', 'premium', 'limit', 'transform', 'column_index', 'start_date', 'end_date', 'data', 'collapse', 'order', 'database_id'])

Нас интересуют два поля ответа: `column_names`, который мы будем использовать в качестве заголовков таблицы, и `data`, содержащий все необходимые данные. Для преобразования данных из объекта `dict` в таблицу `petl` сделаем следующее:  
- Транспонируем содержимое `data`, чтобы превратить строки в столбцы  
- Используем `column_names` в качестве значения параметра `header` функции `fromcolumns`

In [26]:
etl.fromdicts(stock_prices_json['dataset']['data'], header=stock_prices_json['dataset']['column_names'])

Date,Open,High,Low,Close,Volume,Ex-Dividend,Split Ratio,Adj. Open,Adj. High,Adj. Low,Adj. Close,Adj. Volume
,,,,,,,,,,,,
,,,,,,,,,,,,
,,,,,,,,,,,,
,,,,,,,,,,,,
,,,,,,,,,,,,


In [14]:
columns = np.transpose(stock_prices_json['dataset']['data'])
header = stock_prices_json['dataset']['column_names']
df = etl.fromcolumns(columns, header)

df.head(3)

Date,Open,High,Low,Close,Volume,Ex-Dividend,Split Ratio,Adj. Open,Adj. High,Adj. Low,Adj. Close,Adj. Volume
2017-06-30,144.45,144.96,143.78,144.02,22328979.0,0.0,1.0,143.8662700449,144.3742091084,143.19897754971,143.43800769724,22328979.0
2017-06-29,144.71,145.13,142.28,143.68,31116980.0,0.0,1.0,144.12521937139,144.54352212957,141.70503912765,143.09938165491,31116980.0
2017-06-28,144.49,146.11,143.1601,145.83,21915939.0,0.0,1.0,143.90610840282,145.51956189865,142.58158259782,145.2406933932,21915939.0


Уберём часть столбцов, все, содержащие `'Adj'`, переведём все значения в числа (где это возможно), вычислим разницу курса на определённую дату

In [15]:
result_stock = df.\
    cutout(*(x for x in df.fieldnames() if 'Adj' in x)).\
    convertnumbers().\
    addfield('Difference', lambda row: round(row.Close - row.Open, 2))

result_stock.head()

Date,Open,High,Low,Close,Volume,Ex-Dividend,Split Ratio,Difference
2017-06-30,144.45,144.96,143.78,144.02,22328979.0,0.0,1.0,-0.43
2017-06-29,144.71,145.13,142.28,143.68,31116980.0,0.0,1.0,-1.03
2017-06-28,144.49,146.11,143.1601,145.83,21915939.0,0.0,1.0,1.34
2017-06-27,145.01,146.16,143.62,143.74,24423643.0,0.0,1.0,-1.27
2017-06-26,147.17,148.28,145.38,145.82,25524661.0,0.0,1.0,-1.35


Сохраним полученную табличку в csv-файл.

In [16]:
result_stock.tocsv('output/stock.csv')

### Данные из БД

Рассмотрим следующий пример.  
Доступны данные о состояниях различных типов транспортных средств. В базе есть 2 таблицы:
- `status_ts` содержит информацию о состояниях различных ТС  
- `ts_types` содержит наименования типов ТС  

Необходимо подготовить таблицу, содержащую валидные данные по бульдозерам:
- В данных не должно быть пропусков  
- Время указано в формате datetime  
- Кроме данных по бульдозерам других нет  
- Все состояния, кроме отсутствия данных  
- Для каждого состояния рассчитана продолжительность

In [17]:
connection = psycopg2.connect("dbname=etl user=etl password=xzdp9PGRLTTG48zS", host='localhost')

statuses = etl.fromdb(connection, 'SELECT * FROM status_ts')
ts_types = etl.fromdb(connection, 'SELECT * FROM ts_types')

# Вспомогательные функции
# Определяем фильтр для исключения строк с пустыми значениями
row_without_nones = lambda x: all(x[field] != '' for field in statuses.fieldnames())
# Перевод отметки времени в формат datetime
to_datetime = lambda x: dt.fromtimestamp(int(x))

Чтобы исключить строки с пропусками, используем функцию `select` и определенный выше фильтр `row_without_nones`

In [18]:
statuses.select(row_without_nones)

Начало,Окончание,id ТС,Состояние,Максимальная скорость,Средняя скорость,Пробег
1555202473,1555205804,1,Поездка,45.0,15.0,12.49
1555217924,1555222978,9,Поездка,45.0,15.0,18.95
1555227223,1555233071,4,Отсутствие данных,39.0,13.0,19.01
1555232810,1555241522,6,Остановка,33.0,11.0,23.96
1555233670,1555241008,3,Заправка,21.0,7.0,12.84


Переведём столбцы со временем в требуемый формат. Для этого необходимо воспользоваться функцией `convert`.

Сразу можем добавить расчёт продолжительности функцией `addfield`.  

In [19]:
statuses.\
    convert('Начало', to_datetime).\
    convert('Окончание', to_datetime).\
    addfield('Продолжительность', lambda x: x['Окончание'] - x['Начало'])

Начало,Окончание,id ТС,Состояние,Максимальная скорость,Средняя скорость,Пробег,Продолжительность
2019-04-14 00:41:13,2019-04-14 01:36:44,1,Поездка,45.0,15.0,12.49,0:55:31
2019-04-14 04:58:44,2019-04-14 06:22:58,9,Поездка,45.0,15.0,18.95,1:24:14
2019-04-14 07:33:43,2019-04-14 09:11:11,4,Отсутствие данных,39.0,13.0,19.01,1:37:28
2019-04-14 09:06:50,2019-04-14 11:32:02,6,Остановка,33.0,11.0,23.96,2:25:12
2019-04-14 09:21:10,2019-04-14 11:23:28,3,Заправка,21.0,7.0,12.84,2:02:18


Объединим обе таблицы и выберем данные только по бульдозерам, сразу уберём строки с состоянием "Отсутствие данных".

In [20]:
statuses.\
    join(ts_types, lkey='id ТС', rkey='id').\
    select(lambda x: 'Бульдозер' in x['Тип ТС'] and x['Состояние'] != 'Отсутствие данных')

Начало,Окончание,id ТС,Состояние,Максимальная скорость,Средняя скорость,Пробег,Тип ТС
1555202473,1555205804,1,Поездка,45.0,15.0,12.49,Бульдозер колесный
1555248579,1555249401,1,Заправка,56.0,19.0,3.9,Бульдозер колесный
1555256061,1555257832,1,Заправка,,17.0,7.53,Бульдозер колесный
1555259761,1555266553,1,Стоянка,25.0,8.0,13.58,Бульдозер колесный
1555261083,1555272591,1,Заправка,11.0,4.0,11.51,Бульдозер колесный


Все перечисленные операции можно произвести за раз, сформируем цепочку функций. 

Заметим, что столбец `id ТС` уже не требуется, его можно убрать функцией `cutout`.

В дополнение ко всему отсортируем таблицу по времени начала состояний, применив `sort`.

In [21]:
result_df = statuses.\
    join(ts_types, lkey='id ТС', rkey='id').\
    select(lambda x: 'Бульдозер' in x['Тип ТС'] and x['Состояние'] != 'Отсутствие данных').\
    select(row_without_nones).\
    convert('Начало', to_datetime).\
    convert('Окончание', to_datetime).\
    addfield('Продолжительность', lambda x: x['Окончание'] - x['Начало']).\
    convert('Начало', str).convert('Окончание', str).convert('Продолжительность', str).\
    cutout('id ТС').\
    sort('Начало')

In [22]:
result_df

Начало,Окончание,Состояние,Максимальная скорость,Средняя скорость,Пробег,Тип ТС,Продолжительность
2019-04-14 00:41:13,2019-04-14 01:36:44,Поездка,45.0,15.0,12.49,Бульдозер колесный,0:55:31
2019-04-14 13:29:39,2019-04-14 13:43:21,Заправка,56.0,19.0,3.9,Бульдозер колесный,0:13:42
2019-04-14 16:24:02,2019-04-14 19:06:27,Остановка,13.0,4.0,9.74,Бульдозер гусеничный,2:42:25
2019-04-14 16:36:01,2019-04-14 18:29:13,Стоянка,25.0,8.0,13.58,Бульдозер колесный,1:53:12
2019-04-14 16:58:03,2019-04-14 20:09:51,Заправка,11.0,4.0,11.51,Бульдозер колесный,3:11:48


In [23]:
# Импортируем библиотеку, позволяющую создавать таблицы в БД
import sqlalchemy as db

# Подготовим подключение
_user = 'etl'
_pass = 'xzdp9PGRLTTG48zS'
_host = 'localhost'
_port = 5432
target_db = db.create_engine(f"postgres://{_user}:{_pass}@{_host}:{_port}/etl")

result_df.todb(target_db, 'status_cleaned', create=True, drop=True, sample=0)

Проверим, что таблица создалась.

In [24]:
etl.fromdb(connection, 'SELECT * FROM status_cleaned')

Начало,Окончание,Состояние,Максимальная скорость,Средняя скорость,Пробег,Тип ТС,Продолжительность
2019-04-14 00:41:13,2019-04-14 01:36:44,Поездка,45.0,15.0,12.49,Бульдозер колесный,0:55:31
2019-04-14 13:29:39,2019-04-14 13:43:21,Заправка,56.0,19.0,3.9,Бульдозер колесный,0:13:42
2019-04-14 16:24:02,2019-04-14 19:06:27,Остановка,13.0,4.0,9.74,Бульдозер гусеничный,2:42:25
2019-04-14 16:36:01,2019-04-14 18:29:13,Стоянка,25.0,8.0,13.58,Бульдозер колесный,1:53:12
2019-04-14 16:58:03,2019-04-14 20:09:51,Заправка,11.0,4.0,11.51,Бульдозер колесный,3:11:48
