In [1]:
import requests
import json
import pandas as pd

### Task 1
#### You need to research data from Itunes API of TOP paid applications in Russia for next tasks:
- define the model of data by spliting them to main data - some kind of daily rate of definite market research and guides which could present additional attributes of main data with relation's information for storing into DB
- make presentation of which tool you choose for scheduled ETL process into DB



В тексте задачи сказано, что мы собираем некоторое количество ежедневных данных для исследования рынка. Предположим, что мы хотим смотреть на динамический рейтинг 100 самых популярных платных продуктов в iTunes. Для этого нам нужно систематически отправлять запрос к API с определенными промежутками времени. Для этой цели я предлагаю использовать `Kafka`. Далее с помощью запланированных заданий в `AirFlow` данные импортируются и предобрабатыватся. Сырые данные в табличной форме записываются в файловое хранилище в формате `.parquet`.
Затем трансформируются в набор таблиц, которые сохраняются в БД. На выходе мы можем автоматизировать выгрузку запросов из БД или работать с даннами в ручном режиме. 

![etl_pipeline.jpg](https://i.ibb.co/M7Jw520/etl-pipeline-upd.jpg)

Первый этап - выгрузка данных. Для разработки базы данных я скачаю их напрямую.

In [2]:
url = 'https://rss.applemarketingtools.com/api/v2/ru/apps/top-paid/100/apps.json'

In [3]:
response = requests.get(url)
print(response.status_code)

200


In [4]:
json_data = response.json()

In [5]:
def get_value(key: str, obj: object) -> object:
    """Достает значения из поля объекта json
    """
    
    if obj == None:
        return None
    else:
        if key in obj:
            return obj[key]
        
        if isinstance(obj, dict) or isinstance(obj, list):
            for k, v in obj.items():

                if isinstance(v, dict):
                    return get_value(key, v)
                
                elif isinstance(v, list):
                    for elm in range(len(v)):
                        return get_value(key, v[elm-1])

In [6]:
results = get_value('results', json_data)
df = pd.DataFrame.from_dict(results)
print(df.shape)

(99, 8)


На этапе первичной предобработки данные обогащаются дополнительными признаками: ранг и время выгрузки, - и сохраняются в `.parquet` для последующего хранения.

In [7]:
import datetime 

# добавим признак `rank` - значение рейтинга для каждой позиции
rank = [i for i in range(1, len(results)+1)]
df['rank'] = rank

# и дату выгрузки
df['date'] = datetime.datetime.now() 

print(df.shape)
df.head(2)

(99, 10)


Unnamed: 0,artistName,id,name,releaseDate,kind,artworkUrl100,genres,url,rank,date
0,Dmitry Filinsky,856861890,Антирадар HUD Speed Pro,2014-04-17,apps,https://is2-ssl.mzstatic.com/image/thumb/Purpl...,[],https://apps.apple.com/ru/app/%D0%B0%D0%BD%D1%...,1,2023-06-21 19:54:08.968204
1,Threema GmbH,578665578,Threema. Безопасный мессенджер,2012-12-28,apps,https://is4-ssl.mzstatic.com/image/thumb/Purpl...,[],https://apps.apple.com/ru/app/threema-%D0%B1%D...,2,2023-06-21 19:54:08.968204


In [8]:
# сохраняем данные на сервер 

import os

raw_data = './raw_data'
os.makedirs(raw_data, exist_ok=True)

df.to_parquet(os.path.join(raw_data, 'itunes_top_paid_100_apps.parquet'), index=False)

На втором этапе данные выгружаются из хранилища, нормализуются и разбиваются на таблицы фактов и расширений в соответствии со схемой базы данных.

![db_schema.jpg](https://i.ibb.co/7KZWJq1/bd-schema.png)

In [9]:
# загружаем сохраненный датасет

df = pd.read_parquet('raw_data/itunes_top_paid_100_apps.parquet')
print(df.shape)
df.head(2)

(99, 10)


Unnamed: 0,artistName,id,name,releaseDate,kind,artworkUrl100,genres,url,rank,date
0,Dmitry Filinsky,856861890,Антирадар HUD Speed Pro,2014-04-17,apps,https://is2-ssl.mzstatic.com/image/thumb/Purpl...,[],https://apps.apple.com/ru/app/%D0%B0%D0%BD%D1%...,1,2023-06-21 19:54:08.968204
1,Threema GmbH,578665578,Threema. Безопасный мессенджер,2012-12-28,apps,https://is4-ssl.mzstatic.com/image/thumb/Purpl...,[],https://apps.apple.com/ru/app/threema-%D0%B1%D...,2,2023-06-21 19:54:08.968204


In [10]:
# product_rating

product_id = df['id'].to_list()

df_product_rating = pd.DataFrame({
    'product_id': product_id,
    'rank': df['rank'].to_list(),
    'date': df['date'].to_list(),
})

print(len(df_product_rating))
df_product_rating.head(2)

99


Unnamed: 0,product_id,rank,date
0,856861890,1,2023-06-21 19:54:08.968204
1,578665578,2,2023-06-21 19:54:08.968204


In [11]:
# author - отбираем уникальных авторов
# для добавления новых строк в уже существующую таблицу нужно будет обновить функцию генерации `author_id`, 
# чтобы обеспечить уникальность

df_author = pd.DataFrame({
    'author_id': [i for i in range(1, len(set(df['artistName'].to_list()))+1)],
    'name': list(set(df['artistName'].to_list())),
})

print(len(df_author))
df_author.head(2)

90


Unnamed: 0,author_id,name
0,1,ARAMAIS AYRAPETYAN
1,2,Pixelmator Team


In [12]:
# kind - отбираем уникальные значения
# при добавлении наовых порций данных надо следить, чтобы в справочник попадали только новые и уникальные

df_kind = pd.DataFrame({
    'kind_id': [i for i in range(1, len(set(df['kind'].to_list()))+1)],
    'name': list(set(df['kind'].to_list())),
})
print(len(df_kind))
df_kind.head()

1


Unnamed: 0,kind_id,name
0,1,apps


In [13]:
# genre - преобразуем список словарей в справочник уникальных жанров
# при добавлении новых порций данных надо следить, чтобы в справочник попадали только новые и уникальные

tmp = df.copy()
tmp['genre_id'] = tmp['genres'].apply(lambda x: [get_value('genreId', el) for el in x if len(x) > 0])
tmp['name'] = tmp['genres'].apply(lambda x: [get_value('name', el) for el in x if len(x) > 0])
tmp['url'] = tmp['genres'].apply(lambda x: [get_value('url', el) for el in x if len(x) > 0])

tmp_g = tmp[['id', 'genre_id', 'name', 'url']].explode(['genre_id', 'name', 'url'])

df_genre = tmp_g.drop_duplicates().dropna().reset_index(drop=True)
print(len(df_genre))
df_genre.head(2)

174


Unnamed: 0,id,genre_id,name,url
0,866450515,6007,Производительность,https://itunes.apple.com/ru/genre/id6007
1,866450515,6012,Образ жизни,https://itunes.apple.com/ru/genre/id6012


In [14]:
# product - содержит основную информацию о продукте и внешние ключи для доступа к расширениям
# итоговое количество строк стало больше 100 из-за нормализации данных 

df_product = pd.DataFrame({
    'product_id': product_id,
    'name': df['name'].to_list(), 
    'release_date': df['releaseDate'].to_list(),
    'url': df['url'].to_list(),
    'art_url': df['artworkUrl100'].to_list(),
})

tmp = df[['id', 'artistName', 'kind']].copy()
tmp = tmp.merge(df_author, left_on='artistName', right_on='name', how='left')
tmp = tmp.merge(df_kind, left_on='kind', right_on='name', how='left')
tmp = tmp.merge(df_genre[['id', 'genre_id']], on='id', how='left')
tmp = tmp[['id', 'author_id', 'genre_id', 'kind_id']].rename(columns={'id': 'product_id'})

df_product = tmp.merge(df_product, on='product_id', how='left')
print(len(df_product))
df_product.head(2)

184


Unnamed: 0,product_id,author_id,genre_id,kind_id,name,release_date,url,art_url
0,856861890,62,,1,Антирадар HUD Speed Pro,2014-04-17,https://apps.apple.com/ru/app/%D0%B0%D0%BD%D1%...,https://is2-ssl.mzstatic.com/image/thumb/Purpl...
1,578665578,81,,1,Threema. Безопасный мессенджер,2012-12-28,https://apps.apple.com/ru/app/threema-%D0%B1%D...,https://is4-ssl.mzstatic.com/image/thumb/Purpl...


In [15]:
# удаляем дубликаты в таблице с жанрами
df_genre = df_genre.drop('id', axis=1).drop_duplicates().reset_index(drop=True)
print(len(df_genre))
df_genre.head(2)

28


Unnamed: 0,genre_id,name,url
0,6007,Производительность,https://itunes.apple.com/ru/genre/id6007
1,6012,Образ жизни,https://itunes.apple.com/ru/genre/id6012


В итоге в БД мы загружаем 5 таблиц:
- `product_rating` с данными о рейтинге для каждого продукта на момент выгрузки топа;
- `product` с информацией о продуктах и внешними ключами к другим расширениям;
- `author` с уникальными авторами;
- `genre` с уникальными жанрами и данными о них;
- `kind` с типом приложения. 

После того, как первичная порция данных будет загружена, в БД информация будет сохранятся неравномерно. На каждой итерации заливки данных в БД мы будем добавлять всю партицию в таблицу `product_rating`, а в остальные - только уникальные, т.е. если этих данных в таблице еще нет. 